Skip to content

Commit

Permalink
feat: improve PreparedMessage handling
Browse files Browse the repository at this point in the history
  • Loading branch information
dmccartney committed Sep 6, 2023
1 parent 10e9739 commit 16477f9
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 73 deletions.
16 changes: 8 additions & 8 deletions Sources/XMTP/ApiClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import Foundation
import XMTPRust
import XMTPRustSwift

typealias PublishRequest = Xmtp_MessageApi_V1_PublishRequest
typealias PublishResponse = Xmtp_MessageApi_V1_PublishResponse
typealias BatchQueryRequest = Xmtp_MessageApi_V1_BatchQueryRequest
typealias BatchQueryResponse = Xmtp_MessageApi_V1_BatchQueryResponse
typealias Cursor = Xmtp_MessageApi_V1_Cursor
typealias QueryRequest = Xmtp_MessageApi_V1_QueryRequest
typealias QueryResponse = Xmtp_MessageApi_V1_QueryResponse
typealias SubscribeRequest = Xmtp_MessageApi_V1_SubscribeRequest
public typealias PublishRequest = Xmtp_MessageApi_V1_PublishRequest
public typealias PublishResponse = Xmtp_MessageApi_V1_PublishResponse
public typealias BatchQueryRequest = Xmtp_MessageApi_V1_BatchQueryRequest
public typealias BatchQueryResponse = Xmtp_MessageApi_V1_BatchQueryResponse
public typealias Cursor = Xmtp_MessageApi_V1_Cursor
public typealias QueryRequest = Xmtp_MessageApi_V1_QueryRequest
public typealias QueryResponse = Xmtp_MessageApi_V1_QueryResponse
public typealias SubscribeRequest = Xmtp_MessageApi_V1_SubscribeRequest

