diff --git a/src/uv_send.c b/src/uv_send.c index 04c2367d..66de5478 100644 --- a/src/uv_send.c +++ b/src/uv_send.c @@ -47,6 +47,7 @@ struct uvClient char *address; /* Address of the other server */ queue pending; /* Pending send message requests */ queue queue; /* Clients queue */ + char dummy_buf[8]; /* Dummy buffer for receive of data */ bool closing; /* True after calling uvClientAbort */ }; @@ -210,17 +211,20 @@ static int uvClientSend(struct uvClient *c, struct uvSend *send) /* If there's no connection available, let's queue the request. */ if (c->stream == NULL) { - tracef("no connection available -> enqueue message"); + tracef("no connection available to node %lld -> enqueue message", + c->id); QUEUE_PUSH(&c->pending, &send->queue); return 0; } - tracef("connection available -> write message"); + tracef("connection available to node %lld:%s -> write message", c->id, + c->address ? c->address : ""); send->write.data = send; rv = uv_write(&send->write, c->stream, send->bufs, send->n_bufs, uvSendWriteCb); if (rv != 0) { - tracef("write message failed -> rv %d", rv); + tracef("write message to node %lld:%s failed -> rv %d", c->id, + c->address ? c->address : "", rv); /* UNTESTED: what are the error conditions? perhaps ENOMEM */ return RAFT_IOERR; } @@ -234,7 +238,8 @@ static void uvClientSendPending(struct uvClient *c) { int rv; assert(c->stream != NULL); - tracef("send pending messages"); + tracef("send pending messages to node %lld:%s", c->id, + c->address ? c->address : ""); while (!QUEUE_IS_EMPTY(&c->pending)) { queue *head; struct uvSend *send; @@ -254,10 +259,35 @@ static void uvClientSendPending(struct uvClient *c) static void uvClientTimerCb(uv_timer_t *timer) { struct uvClient *c = timer->data; - tracef("timer expired -> attempt to reconnect"); + tracef("timer expired -> attempt to reconnect to %lld:%s", c->id, + c->address ? c->address : ""); uvClientConnect(c); /* Retry to connect. */ } +static void uvClientAllocCb(uv_handle_t *handle, + size_t suggested_size, + uv_buf_t *buf) +{ + struct uvClient *c = handle->data; + (void)suggested_size; + buf->base = c->dummy_buf; + buf->len = sizeof(c->dummy_buf); +} + +static void uvClientReadCb(uv_stream_t *stream, + ssize_t nread, + const uv_buf_t *buf) +{ + struct uvClient *c = stream->data; + (void)buf; + + tracef("received unexpected data (%zd) from node %lld:%s", nread, c->id, + c->address ? c->address : ""); + if (!c->closing && c->stream != NULL) { + uvClientDisconnect(c); + } +} + /* Return the number of send requests that we have been parked in the send queue * because no connection is available yet. */ static unsigned uvClientPendingCount(struct uvClient *c) @@ -278,7 +308,8 @@ static void uvClientConnectCb(struct raft_uv_connect *req, unsigned n_pending; int rv; - tracef("connect attempt completed -> status %s", errCodeToString(status)); + tracef("connect attempt completed to node %lld:%s -> status %s", c->id, + c->address ? c->address : "", errCodeToString(status)); assert(c->connect.data != NULL); assert(c->stream == NULL); @@ -307,6 +338,14 @@ static void uvClientConnectCb(struct raft_uv_connect *req, c->stream = stream; c->n_connect_attempt = 0; c->stream->data = c; + /* Start a read as we get write errors on some platform (e.g. docker + swarm) very late after the write buffer is totally filled up. */ + if (uv_read_start(c->stream, uvClientAllocCb, uvClientReadCb) != 0) { + tracef("Cannot start read for connection to node %lld:%s", c->id, + c->address ? c->address : ""); + uvClientDisconnect(c); + return; + } uvClientSendPending(c); return; } @@ -316,7 +355,8 @@ static void uvClientConnectCb(struct raft_uv_connect *req, if (n_pending > UV__CLIENT_MAX_PENDING) { unsigned i; for (i = 0; i < n_pending - UV__CLIENT_MAX_PENDING; i++) { - tracef("queue full -> evict oldest message"); + tracef("queue full for node %lld:%s -> evict oldest message", c->id, + c->address ? c->address : ""); queue *head; struct uvSend *old_send; struct raft_io_send *old_req;