Skip to content

Commit

Permalink
Catalog view: fetch StoreInfo from the catalog and do the catalog dat…
Browse files Browse the repository at this point in the history
…a enrichment on the viewer side (#8504)

### What

The Rerun Data Platform now supports simple filtering by recording IDs and
also stores (some of) the fields required to create a StoreInfo, hence
we can now fetch it from the catalog.

Also, instead of doing all the Arrow data manipulation needed to make a
DataframePart convertible to a Rerun Chunk on the ReDap side, we now
do it on the viewer side.
  • Loading branch information
zehiko authored Dec 18, 2024
1 parent a040ce3 commit 091e57a
Showing 1 changed file with 160 additions and 17 deletions.
177 changes: 160 additions & 17 deletions crates/store/re_grpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod address;
pub use address::{InvalidRedapAddress, RedapAddress};
use re_chunk::external::arrow2;
use re_log_types::external::re_types_core::ComponentDescriptor;
use re_protos::remote_store::v0::CatalogFilter;
use re_types::blueprint::archetypes::{ContainerBlueprint, ViewportBlueprint};
use re_types::blueprint::archetypes::{ViewBlueprint, ViewContents};
use re_types::blueprint::components::{ContainerKind, RootContainer};
Expand All @@ -16,6 +17,7 @@ use url::Url;
// ----------------------------------------------------------------------------

use std::error::Error;
use std::sync::Arc;

use arrow2::array::Utf8Array as Arrow2Utf8Array;
use arrow2::datatypes::Field as Arrow2Field;
Expand Down Expand Up @@ -175,6 +177,43 @@ async fn stream_recording_async(
StorageNodeClient::new(tonic_client).max_decoding_message_size(usize::MAX)
};

re_log::debug!("Fetching catalog data for {recording_id}…");

let resp = client
.query_catalog(QueryCatalogRequest {
column_projection: None, // fetch all columns
filter: Some(CatalogFilter {
recording_ids: vec![RecordingId {
id: recording_id.clone(),
}],
}),
})
.await
.map_err(TonicStatusError)?
.into_inner()
.filter_map(|resp| {
resp.and_then(|r| {
decode(r.encoder_version(), &r.payload)
.map_err(|err| tonic::Status::internal(err.to_string()))
})
.transpose()
})
.collect::<Result<Vec<_>, tonic::Status>>()
.await
.map_err(TonicStatusError)?;

if resp.len() != 1 || resp[0].num_rows() != 1 {
return Err(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
reason: format!(
"expected exactly one recording with id {recording_id}, got {}",
resp.len()
),
}));
}

let store_info = store_info_from_catalog_chunk(&resp[0], &recording_id)?;
let store_id = store_info.store_id.clone();

re_log::debug!("Fetching {recording_id}…");