public enum ApiClientError: Error {
case batchQueryError(String)
Expand Down
10 changes: 5 additions & 5 deletions Sources/XMTP/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -285,18 +285,18 @@ public class Client {
_ = try await publish(envelopes: envelopes)
}

func query(topic: Topic, pagination: Pagination? = nil) async throws -> QueryResponse {
public func query(topic: Topic, pagination: Pagination? = nil) async throws -> QueryResponse {
return try await apiClient.query(
topic: topic,
pagination: pagination
)
}

func batchQuery(request: BatchQueryRequest) async throws -> BatchQueryResponse {
public func batchQuery(request: BatchQueryRequest) async throws -> BatchQueryResponse {
return try await apiClient.batchQuery(request: request)
}

@discardableResult func publish(envelopes: [Envelope]) async throws -> PublishResponse {
@discardableResult public func publish(envelopes: [Envelope]) async throws -> PublishResponse {
let authorized = AuthorizedIdentity(address: address, authorized: privateKeyBundleV1.identityKey.publicKey, identity: privateKeyBundleV1.identityKey)
let authToken = try await authorized.createAuthToken()

Expand All @@ -305,11 +305,11 @@ public class Client {
return try await apiClient.publish(envelopes: envelopes)
}

func subscribe(topics: [String]) -> AsyncThrowingStream<Envelope, Error> {
public func subscribe(topics: [String]) -> AsyncThrowingStream<Envelope, Error> {
return apiClient.subscribe(topics: topics)
}

func subscribe(topics: [Topic]) -> AsyncThrowingStream<Envelope, Error> {
public func subscribe(topics: [Topic]) -> AsyncThrowingStream<Envelope, Error> {
return subscribe(topics: topics.map(\.description))
}

Expand Down
11 changes: 11 additions & 0 deletions Sources/XMTP/Conversation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,17 @@ public enum Conversation: Sendable {
}
}

// This is a convenience for invoking the underlying `client.publish(prepared.envelopes)`
// If a caller has a `Client` handy, they may opt to do that directly instead.
@discardableResult public func send(prepared: PreparedMessage) async throws -> String {
switch self {
case let .v1(conversationV1):
return try await conversationV1.send(prepared: prepared)
case let .v2(conversationV2):
return try await conversationV2.send(prepared: prepared)
}
}

@discardableResult public func send<T>(content: T, options: SendOptions? = nil, fallback _: String? = nil) async throws -> String {
switch self {
case let .v1(conversationV1):
Expand Down
60 changes: 29 additions & 31 deletions Sources/XMTP/ConversationV1.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,35 +64,31 @@ public struct ConversationV1 {
} else {
isEphemeral = false
}

let msg = try Message(v1: message).serializedData()
let messageEnvelope = Envelope(
topic: isEphemeral ? ephemeralTopic : topic.description,
timestamp: date,
message: try Message(v1: message).serializedData()
message: msg
)

return PreparedMessage(messageEnvelope: messageEnvelope, conversation: .v1(self)) {
var envelopes = [messageEnvelope]

if client.contacts.needsIntroduction(peerAddress) && !isEphemeral {
envelopes.append(contentsOf: [
Envelope(
topic: .userIntro(peerAddress),
timestamp: date,
message: try Message(v1: message).serializedData()
),
Envelope(
topic: .userIntro(client.address),
timestamp: date,
message: try Message(v1: message).serializedData()
),
])

client.contacts.hasIntroduced[peerAddress] = true
}

try await client.publish(envelopes: envelopes)
}
var envelopes = [messageEnvelope]
if client.contacts.needsIntroduction(peerAddress) && !isEphemeral {
envelopes.append(contentsOf: [
Envelope(
topic: .userIntro(peerAddress),
timestamp: date,
message: msg
),
Envelope(
topic: .userIntro(client.address),
timestamp: date,
message: msg
),
])

client.contacts.hasIntroduced[peerAddress] = true
}

return PreparedMessage(envelopes: envelopes)
}

func prepareMessage<T>(content: T, options: SendOptions?) async throws -> PreparedMessage {
Expand Down Expand Up @@ -123,20 +119,22 @@ public struct ConversationV1 {

@discardableResult internal func send(content: String, options: SendOptions? = nil, sentAt _: Date? = nil) async throws -> String {
let preparedMessage = try await prepareMessage(content: content, options: options)
try await preparedMessage.send()
return preparedMessage.messageID
return try await send(prepared: preparedMessage)
}

@discardableResult func send(encodedContent: EncodedContent, options: SendOptions?) async throws -> String {
let preparedMessage = try await prepareMessage(encodedContent: encodedContent, options: options)
try await preparedMessage.send()
return preparedMessage.messageID
return try await send(prepared: preparedMessage)
}

@discardableResult func send(prepared: PreparedMessage) async throws -> String {
try await client.publish(envelopes: prepared.envelopes)
return prepared.messageID
}

func send<T>(content: T, options: SendOptions? = nil) async throws -> String {
let preparedMessage = try await prepareMessage(content: content, options: options)
try await preparedMessage.send()
return preparedMessage.messageID
return try await send(prepared: preparedMessage)
}

public func streamMessages() -> AsyncThrowingStream<DecodedMessage, Error> {
Expand Down
18 changes: 9 additions & 9 deletions Sources/XMTP/ConversationV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ public struct ConversationV2 {
let topic = options?.ephemeral == true ? ephemeralTopic : topic

let envelope = Envelope(topic: topic, timestamp: Date(), message: try Message(v2: message).serializedData())
return PreparedMessage(messageEnvelope: envelope, conversation: .v2(self)) {
try await client.publish(envelopes: [envelope])
}
return PreparedMessage(envelopes: [envelope])
}

func prepareMessage<T>(content: T, options: SendOptions?) async throws -> PreparedMessage {
Expand Down Expand Up @@ -172,22 +170,24 @@ public struct ConversationV2 {

@discardableResult func send<T>(content: T, options: SendOptions? = nil) async throws -> String {
let preparedMessage = try await prepareMessage(content: content, options: options)
try await preparedMessage.send()
return preparedMessage.messageID
return try await send(prepared: preparedMessage)
}

@discardableResult func send(content: String, options: SendOptions? = nil, sentAt _: Date) async throws -> String {
let preparedMessage = try await prepareMessage(content: content, options: options)
try await preparedMessage.send()
return preparedMessage.messageID
return try await send(prepared: preparedMessage)
}

@discardableResult func send(encodedContent: EncodedContent, options: SendOptions? = nil) async throws -> String {
let preparedMessage = try await prepareMessage(encodedContent: encodedContent, options: options)
try await preparedMessage.send()
return preparedMessage.messageID
return try await send(prepared: preparedMessage)
}

@discardableResult func send(prepared: PreparedMessage) async throws -> String {
try await client.publish(envelopes: prepared.envelopes)
return prepared.messageID
}

public func encode<Codec: ContentCodec, T>(codec: Codec, content: T) async throws -> Data where Codec.T == T {
let content = try codec.encode(content: content)

Expand Down
47 changes: 28 additions & 19 deletions Sources/XMTP/PreparedMessage.swift
Original file line number Diff line number Diff line change
@@ -1,27 +1,36 @@
//
// PreparedMessage.swift
//
//
// Created by Pat Nakajima on 3/9/23.
//

import CryptoKit
import Foundation

// This houses a fully prepared message that can be published
// as soon as the API client has connectivity.
//
// To support persistance layers that queue pending messages (e.g. while offline)
// this struct supports serializing to/from bytes that can be written to disk or elsewhere.
// See serializedData() and fromSerializedData()
public struct PreparedMessage {
var messageEnvelope: Envelope
var conversation: Conversation
var onSend: () async throws -> Void

// The first envelope should send the message to the conversation itself.
// Any more are for required intros/invites etc.
// A client can just publish these when it has connectivity.
public let envelopes: [Envelope]

// Note: we serialize as a PublishRequest as a convenient `envelopes` wrapper.
public static func fromSerializedData(_ serializedData: Data) throws -> PreparedMessage {
let req = try Xmtp_MessageApi_V1_PublishRequest(serializedData: serializedData)
return PreparedMessage(envelopes: req.envelopes)
}

public func decodedMessage() throws -> DecodedMessage {
return try conversation.decode(messageEnvelope)
}
// Note: we serialize as a PublishRequest as a convenient `envelopes` wrapper.
public func serializedData() throws -> Data {
let req = Xmtp_MessageApi_V1_PublishRequest.with { $0.envelopes = envelopes }
return try req.serializedData()
}

public func send() async throws {
try await onSend()
}
public var messageID: String {
Data(SHA256.hash(data: envelopes[0].message)).toHex
}

var messageID: String {
Data(SHA256.hash(data: messageEnvelope.message)).toHex
}
public var conversationTopic: String {
envelopes[0].contentTopic
}
}
18 changes: 17 additions & 1 deletion Tests/XMTPTests/ConversationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ConversationTests: XCTestCase {
let preparedMessage = try await conversation.prepareMessage(content: "hi")
let messageID = preparedMessage.messageID

try await preparedMessage.send()
try await conversation.send(prepared: preparedMessage)

let messages = try await conversation.messages()
let message = messages[0]
Expand All @@ -48,6 +48,22 @@ class ConversationTests: XCTestCase {
XCTAssertEqual(message.id, messageID)
}

func testCanSendPreparedMessagesWithoutAConversation() async throws {
let conversation = try await aliceClient.conversations.newConversation(with: bob.address)
let preparedMessage = try await conversation.prepareMessage(content: "hi")
let messageID = preparedMessage.messageID

// This does not need the `conversation` to `.publish` the message.
// This simulates a background task publishes all pending messages upon connection.
try await aliceClient.publish(envelopes: preparedMessage.envelopes)

let messages = try await conversation.messages()
let message = messages[0]

XCTAssertEqual("hi", message.body)
XCTAssertEqual(message.id, messageID)
}

func testV2RejectsSpoofedContactBundles() async throws {
let topic =
"/xmtp/0/m-Gdb7oj5nNdfZ3MJFLAcS4WTABgr6al1hePy6JV1-QUE/proto"
Expand Down

0 comments on commit 16477f9

Please sign in to comment.