Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Unable to Consume Messages using AsyncSubscribe in RabbitMQ Storage Plugin #4394

Open
2 of 3 tasks
Pil0tXia opened this issue Aug 23, 2023 · 7 comments
Open
2 of 3 tasks
Labels
bug Something isn't working help wanted Extra attention is needed

Comments

@Pil0tXia
Copy link
Member

Search before asking

  • I had searched in the issues and found no similar issues.

Environment

Windows

EventMesh version

master

What happened

When using RabbitMQ as the storage plugin, the org.apache.eventmesh.tcp.demo.pub.cloudevents.AsyncPublish can place messages into the queue. However, the org.apache.eventmesh.tcp.demo.sub.cloudevents.AsyncSubscribe is unable to consume the messages.

The RabbitMQ storage plugin was not authored by xwm1992.

How to reproduce

image
image
image

Debug logs

org.apache.eventmesh.tcp.demo.pub.cloudevents.AsyncPublish:

2023-08-23 19:00:42,022 INFO  [main] CloudEventTCPSubClient(CloudEventTCPSubClient.java:73) - SimpleSubClientImpl|997|started!
2023-08-23 19:00:42,028 INFO  [main] AsyncPublish(AsyncPublish.java:58) - begin send async msg[0]: CloudEvent{id='ebe0443b-d6cd-49aa-8b14-46932343ed92', source=/, type='cloudevents', datacontenttype='application/cloudevents+json', subject='TEST-TOPIC-TCP-ASYNC', data=BytesCloudEventData{value=[123, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, 34, 116, 101, 115, 116, 65, 115, 121, 110, 99, 77, 101, 115, 115, 97, 103, 101, 34, 125]}, extensions={ttl=30000}}
2023-08-23 19:00:42,050 INFO  [main] CloudEventTCPPubClient(CloudEventTCPPubClient.java:108) - SimplePubClientImpl cloud event|997|publish|send|type=ASYNC_MESSAGE_TO_SERVER|protocol=cloudevents|msg=org.apache.eventmesh.common.protocol.tcp.Package@25f4878b
2023-08-23 19:00:42,050 INFO  [main] RequestContext(RequestContext.java:76) - _RequestContext|create|key=7750722003
2023-08-23 19:00:42,055 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"ASYNC_MESSAGE_TO_SERVER","code":0,"seq":"7750722003","properties":{"protocolversion":"1.0","protocoldesc":"tcp","protocoltype":"cloudevents"},"command":"ASYNC_MESSAGE_TO_SERVER"},"body":"eyJzcGVjdmVyc2lvbiI6IjEuMCIsImlkIjoiZWJlMDQ0M2ItZDZjZC00OWFhLThiMTQtNDY5MzIzNDNlZDkyIiwic291cmNlIjoiLyIsInR5cGUiOiJjbG91ZGV2ZW50cyIsImRhdGFjb250ZW50dHlwZSI6ImFwcGxpY2F0aW9uL2Nsb3VkZXZlbnRzK2pzb24iLCJzdWJqZWN0IjoiVEVTVC1UT1BJQy1UQ1AtQVNZTkMiLCJ0dGwiOiIzMDAwMCIsImRhdGEiOnsiY29udGVudCI6InRlc3RBc3luY01lc3NhZ2UifX0="}
2023-08-23 19:00:42,172 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"ASYNC_MESSAGE_TO_SERVER_ACK","code":0,"desc":"success","seq":"7750722003","properties":{},"command":"ASYNC_MESSAGE_TO_SERVER_ACK"}
2023-08-23 19:00:42,172 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:162) - Decode bodyJson={"data":{"node":{"content":"testAsyncMessage"}},"id":"ebe0443b-d6cd-49aa-8b14-46932343ed92","source":"/","type":"cloudevents","subject":"TEST-TOPIC-TCP-ASYNC","dataContentType":"application/cloudevents+json","specVersion":"V1","extensionNames":["protocolversion","reqeventmesh2mqtimestamp","reqsendeventmeship","protocoldesc","protocoltype","ttl","reqc2eventmeshtimestamp"],"attributeNames":["datacontenttype","subject","specversion","id","source","type"]}
2023-08-23 19:00:42,173 INFO  [nioEventLoopGroup-2-1] AbstractEventMeshTCPPubHandler(AbstractEventMeshTCPPubHandler.java:45) - SimplePubClientImpl|receive|msg=org.apache.eventmesh.common.protocol.tcp.Package@4ae1b36b
2023-08-23 19:00:43,187 INFO  [main] AsyncPublish(AsyncPublish.java:58) - begin send async msg[1]: CloudEvent{id='ea09364e-e3ac-49b6-bcc4-31f61e432156', source=/, type='cloudevents', datacontenttype='application/cloudevents+json', subject='TEST-TOPIC-TCP-ASYNC', data=BytesCloudEventData{value=[123, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, 34, 116, 101, 115, 116, 65, 115, 121, 110, 99, 77, 101, 115, 115, 97, 103, 101, 34, 125]}, extensions={ttl=30000}}
2023-08-23 19:00:43,187 INFO  [main] CloudEventTCPPubClient(CloudEventTCPPubClient.java:108) - SimplePubClientImpl cloud event|997|publish|send|type=ASYNC_MESSAGE_TO_SERVER|protocol=cloudevents|msg=org.apache.eventmesh.common.protocol.tcp.Package@4e423aa2
2023-08-23 19:00:43,187 INFO  [main] RequestContext(RequestContext.java:76) - _RequestContext|create|key=1345627544
2023-08-23 19:00:43,187 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"ASYNC_MESSAGE_TO_SERVER","code":0,"seq":"1345627544","properties":{"protocolversion":"1.0","protocoldesc":"tcp","protocoltype":"cloudevents"},"command":"ASYNC_MESSAGE_TO_SERVER"},"body":"eyJzcGVjdmVyc2lvbiI6IjEuMCIsImlkIjoiZWEwOTM2NGUtZTNhYy00OWI2LWJjYzQtMzFmNjFlNDMyMTU2Iiwic291cmNlIjoiLyIsInR5cGUiOiJjbG91ZGV2ZW50cyIsImRhdGFjb250ZW50dHlwZSI6ImFwcGxpY2F0aW9uL2Nsb3VkZXZlbnRzK2pzb24iLCJzdWJqZWN0IjoiVEVTVC1UT1BJQy1UQ1AtQVNZTkMiLCJ0dGwiOiIzMDAwMCIsImRhdGEiOnsiY29udGVudCI6InRlc3RBc3luY01lc3NhZ2UifX0="}
2023-08-23 19:00:43,192 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"ASYNC_MESSAGE_TO_SERVER_ACK","code":0,"desc":"success","seq":"1345627544","properties":{},"command":"ASYNC_MESSAGE_TO_SERVER_ACK"}
2023-08-23 19:00:43,192 DEBUG [nioEventLoopGroup-2-1] Codec(Codec.java:162) - Decode bodyJson={"data":{"node":{"content":"testAsyncMessage"}},"id":"ea09364e-e3ac-49b6-bcc4-31f61e432156","source":"/","type":"cloudevents","subject":"TEST-TOPIC-TCP-ASYNC","dataContentType":"application/cloudevents+json","specVersion":"V1","extensionNames":["protocolversion","reqeventmesh2mqtimestamp","reqsendeventmeship","protocoldesc","protocoltype","ttl","reqc2eventmeshtimestamp"],"attributeNames":["datacontenttype","subject","specversion","id","source","type"]}
2023-08-23 19:00:43,192 INFO  [nioEventLoopGroup-2-1] AbstractEventMeshTCPPubHandler(AbstractEventMeshTCPPubHandler.java:45) - SimplePubClientImpl|receive|msg=org.apache.eventmesh.common.protocol.tcp.Package@1b2e16dc

