Skip to content

Commit

Permalink
User-specified transaction expiration [#3462]
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Oct 19, 2023
1 parent 301747e commit be66895
Show file tree
Hide file tree
Showing 22 changed files with 629 additions and 170 deletions.
4 changes: 2 additions & 2 deletions cmd/accumulated/cmd_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ var cmdSetSchedule = &cobra.Command{
Short: "Set the major block schedule",
Args: cobra.ExactArgs(1),
Run: func(_ *cobra.Command, args []string) {
_, err := core.Cron.Parse(args[0])
cron, err := protocol.ParseCron(args[0])
checkf(err, "CRON expression is invalid")

setNetworkValue(protocol.Globals, func(v *core.GlobalValues) {
v.Globals.MajorBlockSchedule = args[0]
v.Globals.MajorBlockSchedule = cron
})
},
}
Expand Down
2 changes: 1 addition & 1 deletion internal/api/v3/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *NetworkServiceTestSuite) ServiceFor(partition string) api.NetworkServic
func (s *NetworkServiceTestSuite) SetupSuite() {
g := new(core.GlobalValues)
g.Globals = new(NetworkGlobals)
g.Globals.MajorBlockSchedule = "* * * * *"
g.Globals.MajorBlockSchedule = MustParseCron("* * * * *")
g.ExecutorVersion = ExecutorVersionV2

var err error
Expand Down
2 changes: 1 addition & 1 deletion internal/api/v3/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *QuerierTestSuite) SetupSuite() {

g := new(core.GlobalValues)
g.Globals = new(NetworkGlobals)
g.Globals.MajorBlockSchedule = "* * * * *"
g.Globals.MajorBlockSchedule = MustParseCron("* * * * *")
g.ExecutorVersion = ExecutorVersionV2

var err error
Expand Down
3 changes: 1 addition & 2 deletions internal/core/block/blockscheduler/major_block_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/robfig/cron/v3"
"gitlab.com/accumulatenetwork/accumulate/internal/core"
"gitlab.com/accumulatenetwork/accumulate/internal/core/events"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
)
Expand Down Expand Up @@ -43,7 +42,7 @@ func Init(eventBus *events.Bus) *majorBlockScheduler {
}

