Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #1681: remove unsupported AMQP link detach handling #1682

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions include/qpid/dispatch/protocol_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,8 @@ typedef void (*qdr_link_second_attach_t) (void *context,
* @param error Error record if the detach is the result of an error condition, null otherwise
* @param first True if this is the first detach (i.e. initiated outbound), False if this is the
* the response to a remotely initiated detach
* @param close True if this is a link close, False if this is a link detach
*/
typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close);
typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_error_t *error, bool first);

/**
* qdr_link_flow_t callback
Expand Down
7 changes: 2 additions & 5 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1917,7 +1917,7 @@ static void CORE_close_connection(void *context, qdr_connection_t *qdr_conn, qdr
}
}

static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first)
{
qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
if (!qlink)
Expand Down Expand Up @@ -1949,10 +1949,7 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
}
}

if (close)
qd_link_close(qlink);
else
qd_link_detach(qlink);
qd_link_close(qlink);

//
// This is the last event for this link that we are going to send into Proton.
Expand Down
29 changes: 18 additions & 11 deletions src/adaptors/amqp/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,6 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
break;

case PN_LINK_REMOTE_DETACH :
case PN_LINK_REMOTE_CLOSE :
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
pn_link = pn_event_link(event);
Expand Down Expand Up @@ -642,14 +641,31 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
break;

case PN_LINK_LOCAL_DETACH:
case PN_LINK_LOCAL_CLOSE:
pn_link = pn_event_link(event);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}
break;

case PN_LINK_REMOTE_DETACH:
case PN_LINK_LOCAL_DETACH:
// The router does not support detaching and reattaching links as described in the AMQP 1.0 Specification
// Section 2.6.4 Detaching And Reattaching A Link. If the remote attempts this it is an error - force the
// connection closed.
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
pn_condition_t *cond = pn_connection_condition(conn);
pn_condition_set_name(cond, QD_AMQP_COND_NOT_IMPLEMENTED);
pn_condition_set_description(cond, "Link detach/reattach not supported");
pn_connection_close(conn);

pn_link = pn_event_link(event);
qd_link = (qd_link_t*) pn_link_get_context(pn_link);
qd_log(LOG_CONTAINER, QD_LOG_ERROR,
"[C%"PRIu64"][L%"PRIu64"] Error: Link detach/reattach not supported, connection closed",
qd_conn->connection_id, qd_link ? qd_link->link_id : 0);
}
break;

case PN_LINK_FLOW :
pn_link = pn_event_link(event);
Expand Down Expand Up @@ -895,15 +911,6 @@ void qd_link_close(qd_link_t *link)
}


void qd_link_detach(qd_link_t *link)
{
if (link->pn_link) {
pn_link_detach(link->pn_link);
pn_link_close(link->pn_link);
}
}


/** sending link has entered Q3 flow control */
void qd_link_q3_block(qd_link_t *link)
{
Expand Down
1 change: 0 additions & 1 deletion src/adaptors/amqp/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ pn_terminus_t *qd_link_target(qd_link_t *link);
pn_terminus_t *qd_link_remote_source(qd_link_t *link);
pn_terminus_t *qd_link_remote_target(qd_link_t *link);
void qd_link_close(qd_link_t *link);
void qd_link_detach(qd_link_t *link);
void qd_link_free(qd_link_t *link);
void qd_link_q2_restart_receive(const qd_alloc_safe_ptr_t context);
void qd_link_q3_block(qd_link_t *link);
Expand Down
2 changes: 1 addition & 1 deletion src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -2194,7 +2194,7 @@ static void CORE_second_attach(void *context,
}


static void CORE_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close)
static void CORE_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first)
{
}

Expand Down
4 changes: 1 addition & 3 deletions src/router_core/connections.c
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,7 @@ int qdr_connection_process(qdr_connection_t *conn)
case QDR_LINK_WORK_FIRST_DETACH :
case QDR_LINK_WORK_SECOND_DETACH :
conn->protocol_adaptor->detach_handler(conn->protocol_adaptor->user_context, link, link_work->error,
link_work->work_type == QDR_LINK_WORK_FIRST_DETACH,
link_work->close_link);
link_work->work_type == QDR_LINK_WORK_FIRST_DETACH);
detach_sent = true;
break;
}
Expand Down Expand Up @@ -1286,7 +1285,6 @@ void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t

link->detach_count += 1;
qdr_link_work_t *work = qdr_link_work(link->detach_count == 1 ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH);
work->close_link = close;

if (error)
work->error = error;
Expand Down
1 change: 0 additions & 1 deletion src/router_core/router_core_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,6 @@ typedef struct qdr_link_work_t {
qdr_error_t *error;
int value;
qdr_link_work_drain_action_t drain_action;
bool close_link;
bool processing;
} qdr_link_work_t;

Expand Down
58 changes: 58 additions & 0 deletions tests/system_tests_one_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,16 @@ def test_50_extension_capabilities(self):
self.assertIn(symbol('ANONYMOUS-RELAY'), rc)
self.assertIn(symbol('qd.streaming-links'), rc)

def test_51_unsupported_link_reattach(self):
"""
The router does not support reattaching detached links. Ensure that
clients attempting to detach/reattach are detected and handled as an
connection error
"""
test = LinkReattachTest(self.address)
test.run()
self.assertIsNone(test.error)


class Entity:
def __init__(self, status_code, status_description, attrs):
Expand Down Expand Up @@ -3304,6 +3314,54 @@ def run(self):
sleep(0.1)


class LinkReattachTest(MessagingHandler):
"""
The router does not support link detach/re-attach. Attempt to detach a link
without closing it as if attempting to do a re-attach. This should cause
the router to force close the connection with error.
"""

def __init__(self, addr):
super(LinkReattachTest, self).__init__()
self.address = addr
self.error = None
self.conn = None
self.rx_link = None

def done(self):
if self.timer:
self.timer.cancel()
if self.conn:
self.conn.close()

def timeout(self):
self.error = "Timeout Expired"
self.done()

def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.address)
self.rx_link = event.container.create_receiver(self.conn,
source="test/reattach",
name="ReattachTest")

def on_link_opened(self, event):
if event.receiver == self.rx_link:
# Issue a non-close detach. This is the first step in the link
# reattach process
self.rx_link.detach()

def on_connection_error(self, event):
# Success if the router has force-closed due to the reattach attempt
desc = event.connection.remote_condition.description
if "reattach not supported" not in desc:
self.error = f"Unexpected error: {desc}"
self.done()

def run(self):
Container(self).run()


class DataConnectionCountTest(TestCase):
"""
Start the router with different numbers of worker threads and make sure
Expand Down