Skip to content

Commit

Permalink
Consent Streaming (#435)
Browse files Browse the repository at this point in the history
* bump the pod

* update to latest libxmtp

* clean up the consent code and add streaming

* add a test for it

* get the tests passing

* fix up the flakiness in the test
  • Loading branch information
nplasterer authored Nov 25, 2024
1 parent 20dfbe7 commit ff69eaf
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 58 deletions.
4 changes: 2 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ let package = Package(
.package(url: "https://github.com/tesseract-one/CSecp256k1.swift.git", from: "0.2.0"),
.package(url: "https://github.com/bufbuild/connect-swift", exact: "1.0.0"),
.package(url: "https://github.com/apple/swift-docc-plugin.git", from: "1.4.3"),
.package(url: "https://github.com/xmtp/libxmtp-swift.git", exact: "3.0.7"),
.package(url: "https://github.com/krzyzanowskim/CryptoSwift.git", exact: "1.8.3")
.package(url: "https://github.com/krzyzanowskim/CryptoSwift.git", exact: "1.8.3"),
.package(url: "https://github.com/xmtp/libxmtp-swift.git", exact: "3.0.7")
],
targets: [
.target(
Expand Down
4 changes: 0 additions & 4 deletions Sources/XMTPiOS/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -474,10 +474,6 @@ public final class Client {
try await ffiClient.sendSyncRequest(kind: .messages)
}

public func syncConsent() async throws {
try await ffiClient.sendSyncRequest(kind: .consent)
}

public func inboxState(refreshFromNetwork: Bool) async throws -> InboxState
{
return InboxState(
Expand Down
16 changes: 16 additions & 0 deletions Sources/XMTPiOS/Extensions/Ffi.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ extension FfiConsentState {
}
}

extension FfiConsentEntityType {
var fromFFI: EntryType {
switch self {
case .inboxId: return EntryType.inbox_id
case .address: return EntryType.address
case .conversationId: return EntryType.conversation_id
}
}
}

extension EntryType {
var toFFI: FfiConsentEntityType {
switch self {
Expand All @@ -62,3 +72,9 @@ extension ConsentListEntry {
)
}
}

extension FfiConsent {
var fromFfi: ConsentListEntry {
ConsentListEntry(value: self.entity, entryType: self.entityType.fromFFI, consentType: self.state.fromFFI)
}
}
93 changes: 62 additions & 31 deletions Sources/XMTPiOS/PrivatePreferences.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ public enum EntryType: String, Codable {
}

public struct ConsentListEntry: Codable, Hashable {
public init(value: String, entryType: EntryType, consentType: ConsentState) {
public init(value: String, entryType: EntryType, consentType: ConsentState)
{
self.value = value
self.entryType = entryType
self.consentType = consentType
}

static func address(_ address: String, type: ConsentState = .unknown)
-> ConsentListEntry
{
Expand All @@ -25,7 +26,8 @@ public struct ConsentListEntry: Codable, Hashable {
conversationId: String, type: ConsentState = ConsentState.unknown
) -> ConsentListEntry {
ConsentListEntry(
value: conversationId, entryType: .conversation_id, consentType: type)
value: conversationId, entryType: .conversation_id,
consentType: type)
}

static func inboxId(_ inboxId: String, type: ConsentState = .unknown)
Expand All @@ -44,25 +46,8 @@ public struct ConsentListEntry: Codable, Hashable {
}
}

public enum ContactError: Error {
case invalidIdentifier
}

public actor EntriesManager {
public var map: [String: ConsentListEntry] = [:]

func set(_ key: String, _ object: ConsentListEntry) {
map[key] = object
}

func get(_ key: String) -> ConsentListEntry? {
map[key]
}
}

public class ConsentList {
public let entriesManager = EntriesManager()
var lastFetched: Date?
/// Provides access to contact bundles.
public actor PrivatePreferences {
var client: Client
var ffiClient: FfiXmtpClient

Expand All @@ -82,7 +67,9 @@ public class ConsentList {
).fromFFI
}

public func conversationState(conversationId: String) async throws -> ConsentState {
public func conversationState(conversationId: String) async throws
-> ConsentState
{
return try await ffiClient.getConsentState(
entityType: .conversationId,
entity: conversationId
Expand All @@ -95,17 +82,61 @@ public class ConsentList {
entity: inboxId
).fromFFI
}

public func syncConsent() async throws {
try await ffiClient.sendSyncRequest(kind: .consent)
}

public func streamConsent()
-> AsyncThrowingStream<ConsentListEntry, Error>
{
AsyncThrowingStream { continuation in
let ffiStreamActor = FfiStreamActor()

let consentCallback = ConsentCallback(client: self.client) {
records in
guard !Task.isCancelled else {
continuation.finish()
Task {
await ffiStreamActor.endStream()
}
return
}
for consent in records {
continuation.yield(consent.fromFfi)
}
}

let task = Task {
let stream = await ffiClient.conversations().streamConsent(
callback: consentCallback)
await ffiStreamActor.setFfiStream(stream)
}

continuation.onTermination = { _ in
task.cancel()
Task {
await ffiStreamActor.endStream()
}
}
}
}
}

/// Provides access to contact bundles.
public actor PrivatePreferences {
var client: Client
var ffiClient: FfiXmtpClient
public var consentList: ConsentList
final class ConsentCallback: FfiConsentCallback {
let client: Client
let callback: ([FfiConsent]) -> Void

init(client: Client, ffiClient: FfiXmtpClient) {
init(client: Client, _ callback: @escaping ([FfiConsent]) -> Void) {
self.client = client
self.ffiClient = ffiClient
consentList = ConsentList(client: client, ffiClient: ffiClient)
self.callback = callback
}

func onConsentUpdate(consent: [FfiConsent]) {
callback(consent)
}

func onError(error: FfiSubscribeError) {
print("Error ConsentCallback \(error)")
}
}
64 changes: 61 additions & 3 deletions Tests/XMTPTests/ConversationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class ConversationTests: XCTestCase {
try await boDm?.sync()
try await alixClient.conversations.sync()
try await alixClient2.conversations.sync()
try await alixClient2.syncConsent()
try await alixClient2.preferences.syncConsent()
try await alixClient.conversations.syncAllConversations()
sleep(2)
try await alixClient2.conversations.syncAllConversations()
Expand All @@ -209,7 +209,7 @@ class ConversationTests: XCTestCase {
if let dm2 = try alixClient2.findConversation(conversationId: dm.id) {
XCTAssertEqual(try dm2.consentState(), .denied)

try await alixClient2.preferences.consentList.setConsentState(
try await alixClient2.preferences.setConsentState(
entries: [
ConsentListEntry(
value: dm2.id,
Expand All @@ -218,12 +218,70 @@ class ConversationTests: XCTestCase {
)
]
)
let convoState = try await alixClient2.preferences.consentList
let convoState = try await alixClient2.preferences
.conversationState(
conversationId: dm2.id)
XCTAssertEqual(convoState, .allowed)
XCTAssertEqual(try dm2.consentState(), .allowed)
}
}

func testStreamConsent() async throws {
let fixtures = try await fixtures()

let key = try Crypto.secureRandomBytes(count: 32)
let alix = try PrivateKey.generate()

let alixClient = try await Client.create(
account: alix,
options: .init(
api: .init(env: .local, isSecure: false),
dbEncryptionKey: key,
dbDirectory: "xmtp_db"
)
)

let alixGroup = try await alixClient.conversations.newGroup(with: [fixtures.bo.walletAddress])

let alixClient2 = try await Client.create(
account: alix,
options: .init(
api: .init(env: .local, isSecure: false),
dbEncryptionKey: key,
dbDirectory: "xmtp_db2"
)
)

try await alixGroup.send(content: "Hello")
try await alixClient.conversations.sync()
try await alixClient2.conversations.sync()
try await alixClient.conversations.syncAllConversations()
try await alixClient2.conversations.syncAllConversations()
let alixGroup2 = try alixClient2.findGroup(groupId: alixGroup.id)!

var consentList = [ConsentListEntry]()
let expectation = XCTestExpectation(description: "Stream Consent")
expectation.expectedFulfillmentCount = 3

Task(priority: .userInitiated) {
for try await entry in await alixClient.preferences.streamConsent() {
consentList.append(entry)
expectation.fulfill()
}
}
sleep(1)
try await alixGroup2.updateConsentState(state: .denied)
let dm = try await alixClient2.conversations.newConversation(with: fixtures.caro.walletAddress)
try await dm.updateConsentState(state: .denied)

sleep(5)
try await alixClient.conversations.syncAllConversations()
try await alixClient2.conversations.syncAllConversations()

await fulfillment(of: [expectation], timeout: 3)
print(consentList)
XCTAssertEqual(try alixGroup.consentState(), .denied)
}


}
10 changes: 5 additions & 5 deletions Tests/XMTPTests/DmTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class DmTests: XCTestCase {
_ = try await dm.send(content: "gm")
try await dm.sync()

let dmState = try await fixtures.boClient.preferences.consentList
let dmState = try await fixtures.boClient.preferences
.conversationState(conversationId: dm.id)
XCTAssertEqual(dmState, .allowed)
XCTAssertEqual(try dm.consentState(), .allowed)
Expand Down Expand Up @@ -248,24 +248,24 @@ class DmTests: XCTestCase {
let dm = try await fixtures.boClient.conversations.findOrCreateDm(
with: fixtures.alix.walletAddress)

let isDm = try await fixtures.boClient.preferences.consentList
let isDm = try await fixtures.boClient.preferences
.conversationState(conversationId: dm.id)
XCTAssertEqual(isDm, .allowed)
XCTAssertEqual(try dm.consentState(), .allowed)

try await fixtures.boClient.preferences.consentList.setConsentState(
try await fixtures.boClient.preferences.setConsentState(
entries: [
ConsentListEntry(
value: dm.id, entryType: .conversation_id,
consentType: .denied)
])
let isDenied = try await fixtures.boClient.preferences.consentList
let isDenied = try await fixtures.boClient.preferences
.conversationState(conversationId: dm.id)
XCTAssertEqual(isDenied, .denied)
XCTAssertEqual(try dm.consentState(), .denied)

try await dm.updateConsentState(state: .allowed)
let isAllowed = try await fixtures.boClient.preferences.consentList
let isAllowed = try await fixtures.boClient.preferences
.conversationState(conversationId: dm.id)
XCTAssertEqual(isAllowed, .allowed)
XCTAssertEqual(try dm.consentState(), .allowed)
Expand Down
Loading

0 comments on commit ff69eaf

Please sign in to comment.