From 091e57a6c0085e2ccfcc8477cc1b322f1ffef60f Mon Sep 17 00:00:00 2001
From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com>
Date: Wed, 18 Dec 2024 07:57:54 +0100
Subject: [PATCH] Catalog view: fetch StoreInfo from the catalog and do the
 catalog data enrichment on the viewer side (#8504)

### What

The Rerun Data Platform now supports simple filtering by recording id and also stores (some of) the fields required to create a StoreInfo, so we can now fetch it from the catalog instead of synthesizing a placeholder.

Also, instead of doing all the arrow data manipulation needed to make a DataframePart convertible to a Rerun Chunk on the ReDap side, we now do it on the viewer side (a minimal sketch of that conversion follows the diff excerpt below).
---
 crates/store/re_grpc_client/src/lib.rs | 177 ++++++++++++++++++++++---
 1 file changed, 160 insertions(+), 17 deletions(-)

diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs
index 5b6f553ee428..f324862dc3a8 100644
--- a/crates/store/re_grpc_client/src/lib.rs
+++ b/crates/store/re_grpc_client/src/lib.rs
@@ -5,6 +5,7 @@ mod address;
 pub use address::{InvalidRedapAddress, RedapAddress};
 use re_chunk::external::arrow2;
 use re_log_types::external::re_types_core::ComponentDescriptor;
+use re_protos::remote_store::v0::CatalogFilter;
 use re_types::blueprint::archetypes::{ContainerBlueprint, ViewportBlueprint};
 use re_types::blueprint::archetypes::{ViewBlueprint, ViewContents};
 use re_types::blueprint::components::{ContainerKind, RootContainer};
@@ -16,6 +17,7 @@ use url::Url;
 // ----------------------------------------------------------------------------
 
 use std::error::Error;
+use std::sync::Arc;
 
 use arrow2::array::Utf8Array as Arrow2Utf8Array;
 use arrow2::datatypes::Field as Arrow2Field;
@@ -175,6 +177,43 @@ async fn stream_recording_async(
         StorageNodeClient::new(tonic_client).max_decoding_message_size(usize::MAX)
     };
 
+    re_log::debug!("Fetching catalog data for {recording_id}…");
+
+    let resp = client
+        .query_catalog(QueryCatalogRequest {
+            column_projection: None, // fetch all columns
+            filter: Some(CatalogFilter {
+                recording_ids: vec![RecordingId {
+                    id: recording_id.clone(),
+                }],
+            }),
+        })
+        .await
+        .map_err(TonicStatusError)?
+        .into_inner()
+        .filter_map(|resp| {
+            resp.and_then(|r| {
+                decode(r.encoder_version(), &r.payload)
+                    .map_err(|err| tonic::Status::internal(err.to_string()))
+            })
+            .transpose()
+        })
+        .collect::<Result<Vec<_>, tonic::Status>>()
+        .await
+        .map_err(TonicStatusError)?;
+
+    if resp.len() != 1 || resp[0].num_rows() != 1 {
+        return Err(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
+            reason: format!(
+                "expected exactly one recording with id {recording_id}, got {}",
+                resp.len()
+            ),
+        }));
+    }
+
+    let store_info = store_info_from_catalog_chunk(&resp[0], &recording_id)?;
+    let store_id = store_info.store_id.clone();
+
     re_log::debug!("Fetching {recording_id}…");
 
     let mut resp = client
@@ -196,19 +235,6 @@ async fn stream_recording_async(
 
     drop(client);
 
-    // TODO(zehiko) - we need a separate gRPC endpoint for fetching Store info REDAP #85
-    let store_id = StoreId::from_string(StoreKind::Recording, recording_id.clone());
-
-    let store_info = StoreInfo {
-        application_id: ApplicationId::from("redap_recording"),
-        store_id: store_id.clone(),
-        cloned_from: None,
-        is_official_example: false,
-        started: Time::now(),
-        store_source: StoreSource::Unknown,
-        store_version: None,
-    };
-
     // We need a whole StoreInfo here.
     if tx
         .send(LogMsg::SetStoreInfo(SetStoreInfo {
@@ -242,6 +268,51 @@ async fn stream_recording_async(
 
     Ok(())
 }
 
+fn store_info_from_catalog_chunk(
+    tc: &TransportChunk,
+    recording_id: &str,
+) -> Result<StoreInfo, StreamError> {
+    let store_id = StoreId::from_string(StoreKind::Recording, recording_id.to_owned());
+
+    let (_field, data) = tc
+        .components()
+        .find(|(f, _)| f.name == "application_id")
+        .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
+            reason: "no application_id field found".to_owned(),
+        }))?;
+    let app_id = data
+        .as_any()
+        .downcast_ref::<Arrow2Utf8Array<i32>>()
+        .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
+            reason: format!("application_id must be a utf8 array: {:?}", tc.schema),
+        }))?
+        .value(0);
+
+    let (_field, data) = tc
+        .components()
+        .find(|(f, _)| f.name == "start_time")
+        .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
+            reason: "no start_time field found".to_owned(),
+        }))?;
+    let start_time = data
+        .as_any()
+        .downcast_ref::<arrow2::array::Int64Array>()
+        .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
+            reason: format!("start_time must be an int64 array: {:?}", tc.schema),
+        }))?
+        .value(0);
+
+    Ok(StoreInfo {
+        application_id: ApplicationId::from(app_id),
+        store_id: store_id.clone(),
+        cloned_from: None,
+        is_official_example: false,
+        started: Time::from_ns_since_epoch(start_time),
+        store_source: StoreSource::Unknown,
+        store_version: None,
+    })
+}
+
 async fn stream_catalog_async(
     tx: re_smart_channel::Sender<LogMsg>,
     redap_endpoint: Url,
@@ -318,16 +389,88 @@ async fn stream_catalog_async(
     re_log::info!("Starting to read...");
 
     while let Some(result) = resp.next().await {
-        let mut tc = result.map_err(TonicStatusError)?;
-        // received TransportChunk doesn't have ChunkId, hence we need to add it before converting
-        // to Chunk
+        let input = result.map_err(TonicStatusError)?;
+
+        // Catalog received from the ReDap server isn't suitable for direct conversion to a Rerun Chunk:
+        // - conversion expects "data" columns to be ListArrays, hence we need to convert any individual row column data to a ListArray
+        // - conversion expects the input TransportChunk to have a ChunkId, so we need to add that piece of metadata
+
+        let mut fields = Vec::new();
+        let mut arrays = Vec::new();
+        // add the (row id) control field
+        let (row_id_field, row_id_data) = input.controls().next().ok_or(
+            StreamError::ChunkError(re_chunk::ChunkError::Malformed {
+                reason: "no control field found".to_owned(),
+            }),
+        )?;
+
+        fields.push(
+            Arrow2Field::new(
+                RowId::name().to_string(), // need to rename to the control field name expected by Rerun Chunk
+                row_id_field.data_type().clone(),
+                false, /* not nullable */
+            )
+            .with_metadata(TransportChunk::field_metadata_control_column()),
+        );
+        arrays.push(row_id_data.clone());
+
+        // next, add any timeline fields
+        for (field, data) in input.timelines() {
+            fields.push(field.clone());
+            arrays.push(data.clone());
+        }
+
+        // now add all the 'data' fields - we slice each column array into individual arrays and then convert the whole lot into a ListArray
+        for (field, data) in input.components() {
+            let data_field_inner =
+                Arrow2Field::new("item", field.data_type().clone(), true /* nullable */);
+
+            let data_field = Arrow2Field::new(
+                field.name.clone(),
+                arrow2::datatypes::DataType::List(Arc::new(data_field_inner.clone())),
+                false, /* not nullable */
+            )
+            .with_metadata(TransportChunk::field_metadata_data_column());
+
+            let mut sliced: Vec<Box<dyn Arrow2Array>> = Vec::new();
+            for idx in 0..data.len() {
+                let mut array = data.clone();
+                array.slice(idx, 1);
+                sliced.push(array);
+            }
+
+            let data_arrays = sliced.iter().map(|e| Some(e.as_ref())).collect::<Vec<_>>();
+            #[allow(clippy::unwrap_used)] // we know we've given the right field type
+            let data_field_array: arrow2::array::ListArray<i32> =
+                re_chunk::util::arrays_to_list_array(
+                    data_field_inner.data_type().clone(),
+                    &data_arrays,
+                )
+                .unwrap();
+
+            fields.push(data_field);
+            arrays.push(Box::new(data_field_array));
+        }
+
+        let mut schema = arrow2::datatypes::Schema::from(fields);
+        schema.metadata.insert(
+            TransportChunk::CHUNK_METADATA_KEY_ENTITY_PATH.to_owned(),
+            "catalog".to_owned(),
+        );
+
+        // modified and enriched TransportChunk
+        let mut tc = TransportChunk {
+            schema,
+            data: arrow2::chunk::Chunk::new(arrays),
+        };
+
         tc.schema.metadata.insert(
             TransportChunk::CHUNK_METADATA_KEY_ID.to_owned(),
             ChunkId::new().to_string(),
         );
 
         let mut chunk = Chunk::from_transport(&tc)?;
-        // enrich catalog data with RecordingUri that's based on the ReDap endpoint (that we know)
+        // finally, enrich catalog data with RecordingUri that's based on the ReDap endpoint (that we know)
         // and the recording id (that we have in the catalog data)
        let host = redap_endpoint
            .host()
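
For reference, here is a minimal, self-contained sketch of the per-column conversion described above: each row of a plain catalog column becomes a one-element slice, and the slices are wrapped into a single ListArray, which is the shape `Chunk::from_transport` expects for data columns. The helper name `rows_to_list_column` is made up for illustration; the arrow2 calls and `re_chunk::util::arrays_to_list_array` mirror the patch, and the snippet assumes `arrow2` and `re_chunk` as dependencies.

```rust
use arrow2::array::Array as Arrow2Array;

// Hypothetical helper (not part of the patch): wrap a row-per-value column
// into a ListArray with one list entry per original row.
fn rows_to_list_column(data: &dyn Arrow2Array) -> Box<dyn Arrow2Array> {
    // slice the column into one-row arrays…
    let mut sliced: Vec<Box<dyn Arrow2Array>> = Vec::new();
    for idx in 0..data.len() {
        let mut row = data.to_boxed();
        row.slice(idx, 1);
        sliced.push(row);
    }

    // …then wrap the slices into a single ListArray
    let rows: Vec<Option<&dyn Arrow2Array>> =
        sliced.iter().map(|a| Some(a.as_ref())).collect();
    let list = re_chunk::util::arrays_to_list_array(data.data_type().clone(), &rows)
        .expect("all slices share the datatype we just passed in");

    Box::new(list)
}
```

The patch does the equivalent inline for every `(field, data)` pair yielded by `input.components()`, then adds the entity path and ChunkId metadata before handing the rebuilt TransportChunk to `Chunk::from_transport`.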