Skip to content

Commit

Permalink
feat: instrument server
Browse files Browse the repository at this point in the history
  • Loading branch information
XOR-op committed Aug 9, 2024
1 parent b5d05b0 commit 04c6da1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 14 deletions.
34 changes: 22 additions & 12 deletions boltconn/src/external/instrument_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::external::web_common::{get_cors_layer, parse_cors_allow, web_auth};
use crate::instrument::bus::{BusSubscriber, MessageBus};
use crate::proxy::error::{RuntimeError, SystemError};
use axum::extract::ws::WebSocket;
use axum::extract::{ws, Path, State, WebSocketUpgrade};
use axum::extract::{ws, Query, State, WebSocketUpgrade};
use axum::middleware::map_request;
use axum::response::IntoResponse;
use axum::routing::get;
Expand All @@ -13,6 +13,7 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::select;

#[derive(Clone)]
pub struct InstrumentServer {
Expand Down Expand Up @@ -59,7 +60,7 @@ impl InstrumentServer {

async fn subscribe(
State(server): State<Self>,
Path(params): Path<HashMap<String, String>>,
Query(params): Query<HashMap<String, String>>,
ws: WebSocketUpgrade,
) -> impl IntoResponse {
if let Some(secret) = server.secret.as_ref() {
Expand All @@ -71,12 +72,14 @@ impl InstrumentServer {
let ids = {
let mut arr = Vec::new();
let Some(s) = params.get("id") else {
tracing::debug!("No id parameter in request");
return refusal_resp(http::StatusCode::BAD_REQUEST);
};
for id in s.split(',') {
if let Ok(val) = u64::from_str_radix(id, 10) {
if let Ok(val) = id.parse::<u64>() {
arr.push(val);
} else {
tracing::debug!("Invalid id parameter in request: {} in {}", id, s);
return refusal_resp(http::StatusCode::BAD_REQUEST);
}
}
Expand All @@ -85,22 +88,29 @@ impl InstrumentServer {
let Some(sub) = server.msg_bus.create_subscriber(ids.iter().copied()) else {
return refusal_resp(http::StatusCode::CONFLICT);
};
ws.on_upgrade(move |socket| Self::subscribe_inner(socket, sub, ids))
ws.on_upgrade(move |socket| Self::subscribe_inner(socket, sub))
}

async fn subscribe_inner(mut socket: WebSocket, sub: BusSubscriber, ids: Vec<u64>) {
while let Some(msg) = sub.recv().await {
async fn subscribe_inner(mut socket: WebSocket, sub: BusSubscriber) {
loop {
let msg = select! {
r = socket.recv() => {
// client disconnected
if r.is_none(){
return;
}
// some messages, maybe error
continue;
}
Some(msg) = sub.recv() => msg,
};
let wire_msg = InstrumentData {
id: msg.sub_id,
message: msg.msg,
};
if let Err(e) = socket
let _ = socket
.send(ws::Message::Text(wire_msg.encode_string()))
.await
{
tracing::warn!("Subscriber for {:?} failed to send: {}", ids, e);
break;
}
.await;
}
}
}
Expand Down
13 changes: 11 additions & 2 deletions boltconn/src/instrument/bus.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Mutex;

Expand All @@ -22,7 +23,6 @@ impl MessageBus {

pub async fn run(&self) {
while let Ok(msg) = self.ingress_receiver.recv_async().await {
tracing::debug!("[Message Bus {}] {:?} ", msg.sub_id, msg.msg);
if let Some(sender) = self.egress_senders.lock().unwrap().get(&msg.sub_id) {
let _ = sender.try_send(msg);
}
Expand All @@ -44,7 +44,16 @@ impl MessageBus {
let mut egress_senders = self.egress_senders.lock().unwrap();
for sub_id in iter2 {
if egress_senders.contains_key(&sub_id) {
return None;
match egress_senders.entry(sub_id) {
Entry::Occupied(e) => {
if e.get().is_disconnected() {
e.remove();
} else {
return None;
}
}
Entry::Vacant(_) => unreachable!(),
}
}
}
for sub_id in sub_ids {
Expand Down

0 comments on commit 04c6da1

Please sign in to comment.