Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 2772 - part A: Refactor timers in Nominator #3024

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 48 additions & 35 deletions source/agora/consensus/protocol/Nominator.d
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ import std.container : DList;
import std.conv;
import std.format;
import std.path : buildPath;
import std.range : assumeSorted;
import std.range : assumeSorted, chain;
import core.time;

// TODO: The block should probably have a size limit rather than a maximum
Expand Down Expand Up @@ -110,12 +110,24 @@ public extern (C++) class Nominator : SCPDriver
/// Last height that we finished the nomination round
private Height heighest_ballot_height;

/// Periodic nomination timer. It runs every second and checks the clock
/// time to see if it's time to start nominating. We do not use the
/// `BlockInterval` interval directly because this makes the timer
/// susceptible to clock drift. Instead, the clock is checked every second.
/// Note that Clock network synchronization is not yet implemented.
private ITimer nomination_timer;
/// Indices identifying each entry of the `timers` array
protected enum TimersIdx
{
/// Periodic nomination timer. It runs every second and checks the clock
/// time to see if it's time to start nominating. We do not use the
/// `BlockInterval` interval directly because this makes the timer
/// susceptible to clock drift. Instead, the clock is checked every second.
/// Note that Clock network synchronization is not yet implemented.
Nomination,
/// Timer used for processing queued incoming SCP Envelope messages
Envelope,
/// Timer passed in from FullNode or Validator to trigger block,
/// signature and tx catchup
Catchup,
}

/// Timers this node has started not including the SCP slot timers
protected ITimer[TimersIdx.max + 1] timers;

/// SCPEnvelopeStore instance
protected SCPEnvelopeStore store;
Expand Down Expand Up @@ -148,15 +160,9 @@ public extern (C++) class Nominator : SCPDriver
/// List of incoming SCPEnvelopes that need to be processed
private DList!SCPEnvelope queued_envelopes;

/// Timer used for processing queued incoming SCP Envelope messages
private ITimer envelope_timer;

/// Envelope process task delay
private enum EnvTaskDelay = 10.msecs;

// Timer used to enable catchup in background as task
private ITimer catchup_timer;

/// catchup task delay
private enum CatchupTaskDelay = 10.msecs;

Expand Down Expand Up @@ -204,11 +210,11 @@ extern(D):
this.ledger = ledger;
this.enroll_man = enroll_man;
this.store = new SCPEnvelopeStore(cacheDB);
// Create and stop timer immediately
this.envelope_timer = this.taskman.setTimer(EnvTaskDelay,
&this.envelopeProcessTask, Periodic.No);
this.envelope_timer.stop();
this.catchup_timer = catchup_timer;
// Create stopped timers
this.timers[TimersIdx.Envelope] = this.taskman.createTimer(&this.envelopeProcessTask);
this.timers[TimersIdx.Nomination] = this.taskman.createTimer(&this.checkNominate);
this.timers[TimersIdx.Catchup] = catchup_timer; // Created in caller

// Find the node id of this validator and create an SCPObject
Hash[] utxo_keys;
this.enroll_man.getEnrolledUTXOs(Height(1), utxo_keys);
Expand All @@ -220,10 +226,14 @@ extern(D):
this.acceptBlock = externalize;
}

/// Shut down the envelope processing timer
/// Shut down the timers
public void shutdown () @safe
{
this.envelope_timer.stop();
this.timers[].chain(this.active_timers[]).each!((t)
{
if (t !is null)
t.stop();
});
}

/// Processes incoming queued envelopes
Expand Down Expand Up @@ -346,13 +356,11 @@ extern(D):

public void startNominatingTimer () @trusted nothrow
{
if (this.nomination_timer !is null) // already running
return;

// For unittests we don't want to wait 1 second between checks
log.info("Starting nominating timer..");
this.nomination_timer = this.taskman.setTimer(this.nomination_interval,
&this.checkNominate, Periodic.Yes);
if (this.timers[TimersIdx.Nomination] is null)
this.timers[TimersIdx.Nomination] = this.taskman.createTimer(&this.checkNominate);

this.timers[TimersIdx.Nomination].rearm(this.nomination_interval, false);
}

