Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
flush when response exceed limit size
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Oct 6, 2023
1 parent e7b848a commit 1935787
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
1 change: 0 additions & 1 deletion sqld/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ impl MakeConnection for MakeWriteProxyConn {
}
}

#[derive(Debug)]
pub struct WriteProxyConnection {
/// Lazily initialized read connection
read_conn: LibSqlConnection<TransparentMethods>,
Expand Down
13 changes: 10 additions & 3 deletions sqld/src/rpc/streaming_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use std::task::{ready, Context, Poll};

use futures_core::Stream;
use prost::Message;
use rusqlite::types::ValueRef;
use tokio::sync::mpsc;
use tonic::{Code, Status};
Expand Down Expand Up @@ -50,6 +51,7 @@ struct StreamResponseBuilder {
request_id: u32,
sender: mpsc::Sender<ExecResp>,
current: Option<ProgramResp>,
current_size: usize,
}

impl StreamResponseBuilder {
Expand All @@ -59,12 +61,15 @@ impl StreamResponseBuilder {
}

fn push(&mut self, step: Step) -> Result<(), QueryResultBuilderError> {
const MAX_RESPONSE_STEPS: usize = 10;
const MAX_RESPONSE_SIZE: usize = bytesize::ByteSize::mb(1).as_u64() as usize;

let current = self.current();
current.steps.push(RespStep { step: Some(step) });
let step = RespStep { step: Some(step) };
let size = step.encoded_len();
current.steps.push(step);
self.current_size += size;

if current.steps.len() > MAX_RESPONSE_STEPS {
if self.current_size >= MAX_RESPONSE_SIZE {
self.flush()?;
}

Expand All @@ -77,6 +82,7 @@ impl StreamResponseBuilder {
request_id: self.request_id,
response: Some(exec_resp::Response::ProgramResp(current)),
};
self.current_size = 0;
self.sender
.blocking_send(resp)
.map_err(|_| QueryResultBuilderError::Internal(anyhow::anyhow!("stream closed")))?;
Expand Down Expand Up @@ -235,6 +241,7 @@ where
request_id,
sender,
current: None,
current_size: 0,
};
let mut fut = conn.execute_program(pgm, authenticated, builder, None);
loop {
Expand Down

0 comments on commit 1935787

Please sign in to comment.