Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add read handler to uv_send to detect remote socket close faster #2

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions src/uv_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct uvClient
char *address; /* Address of the other server */
queue pending; /* Pending send message requests */
queue queue; /* Clients queue */
char dummy_buf[8]; /* Dummy buffer for receive of data */
bool closing; /* True after calling uvClientAbort */
};

Expand Down Expand Up @@ -210,17 +211,20 @@ static int uvClientSend(struct uvClient *c, struct uvSend *send)

/* If there's no connection available, let's queue the request. */
if (c->stream == NULL) {
tracef("no connection available -> enqueue message");
tracef("no connection available to node %lld -> enqueue message",
c->id);
QUEUE_PUSH(&c->pending, &send->queue);
return 0;
}

tracef("connection available -> write message");
tracef("connection available to node %lld:%s -> write message", c->id,
c->address ? c->address : "<unknown>");
send->write.data = send;
rv = uv_write(&send->write, c->stream, send->bufs, send->n_bufs,
uvSendWriteCb);
if (rv != 0) {
tracef("write message failed -> rv %d", rv);
tracef("write message to node %lld:%s failed -> rv %d", c->id,
c->address ? c->address : "<unknown>", rv);
/* UNTESTED: what are the error conditions? perhaps ENOMEM */
return RAFT_IOERR;
}
Expand All @@ -234,7 +238,8 @@ static void uvClientSendPending(struct uvClient *c)
{
int rv;
assert(c->stream != NULL);
tracef("send pending messages");
tracef("send pending messages to node %lld:%s", c->id,
c->address ? c->address : "<unknown>");
while (!QUEUE_IS_EMPTY(&c->pending)) {
queue *head;
struct uvSend *send;
Expand All @@ -254,10 +259,35 @@ static void uvClientSendPending(struct uvClient *c)
static void uvClientTimerCb(uv_timer_t *timer)
{
struct uvClient *c = timer->data;
tracef("timer expired -> attempt to reconnect");
tracef("timer expired -> attempt to reconnect to %lld:%s", c->id,
c->address ? c->address : "<unknown>");
uvClientConnect(c); /* Retry to connect. */
}

static void uvClientAllocCb(uv_handle_t *handle,
size_t suggested_size,
uv_buf_t *buf)
{
struct uvClient *c = handle->data;
(void)suggested_size;
buf->base = c->dummy_buf;
buf->len = sizeof(c->dummy_buf);
}

static void uvClientReadCb(uv_stream_t *stream,
ssize_t nread,
const uv_buf_t *buf)
{
struct uvClient *c = stream->data;
(void)buf;

tracef("received unexpected data (%zd) from node %lld:%s", nread, c->id,
c->address ? c->address : "<unknown>");
if (!c->closing && c->stream != NULL) {
uvClientDisconnect(c);
}
}

/* Return the number of send requests that we have been parked in the send queue
* because no connection is available yet. */
static unsigned uvClientPendingCount(struct uvClient *c)
Expand All @@ -278,7 +308,8 @@ static void uvClientConnectCb(struct raft_uv_connect *req,
unsigned n_pending;
int rv;

tracef("connect attempt completed -> status %s", errCodeToString(status));
tracef("connect attempt completed to node %lld:%s -> status %s", c->id,
c->address ? c->address : "<unknown>", errCodeToString(status));

assert(c->connect.data != NULL);
assert(c->stream == NULL);
Expand Down Expand Up @@ -307,6 +338,14 @@ static void uvClientConnectCb(struct raft_uv_connect *req,
c->stream = stream;
c->n_connect_attempt = 0;
c->stream->data = c;
/* Start a read as we get write errors on some platform (e.g. docker
swarm) very late after the write buffer is totally filled up. */
if (uv_read_start(c->stream, uvClientAllocCb, uvClientReadCb) != 0) {
tracef("Cannot start read for connection to node %lld:%s", c->id,
c->address ? c->address : "<unknown>");
uvClientDisconnect(c);
return;
}
uvClientSendPending(c);
return;
}
Expand All @@ -316,7 +355,8 @@ static void uvClientConnectCb(struct raft_uv_connect *req,
if (n_pending > UV__CLIENT_MAX_PENDING) {
unsigned i;
for (i = 0; i < n_pending - UV__CLIENT_MAX_PENDING; i++) {
tracef("queue full -> evict oldest message");
tracef("queue full for node %lld:%s -> evict oldest message", c->id,
c->address ? c->address : "<unknown>");
queue *head;
struct uvSend *old_send;
struct raft_io_send *old_req;
Expand Down
Loading