Skip to content

Commit

Permalink
local streams
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Aug 29, 2024
1 parent 26cad1a commit d1536f9
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 13 deletions.
10 changes: 4 additions & 6 deletions xmtp_api_http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,25 +234,23 @@ impl XmtpMlsClient for XmtpHttpApiClient {
request: SubscribeGroupMessagesRequest,
) -> Result<GroupMessageStream, Error> {
log::debug!("subscribe_group_messages");
create_grpc_stream::<_, GroupMessage>(
Ok(create_grpc_stream::<_, GroupMessage>(
request,
self.endpoint(ApiEndpoints::SUBSCRIBE_GROUP_MESSAGES),
self.http_client.clone(),
)
.await
))
}

async fn subscribe_welcome_messages(
&self,
request: SubscribeWelcomeMessagesRequest,
) -> Result<WelcomeMessageStream, Error> {
log::debug!("subscribe_welcome_messages");
create_grpc_stream::<_, WelcomeMessage>(
Ok(create_grpc_stream::<_, WelcomeMessage>(
request,
self.endpoint(ApiEndpoints::SUBSCRIBE_WELCOME_MESSAGES),
self.http_client.clone(),
)
.await
))
}
}

Expand Down
39 changes: 32 additions & 7 deletions xmtp_api_http/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::{stream::LocalBoxStream, StreamExt};
use futures::{stream, Stream, StreamExt};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Deserializer;
use std::io::Read;
Expand Down Expand Up @@ -40,16 +40,43 @@ where
}
}

pub async fn create_grpc_stream<
#[cfg(target_arch = "wasm32")]
pub fn create_grpc_stream<
T: Serialize + Send + 'static,
R: DeserializeOwned + Send + std::fmt::Debug + 'static,
>(
request: T,
endpoint: String,
http_client: reqwest::Client,
) -> Result<LocalBoxStream<'static, Result<R, Error>>, Error> {
) -> stream::LocalBoxStream<'static, Result<R, Error>> {
log::info!("About to spawn stream");
let stream = async_stream::stream! {
let stream = create_grpc_stream_inner(request, endpoint, http_client);
stream.boxed_local()
}

#[cfg(not(target_arch = "wasm32"))]
pub fn create_grpc_stream<
T: Serialize + Send + 'static,
R: DeserializeOwned + Send + std::fmt::Debug + 'static,
>(
request: T,
endpoint: String,
http_client: reqwest::Client,
) -> stream::BoxStream<'static, Result<R, Error>> {
log::info!("About to spawn stream");
let stream = create_grpc_stream_inner(request, endpoint, http_client);
stream.boxed()
}

fn create_grpc_stream_inner<
T: Serialize + Send + 'static,
R: DeserializeOwned + Send + std::fmt::Debug + 'static,
>(
request: T,
endpoint: String,
http_client: reqwest::Client,
) -> impl Stream<Item = Result<R, Error>> {
async_stream::stream! {
log::info!("Spawning grpc http stream");
let request = http_client
.post(endpoint)
Expand Down Expand Up @@ -87,9 +114,7 @@ pub async fn create_grpc_stream<
yield res;
}
}
};

Ok(stream.boxed_local())
}
}

#[cfg(test)]
Expand Down

0 comments on commit d1536f9

Please sign in to comment.