let mut resp = client
Expand All @@ -196,19 +235,6 @@ async fn stream_recording_async(

drop(client);

// TODO(zehiko) - we need a separate gRPC endpoint for fetching Store info REDAP #85
let store_id = StoreId::from_string(StoreKind::Recording, recording_id.clone());

let store_info = StoreInfo {
application_id: ApplicationId::from("redap_recording"),
store_id: store_id.clone(),
cloned_from: None,
is_official_example: false,
started: Time::now(),
store_source: StoreSource::Unknown,
store_version: None,
};

// We need a whole StoreInfo here.
if tx
.send(LogMsg::SetStoreInfo(SetStoreInfo {
Expand Down Expand Up @@ -242,6 +268,51 @@ async fn stream_recording_async(
Ok(())
}

/// Builds the [`StoreInfo`] of a recording from its catalog entry.
///
/// Looks up two component columns in `tc` by field name and reads the value in their first row:
/// - `application_id`: must be a utf8 array, becomes the [`ApplicationId`].
/// - `start_time`: must be an int64 array, interpreted as nanoseconds since epoch.
///
/// The resulting store id is a [`StoreKind::Recording`] id built from `recording_id`. All the
/// remaining `StoreInfo` fields are set to fixed values (unknown source, no version, not cloned,
/// not an official example).
///
/// # Errors
///
/// Returns a malformed-chunk [`StreamError::ChunkError`] if either column is missing, has an
/// unexpected array type, or contains no rows.
fn store_info_from_catalog_chunk(
    tc: &TransportChunk,
    recording_id: &str,
) -> Result<StoreInfo, StreamError> {
    // Errors are built lazily through this helper: the messages allocate, and some of them
    // debug-format the whole schema, which should never happen on the success path.
    let malformed =
        |reason: String| StreamError::ChunkError(re_chunk::ChunkError::Malformed { reason });

    let store_id = StoreId::from_string(StoreKind::Recording, recording_id.to_owned());

    let (_field, data) = tc
        .components()
        .find(|(f, _)| f.name == "application_id")
        .ok_or_else(|| malformed("no application_id field found".to_owned()))?;
    let app_id_array = data
        .as_any()
        .downcast_ref::<Arrow2Utf8Array<i32>>()
        .ok_or_else(|| {
            malformed(format!(
                "application_id must be a utf8 array: {:?}",
                tc.schema
            ))
        })?;
    // `value(0)` panics on an empty array, so report that case as a malformed chunk instead.
    if data.is_empty() {
        return Err(malformed("application_id column has no rows".to_owned()));
    }
    let app_id = app_id_array.value(0);

    let (_field, data) = tc
        .components()
        .find(|(f, _)| f.name == "start_time")
        .ok_or_else(|| malformed("no start_time field found".to_owned()))?;
    let start_time_array = data
        .as_any()
        .downcast_ref::<arrow2::array::Int64Array>()
        .ok_or_else(|| {
            malformed(format!(
                "start_time must be an int64 array: {:?}",
                tc.schema
            ))
        })?;
    // Same guard as above: never index into a column without rows.
    if data.is_empty() {
        return Err(malformed("start_time column has no rows".to_owned()));
    }
    let start_time = start_time_array.value(0);

    Ok(StoreInfo {
        application_id: ApplicationId::from(app_id),
        store_id,
        cloned_from: None,
        is_official_example: false,
        started: Time::from_ns_since_epoch(start_time),
        store_source: StoreSource::Unknown,
        store_version: None,
    })
}

async fn stream_catalog_async(
tx: re_smart_channel::Sender<LogMsg>,
redap_endpoint: Url,
Expand Down Expand Up @@ -318,16 +389,88 @@ async fn stream_catalog_async(

re_log::info!("Starting to read...");
while let Some(result) = resp.next().await {
let mut tc = result.map_err(TonicStatusError)?;
// received TransportChunk doesn't have ChunkId, hence we need to add it before converting
// to Chunk
let input = result.map_err(TonicStatusError)?;

// Catalog received from the ReDap server isn't suitable for direct conversion to a Rerun Chunk:
// - conversion expects "data" columns to be ListArrays, hence we need to convert any individual row column data to ListArray
// - conversion expects the input TransportChunk to have a ChunkId so we need to add that piece of metadata

let mut fields = Vec::new();
let mut arrays = Vec::new();
// add the (row id) control field
let (row_id_field, row_id_data) = input.controls().next().ok_or(
StreamError::ChunkError(re_chunk::ChunkError::Malformed {
reason: "no control field found".to_owned(),
}),
)?;

fields.push(
Arrow2Field::new(
RowId::name().to_string(), // need to rename to Rerun Chunk expected control field
row_id_field.data_type().clone(),
false, /* not nullable */
)
.with_metadata(TransportChunk::field_metadata_control_column()),
);
arrays.push(row_id_data.clone());

// next add any timeline field
for (field, data) in input.timelines() {
fields.push(field.clone());
arrays.push(data.clone());
}

// now add all the 'data' fields - we slice each column array into individual arrays and then convert the whole lot into a ListArray
for (field, data) in input.components() {
let data_field_inner =
Arrow2Field::new("item", field.data_type().clone(), true /* nullable */);

let data_field = Arrow2Field::new(
field.name.clone(),
arrow2::datatypes::DataType::List(Arc::new(data_field_inner.clone())),
false, /* not nullable */
)
.with_metadata(TransportChunk::field_metadata_data_column());

let mut sliced: Vec<Box<dyn Arrow2Array>> = Vec::new();
for idx in 0..data.len() {
let mut array = data.clone();
array.slice(idx, 1);
sliced.push(array);
}

let data_arrays = sliced.iter().map(|e| Some(e.as_ref())).collect::<Vec<_>>();
#[allow(clippy::unwrap_used)] // we know we've given the right field type
let data_field_array: arrow2::array::ListArray<i32> =
re_chunk::util::arrays_to_list_array(
data_field_inner.data_type().clone(),
&data_arrays,
)
.unwrap();

fields.push(data_field);
arrays.push(Box::new(data_field_array));
}

let mut schema = arrow2::datatypes::Schema::from(fields);
schema.metadata.insert(
TransportChunk::CHUNK_METADATA_KEY_ENTITY_PATH.to_owned(),
"catalog".to_owned(),
);

// modified and enriched TransportChunk
let mut tc = TransportChunk {
schema,
data: arrow2::chunk::Chunk::new(arrays),
};

tc.schema.metadata.insert(
TransportChunk::CHUNK_METADATA_KEY_ID.to_owned(),
ChunkId::new().to_string(),
);
let mut chunk = Chunk::from_transport(&tc)?;

// enrich catalog data with RecordingUri that's based on the ReDap endpoint (that we know)
// finally, enrich catalog data with RecordingUri that's based on the ReDap endpoint (that we know)
// and the recording id (that we have in the catalog data)
let host = redap_endpoint
.host()
Expand Down

0 comments on commit 091e57a

Please sign in to comment.