Skip to content

Commit

Permalink
Implement indexing service
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Oct 17, 2024
1 parent b393e97 commit a580285
Show file tree
Hide file tree
Showing 36 changed files with 729 additions and 3,080 deletions.
2 changes: 1 addition & 1 deletion .gitlab/common.gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ variables:
NO_COLOR: '\e[0m'
SECTION: '\e[0K'
PRODUCTION_IMAGE: ${CI_REGISTRY_IMAGE}:${CI_COMMIT_REF_SLUG}
GO_CI_VERSION: '1.22'
GO_CI_VERSION: '1.23'
GO_CI_IMAGE: ${CI_REGISTRY_IMAGE}/ci-golang:${GO_CI_VERSION}

build-image:golang:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22 as build
FROM golang:1.23 as build

ARG GIT_DESCRIBE
ARG GIT_COMMIT
Expand Down
2 changes: 1 addition & 1 deletion cmd/accumulated-bootstrap/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22 as build
FROM golang:1.23 as build

# Build
WORKDIR /root
Expand Down
2 changes: 1 addition & 1 deletion cmd/accumulated-faucet/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22 as build
FROM golang:1.23 as build

# Build
WORKDIR /root
Expand Down
2 changes: 1 addition & 1 deletion cmd/accumulated-http/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22 as build
FROM golang:3 as build

ARG GIT_DESCRIBE
ARG GIT_COMMIT
Expand Down
8 changes: 7 additions & 1 deletion cmd/accumulated/run/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"gitlab.com/accumulatenetwork/accumulate/internal/core/events"
execute "gitlab.com/accumulatenetwork/accumulate/internal/core/execute/multi"
"gitlab.com/accumulatenetwork/accumulate/internal/database"
"gitlab.com/accumulatenetwork/accumulate/internal/database/indexing"
"gitlab.com/accumulatenetwork/accumulate/internal/logging"
"gitlab.com/accumulatenetwork/accumulate/internal/node/abci"
accumulated "gitlab.com/accumulatenetwork/accumulate/internal/node/daemon"
Expand Down Expand Up @@ -431,13 +432,15 @@ func (c *CoreConsensusApp) start(inst *Instance, d *tendermint) (types.Applicati
return nil, err
}

db := database.New(store, d.logger)
bli := indexing.NewBlockLedgerIndexer(inst.context, db, c.Partition.ID)

dialer := inst.p2p.DialNetwork()
client := &message.Client{Transport: &message.RoutedTransport{
Network: inst.config.Network,
Dialer: dialer,
Router: routing.MessageRouter{Router: router},
}}
db := database.New(store, d.logger)
execOpts := execute.Options{
Logger: d.logger.With("module", "executor"),
Database: db,
Expand All @@ -451,6 +454,9 @@ func (c *CoreConsensusApp) start(inst *Instance, d *tendermint) (types.Applicati
NetworkType: c.Partition.Type,
PartitionId: c.Partition.ID,
},
Indexers: []func(*database.Batch){
bli.Write,
},
}

// Why does this exist? Why not just use tmlib.DispatcherClient?
Expand Down
158 changes: 81 additions & 77 deletions go.mod

Large diffs are not rendered by default.

582 changes: 171 additions & 411 deletions go.sum

Large diffs are not rendered by default.

