diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h index 5cc099ab8..8bb80b3c3 100644 --- a/include/qpid/dispatch/protocol_adaptor.h +++ b/include/qpid/dispatch/protocol_adaptor.h @@ -137,9 +137,8 @@ typedef void (*qdr_link_second_attach_t) (void *context, * @param error Error record if the detach is the result of an error condition, null otherwise * @param first True if this is the first detach (i.e. initiated outbound), False if this is the * the response to a remotely initiated detach - * @param close True if this is a link close, False if this is a link detach */ -typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close); +typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_error_t *error, bool first); /** * qdr_link_flow_t callback diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index d11c97bf4..b6ae307bf 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -1917,7 +1917,7 @@ static void CORE_close_connection(void *context, qdr_connection_t *qdr_conn, qdr } } -static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close) +static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first) { qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link); if (!qlink) @@ -1949,10 +1949,7 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error } } - if (close) - qd_link_close(qlink); - else - qd_link_detach(qlink); + qd_link_close(qlink); // // This is the last event for this link that we are going to send into Proton. diff --git a/src/adaptors/amqp/container.c b/src/adaptors/amqp/container.c index 4c0b6b577..39599eeac 100644 --- a/src/adaptors/amqp/container.c +++ b/src/adaptors/amqp/container.c @@ -608,7 +608,6 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } break; - case PN_LINK_REMOTE_DETACH : case PN_LINK_REMOTE_CLOSE : if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) { pn_link = pn_event_link(event); @@ -642,7 +641,6 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } break; - case PN_LINK_LOCAL_DETACH: case PN_LINK_LOCAL_CLOSE: pn_link = pn_event_link(event); if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { @@ -650,6 +648,24 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } break; + case PN_LINK_REMOTE_DETACH: + case PN_LINK_LOCAL_DETACH: + // The router does not support detaching and reattaching links as described in the AMQP 1.0 Specification + // Section 2.6.4 Detaching And Reattaching A Link. If the remote attempts this it is an error - force the + // connection closed. + if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) { + pn_condition_t *cond = pn_connection_condition(conn); + pn_condition_set_name(cond, QD_AMQP_COND_NOT_IMPLEMENTED); + pn_condition_set_description(cond, "Link detach/reattach not supported"); + pn_connection_close(conn); + + pn_link = pn_event_link(event); + qd_link = (qd_link_t*) pn_link_get_context(pn_link); + qd_log(LOG_CONTAINER, QD_LOG_ERROR, + "[C%"PRIu64"][L%"PRIu64"] Error: Link detach/reattach not supported, connection closed", + qd_conn->connection_id, qd_link ? qd_link->link_id : 0); + } + break; case PN_LINK_FLOW : pn_link = pn_event_link(event); @@ -895,15 +911,6 @@ void qd_link_close(qd_link_t *link) } -void qd_link_detach(qd_link_t *link) -{ - if (link->pn_link) { - pn_link_detach(link->pn_link); - pn_link_close(link->pn_link); - } -} - - /** sending link has entered Q3 flow control */ void qd_link_q3_block(qd_link_t *link) { diff --git a/src/adaptors/amqp/container.h b/src/adaptors/amqp/container.h index 4766b7fe5..c133c9671 100644 --- a/src/adaptors/amqp/container.h +++ b/src/adaptors/amqp/container.h @@ -98,7 +98,6 @@ pn_terminus_t *qd_link_target(qd_link_t *link); pn_terminus_t *qd_link_remote_source(qd_link_t *link); pn_terminus_t *qd_link_remote_target(qd_link_t *link); void qd_link_close(qd_link_t *link); -void qd_link_detach(qd_link_t *link); void qd_link_free(qd_link_t *link); void qd_link_q2_restart_receive(const qd_alloc_safe_ptr_t context); void qd_link_q3_block(qd_link_t *link); diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index c26454b9b..f4ef56ac9 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -2194,7 +2194,7 @@ static void CORE_second_attach(void *context, } -static void CORE_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close) +static void CORE_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first) { } diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 0fd864519..c7977dba6 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -494,8 +494,7 @@ int qdr_connection_process(qdr_connection_t *conn) case QDR_LINK_WORK_FIRST_DETACH : case QDR_LINK_WORK_SECOND_DETACH : conn->protocol_adaptor->detach_handler(conn->protocol_adaptor->user_context, link, link_work->error, - link_work->work_type == QDR_LINK_WORK_FIRST_DETACH, - link_work->close_link); + link_work->work_type == QDR_LINK_WORK_FIRST_DETACH); detach_sent = true; break; } @@ -1286,7 +1285,6 @@ void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t link->detach_count += 1; qdr_link_work_t *work = qdr_link_work(link->detach_count == 1 ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH); - work->close_link = close; if (error) work->error = error; diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index b95c9f5e3..df69d8a6d 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -327,7 +327,6 @@ typedef struct qdr_link_work_t { qdr_error_t *error; int value; qdr_link_work_drain_action_t drain_action; - bool close_link; bool processing; } qdr_link_work_t; diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index 32809465c..ad7a99eb4 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -1083,6 +1083,16 @@ def test_50_extension_capabilities(self): self.assertIn(symbol('ANONYMOUS-RELAY'), rc) self.assertIn(symbol('qd.streaming-links'), rc) + def test_51_unsupported_link_reattach(self): + """ + The router does not support reattaching detached links. Ensure that + clients attempting to detach/reattach are detected and handled as an + connection error + """ + test = LinkReattachTest(self.address) + test.run() + self.assertIsNone(test.error) + class Entity: def __init__(self, status_code, status_description, attrs): @@ -3304,6 +3314,54 @@ def run(self): sleep(0.1) +class LinkReattachTest(MessagingHandler): + """ + The router does not support link detach/re-attach. Attempt to detach a link + without closing it as if attempting to do a re-attach. This should cause + the router to force close the connection with error. + """ + + def __init__(self, addr): + super(LinkReattachTest, self).__init__() + self.address = addr + self.error = None + self.conn = None + self.rx_link = None + + def done(self): + if self.timer: + self.timer.cancel() + if self.conn: + self.conn.close() + + def timeout(self): + self.error = "Timeout Expired" + self.done() + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) + self.conn = event.container.connect(self.address) + self.rx_link = event.container.create_receiver(self.conn, + source="test/reattach", + name="ReattachTest") + + def on_link_opened(self, event): + if event.receiver == self.rx_link: + # Issue a non-close detach. This is the first step in the link + # reattach process + self.rx_link.detach() + + def on_connection_error(self, event): + # Success if the router has force-closed due to the reattach attempt + desc = event.connection.remote_condition.description + if "reattach not supported" not in desc: + self.error = f"Unexpected error: {desc}" + self.done() + + def run(self): + Container(self).run() + + class DataConnectionCountTest(TestCase): """ Start the router with different numbers of worker threads and make sure