fail to run message.requeue(); #45

Open
gzliudan opened this issue Dec 28, 2017 · 2 comments

NSQ version: 1.0
OS: CentOS 7.4

Code:
public static void main(String[] args) {
    NSQLookup lookup = new DefaultNSQLookup();
    lookup.addLookupAddress("192.168.1.228", 4161);
    NSQConsumer consumer = new NSQConsumer(lookup, "TestTopic", "dusti", (message) -> {
        System.out.println("received: " + new String(message.getMessage()));
        message.finished();
        message.requeue();
    });

    consumer.start();
}

Runtime output:
17:00:37.080 INFO Created connection: knowledgebase:4150 - Connection.
17:00:37.100 INFO IdentifyResponse: {"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250} - NSQFeatureDetectionHandler.channelRead0
17:00:37.101 INFO reinstall LengthFieldBasedFrameDecoder - NSQFeatureDetectionHandler.eject
17:00:37.102 INFO Server identification: {"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250} - Connection.
received: test8
received: test9
Dec 28, 2017 5:00:53 PM io.netty.util.concurrent.SingleThreadEventExecutor runAllTasks
WARNING: A task raised an exception.
java.lang.IllegalStateException: Queue full
    at java.util.AbstractQueue.add(AbstractQueue.java:98)
    at com.github.brainlag.nsq.Connection.incoming(Connection.java:129)
    at com.github.brainlag.nsq.netty.NSQHandler.lambda$channelRead0$3(NSQHandler.java:41)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:402)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:748)

Andy320 commented Feb 6, 2018

I came across the same problem when receiving messages from nsq.

The Java code is below:

`
protected MsgConsumer(MsgConfig config, String topic) {

    NSQLookup lookup = new DefaultNSQLookup();
    lookup.addLookupAddress(config.getHost(), config.getPort());
    logger.info("start to init nsq consumer, host={},port={},topic={}", config.getHost(), config.getPort(), topic);

    NSQConsumer consumer = new NSQConsumer(lookup, topic, channel, new NSQMessageCallback() {
        @Override
        public void message(NSQMessage message) {
            try {
                message.finished();
                String msg = new String(message.getMessage(), "UTF-8");
                System.out.println("received: " + msg);
                if (Strings.isNullOrEmpty(msg)) {
                    logger.info("consume msg is empty.");
                    return;
                }
                onMessage(msg);

            } catch (Exception e) {
                message.requeue();
                logger.error("failed to consume message({}), cause: {}",
                        message, Throwables.getStackTraceAsString(e));
            }
        }
    });
    consumer.start();

}

`

The exception is below:
{"date":"2018-02-06T16:44:43.231","traceId":"null","sequenceId":"null","level":"WARN","appName":"box-service","class":"io.netty.util.concurrent.SingleThreadEventExecutor","method":"warn","line":"151","message":"A task raised an exception. java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at com.github.brainlag.nsq.Connection.incoming(Connection.java:129)
at com.github.brainlag.nsq.netty.NSQHandler.lambda$channelRead0$3(NSQHandler.java:41)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:402)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)

@hankchan101

How can I fix it?
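
One thing both snippets above have in common is that a single message can receive both `finished()` and `requeue()`. In the NSQ protocol, FIN and REQ each resolve an in-flight message, so once a message has been finished it can no longer be requeued, and nsqd answers the second command with an error (E_REQ_FAILED). Whether those error frames are what fills the client's response queue in this case is an assumption, not something confirmed in this thread. The sketch below only illustrates the "process first, then acknowledge exactly once" pattern. `Ackable`, `Processor`, `RecordingMessage` and `handle` are hypothetical names made up for this example, not part of the client library; only the method names `finished()` and `requeue()` are taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;

class AckOnceDemo {
    /** Hypothetical stand-in for the client's message type (assumption, not the real API). */
    interface Ackable {
        void finished();
        void requeue();
    }

    /** Hypothetical processing step that may fail with an exception. */
    interface Processor {
        void process(Ackable message) throws Exception;
    }

    /** Test double that records which acknowledgement calls were made. */
    static final class RecordingMessage implements Ackable {
        final List<String> calls = new ArrayList<>();

        @Override
        public void finished() {
            calls.add("finished");
        }

        @Override
        public void requeue() {
            calls.add("requeue");
        }
    }

    /** Runs the processor, then sends exactly one of finished() or requeue(). */
    static void handle(Ackable message, Processor processor) {
        try {
            processor.process(message);
        } catch (Exception e) {
            // Processing failed: requeue and stop, so finished() is never sent as well.
            message.requeue();
            return;
        }
        // Processing succeeded: acknowledge once, outside the try block.
        message.finished();
    }

    public static void main(String[] args) {
        RecordingMessage ok = new RecordingMessage();
        handle(ok, m -> { });
        System.out.println(ok.calls); // prints [finished]

        RecordingMessage failed = new RecordingMessage();
        handle(failed, m -> { throw new IllegalStateException("boom"); });
        System.out.println(failed.calls); // prints [requeue]
    }
}
```

Applied to the callbacks above, this would mean moving `message.finished()` to the end of the successful path and calling `message.requeue()` only in the failure path, so that no message gets both.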
