Skip to content

Commit

Permalink
Refactor timers used in Nominator
Browse files Browse the repository at this point in the history
Use enums to identify timers within an array of timers. Just create them
in the constructor and arm them when needed. Stop all timers on
shutdown.
  • Loading branch information
hewison-chris committed Feb 11, 2022
1 parent 921147c commit 87f499d
Showing 1 changed file with 48 additions and 35 deletions.
83 changes: 48 additions & 35 deletions source/agora/consensus/protocol/Nominator.d
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ import std.container : DList;
import std.conv;
import std.format;
import std.path : buildPath;
import std.range : assumeSorted;
import std.range : assumeSorted, chain;
import core.time;

// TODO: The block should probably have a size limit rather than a maximum
Expand Down Expand Up @@ -110,12 +110,24 @@ public extern (C++) class Nominator : SCPDriver
/// Last height that we finished the nomination round
private Height heighest_ballot_height;

/// Periodic nomination timer. It runs every second and checks the clock
/// time to see if it's time to start nominating. We do not use the
/// `BlockInterval` interval directly because this makes the timer
/// succeptible to clock drift. Instead, the clock is checked every second.
/// Note that Clock network synchronization is not yet implemented.
private ITimer nomination_timer;
///
protected enum TimersIdx
{
/// Periodic nomination timer. It runs every second and checks the clock
/// time to see if it's time to start nominating. We do not use the
/// `BlockInterval` interval directly because this makes the timer
/// succeptible to clock drift. Instead, the clock is checked every second.
/// Note that Clock network synchronization is not yet implemented.
Nomination,
/// Timer used for processing queued incoming SCP Envelope messages
Envelope,
/// Timer passed in from FullNode or Validator to trigger block,
/// signature and tx catchup
Catchup,
}

/// Timers this node has started not including the SCP slot timers
protected ITimer[TimersIdx.max + 1] timers;

/// SCPEnvelopeStore instance
protected SCPEnvelopeStore store;
Expand Down Expand Up @@ -148,15 +160,9 @@ public extern (C++) class Nominator : SCPDriver
/// List of incoming SCPEnvelopes that need to be processed
private DList!SCPEnvelope queued_envelopes;

/// Timer used for processing queued incoming SCP Envelope messages
private ITimer envelope_timer;

/// Envelope process task delay
private enum EnvTaskDelay = 10.msecs;

// Timer used to enable catchup in background as task
private ITimer catchup_timer;

/// catchup task delay
private enum CatchupTaskDelay = 10.msecs;

Expand Down Expand Up @@ -204,11 +210,11 @@ extern(D):
this.ledger = ledger;
this.enroll_man = enroll_man;
this.store = new SCPEnvelopeStore(cacheDB);
// Create and stop timer immediately
this.envelope_timer = this.taskman.setTimer(EnvTaskDelay,
&this.envelopeProcessTask, Periodic.No);
this.envelope_timer.stop();
this.catchup_timer = catchup_timer;
// Create stopped timers
this.timers[TimersIdx.Envelope] = this.taskman.createTimer(&this.envelopeProcessTask);
this.timers[TimersIdx.Nomination] = this.taskman.createTimer(&this.checkNominate);
this.timers[TimersIdx.Catchup] = catchup_timer; // Created in caller

// Find the node id of this validator and create an SCPObject
Hash[] utxo_keys;
this.enroll_man.getEnrolledUTXOs(Height(1), utxo_keys);
Expand All @@ -220,10 +226,14 @@ extern(D):
this.acceptBlock = externalize;
}

/// Shut down the envelope processing timer
/// Shut down the timers
public void shutdown () @safe
{
this.envelope_timer.stop();
this.timers[].chain(this.active_timers[]).each!((t)
{
if (t !is null)
t.stop();
});
}

/// Processes incoming queued envelopes
Expand Down Expand Up @@ -346,13 +356,11 @@ extern(D):

