Skip to content

Commit

Permalink
Add read handler to uv_send to detect remote socket close faster
Browse files Browse the repository at this point in the history
  • Loading branch information
NorbertHeusser committed Jun 21, 2024
1 parent e104569 commit 2c704ef
Showing 1 changed file with 47 additions and 7 deletions.
54 changes: 47 additions & 7 deletions src/uv_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct uvClient
char *address; /* Address of the other server */
queue pending; /* Pending send message requests */
queue queue; /* Clients queue */
char dummy_buf[8]; /* Dummy buffer for receive of data */
bool closing; /* True after calling uvClientAbort */
};

Expand Down Expand Up @@ -210,17 +211,20 @@ static int uvClientSend(struct uvClient *c, struct uvSend *send)

/* If there's no connection available, let's queue the request. */
if (c->stream == NULL) {
tracef("no connection available -> enqueue message");
tracef("no connection available to node %lld -> enqueue message",
c->id);
QUEUE_PUSH(&c->pending, &send->queue);
return 0;
}

tracef("connection available -> write message");
tracef("connection available to node %lld:%s -> write message", c->id,
c->address ? c->address : "<unknown>");
send->write.data = send;
rv = uv_write(&send->write, c->stream, send->bufs, send->n_bufs,
uvSendWriteCb);
if (rv != 0) {
tracef("write message failed -> rv %d", rv);
tracef("write message to node %lld:%s failed -> rv %d", c->id,
c->address ? c->address : "<unknown>", rv);
/* UNTESTED: what are the error conditions? perhaps ENOMEM */
return RAFT_IOERR;
}
Expand All @@ -234,7 +238,8 @@ static void uvClientSendPending(struct uvClient *c)
{
int rv;
assert(c->stream != NULL);
tracef("send pending messages");
tracef("send pending messages to node %lld:%s", c->id,
c->address ? c->address : "<unknown>");
while (!QUEUE_IS_EMPTY(&c->pending)) {
queue *head;
struct uvSend *send;
Expand All @@ -254,10 +259,35 @@ static void uvClientSendPending(struct uvClient *c)
static void uvClientTimerCb(uv_timer_t *timer)
{
struct uvClient *c = timer->data;
tracef("timer expired -> attempt to reconnect");
tracef("timer expired -> attempt to reconnect to %lld:%s", c->id,
c->address ? c->address : "<unknown>");
uvClientConnect(c); /* Retry to connect. */
}

static void uvClientAllocCb(uv_handle_t *handle,
size_t suggested_size,
uv_buf_t *buf)
{
struct uvClient *c = handle->data;
(void)suggested_size;
buf->base = c->dummy_buf;
buf->len = sizeof(c->dummy_buf);
}

static void uvClientReadCb(uv_stream_t *stream,
ssize_t nread,
const uv_buf_t *buf)
{
struct uvClient *c = stream->data;
(void)buf;

tracef("received unexpected data (%zd) from node %lld:%s", nread, c->id,
c->address ? c->address : "<unknown>");
if (!c->closing && c->stream != NULL) {
uvClientDisconnect(c);
}
}

/* Return the number of send requests that we have been parked in the send queue
* because no connection is available yet. */
static unsigned uvClientPendingCount(struct uvClient *c)
Expand All @@ -278,7 +308,8 @@ static void uvClientConnectCb(struct raft_uv_connect *req,
unsigned n_pending;
int rv;

tracef("connect attempt completed -> status %s", errCodeToString(status));
tracef("connect attempt completed to node %lld:%s -> status %s", c->id,
c->address ? c->address : "<unknown>", errCodeToString(status));

assert(c->connect.data != NULL);
assert(c->stream == NULL);
Expand Down Expand Up @@ -307,6 +338,14 @@ static void uvClientConnectCb(struct raft_uv_connect *req,
c->stream = stream;
c->n_connect_attempt = 0;
c->stream->data = c;
/* Start a read as we get write errors on some platform (e.g. docker
swarm) very late after the write buffer is totally filled up. */
if (uv_read_start(c->stream, uvClientAllocCb, uvClientReadCb) != 0) {
tracef("Cannot start read for connection to node %lld:%s", c->id,
c->address ? c->address : "<unknown>");
uvClientDisconnect(c);
return;
}
uvClientSendPending(c);
return;
}
Expand All @@ -316,7 +355,8 @@ static void uvClientConnectCb(struct raft_uv_connect *req,
if (n_pending > UV__CLIENT_MAX_PENDING) {
unsigned i;
for (i = 0; i < n_pending - UV__CLIENT_MAX_PENDING; i++) {
tracef("queue full -> evict oldest message");
tracef("queue full for node %lld:%s -> evict oldest message", c->id,
c->address ? c->address : "<unknown>");
queue *head;
struct uvSend *old_send;
struct raft_io_send *old_req;
Expand Down

0 comments on commit 2c704ef

Please sign in to comment.