diff --git a/source/agora/consensus/protocol/Nominator.d b/source/agora/consensus/protocol/Nominator.d index 670fa9c42db..60818916270 100644 --- a/source/agora/consensus/protocol/Nominator.d +++ b/source/agora/consensus/protocol/Nominator.d @@ -60,7 +60,7 @@ import std.container : DList; import std.conv; import std.format; import std.path : buildPath; -import std.range : assumeSorted; +import std.range : assumeSorted, chain; import core.time; // TODO: The block should probably have a size limit rather than a maximum @@ -110,12 +110,24 @@ public extern (C++) class Nominator : SCPDriver /// Last height that we finished the nomination round private Height heighest_ballot_height; - /// Periodic nomination timer. It runs every second and checks the clock - /// time to see if it's time to start nominating. We do not use the - /// `BlockInterval` interval directly because this makes the timer - /// succeptible to clock drift. Instead, the clock is checked every second. - /// Note that Clock network synchronization is not yet implemented. - private ITimer nomination_timer; + /// + protected enum TimersIdx + { + /// Periodic nomination timer. It runs every second and checks the clock + /// time to see if it's time to start nominating. We do not use the + /// `BlockInterval` interval directly because this makes the timer + /// succeptible to clock drift. Instead, the clock is checked every second. + /// Note that Clock network synchronization is not yet implemented. + Nomination, + /// Timer used for processing queued incoming SCP Envelope messages + Envelope, + /// Timer passed in from FullNode or Validator to trigger block, + /// signature and tx catchup + Catchup, + } + + /// Timers this node has started not including the SCP slot timers + protected ITimer[TimersIdx.max + 1] timers; /// SCPEnvelopeStore instance protected SCPEnvelopeStore store; @@ -148,15 +160,9 @@ public extern (C++) class Nominator : SCPDriver /// List of incoming SCPEnvelopes that need to be processed private DList!SCPEnvelope queued_envelopes; - /// Timer used for processing queued incoming SCP Envelope messages - private ITimer envelope_timer; - /// Envelope process task delay private enum EnvTaskDelay = 10.msecs; - // Timer used to enable catchup in background as task - private ITimer catchup_timer; - /// catchup task delay private enum CatchupTaskDelay = 10.msecs; @@ -204,11 +210,11 @@ extern(D): this.ledger = ledger; this.enroll_man = enroll_man; this.store = new SCPEnvelopeStore(cacheDB); - // Create and stop timer immediately - this.envelope_timer = this.taskman.setTimer(EnvTaskDelay, - &this.envelopeProcessTask, Periodic.No); - this.envelope_timer.stop(); - this.catchup_timer = catchup_timer; + // Create stopped timers + this.timers[TimersIdx.Envelope] = this.taskman.createTimer(&this.envelopeProcessTask); + this.timers[TimersIdx.Nomination] = this.taskman.createTimer(&this.checkNominate); + this.timers[TimersIdx.Catchup] = catchup_timer; // Created in caller + // Find the node id of this validator and create an SCPObject Hash[] utxo_keys; this.enroll_man.getEnrolledUTXOs(Height(1), utxo_keys); @@ -220,10 +226,14 @@ extern(D): this.acceptBlock = externalize; } - /// Shut down the envelope processing timer + /// Shut down the timers public void shutdown () @safe { - this.envelope_timer.stop(); + this.timers[].chain(this.active_timers[]).each!((t) + { + if (t !is null) + t.stop(); + }); } /// Processes incoming queued envelopes @@ -346,13 +356,11 @@ extern(D): public void startNominatingTimer () @trusted nothrow { - if (this.nomination_timer !is null) // already running - return; - - // For unittests we don't want to wait 1 second between checks log.info("Starting nominating timer.."); - this.nomination_timer = this.taskman.setTimer(this.nomination_interval, - &this.checkNominate, Periodic.Yes); + if (this.timers[TimersIdx.Nomination] is null) + this.timers[TimersIdx.Nomination] = this.taskman.createTimer(&this.checkNominate); + + this.timers[TimersIdx.Nomination].rearm(this.nomination_interval, false); } /*************************************************************************** @@ -365,10 +373,11 @@ extern(D): public void stopNominatingTimer () @safe nothrow { - if (this.nomination_timer !is null) + log.info("Stopping nominating timer."); + if (this.timers[TimersIdx.Nomination] !is null) { - this.nomination_timer.stop(); - this.nomination_timer = null; + this.timers[TimersIdx.Nomination].stop(); + this.timers[TimersIdx.Nomination] = null; } } @@ -462,6 +471,10 @@ extern(D): protected void checkNominate () @safe { + scope (exit) + { + this.timers[TimersIdx.Nomination].rearm(this.nomination_interval, false); + } const slot_idx = this.ledger.height() + 1; const cur_time = this.clock.networkTime(); const next_nomination = this.getExpectedBlockTime(); @@ -478,7 +491,7 @@ extern(D): this.log.trace( "checkNominate(): Last block ({}) doesn't have majority signatures, signed={}", this.ledger.height(), this.ledger.lastBlock().header.validators); - this.catchup_timer.rearm(CatchupTaskDelay, false); + this.timers[TimersIdx.Catchup].rearm(CatchupTaskDelay, false); return; } @@ -577,7 +590,7 @@ extern(D): if (!proc) { this.queued_envelopes.insertBack(envelope.serializeFull.deserializeFull!SCPEnvelope()); - this.envelope_timer.rearm(EnvTaskDelay, Periodic.No); + this.timers[TimersIdx.Envelope].rearm(EnvTaskDelay, false); continue; } auto shared_env = this.wrapEnvelope(envelope); @@ -608,7 +621,7 @@ extern(D): this.queued_envelopes.insertBack(copied); else this.queued_envelopes.insertFront(copied); - this.envelope_timer.rearm(EnvTaskDelay, Periodic.No); + this.timers[TimersIdx.Envelope].rearm(EnvTaskDelay, false); this.seen_envs.put(env_hash); } } @@ -626,7 +639,7 @@ extern(D): { // ignore messages if `startNominatingTimer` was never called or // if `stopNominatingTimer` was called - if (this.nomination_timer is null) + if (this.timers[TimersIdx.Nomination] is null) return; const Block last_block = this.ledger.lastBlock(); @@ -913,13 +926,13 @@ extern(D): if (fail_reason == this.ledger.InvalidConsensusDataReason.NotInPool) { log.trace("validateValue(): This node can not yet fully validate this value: {}. Data: {}", fail_reason, data.prettify); - this.catchup_timer.rearm(CatchupTaskDelay, false); + this.timers[TimersIdx.Catchup].rearm(CatchupTaskDelay, false); return ValidationLevel.kMaybeValidValue; } if (fail_reason == this.ledger.InvalidConsensusDataReason.MisMatchingCoinbase) { log.error("validateValue(): Validation failed: {}. Will check for missing signatures", fail_reason); - this.catchup_timer.rearm(CatchupTaskDelay, false); + this.timers[TimersIdx.Catchup].rearm(CatchupTaskDelay, false); } else {