org.apache.eventmesh.tcp.demo.sub.cloudevents.AsyncSubscribe:

2023-08-23 19:00:43,479 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"HELLO_REQUEST","code":0,"seq":"3280786264","properties":{},"command":"HELLO_REQUEST"},"body":{"env":"test","subsystem":"5017","path":"/data/app/umg_proxy","pid":42893,"host":"localhost","port":9362,"version":"2.0.11","username":"PU4283","password":"21524617","idc":"FT","group":"EventmeshTestGroup","purpose":"sub","unack":0}}
2023-08-23 19:00:43,484 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"HELLO_RESPONSE","code":0,"desc":"success","seq":"3280786264","properties":{},"command":"HELLO_RESPONSE"}
2023-08-23 19:00:43,484 INFO  [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:48) - |receive|type=HELLO_RESPONSE|msg=org.apache.eventmesh.common.protocol.tcp.Package@3e758221
2023-08-23 19:00:43,485 ERROR [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:66) - msg ignored|HELLO_RESPONSE|org.apache.eventmesh.common.protocol.tcp.Package@3e758221
2023-08-23 19:00:43,485 INFO  [main] CloudEventTCPSubClient(CloudEventTCPSubClient.java:73) - SimpleSubClientImpl|745|started!
2023-08-23 19:00:43,489 INFO  [main] RequestContext(RequestContext.java:76) - _RequestContext|create|key=1162117433
2023-08-23 19:00:43,497 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"SUBSCRIBE_REQUEST","code":0,"seq":"1162117433","properties":{},"command":"SUBSCRIBE_REQUEST"},"body":{"topicList":[{"topic":"TEST-TOPIC-TCP-ASYNC","mode":"CLUSTERING","type":"ASYNC"}]}}
2023-08-23 19:00:43,538 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"SUBSCRIBE_RESPONSE","code":0,"desc":"success","seq":"1162117433","properties":{},"command":"SUBSCRIBE_RESPONSE"}
2023-08-23 19:00:43,539 INFO  [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:48) - |receive|type=SUBSCRIBE_RESPONSE|msg=org.apache.eventmesh.common.protocol.tcp.Package@6b701d42
2023-08-23 19:00:43,539 ERROR [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:66) - msg ignored|SUBSCRIBE_RESPONSE|org.apache.eventmesh.common.protocol.tcp.Package@6b701d42
2023-08-23 19:00:43,539 INFO  [main] RequestContext(RequestContext.java:76) - _RequestContext|create|key=8573122138
2023-08-23 19:00:43,540 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:62) - Encoder pkg={"header":{"cmd":"LISTEN_REQUEST","code":0,"seq":"8573122138","properties":{},"command":"LISTEN_REQUEST"}}
2023-08-23 19:00:43,631 DEBUG [nioEventLoopGroup-3-1] Codec(Codec.java:150) - Decode headerJson={"cmd":"LISTEN_RESPONSE","code":0,"desc":"success","seq":"8573122138","properties":{},"command":"LISTEN_RESPONSE"}
2023-08-23 19:00:43,631 INFO  [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:48) - |receive|type=LISTEN_RESPONSE|msg=org.apache.eventmesh.common.protocol.tcp.Package@13383e23
2023-08-23 19:00:43,631 ERROR [nioEventLoopGroup-3-1] AbstractEventMeshTCPSubHandler(AbstractEventMeshTCPSubHandler.java:66) - msg ignored|LISTEN_RESPONSE|org.apache.eventmesh.common.protocol.tcp.Package@13383e23
2023-08-23 19:01:13,482 INFO  [TCPClientScheduler-1] RequestContext(RequestContext.java:76) - _RequestContext|create|key=8077210517

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@fabian4
Copy link
Member

