Skip to content

Commit

Permalink
some format optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
Pil0tXia committed Apr 2, 2024
1 parent e9c7ce9 commit f5898d8
Showing 1 changed file with 11 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ public EventMeshGrpcConsumer(final EventMeshGrpcClientConfig clientConfig) {
}

public void init() {
this.channel = ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(), clientConfig.getServerPort()).usePlaintext()
.build();
this.channel = ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(), clientConfig.getServerPort()).usePlaintext().build();
this.consumerClient = ConsumerServiceGrpc.newBlockingStub(channel);
this.consumerAsyncClient = ConsumerServiceGrpc.newStub(channel);
this.heartbeatClient = HeartbeatServiceGrpc.newBlockingStub(channel);
Expand Down Expand Up @@ -125,8 +124,8 @@ public void subscribe(final List<SubscriptionItem> subscriptionItems) {

addSubscription(subscriptionItems, SDK_STREAM_URL, GrpcType.STREAM);

CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null,
subscriptionItems);
CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, subscriptionItems);
synchronized (this) {
if (subStreamHandler == null) {
subStreamHandler = new SubStreamHandler<>(consumerAsyncClient, clientConfig, listener);
Expand All @@ -137,8 +136,8 @@ public void subscribe(final List<SubscriptionItem> subscriptionItems) {
}

private Response subscribeWebhook(List<SubscriptionItem> subscriptionItems, String url) {
final CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE,
url, subscriptionItems);
final CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, subscriptionItems);
try {
CloudEvent response = consumerClient.subscribe(subscription);
log.info("Received response:{}", response);
Expand Down Expand Up @@ -169,8 +168,8 @@ public Response unsubscribe(final List<SubscriptionItem> subscriptionItems, fina

removeSubscription(subscriptionItems);

final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url,
subscriptionItems);
final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, subscriptionItems);
try {
final CloudEvent response = consumerClient.unsubscribe(cloudEvent);
log.info("Received response:{}", response);
Expand All @@ -191,8 +190,8 @@ public Response unsubscribe(final List<SubscriptionItem> subscriptionItems) {

removeSubscription(subscriptionItems);

final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null,
subscriptionItems);
final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, subscriptionItems);

try {
final CloudEvent response = consumerClient.unsubscribe(cloudEvent);
Expand Down Expand Up @@ -277,14 +276,12 @@ private void resubscribe() {

subscriptionGroup.forEach((url, items) -> {
if (isStreamSub.get()) {
CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE,
url,
items);
CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(

Check warning on line 279 in eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-sdks/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.java#L279

Added line #L279 was not covered by tests
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, items);
subStreamHandler.sendSubscription(subscription);
} else {
subscribeWebhook(items, url);
}

});
}

Expand Down

0 comments on commit f5898d8

Please sign in to comment.