From 7994cb1bea84bb3bc0888a5b93d9f06fa4e96db2 Mon Sep 17 00:00:00 2001 From: scwlkq Date: Tue, 20 Feb 2024 17:31:09 +0800 Subject: [PATCH 1/6] Add null check in writeOffset method --- .../offsetmgmt/api/storage/OffsetStorageWriterImpl.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java index 682205c4a6..1252a039f5 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java @@ -53,9 +53,12 @@ public OffsetStorageWriterImpl(String connectorName, OffsetManagementService off } @Override - public void writeOffset(RecordPartition partition, RecordOffset position) { - ConnectorRecordPartition extendRecordPartition = new ConnectorRecordPartition(connectorName, partition.getPartition()); - data.put(extendRecordPartition, position); + public void writeOffset(RecordPartition partition, RecordOffset offset) { + ConnectorRecordPartition extendRecordPartition = null; + if (partition != null && partition.getPartition() != null) { + extendRecordPartition = new ConnectorRecordPartition(connectorName, partition.getPartition()); + } + data.put(extendRecordPartition, offset); } /** From 0e356cbfaf68b140ca3122a6a351b6d5dfdd5361 Mon Sep 17 00:00:00 2001 From: scwlkq Date: Tue, 20 Feb 2024 17:31:36 +0800 Subject: [PATCH 2/6] delete todo --- .../org/apache/eventmesh/openconnect/util/CloudEventUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java index 5d4a77ff78..3fb5ea2b74 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java @@ -67,7 +67,7 @@ public static CloudEvent convertRecordToEvent(ConnectRecord connectRecord) { public static ConnectRecord convertEventToRecord(CloudEvent event) { byte[] body = Objects.requireNonNull(event.getData()).toBytes(); LogUtil.info(log, "handle receive events {}", () -> new String(event.getData().toBytes(), Constants.DEFAULT_CHARSET)); - // todo: recordPartition & recordOffset + ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis(), body); for (String extensionName : event.getExtensionNames()) { connectRecord.addExtension(extensionName, Objects.requireNonNull(event.getExtension(extensionName)).toString()); From 4831157f509c043bb1faf579dd7e51b0e6439411 Mon Sep 17 00:00:00 2001 From: scwlkq Date: Tue, 20 Feb 2024 17:40:14 +0800 Subject: [PATCH 3/6] Move data.put inside null check in writeOffset method --- .../offsetmgmt/api/storage/OffsetStorageWriterImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java index 1252a039f5..462cbc0c4a 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java @@ -57,8 +57,8 @@ public void writeOffset(RecordPartition partition, RecordOffset offset) { ConnectorRecordPartition extendRecordPartition = null; if (partition != null && partition.getPartition() != null) { extendRecordPartition = new ConnectorRecordPartition(connectorName, partition.getPartition()); + data.put(extendRecordPartition, offset); } - data.put(extendRecordPartition, offset); } /** From 8bb0915578854f6e778ff234c3618fefb2b643e4 Mon Sep 17 00:00:00 2001 From: scwlkq Date: Tue, 20 Feb 2024 21:17:11 +0800 Subject: [PATCH 4/6] simplify if judgement --- .../offsetmgmt/api/storage/OffsetStorageWriterImpl.java | 9 ++++----- eventmesh-runtime/conf/eventmesh.properties | 2 +- eventmesh-starter/build.gradle | 1 + 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java index 462cbc0c4a..4bf496537a 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java @@ -54,8 +54,8 @@ public OffsetStorageWriterImpl(String connectorName, OffsetManagementService off @Override public void writeOffset(RecordPartition partition, RecordOffset offset) { - ConnectorRecordPartition extendRecordPartition = null; - if (partition != null && partition.getPartition() != null) { + ConnectorRecordPartition extendRecordPartition; + if (partition != null) { extendRecordPartition = new ConnectorRecordPartition(connectorName, partition.getPartition()); data.put(extendRecordPartition, offset); } @@ -122,9 +122,8 @@ private Future sendOffsetFuture(long flushId) { } /** - * Closes this stream and releases any system resources associated - * with it. If the stream is already closed then invoking this - * method has no effect. + * Closes this stream and releases any system resources associated with it. If the stream is already closed then invoking this method has no + * effect. * * @throws IOException if an I/O error occurs */ diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index cabe3f9bc5..7e75384ea7 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -77,7 +77,7 @@ eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8 eventMesh.connector.plugin.type=standalone # storage plugin -eventMesh.storage.plugin.type=standalone +eventMesh.storage.plugin.type=rocketmq # security plugin eventMesh.server.security.enabled=false diff --git a/eventmesh-starter/build.gradle b/eventmesh-starter/build.gradle index 0bc2208fd1..86710d3f90 100644 --- a/eventmesh-starter/build.gradle +++ b/eventmesh-starter/build.gradle @@ -17,4 +17,5 @@ dependencies { implementation project(":eventmesh-runtime") + implementation project(":eventmesh-storage-plugin:eventmesh-storage-rocketmq") } \ No newline at end of file From 7ec403aca5f8bb1c702c89b64b788b6444ac3cd3 Mon Sep 17 00:00:00 2001 From: scwlkq Date: Tue, 20 Feb 2024 21:19:46 +0800 Subject: [PATCH 5/6] remove dev environment --- eventmesh-runtime/conf/eventmesh.properties | 2 +- eventmesh-starter/build.gradle | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 7e75384ea7..cabe3f9bc5 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -77,7 +77,7 @@ eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8 eventMesh.connector.plugin.type=standalone # storage plugin -eventMesh.storage.plugin.type=rocketmq +eventMesh.storage.plugin.type=standalone # security plugin eventMesh.server.security.enabled=false diff --git a/eventmesh-starter/build.gradle b/eventmesh-starter/build.gradle index 86710d3f90..0bc2208fd1 100644 --- a/eventmesh-starter/build.gradle +++ b/eventmesh-starter/build.gradle @@ -17,5 +17,4 @@ dependencies { implementation project(":eventmesh-runtime") - implementation project(":eventmesh-storage-plugin:eventmesh-storage-rocketmq") } \ No newline at end of file From cf6108ebdf3de9755362f6b3707e193086d2273c Mon Sep 17 00:00:00 2001 From: scwlkq Date: Tue, 20 Feb 2024 22:13:53 +0800 Subject: [PATCH 6/6] fix style --- .../offsetmgmt/api/storage/OffsetStorageWriterImpl.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java index 4bf496537a..76931d85a9 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java @@ -122,8 +122,9 @@ private Future sendOffsetFuture(long flushId) { } /** - * Closes this stream and releases any system resources associated with it. If the stream is already closed then invoking this method has no - * effect. + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. * * @throws IOException if an I/O error occurs */