fabian4 commented Aug 23, 2023

I can't reproduce it since the meshMQAdmin plugin for rabbitmq is still in progress.

23:05:29.431 [main] ERROR org.apache.eventmesh.runtime.core.plugin.MQAdminWrapper - can't load the meshMQAdmin plugin, please check.
23:05:29.434 [main] ERROR org.apache.eventmesh.runtime.boot.EventMeshStartup - EventMesh start fail.
java.lang.RuntimeException: doesn't load the meshMQAdmin plugin, please check.
	at org.apache.eventmesh.runtime.core.plugin.MQAdminWrapper.<init>(MQAdminWrapper.java:41) ~[eventmesh-runtime-1.9.0-release.jar:1.9.0-release]
	at org.apache.eventmesh.runtime.admin.handler.TopicHandler.<init>(TopicHandler.java:77) ~[eventmesh-runtime-1.9.0-release.jar:1.9.0-release]
	at org.apache.eventmesh.runtime.admin.controller.ClientManageController.initClientHandler(ClientManageController.java:152) ~[eventmesh-runtime-1.9.0-release.jar:1.9.0-release]
	at org.apache.eventmesh.runtime.admin.controller.ClientManageController.start(ClientManageController.java:117) ~[eventmesh-runtime-1.9.0-release.jar:1.9.0-release]
	at org.apache.eventmesh.runtime.boot.EventMeshServer.start(EventMeshServer.java:169) ~[eventmesh-runtime-1.9.0-release.jar:1.9.0-release]
	at org.apache.eventmesh.runtime.boot.EventMeshStartup.main(EventMeshStartup.java:40) [eventmesh-runtime-1.9.0-release.jar:1.9.0-release]
	at org.apache.eventmesh.starter.StartUp.main(StartUp.java:25) [main/:?]