/***************************************************************************
Expand All @@ -365,10 +373,11 @@ extern(D):

public void stopNominatingTimer () @safe nothrow
{
if (this.nomination_timer !is null)
log.info("Stopping nominating timer.");
if (this.timers[TimersIdx.Nomination] !is null)
{
this.nomination_timer.stop();
this.nomination_timer = null;
this.timers[TimersIdx.Nomination].stop();
this.timers[TimersIdx.Nomination] = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually don't need to null it here; we can reuse the same timer every slot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did try to remove it but we compare it to null here:

private void handleSCPEnvelope (in SCPEnvelope envelope) @trusted
{
// ignore messages if `startNominatingTimer` was never called or
// if `stopNominatingTimer` was called
if (this.nomination_timer is null)
return;

I don't think we can check whether the timer is stopped.
Maybe `pending` could be used, but I think it may be false while the task is actually running.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then feel free to merge when CI is green

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, rebased on a green upstream 🤞

}
}

Expand Down Expand Up @@ -462,6 +471,10 @@ extern(D):

protected void checkNominate () @safe
{
scope (exit)
{
this.timers[TimersIdx.Nomination].rearm(this.nomination_interval, false);
}
const slot_idx = this.ledger.height() + 1;
const cur_time = this.clock.networkTime();
const next_nomination = this.getExpectedBlockTime();
Expand All @@ -478,7 +491,7 @@ extern(D):
this.log.trace(
"checkNominate(): Last block ({}) doesn't have majority signatures, signed={}",
this.ledger.height(), this.ledger.lastBlock().header.validators);
this.catchup_timer.rearm(CatchupTaskDelay, false);
this.timers[TimersIdx.Catchup].rearm(CatchupTaskDelay, false);
return;
}

Expand Down Expand Up @@ -577,7 +590,7 @@ extern(D):
if (!proc)
{
this.queued_envelopes.insertBack(envelope.serializeFull.deserializeFull!SCPEnvelope());
this.envelope_timer.rearm(EnvTaskDelay, Periodic.No);
this.timers[TimersIdx.Envelope].rearm(EnvTaskDelay, false);
continue;
}
auto shared_env = this.wrapEnvelope(envelope);
Expand Down Expand Up @@ -608,7 +621,7 @@ extern(D):
this.queued_envelopes.insertBack(copied);
else
this.queued_envelopes.insertFront(copied);
this.envelope_timer.rearm(EnvTaskDelay, Periodic.No);
this.timers[TimersIdx.Envelope].rearm(EnvTaskDelay, false);
this.seen_envs.put(env_hash);
}
}
Expand All @@ -626,7 +639,7 @@ extern(D):
{
// ignore messages if `startNominatingTimer` was never called or
// if `stopNominatingTimer` was called
if (this.nomination_timer is null)
if (this.timers[TimersIdx.Nomination] is null)
return;

const Block last_block = this.ledger.lastBlock();
Expand Down Expand Up @@ -913,13 +926,13 @@ extern(D):
if (fail_reason == this.ledger.InvalidConsensusDataReason.NotInPool)
{
log.trace("validateValue(): This node can not yet fully validate this value: {}. Data: {}", fail_reason, data.prettify);
this.catchup_timer.rearm(CatchupTaskDelay, false);
this.timers[TimersIdx.Catchup].rearm(CatchupTaskDelay, false);
return ValidationLevel.kMaybeValidValue;
}
if (fail_reason == this.ledger.InvalidConsensusDataReason.MisMatchingCoinbase)
{
log.error("validateValue(): Validation failed: {}. Will check for missing signatures", fail_reason);
this.catchup_timer.rearm(CatchupTaskDelay, false);
this.timers[TimersIdx.Catchup].rearm(CatchupTaskDelay, false);
}
else
{
Expand Down