diff --git a/Cargo.toml b/Cargo.toml index e143c64..27c466f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,8 @@ include = [ # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = "1.0" +async-trait = "0.1" base64 = "0.21.5" bytes = { version = "1.4.0", features = ["serde"] } chrono = "0.4.26" diff --git a/README.md b/README.md index 2c152e6..ba78c6d 100644 --- a/README.md +++ b/README.md @@ -94,3 +94,11 @@ If the number of retries become larger than this value, the write/send operation The maximum duration of wait between retries, in milliseconds. If calculated retry wait is larger than this value, operation will fail. The default is 60,000 (60 seconds). + +### max_connection_lifetime + +The maximum lifetime of a connection before reconnection is attempted. + +Note that reconnection is only triggered when new log lines are sent. +If no new log lines are received within this timeframe, the connection will remain open, even if it's older than the value. +The default is 0 (no reconnection). diff --git a/src/client.rs b/src/client.rs index 02f15aa..2aff682 100644 --- a/src/client.rs +++ b/src/client.rs @@ -23,18 +23,18 @@ use std::net::SocketAddr; use std::path::Path; +use std::sync::Arc; use std::time::Duration; +use anyhow::Result as AnyhowResult; use base64::{engine::general_purpose, Engine}; -use tokio::{ - net::{TcpStream, UnixStream}, - sync::broadcast::{channel, Sender}, - time::timeout, -}; +use tokio::sync::broadcast::{channel, Sender}; use uuid::Uuid; use crate::record::Map; -use crate::worker::{Message, Options, Record, RetryConfig, Worker}; +use crate::worker::{ + Message, Options, Record, RetryConfig, TCPConnectionConfig, UnixSocketConfig, Worker, +}; #[derive(Debug, Clone)] pub struct SendError { @@ -65,6 +65,12 @@ pub struct Config { /// If calculated retry wait is larger than this value, operation will fail. /// The default is 60,000 (60 seconds). pub max_retry_wait: u64, + /// The maximum lifetime of a connection before reconnection is attempted. + /// Note that reconnection is only triggered when new log lines are sent. + /// If no new log lines are received within this timeframe, the connection + /// will remain open, even if it's older than `max_connection_lifetime`. + /// The default is 0 (no reconnection). + pub max_connection_lifetime: Duration, } impl Default for Config { @@ -74,6 +80,7 @@ impl Default for Config { retry_wait: 500, max_retry: 10, max_retry_wait: 60000, + max_connection_lifetime: Duration::from_secs(0), } } } @@ -91,44 +98,61 @@ pub struct Client { impl Client { /// Connect to the fluentd server using TCP and create a worker with tokio::spawn. - pub async fn new_tcp(addr: SocketAddr, config: &Config) -> tokio::io::Result { - let stream = timeout(config.timeout, TcpStream::connect(addr)).await??; + pub async fn new_tcp(addr: SocketAddr, config: &Config) -> AnyhowResult { let (sender, receiver) = channel(1024); let config = config.clone(); - let _ = tokio::spawn(async move { - let mut worker = Worker::new( - stream, - receiver, - RetryConfig { - initial_wait: config.retry_wait, - max: config.max_retry, - max_wait: config.max_retry_wait, - }, - ); - worker.run().await + let stream_config = Arc::new(TCPConnectionConfig { + addr: addr.to_owned(), + timeout: config.timeout, }); + // create the worker -- + // new() will try to establish an connection, so it returns error if connection, + // so it returns error upon connection error + let mut worker = Worker::new( + stream_config, + config.max_connection_lifetime, + receiver, + RetryConfig { + initial_wait: config.retry_wait, + max: config.max_retry, + max_wait: config.max_retry_wait, + }, + ) + .await?; + let _ = tokio::spawn(async move { worker.run().await }); Ok(Self { sender }) } /// Connect to the fluentd server using unix domain socket and create a worker with tokio::spawn. - pub async fn new_unix>(path: P, config: &Config) -> tokio::io::Result { - let stream = timeout(config.timeout, UnixStream::connect(path)).await??; + pub async fn new_unix + std::marker::Send>( + path: P, + config: &Config, + ) -> AnyhowResult { let (sender, receiver) = channel(1024); let config = config.clone(); + let stream_config = Arc::new(UnixSocketConfig { + path: path.as_ref().to_path_buf(), + timeout: config.timeout, + }); + // create the worker -- + // new() will try to establish an connection, so it returns error if connection, + // so it returns error upon connection error + let mut worker = Worker::new( + stream_config, + config.max_connection_lifetime, + receiver, + RetryConfig { + initial_wait: config.retry_wait, + max: config.max_retry, + max_wait: config.max_retry_wait, + }, + ) + .await?; let _ = tokio::spawn(async move { - let mut worker = Worker::new( - stream, - receiver, - RetryConfig { - initial_wait: config.retry_wait, - max: config.max_retry, - max_wait: config.max_retry_wait, - }, - ); - worker.run().await + worker.run().await; }); Ok(Self { sender }) diff --git a/src/worker.rs b/src/worker.rs index 0ff222c..d1b703b 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,11 +1,19 @@ +use std::cell::Cell; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; + +use anyhow::Result as AnyhowResult; +use async_trait::async_trait; use bytes::{Buf, BufMut}; -use log::warn; +use log::{debug, warn}; use rmp_serde::Serializer; use serde::{ser::SerializeMap, Deserialize, Serialize}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpStream, UnixStream}, sync::broadcast::{error::RecvError, Receiver}, - time::Duration, + time::{timeout, Duration}, }; use crate::record::Map; @@ -84,7 +92,10 @@ pub struct RetryConfig { } pub struct Worker { - stream: StreamType, + stream_config: Arc + Send + Sync>, + max_connection_lifetime: Duration, + stream: Cell, + last_connection_time: Cell, receiver: Receiver, retry_config: RetryConfig, } @@ -93,12 +104,21 @@ impl Worker where StreamType: AsyncReadExt + AsyncWriteExt + Unpin, { - pub fn new(stream: StreamType, receiver: Receiver, retry_config: RetryConfig) -> Self { - Self { - stream, + pub async fn new( + stream_config: Arc + Send + Sync>, + max_connection_lifetime: Duration, + receiver: Receiver, + retry_config: RetryConfig, + ) -> AnyhowResult { + let stream = stream_config.connect().await?; + Ok(Self { + stream_config, + max_connection_lifetime, + stream: Cell::new(stream), + last_connection_time: Cell::new(Instant::now()), receiver, retry_config, - } + }) } pub async fn run(&mut self) { @@ -140,7 +160,26 @@ where for i in 0..self.retry_config.max as i32 { tokio::time::sleep(wait_time).await; - match self.write(record).await { + // reconnect when the lifetime is reached + if !self.max_connection_lifetime.is_zero() + && self.last_connection_time.get().elapsed() >= self.max_connection_lifetime + { + debug!("attempting to re-establish connection"); + match self.stream_config.connect().await { + Ok(new_stream) => { + self.stream.replace(new_stream); + self.last_connection_time.replace(Instant::now()); + } + Err(err) => { + warn!( + "failed to reconnect. Will try again upon the next try-write: {}", + err + ); + } + } + } + + match Self::write(&mut self.stream.get_mut(), record).await { Ok(_) => return Ok(()), Err(Error::ConnectionClosed) => return Err(Error::ConnectionClosed), Err(_) => {} @@ -157,13 +196,13 @@ where Err(Error::MaxRetriesExceeded) } - async fn write(&mut self, record: &SerializedRecord) -> Result<(), Error> { - self.stream + async fn write(stream: &mut StreamType, record: &SerializedRecord) -> Result<(), Error> { + stream .write_all(record.record.chunk()) .await .map_err(|e| Error::WriteFailed(e.to_string()))?; - let received_ack = self.read_ack().await?; + let received_ack = Self::read_ack(stream).await?; if received_ack.ack != record.chunk { warn!( @@ -175,15 +214,14 @@ where Ok(()) } - async fn read_ack(&mut self) -> Result { + async fn read_ack(stream: &mut StreamType) -> Result { let mut buf = bytes::BytesMut::with_capacity(64); loop { if let Ok(ack) = rmp_serde::from_slice::(&buf) { return Ok(ack); } - if self - .stream + if stream .read_buf(&mut buf) .await .map_err(|e| Error::ReadFailed(e.to_string()))? @@ -194,3 +232,36 @@ where } } } + +#[async_trait] +pub trait Connectable { + async fn connect(&self) -> AnyhowResult; +} + +#[derive(Debug)] +pub struct TCPConnectionConfig { + pub addr: std::net::SocketAddr, + pub timeout: Duration, +} + +#[async_trait] +impl Connectable for TCPConnectionConfig { + async fn connect(&self) -> AnyhowResult { + let stream = timeout(self.timeout, TcpStream::connect(self.addr)).await??; + Ok(stream) + } +} + +#[derive(Debug)] +pub struct UnixSocketConfig { + pub path: PathBuf, + pub timeout: Duration, +} + +#[async_trait] +impl Connectable for UnixSocketConfig { + async fn connect(&self) -> AnyhowResult { + let stream = timeout(self.timeout, UnixStream::connect(self.path.as_path())).await??; + Ok(stream) + } +}