From 3da4383b7e7ca386636777fd53cb5b007c1cf6ee Mon Sep 17 00:00:00 2001 From: Andrej Mitrovic Date: Thu, 11 Mar 2021 16:36:59 +0900 Subject: [PATCH] Introduce LocalRestConnectionClosed exception This allows tests to gracefully handle peer nodes shutting down. --- source/geod24/LocalRest.d | 100 ++++++++++++++++++++++++++++++++++---- 1 file changed, 91 insertions(+), 9 deletions(-) diff --git a/source/geod24/LocalRest.d b/source/geod24/LocalRest.d index 8523cf69..de2e649d 100644 --- a/source/geod24/LocalRest.d +++ b/source/geod24/LocalRest.d @@ -110,6 +110,15 @@ import std.traits : fullyQualifiedName, Parameters, ReturnType; import core.thread; import core.time; +/// Thrown when the connection to a peer closed (peer node shut down) +public final class LocalRestConnectionClosed : Exception +{ + this () @safe pure nothrow @nogc + { + super("Connection with peer closed"); + } +} + /// Data sent by the caller private struct Command { @@ -498,13 +507,21 @@ public final class Timer // Run a delegate after timeout, and until this.periodic is false private void run () { - do + try { - sleep(timeout); - if (this.stopped) - return; - dg(); - } while (this.periodic); + do + { + sleep(timeout); + if (this.stopped) + return; + dg(); + } while (this.periodic); + } + catch (LocalRestConnectionClosed) + { + import std.stdio; + writeln("LocalRest: Connection to peer lost. Timer shut down."); + } } /// Stop the timer. The next time this timer's fiber wakes up @@ -1130,7 +1147,7 @@ public final class RemoteAPI (API, alias S = VibeJSONSerializer!()) : API auto command = Command(this.conn.getNextCommandId(), ovrld.mangleof, SerializedData(serialized)); if(!this.conn.sendCommand(command)) - throw new Exception("Connection with peer closed"); + throw new LocalRestConnectionClosed(); return this.conn.waitResponse(command.id, this.timeout); }(); @@ -2075,9 +2092,8 @@ unittest node.myping(69); assert(0); } - catch (Exception ex) + catch (LocalRestConnectionClosed ex) { - assert(ex.msg == "Connection with peer closed"); } } @@ -2503,3 +2519,69 @@ unittest assert(extnode.required() == 42); assertThrown!ClientException(extnode.optional()); } + +/// Graceful handling of shut down nodes +unittest +{ + static import geod24.concurrency; + import core.atomic : atomicLoad, atomicStore; + + static interface API + { + void test () @safe; + } + + static shared bool connection_closed; + __gshared Listener!API api_1; + __gshared Listener!API api_2; + + static class Node1 : API + { + Timer timer; + RemoteAPI!API api; + + @trusted: + public override void test () + { + this.api = new RemoteAPI!API(api_2); + this.timer = setTimer(10.msecs, &infinite, true); + } + + public void infinite () @trusted + { + try + { + this.api.test(); + } // can handle it explicitly, or let the base class handle it + catch (LocalRestConnectionClosed ex) + { + atomicStore(connection_closed, true); + throw ex; // base class will kill the timer + } + } + } + + static class Node2 : API + { + @safe: + public override void test () + { + } + } + + scope (exit) thread_joinAll(); + + auto node_1 = RemoteAPI!API.spawn!Node1(); + auto node_2 = RemoteAPI!API.spawn!Node2(); + api_1 = node_1.ctrl.listener(); + api_2 = node_2.ctrl.listener(); + + node_1.test(); + node_2.ctrl.shutdown(); + Thread.sleep(1.seconds); // trigger calling shut down node + node_1.ctrl.shutdown(); + + size_t count; // up to 3 seconds wait + while (!atomicLoad(connection_closed)) + assert(count < 300); count++; Thread.sleep(10.msecs); +}