diff --git a/internal/utxorpc/submit.go b/internal/utxorpc/submit.go index bac6dce..4da2c97 100644 --- a/internal/utxorpc/submit.go +++ b/internal/utxorpc/submit.go @@ -166,3 +166,91 @@ func (s *submitServiceServer) ReadMempool( } // WatchMempool +func (s *submitServiceServer) WatchMempool( + ctx context.Context, + req *connect.Request[submit.WatchMempoolRequest], + stream *connect.ServerStream[submit.WatchMempoolResponse], +) error { + + predicate := req.Msg.GetPredicate() // Predicate + fieldMask := req.Msg.GetFieldMask() + log.Printf( + "Got a WatchMempool request with predicate %v and fieldMask %v", + predicate, + fieldMask, + ) + + // Connect to node + oConn, err := node.GetConnection(nil) + if err != nil { + return err + } + defer func() { + // Close Ouroboros connection + oConn.Close() + }() + + // Start clients + oConn.LocalTxMonitor().Client.Start() + + // Collect TX hashes from the mempool + needsAcquire := false + for { + if needsAcquire { + err = oConn.LocalTxMonitor().Client.Acquire() + if err != nil { + log.Printf("ERROR: %s", err) + return err + } + } + txRawBytes, err := oConn.LocalTxMonitor().Client.NextTx() + if err != nil { + log.Printf("ERROR: %s", err) + return err + } + // No transactions in mempool, release and continue + if txRawBytes == nil { + err := oConn.LocalTxMonitor().Client.Release() + if err != nil { + log.Printf("ERROR: %s", err) + return err + } + needsAcquire = true + continue + } + + txType, err := ledger.DetermineTransactionType(txRawBytes) + if err != nil { + return err + } + tx, err := ledger.NewTransactionFromCbor(txType, txRawBytes) + if err != nil { + return err + } + cTx := tx.Utxorpc() // *cardano.Tx + resp := &submit.WatchMempoolResponse{} + var act submit.AnyChainTx + var actr submit.AnyChainTx_Raw + actr.Raw = txRawBytes + act.Type = &actr + record := &submit.TxInMempool{ + Tx: &act, + Stage: submit.Stage_STAGE_MEMPOOL, + } + resp.Tx = record + if record.Tx.String() == cTx.String() { + if predicate == nil { + err := stream.Send(resp) + if err != nil { + return err + } + } else { + // TODO: filter from all Predicate types + err := stream.Send(resp) + if err != nil { + return err + } + } + } + } +}