diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java index 6ceefbe762..623ca1c6fe 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java @@ -87,8 +87,7 @@ public EventMeshGrpcConsumer(final EventMeshGrpcClientConfig clientConfig) { } public void init() { - this.channel = ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(), clientConfig.getServerPort()).usePlaintext() - .build(); + this.channel = ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(), clientConfig.getServerPort()).usePlaintext().build(); this.consumerClient = ConsumerServiceGrpc.newBlockingStub(channel); this.consumerAsyncClient = ConsumerServiceGrpc.newStub(channel); this.heartbeatClient = HeartbeatServiceGrpc.newBlockingStub(channel); @@ -125,8 +124,8 @@ public void subscribe(final List subscriptionItems) { addSubscription(subscriptionItems, SDK_STREAM_URL, GrpcType.STREAM); - CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, - subscriptionItems); + CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription( + clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, subscriptionItems); synchronized (this) { if (subStreamHandler == null) { subStreamHandler = new SubStreamHandler<>(consumerAsyncClient, clientConfig, listener); @@ -137,8 +136,8 @@ public void subscribe(final List subscriptionItems) { } private Response subscribeWebhook(List subscriptionItems, String url) { - final CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, - url, subscriptionItems); + final CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription( + clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, subscriptionItems); try { CloudEvent response = consumerClient.subscribe(subscription); log.info("Received response:{}", response); @@ -169,8 +168,8 @@ public Response unsubscribe(final List subscriptionItems, fina removeSubscription(subscriptionItems); - final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, - subscriptionItems); + final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription( + clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, subscriptionItems); try { final CloudEvent response = consumerClient.unsubscribe(cloudEvent); log.info("Received response:{}", response); @@ -191,8 +190,8 @@ public Response unsubscribe(final List subscriptionItems) { removeSubscription(subscriptionItems); - final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, - subscriptionItems); + final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription( + clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, subscriptionItems); try { final CloudEvent response = consumerClient.unsubscribe(cloudEvent); @@ -277,14 +276,12 @@ private void resubscribe() { subscriptionGroup.forEach((url, items) -> { if (isStreamSub.get()) { - CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, - url, - items); + CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription( + clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, items); subStreamHandler.sendSubscription(subscription); } else { subscribeWebhook(items, url); } - }); } @@ -303,6 +300,7 @@ public void close() { } } + @Data private static class SubscriptionInfo { private transient SubscriptionItem subscriptionItem; @@ -314,25 +312,5 @@ private static class SubscriptionInfo { this.url = url; this.grpcType = grpcType; } - - public GrpcType getGrpcType() { - return grpcType; - } - - public SubscriptionItem getSubscriptionItem() { - return subscriptionItem; - } - - public void setSubscriptionItem(final SubscriptionItem subscriptionItem) { - this.subscriptionItem = subscriptionItem; - } - - public String getUrl() { - return url; - } - - public void setUrl(final String url) { - this.url = url; - } } } diff --git a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java index d34a0c79b1..59436be08f 100644 --- a/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java +++ b/eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/SubStreamHandler.java @@ -96,12 +96,12 @@ public void onNext(final CloudEvent message) { @Override public void onError(final Throwable t) { log.error("Received Server side error", t); + close(); } @Override public void onCompleted() { log.info("Finished receiving messages from server."); - close(); } }; } @@ -134,7 +134,6 @@ public void close() { } latch.countDown(); - log.info("SubStreamHandler closed."); } @@ -145,6 +144,7 @@ private void senderOnNext(final CloudEvent subscription) { } } catch (Exception e) { log.error("StreamObserver Error onNext", e); + close(); } }