Skip to content

Commit

Permalink
moved stream id back into Frame
Browse files Browse the repository at this point in the history
refs #69
  • Loading branch information
krojew committed Dec 9, 2021
1 parent c2e5765 commit e1de5ff
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 31 deletions.
47 changes: 35 additions & 12 deletions cassandra-protocol/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,49 @@ pub type StreamId = i16;
pub struct ParsedFrame {
/// How many bytes from the buffer have been read.
pub frame_len: usize,
/// Stream id associated with given frame
pub stream_id: StreamId,
/// The parsed frame.
pub frame: Frame,
}

#[derive(Derivative, Clone, PartialEq, Ord, PartialOrd, Eq, Hash, Constructor)]
#[derive(Derivative, Clone, PartialEq, Ord, PartialOrd, Eq, Hash)]
#[derivative(Debug)]
pub struct Frame {
pub version: Version,
pub direction: Direction,
pub flags: Flags,
pub opcode: Opcode,
pub stream_id: StreamId,
#[derivative(Debug = "ignore")]
pub body: Vec<u8>,
pub tracing_id: Option<Uuid>,
pub warnings: Vec<String>,
}

impl Frame {
#[inline]
#[allow(clippy::too_many_arguments)]
pub fn new(
version: Version,
direction: Direction,
flags: Flags,
opcode: Opcode,
stream_id: StreamId,
body: Vec<u8>,
tracing_id: Option<Uuid>,
warnings: Vec<String>,
) -> Self {
Frame {
version,
direction,
flags,
opcode,
stream_id,
body,
tracing_id,
warnings,
}
}

#[inline]
pub fn request_body(&self) -> error::Result<RequestBody> {
RequestBody::try_from(self.body.as_slice(), self.opcode)
Expand Down Expand Up @@ -156,24 +179,20 @@ impl Frame {

Ok(ParsedFrame::new(
frame_len,
stream_id,
Frame {
version,
direction,
flags,
opcode,
stream_id,
body,
tracing_id,
warnings,
},
))
}

pub fn encode_with(
&self,
stream_id: StreamId,
compressor: Compression,
) -> error::Result<Vec<u8>> {
pub fn encode_with(&self, compressor: Compression) -> error::Result<Vec<u8>> {
let combined_version_byte = u8::from(self.version) | u8::from(self.direction);
let flag_byte = self.flags.bits();
let opcode_byte = u8::from(self.opcode);
Expand All @@ -182,7 +201,7 @@ impl Frame {

v.push(combined_version_byte);
v.push(flag_byte);
v.extend_from_slice(&stream_id.to_be_bytes());
v.extend_from_slice(&self.stream_id.to_be_bytes());
v.push(opcode_byte);

if compressor.is_compressed() {
Expand Down Expand Up @@ -458,7 +477,7 @@ mod tests {
"encoded body did not match frames body"
);

let encoded_frame = frame.encode_with(0, Compression::None).unwrap();
let encoded_frame = frame.encode_with(Compression::None).unwrap();
assert_eq!(
raw_frame, &encoded_frame,
"encoded frame did not match expected raw frame"
Expand All @@ -482,7 +501,7 @@ mod tests {
"encoded body did not match frames body"
);

let encoded_frame = frame.encode_with(0, Compression::None).unwrap();
let encoded_frame = frame.encode_with(Compression::None).unwrap();
assert_eq!(
raw_frame, &encoded_frame,
"encoded frame did not match expected raw frame"
Expand Down Expand Up @@ -516,6 +535,7 @@ mod tests {
direction: Direction::Request,
flags: Flags::empty(),
opcode: Opcode::Ready,
stream_id: 0,
body: vec![],
tracing_id: None,
warnings: vec![],
Expand All @@ -534,6 +554,7 @@ mod tests {
direction: Direction::Request,
flags: Flags::empty(),
opcode: Opcode::Query,
stream_id: 0,
body: vec![0, 0, 0, 4, 98, 108, 97, 104, 0, 0, 64],
tracing_id: None,
warnings: vec![],
Expand Down Expand Up @@ -564,6 +585,7 @@ mod tests {
direction: Direction::Request,
flags: Flags::empty(),
opcode: Opcode::Query,
stream_id: 0,
body: vec![
0, 0, 0, 10, 115, 111, 109, 101, 32, 113, 117, 101, 114, 121, 0, 8, 1, 0, 2, 0, 0,
0, 3, 1, 2, 3, 255, 255, 255, 255,
Expand Down Expand Up @@ -596,6 +618,7 @@ mod tests {
direction: Direction::Request,
flags: Flags::empty(),
opcode: Opcode::Query,
stream_id: 0,
body: vec![],
tracing_id: None,
warnings: vec![],
Expand Down
1 change: 1 addition & 0 deletions cassandra-protocol/src/frame/frame_auth_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl Frame {
direction,
Flags::empty(),
opcode,
0,
body.serialize_to_vec(),
None,
vec![],
Expand Down
2 changes: 2 additions & 0 deletions cassandra-protocol/src/frame/frame_batch.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use derive_more::Constructor;
use std::convert::TryInto;
use std::io::{Cursor, Read};

Expand Down Expand Up @@ -215,6 +216,7 @@ impl Frame {
direction,
flags,
opcode,
0,
query.serialize_to_vec(),
None,
vec![],
Expand Down
1 change: 1 addition & 0 deletions cassandra-protocol/src/frame/frame_execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl Frame {
direction,
flags,
opcode,
0,
body.serialize_to_vec(),
None,
vec![],
Expand Down
1 change: 1 addition & 0 deletions cassandra-protocol/src/frame/frame_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl Frame {
direction,
Flags::empty(),
opcode,
0,
body.serialize_to_vec(),
None,
vec![],
Expand Down
1 change: 1 addition & 0 deletions cassandra-protocol/src/frame/frame_prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl Frame {
direction,
flags,
opcode,
0,
body.serialize_to_vec(),
None,
vec![],
Expand Down
1 change: 1 addition & 0 deletions cassandra-protocol/src/frame/frame_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl Frame {
direction,
flags,
opcode,
0,
body.serialize_to_vec(),
None,
vec![],
Expand Down
1 change: 1 addition & 0 deletions cassandra-protocol/src/frame/frame_register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl Frame {
direction,
Flags::empty(),
opcode,
0,
register_body.serialize_to_vec(),
None,
vec![],
Expand Down
1 change: 1 addition & 0 deletions cassandra-protocol/src/frame/frame_startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl Frame {
direction,
Flags::empty(),
opcode,
0,
body.serialize_to_vec(),
None,
vec![],
Expand Down
22 changes: 9 additions & 13 deletions cdrs-tokio/src/frame_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use tokio::io::AsyncReadExt;
use cassandra_protocol::compression::Compression;
use cassandra_protocol::error;
use cassandra_protocol::frame::frame_response::ResponseBody;
use cassandra_protocol::frame::{
Direction, Flags, Frame, Opcode, StreamId, Version, LENGTH_LEN, STREAM_LEN,
};
use cassandra_protocol::frame::{Direction, Flags, Frame, Opcode, Version, LENGTH_LEN, STREAM_LEN};
use cassandra_protocol::types::data_serialization_types::decode_timeuuid;
use cassandra_protocol::types::{
from_cursor_string_list, try_i16_from_bytes, try_i32_from_bytes, UUID_LEN,
Expand All @@ -16,7 +14,7 @@ use cassandra_protocol::types::{
async fn parse_raw_frame<T: AsyncReadExt + Unpin>(
cursor: &mut T,
compressor: Compression,
) -> error::Result<(StreamId, Frame)> {
) -> error::Result<Frame> {
let mut version_bytes = [0; Version::BYTE_LENGTH];
let mut flag_bytes = [0; Flags::BYTE_LENGTH];
let mut opcode_bytes = [0; Opcode::BYTE_LENGTH];
Expand Down Expand Up @@ -76,31 +74,29 @@ async fn parse_raw_frame<T: AsyncReadExt + Unpin>(
direction,
flags,
opcode,
stream_id,
body,
tracing_id,
warnings,
};

Ok((stream_id, frame))
Ok(frame)
}

pub async fn parse_frame<T: AsyncReadExt + Unpin>(
cursor: &mut T,
compressor: Compression,
) -> error::Result<(StreamId, Frame)> {
let (stream_id, frame) = parse_raw_frame(cursor, compressor).await?;
convert_frame_into_result(stream_id, frame)
) -> error::Result<Frame> {
let frame = parse_raw_frame(cursor, compressor).await?;
convert_frame_into_result(frame)
}

fn convert_frame_into_result(
stream_id: StreamId,
frame: Frame,
) -> error::Result<(StreamId, Frame)> {
fn convert_frame_into_result(frame: Frame) -> error::Result<Frame> {
match frame.opcode {
Opcode::Error => frame.response_body().and_then(|err| match err {
ResponseBody::Error(err) => Err(error::Error::Server(err)),
_ => unreachable!(),
}),
_ => Ok((stream_id, frame)),
_ => Ok(frame),
}
}
12 changes: 6 additions & 6 deletions cdrs-tokio/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,9 @@ impl AsyncTransport {

// startup message is never compressed
let data = if frame.opcode != Opcode::Startup {
frame.encode_with(0, self.compression)?
frame.encode_with(self.compression)?
} else {
frame.encode_with(0, Compression::None)?
frame.encode_with(Compression::None)?
};

self.write_sender
Expand Down Expand Up @@ -313,8 +313,8 @@ impl AsyncTransport {
loop {
let result = parse_frame(&mut read_half, compression).await;
match result {
Ok((stream_id, frame)) => {
if stream_id >= 0 {
Ok(frame) => {
if frame.stream_id >= 0 {
// in case we get a SetKeyspace result, we need to store current keyspace
// checks are done manually for speed
if frame.opcode == Opcode::Result {
Expand All @@ -333,8 +333,8 @@ impl AsyncTransport {
}

// normal response to query
response_handler_map.send_response(stream_id, Ok(frame))?;
} else if stream_id == EVENT_STREAM_ID {
response_handler_map.send_response(frame.stream_id, Ok(frame))?;
} else if frame.stream_id == EVENT_STREAM_ID {
// server event
if let Some(event_handler) = &event_handler {
let _ = event_handler.send(frame).await;
Expand Down

0 comments on commit e1de5ff

Please sign in to comment.