Skip to content

Commit

Permalink
Add new stream_function_logs endpoint (#23612)
Browse files Browse the repository at this point in the history
This endpoint will eventually be used to allow showing live log lines for actions in the dashboard and via `npx convex logs`

It returns "Completion" and "Progress" events, where "Progress" events contain log lines. Log lines from a function might appear in the "Completion" events (in the case of queries and mutations) or in the "Progress" events (in the case of actions), but will not appear twice.

GitOrigin-RevId: aa5fa0965fad1f2dbb98f6c9911533f82688416b
  • Loading branch information
sshader authored and Convex, Inc. committed Mar 20, 2024
1 parent 27ab48f commit 3d83c84
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 61 deletions.
100 changes: 89 additions & 11 deletions crates/application/src/function_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,17 @@ pub struct FunctionExecutionProgress {
pub log_lines: LogLines,

pub event_source: FunctionEventSource,
pub function_start_timestamp: UnixTimestamp,
}

// Heap footprint of an in-flight progress record. Only the owned log lines
// and the event-source metadata are counted; `function_start_timestamp` is
// inline data and is deliberately not included here.
impl HeapSize for FunctionExecutionProgress {
    fn heap_size(&self) -> usize {
        let lines_bytes = self.log_lines.heap_size();
        let source_bytes = self.event_source.heap_size();
        lines_bytes + source_bytes
    }
}

impl FunctionExecutionProgress {
fn console_log_events(self, unix_timestamp: UnixTimestamp) -> Vec<LogEvent> {
fn console_log_events(self) -> Vec<LogEvent> {
self.log_lines
.into_iter()
.map(|line| {
Expand All @@ -291,7 +298,7 @@ impl FunctionExecutionProgress {
topic: LogTopic::Console,
source: EventSource::Function(self.event_source.clone()),
payload,
timestamp: unix_timestamp,
timestamp: self.function_start_timestamp,
})
})
.filter_map(|event| match event {
Expand All @@ -306,6 +313,21 @@ impl FunctionExecutionProgress {
}
}

/// One entry in the in-memory function execution log, streamed to clients
/// (e.g. the dashboard / `npx convex logs`).
///
/// Per the commit description: log lines for queries and mutations arrive
/// inside the `Completion` entry, while actions emit them incrementally as
/// `Progress` entries — a given log line never appears in both.
#[derive(Debug, Clone)]
pub enum FunctionExecutionPart {
/// A finished function execution.
Completion(FunctionExecution),
/// A batch of log lines from a still-running function.
Progress(FunctionExecutionProgress),
}

// Delegate heap accounting to whichever variant payload this entry holds.
impl HeapSize for FunctionExecutionPart {
    fn heap_size(&self) -> usize {
        match self {
            Self::Completion(completion) => completion.heap_size(),
            Self::Progress(progress) => progress.heap_size(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct ActionCompletion {
pub outcome: ActionOutcome,
Expand Down Expand Up @@ -510,6 +532,7 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
pub fn new(rt: RT, usage_tracking: UsageCounter, log_manager: Arc<dyn LogSender>) -> Self {
let inner = Inner {
rt: rt.clone(),
num_execution_completions: 0,
log: WithHeapSize::default(),
log_waiters: vec![].into(),
log_manager,
Expand Down Expand Up @@ -884,6 +907,9 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
let mut summary = UdfMetricSummary::default();
for i in first_entry_ix..inner.log.len() {
let (_, entry) = &inner.log[i];
let FunctionExecutionPart::Completion(entry) = entry else {
continue;
};
let function_summary = summary
.function_calls
.entry(entry.caller.clone())
Expand Down Expand Up @@ -916,7 +942,48 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
if first_entry_ix < inner.log.len() {
let entries = (first_entry_ix..inner.log.len())
.map(|i| &inner.log[i])
.map(|(_, entry)| entry.clone())
.filter_map(|(_, entry)| match entry {
FunctionExecutionPart::Completion(completion) => {
Some(completion.clone())
},
_ => None,
})
.collect();
let (new_cursor, _) = inner.log.back().unwrap();
return (entries, *new_cursor);
}
let (tx, rx) = oneshot::channel();
inner.log_waiters.push(tx);
rx
};
let _ = rx.await;
}
}

pub async fn stream_parts(&self, cursor: CursorMs) -> (Vec<FunctionExecutionPart>, CursorMs) {
loop {
let rx = {
let mut inner = self.inner.lock();
let first_entry_ix = inner.log.partition_point(|(ts, _)| *ts <= cursor);
if first_entry_ix < inner.log.len() {
let entries = (first_entry_ix..inner.log.len())
.map(|i| &inner.log[i])
.map(|(_, entry)| match entry {
FunctionExecutionPart::Completion(c) => {
let with_stripped_log_lines = match c.udf_type {
UdfType::Query | UdfType::Mutation => c.clone(),
UdfType::Action | UdfType::HttpAction => {
let mut cloned = c.clone();
cloned.log_lines = vec![].into();
cloned
},
};
FunctionExecutionPart::Completion(with_stripped_log_lines)
},
FunctionExecutionPart::Progress(c) => {
FunctionExecutionPart::Progress(c.clone())
},
})
.collect();
let (new_cursor, _) = inner.log.back().unwrap();
return (entries, *new_cursor);
Expand All @@ -942,7 +1009,8 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
struct Inner<RT: Runtime> {
rt: RT,

log: WithHeapSize<VecDeque<(CursorMs, FunctionExecution)>>,
log: WithHeapSize<VecDeque<(CursorMs, FunctionExecutionPart)>>,
num_execution_completions: usize,
log_waiters: WithHeapSize<Vec<oneshot::Sender<()>>>,
log_manager: Arc<dyn LogSender>,

Expand Down Expand Up @@ -983,9 +1051,14 @@ impl<RT: Runtime> Inner<RT> {

self.log_manager.send_logs(log_events);

self.log.push_back((next_time, execution));
while self.log.len() > *MAX_UDF_EXECUTION {
self.log.pop_front();
self.log
.push_back((next_time, FunctionExecutionPart::Completion(execution)));
self.num_execution_completions += 1;
while self.num_execution_completions > *MAX_UDF_EXECUTION {
let front = self.log.pop_front();
if let Some((_, FunctionExecutionPart::Completion(_))) = front {
self.num_execution_completions -= 1;
}
}
for waiter in self.log_waiters.drain(..) {
let _ = waiter.send(());
Expand All @@ -997,18 +1070,23 @@ impl<RT: Runtime> Inner<RT> {
&mut self,
log_lines: LogLines,
event_source: FunctionEventSource,
unix_timestamp: UnixTimestamp,
function_start_timestamp: UnixTimestamp,
) -> anyhow::Result<()> {
let next_time = self.next_time()?;
let progress = FunctionExecutionProgress {
log_lines,
event_source,
function_start_timestamp,
};
// TODO: this should ideally use a timestamp on the log lines themselves, but
// for now use the start timestamp of the function
let log_events = progress.console_log_events(unix_timestamp);
let log_events = progress.clone().console_log_events();
self.log_manager.send_logs(log_events);
// TODO: add these to the UDF execution log so we can stream them to the
// dashboard
self.log
.push_back((next_time, FunctionExecutionPart::Progress(progress)));
for waiter in self.log_waiters.drain(..) {
let _ = waiter.send(());
}
Ok(())
}

Expand Down
16 changes: 15 additions & 1 deletion crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ use file_storage::{
FileStorage,
FileStream,
};
use function_log::FunctionExecution;
use function_log::{
FunctionExecution,
FunctionExecutionPart,
};
use function_runner::FunctionRunner;
use futures::{
channel::mpsc,
Expand Down Expand Up @@ -2311,6 +2314,17 @@ impl<RT: Runtime> Application<RT> {
Ok(self.function_log.stream(cursor).await)
}

/// Stream function-log entries (completions and in-progress log lines)
/// starting after `cursor`, waiting until new entries are available.
///
/// Restricted to admin or system identities; any other caller gets an
/// unauthorized error.
pub async fn stream_function_logs(
    &self,
    identity: Identity,
    cursor: CursorMs,
) -> anyhow::Result<(Vec<FunctionExecutionPart>, CursorMs)> {
    // Guard clause: raw execution logs are privileged data.
    let is_privileged = identity.is_admin() || identity.is_system();
    if !is_privileged {
        anyhow::bail!(unauthorized_error("stream_function_logs"));
    }
    Ok(self.function_log.stream_parts(cursor).await)
}

pub async fn cancel_all_jobs(
&self,
udf_path: Option<String>,
Expand Down
7 changes: 7 additions & 0 deletions crates/common/src/log_streaming.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use request_context::RequestContext;
use serde_json::Value as JsonValue;
use value::heap_size::HeapSize;

use crate::{
errors::JsError,
Expand Down Expand Up @@ -204,6 +205,12 @@ pub struct FunctionEventSource {
pub cached: Option<bool>,
}

// Heap usage of an event source is the sum of its owned fields' usage.
impl HeapSize for FunctionEventSource {
    fn heap_size(&self) -> usize {
        [
            self.path.heap_size(),
            self.udf_type.heap_size(),
            self.cached.heap_size(),
        ]
        .into_iter()
        .sum()
    }
}

#[cfg(test)]
mod tests {
use request_context::RequestContext;
Expand Down
7 changes: 7 additions & 0 deletions crates/common/src/types/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use serde::{
Serialize,
};
use sync_types::CanonicalizedUdfPath;
use value::heap_size::HeapSize;

use super::HttpActionRoute;
use crate::version::ClientVersion;
Expand Down Expand Up @@ -71,6 +72,12 @@ impl fmt::Display for UdfType {
}
}

// UdfType variants carry no data (they are matched as bare paths elsewhere
// in this change), so the type owns nothing on the heap.
impl HeapSize for UdfType {
fn heap_size(&self) -> usize {
0
}
}

impl From<UdfType> for UdfTypeProto {
fn from(u: UdfType) -> UdfTypeProto {
match u {
Expand Down
Loading

0 comments on commit 3d83c84

Please sign in to comment.