Skip to content

Commit

Permalink
Added filters to program subscribe methods. (#395)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiago18c authored Jun 10, 2022
1 parent ec1e0b5 commit 9dd423a
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 23 deletions.
2 changes: 1 addition & 1 deletion SharedBuildProperties.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Product>Solnet</Product>
<Version>6.0.11</Version>
<Version>6.0.12</Version>
<Copyright>Copyright 2022 &#169; Solnet</Copyright>
<Authors>blockmountain</Authors>
<PublisherName>blockmountain</PublisherName>
Expand Down
37 changes: 25 additions & 12 deletions src/Solnet.Rpc/IStreamingRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Solnet.Rpc.Models;
using Solnet.Rpc.Types;
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Threading.Tasks;

Expand Down Expand Up @@ -43,7 +44,8 @@ public interface IStreamingRpcClient : IDisposable
/// <param name="callback">The callback to handle data notifications.</param>
/// <param name="commitment">The state commitment to consider when querying the ledger state.</param>
/// <returns>The task object representing the asynchronous operation.</returns>
Task<SubscriptionState> SubscribeAccountInfoAsync(string pubkey, Action<SubscriptionState, ResponseValue<AccountInfo>> callback, Commitment commitment = Commitment.Finalized);
Task<SubscriptionState> SubscribeAccountInfoAsync(string pubkey, Action<SubscriptionState,
ResponseValue<AccountInfo>> callback, Commitment commitment = Commitment.Finalized);

/// <summary>
/// Subscribes to the AccountInfo. This is a synchronous and blocking function.
Expand All @@ -55,7 +57,8 @@ public interface IStreamingRpcClient : IDisposable
/// <param name="callback">The callback to handle data notifications.</param>
/// <param name="commitment">The state commitment to consider when querying the ledger state.</param>
/// <returns>Returns an object representing the state of the subscription.</returns>
SubscriptionState SubscribeAccountInfo(string pubkey, Action<SubscriptionState, ResponseValue<AccountInfo>> callback, Commitment commitment = Commitment.Finalized);
SubscriptionState SubscribeAccountInfo(string pubkey, Action<SubscriptionState, ResponseValue<AccountInfo>> callback,
Commitment commitment = Commitment.Finalized);

/// <summary>
/// Subscribes asynchronously to Token Account notifications. Note: Only works if the account is a Token Account.
Expand All @@ -67,7 +70,8 @@ public interface IStreamingRpcClient : IDisposable
/// <param name="callback">The callback to handle data notifications.</param>
/// <param name="commitment">The state commitment to consider when querying the ledger state.</param>
/// <returns>The task object representing the asynchronous operation.</returns>
Task<SubscriptionState> SubscribeTokenAccountAsync(string pubkey, Action<SubscriptionState, ResponseValue<TokenAccountInfo>> callback, Commitment commitment = Commitment.Finalized);
Task<SubscriptionState> SubscribeTokenAccountAsync(string pubkey, Action<SubscriptionState,
ResponseValue<TokenAccountInfo>> callback, Commitment commitment = Commitment.Finalized);

/// <summary>
/// Subscribes to Token Account notifications. Note: Only works if the account is a Token Account.
Expand All @@ -79,7 +83,8 @@ public interface IStreamingRpcClient : IDisposable
/// <param name="callback">The callback to handle data notifications.</param>
/// <param name="commitment">The state commitment to consider when querying the ledger state.</param>
/// <returns>Returns an object representing the state of the subscription.</returns>
SubscriptionState SubscribeTokenAccount(string pubkey, Action<SubscriptionState, ResponseValue<TokenAccountInfo>> callback, Commitment commitment = Commitment.Finalized);
SubscriptionState SubscribeTokenAccount(string pubkey, Action<SubscriptionState, ResponseValue<TokenAccountInfo>> callback,
Commitment commitment = Commitment.Finalized);

/// <summary>
/// Subscribes asynchronously to the logs notifications that mention a given public key.
Expand All @@ -91,7 +96,8 @@ public interface IStreamingRpcClient : IDisposable
/// <param name="callback">The callback to handle data notifications.</param>
/// <param name="commitment">The state commitment to consider when querying the ledger state.</param>
/// <returns>The task object representing the asynchronous operation.</returns>
Task<SubscriptionState> SubscribeLogInfoAsync(string pubkey, Action<SubscriptionState, ResponseValue<LogInfo>> callback, Commitment commitment = Commitment.Finalized);
Task<SubscriptionState> SubscribeLogInfoAsync(string pubkey, Action<SubscriptionState, ResponseValue<LogInfo>> callback,
Commitment commitment = Commitment.Finalized);

/// <summary>
/// Subscribes asynchronously to the logs notifications.
Expand All @@ -103,7 +109,8 @@ public interface IStreamingRpcClient : IDisposable
/// <param name="callback">The callback to handle data notifications.</param>
/// <param name="commitment">The state commitment to consider when querying the ledger state.</param>
/// <returns>The task object representing the asynchronous operation.</returns>
Task<SubscriptionState> SubscribeLogInfoAsync(LogsSubscriptionType subscriptionType, Action<SubscriptionState, ResponseValue<LogInfo>> callback, Commitment commitment = Commitment.Finalized);
Task<SubscriptionState> SubscribeLogInfoAsync(LogsSubscriptionType subscriptionType, Action<SubscriptionState,
ResponseValue<LogInfo>> callback, Commitment commitment = Commitment.Finalized);

/// <summary>
/// Subscribes to the logs notifications that mention a given public key. This is a synchronous and blocking function.
Expand All @@ -115,7 +122,8 @@ public interface IStreamingRpcClient : IDisposable
/// <param name="callback">The callback to handle data notifications.</param>
/// <param name="commitment">The state commitment to consider when querying the ledger state.</param>
/// <returns>Returns an object representing the state of the subscription.</returns>
SubscriptionState SubscribeLogInfo(string pubkey, Action<SubscriptionState, ResponseValue<LogInfo>> callback, Commitment commitment = Commitment.Finalized);
SubscriptionState SubscribeLogInfo(string pubkey, Action<SubscriptionState, ResponseValue<LogInfo>> callback,
Commitment commitment = Commitment.Finalized);

/// <summary>
/// Subscribes to the logs notifications. This is a synchronous and blocking function.
Expand All @@ -127,7 +135,8 @@ public interface IStreamingRpcClient : IDisposable
/// <param name="callback">The callback to handle data notifications.</param>
/// <param name="commitment">The state commitment to consider when querying the ledger state.</param>
/// <returns>Returns an object representing the state of the subscription.</returns>
SubscriptionState SubscribeLogInfo(LogsSubscriptionType subscriptionType, Action<SubscriptionState, ResponseValue<LogInfo>> callback, Commitment commitment = Commitment.Finalized);
SubscriptionState SubscribeLogInfo(LogsSubscriptionType subscriptionType, Action<SubscriptionState,
ResponseValue<LogInfo>> callback, Commitment commitment = Commitment.Finalized);

/// <summary>
/// Subscribes asynchronously to a transaction signature to receive notification when the transaction is confirmed.
Expand All @@ -139,7 +148,8 @@ public interface IStreamingRpcClient : IDisposable
/// <param name="callback">The callback to handle data notifications.</param>
/// <param name="commitment">The state commitment to consider when querying the ledger state.</param>
/// <returns>The task object representing the asynchronous operation.</returns>
Task<SubscriptionState> SubscribeSignatureAsync(string transactionSignature, Action<SubscriptionState, ResponseValue<ErrorResult>> callback, Commitment commitment = Commitment.Finalized);
Task<SubscriptionState> SubscribeSignatureAsync(string transactionSignature, Action<SubscriptionState,
ResponseValue<ErrorResult>> callback, Commitment commitment = Commitment.Finalized);

/// <summary>
/// Subscribes to a transaction signature to receive notification when the transaction is confirmed. This is a synchronous and blocking function.
Expand All @@ -151,7 +161,8 @@ public interface IStreamingRpcClient : IDisposable
/// <param name="callback">The callback to handle data notifications.</param>
/// <param name="commitment">The state commitment to consider when querying the ledger state.</param>
/// <returns>Returns an object representing the state of the subscription.</returns>
SubscriptionState SubscribeSignature(string transactionSignature, Action<SubscriptionState, ResponseValue<ErrorResult>> callback, Commitment commitment = Commitment.Finalized);
SubscriptionState SubscribeSignature(string transactionSignature, Action<SubscriptionState, ResponseValue<ErrorResult>> callback,
Commitment commitment = Commitment.Finalized);

/// <summary>
/// Subscribes asynchronously to changes to a given program account data.
Expand All @@ -163,7 +174,8 @@ public interface IStreamingRpcClient : IDisposable
/// <param name="callback">The callback to handle data notifications.</param>
/// <param name="commitment">The state commitment to consider when querying the ledger state.</param>
/// <returns>The task object representing the asynchronous operation.</returns>
Task<SubscriptionState> SubscribeProgramAsync(string programPubkey, Action<SubscriptionState, ResponseValue<AccountKeyPair>> callback, Commitment commitment = Commitment.Finalized);
Task<SubscriptionState> SubscribeProgramAsync(string programPubkey, Action<SubscriptionState, ResponseValue<AccountKeyPair>> callback,
Commitment commitment = Commitment.Finalized, int? dataSize = null, IList<MemCmp> memCmpList = null);

/// <summary>
/// Subscribes to changes to a given program account data. This is a synchronous and blocking function.
Expand All @@ -175,7 +187,8 @@ public interface IStreamingRpcClient : IDisposable
/// <param name="callback">The callback to handle data notifications.</param>
/// <param name="commitment">The state commitment to consider when querying the ledger state.</param>
/// <returns>Returns an object representing the state of the subscription.</returns>
SubscriptionState SubscribeProgram(string programPubkey, Action<SubscriptionState, ResponseValue<AccountKeyPair>> callback, Commitment commitment = Commitment.Finalized);
SubscriptionState SubscribeProgram(string programPubkey, Action<SubscriptionState, ResponseValue<AccountKeyPair>> callback,
Commitment commitment = Commitment.Finalized, int? dataSize = null, IList<MemCmp> memCmpList = null);

/// <summary>
/// Subscribes asynchronously to receive notifications anytime a slot is processed by the validator.
Expand Down
31 changes: 21 additions & 10 deletions src/Solnet.Rpc/SolanaStreamingRpcClient.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
using Microsoft.Extensions.Logging;
using Solnet.Rpc.Converters;
using Solnet.Rpc.Core;
using Solnet.Rpc.Core.Http;
using Solnet.Rpc.Core.Sockets;
using Solnet.Rpc.Messages;
using Solnet.Rpc.Models;
using Solnet.Rpc.Types;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
Expand Down Expand Up @@ -440,17 +442,25 @@ public SubscriptionState SubscribeSignature(string transactionSignature, Action<

#region Program
/// <inheritdoc cref="IStreamingRpcClient.SubscribeProgramAsync(string, Action{SubscriptionState, ResponseValue{AccountKeyPair}}, Commitment)"/>
public async Task<SubscriptionState> SubscribeProgramAsync(string programPubkey, Action<SubscriptionState, ResponseValue<AccountKeyPair>> callback, Commitment commitment = Commitment.Finalized)
public async Task<SubscriptionState> SubscribeProgramAsync(string programPubkey, Action<SubscriptionState,
ResponseValue<AccountKeyPair>> callback, Commitment commitment = Commitment.Finalized, int? dataSize = null,
IList<MemCmp> memCmpList = null)
{
var parameters = new List<object> { programPubkey };
var configParams = new Dictionary<string, object> { { "encoding", "base64" } };

if (commitment != Commitment.Finalized)
List<object> filters = Parameters.Create(ConfigObject.Create(KeyValue.Create("dataSize", dataSize)));
if (memCmpList != null)
{
configParams.Add("commitment", commitment);
filters ??= new List<object>();
filters.AddRange(memCmpList.Select(filter => ConfigObject.Create(KeyValue.Create("memcmp",
ConfigObject.Create(KeyValue.Create("offset", filter.Offset),
KeyValue.Create("bytes", filter.Bytes))))));
}

parameters.Add(configParams);

List<object> parameters = Parameters.Create(
programPubkey,
ConfigObject.Create(
KeyValue.Create("encoding", "base64"),
KeyValue.Create("filters", filters),
commitment != Commitment.Finalized ? KeyValue.Create("commitment", commitment) : null));

var sub = new SubscriptionState<ResponseValue<AccountKeyPair>>(this, SubscriptionChannel.Program, callback, parameters);

Expand All @@ -459,8 +469,9 @@ public async Task<SubscriptionState> SubscribeProgramAsync(string programPubkey,
}

/// <inheritdoc cref="IStreamingRpcClient.SubscribeProgram(string, Action{SubscriptionState, ResponseValue{AccountKeyPair}}, Commitment)"/>
public SubscriptionState SubscribeProgram(string programPubkey, Action<SubscriptionState, ResponseValue<AccountKeyPair>> callback, Commitment commitment = Commitment.Finalized)
=> SubscribeProgramAsync(programPubkey, callback, commitment).Result;
public SubscriptionState SubscribeProgram(string programPubkey, Action<SubscriptionState, ResponseValue<AccountKeyPair>> callback,
Commitment commitment = Commitment.Finalized, int? dataSize = null, IList<MemCmp> memCmpList = null)
=> SubscribeProgramAsync(programPubkey, callback, commitment, dataSize, memCmpList).Result;
#endregion

#region SlotInfo
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"method":"programSubscribe","params":["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin",{"encoding":"base64","filters":[{"dataSize":3228}]}],"jsonrpc":"2.0","id":0}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"method":"programSubscribe","params":["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin",{"encoding":"base64","filters":[{"dataSize":3228},{"memcmp":{"offset":45,"bytes":"CuieVDEDtLo7FypA9SbLM9saXFdb1dsshEkyErMqkRQq"}}]}],"jsonrpc":"2.0","id":0}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"method":"programSubscribe","params":["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin",{"encoding":"base64","filters":[{"memcmp":{"offset":45,"bytes":"CuieVDEDtLo7FypA9SbLM9saXFdb1dsshEkyErMqkRQq"}}]}],"jsonrpc":"2.0","id":0}
87 changes: 87 additions & 0 deletions test/Solnet.Rpc.Test/SolanaStreamingClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Solnet.Rpc.Messages;
using Solnet.Rpc.Models;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.WebSockets;
using System.Text;
Expand Down Expand Up @@ -442,6 +443,92 @@ public void SubscribeProgramTest()
Assert.AreEqual(458553192193UL, resultNotification.Value.Account.Lamports);
}

