From a273731cca80c6b5250c03e3e36a74bb7e31c0b6 Mon Sep 17 00:00:00 2001 From: Chris Gianelloni Date: Sat, 11 May 2024 12:57:32 -0400 Subject: [PATCH] feat: ReadMempool endpoint for gRPC submit module for LocalTxMonitor Signed-off-by: Chris Gianelloni --- internal/utxorpc/submit.go | 63 +++++++++++++++++++++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/internal/utxorpc/submit.go b/internal/utxorpc/submit.go index 5abf8d6..866e4b5 100644 --- a/internal/utxorpc/submit.go +++ b/internal/utxorpc/submit.go @@ -41,8 +41,8 @@ func (s *submitServiceServer) SubmitTx( // txRawList txRawList := req.Msg.GetTx() // []*AnyChainTx - resp := &submit.SubmitTxResponse{} log.Printf("Got a SubmitTx request with %d transactions", len(txRawList)) + resp := &submit.SubmitTxResponse{} // Connect to node oConn, err := node.GetConnection(nil) @@ -100,5 +100,66 @@ func (s *submitServiceServer) SubmitTx( // WaitForTx // ReadMempool +func (s *submitServiceServer) ReadMempool( + ctx context.Context, + req *connect.Request[submit.ReadMempoolRequest], +) (*connect.Response[submit.ReadMempoolResponse], error) { + + // This is GetTxs until https://github.com/utxorpc/spec/pull/95 + txim := req.Msg.GetTxs() // []*TxInMempool + log.Printf("Got a ReadMempool request with %d transactions", len(txim)) + resp := &submit.ReadMempoolResponse{} + + // Connect to node + oConn, err := node.GetConnection(nil) + if err != nil { + return nil, err + } + defer func() { + // Close Ouroboros connection + oConn.Close() + }() + + // Start LocalTxMonitor client + oConn.LocalTxMonitor().Client.Start() + + // Collect TX hashes from the mempool + mempool := []*submit.TxInMempool{} + for { + txRawBytes, err := oConn.LocalTxMonitor().Client.NextTx() + if err != nil { + log.Printf("ERROR: %s", err) + return nil, err + } + // No transactions in mempool + if txRawBytes == nil { + break + } + var act submit.AnyChainTx + var actr submit.AnyChainTx_Raw + actr.Raw = txRawBytes + act.Type = &actr + record := &submit.TxInMempool{ + Tx: &act, + Stage: submit.Stage_STAGE_MEMPOOL, + } + mempool = append(mempool, record) + } + + // Check each requested Tx against our mempool + for _, txi := range txim { + txi.Stage = submit.Stage_STAGE_UNSPECIFIED + for _, tx := range mempool { + if txi.Stage == submit.Stage_STAGE_MEMPOOL { + break + } + if txi.Tx.String() == tx.Tx.String() { + txi.Stage = submit.Stage_STAGE_MEMPOOL + } + } + resp.Stage = append(resp.Stage, txi.Stage) + } + return connect.NewResponse(resp), nil +} // WatchMempool