Skip to content

Commit

Permalink
fix: TCP refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
can-keklik committed May 25, 2024
1 parent dba5c9b commit b8c9d63
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 48 deletions.
4 changes: 2 additions & 2 deletions lykiadb-connect/src/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bytes::BytesMut;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use lykiadb_server::net::{CommunicationError, Message, TcpConnection};
use lykiadb_server::net::{CommunicationError, Message};
use lykiadb_server::net::tcp::TcpConnection;
use crate::session::ClientSession;

pub(crate) struct TcpClientSession {
Expand Down
48 changes: 3 additions & 45 deletions lykiadb-server/src/net/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use bytes::BytesMut;
pub mod tcp;

use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::{
io::{copy, AsyncReadExt, AsyncWriteExt, BufWriter},
net::TcpStream,
io::{AsyncReadExt, AsyncWriteExt},
};

use crate::runtime::{error::ExecutionError, types::RV};
Expand Down Expand Up @@ -45,45 +45,3 @@ impl From<bson::ser::Error> for CommunicationError {
CommunicationError::BsonError(value)
}
}

pub struct TcpConnection {
pub stream: BufWriter<TcpStream>,
pub read_buffer: BytesMut,
}

impl TcpConnection {
pub fn new(stream: TcpStream) -> TcpConnection {
TcpConnection {
stream: BufWriter::new(stream),
read_buffer: BytesMut::with_capacity(4096),
}
}

pub async fn read(&mut self) -> Result<Option<Message>, CommunicationError> {
loop {
// TODO(vck): Replace .to_vec call with something cheaper
if let Ok(parsed) = bson::from_slice::<Message>(&self.read_buffer.to_vec()) {
self.read_buffer.clear();
return Ok(Some(parsed));
}

if 0 == self.stream.read_buf(&mut self.read_buffer).await? {
if self.read_buffer.is_empty() {
return Ok(None);
} else {
return Err(CommunicationError::GenericError(
"Connection reset by peer".to_owned(),
));
}
}
}
}

pub async fn write(&mut self, message: Message) -> Result<(), CommunicationError> {
let vec = bson::to_vec(&bson::to_bson(&message)?)?;
let mut buffer = vec.as_slice();
copy(&mut buffer, &mut self.stream).await?;
self.stream.flush().await?;
Ok(())
}
}
46 changes: 46 additions & 0 deletions lykiadb-server/src/net/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use bytes::BytesMut;
use tokio::io::{BufWriter, AsyncReadExt, AsyncWriteExt, copy};
use tokio::net::TcpStream;
use crate::net::{CommunicationError, Message};

pub struct TcpConnection {
pub stream: BufWriter<TcpStream>,
pub read_buffer: BytesMut,
}

impl TcpConnection {
pub fn new(stream: TcpStream) -> TcpConnection {
TcpConnection {
stream: BufWriter::new(stream),
read_buffer: BytesMut::with_capacity(4096),
}
}

pub async fn read(&mut self) -> Result<Option<Message>, CommunicationError> {
loop {
// TODO(vck): Replace .to_vec call with something cheaper
if let Ok(parsed) = bson::from_slice::<Message>(&self.read_buffer.to_vec()) {
self.read_buffer.clear();
return Ok(Some(parsed));
}

if 0 == self.stream.read_buf(&mut self.read_buffer).await? {
if self.read_buffer.is_empty() {
return Ok(None);
} else {
return Err(CommunicationError::GenericError(
"Connection reset by peer".to_owned(),
));
}
}
}
}

pub async fn write(&mut self, message: Message) -> Result<(), CommunicationError> {
let vec = bson::to_vec(&bson::to_bson(&message)?)?;
let mut buffer = vec.as_slice();
copy(&mut buffer, &mut self.stream).await?;
self.stream.flush().await?;
Ok(())
}
}
3 changes: 2 additions & 1 deletion lykiadb-server/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use self::std::stdlib;

use crate::lang::parser::Parser;
use crate::lang::tokenizer::scanner::Scanner;
use crate::net::{CommunicationError, TcpConnection, Message, Request, Response};
use crate::net::{CommunicationError, Message, Request, Response};
use crate::net::tcp::TcpConnection;
use crate::runtime::interpreter::Interpreter;
use crate::runtime::types::RV;
use crate::util::{alloc_shared, Shared};
Expand Down

0 comments on commit b8c9d63

Please sign in to comment.