Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(utxorpc): waitForTx call #304

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions internal/node/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,31 @@ func chainSyncRollForwardHandler(
) error {
cfg := config.GetConfig()
if connCfg.ChainSyncEventChan != nil {
var block ledger.Block
switch v := blockData.(type) {
case ledger.Block:
block = v
// Emit block-level event
blockEvt := event.New(
"chainsync.block",
time.Now(),
input_chainsync.NewBlockContext(v, cfg.Node.NetworkMagic),
input_chainsync.NewBlockEvent(v, true),
)
connCfg.ChainSyncEventChan <- blockEvt
// Emit transaction-level events
for t, transaction := range v.Transactions() {
// TODO: do we need to resolve inputs?
// resolvedInputs, err := resolveTransactionInputs(transaction, connCfg)
// if err != nil {
// return fmt.Errorf("failed to resolve inputs for transaction: %w", err)
// }
txEvt := event.New(
"chainsync.transaction",
time.Now(),
input_chainsync.NewTransactionContext(v, transaction, uint32(t), cfg.Node.NetworkMagic),
input_chainsync.NewTransactionEvent(v, transaction, true, nil),
)
connCfg.ChainSyncEventChan <- txEvt
}
/*
case ledger.BlockHeader:
blockSlot := v.SlotNumber()
Expand All @@ -102,13 +123,6 @@ func chainSyncRollForwardHandler(
default:
return fmt.Errorf("unknown block data")
}
evt := event.New(
"chainsync.block",
time.Now(),
input_chainsync.NewBlockContext(block, cfg.Node.NetworkMagic),
input_chainsync.NewBlockEvent(block, true),
)
connCfg.ChainSyncEventChan <- evt
}
return nil
}
107 changes: 65 additions & 42 deletions internal/utxorpc/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/hex"
"fmt"
"log"
"log/slog"

connect "connectrpc.com/connect"
"github.com/blinklabs-io/adder/event"
Expand All @@ -27,7 +28,6 @@ import (
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
submit "github.com/utxorpc/go-codegen/utxorpc/v1alpha/submit"
"github.com/utxorpc/go-codegen/utxorpc/v1alpha/submit/submitconnect"
"golang.org/x/crypto/blake2b"

"github.com/blinklabs-io/cardano-node-api/internal/node"
)
Expand Down Expand Up @@ -104,83 +104,106 @@ func (s *submitServiceServer) SubmitTx(
return connect.NewResponse(resp), nil
}

// WaitForTx
func (s *submitServiceServer) WaitForTx(
ctx context.Context,
req *connect.Request[submit.WaitForTxRequest],
stream *connect.ServerStream[submit.WaitForTxResponse],
) error {

logger := slog.With("component", "WaitForTx")
ref := req.Msg.GetRef() // [][]byte
log.Printf("Got a WaitForTx request with %d transactions", len(ref))
logger.Info("Received WaitForTx request", "transaction_count", len(ref))

// Log the transaction references at debug level
for i, r := range ref {
logger.Debug("Transaction reference", "index", i, "ref", hex.EncodeToString(r))
}

// Setup event channel
eventChan := make(chan event.Event, 10)
eventChan := make(chan event.Event, 100) // Increased buffer size for high-throughput
connCfg := node.ConnectionConfig{
ChainSyncEventChan: eventChan,
}

// Connect to node
logger.Debug("Establishing connection to Ouroboros node...")
oConn, err := node.GetConnection(&connCfg)
if err != nil {
logger.Error("Failed to connect to node", "error", err)
return err
}
defer func() {
// Close Ouroboros connection
logger.Debug("Closing connection to Ouroboros node.")
oConn.Close()
}()

// Get our starting point
var point ocommon.Point
// Get the current chain tip
tip, err := oConn.ChainSync().Client.GetCurrentTip()
if err != nil {
log.Printf("ERROR: %s", err)
logger.Error("Error retrieving current tip", "error", err)
return err
}
point = tip.Point
logger.Debug("Current chain tip retrieved", "tip", tip)

// Start the sync with the node
err = oConn.ChainSync().Client.Sync([]ocommon.Point{point})
logger.Debug("Starting chain synchronization...")
err = oConn.ChainSync().Client.Sync([]ocommon.Point{tip.Point})
if err != nil {
log.Printf("ERROR: %s", err)
logger.Error("Error during chain synchronization", "error", err)
return err
}

// Context cancellation handling
go func() {
<-ctx.Done()
logger.Debug("Client canceled the request. Stopping event processing.")
close(eventChan)
}()

// Wait for events
logger.Debug("Waiting for transaction events...")
for {
evt, ok := <-eventChan
if !ok {
log.Printf("ERROR: channel closed")
return fmt.Errorf("ERROR: channel closed")
}
select {
case <-ctx.Done():
logger.Info("Context canceled. Exiting event loop.")
return ctx.Err()
case evt, ok := <-eventChan:
if !ok {
logger.Error("Event channel closed unexpectedly.")
return fmt.Errorf("event channel closed")
}

switch v := evt.Payload.(type) {
case input_chainsync.TransactionEvent:
for _, r := range ref {
resp := &submit.WaitForTxResponse{}
resp.Ref = r
resp.Stage = submit.Stage_STAGE_UNSPECIFIED
tc := evt.Context.(input_chainsync.TransactionContext)
// taken from gOuroboros generateTransactionHash
tmpHash, err := blake2b.New256(nil)
if err != nil {
return err
}
tmpHash.Write(r)
txHash := hex.EncodeToString(tmpHash.Sum(nil))
// Compare against our event's hash
if txHash == v.Transaction.Hash() {
resp.Stage = submit.Stage_STAGE_CONFIRMED
// Send it!
err = stream.Send(resp)
if err != nil {
return err
// Process the event
switch v := evt.Payload.(type) {
case input_chainsync.TransactionEvent:
logger.Debug("Received TransactionEvent", "hash", v.Transaction.Hash())
for _, r := range ref {
refHash := hex.EncodeToString(r)
eventHash := v.Transaction.Hash()

logger.Debug("Comparing TransactionEvent with reference", "event_hash", eventHash, "reference_hash", refHash)
if refHash == eventHash {
logger.Info("Transaction matches reference", "hash", eventHash)

// Send confirmation response
err = stream.Send(&submit.WaitForTxResponse{
Ref: r,
Stage: submit.Stage_STAGE_CONFIRMED,
})
if err != nil {
if ctx.Err() != nil {
logger.Warn("Client disconnected while sending response", "error", ctx.Err())
return ctx.Err()
}
logger.Error("Error sending response to client", "transaction_hash", eventHash, "error", err)
return err
}
logger.Info("Confirmation response sent", "transaction_hash", eventHash)
return nil // Stop processing after confirming the transaction
}
log.Printf(
"transaction: id: %d, hash: %s",
tc.TransactionIdx,
tc.TransactionHash,
)
}
default:
logger.Debug("Received unsupported event type", "type", evt.Type)
}
}
}
Expand Down