Skip to content

Commit

Permalink
Drain rx_chain before closing the connection
Browse files Browse the repository at this point in the history
PUBLISHED_FROM=08eee4052dd9bbc364875a577409cb78665dee30
  • Loading branch information
dimonomid authored and cesantabot committed Dec 29, 2017
1 parent 1ce8adf commit 4eef669
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 32 deletions.
15 changes: 14 additions & 1 deletion common/platforms/lwip/mg_lwip_ev_mgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void mg_ev_mgr_lwip_process_signals(struct mg_mgr *mgr) {
break;
}
case MG_SIG_CLOSE_CONN: {
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
nc->flags |= MG_F_SEND_AND_CLOSE;
mg_close_conn(nc);
break;
}
Expand Down Expand Up @@ -173,6 +173,19 @@ time_t mg_lwip_if_poll(struct mg_iface *iface, int timeout_ms) {
}
num_timers++;
}

if (nc->sock != INVALID_SOCKET) {
/* Try to consume data from cs->rx_chain */
mg_lwip_consume_rx_chain_tcp(nc);

/*
* If the connection is about to close, and rx_chain is finally empty,
* send the MG_SIG_CLOSE_CONN signal
*/
if (cs->draining_rx_chain && cs->rx_chain == NULL) {
mg_lwip_post_signal(MG_SIG_CLOSE_CONN, nc);
}
}
}
#if 0
DBG(("end poll @%u, %d conns, %d timers (min %u), next in %d ms",
Expand Down
41 changes: 27 additions & 14 deletions common/platforms/lwip/mg_lwip_net_if.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,16 @@ static err_t mg_lwip_tcp_recv_cb(void *arg, struct tcp_pcb *tpcb,
DBG(("%p %p %u %d", nc, tpcb, (p != NULL ? p->tot_len : 0), err));
if (p == NULL) {
if (nc != NULL && !(nc->flags & MG_F_CLOSE_IMMEDIATELY)) {
mg_lwip_post_signal(MG_SIG_CLOSE_CONN, nc);
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
if (cs->rx_chain != NULL) {
/*
* rx_chain still contains non-consumed data, don't close the
* connection
*/
cs->draining_rx_chain = 1;
} else {
mg_lwip_post_signal(MG_SIG_CLOSE_CONN, nc);
}
} else {
/* Tombstoned connection, do nothing. */
}
Expand Down Expand Up @@ -172,23 +181,12 @@ static err_t mg_lwip_tcp_recv_cb(void *arg, struct tcp_pcb *tpcb,
return ERR_OK;
}

static void mg_lwip_handle_recv_tcp(struct mg_connection *nc) {
static void mg_lwip_consume_rx_chain_tcp(struct mg_connection *nc) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;

#if MG_ENABLE_SSL
if (nc->flags & MG_F_SSL) {
if (nc->flags & MG_F_SSL_HANDSHAKE_DONE) {
mg_lwip_ssl_recv(nc);
} else {
mg_lwip_ssl_do_hs(nc);
}
return;
}
#endif

mgos_lock();
while (cs->rx_chain != NULL && nc->recv_mbuf.len < nc->recv_mbuf_limit) {
struct pbuf *seg = cs->rx_chain;

size_t seg_len = (seg->len - cs->rx_offset);
size_t buf_avail = (nc->recv_mbuf_limit - nc->recv_mbuf.len);
size_t len = MIN(seg_len, buf_avail);
Expand All @@ -211,6 +209,21 @@ static void mg_lwip_handle_recv_tcp(struct mg_connection *nc) {
mgos_lock();
}
mgos_unlock();
}

static void mg_lwip_handle_recv_tcp(struct mg_connection *nc) {
#if MG_ENABLE_SSL
if (nc->flags & MG_F_SSL) {
if (nc->flags & MG_F_SSL_HANDSHAKE_DONE) {
mg_lwip_ssl_recv(nc);
} else {
mg_lwip_ssl_do_hs(nc);
}
return;
}
#endif

mg_lwip_consume_rx_chain_tcp(nc);

if (nc->send_mbuf.len > 0) {
mg_lwip_mgr_schedule_poll(nc->mgr);
Expand Down
4 changes: 3 additions & 1 deletion common/platforms/lwip/mg_lwip_net_if.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ struct mg_lwip_conn_state {
/* Last SSL write size, for retries. */
int last_ssl_write_size;
/* Whether MG_SIG_RECV is already pending for this connection */
int recv_pending;
int recv_pending : 1;
/* Whether the connection is about to close, just `rx_chain` needs to drain */
int draining_rx_chain : 1;
};

enum mg_sig_type {
Expand Down
60 changes: 44 additions & 16 deletions mongoose/mongoose.c
Original file line number Diff line number Diff line change
Expand Up @@ -14792,7 +14792,9 @@ struct mg_lwip_conn_state {
/* Last SSL write size, for retries. */
int last_ssl_write_size;
/* Whether MG_SIG_RECV is already pending for this connection */
int recv_pending;
int recv_pending : 1;
/* Whether the connection is about to close, just `rx_chain` needs to drain */
int draining_rx_chain : 1;
};

enum mg_sig_type {
Expand Down Expand Up @@ -14951,7 +14953,16 @@ static err_t mg_lwip_tcp_recv_cb(void *arg, struct tcp_pcb *tpcb,
DBG(("%p %p %u %d", nc, tpcb, (p != NULL ? p->tot_len : 0), err));
if (p == NULL) {
if (nc != NULL && !(nc->flags & MG_F_CLOSE_IMMEDIATELY)) {
mg_lwip_post_signal(MG_SIG_CLOSE_CONN, nc);
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
if (cs->rx_chain != NULL) {
/*
* rx_chain still contains non-consumed data, don't close the
* connection
*/
cs->draining_rx_chain = 1;
} else {
mg_lwip_post_signal(MG_SIG_CLOSE_CONN, nc);
}
} else {
/* Tombstoned connection, do nothing. */
}
Expand Down Expand Up @@ -14988,23 +14999,12 @@ static err_t mg_lwip_tcp_recv_cb(void *arg, struct tcp_pcb *tpcb,
return ERR_OK;
}

static void mg_lwip_handle_recv_tcp(struct mg_connection *nc) {
static void mg_lwip_consume_rx_chain_tcp(struct mg_connection *nc) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;

#if MG_ENABLE_SSL
if (nc->flags & MG_F_SSL) {
if (nc->flags & MG_F_SSL_HANDSHAKE_DONE) {
mg_lwip_ssl_recv(nc);
} else {
mg_lwip_ssl_do_hs(nc);
}
return;
}
#endif

mgos_lock();
while (cs->rx_chain != NULL && nc->recv_mbuf.len < nc->recv_mbuf_limit) {
struct pbuf *seg = cs->rx_chain;

size_t seg_len = (seg->len - cs->rx_offset);
size_t buf_avail = (nc->recv_mbuf_limit - nc->recv_mbuf.len);
size_t len = MIN(seg_len, buf_avail);
Expand All @@ -15027,6 +15027,21 @@ static void mg_lwip_handle_recv_tcp(struct mg_connection *nc) {
mgos_lock();
}
mgos_unlock();
}

static void mg_lwip_handle_recv_tcp(struct mg_connection *nc) {
#if MG_ENABLE_SSL
if (nc->flags & MG_F_SSL) {
if (nc->flags & MG_F_SSL_HANDSHAKE_DONE) {
mg_lwip_ssl_recv(nc);
} else {
mg_lwip_ssl_do_hs(nc);
}
return;
}
#endif

mg_lwip_consume_rx_chain_tcp(nc);

if (nc->send_mbuf.len > 0) {
mg_lwip_mgr_schedule_poll(nc->mgr);
Expand Down Expand Up @@ -15647,7 +15662,7 @@ void mg_ev_mgr_lwip_process_signals(struct mg_mgr *mgr) {
break;
}
case MG_SIG_CLOSE_CONN: {
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
nc->flags |= MG_F_SEND_AND_CLOSE;
mg_close_conn(nc);
break;
}
Expand Down Expand Up @@ -15758,6 +15773,19 @@ time_t mg_lwip_if_poll(struct mg_iface *iface, int timeout_ms) {
}
num_timers++;
}

if (nc->sock != INVALID_SOCKET) {
/* Try to consume data from cs->rx_chain */
mg_lwip_consume_rx_chain_tcp(nc);

/*
* If the connection is about to close, and rx_chain is finally empty,
* send the MG_SIG_CLOSE_CONN signal
*/
if (cs->draining_rx_chain && cs->rx_chain == NULL) {
mg_lwip_post_signal(MG_SIG_CLOSE_CONN, nc);
}
}
}
#if 0
DBG(("end poll @%u, %d conns, %d timers (min %u), next in %d ms",
Expand Down

0 comments on commit 4eef669

Please sign in to comment.