Skip to content

Commit

Permalink
Merge pull request #60 from nats-io/request-n-responses-bugfix
Browse files Browse the repository at this point in the history
Fix request with multiple responses dropping messages
  • Loading branch information
wallyqs authored Oct 23, 2018
2 parents f271ca8 + 43bf45e commit f1173f4
Showing 1 changed file with 5 additions and 14 deletions.
19 changes: 5 additions & 14 deletions nats/io/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,10 +710,12 @@ def wait_for_msgs():
msg = yield sub.pending_queue.get()
if msg is None:
break
sub.pending_size -= len(msg.data)

sub.received += 1
sub.pending_size -= len(msg.data)
if sub.max_msgs > 0 and sub.received >= sub.max_msgs:
# If we have hit the max for delivered msgs, remove sub.
self._subs.pop(sub.sid, None)
self._remove_subscription(sub)

# Invoke depending of type of handler.
Expand All @@ -724,18 +726,12 @@ def wait_for_msgs():
# should be processed.
self._loop.spawn_callback(sub.cb, msg)
else:
# Call it and take the possible future in the loop.
yield sub.cb(msg)
except Exception as e:
# All errors from calling an async subscriber
# handler are async errors.
if err_cb is not None:
yield err_cb(e)
finally:
if sub.max_msgs > 0 and sub.received >= sub.max_msgs:
# If we have hit the max for delivered msgs, remove sub.
self._remove_subscription(sub)
break

# Bind the subscription and error cb if present
wait_for_msgs.sub = sub
Expand Down Expand Up @@ -879,17 +875,13 @@ def _process_msg(self, sid, subject, reply, data):
sub = self._subs.get(sid)
if sub is None:
raise tornado.gen.Return()
sub.received += 1

if sub.max_msgs > 0 and sub.received >= sub.max_msgs:
# Enough messages so can throwaway subscription now.
self._subs.pop(sid, None)

# Check if it is an old style request.
if sub.future is not None:
sub.future.set_result(msg)

# Discard subscription since done
self._subs.pop(sid, None)
self._remove_subscription(sub)
raise tornado.gen.Return()

Expand All @@ -906,8 +898,7 @@ def _process_msg(self, sid, subject, reply, data):
if self._error_cb is not None:
yield self._error_cb(ErrSlowConsumer())
raise tornado.gen.Return()

yield sub.pending_queue.put_nowait(msg)
sub.pending_queue.put_nowait(msg)
except tornado.queues.QueueFull:
if self._error_cb is not None:
yield self._error_cb(ErrSlowConsumer())
Expand Down

0 comments on commit f1173f4

Please sign in to comment.