Skip to content

Commit

Permalink
feat: endpoint for gRPC submit.WatchMempool
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Gianelloni <[email protected]>
  • Loading branch information
wolf31o2 committed May 27, 2024
1 parent 68932e8 commit b34602d
Showing 1 changed file with 88 additions and 0 deletions.
88 changes: 88 additions & 0 deletions internal/utxorpc/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,91 @@ func (s *submitServiceServer) ReadMempool(
}

// WatchMempool
func (s *submitServiceServer) WatchMempool(
ctx context.Context,
req *connect.Request[submit.WatchMempoolRequest],
stream *connect.ServerStream[submit.WatchMempoolResponse],
) error {

predicate := req.Msg.GetPredicate() // Predicate
fieldMask := req.Msg.GetFieldMask()
log.Printf(
"Got a WatchMempool request with predicate %v and fieldMask %v",
predicate,
fieldMask,
)

// Connect to node
oConn, err := node.GetConnection(nil)
if err != nil {
return err
}
defer func() {
// Close Ouroboros connection
oConn.Close()
}()

// Start clients
oConn.LocalTxMonitor().Client.Start()

// Collect TX hashes from the mempool
needsAcquire := false
for {
if needsAcquire {
err = oConn.LocalTxMonitor().Client.Acquire()
if err != nil {
log.Printf("ERROR: %s", err)
return err
}
}
txRawBytes, err := oConn.LocalTxMonitor().Client.NextTx()
if err != nil {
log.Printf("ERROR: %s", err)
return err
}
// No transactions in mempool, release and continue
if txRawBytes == nil {
err := oConn.LocalTxMonitor().Client.Release()
if err != nil {
log.Printf("ERROR: %s", err)
return err
}
needsAcquire = true
continue
}

txType, err := ledger.DetermineTransactionType(txRawBytes)
if err != nil {
return err
}
tx, err := ledger.NewTransactionFromCbor(txType, txRawBytes)
if err != nil {
return err
}
cTx := tx.Utxorpc() // *cardano.Tx
resp := &submit.WatchMempoolResponse{}
var act submit.AnyChainTx
var actr submit.AnyChainTx_Raw
actr.Raw = txRawBytes
act.Type = &actr
record := &submit.TxInMempool{
Tx: &act,
Stage: submit.Stage_STAGE_MEMPOOL,
}
resp.Tx = record
if record.Tx.String() == cTx.String() {
if predicate == nil {
err := stream.Send(resp)
if err != nil {
return err
}
} else {
// TODO: filter from all Predicate types
err := stream.Send(resp)
if err != nil {
return err
}
}
}
}
}

0 comments on commit b34602d

Please sign in to comment.