Skip to content

Commit

Permalink
chore(rust): refactor http stream parsing
Browse files Browse the repository at this point in the history
move to struct method, avoid some copying
  • Loading branch information
polvorin committed Aug 30, 2024
1 parent 22d5988 commit 8bcb606
Showing 1 changed file with 129 additions and 119 deletions.
248 changes: 129 additions & 119 deletions implementations/rust/ockam/ockam_api/src/http_auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,118 +122,130 @@ fn body_state(method: &str, headers: &[Header]) -> ockam_core::Result<RequestSta
}
}

fn copy_body(
state: RequestState,
buf: &[u8],
token: &str,
upstream: &str,
) -> ockam_core::Result<(RequestState, Vec<u8>)> {
let mut v = Vec::with_capacity(buf.len());
let mut s = state;
let mut cursor = buf;
loop {
if cursor.is_empty() {
return Ok((s, v));
}
match &mut s {
RequestState::ParsingHeader(prev) => {
let (to_parse, prev_size): (&[u8], usize) = if let Some(b) = prev {
let prev_size = b.len();
b.extend_from_slice(cursor);
(b, prev_size)
} else {
(cursor, 0usize)
};
let mut headers = [httparse::EMPTY_HEADER; 64];
let mut req = httparse::Request::new(&mut headers);
match req.parse(to_parse) {
Ok(httparse::Status::Partial) if prev_size == 0 => {
// Nothing was buffered previously, so we need to copy and own the unparsed data
return Ok((RequestState::ParsingHeader(Some(cursor.to_vec())), v));
}
Ok(httparse::Status::Partial) => {
// There was a previous buffer, and we have already appended the new data to it
return Ok((s, v));
}
Ok(httparse::Status::Complete(body_offset)) => {
cursor = &cursor[body_offset - prev_size..];
attach_auth_token_and_serialize_into(&req, token, upstream, &mut v);
s = body_state(req.method.unwrap(), req.headers)?;
}
Err(e) => {
error!("Error parsing header: {:?}", e);
return Err(ockam_core::Error::new(Origin::Transport, Kind::Invalid, e));
}
}
impl RequestState {
/* Parse the incoming data, attaching an Authorization header token to it.
* Data is received in chunks, and there is no guarantee about what each one contains:
* incomplete requests, multiple requests, etc.
*/
fn process_http_buffer(
&mut self,
buf: &[u8],
token: &str,
upstream: &str,
) -> ockam_core::Result<Vec<u8>> {
let mut acc = Vec::with_capacity(buf.len());
let mut cursor = buf;
loop {
if cursor.is_empty() {
return Ok(acc);
}
RequestState::RemainingBody(remaining) => {
if *remaining <= cursor.len() {
v.extend_from_slice(&cursor[..*remaining]);
cursor = &cursor[*remaining..];
s = RequestState::ParsingHeader(None);
} else {
v.extend_from_slice(cursor);
*remaining -= cursor.len();
return Ok((s, v));
}
}
RequestState::ParsingChunkedHeader(prev) => {
let (to_parse, prev_size): (&[u8], usize) = if let Some(b) = prev {
let prev_size = b.len();
b.extend_from_slice(cursor);
(b, prev_size)
} else {
(cursor, 0usize)
};
match httparse::parse_chunk_size(to_parse) {
Ok(Status::Complete((2, 0))) => {
// This is just a final \r\n. The spec says the body should end in a 0-sized
// chunk, but this has been seen in the wild as well.
v.extend_from_slice(&to_parse[..2]);
cursor = &cursor[2 - prev_size..];
s = RequestState::ParsingHeader(None);
}
Ok(Status::Complete((3, 0))) => {
// this is just a proper 0\r\n final chunk.
v.extend_from_slice(&to_parse[..3]);
cursor = &cursor[3 - prev_size..];
// There must be a final \r\n. And no more chunks,
// so just reuse the RemainingBody state for this
s = RequestState::RemainingBody(2);
}
Ok(Status::Complete((pos, chunk_size))) => {
v.extend_from_slice(&to_parse[..pos]);
cursor = &cursor[pos - prev_size..];
let complete_size = chunk_size + 2; //chunks ends in \r\n
s = RequestState::RemainingInChunk(complete_size.try_into().unwrap());
}
Ok(Status::Partial) if prev_size == 0 => {
// Nothing was buffered previously, so we need to copy and own the unparsed data
return Ok((RequestState::ParsingChunkedHeader(Some(cursor.to_vec())), v));
match self {
RequestState::ParsingHeader(prev) => {
let (to_parse, prev_size): (&[u8], usize) = if let Some(b) = prev {
let prev_size = b.len();
b.extend_from_slice(cursor);
(b, prev_size)
} else {
(cursor, 0usize)
};
let mut headers = [httparse::EMPTY_HEADER; 64];
let mut req = httparse::Request::new(&mut headers);
match req.parse(to_parse) {
Ok(httparse::Status::Partial) if prev_size == 0 => {
// Nothing was buffered previously, so we need to copy and own the unparsed data
*self = RequestState::ParsingHeader(Some(cursor.to_vec()));
return Ok(acc);
}
Ok(httparse::Status::Partial) => {
// There was a previous buffer, and we have already appended the new data to it
return Ok(acc);
}
Ok(httparse::Status::Complete(body_offset)) => {
cursor = &cursor[body_offset - prev_size..];
attach_auth_token_and_serialize_into(&req, token, upstream, &mut acc);
*self = body_state(req.method.unwrap(), req.headers)?;
}
Err(e) => {
error!("Error parsing header: {:?}", e);
return Err(ockam_core::Error::new(
Origin::Transport,
Kind::Invalid,
e,
));
}
}
Ok(Status::Partial) => {
// There was a previous buffer, and we have already appended the new data to it
return Ok((s, v));
}
RequestState::RemainingBody(remaining) => {
if *remaining <= cursor.len() {
acc.extend_from_slice(&cursor[..*remaining]);
cursor = &cursor[*remaining..];
*self = RequestState::ParsingHeader(None);
} else {
acc.extend_from_slice(cursor);
*remaining -= cursor.len();
return Ok(acc);
}
Err(e) => {
error!("Error parsing chunk size: {:?}. Buffer: {:?}", e, prev);
return Err(ockam_core::Error::new(
Origin::Transport,
Kind::Invalid,
format!("Can't parse chunked body {:?}", e),
));
}
RequestState::ParsingChunkedHeader(prev) => {
let (to_parse, prev_size): (&[u8], usize) = if let Some(b) = prev {
let prev_size = b.len();
b.extend_from_slice(cursor);
(b, prev_size)
} else {
(cursor, 0usize)
};
match httparse::parse_chunk_size(to_parse) {
Ok(Status::Complete((2, 0))) => {
// This is just a final \r\n. The spec says the body should end in a 0-sized
// chunk, but this has been seen in the wild as well.
acc.extend_from_slice(&to_parse[..2]);
cursor = &cursor[2 - prev_size..];
*self = RequestState::ParsingHeader(None);
}
Ok(Status::Complete((3, 0))) => {
// this is just a proper 0\r\n final chunk.
acc.extend_from_slice(&to_parse[..3]);
cursor = &cursor[3 - prev_size..];
// There must be a final \r\n. And no more chunks,
// so just reuse the RemainingBody state for this
*self = RequestState::RemainingBody(2);
}
Ok(Status::Complete((pos, chunk_size))) => {
acc.extend_from_slice(&to_parse[..pos]);
cursor = &cursor[pos - prev_size..];
let complete_size = chunk_size + 2; //chunks ends in \r\n
*self =
RequestState::RemainingInChunk(complete_size.try_into().unwrap());
}
Ok(Status::Partial) if prev_size == 0 => {
// Nothing was buffered previously, so we need to copy and own the unparsed data
*self = RequestState::ParsingChunkedHeader(Some(cursor.to_vec()));
return Ok(acc);
}
Ok(Status::Partial) => {
// There was a previous buffer, and we have already appended the new data to it
return Ok(acc);
}
Err(e) => {
error!("Error parsing chunk size: {:?}. Buffer: {:?}", e, prev);
return Err(ockam_core::Error::new(
Origin::Transport,
Kind::Invalid,
format!("Can't parse chunked body {:?}", e),
));
}
}
}
}
RequestState::RemainingInChunk(size) => {
if cursor.len() >= *size {
v.extend_from_slice(&cursor[..*size]);
cursor = &cursor[*size..];
s = RequestState::ParsingChunkedHeader(None);
} else {
v.extend_from_slice(cursor);
*size -= cursor.len();
return Ok((s, v));
RequestState::RemainingInChunk(size) => {
if cursor.len() >= *size {
acc.extend_from_slice(&cursor[..*size]);
cursor = &cursor[*size..];
*self = RequestState::ParsingChunkedHeader(None);
} else {
acc.extend_from_slice(cursor);
*size -= cursor.len();
return Ok(acc);
}
}
}
}
Expand All @@ -257,13 +269,11 @@ impl PortalInterceptor for HttpAuthInterceptor {
if token.is_none() {
error!("No authorization token available");
}
let (new_state, out) = copy_body(
guard.state.clone(),
let out = guard.state.process_http_buffer(
buffer,
&token.unwrap_or_default(),
&self.upstream,
)?;
guard.state = new_state;
Ok(Some(out))
}
}
Expand Down Expand Up @@ -303,9 +313,9 @@ Transfer-Encoding: gzip, chunked\r\n\r\n\
let mut result = Vec::new();
let mut request_state = RequestState::ParsingHeader(None);
for chunk in data.chunks(size) {
let (next_state, data_out) =
copy_body(request_state, chunk, TOKEN, UPSTREAM).unwrap();
request_state = next_state;
let data_out = request_state
.process_http_buffer(chunk, TOKEN, UPSTREAM)
.unwrap();
result.extend_from_slice(&data_out);
}
assert_eq!(
Expand Down Expand Up @@ -340,9 +350,9 @@ field1=value1&field2=value2",
let mut result = Vec::new();
let mut request_state = RequestState::ParsingHeader(None);
for chunk in data.chunks(size) {
let (next_state, data_out) =
copy_body(request_state, chunk, TOKEN, UPSTREAM).unwrap();
request_state = next_state;
let data_out = request_state
.process_http_buffer(chunk, TOKEN, UPSTREAM)
.unwrap();
result.extend_from_slice(&data_out);
}
assert_eq!(
Expand Down Expand Up @@ -370,9 +380,9 @@ field1=value1&field2=value2",
let mut result = Vec::new();
let mut request_state = RequestState::ParsingHeader(None);
for chunk in data.chunks(size) {
let (next_state, data_out) =
copy_body(request_state, chunk, TOKEN, UPSTREAM).unwrap();
request_state = next_state;
let data_out = request_state
.process_http_buffer(chunk, TOKEN, UPSTREAM)
.unwrap();
result.extend_from_slice(&data_out);
}
assert_eq!(String::from_utf8(result).unwrap(), expected);
Expand Down

0 comments on commit 8bcb606

Please sign in to comment.