func (s *majorBlockScheduler) onWillChangeGlobals(event events.WillChangeGlobals) (err error) {
s.majorBlockSchedule, err = core.Cron.Parse(event.New.Globals.MajorBlockSchedule)
s.majorBlockSchedule = event.New.Globals.MajorBlockSchedule
s.nextMajorBlockTime = time.Time{}
return errors.UnknownError.Wrap(err)
}
Expand Down
90 changes: 69 additions & 21 deletions internal/core/execute/v2/block/block_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,30 +97,14 @@ func (block *Block) Close() (execute.BlockState, error) {
t := time.Now()

// Record pending transactions
ledgerUrl := m.Describe.NodeUrl(protocol.Ledger)
ledger := block.Batch.Account(ledgerUrl)
if len(block.State.Pending) > 0 {
// Get the major block height
major, err := getMajorHeight(m.Describe, block.Batch)
if err != nil {
return nil, errors.UnknownError.Wrap(err)
}

// Set the expiration height
if m.globals.Active.Globals.Limits.PendingMajorBlocks == 0 {
major += 14 // default to 2 weeks
} else {
major += m.globals.Active.Globals.Limits.PendingMajorBlocks
}

// Record the IDs
err = ledger.Events().Major().Pending(major).Add(block.State.GetPending()...)
if err != nil {
return nil, errors.UnknownError.WithFormat("store pending expirations: %w", err)
}
err = block.recordTransactionExpiration()
if err != nil {
return nil, errors.UnknownError.Wrap(err)
}

// Load the main chain of the minor root
ledgerUrl := m.Describe.NodeUrl(protocol.Ledger)
ledger := block.Batch.Account(ledgerUrl)
rootChain, err := ledger.RootChain().Get()
if err != nil {
return nil, errors.UnknownError.WithFormat("load root chain: %w", err)
Expand Down Expand Up @@ -252,6 +236,70 @@ func (block *Block) Close() (execute.BlockState, error) {
return &closedBlock{*block, valUp}, nil
}

func (block *Block) recordTransactionExpiration() error {
if len(block.State.Pending) == 0 {
return nil
}

// Get the currentMajor block height
currentMajor, err := getMajorHeight(block.Executor.Describe, block.Batch)
if err != nil {
return errors.UnknownError.Wrap(err)
}

// Set the expiration height
var max uint64
if block.Executor.globals.Active.Globals.Limits.PendingMajorBlocks == 0 {
max = 14 // default to 2 weeks
} else {
max = block.Executor.globals.Active.Globals.Limits.PendingMajorBlocks
}

// Determine which major block each transaction should expire on
pending := map[uint64][]*url.TxID{}
for _, txn := range block.State.GetPending() {
var count uint64
switch {
case !block.Executor.globals.Active.ExecutorVersion.V2BaikonurEnabled():
// Old logic
count = max

case txn.Header.Expire == nil || txn.Header.Expire.AtTime == nil:
// No expiration specified, use the default
count = max

default:
// Always at least the next major block
count = 1

// Increment until the expire time is after the expected major block time
now := block.Time
schedule := block.Executor.globals.Active.Globals.MajorBlockSchedule
for txn.Header.Expire.AtTime.After(schedule.Next(now)) {
now = schedule.Next(now)
count++
}
}

if count <= 0 || count > max {
count = max
}

major := currentMajor + count
pending[major] = append(pending[major], txn.ID())
}

// Record the IDs
ledger := block.Batch.Account(block.Executor.Describe.NodeUrl(protocol.Ledger))
for major, ids := range pending {
err = ledger.Events().Major().Pending(major).Add(ids...)
if err != nil {
return errors.UnknownError.WithFormat("store pending expirations: %w", err)
}
}
return nil
}

func getMajorHeight(desc execute.DescribeShim, batch *database.Batch) (uint64, error) {
c := batch.Account(desc.AnchorPool()).MajorBlockChain()
head, err := c.Head().Get()
Expand Down
26 changes: 21 additions & 5 deletions internal/core/execute/v2/block/msg_credit_payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,38 @@ func (x CreditPayment) Process(batch *database.Batch, ctx *MessageContext) (_ *p

func (CreditPayment) process(batch *database.Batch, ctx *MessageContext, pay *messaging.CreditPayment) error {
// Record the payment
txn := batch.Account(pay.TxID.Account()).Transaction(pay.TxID.Hash())
err := txn.RecordHistory(ctx.message)
acctTxn := batch.Account(pay.TxID.Account()).Transaction(pay.TxID.Hash())
err := acctTxn.RecordHistory(ctx.message)
if err != nil {
return errors.UnknownError.WithFormat("record history: %w", err)
}

err = txn.Payments().Add(pay.Hash())
err = acctTxn.Payments().Add(pay.Hash())
if err != nil {
return errors.UnknownError.WithFormat("record payment: %w", err)
}

if !pay.Initiator {
return nil
}

// If the transaction is being initiated, mark it as pending. This only
// persists if the transaction remains pending through the end of the block.
if pay.Initiator {
ctx.State.MarkTransactionPending(pay.TxID)
var txn *protocol.Transaction
if ctx.GetActiveGlobals().ExecutorVersion.V2BaikonurEnabled() {
// Load the transaction
txn, err = ctx.getTransaction(batch, pay.TxID.Hash())
if err != nil {
return errors.UnknownError.WithFormat("load transaction: %w", err)
}

} else {
// Fake transaction
txn = new(protocol.Transaction)
txn.Header.Principal = pay.TxID.Account()
txn.Body = &protocol.RemoteTransaction{Hash: pay.TxID.Hash()}
}

ctx.State.MarkTransactionPending(txn)
return nil
}
15 changes: 8 additions & 7 deletions internal/core/execute/v2/block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"gitlab.com/accumulatenetwork/accumulate/internal/core/execute/v2/chain"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging"
"gitlab.com/accumulatenetwork/accumulate/pkg/url"
"gitlab.com/accumulatenetwork/accumulate/protocol"
)

// BlockState tracks various metrics of a block of transactions as they are
Expand All @@ -29,7 +30,7 @@ type BlockState struct {

Anchor *BlockAnchorState

Pending map[[32]byte]*url.TxID
Pending map[[32]byte]*protocol.Transaction
Events int
}

Expand Down Expand Up @@ -61,11 +62,11 @@ func (s *BlockState) Empty() bool {
len(s.ChainUpdates.Entries) == 0
}

func (s *BlockState) MarkTransactionPending(id *url.TxID) {
func (s *BlockState) MarkTransactionPending(txn *protocol.Transaction) {
if s.Pending == nil {
s.Pending = map[[32]byte]*url.TxID{}
s.Pending = map[[32]byte]*protocol.Transaction{}
}
s.Pending[id.Hash()] = id
s.Pending[txn.Hash()] = txn
}

func (s *BlockState) MarkTransactionDelivered(id *url.TxID) {
Expand All @@ -75,13 +76,13 @@ func (s *BlockState) MarkTransactionDelivered(id *url.TxID) {
delete(s.Pending, id.Hash())
}

func (s *BlockState) GetPending() []*url.TxID {
l := make([]*url.TxID, 0, len(s.Pending))
func (s *BlockState) GetPending() []*protocol.Transaction {
l := make([]*protocol.Transaction, 0, len(s.Pending))
for _, p := range s.Pending {
l = append(l, p)
}
sort.Slice(l, func(i, j int) bool {
return l[i].Compare(l[j]) < 0
return l[i].ID().Compare(l[j].ID()) < 0
})
return l
}
Expand Down
7 changes: 7 additions & 0 deletions internal/core/execute/v2/block/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ func (t *TransactionContext) processTransaction(batch *database.Batch) (*protoco
}
}

// Check if the transaction has expired
if t.transaction.Header.Expire != nil &&
t.transaction.Header.Expire.AtTime != nil &&
t.Block.Time.After(*t.transaction.Header.Expire.AtTime) {
return t.recordFailedTransaction(batch, delivery, errors.Expired.With("transaction deadline exceeded"))
}

// Load the principal
principal, err := batch.Account(delivery.Transaction.Header.Principal).Main().Get()
switch {
Expand Down
13 changes: 11 additions & 2 deletions pkg/build/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package build

import (
"math/big"
"time"

"gitlab.com/accumulatenetwork/accumulate/protocol"
)
Expand Down Expand Up @@ -37,8 +38,16 @@ func (b TransactionBuilder) Metadata(metadata []byte) TransactionBuilder {
return b
}

func (b TransactionBuilder) HoldUntil(threshold protocol.BlockThreshold) TransactionBuilder {
b.t.Header.HoldUntil = &threshold
func (b TransactionBuilder) HoldUntil(opts protocol.HoldUntilOptions) TransactionBuilder {
b.t.Header.HoldUntil = &opts
return b
}

func (b TransactionBuilder) ExpireAtTime(v time.Time) TransactionBuilder {
if b.t.Header.Expire == nil {
b.t.Header.Expire = new(protocol.ExpireOptions)
}
b.t.Header.Expire.AtTime = &v
return b
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/types/network/globals.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func NewGlobals(g *GlobalValues) *GlobalValues {
if g.Globals.ValidatorAcceptThreshold.Numerator == 0 {
g.Globals.ValidatorAcceptThreshold.Set(2, 3)
}
if g.Globals.MajorBlockSchedule == "" {
g.Globals.MajorBlockSchedule = protocol.DefaultMajorBlockSchedule
if g.Globals.MajorBlockSchedule == nil {
g.Globals.MajorBlockSchedule = protocol.MustParseCron(protocol.DefaultMajorBlockSchedule)
}
if g.Globals.FeeSchedule == nil {
g.Globals.FeeSchedule = new(protocol.FeeSchedule)
Expand Down
80 changes: 80 additions & 0 deletions protocol/cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2023 The Accumulate Authors
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

package protocol

import (
"encoding/json"
"io"
"time"

"github.com/robfig/cron/v3"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/encoding"
)

type CronSchedule struct {
src string
inner cron.Schedule
}

func ParseCron(spec string) (*CronSchedule, error) {
schedule, err := cron.ParseStandard(spec)
if err != nil {
return nil, err
}
return &CronSchedule{spec, schedule}, nil
}

func MustParseCron(spec string) *CronSchedule {
s, err := ParseCron(spec)
if err != nil {
panic(err)
}
return s
}

func (s *CronSchedule) Next(t time.Time) time.Time {
return s.inner.Next(t)
}

func (s *CronSchedule) Copy() *CronSchedule {
return s // CronSchedule is immutable so there's no need to copy
}

func (s *CronSchedule) Equal(r *CronSchedule) bool {
return s.src == r.src
}

func (s *CronSchedule) MarshalJSON() ([]byte, error) {
return json.Marshal(s.src)
}

func (s *CronSchedule) UnmarshalJSON(data []byte) error {
err := json.Unmarshal(data, &s.src)
if err != nil {
return err
}

s.inner, err = cron.ParseStandard(s.src)
return err
}

func (s *CronSchedule) MarshalBinary() ([]byte, error) {
return encoding.MarshalString(s.src), nil
}

func (s *CronSchedule) UnmarshalBinaryFrom(rd io.Reader) error {
b, err := io.ReadAll(rd)
if err != nil {
return err
}
s.src, err = encoding.UnmarshalString(b)
if err != nil {
return err
}
s.inner, err = cron.ParseStandard(s.src)
return err
}
4 changes: 3 additions & 1 deletion protocol/general.yml
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,10 @@ NetworkGlobals:
type: Rational
marshal-as: reference
- name: MajorBlockSchedule
type: string
description: a cron expression defining the (approximate) major blocks interval
type: CronSchedule
marshal-as: reference
pointer: true
- name: AnchorEmptyBlocks
description: controls whether an anchor is sent for a block if the block contains no transactions other than a directory anchor
type: bool
Expand Down
Loading

0 comments on commit be66895

Please sign in to comment.