Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Consensus] Add MinBlockTime to delay mempool reaping #924

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8608bce
[WIP] Add MinBlockTime
red-0ne Jul 21, 2023
4a25b62
[Consensus] Feat: Configurable min block production time
red-0ne Jul 25, 2023
1028ae9
[Consensus] Refactor: decouple timer registration from subscription
red-0ne Jul 26, 2023
a7fd743
address review comments
red-0ne Jul 31, 2023
867e841
[Docs] Update development docs to warn to not use the changelog hook …
h5law Jul 24, 2023
0d448c9
[IBC] chore: Rename FlushAllEntries => FlushCachesToStore (#934)
h5law Jul 24, 2023
accccfc
[Utility] Feat: add client-side session cache (#888)
adshmh Jul 25, 2023
3165b8d
[IBC] Clone `cosmos/ics23` protobuf definitions into IBC repo (#922)
h5law Jul 26, 2023
990321e
[CLI] Consistent config/flag parsing & common helpers (#891)
bryanchriswhite Jul 26, 2023
21d7024
[IBC] Change Events to not have a Height field and use uint64 in quer…
h5law Jul 26, 2023
c67fa14
[IBC] Add ICS-02 Client Interfaces (#932)
h5law Jul 26, 2023
db8d8d6
[Persistence] Adds `node` subcommand to CLI (#935)
dylanlott Jul 26, 2023
74a5816
[IBC] chore: enable IBC module in k8s validators (#941)
h5law Jul 27, 2023
950ccc3
[Utility] Use TreeStore as source of truth (#937)
h5law Jul 27, 2023
d3bf0ad
[IBC] Enable validators and thus IBC host creation in K8s (#942)
h5law Jul 28, 2023
c903ca1
[Utility] Create trustless_relay_validation.md (#938)
adshmh Jul 31, 2023
298b08f
[Persistence] Adds atomic Update for TreeStore (#861)
dylanlott Jul 31, 2023
a68af5c
[chore] Replaces multierr usage with go native errors package (#939)
dylanlott Jul 31, 2023
0941549
hack: 😴 sleep enough for cli debug subcommands to broadcast (#954)
0xBigBoss Jul 31, 2023
50f8846
DevLog 12 (#957)
Olshansk Aug 1, 2023
e0e9fd4
[Utility] servicer signs relays (#952)
adshmh Aug 1, 2023
2a226cc
[LocalNet] Fix metrics scraping (#940)
okdas Aug 1, 2023
6c7599e
prevent sending to closed channels
red-0ne Aug 2, 2023
92ece19
disable block preparation delay when manual mode is on
red-0ne Aug 4, 2023
fef4217
[E2E Test] Utilities for State Sync Test (#874)
Olshansk Aug 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions consensus/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Implement minimum block production pace by delaying block preparation
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

## [0.0.0.54] - 2023-06-13

- Fix tests
Expand Down
50 changes: 50 additions & 0 deletions consensus/e2e_tests/pacemaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,56 @@ func forcePacemakerTimeout(t *testing.T, clockMock *clock.Mock, paceMakerTimeout
advanceTime(t, clockMock, paceMakerTimeout+10*time.Millisecond)
}

func TestPacemakerMinBlockTime(t *testing.T) {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// Test preparation
clockMock := clock.NewMock()
timeReminder(t, clockMock, time.Second)

// UnitTestNet configs
paceMakerTimeoutMsec := uint64(10000)
consensusMessageTimeout := time.Duration(paceMakerTimeoutMsec / 5) // Must be smaller than pacemaker timeout because we expect a deterministic number of consensus messages.
paceMakerMinBlockTimeMsec := uint64(5000) // Make sure it is larger than the consensus message timeout
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
runtimeMgrs := GenerateNodeRuntimeMgrs(t, numValidators, clockMock)
for _, runtimeConfig := range runtimeMgrs {
consCfg := runtimeConfig.GetConfig().Consensus.PacemakerConfig
consCfg.TimeoutMsec = paceMakerTimeoutMsec
consCfg.MinBlockTimeMsec = paceMakerMinBlockTimeMsec
}
buses := GenerateBuses(t, runtimeMgrs)

// Create & start test pocket nodes
eventsChannel := make(modules.EventsChannel, 100)
pocketNodes := CreateTestConsensusPocketNodes(t, buses, eventsChannel)
err := StartAllTestPocketNodes(t, pocketNodes)
require.NoError(t, err)

// Debug message to start consensus by triggering next view
for _, pocketNode := range pocketNodes {
TriggerNextView(t, pocketNode)
}

consMod := pocketNodes[1].GetBus().GetConsensusModule()

newRoundMessages, err := waitForProposalMsgs(t, clockMock, eventsChannel, pocketNodes, 1, uint8(consensus.NewRound), 0, 0, numValidators*numValidators, consensusMessageTimeout, true)
require.NoError(t, err)
whenBroadcast := clockMock.Now()
broadcastMessages(t, newRoundMessages, pocketNodes)
finishedBroadcast := uint64(clockMock.Now().Sub(whenBroadcast).Milliseconds())
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
beforePrepareTimeout := time.Duration((paceMakerMinBlockTimeMsec - finishedBroadcast) * uint64(time.Millisecond))

step := typesCons.HotstuffStep(consMod.CurrentStep())
require.Equal(t, consensus.NewRound, step)

advanceTime(t, clockMock, clock.Duration(beforePrepareTimeout))
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
step = typesCons.HotstuffStep(consMod.CurrentStep())
// Should still be blocking proposal step
require.Equal(t, consensus.NewRound, step)

//advanceTime(t, clockMock, 8000*time.Millisecond)
//step = typesCons.HotstuffStep(consMod.CurrentStep())
//require.Equal(t, consensus.Prepare, step)
}

// TODO: Implement these tests and use them as a starting point for new ones. Consider using ChatGPT to help you out :)

func TestPacemakerDifferentHeightsCatchup(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions consensus/hotstuff_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *consensusM
return
}

// DISCUSS: Do we need to pause for `MinBlockFreqMSec` here to let more transactions or should we stick with optimistic responsiveness?

if err := m.didReceiveEnoughMessageForStep(NewRound); err != nil {
m.logger.Info().Fields(hotstuffMsgToLoggingFields(msg)).Msgf("⏳ Waiting ⏳for more messages; %s", err.Error())
return
Expand Down Expand Up @@ -64,6 +62,11 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *consensusM
// TODO: Add test to make sure same block is not applied twice if round is interrupted after being 'Applied'.
// TODO: Add more unit tests for these checks...
if m.shouldPrepareNewBlock(highPrepareQC) {
doPrepare := m.paceMaker.ProcessDelayedBlockPrepare()
if !doPrepare {
m.logger.Info().Msg("skip prepare new block")
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
return
}
block, err := m.prepareBlock(highPrepareQC)
if err != nil {
m.logger.Error().Err(err).Msg(typesCons.ErrPrepareBlock.Error())
Expand Down
90 changes: 87 additions & 3 deletions consensus/pacemaker/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pacemaker
import (
"context"
"fmt"
"sync"
"time"

consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry"
Expand Down Expand Up @@ -38,6 +39,7 @@ type Pacemaker interface {
PacemakerDebug

ShouldHandleMessage(message *typesCons.HotstuffMessage) (bool, error)
ProcessDelayedBlockPrepare() bool

RestartTimer()
NewHeight()
Expand All @@ -48,9 +50,10 @@ type pacemaker struct {
base_modules.IntegrableModule
base_modules.InterruptableModule

pacemakerCfg *configs.PacemakerConfig
roundTimeout time.Duration
roundCancelFunc context.CancelFunc
pacemakerCfg *configs.PacemakerConfig
roundTimeout time.Duration
roundCancelFunc context.CancelFunc
latestPrepareRequest latestPrepareRequest

// Only used for development and debugging.
debug pacemakerDebug
Expand All @@ -60,6 +63,15 @@ type pacemaker struct {
logPrefix string
}

// Structure to handle delaying block preparation (reaping the block mempool)
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
type latestPrepareRequest struct {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
m sync.Mutex
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
ch chan bool
cancelFunc context.CancelFunc
blockProposed bool
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
deadlinePassed bool
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
}

func CreatePacemaker(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return new(pacemaker).Create(bus, options...)
}
Expand All @@ -85,13 +97,21 @@ func (*pacemaker) Create(bus modules.Bus, options ...modules.ModuleOption) (modu
debugTimeBetweenStepsMsec: m.pacemakerCfg.GetDebugTimeBetweenStepsMsec(),
quorumCertificate: nil,
}
m.latestPrepareRequest = latestPrepareRequest{
m: sync.Mutex{},
ch: nil,
cancelFunc: nil,
blockProposed: false,
deadlinePassed: false,
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
}

return m, nil
}

func (m *pacemaker) Start() error {
m.logger = logger.Global.CreateLoggerForModule(m.GetModuleName())
m.RestartTimer()
m.RestartBlockProposalTimer()
return nil
}

Expand Down Expand Up @@ -171,6 +191,7 @@ func (m *pacemaker) ShouldHandleMessage(msg *typesCons.HotstuffMessage) (bool, e

func (m *pacemaker) RestartTimer() {
// NOTE: Not deferring a cancel call because this function is asynchronous.
// DISCUSS: Should we have a lock to manipulate m.roundCancelFunc?
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
if m.roundCancelFunc != nil {
m.roundCancelFunc()
}
Expand All @@ -191,6 +212,45 @@ func (m *pacemaker) RestartTimer() {
}()
}

func (m *pacemaker) RestartBlockProposalTimer() {
m.latestPrepareRequest.m.Lock()
defer m.latestPrepareRequest.m.Unlock()

if m.latestPrepareRequest.ch != nil {
m.latestPrepareRequest.ch <- false
}

if m.latestPrepareRequest.cancelFunc != nil {
m.latestPrepareRequest.cancelFunc()
}

m.latestPrepareRequest.blockProposed = false
m.latestPrepareRequest.deadlinePassed = false

ctx, cancel := context.WithCancel(context.TODO())
m.latestPrepareRequest.cancelFunc = cancel
clock := m.GetBus().GetRuntimeMgr().GetClock()
minBlockTime := time.Duration(m.pacemakerCfg.MinBlockTimeMsec * uint64(time.Millisecond))

go func() {
select {
case <-ctx.Done():
return
case <-clock.After(minBlockTime):
m.latestPrepareRequest.m.Lock()
defer m.latestPrepareRequest.m.Unlock()

if m.latestPrepareRequest.ch != nil {
m.latestPrepareRequest.blockProposed = true
m.latestPrepareRequest.ch <- true
m.latestPrepareRequest.ch = nil
}

m.latestPrepareRequest.deadlinePassed = true
}
}()
}

func (m *pacemaker) InterruptRound(reason string) {
defer m.RestartTimer()

Expand Down Expand Up @@ -225,6 +285,7 @@ func (m *pacemaker) InterruptRound(reason string) {

func (m *pacemaker) NewHeight() {
defer m.RestartTimer()
defer m.RestartBlockProposalTimer()

consensusMod := m.GetBus().GetConsensusModule()
consensusMod.ResetRound(true)
Expand All @@ -243,6 +304,29 @@ func (m *pacemaker) NewHeight() {
)
}

func (m *pacemaker) ProcessDelayedBlockPrepare() bool {
m.latestPrepareRequest.m.Lock()

if m.latestPrepareRequest.blockProposed {
return false
}

if m.latestPrepareRequest.deadlinePassed {
return true
}

// there is already a block preparer candidate, we cancel it and start a new one
// DISCUSS: This is needed if we want to get the latest QC for the block proposal.
if m.latestPrepareRequest.ch != nil {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
m.latestPrepareRequest.ch <- false
}

m.latestPrepareRequest.ch = make(chan bool)
m.latestPrepareRequest.m.Unlock()

return <-m.latestPrepareRequest.ch
}

func (m *pacemaker) startNextView(qc *typesCons.QuorumCertificate, forceNextView bool) {
defer m.RestartTimer()

Expand Down
1 change: 1 addition & 0 deletions runtime/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func NewDefaultConfig(options ...func(*Config)) *Config {
TimeoutMsec: defaults.DefaultPacemakerTimeoutMsec,
Manual: defaults.DefaultPacemakerManual,
DebugTimeBetweenStepsMsec: defaults.DefaultPacemakerDebugTimeBetweenStepsMsec,
MinBlockTimeMsec: defaults.DefaultPacemakerMinBlockTimeMsec,
},
},
Utility: &UtilityConfig{
Expand Down
1 change: 1 addition & 0 deletions runtime/configs/proto/consensus_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ message PacemakerConfig {
uint64 timeout_msec = 1;
bool manual = 2;
uint64 debug_time_between_steps_msec = 3;
uint64 min_block_time_msec = 4;
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions runtime/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
DefaultPacemakerTimeoutMsec = uint64(10000)
DefaultPacemakerManual = true
DefaultPacemakerDebugTimeBetweenStepsMsec = uint64(1000)
DefaultPacemakerMinBlockTimeMsec = uint64(5000)
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// utility
DefaultUtilityMaxMempoolTransactionBytes = uint64(1024 ^ 3) // 1GB V0 defaults
DefaultUtilityMaxMempoolTransactions = uint32(9000)
Expand Down
2 changes: 2 additions & 0 deletions runtime/docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Add a new MinBlockTimeMsec config field to consensus
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

## [0.0.0.44] - 2023-06-26

- Add a new ServiceConfig field to servicer config
Expand Down
1 change: 1 addition & 0 deletions runtime/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1775,6 +1775,7 @@ func TestNewManagerFromReaders(t *testing.T) {
TimeoutMsec: 10000,
Manual: true,
DebugTimeBetweenStepsMsec: 1000,
MinBlockTimeMsec: 5000,
},
ServerModeEnabled: true,
},
Expand Down