diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index c52322d4..95851d44 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -65,7 +65,6 @@ public actor Conversations { guard let v3Client = client.v3Client else { return } - try await v3Client.conversations().sync() } @@ -73,21 +72,16 @@ public actor Conversations { guard let v3Client = client.v3Client else { return [] } - var options = FfiListConversationsOptions(createdAfterNs: nil, createdBeforeNs: nil, limit: nil) - if let createdAfter { options.createdAfterNs = Int64(createdAfter.millisecondsSinceEpoch) } - if let createdBefore { options.createdBeforeNs = Int64(createdBefore.millisecondsSinceEpoch) } - if let limit { options.limit = Int64(limit) } - return try await v3Client.conversations().list(opts: options).map { $0.fromFFI(client: client) } } @@ -97,19 +91,17 @@ public actor Conversations { let groupCallback = GroupStreamCallback(client: self.client) { group in continuation.yield(group) } - guard let stream = try await self.client.v3Client?.conversations().stream(callback: groupCallback) else { continuation.finish(throwing: GroupError.streamingFailure) return } - continuation.onTermination = { @Sendable reason in stream.end() } } } } - + private func streamGroupConversations() -> AsyncThrowingStream { AsyncThrowingStream { continuation in Task { @@ -122,7 +114,7 @@ public actor Conversations { } } - public func newGroup(with addresses: [String], + public func newGroup(with addresses: [String], permissions: GroupPermissions = .allMembers, name: String = "", imageUrlSquare: String = "" @@ -130,11 +122,9 @@ public actor Conversations { guard let v3Client = client.v3Client else { throw GroupError.alphaMLSNotEnabled } - if addresses.first(where: { $0.lowercased() == client.address.lowercased() }) != nil { throw GroupError.memberCannotBeSelf } - let erroredAddresses = try await withThrowingTaskGroup(of: (String?).self) { group in for address in addresses { group.addTask { @@ -145,29 +135,23 @@ public actor Conversations { } } } - var results: [String] = [] for try await result in group { if let result { results.append(result) } } - return results } - if !erroredAddresses.isEmpty { throw GroupError.memberNotRegistered(erroredAddresses) } - let group = try await v3Client.conversations().createGroup(accountAddresses: addresses, opts: FfiCreateGroupOptions(permissions: permissions, groupName: name, groupImageUrlSquare: imageUrlSquare - )).fromFFI(client: client) - + )).fromFFI(client: client) try await client.contacts.allowGroups(groupIds: [group.id]) - return group } @@ -188,7 +172,9 @@ public actor Conversations { createdAtNs: data.createdNs )) } - conversationsByTopic[conversation.topic] = conversation + Task { + await self.addConversation(conversation) + } return conversation } @@ -262,11 +248,9 @@ public actor Conversations { Topic.userInvite(client.address).description, Topic.userIntro(client.address).description, ] - for conversation in try await list() { topics.append(conversation.topic) } - do { for try await envelope in client.subscribe(topics: topics) { if let conversation = conversationsByTopic[envelope.contentTopic] { @@ -274,11 +258,11 @@ public actor Conversations { continuation.yield(decoded) } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { let conversation = try fromInvite(envelope: envelope) - conversationsByTopic[conversation.topic] = conversation + await self.addConversation(conversation) break // Break so we can resubscribe with the new conversation } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { let conversation = try fromIntro(envelope: envelope) - conversationsByTopic[conversation.topic] = conversation + await self.addConversation(conversation) let decoded = try conversation.decode(envelope) continuation.yield(decoded) break // Break so we can resubscribe with the new conversation @@ -293,7 +277,7 @@ public actor Conversations { } } } - + public func streamAllGroupMessages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in Task { @@ -302,12 +286,10 @@ public actor Conversations { continuation.yield(decodedMessage) } } - guard let stream = try await client.v3Client?.conversations().streamAllMessages(messageCallback: messageCallback) else { continuation.finish(throwing: GroupError.streamingFailure) return } - continuation.onTermination = { @Sendable reason in stream.end() } @@ -315,7 +297,6 @@ public actor Conversations { } } - public func streamAllMessages(includeGroups: Bool = false) async throws -> AsyncThrowingStream { AsyncThrowingStream { continuation in @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { @@ -329,7 +310,6 @@ public actor Conversations { continuation.finish(throwing: error) } } - Task { await forwardStreamToMerged(stream: try streamAllV2Messages()) } @@ -340,7 +320,7 @@ public actor Conversations { } } } - + public func streamAllGroupDecryptedMessages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in Task { @@ -360,13 +340,13 @@ public actor Conversations { } } } - + public func streamAllDecryptedMessages(includeGroups: Bool = false) -> AsyncThrowingStream { AsyncThrowingStream { continuation in @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { do { var iterator = stream.makeAsyncIterator() - while let element = try await iterator.next() { + while let element = try await iterator.next() { continuation.yield(element) } continuation.finish() @@ -374,7 +354,6 @@ public actor Conversations { continuation.finish(throwing: error) } } - Task { await forwardStreamToMerged(stream: try streamAllV2DecryptedMessages()) } @@ -386,7 +365,6 @@ public actor Conversations { } } - func streamAllV2DecryptedMessages() async throws -> AsyncThrowingStream { return AsyncThrowingStream { continuation in Task { @@ -395,11 +373,9 @@ public actor Conversations { Topic.userInvite(client.address).description, Topic.userIntro(client.address).description, ] - for conversation in try await list() { topics.append(conversation.topic) } - do { for try await envelope in client.subscribe(topics: topics) { if let conversation = conversationsByTopic[envelope.contentTopic] { @@ -407,11 +383,11 @@ public actor Conversations { continuation.yield(decoded) } else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") { let conversation = try fromInvite(envelope: envelope) - conversationsByTopic[conversation.topic] = conversation + await self.addConversation(conversation) break // Break so we can resubscribe with the new conversation } else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") { let conversation = try fromIntro(envelope: envelope) - conversationsByTopic[conversation.topic] = conversation + await self.addConversation(conversation) let decoded = try conversation.decrypt(envelope) continuation.yield(decoded) break // Break so we can resubscribe with the new conversation @@ -430,7 +406,6 @@ public actor Conversations { public func fromInvite(envelope: Envelope) throws -> Conversation { let sealedInvitation = try SealedInvitation(serializedData: envelope.message) let unsealed = try sealedInvitation.v1.getInvitation(viewer: client.keys) - return try .v2(ConversationV2.create(client: client, invitation: unsealed, header: sealedInvitation.v1.header)) } @@ -438,10 +413,8 @@ public actor Conversations { let messageV1 = try MessageV1.fromBytes(envelope.message) let senderAddress = try messageV1.header.sender.walletAddress let recipientAddress = try messageV1.header.recipient.walletAddress - let peerAddress = client.address == senderAddress ? recipientAddress : senderAddress let conversationV1 = ConversationV1(client: client, peerAddress: peerAddress, sentAt: messageV1.sentAt) - return .v1(conversationV1) } @@ -450,17 +423,16 @@ public actor Conversations { (($0.value.conversationID ?? "") == (conversationID ?? "")) })?.value } - + public func fromWelcome(envelopeBytes: Data) async throws -> Group? { guard let v3Client = client.v3Client else { return nil } - let group = try await v3Client.conversations().processStreamedWelcomeMessage(envelopeBytes: envelopeBytes) return Group(ffiGroup: group, client: client) } - public func newConversation(with peerAddress: String, context: InvitationV1.Context? = nil, consentProofPayload: ConsentProofPayload? = nil) async throws -> Conversation { + public func newConversation(with peerAddress: String, context: InvitationV1.Context? = nil, consentProofPayload: ConsentProofPayload? = nil) async throws -> Conversation { if peerAddress.lowercased() == client.address.lowercased() { throw ConversationError.recipientIsSender } @@ -468,31 +440,28 @@ public actor Conversations { if let existing = try findExistingConversation(with: peerAddress, conversationID: context?.conversationID) { return existing } - guard let contact = try await client.contacts.find(peerAddress) else { throw ConversationError.recipientNotOnNetwork } - _ = try await list() // cache old conversations and check again if let existing = try findExistingConversation(with: peerAddress, conversationID: context?.conversationID) { return existing } - // We don't have an existing conversation, make a v2 one let recipient = try contact.toSignedPublicKeyBundle() let invitation = try InvitationV1.createDeterministic( sender: client.keys, recipient: recipient, context: context, - consentProofPayload: consentProofPayload + consentProofPayload: consentProofPayload ) let sealedInvitation = try await sendInvitation(recipient: recipient, invitation: invitation, created: Date()) let conversationV2 = try ConversationV2.create(client: client, invitation: invitation, header: sealedInvitation.v1.header) - try await client.contacts.allow(addresses: [peerAddress]) - let conversation: Conversation = .v2(conversationV2) - conversationsByTopic[conversation.topic] = conversation + Task { + await self.addConversation(conversation) + } return conversation } @@ -500,26 +469,20 @@ public actor Conversations { AsyncThrowingStream { continuation in Task { var streamedConversationTopics: Set = [] - for try await envelope in client.subscribe(topics: [.userIntro(client.address), .userInvite(client.address)]) { if envelope.contentTopic == Topic.userIntro(client.address).description { let conversationV1 = try fromIntro(envelope: envelope) - if streamedConversationTopics.contains(conversationV1.topic.description) { continue } - streamedConversationTopics.insert(conversationV1.topic.description) continuation.yield(conversationV1) } - if envelope.contentTopic == Topic.userInvite(client.address).description { let conversationV2 = try fromInvite(envelope: envelope) - if streamedConversationTopics.contains(conversationV2.topic) { continue } - streamedConversationTopics.insert(conversationV2.topic) continuation.yield(conversationV2) } @@ -528,154 +491,138 @@ public actor Conversations { } } - public func streamAll() -> AsyncThrowingStream { - AsyncThrowingStream { continuation in - @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { - do { - var iterator = stream.makeAsyncIterator() - while let element = try await iterator.next() { - continuation.yield(element) - } - continuation.finish() - } catch { - continuation.finish(throwing: error) - } - } - - Task { - await forwardStreamToMerged(stream: stream()) - } - Task { - await forwardStreamToMerged(stream: streamGroupConversations()) - } - } - } + public func streamAll() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { + do { + var iterator = stream.makeAsyncIterator() + while let element = try await iterator.next() { + continuation.yield(element) + } + continuation.finish() + } catch { + continuation.finish(throwing: error) + } + } + Task { + await forwardStreamToMerged(stream: stream()) + } + Task { + await forwardStreamToMerged(stream: streamGroupConversations()) + } + } + } private func makeConversation(from sealedInvitation: SealedInvitation) throws -> ConversationV2 { let unsealed = try sealedInvitation.v1.getInvitation(viewer: client.keys) - - let conversation = try ConversationV2.create(client: client, invitation: unsealed, header: sealedInvitation.v1.header) + return try ConversationV2.create(client: client, invitation: unsealed, header: sealedInvitation.v1.header) + } - return conversation + private func validateConsentSignature(signature: String, clientAddress: String, peerAddress: String, timestamp: UInt64) -> Bool { + // timestamp should be in the past + if timestamp > UInt64(Date().timeIntervalSince1970 * 1000) { + return false + } + let thirtyDaysAgo = Date().addingTimeInterval(-30 * 24 * 60 * 60) + let thirtyDaysAgoTimestamp = UInt64(thirtyDaysAgo.timeIntervalSince1970 * 1000) + if timestamp < thirtyDaysAgoTimestamp { + return false + } + let message = Signature.consentProofText(peerAddress: peerAddress, timestamp: timestamp) + guard let signatureData = Data(hex: signature) else { + print("Invalid signature format") + return false + } + do { + let ethMessage = try Signature.ethHash(message) + let recoveredKey = try KeyUtilx.recoverPublicKey(message: ethMessage, signature: signatureData) + let address = KeyUtilx.generateAddress(from: recoveredKey).toChecksumAddress() + return clientAddress == address + } catch { + return false + } + } + + private func handleConsentProof(consentProof: ConsentProofPayload, peerAddress: String) async throws { + let signature = consentProof.signature + if (signature == "") { + return + } + if (!validateConsentSignature(signature: signature, clientAddress: client.address, peerAddress: peerAddress, timestamp: consentProof.timestamp)) { + return + } + let contacts = client.contacts + _ = try await contacts.refreshConsentList() + if await (contacts.consentList.state(address: peerAddress) == .unknown) { + try await contacts.allow(addresses: [peerAddress]) + } } - - private func validateConsentSignature(signature: String, clientAddress: String, peerAddress: String, timestamp: UInt64) -> Bool { - // timestamp should be in the past - if timestamp > UInt64(Date().timeIntervalSince1970 * 1000) { - return false - } - let thirtyDaysAgo = Date().addingTimeInterval(-30 * 24 * 60 * 60) - let thirtyDaysAgoTimestamp = UInt64(thirtyDaysAgo.timeIntervalSince1970 * 1000) - if timestamp < thirtyDaysAgoTimestamp { - return false - } - - let message = Signature.consentProofText(peerAddress: peerAddress, timestamp: timestamp) - - guard let signatureData = Data(hex: signature) else { - print("Invalid signature format") - return false - } - - do { - let ethMessage = try Signature.ethHash(message) - let recoveredKey = try KeyUtilx.recoverPublicKey(message: ethMessage, signature: signatureData) - let address = KeyUtilx.generateAddress(from: recoveredKey).toChecksumAddress() - - return clientAddress == address - } catch { - return false - } - } - - private func handleConsentProof(consentProof: ConsentProofPayload, peerAddress: String) async throws { - let signature = consentProof.signature - if (signature == "") { - return - } - - if (!validateConsentSignature(signature: signature, clientAddress: client.address, peerAddress: peerAddress, timestamp: consentProof.timestamp)) { - return - } - let contacts = client.contacts - _ = try await contacts.refreshConsentList() - if await (contacts.consentList.state(address: peerAddress) == .unknown) { - try await contacts.allow(addresses: [peerAddress]) - } - } public func list(includeGroups: Bool = false) async throws -> [Conversation] { - if (includeGroups) { + if includeGroups { try await sync() let groups = try await groups() - - groups.forEach { group in - conversationsByTopic[group.id.toHex] = Conversation.group(group) + for group in groups { + await self.addConversation(.group(group)) } } var newConversations: [Conversation] = [] - let mostRecent = conversationsByTopic.values.max { a, b in - a.createdAt < b.createdAt - } + let mostRecent = await self.getMostRecentConversation() let pagination = Pagination(after: mostRecent?.createdAt) do { let seenPeers = try await listIntroductionPeers(pagination: pagination) for (peerAddress, sentAt) in seenPeers { - newConversations.append( - Conversation.v1( - ConversationV1( - client: client, - peerAddress: peerAddress, - sentAt: sentAt - ) - ) - ) + let newConversation = Conversation.v1(ConversationV1(client: client, peerAddress: peerAddress, sentAt: sentAt)) + newConversations.append(newConversation) } } catch { print("Error loading introduction peers: \(error)") } - for sealedInvitation in try await listInvitations(pagination: pagination) { do { - let newConversation = Conversation.v2(try makeConversation(from: sealedInvitation)) - newConversations.append( - newConversation - ) - if let consentProof = newConversation.consentProof { - if consentProof.signature != "" { - try await self.handleConsentProof(consentProof: consentProof, peerAddress: newConversation.peerAddress) - } - } + let newConversation = Conversation.v2(try makeConversation(from: sealedInvitation)) + newConversations.append(newConversation) + if let consentProof = newConversation.consentProof, consentProof.signature != "" { + try await self.handleConsentProof(consentProof: consentProof, peerAddress: newConversation.peerAddress) + } } catch { print("Error loading invitations: \(error)") } } + for conversation in newConversations { + if try conversation.peerAddress != client.address && Topic.isValidTopic(topic: conversation.topic) { + await self.addConversation(conversation) + } + } + return await self.getSortedConversations() + } - try newConversations - .filter { try $0.peerAddress != client.address && Topic.isValidTopic(topic: $0.topic) } - .forEach { conversationsByTopic[$0.topic] = $0 } + private func addConversation(_ conversation: Conversation) async { + conversationsByTopic[conversation.topic] = conversation + } - // TODO(perf): use DB to persist + sort + private func getMostRecentConversation() async -> Conversation? { + return conversationsByTopic.values.max { a, b in + a.createdAt < b.createdAt + } + } + + private func getSortedConversations() async -> [Conversation] { return conversationsByTopic.values.sorted { a, b in a.createdAt < b.createdAt } } - + public func getHmacKeys(request: Xmtp_KeystoreApi_V1_GetConversationHmacKeysRequest? = nil) -> Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse { let thirtyDayPeriodsSinceEpoch = Int(Date().timeIntervalSince1970) / (60 * 60 * 24 * 30) var hmacKeysResponse = Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse() - var topics = conversationsByTopic - if let requestTopics = request?.topics, !requestTopics.isEmpty { topics = topics.filter { requestTopics.contains($0.key) } } - for (topic, conversation) in topics { guard let keyMaterial = conversation.keyMaterial else { continue } - var hmacKeys = Xmtp_KeystoreApi_V1_GetConversationHmacKeysResponse.HmacKeys() - for period in (thirtyDayPeriodsSinceEpoch - 1)...(thirtyDayPeriodsSinceEpoch + 1) { let info = "\(period)-\(client.address)" do { @@ -690,7 +637,6 @@ public actor Conversations { } hmacKeysResponse.hmacKeys[topic] = hmacKeys } - return hmacKeysResponse } @@ -699,41 +645,32 @@ public actor Conversations { topic: .userIntro(client.address), pagination: pagination ).envelopes - let messages = envelopes.compactMap { envelope in do { let message = try MessageV1.fromBytes(envelope.message) - // Attempt to decrypt, just to make sure we can _ = try message.decrypt(with: client.privateKeyBundleV1) - return message } catch { return nil } } - var seenPeers: [String: Date] = [:] for message in messages { guard let recipientAddress = message.recipientAddress, - let senderAddress = message.senderAddress - else { + let senderAddress = message.senderAddress else { continue } - let sentAt = message.sentAt let peerAddress = recipientAddress == client.address ? senderAddress : recipientAddress - guard let existing = seenPeers[peerAddress] else { seenPeers[peerAddress] = sentAt continue } - if existing > sentAt { seenPeers[peerAddress] = sentAt } } - return seenPeers } @@ -742,7 +679,6 @@ public actor Conversations { topic: Topic.userInvite(client.address).description, pagination: pagination ) - return envelopes.compactMap { envelope in // swiftlint:disable no_optional_try try? SealedInvitation(serializedData: envelope.message) @@ -757,14 +693,11 @@ public actor Conversations { created: created, invitation: invitation ) - let peerAddress = try recipient.walletAddress - try await client.publish(envelopes: [ Envelope(topic: .userInvite(client.address), timestamp: created, message: sealed.serializedData()), Envelope(topic: .userInvite(peerAddress), timestamp: created, message: sealed.serializedData()), ]) - return sealed } }