Skip to content

Commit

Permalink
Add support for unix domain sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
argerus committed Nov 6, 2024
1 parent 706a633 commit 872e44f
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 31 deletions.
4 changes: 2 additions & 2 deletions databroker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ glob-match = "0.2.1"
jemallocator = { version = "0.5.0", optional = true }
lazy_static = "1.4.0"
thiserror = "1.0.47"
futures = { version = "0.3.28" }

# VISS
axum = { version = "0.6.20", optional = true, features = ["ws"] }
futures = { version = "0.3.28", optional = true }
chrono = { version = "0.4.31", optional = true, features = ["std"] }
uuid = { version = "1.4.1", optional = true, features = ["v4"] }

Expand All @@ -74,7 +74,7 @@ sd-notify = "0.4.1"
default = ["tls"]
tls = ["tonic/tls"]
jemalloc = ["dep:jemallocator"]
viss = ["dep:axum", "dep:chrono", "dep:futures", "dep:uuid"]
viss = ["dep:axum", "dep:chrono", "dep:uuid"]
libtest = []

[build-dependencies]
Expand Down
79 changes: 52 additions & 27 deletions databroker/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@

use std::{convert::TryFrom, future::Future, time::Duration};

use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::Server;
use futures::Stream;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::{TcpListener, UnixListener},
};
use tokio_stream::wrappers::{TcpListenerStream, UnixListenerStream};
#[cfg(feature = "tls")]
use tonic::transport::ServerTlsConfig;
use tonic::transport::{server::Connected, Server};
use tracing::{debug, info};

use databroker_proto::{kuksa, sdv};
Expand All @@ -34,7 +38,7 @@ pub enum ServerTLS {
Enabled { tls_config: ServerTlsConfig },
}

#[derive(PartialEq)]
#[derive(PartialEq, Clone)]
pub enum Api {
KuksaValV1,
SdvDatabrokerV1,
Expand Down Expand Up @@ -95,7 +99,7 @@ where
databroker.shutdown().await;
}