> Task :eventmesh-starter:StartUp.main() FAILED

Maybe you can provide a valid branch for this scenario or the eventmesh-conncetor-rabbitmq may do some help. #4393

BTW I think it will be more clear if you can provide the detail log in org.apache.eventmesh.storage.rabbitmq.consumer.RabbitmqConsumer.

@Pil0tXia
Copy link
Member Author

Pil0tXia commented Aug 24, 2023

@fabian4 Thank you for your attention. You may merge PR #4395 to resolve the startup failure you mentioned and proceed to reach the bug of this PR.

BTW, there is no message production/consumption relationship between the connector module and the storage-plugin module.

@pandaapo
Copy link
Member

pandaapo commented Aug 24, 2023

You may merge PR #4395 to resolve the startup failure you mentioned and proceed to reach the bug of this PR.

Do you mean that the current master code will not encounter this bug? If that's the case, I suggest you wait until PR 4395 is merged before opening the issue. Otherwise, it will be confusing.

BTW, there is no message production/consumption relationship between the connector module and the storage-plugin module.

Community is moving the storage module to the connector module. The storage module may be removed in the future.

fabian4 added a commit to fabian4/eventmesh that referenced this issue Aug 24, 2023
@fabian4
Copy link
Member

fabian4 commented Aug 24, 2023

After checkout https://github.com/Pil0tXia/eventmesh/tree/pil0txia_feat_4390 in #4395

Eventmesh-storage-rabbitmq still works when producing and consuming messages.

my env: https://github.com/fabian4/eventmesh/tree/pil0txia_feat_4390

image

image

fabian4 added a commit to fabian4/eventmesh that referenced this issue Aug 24, 2023
@pandaapo
Copy link
Member

pandaapo commented Aug 24, 2023

Was it fixed by your modifications here? @fabian4

3f1783f

@fabian4
Copy link
Member

fabian4 commented Aug 24, 2023

I think the problem is in producer. We should bind the channel with routeKey and exchangeName when we init the producer connection otherwise there is no message in the queue to consume. (But we actually do bind it when we init the consumer connection. @Pil0tXia

image

@Override
public void init(Properties properties) throws Exception {
this.rabbitmqClient = new RabbitmqClient(rabbitmqConnectionFactory);
this.connection = rabbitmqClient.getConnection(configurationHolder.getHost(), configurationHolder.getUsername(),
configurationHolder.getPasswd(), configurationHolder.getPort(), configurationHolder.getVirtualHost());
this.channel = rabbitmqConnectionFactory.createChannel(connection);
}

@Override
public void subscribe(String topic) {
rabbitmqClient.binding(channel, configurationHolder.getExchangeType(), configurationHolder.getExchangeName(),
configurationHolder.getRoutingKey(), configurationHolder.getQueueName());
executor.execute(rabbitmqConsumerHandler);
}

@Override
public void start() throws Exception {
if (!started) {
rabbitmqClient.binding(channel, sinkConfig.getConnectorConfig().getExchangeType(), sinkConfig.getConnectorConfig().getExchangeName(),
sinkConfig.getConnectorConfig().getRoutingKey(), sinkConfig.getConnectorConfig().getQueueName());
started = true;
}
}

@Override
public void start() throws Exception {
if (!started) {
rabbitmqClient.binding(channel, sourceConfig.getConnectorConfig().getExchangeType(), sourceConfig.getConnectorConfig().getExchangeName(),
sourceConfig.getConnectorConfig().getRoutingKey(), sourceConfig.getConnectorConfig().getQueueName());
executor.execute(this.rabbitMQSourceHandler);
started = true;
}
}


Was it fixed by your modifications here? @fabian4
3f1783f

@pandaapo This is to roll back the change I have made when I do some local test. It's irrelevant.

Copy link
Contributor

github-actions bot commented Apr 9, 2024

It has been 90 days since the last activity on this issue. Apache EventMesh values the voices of the community. Please don't hesitate to share your latest insights on this matter at any time, as the community is more than willing to engage in discussions regarding the development and optimization directions of this feature.

If you feel that your issue has been resolved, please feel free to close it. Should you have any additional information to share, you are welcome to reopen this issue.

@github-actions github-actions bot added the Stale label Apr 9, 2024
@Pil0tXia Pil0tXia added help wanted Extra attention is needed and removed Stale labels Apr 11, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working help wanted Extra attention is needed
Projects
None yet
Development

No branches or pull requests

3 participants