Skip to content

Commit

Permalink
Merge pull request #55 from danielsig727/reconnect
Browse files Browse the repository at this point in the history
feat: support max_connection_lifetime and reconnection
  • Loading branch information
johnmanjiro13 authored Dec 2, 2024
2 parents c2cdcd1 + 835bc1c commit 12f359a
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 45 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ include = [
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0"
async-trait = "0.1"
base64 = "0.21.5"
bytes = { version = "1.4.0", features = ["serde"] }
chrono = "0.4.26"
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,11 @@ If the number of retries become larger than this value, the write/send operation

The maximum duration of wait between retries, in milliseconds. If calculated retry wait is larger than this value, operation will fail.
The default is 60,000 (60 seconds).

### max_connection_lifetime

The maximum lifetime of a connection before reconnection is attempted.

Note that reconnection is only triggered when new log lines are sent.
If no new log lines are received within this timeframe, the connection will remain open, even if it's older than the value.
The default is 0 (no reconnection).
86 changes: 55 additions & 31 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result as AnyhowResult;
use base64::{engine::general_purpose, Engine};
use tokio::{
net::{TcpStream, UnixStream},
sync::broadcast::{channel, Sender},
time::timeout,
};
use tokio::sync::broadcast::{channel, Sender};
use uuid::Uuid;

use crate::record::Map;
use crate::worker::{Message, Options, Record, RetryConfig, Worker};
use crate::worker::{
Message, Options, Record, RetryConfig, TCPConnectionConfig, UnixSocketConfig, Worker,
};

#[derive(Debug, Clone)]
pub struct SendError {
Expand Down Expand Up @@ -65,6 +65,12 @@ pub struct Config {
/// If calculated retry wait is larger than this value, operation will fail.
/// The default is 60,000 (60 seconds).
pub max_retry_wait: u64,
/// The maximum lifetime of a connection before reconnection is attempted.
/// Note that reconnection is only triggered when new log lines are sent.
/// If no new log lines are received within this timeframe, the connection
/// will remain open, even if it's older than `max_connection_lifetime`.
/// The default is 0 (no reconnection).
pub max_connection_lifetime: Duration,
}

impl Default for Config {
Expand All @@ -74,6 +80,7 @@ impl Default for Config {
retry_wait: 500,
max_retry: 10,
max_retry_wait: 60000,
max_connection_lifetime: Duration::from_secs(0),
}
}
}
Expand All @@ -91,44 +98,61 @@ pub struct Client {

impl Client {
/// Connect to the fluentd server using TCP and create a worker with tokio::spawn.
pub async fn new_tcp(addr: SocketAddr, config: &Config) -> tokio::io::Result<Client> {
let stream = timeout(config.timeout, TcpStream::connect(addr)).await??;
pub async fn new_tcp(addr: SocketAddr, config: &Config) -> AnyhowResult<Client> {
let (sender, receiver) = channel(1024);

let config = config.clone();
let _ = tokio::spawn(async move {
let mut worker = Worker::new(
stream,
receiver,
RetryConfig {
initial_wait: config.retry_wait,
max: config.max_retry,
max_wait: config.max_retry_wait,
},
);
worker.run().await
let stream_config = Arc::new(TCPConnectionConfig {
addr: addr.to_owned(),
timeout: config.timeout,
});
// create the worker --
// new() will try to establish an connection, so it returns error if connection,
// so it returns error upon connection error
let mut worker = Worker::new(
stream_config,
config.max_connection_lifetime,
receiver,
RetryConfig {
initial_wait: config.retry_wait,
max: config.max_retry,
max_wait: config.max_retry_wait,
},
)
.await?;
let _ = tokio::spawn(async move { worker.run().await });

Ok(Self { sender })
}

/// Connect to the fluentd server using unix domain socket and create a worker with tokio::spawn.
pub async fn new_unix<P: AsRef<Path>>(path: P, config: &Config) -> tokio::io::Result<Client> {
let stream = timeout(config.timeout, UnixStream::connect(path)).await??;
pub async fn new_unix<P: AsRef<Path> + std::marker::Send>(
path: P,
config: &Config,
) -> AnyhowResult<Client> {
let (sender, receiver) = channel(1024);

let config = config.clone();
let stream_config = Arc::new(UnixSocketConfig {
path: path.as_ref().to_path_buf(),
timeout: config.timeout,
});
// create the worker --
// new() will try to establish an connection, so it returns error if connection,
// so it returns error upon connection error
let mut worker = Worker::new(
stream_config,
config.max_connection_lifetime,
receiver,
RetryConfig {
initial_wait: config.retry_wait,
max: config.max_retry,
max_wait: config.max_retry_wait,
},
)
.await?;
let _ = tokio::spawn(async move {
let mut worker = Worker::new(
stream,
receiver,
RetryConfig {
initial_wait: config.retry_wait,
max: config.max_retry,
max_wait: config.max_retry_wait,
},
);
worker.run().await
worker.run().await;
});

Ok(Self { sender })
Expand Down
99 changes: 85 additions & 14 deletions src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
use std::cell::Cell;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;

use anyhow::Result as AnyhowResult;
use async_trait::async_trait;
use bytes::{Buf, BufMut};
use log::warn;
use log::{debug, warn};
use rmp_serde::Serializer;
use serde::{ser::SerializeMap, Deserialize, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpStream, UnixStream},
sync::broadcast::{error::RecvError, Receiver},
time::Duration,
time::{timeout, Duration},
};

use crate::record::Map;
Expand Down Expand Up @@ -84,7 +92,10 @@ pub struct RetryConfig {
}

pub struct Worker<StreamType> {
stream: StreamType,
stream_config: Arc<dyn Connectable<StreamType> + Send + Sync>,
max_connection_lifetime: Duration,
stream: Cell<StreamType>,
last_connection_time: Cell<Instant>,
receiver: Receiver<Message>,
retry_config: RetryConfig,
}
Expand All @@ -93,12 +104,21 @@ impl<StreamType> Worker<StreamType>
where
StreamType: AsyncReadExt + AsyncWriteExt + Unpin,
{
pub fn new(stream: StreamType, receiver: Receiver<Message>, retry_config: RetryConfig) -> Self {
Self {
stream,
pub async fn new(
stream_config: Arc<dyn Connectable<StreamType> + Send + Sync>,
max_connection_lifetime: Duration,
receiver: Receiver<Message>,
retry_config: RetryConfig,
) -> AnyhowResult<Self> {
let stream = stream_config.connect().await?;
Ok(Self {
stream_config,
max_connection_lifetime,
stream: Cell::new(stream),
last_connection_time: Cell::new(Instant::now()),
receiver,
retry_config,
}
})
}

pub async fn run(&mut self) {
Expand Down Expand Up @@ -140,7 +160,26 @@ where
for i in 0..self.retry_config.max as i32 {
tokio::time::sleep(wait_time).await;

match self.write(record).await {
// reconnect when the lifetime is reached
if !self.max_connection_lifetime.is_zero()
&& self.last_connection_time.get().elapsed() >= self.max_connection_lifetime
{
debug!("attempting to re-establish connection");
match self.stream_config.connect().await {
Ok(new_stream) => {
self.stream.replace(new_stream);
self.last_connection_time.replace(Instant::now());
}
Err(err) => {
warn!(
"failed to reconnect. Will try again upon the next try-write: {}",
err
);
}
}
}

match Self::write(&mut self.stream.get_mut(), record).await {
Ok(_) => return Ok(()),
Err(Error::ConnectionClosed) => return Err(Error::ConnectionClosed),
Err(_) => {}
Expand All @@ -157,13 +196,13 @@ where
Err(Error::MaxRetriesExceeded)
}

async fn write(&mut self, record: &SerializedRecord) -> Result<(), Error> {
self.stream
async fn write(stream: &mut StreamType, record: &SerializedRecord) -> Result<(), Error> {
stream
.write_all(record.record.chunk())
.await
.map_err(|e| Error::WriteFailed(e.to_string()))?;

let received_ack = self.read_ack().await?;
let received_ack = Self::read_ack(stream).await?;

if received_ack.ack != record.chunk {
warn!(
Expand All @@ -175,15 +214,14 @@ where
Ok(())
}

async fn read_ack(&mut self) -> Result<AckResponse, Error> {
async fn read_ack(stream: &mut StreamType) -> Result<AckResponse, Error> {
let mut buf = bytes::BytesMut::with_capacity(64);
loop {
if let Ok(ack) = rmp_serde::from_slice::<AckResponse>(&buf) {
return Ok(ack);
}

if self
.stream
if stream
.read_buf(&mut buf)
.await
.map_err(|e| Error::ReadFailed(e.to_string()))?
Expand All @@ -194,3 +232,36 @@ where
}
}
}

#[async_trait]
pub trait Connectable<T> {
async fn connect(&self) -> AnyhowResult<T>;
}

#[derive(Debug)]
pub struct TCPConnectionConfig {
pub addr: std::net::SocketAddr,
pub timeout: Duration,
}

#[async_trait]
impl Connectable<TcpStream> for TCPConnectionConfig {
async fn connect(&self) -> AnyhowResult<TcpStream> {
let stream = timeout(self.timeout, TcpStream::connect(self.addr)).await??;
Ok(stream)
}
}

#[derive(Debug)]
pub struct UnixSocketConfig {
pub path: PathBuf,
pub timeout: Duration,
}

#[async_trait]
impl Connectable<UnixStream> for UnixSocketConfig {
async fn connect(&self) -> AnyhowResult<UnixStream> {
let stream = timeout(self.timeout, UnixStream::connect(self.path.as_path())).await??;
Ok(stream)
}
}

0 comments on commit 12f359a

Please sign in to comment.