Skip to content

Commit

Permalink
Introduce LocalRestConnectionClosed exception
Browse files Browse the repository at this point in the history
This allows tests to gracefully handle
peer nodes shutting down.
  • Loading branch information
AndrejMitrovic committed Mar 11, 2021
1 parent aa890bd commit 3da4383
Showing 1 changed file with 91 additions and 9 deletions.
100 changes: 91 additions & 9 deletions source/geod24/LocalRest.d
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ import std.traits : fullyQualifiedName, Parameters, ReturnType;
import core.thread;
import core.time;

/// Thrown when the connection to a peer closed (peer node shut down)
public final class LocalRestConnectionClosed : Exception
{
this () @safe pure nothrow @nogc
{
super("Connection with peer closed");
}
}

/// Data sent by the caller
private struct Command
{
Expand Down Expand Up @@ -498,13 +507,21 @@ public final class Timer
// Run a delegate after timeout, and until this.periodic is false
private void run ()
{
do
try
{
sleep(timeout);
if (this.stopped)
return;
dg();
} while (this.periodic);
do
{
sleep(timeout);
if (this.stopped)
return;
dg();
} while (this.periodic);
}
catch (LocalRestConnectionClosed)
{
import std.stdio;
writeln("LocalRest: Connection to peer lost. Timer shut down.");
}
}

/// Stop the timer. The next time this timer's fiber wakes up
Expand Down Expand Up @@ -1130,7 +1147,7 @@ public final class RemoteAPI (API, alias S = VibeJSONSerializer!()) : API
auto command = Command(this.conn.getNextCommandId(), ovrld.mangleof, SerializedData(serialized));

if(!this.conn.sendCommand(command))
throw new Exception("Connection with peer closed");
throw new LocalRestConnectionClosed();
return this.conn.waitResponse(command.id, this.timeout);
}();

Expand Down Expand Up @@ -2075,9 +2092,8 @@ unittest
node.myping(69);
assert(0);
}
catch (Exception ex)
catch (LocalRestConnectionClosed ex)
{
assert(ex.msg == "Connection with peer closed");
}
}

Expand Down Expand Up @@ -2503,3 +2519,69 @@ unittest
assert(extnode.required() == 42);
assertThrown!ClientException(extnode.optional());
}

/// Graceful handling of shut down nodes
unittest
{
static import geod24.concurrency;
import core.atomic : atomicLoad, atomicStore;

static interface API
{
void test () @safe;
}

static shared bool connection_closed;
__gshared Listener!API api_1;
__gshared Listener!API api_2;

static class Node1 : API
{
Timer timer;
RemoteAPI!API api;

@trusted:
public override void test ()
{
this.api = new RemoteAPI!API(api_2);
this.timer = setTimer(10.msecs, &infinite, true);
}

public void infinite () @trusted
{
try
{
this.api.test();
} // can handle it explicitly, or let the base class handle it
catch (LocalRestConnectionClosed ex)
{
atomicStore(connection_closed, true);
throw ex; // base class will kill the timer
}
}
}

static class Node2 : API
{
@safe:
public override void test ()
{
}
}

scope (exit) thread_joinAll();

auto node_1 = RemoteAPI!API.spawn!Node1();
auto node_2 = RemoteAPI!API.spawn!Node2();
api_1 = node_1.ctrl.listener();
api_2 = node_2.ctrl.listener();

node_1.test();
node_2.ctrl.shutdown();
Thread.sleep(1.seconds); // trigger calling shut down node
node_1.ctrl.shutdown();

size_t count; // up to 3 seconds wait
while (!atomicLoad(connection_closed))
assert(count < 300); count++; Thread.sleep(10.msecs);
}

0 comments on commit 3da4383

Please sign in to comment.