26 changes: 12 additions & 14 deletions internal/core/execute/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,18 @@ func (v *ValidatorUpdate) Equal(u *ValidatorUpdate) bool {

// Options are the options for constructing an [Executor]
type Options struct {
Logger log.Logger //
Database database.Beginner //
Key ed25519.PrivateKey // Private validator key
Router routing.Router //
Describe DescribeShim // Network description
EventBus *events.Bus //
BackgroundTaskLauncher func(func()) // Background task launcher
NewDispatcher func() Dispatcher // Synthetic transaction dispatcher factory
Sequencer private.Sequencer // Synthetic and anchor sequence API service
Querier api.Querier // Query API service
EnableHealing bool //
Logger log.Logger //
Database database.Beginner //
Key ed25519.PrivateKey // Private validator key
Router routing.Router //
Describe DescribeShim // Network description
EventBus *events.Bus //
BackgroundTaskLauncher func(func()) // Background task launcher
NewDispatcher func() Dispatcher // Synthetic transaction dispatcher factory
Sequencer private.Sequencer // Synthetic and anchor sequence API service
Querier api.Querier // Query API service
EnableHealing bool //
Indexers []func(*database.Batch) // Indexing processes
}

// A Dispatcher dispatches synthetic transactions produced by the executor.
Expand Down Expand Up @@ -130,9 +131,6 @@ type BlockState interface {

// Commit commits changes made by this block.
Commit() error

// Discard discards changes made by this block.
Discard()
}

func NewDatabaseObserver() database.Observer {
Expand Down
4 changes: 4 additions & 0 deletions internal/core/execute/multi/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ func (s *BlockStateV1) DidCompleteMajorBlock() (uint64, time.Time, bool) {
}

func (s *BlockStateV1) Hash() ([32]byte, error) {
if s.IsEmpty() {
_, root, err := (*ExecutorV1)(s.Executor).LastBlock()
return root, err
}
return s.Block.Batch.GetBptRootHash()
}

Expand Down
31 changes: 23 additions & 8 deletions internal/core/execute/v2/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,38 @@ func (s *closedBlock) DidUpdateValidators() ([]*execute.ValidatorUpdate, bool) {
}

func (s *closedBlock) Hash() ([32]byte, error) {
if s.IsEmpty() {
_, root, err := s.Executor.LastBlock()
return root, err
}
return s.Batch.GetBptRootHash()
}

func (s *closedBlock) Commit() error {
if s.IsEmpty() {
s.Discard()
return nil
}

err := s.Executor.EventBus.Publish(execute.WillCommitBlock{
Block: s,
})
if err != nil {
return errors.UnknownError.Wrap(err)
} else {
err := s.Executor.EventBus.Publish(execute.WillCommitBlock{
Block: s,
})
if err != nil {
return errors.UnknownError.Wrap(err)
}

err = s.Batch.Commit()
if err != nil {
return errors.UnknownError.Wrap(err)
}
}

return s.Batch.Commit()
// Run indexers
batch := s.Executor.Database.Begin(true)
defer batch.Discard()
for _, fn := range s.Executor.Indexers {
fn(batch)
}
return batch.Commit()
}

func (s *closedBlock) Discard() {
Expand Down
5 changes: 2 additions & 3 deletions internal/core/execute/v2/block/block_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/record"
"gitlab.com/accumulatenetwork/accumulate/pkg/url"
"gitlab.com/accumulatenetwork/accumulate/protocol"
)
Expand Down Expand Up @@ -175,7 +174,7 @@ func (block *Block) Close() (execute.BlockState, error) {
bl.Index = block.Index
bl.Time = block.Time
bl.Entries = block.State.ChainUpdates.Entries
err = ledger.BlockLedger().Append(record.NewKey(block.Index), bl)
err = ledger.BlockLedger().Append(block.Index, bl)
} else {
bl := new(protocol.BlockLedger)
bl.Url = m.Describe.Ledger().JoinPath(strconv.FormatUint(block.Index, 10))
Expand Down Expand Up @@ -313,7 +312,7 @@ func (b *Block) executePostUpdateActions() error {
if err != nil {
return errors.UnknownError.WithFormat("decode root index chain entry %d: %w", i+j, err)
}
err = acct.BlockLedger().Append(record.NewKey(entry.BlockIndex), &database.BlockLedger{})
err = acct.BlockLedger().Append(entry.BlockIndex, &database.BlockLedger{})
if err != nil {
return errors.UnknownError.WithFormat("add ledger entry for block %d: %w", i+j, err)
}
Expand Down
32 changes: 32 additions & 0 deletions internal/database/block_ledger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2024 The Accumulate Authors
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

package database

import (
"gitlab.com/accumulatenetwork/accumulate/internal/database/record"
"gitlab.com/accumulatenetwork/accumulate/pkg/database/indexing"
)

type BlockLedgerLog struct {
*indexing.Log[*BlockLedger]
}

func (c *Account) newBlockLedger() *BlockLedgerLog {
return &BlockLedgerLog{indexing.NewLog[*BlockLedger](c.logger.L, c.store, c.key.Append("BlockLedger"), 4<<10)}
}

func (b *BlockLedgerLog) Append(block uint64, ledger *BlockLedger) error {
return b.Log.Append(record.NewKey(block), ledger)
}

func (b *BlockLedgerLog) Replace(block uint64, ledger *BlockLedger) error {
return b.Log.Replace(record.NewKey(block), ledger)
}

func (b *BlockLedgerLog) Find(block uint64) indexing.Query[*BlockLedger] {
return b.Log.Find(record.NewKey(block))
}
3 changes: 1 addition & 2 deletions internal/database/indexing/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import (
"time"

"gitlab.com/accumulatenetwork/accumulate/internal/database"
"gitlab.com/accumulatenetwork/accumulate/internal/database/record"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
"gitlab.com/accumulatenetwork/accumulate/protocol"
)

func LoadBlockLedger(ledger *database.Account, index uint64) (time.Time, []*protocol.BlockEntry, error) {
_, l1, err := ledger.BlockLedger().Find(record.NewKey(index)).Exact().Get()
_, l1, err := ledger.BlockLedger().Find(index).Exact().Get()
switch {
case err != nil && !errors.Is(err, errors.NotFound):
return time.Time{}, nil, errors.UnknownError.Wrap(err)
Expand Down
144 changes: 144 additions & 0 deletions internal/database/indexing/block_ledger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2024 The Accumulate Authors
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

package indexing

import (
"context"
"fmt"
"log/slog"

"gitlab.com/accumulatenetwork/accumulate/internal/database"
"gitlab.com/accumulatenetwork/accumulate/pkg/url"
"gitlab.com/accumulatenetwork/accumulate/protocol"
)

type BlockLedgerIndexer struct {
db database.Beginner
context context.Context
cancel context.CancelFunc
url *url.URL
scanDone chan struct{}
read chan uint64
write chan func(func(*database.Batch))
}

func NewBlockLedgerIndexer(ctx context.Context, db database.Beginner, partitionID string) *BlockLedgerIndexer {
ctx, cancel := context.WithCancel(ctx)
u := protocol.PartitionUrl(partitionID).JoinPath(protocol.Ledger)
scanDone := make(chan struct{})
read := make(chan uint64)
write := make(chan func(func(*database.Batch)))
b := &BlockLedgerIndexer{db, ctx, cancel, u, scanDone, read, write}
go b.scan()
go b.run()
return b
}

func (b *BlockLedgerIndexer) Write(batch *database.Batch) {
done := make(chan struct{})
select {
case <-b.context.Done():
case b.write <- func(f func(*database.Batch)) { f(batch); close(done) }:
<-done
}
}

// ScanDone returns a channel that is closed once the block ledger has been
// scanned.
func (b *BlockLedgerIndexer) ScanDone() <-chan struct{} {
return b.scanDone
}

func (b *BlockLedgerIndexer) scan() {
defer close(b.scanDone)
batch := b.db.Begin(false)
defer batch.Discard()

for e := range batch.Account(b.url).BlockLedger().Scan().All() {
select {
default:
case <-b.context.Done():
return
}

k, l, err := e.Get()
if err != nil {
slog.ErrorContext(b.context, "Error scanning block ledger", "error", err)
continue
}

// Is the entry populated?
if l.Index != 0 {
continue
}

select {
case b.read <- k.Get(0).(uint64):
case <-b.context.Done():
return
}
}
}

func (b *BlockLedgerIndexer) run() {
batch := b.db.Begin(false)

// This is a separate closure that captures batch so that it will behave
// unsurprisingly when the variable is reassigned
defer func() { batch.Discard() }()

var entries []*database.BlockLedger
for {
select {
case index := <-b.read:
var l *protocol.BlockLedger
err := batch.Account(b.url.JoinPath(fmt.Sprint(index))).Main().GetAs(&l)
if err != nil {
slog.ErrorContext(b.context, "Error scanning block ledger", "error", err)
continue
}

entries = append(entries, &database.BlockLedger{
Index: l.Index,
Time: l.Time,
Entries: l.Entries,
})

case fn := <-b.write:
if len(entries) == 0 {
fn(func(batch *database.Batch) {})
continue
}

fn(func(batch *database.Batch) {
// Write entries
ledger := batch.Account(b.url).BlockLedger()
for _, e := range entries {
err := ledger.Replace(e.Index, e)
if err != nil {
slog.ErrorContext(b.context, "Error writing block ledger", "error", err)
}
}

// TODO: Delete accounts
})

// Recreate the read batch
clear(entries)
entries = entries[:0]
batch.Discard()
batch = b.db.Begin(false)

case <-b.context.Done():
return
}
}
}

func (b *BlockLedgerIndexer) Stop() {
b.cancel()
}
4 changes: 2 additions & 2 deletions internal/database/model.yml
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@
- name: BlockLedger
description: indexes entries of a block. Should only be used on the system ledger
type: other
dataType: indexing.Log[*BlockLedger]
constructor: newBlockEntryLog
dataType: BlockLedgerLog
omitConstructor: true
pointer: true

- name: Transaction
Expand Down
Loading

0 comments on commit a580285

Please sign in to comment.