[TestMethod]
public void SubscribeProgramFilters()
{
var expected = File.ReadAllText("Resources/Streaming/Program/ProgramSubscribeFilters.json");
var result = new ReadOnlyMemory<byte>();

SetupAction(out Action<SubscriptionState, ResponseValue<AccountKeyPair>> action,
(x) => {},
(x) => result = x,
Array.Empty<byte>(),
Array.Empty<byte>());

var sut = new SolanaStreamingRpcClient("wss://api.mainnet-beta.solana.com/", null, _socketMock.Object);

sut.ConnectAsync().Wait();
_ = sut.SubscribeProgram("9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin", (s, e) => {},
dataSize:3228, memCmpList: new List<MemCmp>() { new MemCmp() {Offset = 45, Bytes = "CuieVDEDtLo7FypA9SbLM9saXFdb1dsshEkyErMqkRQq"}});
_subConfirmEvent.Set();


_socketMock.Verify(s => s.SendAsync(It.IsAny<ReadOnlyMemory<byte>>(),
WebSocketMessageType.Text,
true,
It.IsAny<CancellationToken>()));
var res = Encoding.UTF8.GetString(result.Span);
Assert.AreEqual(expected, res);
}


[TestMethod]
public void SubscribeProgramMemcmpFilters()
{
var expected = File.ReadAllText("Resources/Streaming/Program/ProgramSubscribeMemcmpFilter.json");
var result = new ReadOnlyMemory<byte>();

SetupAction(out Action<SubscriptionState, ResponseValue<AccountKeyPair>> action,
(x) => {},
(x) => result = x,
Array.Empty<byte>(),
Array.Empty<byte>());

var sut = new SolanaStreamingRpcClient("wss://api.mainnet-beta.solana.com/", null, _socketMock.Object);

sut.ConnectAsync().Wait();
_ = sut.SubscribeProgram("9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin", (s, e) => {},
memCmpList: new List<MemCmp>() { new MemCmp() {Offset = 45, Bytes = "CuieVDEDtLo7FypA9SbLM9saXFdb1dsshEkyErMqkRQq"}});
_subConfirmEvent.Set();


_socketMock.Verify(s => s.SendAsync(It.IsAny<ReadOnlyMemory<byte>>(),
WebSocketMessageType.Text,
true,
It.IsAny<CancellationToken>()));
var res = Encoding.UTF8.GetString(result.Span);
Assert.AreEqual(expected, res);
}


[TestMethod]
public void SubscribeProgramDataFilter()
{
var expected = File.ReadAllText("Resources/Streaming/Program/ProgramSubscribeDataSizeFilter.json");
var result = new ReadOnlyMemory<byte>();

SetupAction(out Action<SubscriptionState, ResponseValue<AccountKeyPair>> action,
(x) => {},
(x) => result = x,
Array.Empty<byte>(),
Array.Empty<byte>());

var sut = new SolanaStreamingRpcClient("wss://api.mainnet-beta.solana.com/", null, _socketMock.Object);

sut.ConnectAsync().Wait();
_ = sut.SubscribeProgram("9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin", (s, e) => {},
dataSize:3228);
_subConfirmEvent.Set();


_socketMock.Verify(s => s.SendAsync(It.IsAny<ReadOnlyMemory<byte>>(),
WebSocketMessageType.Text,
true,
It.IsAny<CancellationToken>()));
var res = Encoding.UTF8.GetString(result.Span);
Assert.AreEqual(expected, res);
}

[TestMethod]
public void SubscribeProgramConfirmed()
{
Expand Down
Loading

0 comments on commit 9dd423a

Please sign in to comment.