Skip to content

Commit

Permalink
Expose the sequence service [#3382]
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Oct 17, 2023
1 parent 7bfe888 commit bb98e25
Show file tree
Hide file tree
Showing 36 changed files with 537 additions and 146 deletions.
2 changes: 1 addition & 1 deletion .mockery.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ case: underscore
recursive: true
keeptree: true
with-expecter: true
name: ^(NodeService|ConsensusService|NetworkService|MetricsService|Querier|Submitter|Validator|EventService)$
name: ^(NodeService|ConsensusService|NetworkService|MetricsService|Querier|Submitter|Validator|EventService|Sequencer)$
2 changes: 1 addition & 1 deletion cmd/accumulated-http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func run(_ *cobra.Command, args []string) {

} else {
var found bool
for _, peer := range tr.DB().Peers() {
for _, peer := range tr.DB().Peers.Load() {
if peer.Network(args[0]).Service(dirNetSvc).Last.Success != nil {
found = true
}
Expand Down
4 changes: 3 additions & 1 deletion internal/api/private/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
"gitlab.com/accumulatenetwork/accumulate/pkg/url"
)

//go:generate go run gitlab.com/accumulatenetwork/accumulate/tools/cmd/gen-types --package private types.yml

const ServiceTypeSequencer api.ServiceType = 0xF001

type Sequencer interface {
Sequence(ctx context.Context, src, dst *url.URL, num uint64) (*api.MessageRecord[messaging.Message], error)
Sequence(ctx context.Context, src, dst *url.URL, num uint64, opts SequenceOptions) (*api.MessageRecord[messaging.Message], error)
}
6 changes: 6 additions & 0 deletions internal/api/private/types.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
SequenceOptions:
fields:
- name: NodeID
type: p2p.PeerID
marshal-as: union
zero-value: '""'
147 changes: 147 additions & 0 deletions internal/api/private/types_gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2022 The Accumulate Authors
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

package private

// GENERATED BY go run ./tools/cmd/gen-types. DO NOT EDIT.

//lint:file-ignore S1001,S1002,S1008,SA4013 generated code

import (
"bytes"
"encoding/json"
"errors"
"io"
"strings"

"gitlab.com/accumulatenetwork/accumulate/pkg/types/encoding"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/p2p"
)

type SequenceOptions struct {
fieldsSet []bool
NodeID p2p.PeerID `json:"nodeID,omitempty" form:"nodeID" query:"nodeID" validate:"required"`
extraData []byte
}

func (v *SequenceOptions) Copy() *SequenceOptions {
u := new(SequenceOptions)

if v.NodeID != "" {
u.NodeID = p2p.CopyPeerID(v.NodeID)
}
if len(v.extraData) > 0 {
u.extraData = make([]byte, len(v.extraData))
copy(u.extraData, v.extraData)
}

return u
}

func (v *SequenceOptions) CopyAsInterface() interface{} { return v.Copy() }

func (v *SequenceOptions) Equal(u *SequenceOptions) bool {
if !(p2p.EqualPeerID(v.NodeID, u.NodeID)) {
return false
}

return true
}

var fieldNames_SequenceOptions = []string{
1: "NodeID",
}

func (v *SequenceOptions) MarshalBinary() ([]byte, error) {
if v == nil {
return []byte{encoding.EmptyObject}, nil
}

buffer := new(bytes.Buffer)
writer := encoding.NewWriter(buffer)

if !(v.NodeID == ("")) {
writer.WriteValue(1, v.NodeID.MarshalBinary)
}

_, _, err := writer.Reset(fieldNames_SequenceOptions)
if err != nil {
return nil, encoding.Error{E: err}
}
buffer.Write(v.extraData)
return buffer.Bytes(), nil
}

func (v *SequenceOptions) IsValid() error {
var errs []string

if len(v.fieldsSet) > 0 && !v.fieldsSet[0] {
errs = append(errs, "field NodeID is missing")
} else if v.NodeID == ("") {
errs = append(errs, "field NodeID is not set")
}

switch len(errs) {
case 0:
return nil
case 1:
return errors.New(errs[0])
default:
return errors.New(strings.Join(errs, "; "))
}
}

func (v *SequenceOptions) UnmarshalBinary(data []byte) error {
return v.UnmarshalBinaryFrom(bytes.NewReader(data))
}

func (v *SequenceOptions) UnmarshalBinaryFrom(rd io.Reader) error {
reader := encoding.NewReader(rd)

reader.ReadValue(1, func(r io.Reader) error {
x, err := p2p.UnmarshalPeerIDFrom(r)
if err == nil {
v.NodeID = x
}
return err
})

seen, err := reader.Reset(fieldNames_SequenceOptions)
if err != nil {
return encoding.Error{E: err}
}
v.fieldsSet = seen
v.extraData, err = reader.ReadAll()
if err != nil {
return encoding.Error{E: err}
}
return nil
}

func (v *SequenceOptions) MarshalJSON() ([]byte, error) {
u := struct {
NodeID *encoding.JsonUnmarshalWith[p2p.PeerID] `json:"nodeID,omitempty"`
}{}
if !(v.NodeID == ("")) {
u.NodeID = &encoding.JsonUnmarshalWith[p2p.PeerID]{Value: v.NodeID, Func: p2p.UnmarshalPeerIDJSON}
}
return json.Marshal(&u)
}

func (v *SequenceOptions) UnmarshalJSON(data []byte) error {
u := struct {
NodeID *encoding.JsonUnmarshalWith[p2p.PeerID] `json:"nodeID,omitempty"`
}{}
u.NodeID = &encoding.JsonUnmarshalWith[p2p.PeerID]{Value: v.NodeID, Func: p2p.UnmarshalPeerIDJSON}
if err := json.Unmarshal(data, &u); err != nil {
return err
}
if u.NodeID != nil {
v.NodeID = u.NodeID.Value
}

return nil
}
13 changes: 13 additions & 0 deletions internal/api/routing/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,19 @@ func (r MessageRouter) Route(msg message.Message) (multiaddr.Multiaddr, error) {
return nil, errors.BadRequest.WithFormat("%v is not a partition URL", msg.Source)
}

if msg.NodeID == "" {
return service.Multiaddr(), nil
}

// Send the request to /p2p/{id}/acc-svc/{service}:{partition}
c1, err := multiaddr.NewComponent("p2p", msg.NodeID.String())
if err != nil {
return nil, errors.BadRequest.WithFormat("build multiaddr: %w", err)
}
c2 := service.Multiaddr()

return c1.Encapsulate(c2), nil

default:
return nil, errors.BadRequest.WithFormat("%v is not routable", msg.Type())
}
Expand Down
3 changes: 2 additions & 1 deletion internal/api/v2/query_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strings"
"time"

"gitlab.com/accumulatenetwork/accumulate/internal/api/private"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/merkle"
Expand Down Expand Up @@ -1212,7 +1213,7 @@ func (m *JrpcMethods) QuerySynth(ctx context.Context, params json.RawMessage) in
src = req.Source.JoinPath(protocol.Synthetic)
}

r, err := recordIs[*api.MessageRecord[messaging.Message]](m.Sequencer.Sequence(ctx, src, req.Destination, req.SequenceNumber))
r, err := recordIs[*api.MessageRecord[messaging.Message]](m.Sequencer.Sequence(ctx, src, req.Destination, req.SequenceNumber, private.SequenceOptions{}))
if err != nil {
return accumulateError(err)
}
Expand Down
11 changes: 10 additions & 1 deletion internal/api/v3/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,16 @@ func NewSequencer(params SequencerParams) *Sequencer {

func (s *Sequencer) Type() api.ServiceType { return private.ServiceTypeSequencer }

func (s *Sequencer) Sequence(ctx context.Context, src, dst *url.URL, num uint64) (*api.MessageRecord[messaging.Message], error) {
func (s *Sequencer) Sequence(ctx context.Context, src, dst *url.URL, num uint64, _ private.SequenceOptions) (*api.MessageRecord[messaging.Message], error) {
if src == nil {
return nil, errors.BadRequest.With("missing source")
}
if dst == nil {
return nil, errors.BadRequest.With("missing destination")
}
if num == 0 {
return nil, errors.BadRequest.With("missing sequence number")
}
if !s.partition.URL.ParentOf(src) {
return nil, errors.BadRequest.WithFormat("requested source is %s but this partition is %s", src.RootIdentity(), s.partitionID)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/api/v3/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"gitlab.com/accumulatenetwork/accumulate/internal/api/private"
dut "gitlab.com/accumulatenetwork/accumulate/internal/api/v3"
"gitlab.com/accumulatenetwork/accumulate/internal/core"
"gitlab.com/accumulatenetwork/accumulate/internal/core/events"
Expand Down Expand Up @@ -74,7 +75,7 @@ func TestSequencer(t *testing.T) {
ValidatorKey: net.Bvns[0].Nodes[0].PrivValKey,
})

anchor, err := svc.Sequence(context.Background(), PartitionUrl("BVN0").JoinPath(AnchorPool), DnUrl().JoinPath(AnchorPool), 1)
anchor, err := svc.Sequence(context.Background(), PartitionUrl("BVN0").JoinPath(AnchorPool), DnUrl().JoinPath(AnchorPool), 1, private.SequenceOptions{})
require.NoError(t, err)
require.IsType(t, (*messaging.TransactionMessage)(nil), anchor.Message)
require.IsType(t, (*BlockValidatorAnchor)(nil), anchor.Message.(*messaging.TransactionMessage).Transaction.Body)
Expand All @@ -84,7 +85,7 @@ func TestSequencer(t *testing.T) {
require.IsType(t, (*PartitionSignature)(nil), sigs[0].Message.(*messaging.SignatureMessage).Signature)
require.IsType(t, (*ED25519Signature)(nil), sigs[1].Message.(*messaging.SignatureMessage).Signature)

synth, err := svc.Sequence(context.Background(), PartitionUrl("BVN0").JoinPath(Synthetic), PartitionUrl("BVN1").JoinPath(Synthetic), 1)
synth, err := svc.Sequence(context.Background(), PartitionUrl("BVN0").JoinPath(Synthetic), PartitionUrl("BVN1").JoinPath(Synthetic), 1, private.SequenceOptions{})
require.NoError(t, err)
require.IsType(t, (*messaging.TransactionMessage)(nil), anchor.Message)
require.IsType(t, (*SyntheticDepositTokens)(nil), synth.Message.(*messaging.TransactionMessage).Transaction.Body)
Expand Down
3 changes: 2 additions & 1 deletion internal/core/execute/v1/block/block_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"
"time"

"gitlab.com/accumulatenetwork/accumulate/internal/api/private"
"gitlab.com/accumulatenetwork/accumulate/internal/core/events"
"gitlab.com/accumulatenetwork/accumulate/internal/core/execute"
"gitlab.com/accumulatenetwork/accumulate/internal/database"
Expand Down Expand Up @@ -451,7 +452,7 @@ func (x *Executor) requestMissingTransactionsFromPartition(ctx context.Context,
x.logger.Info(message, "seq-num", seqNum, "source", partition.Url)

// Request the transaction by sequence number
resp, err := x.Sequencer.Sequence(ctx, src, dest, seqNum)
resp, err := x.Sequencer.Sequence(ctx, src, dest, seqNum, private.SequenceOptions{})
if err != nil {
x.logger.Error("Failed to request sequenced transaction", "error", err, "from", src, "seq-num", seqNum)
continue
Expand Down
4 changes: 2 additions & 2 deletions internal/core/execute/v1/simulator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ func (s *simService) Validate(ctx context.Context, envelope *messaging.Envelope,

// Sequence routes the source to a partition and calls Sequence on the first
// node of that partition, returning the result.
func (s *simService) Sequence(ctx context.Context, src, dst *url.URL, num uint64) (*api.MessageRecord[messaging.Message], error) {
func (s *simService) Sequence(ctx context.Context, src, dst *url.URL, num uint64, opts private.SequenceOptions) (*api.MessageRecord[messaging.Message], error) {
part, err := s.router.RouteAccount(src)
if err != nil {
return nil, errors.UnknownError.Wrap(err)
}
return s.Executors[part].service.Private().Sequence(ctx, src, dst, num)
return s.Executors[part].service.Private().Sequence(ctx, src, dst, num, opts)
}

// partService implements API v3 for a partition.
Expand Down
3 changes: 2 additions & 1 deletion internal/core/execute/v2/block/block_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"
"time"

"gitlab.com/accumulatenetwork/accumulate/internal/api/private"
"gitlab.com/accumulatenetwork/accumulate/internal/core/events"
"gitlab.com/accumulatenetwork/accumulate/internal/core/execute"
"gitlab.com/accumulatenetwork/accumulate/internal/database"
Expand Down Expand Up @@ -349,7 +350,7 @@ func (x *Executor) requestMissingTransactionsFromPartition(ctx context.Context,
x.logger.Info(message, "seq-num", seqNum, "source", partition.Url)

// Request the transaction by sequence number
resp, err := x.Sequencer.Sequence(ctx, src, dest, seqNum)
resp, err := x.Sequencer.Sequence(ctx, src, dest, seqNum, private.SequenceOptions{})
if err != nil {
x.logger.Error("Failed to request sequenced transaction", "error", err, "from", src, "seq-num", seqNum)
continue
Expand Down
1 change: 1 addition & 0 deletions internal/node/http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func NewHandler(opts Options) (*Handler, error) {
jsonrpc.Submitter{Submitter: client},
jsonrpc.Validator{Validator: client},
jsonrpc.Faucet{Faucet: client},
jsonrpc.Sequencer{Sequencer: client.Private()},
)
if err != nil {
return nil, errors.UnknownError.WithFormat("initialize API v3: %w", err)
Expand Down
24 changes: 19 additions & 5 deletions pkg/api/v3/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/AccumulateNetwork/jsonrpc2/v15"
"gitlab.com/accumulatenetwork/accumulate/internal/api/private"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
Expand Down Expand Up @@ -42,23 +43,23 @@ func NewClient(server string) *Client {
return c
}

func (c *Client) NodeInfo(ctx context.Context, opts NodeInfoOptions) (*api.NodeInfo, error) {
func (c *Client) NodeInfo(ctx context.Context, opts api.NodeInfoOptions) (*api.NodeInfo, error) {
return sendRequestUnmarshalAs[*api.NodeInfo](c, ctx, "node-info", &message.NodeInfoRequest{NodeInfoOptions: opts})
}

func (c *Client) FindService(ctx context.Context, opts FindServiceOptions) ([]*api.FindServiceResult, error) {
func (c *Client) FindService(ctx context.Context, opts api.FindServiceOptions) ([]*api.FindServiceResult, error) {
return sendRequestUnmarshalAs[[]*api.FindServiceResult](c, ctx, "find-service", &message.FindServiceRequest{FindServiceOptions: opts})
}

func (c *Client) ConsensusStatus(ctx context.Context, opts ConsensusStatusOptions) (*api.ConsensusStatus, error) {
func (c *Client) ConsensusStatus(ctx context.Context, opts api.ConsensusStatusOptions) (*api.ConsensusStatus, error) {
return sendRequestUnmarshalAs[*api.ConsensusStatus](c, ctx, "consensus-status", &message.ConsensusStatusRequest{ConsensusStatusOptions: opts})
}

func (c *Client) NetworkStatus(ctx context.Context, opts NetworkStatusOptions) (*api.NetworkStatus, error) {
func (c *Client) NetworkStatus(ctx context.Context, opts api.NetworkStatusOptions) (*api.NetworkStatus, error) {
return sendRequestUnmarshalAs[*api.NetworkStatus](c, ctx, "network-status", &message.NetworkStatusRequest{NetworkStatusOptions: opts})
}

func (c *Client) Metrics(ctx context.Context, opts MetricsOptions) (*api.Metrics, error) {
func (c *Client) Metrics(ctx context.Context, opts api.MetricsOptions) (*api.Metrics, error) {
return sendRequestUnmarshalAs[*api.Metrics](c, ctx, "metrics", &message.MetricsRequest{MetricsOptions: opts})
}

Expand All @@ -82,6 +83,19 @@ func (c *Client) Faucet(ctx context.Context, account *url.URL, opts api.FaucetOp
return sendRequestUnmarshalAs[*api.Submission](c, ctx, "faucet", req)
}

type PrivateClient Client

var _ private.Sequencer = (*PrivateClient)(nil)

func (c *Client) Private() *PrivateClient {
return (*PrivateClient)(c)
}

func (c *PrivateClient) Sequence(ctx context.Context, src, dst *url.URL, num uint64, opts private.SequenceOptions) (*api.MessageRecord[messaging.Message], error) {
req := &message.PrivateSequenceRequest{Source: src, Destination: dst, SequenceNumber: num, SequenceOptions: opts}
return sendRequestUnmarshalAs[*api.MessageRecord[messaging.Message]]((*Client)(c), ctx, "private-sequence", req)
}

func (c *Client) sendRequest(ctx context.Context, method string, req, resp interface{}) error {
jc := jsonrpc2.Client{Client: c.Client}
err := jc.Request(ctx, c.Server, method, req, &resp)
Expand Down
Loading

0 comments on commit bb98e25

Please sign in to comment.