From a04669b61f3c332a4176f9fa4ddca75c875497ad Mon Sep 17 00:00:00 2001 From: Stanislas Polu Date: Mon, 10 Jun 2024 14:06:09 +0200 Subject: [PATCH] SSE::Connected event to return status an headers --- contract-tests/src/bin/sse-test-api/main.rs | 17 ++++++++++++++--- eventsource-client/examples/tail.rs | 3 +++ eventsource-client/src/client.rs | 18 +++++++++++++++++- eventsource-client/src/event_parser.rs | 7 ++++++- eventsource-client/src/lib.rs | 3 ++- 5 files changed, 42 insertions(+), 6 deletions(-) diff --git a/contract-tests/src/bin/sse-test-api/main.rs b/contract-tests/src/bin/sse-test-api/main.rs index 0cea4bd..22f4e78 100644 --- a/contract-tests/src/bin/sse-test-api/main.rs +++ b/contract-tests/src/bin/sse-test-api/main.rs @@ -50,14 +50,25 @@ struct Config { #[derive(Serialize, Debug)] #[serde(tag = "kind", rename_all = "camelCase")] enum EventType { - Event { event: Event }, - Comment { comment: String }, - Error { error: String }, + Connected { + status: u16, + headers: HashMap, + }, + Event { + event: Event, + }, + Comment { + comment: String, + }, + Error { + error: String, + }, } impl From for EventType { fn from(event: es::SSE) -> Self { match event { + es::SSE::Connected((status, headers)) => Self::Connected { status, headers }, es::SSE::Event(evt) => Self::Event { event: Event { event_type: evt.event_type, diff --git a/eventsource-client/examples/tail.rs b/eventsource-client/examples/tail.rs index 5df399c..77c2c05 100644 --- a/eventsource-client/examples/tail.rs +++ b/eventsource-client/examples/tail.rs @@ -40,6 +40,9 @@ fn tail_events(client: impl es::Client) -> impl Stream> { client .stream() .map_ok(|event| match event { + es::SSE::Connected((status, _)) => { + println!("got connected: \nstatus={}", status) + } es::SSE::Event(ev) => { println!("got an event: {}\n{}", ev.event_type, ev.data) } diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index 8ee7077..79ffc1e 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -13,6 +13,7 @@ use log::{debug, info, trace, warn}; use pin_project::pin_project; use std::{ boxed, + collections::HashMap, fmt::{self, Debug, Display, Formatter}, future::Future, io::ErrorKind, @@ -393,6 +394,7 @@ where let this = self.as_mut().project(); if let Some(event) = this.event_parser.get_event() { return match event { + SSE::Connected(_) => Poll::Ready(Some(Ok(event))), SSE::Event(ref evt) => { *this.last_event_id = evt.id.clone(); @@ -438,11 +440,25 @@ where if resp.status().is_success() { self.as_mut().project().retry_strategy.reset(Instant::now()); self.as_mut().reset_redirects(); + + let headers = resp.headers(); + let mut map = HashMap::new(); + for (key, value) in headers.iter() { + let key = key.to_string(); + let value = match value.to_str() { + Ok(value) => value.to_string(), + Err(_) => String::from(""), + }; + map.insert(key, value); + } + let status = resp.status().as_u16(); + self.as_mut() .project() .state .set(State::Connected(resp.into_body())); - continue; + + return Poll::Ready(Some(Ok(SSE::Connected((status, map))))); } if resp.status() == 301 || resp.status() == 307 { diff --git a/eventsource-client/src/event_parser.rs b/eventsource-client/src/event_parser.rs index 4e66891..0920be3 100644 --- a/eventsource-client/src/event_parser.rs +++ b/eventsource-client/src/event_parser.rs @@ -1,4 +1,8 @@ -use std::{collections::VecDeque, convert::TryFrom, str::from_utf8}; +use std::{ + collections::{HashMap, VecDeque}, + convert::TryFrom, + str::from_utf8, +}; use hyper::body::Bytes; use log::{debug, log_enabled, trace}; @@ -32,6 +36,7 @@ impl EventData { #[derive(Debug, Eq, PartialEq)] pub enum SSE { + Connected((u16, HashMap)), Event(Event), Comment(String), } diff --git a/eventsource-client/src/lib.rs b/eventsource-client/src/lib.rs index 9865f39..7677f4f 100644 --- a/eventsource-client/src/lib.rs +++ b/eventsource-client/src/lib.rs @@ -14,7 +14,8 @@ //! let mut stream = Box::pin(client.stream()) //! .map_ok(|event| match event { //! SSE::Comment(comment) => println!("got a comment event: {:?}", comment), -//! SSE::Event(evt) => println!("got an event: {}", evt.event_type) +//! SSE::Event(evt) => println!("got an event: {}", evt.event_type), +//! SSE::Connected(_) => println!("got connected") //! }) //! .map_err(|e| println!("error streaming events: {:?}", e)); //! # while let Ok(Some(_)) = stream.try_next().await {}