Skip to content

Commit

Permalink
Mark connection as readable and read data on connect/reconnect
Browse files Browse the repository at this point in the history
Prior to this commit the CustomTCPConnectionHandler logic did not
set the connection as readable and/or try to read data when a new
connection or re-established connection occurred. This resulted in
the behavior seen in WallarooLabs/wally#2281
as part of the 0.5.0 release of Wallaroo.

This commit properly sets the connection as readable and tries to
read data on a connect/reconnect. It also combines the connect and
reconnect logic into a single if statement to avoid duplicating
the same logic to make maintenance a bit easier.
  • Loading branch information
dipinhora committed Jul 20, 2018
1 parent a10e66b commit 69bde55
Showing 1 changed file with 7 additions and 28 deletions.
35 changes: 7 additions & 28 deletions pony-kafka/customnet/custom_tcp_connection_handler.pony
Original file line number Diff line number Diff line change
Expand Up @@ -333,34 +333,10 @@ class CustomTCPConnectionHandler is TCPConnectionHandler
var fd = @pony_asio_event_fd(event)
_connect_count = _connect_count - 1

if not _connected and not _closed then
// We don't have a connection yet.
if @pony_os_connected[Bool](fd) then
// The connection was successful, make it ours.
_fd = fd
_event = event
_connected = true
_writeable = true

notify.connected(_conn)
_queue_read()

// Don't call _complete_writes, as Windows will see this as a
// closed connection.
ifdef not windows then
if _pending_writes() then
//sent all data; release backpressure
_release_backpressure()
end
end
else
// The connection failed, unsubscribe the event and close.
@pony_asio_event_unsubscribe(event)
@pony_os_socket_close[None](fd)
_notify_connecting()
end
elseif not _connected and _closed then
@printf[I32]("Reconnection asio event\n".cstring())
if (not _connected and not _closed)
or (not _connected and _closed) then
// We don't have a connection yet
// or we have a re-connection after a disonnect.

if @pony_os_connected[Bool](fd) then
// The connection was successful, make it ours.
Expand All @@ -369,19 +345,22 @@ class CustomTCPConnectionHandler is TCPConnectionHandler

// clear anything pending to be sent because on recovery we're
// going to have to replay from our queue when requested
// for new connections this is effectively a no-op
_pending_writev.clear()
_pending.clear()
_pending_writev_total = 0

_connected = true
_writeable = true
_readable = true

_closed = false
_shutdown = false
_shutdown_peer = false

notify.connected(_conn)
_queue_read()
pending_reads()

// Don't call _complete_writes, as Windows will see this as a
// closed connection.
Expand Down

0 comments on commit 69bde55

Please sign in to comment.