From 37e2e5ede13f92fb4a181ec48ceba316e4addaac Mon Sep 17 00:00:00 2001 From: Ales Verbic Date: Tue, 19 Nov 2024 08:39:44 -0500 Subject: [PATCH] fix(utxorpc): waitForTx call Signed-off-by: Ales Verbic --- internal/node/chainsync.go | 32 +++++++---- internal/utxorpc/submit.go | 107 ++++++++++++++++++++++--------------- 2 files changed, 88 insertions(+), 51 deletions(-) diff --git a/internal/node/chainsync.go b/internal/node/chainsync.go index 1fb5efd..0cfc551 100644 --- a/internal/node/chainsync.go +++ b/internal/node/chainsync.go @@ -82,10 +82,31 @@ func chainSyncRollForwardHandler( ) error { cfg := config.GetConfig() if connCfg.ChainSyncEventChan != nil { - var block ledger.Block switch v := blockData.(type) { case ledger.Block: - block = v + // Emit block-level event + blockEvt := event.New( + "chainsync.block", + time.Now(), + input_chainsync.NewBlockContext(v, cfg.Node.NetworkMagic), + input_chainsync.NewBlockEvent(v, true), + ) + connCfg.ChainSyncEventChan <- blockEvt + // Emit transaction-level events + for t, transaction := range v.Transactions() { + // TODO: do we need to resolve inputs? + // resolvedInputs, err := resolveTransactionInputs(transaction, connCfg) + // if err != nil { + // return fmt.Errorf("failed to resolve inputs for transaction: %w", err) + // } + txEvt := event.New( + "chainsync.transaction", + time.Now(), + input_chainsync.NewTransactionContext(v, transaction, uint32(t), cfg.Node.NetworkMagic), + input_chainsync.NewTransactionEvent(v, transaction, true, nil), + ) + connCfg.ChainSyncEventChan <- txEvt + } /* case ledger.BlockHeader: blockSlot := v.SlotNumber() @@ -102,13 +123,6 @@ func chainSyncRollForwardHandler( default: return fmt.Errorf("unknown block data") } - evt := event.New( - "chainsync.block", - time.Now(), - input_chainsync.NewBlockContext(block, cfg.Node.NetworkMagic), - input_chainsync.NewBlockEvent(block, true), - ) - connCfg.ChainSyncEventChan <- evt } return nil } diff --git a/internal/utxorpc/submit.go b/internal/utxorpc/submit.go index c22a013..1eea260 100644 --- a/internal/utxorpc/submit.go +++ b/internal/utxorpc/submit.go @@ -19,6 +19,7 @@ import ( "encoding/hex" "fmt" "log" + "log/slog" connect "connectrpc.com/connect" "github.com/blinklabs-io/adder/event" @@ -27,7 +28,6 @@ import ( ocommon "github.com/blinklabs-io/gouroboros/protocol/common" submit "github.com/utxorpc/go-codegen/utxorpc/v1alpha/submit" "github.com/utxorpc/go-codegen/utxorpc/v1alpha/submit/submitconnect" - "golang.org/x/crypto/blake2b" "github.com/blinklabs-io/cardano-node-api/internal/node" ) @@ -104,83 +104,106 @@ func (s *submitServiceServer) SubmitTx( return connect.NewResponse(resp), nil } -// WaitForTx func (s *submitServiceServer) WaitForTx( ctx context.Context, req *connect.Request[submit.WaitForTxRequest], stream *connect.ServerStream[submit.WaitForTxResponse], ) error { + logger := slog.With("component", "WaitForTx") ref := req.Msg.GetRef() // [][]byte - log.Printf("Got a WaitForTx request with %d transactions", len(ref)) + logger.Info("Received WaitForTx request", "transaction_count", len(ref)) + + // Log the transaction references at debug level + for i, r := range ref { + logger.Debug("Transaction reference", "index", i, "ref", hex.EncodeToString(r)) + } // Setup event channel - eventChan := make(chan event.Event, 10) + eventChan := make(chan event.Event, 100) // Increased buffer size for high-throughput connCfg := node.ConnectionConfig{ ChainSyncEventChan: eventChan, } + // Connect to node + logger.Debug("Establishing connection to Ouroboros node...") oConn, err := node.GetConnection(&connCfg) if err != nil { + logger.Error("Failed to connect to node", "error", err) return err } defer func() { - // Close Ouroboros connection + logger.Debug("Closing connection to Ouroboros node.") oConn.Close() }() - // Get our starting point - var point ocommon.Point + // Get the current chain tip tip, err := oConn.ChainSync().Client.GetCurrentTip() if err != nil { - log.Printf("ERROR: %s", err) + logger.Error("Error retrieving current tip", "error", err) return err } - point = tip.Point + logger.Debug("Current chain tip retrieved", "tip", tip) // Start the sync with the node - err = oConn.ChainSync().Client.Sync([]ocommon.Point{point}) + logger.Debug("Starting chain synchronization...") + err = oConn.ChainSync().Client.Sync([]ocommon.Point{tip.Point}) if err != nil { - log.Printf("ERROR: %s", err) + logger.Error("Error during chain synchronization", "error", err) return err } + // Context cancellation handling + go func() { + <-ctx.Done() + logger.Debug("Client canceled the request. Stopping event processing.") + close(eventChan) + }() + // Wait for events + logger.Debug("Waiting for transaction events...") for { - evt, ok := <-eventChan - if !ok { - log.Printf("ERROR: channel closed") - return fmt.Errorf("ERROR: channel closed") - } + select { + case <-ctx.Done(): + logger.Info("Context canceled. Exiting event loop.") + return ctx.Err() + case evt, ok := <-eventChan: + if !ok { + logger.Error("Event channel closed unexpectedly.") + return fmt.Errorf("event channel closed") + } - switch v := evt.Payload.(type) { - case input_chainsync.TransactionEvent: - for _, r := range ref { - resp := &submit.WaitForTxResponse{} - resp.Ref = r - resp.Stage = submit.Stage_STAGE_UNSPECIFIED - tc := evt.Context.(input_chainsync.TransactionContext) - // taken from gOuroboros generateTransactionHash - tmpHash, err := blake2b.New256(nil) - if err != nil { - return err - } - tmpHash.Write(r) - txHash := hex.EncodeToString(tmpHash.Sum(nil)) - // Compare against our event's hash - if txHash == v.Transaction.Hash() { - resp.Stage = submit.Stage_STAGE_CONFIRMED - // Send it! - err = stream.Send(resp) - if err != nil { - return err + // Process the event + switch v := evt.Payload.(type) { + case input_chainsync.TransactionEvent: + logger.Debug("Received TransactionEvent", "hash", v.Transaction.Hash()) + for _, r := range ref { + refHash := hex.EncodeToString(r) + eventHash := v.Transaction.Hash() + + logger.Debug("Comparing TransactionEvent with reference", "event_hash", eventHash, "reference_hash", refHash) + if refHash == eventHash { + logger.Info("Transaction matches reference", "hash", eventHash) + + // Send confirmation response + err = stream.Send(&submit.WaitForTxResponse{ + Ref: r, + Stage: submit.Stage_STAGE_CONFIRMED, + }) + if err != nil { + if ctx.Err() != nil { + logger.Warn("Client disconnected while sending response", "error", ctx.Err()) + return ctx.Err() + } + logger.Error("Error sending response to client", "transaction_hash", eventHash, "error", err) + return err + } + logger.Info("Confirmation response sent", "transaction_hash", eventHash) + return nil // Stop processing after confirming the transaction } - log.Printf( - "transaction: id: %d, hash: %s", - tc.TransactionIdx, - tc.TransactionHash, - ) } + default: + logger.Debug("Received unsupported event type", "type", evt.Type) } } }