public void startNominatingTimer () @trusted nothrow
{
if (this.nomination_timer !is null) // already running
return;

// For unittests we don't want to wait 1 second between checks
log.info("Starting nominating timer..");
this.nomination_timer = this.taskman.setTimer(this.nomination_interval,
&this.checkNominate, Periodic.Yes);
if (this.timers[TimersIdx.Nomination] is null)
this.timers[TimersIdx.Nomination] = this.taskman.createTimer(&this.checkNominate);

this.timers[TimersIdx.Nomination].rearm(this.nomination_interval, false);
}

/***************************************************************************
Expand All @@ -365,10 +373,11 @@ extern(D):

public void stopNominatingTimer () @safe nothrow
{
if (this.nomination_timer !is null)
log.info("Stopping nominating timer.");
if (this.timers[TimersIdx.Nomination] !is null)
{
this.nomination_timer.stop();
this.nomination_timer = null;
this.timers[TimersIdx.Nomination].stop();
this.timers[TimersIdx.Nomination] = null;
}
}

Expand Down Expand Up @@ -462,6 +471,10 @@ extern(D):

protected void checkNominate () @safe
{
scope (exit)
{
this.timers[TimersIdx.Nomination].rearm(this.nomination_interval, false);
}
const slot_idx = this.ledger.height() + 1;
const cur_time = this.clock.networkTime();
const next_nomination = this.getExpectedBlockTime();
Expand All @@ -478,7 +491,7 @@ extern(D):
this.log.trace(
"checkNominate(): Last block ({}) doesn't have majority signatures, signed={}",
this.ledger.height(), this.ledger.lastBlock().header.validators);
this.catchup_timer.rearm(CatchupTaskDelay, false);
this.timers[TimersIdx.Catchup].rearm(CatchupTaskDelay, false);
return;
}

Expand Down Expand Up @@ -577,7 +590,7 @@ extern(D):
if (!proc)
{
this.queued_envelopes.insertBack(envelope.serializeFull.deserializeFull!SCPEnvelope());
this.envelope_timer.rearm(EnvTaskDelay, Periodic.No);
this.timers[TimersIdx.Envelope].rearm(EnvTaskDelay, false);
continue;
}
auto shared_env = this.wrapEnvelope(envelope);
Expand Down Expand Up @@ -608,7 +621,7 @@ extern(D):
this.queued_envelopes.insertBack(copied);
else
this.queued_envelopes.insertFront(copied);
this.envelope_timer.rearm(EnvTaskDelay, Periodic.No);
this.timers[TimersIdx.Envelope].rearm(EnvTaskDelay, false);
this.seen_envs.put(env_hash);
}
}
Expand All @@ -626,7 +639,7 @@ extern(D):
{
// ignore messages if `startNominatingTimer` was never called or
// if `stopNominatingTimer` was called
if (this.nomination_timer is null)
if (this.timers[TimersIdx.Nomination] is null)
return;

const Block last_block = this.ledger.lastBlock();
Expand Down Expand Up @@ -913,13 +926,13 @@ extern(D):
if (fail_reason == this.ledger.InvalidConsensusDataReason.NotInPool)
{
log.trace("validateValue(): This node can not yet fully validate this value: {}. Data: {}", fail_reason, data.prettify);
this.catchup_timer.rearm(CatchupTaskDelay, false);
this.timers[TimersIdx.Catchup].rearm(CatchupTaskDelay, false);
return ValidationLevel.kMaybeValidValue;
}
if (fail_reason == this.ledger.InvalidConsensusDataReason.MisMatchingCoinbase)
{
log.error("validateValue(): Validation failed: {}. Will check for missing signatures", fail_reason);
this.catchup_timer.rearm(CatchupTaskDelay, false);
this.timers[TimersIdx.Catchup].rearm(CatchupTaskDelay, false);
}
else
{
Expand Down

0 comments on commit 87f499d

Please sign in to comment.