Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[V3] Remove Decrypted Messages #418

Merged
merged 4 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Sources/XMTPiOS/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -793,12 +793,12 @@ public final class Client {
}
}

public func findMessage(messageId: String) throws -> MessageV3? {
public func findMessage(messageId: String) throws -> Message? {
guard let client = v3Client else {
throw ClientError.noV3Client("Error no V3 client initialized")
}
do {
return MessageV3(
return Message(
client: self,
ffiMessage: try client.message(messageId: messageId.hexToData))
} catch {
Expand Down
29 changes: 1 addition & 28 deletions Sources/XMTPiOS/Conversation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public enum Conversation {
}
}

public func processMessage(messageBytes: Data) async throws -> MessageV3 {
public func processMessage(messageBytes: Data) async throws -> Message {
switch self {
case let .group(group):
return try await group.processMessage(messageBytes: messageBytes)
Expand Down Expand Up @@ -168,17 +168,6 @@ public enum Conversation {
}
}

public func streamDecryptedMessages() -> AsyncThrowingStream<
DecryptedMessage, Error
> {
switch self {
case let .group(group):
return group.streamDecryptedMessages()
case let .dm(dm):
return dm.streamDecryptedMessages()
}
}

public func messages(
limit: Int? = nil, before: Date? = nil, after: Date? = nil,
direction: PagingInfoSortDirection? = .descending
Expand All @@ -195,22 +184,6 @@ public enum Conversation {
}
}

public func decryptedMessages(
limit: Int? = nil, before: Date? = nil, after: Date? = nil,
direction: PagingInfoSortDirection? = .descending
) async throws -> [DecryptedMessage] {
switch self {
case let .group(group):
return try await group.decryptedMessages(
before: before, after: after, limit: limit, direction: direction
)
case let .dm(dm):
return try await dm.decryptedMessages(
before: before, after: after, limit: limit, direction: direction
)
}
}

var client: Client {
switch self {
case let .group(group):
Expand Down
41 changes: 1 addition & 40 deletions Sources/XMTPiOS/Conversations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ public actor Conversations {
}
do {
continuation.yield(
try MessageV3(
try Message(
client: self.client, ffiMessage: message
).decode())
} catch {
Expand All @@ -423,45 +423,6 @@ public actor Conversations {
}
}

public func streamAllDecryptedConversationMessages() -> AsyncThrowingStream<
DecryptedMessage, Error
> {
AsyncThrowingStream { continuation in
let ffiStreamActor = FfiStreamActor()
let task = Task {
let stream = await self.client.v3Client?.conversations()
.streamAllMessages(
messageCallback: MessageCallback(client: self.client) {
message in
guard !Task.isCancelled else {
continuation.finish()
Task {
await ffiStreamActor.endStream() // End the stream upon cancellation
}
return
}
do {
continuation.yield(
try MessageV3(
client: self.client, ffiMessage: message
).decrypt())
} catch {
print("Error onMessage \(error)")
}
}
)
await ffiStreamActor.setFfiStream(stream)
}

continuation.onTermination = { _ in
task.cancel()
Task {
await ffiStreamActor.endStream()
}
}
}
}

public func fromWelcome(envelopeBytes: Data) async throws
-> Conversation?
{
Expand Down
106 changes: 4 additions & 102 deletions Sources/XMTPiOS/Dm.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ public struct Dm: Identifiable, Equatable, Hashable {
return try ffiConversation.consentState().fromFFI
}

public func processMessage(messageBytes: Data) async throws -> MessageV3 {
public func processMessage(messageBytes: Data) async throws -> Message {
let message =
try await ffiConversation.processStreamedConversationMessage(
envelopeBytes: messageBytes)
return MessageV3(client: client, ffiMessage: message)
return Message(client: client, ffiMessage: message)
}

public func send<T>(content: T, options: SendOptions? = nil) async throws
Expand Down Expand Up @@ -181,7 +181,7 @@ public struct Dm: Identifiable, Equatable, Hashable {
}
do {
continuation.yield(
try MessageV3(
try Message(
client: self.client, ffiMessage: message
).decode())
} catch {
Expand All @@ -203,42 +203,6 @@ public struct Dm: Identifiable, Equatable, Hashable {
}
}

public func streamDecryptedMessages() -> AsyncThrowingStream<
DecryptedMessage, Error
> {
AsyncThrowingStream { continuation in
let task = Task.detached {
self.streamHolder.stream = await self.ffiConversation.stream(
messageCallback: MessageCallback(client: self.client) {
message in
guard !Task.isCancelled else {
continuation.finish()
return
}
do {
continuation.yield(
try MessageV3(
client: self.client, ffiMessage: message
).decrypt())
} catch {
print("Error onMessage \(error)")
continuation.finish(throwing: error)
}
}
)

continuation.onTermination = { @Sendable reason in
self.streamHolder.stream?.end()
}
}

continuation.onTermination = { @Sendable reason in
task.cancel()
self.streamHolder.stream?.end()
}
}
}

public func messages(
before: Date? = nil,
after: Date? = nil,
Expand Down Expand Up @@ -296,70 +260,8 @@ public struct Dm: Identifiable, Equatable, Hashable {

return try ffiConversation.findMessages(opts: options).compactMap {
ffiMessage in
return MessageV3(client: self.client, ffiMessage: ffiMessage)
return Message(client: self.client, ffiMessage: ffiMessage)
.decodeOrNull()
}
}

public func decryptedMessages(
before: Date? = nil,
after: Date? = nil,
limit: Int? = nil,
direction: PagingInfoSortDirection? = .descending,
deliveryStatus: MessageDeliveryStatus? = .all
) async throws -> [DecryptedMessage] {
var options = FfiListMessagesOptions(
sentBeforeNs: nil,
sentAfterNs: nil,
limit: nil,
deliveryStatus: nil,
direction: nil
)

if let before {
options.sentBeforeNs = Int64(
before.millisecondsSinceEpoch * 1_000_000)
}

if let after {
options.sentAfterNs = Int64(
after.millisecondsSinceEpoch * 1_000_000)
}

if let limit {
options.limit = Int64(limit)
}

let status: FfiDeliveryStatus? = {
switch deliveryStatus {
case .published:
return FfiDeliveryStatus.published
case .unpublished:
return FfiDeliveryStatus.unpublished
case .failed:
return FfiDeliveryStatus.failed
default:
return nil
}
}()

options.deliveryStatus = status

let direction: FfiDirection? = {
switch direction {
case .ascending:
return FfiDirection.ascending
default:
return FfiDirection.descending
}
}()

options.direction = direction

return try ffiConversation.findMessages(opts: options).compactMap {
ffiMessage in
return MessageV3(client: self.client, ffiMessage: ffiMessage)
.decryptOrNull()
}
}
}
106 changes: 4 additions & 102 deletions Sources/XMTPiOS/Group.swift
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,10 @@ public struct Group: Identifiable, Equatable, Hashable {
return try ffiGroup.consentState().fromFFI
}

public func processMessage(messageBytes: Data) async throws -> MessageV3 {
public func processMessage(messageBytes: Data) async throws -> Message {
let message = try await ffiGroup.processStreamedConversationMessage(
envelopeBytes: messageBytes)
return MessageV3(client: client, ffiMessage: message)
return Message(client: client, ffiMessage: message)
}

public func send<T>(content: T, options: SendOptions? = nil) async throws
Expand Down Expand Up @@ -379,7 +379,7 @@ public struct Group: Identifiable, Equatable, Hashable {
}
do {
continuation.yield(
try MessageV3(
try Message(
client: self.client, ffiMessage: message
).decode())
} catch {
Expand All @@ -401,42 +401,6 @@ public struct Group: Identifiable, Equatable, Hashable {
}
}

public func streamDecryptedMessages() -> AsyncThrowingStream<
DecryptedMessage, Error
> {
AsyncThrowingStream { continuation in
let task = Task.detached {
self.streamHolder.stream = await self.ffiGroup.stream(
messageCallback: MessageCallback(client: self.client) {
message in
guard !Task.isCancelled else {
continuation.finish()
return
}
do {
continuation.yield(
try MessageV3(
client: self.client, ffiMessage: message
).decrypt())
} catch {
print("Error onMessage \(error)")
continuation.finish(throwing: error)
}
}
)

continuation.onTermination = { @Sendable reason in
self.streamHolder.stream?.end()
}
}

continuation.onTermination = { @Sendable reason in
task.cancel()
self.streamHolder.stream?.end()
}
}
}

public func messages(
before: Date? = nil,
after: Date? = nil,
Expand Down Expand Up @@ -494,70 +458,8 @@ public struct Group: Identifiable, Equatable, Hashable {

return try ffiGroup.findMessages(opts: options).compactMap {
ffiMessage in
return MessageV3(client: self.client, ffiMessage: ffiMessage)
return Message(client: self.client, ffiMessage: ffiMessage)
.decodeOrNull()
}
}

public func decryptedMessages(
before: Date? = nil,
after: Date? = nil,
limit: Int? = nil,
direction: PagingInfoSortDirection? = .descending,
deliveryStatus: MessageDeliveryStatus? = .all
) async throws -> [DecryptedMessage] {
var options = FfiListMessagesOptions(
sentBeforeNs: nil,
sentAfterNs: nil,
limit: nil,
deliveryStatus: nil,
direction: nil
)

if let before {
options.sentBeforeNs = Int64(
before.millisecondsSinceEpoch * 1_000_000)
}

if let after {
options.sentAfterNs = Int64(
after.millisecondsSinceEpoch * 1_000_000)
}

if let limit {
options.limit = Int64(limit)
}

let status: FfiDeliveryStatus? = {
switch deliveryStatus {
case .published:
return FfiDeliveryStatus.published
case .unpublished:
return FfiDeliveryStatus.unpublished
case .failed:
return FfiDeliveryStatus.failed
default:
return nil
}
}()

options.deliveryStatus = status

let direction: FfiDirection? = {
switch direction {
case .ascending:
return FfiDirection.ascending
default:
return FfiDirection.descending
}
}()

options.direction = direction

return try ffiGroup.findMessages(opts: options).compactMap {
ffiMessage in
return MessageV3(client: self.client, ffiMessage: ffiMessage)
.decryptOrNull()
}
}
}
File renamed without changes.
Loading
Loading