From a540e8a0a185fbbe4753b382daa3cd94416a5038 Mon Sep 17 00:00:00 2001 From: Pil0tXia Date: Tue, 2 Apr 2024 19:21:50 +0800 Subject: [PATCH 1/9] Handle exception loop by closeOnError --- .../eventmesh/client/grpc/consumer/SubStreamHandler.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java index d34a0c79b1..79bfc4d9ad 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java @@ -96,6 +96,7 @@ public void onNext(final CloudEvent message) { @Override public void onError(final Throwable t) { log.error("Received Server side error", t); + close(); } @Override @@ -145,6 +146,7 @@ private void senderOnNext(final CloudEvent subscription) { } } catch (Exception e) { log.error("StreamObserver Error onNext", e); + close(); } } @@ -152,6 +154,7 @@ private void senderOnComplete() { try { synchronized (sender) { sender.onCompleted(); + sender = null; } } catch (Exception e) { log.error("StreamObserver Error onComplete", e); From e9c7ce905589d93bd70b47631299dbc9a996dabb Mon Sep 17 00:00:00 2001 From: Pil0tXia Date: Tue, 2 Apr 2024 19:27:07 +0800 Subject: [PATCH 2/9] Lombok optimization --- .../grpc/consumer/EventMeshGrpcConsumer.java | 21 +------------------ 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java index 6ceefbe762..2fbf40c1ba 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java @@ -303,6 +303,7 @@ public void close() { } } + @Data private static class SubscriptionInfo { private transient SubscriptionItem subscriptionItem; @@ -314,25 +315,5 @@ private static class SubscriptionInfo { this.url = url; this.grpcType = grpcType; } - - public GrpcType getGrpcType() { - return grpcType; - } - - public SubscriptionItem getSubscriptionItem() { - return subscriptionItem; - } - - public void setSubscriptionItem(final SubscriptionItem subscriptionItem) { - this.subscriptionItem = subscriptionItem; - } - - public String getUrl() { - return url; - } - - public void setUrl(final String url) { - this.url = url; - } } } From f5898d80531723b2a2fed07b2087ea5716fe1ea9 Mon Sep 17 00:00:00 2001 From: Pil0tXia Date: Tue, 2 Apr 2024 20:21:59 +0800 Subject: [PATCH 3/9] some format optimization --- .../grpc/consumer/EventMeshGrpcConsumer.java | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java index 2fbf40c1ba..623ca1c6fe 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java @@ -87,8 +87,7 @@ public EventMeshGrpcConsumer(final EventMeshGrpcClientConfig clientConfig) { } public void init() { - this.channel = ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(), clientConfig.getServerPort()).usePlaintext() - .build(); + this.channel = ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(), clientConfig.getServerPort()).usePlaintext().build(); this.consumerClient = ConsumerServiceGrpc.newBlockingStub(channel); this.consumerAsyncClient = ConsumerServiceGrpc.newStub(channel); this.heartbeatClient = HeartbeatServiceGrpc.newBlockingStub(channel); @@ -125,8 +124,8 @@ public void subscribe(final List subscriptionItems) { addSubscription(subscriptionItems, SDK_STREAM_URL, GrpcType.STREAM); - CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, - subscriptionItems); + CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription( + clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, subscriptionItems); synchronized (this) { if (subStreamHandler == null) { subStreamHandler = new SubStreamHandler<>(consumerAsyncClient, clientConfig, listener); @@ -137,8 +136,8 @@ public void subscribe(final List subscriptionItems) { } private Response subscribeWebhook(List subscriptionItems, String url) { - final CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, - url, subscriptionItems); + final CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription( + clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, subscriptionItems); try { CloudEvent response = consumerClient.subscribe(subscription); log.info("Received response:{}", response); @@ -169,8 +168,8 @@ public Response unsubscribe(final List subscriptionItems, fina removeSubscription(subscriptionItems); - final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, - subscriptionItems); + final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription( + clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, subscriptionItems); try { final CloudEvent response = consumerClient.unsubscribe(cloudEvent); log.info("Received response:{}", response); @@ -191,8 +190,8 @@ public Response unsubscribe(final List subscriptionItems) { removeSubscription(subscriptionItems); - final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, - subscriptionItems); + final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription( + clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, subscriptionItems); try { final CloudEvent response = consumerClient.unsubscribe(cloudEvent); @@ -277,14 +276,12 @@ private void resubscribe() { subscriptionGroup.forEach((url, items) -> { if (isStreamSub.get()) { - CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, - url, - items); + CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription( + clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, items); subStreamHandler.sendSubscription(subscription); } else { subscribeWebhook(items, url); } - }); } From 774397fcae4905dc0ec8230eab18bf655f202e8f Mon Sep 17 00:00:00 2001 From: Pil0tXia Date: Fri, 12 Apr 2024 00:30:55 +0800 Subject: [PATCH 4/9] Avoid closing multiple times --- .../eventmesh/client/grpc/consumer/SubStreamHandler.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java index 79bfc4d9ad..18ffd7f193 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java @@ -40,6 +40,8 @@ public class SubStreamHandler extends Thread implements Serializable { private final transient CountDownLatch latch = new CountDownLatch(1); + private volatile boolean isClosed = false; + private final transient ConsumerServiceStub consumerAsyncClient; private final transient EventMeshGrpcClientConfig clientConfig; @@ -130,6 +132,12 @@ public void run() { } public void close() { + // Avoid closing multiple times + if (isClosed) { + return; + } + isClosed = true; + if (this.sender != null) { senderOnComplete(); } From d0ac6c0900a7be661f8a3e434ae364c42ba56c96 Mon Sep 17 00:00:00 2001 From: Pil0tXia Date: Fri, 12 Apr 2024 14:09:15 +0800 Subject: [PATCH 5/9] Remove redundant set null --- .../apache/eventmesh/client/grpc/consumer/SubStreamHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java index 18ffd7f193..89bbb9e2a0 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java @@ -162,7 +162,6 @@ private void senderOnComplete() { try { synchronized (sender) { sender.onCompleted(); - sender = null; } } catch (Exception e) { log.error("StreamObserver Error onComplete", e); From 878009a99b1b64ffdec0b579bb564b8c9e31afec Mon Sep 17 00:00:00 2001 From: Pil0tXia Date: Fri, 12 Apr 2024 14:18:04 +0800 Subject: [PATCH 6/9] Revert "Avoid closing multiple times" This reverts commit 774397fcae4905dc0ec8230eab18bf655f202e8f. --- .../eventmesh/client/grpc/consumer/SubStreamHandler.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java index 89bbb9e2a0..3f187ddce4 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java @@ -40,8 +40,6 @@ public class SubStreamHandler extends Thread implements Serializable { private final transient CountDownLatch latch = new CountDownLatch(1); - private volatile boolean isClosed = false; - private final transient ConsumerServiceStub consumerAsyncClient; private final transient EventMeshGrpcClientConfig clientConfig; @@ -132,12 +130,6 @@ public void run() { } public void close() { - // Avoid closing multiple times - if (isClosed) { - return; - } - isClosed = true; - if (this.sender != null) { senderOnComplete(); } From 8e812cdcb68a7a1e90241b30739cba77998b82e5 Mon Sep 17 00:00:00 2001 From: Pil0tXia Date: Sat, 13 Apr 2024 14:38:19 +0800 Subject: [PATCH 7/9] Use synchronized latch to keep senderOnComplete called once --- .../client/grpc/consumer/SubStreamHandler.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java index 3f187ddce4..421ab3363b 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java @@ -130,12 +130,15 @@ public void run() { } public void close() { - if (this.sender != null) { - senderOnComplete(); + synchronized (sender) { + if (latch.getCount() == 0) { + return; + } + if (this.sender != null) { + senderOnComplete(); + } + latch.countDown(); } - - latch.countDown(); - log.info("SubStreamHandler closed."); } From 4d8694898e20210aced416f71349c4a220257331 Mon Sep 17 00:00:00 2001 From: Pil0tXia Date: Sat, 13 Apr 2024 16:44:01 +0800 Subject: [PATCH 8/9] Use boolean to prevent latch called by somebody else --- .../eventmesh/client/grpc/consumer/SubStreamHandler.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java index 421ab3363b..b8dfa9f321 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java @@ -40,6 +40,8 @@ public class SubStreamHandler extends Thread implements Serializable { private final transient CountDownLatch latch = new CountDownLatch(1); + private volatile boolean isClosed = false; + private final transient ConsumerServiceStub consumerAsyncClient; private final transient EventMeshGrpcClientConfig clientConfig; @@ -131,13 +133,14 @@ public void run() { public void close() { synchronized (sender) { - if (latch.getCount() == 0) { + if (isClosed) { return; } if (this.sender != null) { senderOnComplete(); } latch.countDown(); + isClosed = true; } log.info("SubStreamHandler closed."); } From dbabca713d2fe7f1d7b58bf4bc2e371a00a1cc0a Mon Sep 17 00:00:00 2001 From: Pil0tXia Date: Sat, 13 Apr 2024 19:58:45 +0800 Subject: [PATCH 9/9] Remove the unique callee/caller close() of onCompleted() --- .../client/grpc/consumer/SubStreamHandler.java | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java index b8dfa9f321..59436be08f 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java @@ -40,8 +40,6 @@ public class SubStreamHandler extends Thread implements Serializable { private final transient CountDownLatch latch = new CountDownLatch(1); - private volatile boolean isClosed = false; - private final transient ConsumerServiceStub consumerAsyncClient; private final transient EventMeshGrpcClientConfig clientConfig; @@ -104,7 +102,6 @@ public void onError(final Throwable t) { @Override public void onCompleted() { log.info("Finished receiving messages from server."); - close(); } }; } @@ -132,16 +129,11 @@ public void run() { } public void close() { - synchronized (sender) { - if (isClosed) { - return; - } - if (this.sender != null) { - senderOnComplete(); - } - latch.countDown(); - isClosed = true; + if (this.sender != null) { + senderOnComplete(); } + + latch.countDown(); log.info("SubStreamHandler closed."); }