Skip to content

Commit

Permalink
experimental: sse client support (#592)
Browse files Browse the repository at this point in the history
> ⚠️ This is experimental and a work in progress

## Overview

tl;dr: This is a very experimental implementation of streaming in Edge: it both listens to the experimental Unleash streaming endpoint and pushes updates to any listeners subscribing to Edge (in effect mirroring the Unleash endpoint). All related code is behind the "streaming" feature gate.

More detail:

I have added an event source client to the `features_refresher` file. If the streaming flag is on, we'll spawn a task that takes care of listening to Unleash for events instead of starting the poll loop for flags.

There is a new endpoint at `api/client/streaming` (mirroring the Unleash endpoint) that you can hit to start listening for updates.

The updates are handled by a new `broadcaster` module (largely stolen from [this Actix example](https://github.com/actix/examples/blob/master/server-sent-events/src/broadcast.rs)).

The broadcaster stores its client in a hash map that uses the flag query as the key and maps it to a vec of clients that use the same query.

## Left to do

Regarding the implementation: I'm not very familiar with Actix, and I haven't done a whole lot of async / multithreaded rust before, so there's probably gonna be a whole lot of things that we can improve, from the architecture level to the specific data structures used. 

~~Also, due to the very time-limited spike we did, we need to actually do some real error handling. There's a good few places where we either ignore errors or would just panic if we ever encountered them.~~

But aside from that, there's a few other things to do too:
- [x] Store the streaming url in the urls struct
- [ ] Add fetch mode streaming/polling as a CLI option
- [ ] The broadcaster needs to store the token used with each client; not just the query (you can have multiple tokens with the same query and those tokens can be invalidated separately)
- [ ] We should probably find a more sensible way to use the query as a key in the hash map: serializing it and hashing the string seems roundabout and wonky.
  • Loading branch information
thomasheartman authored Dec 12, 2024
1 parent 91750dd commit f511287
Show file tree
Hide file tree
Showing 9 changed files with 682 additions and 70 deletions.
189 changes: 185 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ repository = "https://github.com/Unleash/unleash-edge"
rust-version = "1.81.0"
version = "19.6.3"

[features]
streaming = ["actix-web-lab", "eventsource-client", "tokio-stream"]

[package.metadata.wix]
upgrade-guid = "11E5D83A-3034-48BB-9A84-9F589EBD648C"
path-guid = "6F606A3B-C7E9-43EC-8B6E-91D7B74F80FC"
Expand All @@ -30,6 +33,7 @@ actix-http = "3.9.0"
actix-middleware-etag = "0.4.2"
actix-service = "2.0.2"
actix-web = { version = "4.9.0", features = ["rustls-0_23", "compress-zstd"] }
actix-web-lab = { version = "0.23.0", optional = true }
ahash = "0.8.11"
anyhow = "1.0.91"
async-trait = "0.1.83"
Expand All @@ -41,6 +45,7 @@ cidr = "0.3.0"
clap = { version = "4.5.19", features = ["derive", "env"] }
clap-markdown = "0.1.4"
dashmap = "6.0.1"
eventsource-client = { version = "0.13.0", optional = true }
futures = "0.3.30"
futures-core = "0.3.30"
iter_tools = "0.24.0"
Expand Down Expand Up @@ -88,6 +93,7 @@ tokio = { version = "1.42.0", features = [
"tracing",
"fs",
] }
tokio-stream = { version = "0.1.16", optional = true }
tracing = { version = "0.1.40", features = ["log"] }
tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] }
ulid = "1.1.2"
Expand Down
Loading

0 comments on commit f511287

Please sign in to comment.