diff --git a/api/node.go b/api/node.go index 4a45d4ee..639cee45 100644 --- a/api/node.go +++ b/api/node.go @@ -9,6 +9,6 @@ import ( type Node interface { ExecuteFunction(ctx context.Context, req execute.Request, subgroup string) (code codes.Code, requestID string, results execute.ResultMap, peers execute.Cluster, err error) - ExecutionResult(id string) (execute.Result, bool) + ExecutionResult(id string) (execute.ResultMap, bool) PublishFunctionInstall(ctx context.Context, uri string, cid string, subgroup string) error } diff --git a/api/result_test.go b/api/result_test.go index 827fadcb..c321bc37 100644 --- a/api/result_test.go +++ b/api/result_test.go @@ -29,17 +29,17 @@ func TestAPI_ExecutionResult(t *testing.T) { err = srv.ExecutionResult(ctx) require.NoError(t, err) - var res execute.Result + var res execute.ResultMap require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &res)) require.Equal(t, http.StatusOK, rec.Result().StatusCode) - require.Equal(t, mocks.GenericExecutionResult, res) + require.Equal(t, mocks.GenericExecutionResultMap, res) }) t.Run("response not found", func(t *testing.T) { node := mocks.BaselineNode(t) - node.ExecutionResultFunc = func(id string) (execute.Result, bool) { - return execute.Result{}, false + node.ExecutionResultFunc = func(id string) (execute.ResultMap, bool) { + return execute.ResultMap{}, false } srv := api.New(mocks.NoopLogger, node) diff --git a/consensus/pbft/config.go b/consensus/pbft/config.go index 382097bf..5ba9d01b 100644 --- a/consensus/pbft/config.go +++ b/consensus/pbft/config.go @@ -14,7 +14,7 @@ import ( type Option func(*Config) // PostProcessFunc is invoked by the replica after execution is done. -type PostProcessFunc func(requestID string, origin peer.ID, request execute.Request, result execute.Result) +type PostProcessFunc func(requestID string, origin peer.ID, request execute.Request, result execute.NodeResult) var DefaultConfig = Config{ NetworkTimeout: NetworkTimeout, diff --git a/consensus/pbft/execute.go b/consensus/pbft/execute.go index 6d7039fe..12d7648a 100644 --- a/consensus/pbft/execute.go +++ b/consensus/pbft/execute.go @@ -95,34 +95,34 @@ func (r *Replica) execute(ctx context.Context, view uint, sequence uint, digest log.Warn().Err(err).Msg("could not get metadata") } - msg := response.Execute{ - BaseMessage: blockless.BaseMessage{TraceInfo: r.cfg.TraceInfo}, - Code: res.Code, - RequestID: request.ID, - Results: execute.ResultMap{ - r.id: execute.NodeResult{ - Result: res, - Metadata: metadata, - }, - }, - PBFT: response.PBFTResultInfo{ + nres := execute.NodeResult{ + Result: res, + Metadata: metadata, + PBFT: execute.PBFTResultInfo{ View: r.view, RequestTimestamp: request.Timestamp, Replica: r.id, }, } + err = nres.Sign(r.host.PrivateKey()) + if err != nil { + return fmt.Errorf("could not sign execution result: %w", err) + } + + msg := response.Execute{ + BaseMessage: blockless.BaseMessage{TraceInfo: r.cfg.TraceInfo}, + Code: res.Code, + RequestID: request.ID, + Results: execute.ResultMap{r.id: nres}, + } + // Save this executions in case it's requested again. r.executions[request.ID] = msg // Invoke specified post processor functions. for _, proc := range r.cfg.PostProcessors { - proc(request.ID, request.Origin, request.Execute, res) - } - - err = msg.Sign(r.host.PrivateKey()) - if err != nil { - return fmt.Errorf("could not sign execution request: %w", err) + proc(request.ID, request.Origin, request.Execute, nres) } err = r.send(ctx, request.Origin, &msg, blockless.ProtocolID) diff --git a/consensus/raft/fsm.go b/consensus/raft/fsm.go index b29f6e98..1e7d8f1c 100644 --- a/consensus/raft/fsm.go +++ b/consensus/raft/fsm.go @@ -22,7 +22,7 @@ type FSMLogEntry struct { Execute execute.Request `json:"execute,omitempty"` } -type FSMProcessFunc func(req FSMLogEntry, res execute.Result) +type FSMProcessFunc func(req FSMLogEntry, res execute.NodeResult) type fsmExecutor struct { log zerolog.Logger @@ -36,7 +36,7 @@ func newFsmExecutor(log zerolog.Logger, executor blockless.Executor, processors ps = append(ps, processors...) start := time.Now() - ps = append(ps, func(req FSMLogEntry, res execute.Result) { + ps = append(ps, func(req FSMLogEntry, _ execute.NodeResult) { // Global metrics handle. metrics.MeasureSinceWithLabels(raftExecutionTimeMetric, start, []metrics.Label{{Name: "function", Value: req.Execute.FunctionID}}) }) @@ -50,7 +50,7 @@ func newFsmExecutor(log zerolog.Logger, executor blockless.Executor, processors return &fsm } -func (f fsmExecutor) Apply(log *raft.Log) interface{} { +func (f fsmExecutor) Apply(log *raft.Log) any { f.log.Info().Msg("applying log entry") @@ -70,9 +70,13 @@ func (f fsmExecutor) Apply(log *raft.Log) interface{} { return fmt.Errorf("could not execute function: %w", err) } + nres := execute.NodeResult{ + Result: res, + } + // Execute processors. for _, proc := range f.processors { - proc(logEntry, res) + proc(logEntry, nres) } f.log.Info().Str("request", logEntry.RequestID).Msg("FSM successfully executed function") diff --git a/host/discovery.go b/host/discovery.go index 2273d963..e9da3461 100644 --- a/host/discovery.go +++ b/host/discovery.go @@ -42,6 +42,7 @@ func (h *Host) ConnectToKnownPeers(ctx context.Context) error { case <-ctx.Done(): ticker.Stop() h.log.Debug().Msg("stopping boot node reachability monitoring") + return } } }(ctx) diff --git a/models/execute/response.go b/models/execute/result.go similarity index 53% rename from models/execute/response.go rename to models/execute/result.go index 8005b09a..67062fe4 100644 --- a/models/execute/response.go +++ b/models/execute/result.go @@ -1,9 +1,13 @@ package execute import ( + "encoding/hex" "encoding/json" + "errors" + "fmt" "time" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/blocklessnetwork/b7s/models/codes" @@ -12,7 +16,10 @@ import ( // NodeResult is an annotated execution result. type NodeResult struct { Result - Metadata any `json:"metadata,omitempty"` + // Signed digest of the response. + Signature string `json:"signature,omitempty"` + PBFT PBFTResultInfo `json:"pbft,omitempty"` + Metadata any `json:"metadata,omitempty"` } // Result describes an execution result. @@ -44,6 +51,12 @@ type Usage struct { MemoryMaxKB int64 `json:"memory_max_kb,omitempty"` } +type PBFTResultInfo struct { + View uint `json:"view"` + RequestTimestamp time.Time `json:"request_timestamp,omitempty"` + Replica peer.ID `json:"replica,omitempty"` +} + // ResultMap contains execution results from multiple peers. type ResultMap map[peer.ID]NodeResult @@ -61,3 +74,51 @@ func (m ResultMap) MarshalJSON() ([]byte, error) { return json.Marshal(em) } + +func (r *NodeResult) Sign(key crypto.PrivKey) error { + + cp := *r + // Exclude some of the fields from the signature. + cp.Signature = "" + + payload, err := json.Marshal(cp) + if err != nil { + return fmt.Errorf("could not get byte representation of the record: %w", err) + } + + sig, err := key.Sign(payload) + if err != nil { + return fmt.Errorf("could not sign digest: %w", err) + } + + r.Signature = hex.EncodeToString(sig) + return nil +} + +func (r NodeResult) VerifySignature(key crypto.PubKey) error { + + cp := r + // Exclude some of the fields from the signature. + cp.Signature = "" + + payload, err := json.Marshal(cp) + if err != nil { + return fmt.Errorf("could not get byte representation of the record: %w", err) + } + + sig, err := hex.DecodeString(r.Signature) + if err != nil { + return fmt.Errorf("could not decode signature from hex: %w", err) + } + + ok, err := key.Verify(payload, sig) + if err != nil { + return fmt.Errorf("could not verify signature: %w", err) + } + + if !ok { + return errors.New("invalid signature") + } + + return nil +} diff --git a/models/execute/request_signature_test.go b/models/execute/signature_test.go similarity index 53% rename from models/execute/request_signature_test.go rename to models/execute/signature_test.go index 35ea180c..bf747456 100644 --- a/models/execute/request_signature_test.go +++ b/models/execute/signature_test.go @@ -3,11 +3,12 @@ package execute import ( "testing" + "github.com/blocklessnetwork/b7s/models/codes" "github.com/libp2p/go-libp2p/core/crypto" "github.com/stretchr/testify/require" ) -func TestExecute_Signing(t *testing.T) { +func TestRequestExecute_Signing(t *testing.T) { sampleReq := Request{ FunctionID: "function-di", @@ -57,6 +58,55 @@ func TestExecute_Signing(t *testing.T) { }) } +func TestResultExecute_Signing(t *testing.T) { + + sampleRes := NodeResult{ + Result: Result{ + Code: codes.Unknown, + Result: RuntimeOutput{ + Stdout: "generic-execution-result", + Stderr: "generic-execution-log", + ExitCode: 0, + }, + }, + } + + t.Run("nominal case", func(t *testing.T) { + + res := sampleRes + priv, pub := newKey(t) + + err := res.Sign(priv) + require.NoError(t, err) + + err = res.VerifySignature(pub) + require.NoError(t, err) + }) + t.Run("empty signature verification fails", func(t *testing.T) { + + res := sampleRes + res.Signature = "" + + _, pub := newKey(t) + + err := res.VerifySignature(pub) + require.Error(t, err) + }) + t.Run("tampered data signature verification fails", func(t *testing.T) { + + res := sampleRes + priv, pub := newKey(t) + + err := res.Sign(priv) + require.NoError(t, err) + + res.Result.Result.Stdout += " " + + err = res.VerifySignature(pub) + require.Error(t, err) + }) +} + func newKey(t *testing.T) (crypto.PrivKey, crypto.PubKey) { t.Helper() priv, pub, err := crypto.GenerateKeyPair(crypto.Ed25519, 0) diff --git a/models/response/execute.go b/models/response/execute.go index 52eb51ed..0a0db873 100644 --- a/models/response/execute.go +++ b/models/response/execute.go @@ -1,14 +1,7 @@ package response import ( - "encoding/hex" "encoding/json" - "errors" - "fmt" - "time" - - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/peer" "github.com/blocklessnetwork/b7s/models/blockless" "github.com/blocklessnetwork/b7s/models/codes" @@ -25,10 +18,6 @@ type Execute struct { Results execute.ResultMap `json:"results,omitempty"` Cluster execute.Cluster `json:"cluster,omitempty"` - PBFT PBFTResultInfo `json:"pbft,omitempty"` - // Signed digest of the response. - Signature string `json:"signature,omitempty"` - // Used to communicate the reason for failure to the user. ErrorMessage string `json:"message,omitempty"` } @@ -61,59 +50,3 @@ func (e Execute) MarshalJSON() ([]byte, error) { } return json.Marshal(rec) } - -type PBFTResultInfo struct { - View uint `json:"view"` - RequestTimestamp time.Time `json:"request_timestamp,omitempty"` - Replica peer.ID `json:"replica,omitempty"` -} - -func (e *Execute) Sign(key crypto.PrivKey) error { - - cp := *e - // Exclude some of the fields from the signature. - cp.Signature = "" - cp.BaseMessage = blockless.BaseMessage{} - - payload, err := json.Marshal(cp) - if err != nil { - return fmt.Errorf("could not get byte representation of the record: %w", err) - } - - sig, err := key.Sign(payload) - if err != nil { - return fmt.Errorf("could not sign digest: %w", err) - } - - e.Signature = hex.EncodeToString(sig) - return nil -} - -func (e Execute) VerifySignature(key crypto.PubKey) error { - - cp := e - // Exclude some of the fields from the signature. - cp.Signature = "" - cp.BaseMessage = blockless.BaseMessage{} - - payload, err := json.Marshal(cp) - if err != nil { - return fmt.Errorf("could not get byte representation of the record: %w", err) - } - - sig, err := hex.DecodeString(e.Signature) - if err != nil { - return fmt.Errorf("could not decode signature from hex: %w", err) - } - - ok, err := key.Verify(payload, sig) - if err != nil { - return fmt.Errorf("could not verify signature: %w", err) - } - - if !ok { - return errors.New("invalid signature") - } - - return nil -} diff --git a/models/response/execute_test.go b/models/response/execute_test.go deleted file mode 100644 index 18dad754..00000000 --- a/models/response/execute_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package response - -import ( - "testing" - - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/stretchr/testify/require" - - "github.com/blocklessnetwork/b7s/models/codes" - "github.com/blocklessnetwork/b7s/models/execute" - "github.com/blocklessnetwork/b7s/testing/mocks" -) - -func TestExecute_Signing(t *testing.T) { - - sampleRes := Execute{ - RequestID: mocks.GenericUUID.String(), - Code: codes.OK, - Results: execute.ResultMap{ - mocks.GenericPeerID: execute.NodeResult{Result: mocks.GenericExecutionResult}, - }, - Cluster: execute.Cluster{ - Peers: mocks.GenericPeerIDs[:4], - }, - } - - t.Run("nominal case", func(t *testing.T) { - - res := sampleRes - priv, pub := newKey(t) - - err := res.Sign(priv) - require.NoError(t, err) - - err = res.VerifySignature(pub) - require.NoError(t, err) - }) - t.Run("empty signature verification fails", func(t *testing.T) { - - res := sampleRes - res.Signature = "" - - _, pub := newKey(t) - - err := res.VerifySignature(pub) - require.Error(t, err) - }) - t.Run("tampered data signature verification fails", func(t *testing.T) { - - res := sampleRes - priv, pub := newKey(t) - - err := res.Sign(priv) - require.NoError(t, err) - - res.RequestID += " " - - err = res.VerifySignature(pub) - require.Error(t, err) - }) -} - -func newKey(t *testing.T) (crypto.PrivKey, crypto.PubKey) { - t.Helper() - priv, pub, err := crypto.GenerateKeyPair(crypto.Ed25519, 0) - require.NoError(t, err) - - return priv, pub -} diff --git a/node/cluster.go b/node/cluster.go index 906616c1..e0f58f0b 100644 --- a/node/cluster.go +++ b/node/cluster.go @@ -135,14 +135,13 @@ func (n *Node) formCluster(ctx context.Context, requestID string, replicas []pee go func() { defer rw.Done() key := consensusResponseKey(requestID, rp) - res, ok := n.consensusResponses.WaitFor(clusterCtx, key) + fc, ok := n.consensusResponses.WaitFor(clusterCtx, key) if !ok { return } n.log.Info().Str("request", requestID).Str("peer", rp.String()).Msg("accounted consensus cluster response from roll called peer") - fc := res.(response.FormCluster) if fc.Code != codes.OK { log.Warn().Str("peer", rp.String()).Msg("peer failed to join consensus cluster") return diff --git a/node/cluster_pbft_integration_test.go b/node/cluster_pbft_integration_test.go index 6e0cc0ab..846c6623 100644 --- a/node/cluster_pbft_integration_test.go +++ b/node/cluster_pbft_integration_test.go @@ -193,7 +193,7 @@ This is the end of my program // Wait for the installation request to be processed. installWG.Wait() - t.Log("worker node installed function") + t.Log("worker nodes installed function") // Phase 5: Request execution from the head node. diff --git a/node/consensus.go b/node/consensus.go index 61a4c312..fdb7b000 100644 --- a/node/consensus.go +++ b/node/consensus.go @@ -28,31 +28,23 @@ type consensusExecutor interface { func (n *Node) createRaftCluster(ctx context.Context, from peer.ID, fc request.FormCluster) error { - // Add a callback function to cache the execution result - cacheFn := func(req raft.FSMLogEntry, res execute.Result) { - n.executeResponses.Set(req.RequestID, res) - } - // Add a callback function to send the execution result to origin. - sendFn := func(req raft.FSMLogEntry, res execute.Result) { + sendFn := func(req raft.FSMLogEntry, res execute.NodeResult) { ctx, cancel := context.WithTimeout(context.Background(), consensusClusterSendTimeout) defer cancel() - metadata, err := n.cfg.MetadataProvider.Metadata(req.Execute, res.Result) + metadata, err := n.cfg.MetadataProvider.Metadata(req.Execute, res.Result.Result) if err != nil { n.log.Warn().Err(err).Msg("could not get metadata") } + res.Metadata = metadata + msg := response.Execute{ Code: res.Code, RequestID: req.RequestID, - Results: execute.ResultMap{ - n.host.ID(): execute.NodeResult{ - Result: res, - Metadata: metadata, - }, - }, + Results: singleNodeResultMap(n.host.ID(), res), } err = n.send(ctx, req.Origin, &msg) @@ -61,6 +53,11 @@ func (n *Node) createRaftCluster(ctx context.Context, from peer.ID, fc request.F } } + // Add a callback function to cache the execution result + cacheFn := func(req raft.FSMLogEntry, res execute.NodeResult) { + n.executeResponses.Set(req.RequestID, singleNodeResultMap(n.host.ID(), res)) + } + rh, err := raft.New( n.log, n.host, @@ -88,8 +85,8 @@ func (n *Node) createRaftCluster(ctx context.Context, from peer.ID, fc request.F func (n *Node) createPBFTCluster(ctx context.Context, from peer.ID, fc request.FormCluster) error { - cacheFn := func(requestID string, origin peer.ID, request execute.Request, result execute.Result) { - n.executeResponses.Set(requestID, result) + cacheFn := func(requestID string, origin peer.ID, request execute.Request, result execute.NodeResult) { + n.executeResponses.Set(requestID, singleNodeResultMap(n.host.ID(), result)) } // If we have tracing enabled we will have trace info in the context. diff --git a/node/execute.go b/node/execute.go index d420f803..a00adfda 100644 --- a/node/execute.go +++ b/node/execute.go @@ -27,7 +27,7 @@ func (n *Node) processExecuteResponse(ctx context.Context, from peer.ID, res res n.log.Debug().Str("request", res.RequestID).Str("from", from.String()).Msg("received execution response") key := executionResultKey(res.RequestID, from) - n.executeResponses.Set(key, res) + n.executeResponses.Set(key, res.Results) return nil } diff --git a/node/execution_results.go b/node/execution_results.go index 7b232c39..043d10b6 100644 --- a/node/execution_results.go +++ b/node/execution_results.go @@ -9,7 +9,6 @@ import ( "github.com/blocklessnetwork/b7s/consensus/pbft" "github.com/blocklessnetwork/b7s/models/execute" - "github.com/blocklessnetwork/b7s/models/response" ) // gatherExecutionResultsPBFT collects execution results from a PBFT cluster. This means f+1 identical results. @@ -33,6 +32,12 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string, out execute.ResultMap = make(map[peer.ID]execute.NodeResult) ) + // We use a map as a simple way to count identical results. + // Equality means same result (process outputs) and same request timestamp. + peerResultMapKey := func(res execute.NodeResult) string { + return fmt.Sprintf("%+#v-%s", res.Result.Result, res.PBFT.RequestTimestamp.String()) + } + wg.Add(len(peers)) for _, rp := range peers { @@ -47,7 +52,10 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string, n.log.Info().Str("peer", sender.String()).Str("request", requestID).Msg("accounted execution response from peer") - er := res.(response.Execute) + er, ok := res[sender] + if !ok { + return + } pub, err := sender.ExtractPublicKey() if err != nil { @@ -61,25 +69,19 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string, return } - exres, ok := er.Results[sender] - if !ok { - return - } - lock.Lock() defer lock.Unlock() - // Equality means same result (output) and same timestamp. - reskey := fmt.Sprintf("%+#v-%s", exres.Result.Result, er.PBFT.RequestTimestamp.String()) + reskey := peerResultMapKey(er) result, ok := results[reskey] if !ok { results[reskey] = aggregatedResult{ - result: exres.Result, + result: er.Result, peers: []peer.ID{ sender, }, metadata: map[peer.ID]any{ - sender: exres.Metadata, + sender: er.Metadata, }, } return @@ -87,7 +89,7 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string, // Record which peers have this result, and their metadata. result.peers = append(result.peers, sender) - result.metadata[sender] = exres.Metadata + result.metadata[sender] = er.Metadata results[reskey] = result @@ -139,9 +141,7 @@ func (n *Node) gatherExecutionResults(ctx context.Context, requestID string, pee n.log.Info().Str("peer", rp.String()).Msg("accounted execution response from peer") - er := res.(response.Execute) - - exres, ok := er.Results[rp] + exres, ok := res[rp] if !ok { return } @@ -156,3 +156,9 @@ func (n *Node) gatherExecutionResults(ctx context.Context, requestID string, pee return results } + +func singleNodeResultMap(id peer.ID, res execute.NodeResult) execute.ResultMap { + return map[peer.ID]execute.NodeResult{ + id: res, + } +} diff --git a/node/internal/waitmap/waitmap.go b/node/internal/waitmap/waitmap.go index 0bd73a28..3af8b434 100644 --- a/node/internal/waitmap/waitmap.go +++ b/node/internal/waitmap/waitmap.go @@ -2,43 +2,45 @@ package waitmap import ( "context" + "math" "sync" + + "github.com/hashicorp/golang-lru/simplelru" ) // WaitMap is a key-value store that enables not only setting and getting // values from a map, but also waiting until value for a key becomes available. -// Important: Since this implementation is tied pretty closely to how it will be used, -// (as an internal package), it has the peculiar behavior of only the first `Set` setting -// the value. Subsequent `Sets()` are recorded, but don't change the returned value. -type WaitMap struct { +type WaitMap[K comparable, V any] struct { sync.Mutex - m map[string][]any - subs map[string][]chan any + cache *simplelru.LRU + subs map[K][]chan V } // New creates a new WaitMap. -func New() *WaitMap { +func New[K comparable, V any](size int) *WaitMap[K, V] { + + if size <= 0 { + size = math.MaxInt + } + + // Only possible cause of an error is providing an invalid size value + cache, _ := simplelru.NewLRU(size, nil) - wm := WaitMap{ - m: make(map[string][]any), - subs: make(map[string][]chan any), + wm := WaitMap[K, V]{ + cache: cache, + subs: make(map[K][]chan V), } return &wm } // Set sets the value for a key. If the value already exists, we append it to a list. -func (w *WaitMap) Set(key string, value any) { +func (w *WaitMap[K, V]) Set(key K, value V) { w.Lock() defer w.Unlock() - _, ok := w.m[key] - if !ok { - w.m[key] = make([]any, 0) - } - - w.m[key] = append(w.m[key], value) + w.cache.Add(key, value) // Send the new value to any waiting subscribers of the key. for _, sub := range w.subs[key] { @@ -48,18 +50,18 @@ func (w *WaitMap) Set(key string, value any) { } // Wait will wait until the value for a key becomes available. -func (w *WaitMap) Wait(key string) any { +func (w *WaitMap[K, V]) Wait(key K) V { w.Lock() // Unlock cannot be deferred so we can ublock Set() while waiting. - values, ok := w.m[key] + value, ok := w.cache.Get(key) if ok { w.Unlock() - return values[0] + return value.(V) } // If there's no value yet, subscribe to any new values for this key. - ch := make(chan any) + ch := make(chan V) w.subs[key] = append(w.subs[key], ch) w.Unlock() @@ -67,43 +69,41 @@ func (w *WaitMap) Wait(key string) any { } // WaitFor will wait for the value for a key to become available, but no longer than the specified duration. -func (w *WaitMap) WaitFor(ctx context.Context, key string) (any, bool) { +func (w *WaitMap[K, V]) WaitFor(ctx context.Context, key K) (V, bool) { w.Lock() // Unlock cannot be deferred so we can ublock Set() while waiting. - values, ok := w.m[key] + value, ok := w.cache.Get(key) if ok { w.Unlock() - return values[0], true + return value.(V), true } // If there's no value yet, subscribe to any new values for this key. // Use a bufferred channel since we might bail before collecting our value. - ch := make(chan any, 1) + ch := make(chan V, 1) w.subs[key] = append(w.subs[key], ch) w.Unlock() select { case <-ctx.Done(): - return nil, false + zero := *new(V) + return zero, false case value := <-ch: return value, true } } // Get will return the current value for the key, if any. -func (w *WaitMap) Get(key string) (any, bool) { +func (w *WaitMap[K, V]) Get(key K) (V, bool) { w.Lock() defer w.Unlock() - values, ok := w.m[key] + value, ok := w.cache.Get(key) if !ok { - return values, ok + zero := *new(V) + return zero, ok } - // As noted in the comment at the beginning of this file, - // this is special behavior because of the way this map will be used. - // Get will always return the first value. - value := values[0] - return value, true + return value.(V), true } diff --git a/node/internal/waitmap/waitmap_internal_test.go b/node/internal/waitmap/waitmap_test.go similarity index 86% rename from node/internal/waitmap/waitmap_internal_test.go rename to node/internal/waitmap/waitmap_test.go index 8022ee58..5d096db3 100644 --- a/node/internal/waitmap/waitmap_internal_test.go +++ b/node/internal/waitmap/waitmap_test.go @@ -1,4 +1,4 @@ -package waitmap +package waitmap_test import ( "context" @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/blocklessnetwork/b7s/node/internal/waitmap" "github.com/stretchr/testify/require" ) @@ -23,12 +24,9 @@ func TestWaitMap(t *testing.T) { value = "dummy-value" ) - wm := New() + wm := waitmap.New[string, string](0) wm.Set(key, value) - require.Len(t, wm.m, 1) - require.Equal(t, value, wm.m[key][0]) - retrieved, ok := wm.Get(key) require.True(t, ok) require.Equal(t, value, retrieved) @@ -40,7 +38,7 @@ func TestWaitMap(t *testing.T) { key = "dummy-key" ) - wm := New() + wm := waitmap.New[string, string](0) _, ok := wm.Get(key) require.False(t, ok) @@ -57,7 +55,7 @@ func TestWaitMap(t *testing.T) { retrieved string ) - wm := New() + wm := waitmap.New[string, string](0) var wg sync.WaitGroup wg.Add(1) @@ -66,7 +64,7 @@ func TestWaitMap(t *testing.T) { defer wg.Done() waited := wm.Wait(key) - retrieved = waited.(string) + retrieved = waited }() // Delay so that the goroutine actually has to wait. @@ -91,7 +89,7 @@ func TestWaitMap(t *testing.T) { value = "dummy-value" ) - wm := New() + wm := waitmap.New[string, string](0) wm.Set(key, value) @@ -106,7 +104,7 @@ func TestWaitMap(t *testing.T) { value = "dummy-value" ) - wm := New() + wm := waitmap.New[string, string](0) var wg sync.WaitGroup wg.Add(3) @@ -115,7 +113,7 @@ func TestWaitMap(t *testing.T) { defer wg.Done() waited := wm.Wait(key) - require.Equal(t, value, waited.(string)) + require.Equal(t, value, waited) } // Spin up three goroutines - they should all get the same result. @@ -135,7 +133,7 @@ func TestWaitMap(t *testing.T) { value = "dummy-value" ) - wm := New() + wm := waitmap.New[string, string](0) var wg sync.WaitGroup wg.Add(1) @@ -147,7 +145,7 @@ func TestWaitMap(t *testing.T) { retrieved, ok := wm.WaitFor(ctx, key) require.True(t, ok) - require.Equal(t, value, retrieved.(string)) + require.Equal(t, value, retrieved) }() // Delay so that the goroutine actually has to wait. @@ -166,7 +164,7 @@ func TestWaitMap(t *testing.T) { timeout = 10 * time.Millisecond ) - wm := New() + wm := waitmap.New[string, string](0) var wg sync.WaitGroup wg.Add(1) @@ -192,6 +190,6 @@ func TestWaitMap(t *testing.T) { retrieved, ok := wm.WaitFor(ctx, key) require.True(t, ok) - require.Equal(t, value, retrieved.(string)) + require.Equal(t, value, retrieved) }) } diff --git a/node/node.go b/node/node.go index de16cb73..18442277 100644 --- a/node/node.go +++ b/node/node.go @@ -13,6 +13,8 @@ import ( "github.com/blocklessnetwork/b7s/host" "github.com/blocklessnetwork/b7s/info" "github.com/blocklessnetwork/b7s/models/blockless" + "github.com/blocklessnetwork/b7s/models/execute" + "github.com/blocklessnetwork/b7s/models/response" "github.com/blocklessnetwork/b7s/node/internal/waitmap" "github.com/blocklessnetwork/b7s/telemetry/tracing" ) @@ -45,8 +47,8 @@ type Node struct { // clusterLock is used to synchronize access to the `clusters` map. clusterLock sync.RWMutex - executeResponses *waitmap.WaitMap - consensusResponses *waitmap.WaitMap + executeResponses *waitmap.WaitMap[string, execute.ResultMap] + consensusResponses *waitmap.WaitMap[string, response.FormCluster] // Telemetry tracer *tracing.Tracer @@ -87,8 +89,8 @@ func New(log zerolog.Logger, host *host.Host, store blockless.PeerStore, fstore rollCall: newQueue(rollCallQueueBufferSize), clusters: make(map[string]consensusExecutor), - executeResponses: waitmap.New(), - consensusResponses: waitmap.New(), + executeResponses: waitmap.New[string, execute.ResultMap](executionResultCacheSize), + consensusResponses: waitmap.New[string, response.FormCluster](0), tracer: tracing.NewTracer(tracerName), metrics: metrics.Default(), diff --git a/node/params.go b/node/params.go index 6b2c430e..89b92638 100644 --- a/node/params.go +++ b/node/params.go @@ -28,6 +28,8 @@ const ( syncInterval = time.Hour // How often do we recheck function installations. allowErrorLeakToTelemetry = false // By default we will not send processing errors to telemetry tracers. + + executionResultCacheSize = 1000 ) // Raft and consensus related parameters. diff --git a/node/rest.go b/node/rest.go index 0a95ec87..a43e283d 100644 --- a/node/rest.go +++ b/node/rest.go @@ -27,9 +27,8 @@ func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request, subgrou } // ExecutionResult fetches the execution result from the node cache. -func (n *Node) ExecutionResult(id string) (execute.Result, bool) { - res, ok := n.executeResponses.Get(id) - return res.(execute.Result), ok +func (n *Node) ExecutionResult(id string) (execute.ResultMap, bool) { + return n.executeResponses.Get(id) } // PublishFunctionInstall publishes a function install message. diff --git a/node/worker_execute.go b/node/worker_execute.go index 2fd6700d..4521eb12 100644 --- a/node/worker_execute.go +++ b/node/worker_execute.go @@ -49,11 +49,11 @@ func (n *Node) workerProcessExecute(ctx context.Context, from peer.ID, req reque log.Info().Str("code", code.String()).Msg("execution complete") - // Cache the execution result. - n.executeResponses.Set(requestID, result) - // Create the execution response from the execution result. rm := execute.ResultMap{n.host.ID(): {Result: result, Metadata: metadata}} + + n.executeResponses.Set(requestID, rm) + res := req.Response(code).WithResults(rm) // Send the response, whatever it may be (success or failure). diff --git a/testing/mocks/generic.go b/testing/mocks/generic.go index c94748e4..b0ea83fd 100644 --- a/testing/mocks/generic.go +++ b/testing/mocks/generic.go @@ -40,6 +40,12 @@ var ( }, } + GenericExecutionResultMap = execute.ResultMap{ + GenericPeerID: { + Result: GenericExecutionResult, + }, + } + GenericExecutionRequest = execute.Request{ FunctionID: "generic-function-id", Method: "wasm", diff --git a/testing/mocks/node.go b/testing/mocks/node.go index be03cd22..a9cfae82 100644 --- a/testing/mocks/node.go +++ b/testing/mocks/node.go @@ -11,7 +11,7 @@ import ( // Node implements the `Node` interface expected by the API. type Node struct { ExecuteFunctionFunc func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) - ExecutionResultFunc func(id string) (execute.Result, bool) + ExecutionResultFunc func(id string) (execute.ResultMap, bool) PublishFunctionInstallFunc func(ctx context.Context, uri string, cid string, subgroup string) error } @@ -21,17 +21,11 @@ func BaselineNode(t *testing.T) *Node { node := Node{ ExecuteFunctionFunc: func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { - results := execute.ResultMap{ - GenericPeerID: execute.NodeResult{ - Result: GenericExecutionResult, - }, - } - // TODO: Add a generic cluster info - return GenericExecutionResult.Code, GenericUUID.String(), results, execute.Cluster{}, nil + return GenericExecutionResult.Code, GenericUUID.String(), GenericExecutionResultMap, execute.Cluster{}, nil }, - ExecutionResultFunc: func(id string) (execute.Result, bool) { - return GenericExecutionResult, true + ExecutionResultFunc: func(id string) (execute.ResultMap, bool) { + return GenericExecutionResultMap, true }, PublishFunctionInstallFunc: func(ctx context.Context, uri string, cid string, subgroup string) error { return nil @@ -45,7 +39,7 @@ func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request, subgrou return n.ExecuteFunctionFunc(ctx, req, subgroup) } -func (n *Node) ExecutionResult(id string) (execute.Result, bool) { +func (n *Node) ExecutionResult(id string) (execute.ResultMap, bool) { return n.ExecutionResultFunc(id) }