pub async fn serve<F>(
pub async fn serve_tcp<F>(
addr: impl Into<std::net::SocketAddr>,
broker: broker::DataBroker,
#[cfg(feature = "tls")] server_tls: ServerTLS,
Expand All @@ -109,25 +113,14 @@ where
let socket_addr = addr.into();
let listener = TcpListener::bind(socket_addr).await?;

/* On Linux systems try to notify daemon readiness to systemd.
* This function determines whether the a system is using systemd
* or not, so it is safe to use on non-systemd systems as well.
*/
#[cfg(target_os = "linux")]
{
match sd_notify::booted() {
Ok(true) => {
info!("Notifying systemd that the service is ready");
sd_notify::notify(false, &[sd_notify::NotifyState::Ready])?;
}
_ => {
debug!("System is not using systemd, will not try to notify");
}
}
if let Ok(addr) = listener.local_addr() {
info!("Listening on {}", addr);
}

let incoming = TcpListenerStream::new(listener);

serve_with_incoming_shutdown(
listener,
incoming,
broker,
#[cfg(feature = "tls")]
server_tls,
Expand All @@ -138,23 +131,55 @@ where
.await
}

pub async fn serve_with_incoming_shutdown<F>(
listener: TcpListener,
pub async fn serve_uds<F>(
path: impl AsRef<std::path::Path>,
broker: broker::DataBroker,
#[cfg(feature = "tls")] server_tls: ServerTLS,
apis: &[Api],
authorization: Authorization,
signal: F,
) -> Result<(), Box<dyn std::error::Error>>
where
F: Future<Output = ()>,
{
broker.start_housekeeping_task();
let listener = UnixListener::bind(path)?;

if let Ok(addr) = listener.local_addr() {
info!("Listening on {}", addr);
match addr.as_pathname() {
Some(pathname) => info!("Listening on unix socket at {}", pathname.display()),
None => info!("Listening on unix socket (unknown path)"),
}
}

let incoming = TcpListenerStream::new(listener);
let incoming = UnixListenerStream::new(listener);

serve_with_incoming_shutdown(
incoming,
broker,
ServerTLS::Disabled,
apis,
authorization,
signal,
)
.await
}

pub async fn serve_with_incoming_shutdown<F, I, IO, IE>(
incoming: I,
broker: broker::DataBroker,
#[cfg(feature = "tls")] server_tls: ServerTLS,
apis: &[Api],
authorization: Authorization,
signal: F,
) -> Result<(), Box<dyn std::error::Error>>
where
F: Future<Output = ()>,
I: Stream<Item = Result<IO, IE>>,
IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
IO::ConnectInfo: Clone + Send + Sync + 'static,
IE: Into<Box<dyn std::error::Error + Send + Sync>>,
{
broker.start_housekeeping_task();

let mut server = Server::builder()
.http2_keepalive_interval(Some(Duration::from_secs(10)))
.http2_keepalive_timeout(Some(Duration::from_secs(20)));
Expand Down
63 changes: 62 additions & 1 deletion databroker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

use std::io;
use std::os::unix::fs::FileTypeExt;
use std::path::Path;

use databroker::authorization::Authorization;
use databroker::broker::RegistrationError;

Expand Down Expand Up @@ -170,6 +174,15 @@ async fn read_metadata_file<'a, 'b>(
Ok(())
}

fn unlink_unix_domain_socket(path: impl AsRef<Path>) -> Result<(), io::Error> {
if let Ok(metadata) = std::fs::metadata(&path) {
if metadata.file_type().is_socket() {
std::fs::remove_file(&path)?;
}
};
Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let version = option_env!("CARGO_PKG_VERSION").unwrap_or_default();
Expand Down Expand Up @@ -218,6 +231,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.value_parser(clap::value_parser!(u16))
.default_value("55555"),
)
.arg(
Arg::new("unix-socket")
.display_order(3)
.long("unix-socket")
.help("Listen on unix socket, e.g. /tmp/kuksa/databroker.sock")
.action(ArgAction::Set)
.value_name("PATH")
.required(false)
.env("KUKSA_DATABROKER_UNIX_SOCKET"),
)
.arg(
Arg::new("vss-file")
.display_order(4)
Expand Down Expand Up @@ -457,7 +480,45 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
apis.push(grpc::server::Api::SdvDatabrokerV1);
}

grpc::server::serve(
let unix_socket = args.get_one::<String>("unix-socket").cloned();
if let Some(path) = unix_socket {
unlink_unix_domain_socket(&path)?;
std::fs::create_dir_all(Path::new(&path).parent().unwrap())?;
let broker = broker.clone();
let authorization = authorization.clone();
let apis = apis.clone();
tokio::spawn(async move {
if let Err(err) =
grpc::server::serve_uds(&path, broker, &apis, authorization, shutdown_handler())
.await
{
error!("{err}");
}

info!("Unlinking unix domain socket at {}", path);
unlink_unix_domain_socket(path)
.unwrap_or_else(|_| error!("Failed to unlink unix domain socket"));
});
}

/* On Linux systems try to notify daemon readiness to systemd.
* This function determines whether the a system is using systemd
* or not, so it is safe to use on non-systemd systems as well.
*/
#[cfg(target_os = "linux")]
{
match sd_notify::booted() {
Ok(true) => {
info!("Notifying systemd that the service is ready");
sd_notify::notify(false, &[sd_notify::NotifyState::Ready])?;
}
_ => {
debug!("System is not using systemd, will not try to notify");
}
}
}

grpc::server::serve_tcp(
addr,
broker,
#[cfg(feature = "tls")]
Expand Down
4 changes: 3 additions & 1 deletion databroker/tests/world/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use databroker::{
};

use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tracing::debug;

use lazy_static::lazy_static;
Expand Down Expand Up @@ -188,6 +189,7 @@ impl DataBrokerWorld {
let addr = listener
.local_addr()
.expect("failed to determine listener's port");
let incoming = TcpListenerStream::new(listener);

tokio::spawn(async move {
let version = option_env!("VERGEN_GIT_SEMVER_LIGHTWEIGHT")
Expand Down Expand Up @@ -228,7 +230,7 @@ impl DataBrokerWorld {
}

grpc::server::serve_with_incoming_shutdown(
listener,
incoming,
data_broker,
#[cfg(feature = "tls")]
CERTS.server_tls_config(),
Expand Down

0 comments on commit 872e44f

Please sign in to comment.