From 5c0b782d8e427a957cfdab26086d57d304abc9c7 Mon Sep 17 00:00:00 2001 From: Andrej Mitrovic Date: Thu, 11 Mar 2021 16:36:59 +0900 Subject: [PATCH] Introduce LocalRestConnectionClosed exception This allows tests to gracefully handle peer nodes shutting down. --- source/geod24/LocalRest.d | 94 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 91 insertions(+), 3 deletions(-) diff --git a/source/geod24/LocalRest.d b/source/geod24/LocalRest.d index 8523cf69..64e49286 100644 --- a/source/geod24/LocalRest.d +++ b/source/geod24/LocalRest.d @@ -110,6 +110,15 @@ import std.traits : fullyQualifiedName, Parameters, ReturnType; import core.thread; import core.time; +/// Thrown when the connection to a peer closed (peer node shut down) +public final class LocalRestConnectionClosed : Exception +{ + this () @safe pure nothrow @nogc + { + super("Connection with peer closed"); + } +} + /// Data sent by the caller private struct Command { @@ -1130,7 +1139,7 @@ public final class RemoteAPI (API, alias S = VibeJSONSerializer!()) : API auto command = Command(this.conn.getNextCommandId(), ovrld.mangleof, SerializedData(serialized)); if(!this.conn.sendCommand(command)) - throw new Exception("Connection with peer closed"); + throw new LocalRestConnectionClosed(); return this.conn.waitResponse(command.id, this.timeout); }(); @@ -2075,9 +2084,8 @@ unittest node.myping(69); assert(0); } - catch (Exception ex) + catch (LocalRestConnectionClosed ex) { - assert(ex.msg == "Connection with peer closed"); } } @@ -2503,3 +2511,83 @@ unittest assert(extnode.required() == 42); assertThrown!ClientException(extnode.optional()); } + +/// Graceful handling of shut down nodes +unittest +{ + static import geod24.concurrency; + import core.atomic : atomicLoad, atomicStore; + + static interface API + { + void test () @safe; + void shutdown () @safe; + } + + static shared bool connection_closed; + __gshared Listener!API api_1; + __gshared Listener!API api_2; + + static class Node1 : API + { + Timer timer; + RemoteAPI!API api; + + @trusted: + public override void test () + { + this.api = new RemoteAPI!API(api_2); + this.timer = setTimer(10.msecs, &infinite, true); + } + + public override void shutdown () + { + if (this.timer !is null) + this.timer.stop(); + } + + public void infinite () @trusted + { + try + { + this.api.test(); + } + catch (LocalRestConnectionClosed) + { + atomicStore(connection_closed, true); + } + } + } + + static class Node2 : API + { + @safe: + public override void test () + { + } + + public override void shutdown () + { + } + } + + scope (exit) thread_joinAll(); + + auto node_1 = RemoteAPI!API.spawn!Node1(); + auto node_2 = RemoteAPI!API.spawn!Node2(); + api_1 = node_1.ctrl.listener(); + api_2 = node_2.ctrl.listener(); + + node_1.test(); + node_2.ctrl.shutdown(); + Thread.sleep(1.seconds); // trigger calling shut down node + node_1.ctrl.shutdown(); + + size_t count; + while (!atomicLoad(connection_closed)) + { + assert(count < 300); // up to 3 seconds wait + count++; + Thread.sleep(10.msecs); + } +}