Skip to content

Commit

Permalink
update catalog test
Browse files Browse the repository at this point in the history
Signed-off-by: mag1c1an1 <[email protected]>
  • Loading branch information
mag1c1an1 committed Jan 20, 2024
1 parent 45d6ec6 commit 3fb49c0
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 53 deletions.
2 changes: 1 addition & 1 deletion rust/lakesoul-datafusion/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,4 @@ impl SchemaProvider for LakeSoulNamespace {
let set = self.table_names().into_iter().collect::<HashSet<String>>();
set.contains(name)
}
}
}
103 changes: 52 additions & 51 deletions rust/lakesoul-datafusion/src/test/catalog_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod catalog_tests {
use datafusion::assert_batches_eq;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use futures::TryFutureExt;
use tokio::runtime::Runtime;
use tracing::{debug, info};
use lakesoul_io::datasource::file_format::LakeSoulParquetFormat;
use lakesoul_io::datasource::listing::LakeSoulListingTable;
Expand Down Expand Up @@ -189,48 +190,51 @@ mod catalog_tests {
Arc::new(MetaDataClient::from_env().await.unwrap())
}

#[test_log::test(tokio::test)]
async fn test_get_all_table_name_id_by_namespace() {
let client = get_client().await;
let v = vec![
Some(String::from("lakesoul01")),
None,
Some(String::from("lakesoul02")),
None,
Some(String::from("lakesoul03")),
];
let len = v.len();
let nps = create_namespaces_with_random(client.clone(), v)
.await
.into_iter()
.zip(1..=len as i32)
.collect::<Vec<(Namespace, i32)>>();
debug!("{nps:#?}");
let config = LakeSoulIOConfigBuilder::new().build();
let mut expected = create_tables_with_random(Arc::clone(&client), &config, nps).await;
expected.sort_by(|a, b| a.table_namespace.cmp(&b.table_namespace));
let expected = expected
.into_iter()
.map(|info| table_name_id_from_table_info(&info))
.collect::<Vec<TableNameId>>();
let mut actual = vec![];
let namespace_set = expected
.iter()
.map(|t| t.table_namespace.clone())
.collect::<BTreeSet<String>>();
for ns in namespace_set {
let tmp = client.get_all_table_name_id_by_namespace(&ns).await.unwrap();
actual.extend(tmp);
}
debug!("expected:\n{:#?}", expected);
debug!("actual:\n{:#?}", actual);
assert_eq!(expected, actual);
// #[test_log::test]
fn test_get_all_table_name_id_by_namespace() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let client = get_client().await;
let v = vec![
Some(String::from("lakesoul011")),
None,
Some(String::from("lakesoul021")),
None,
Some(String::from("lakesoul031")),
];
let len = v.len();
let nps = create_namespaces_with_random(client.clone(), v)
.await
.into_iter()
.zip(1..=len as i32)
.collect::<Vec<(Namespace, i32)>>();
debug!("{nps:#?}");
let config = LakeSoulIOConfigBuilder::new().build();
let mut expected = create_tables_with_random(Arc::clone(&client), &config, nps).await;
expected.sort_by(|a, b| a.table_namespace.cmp(&b.table_namespace));
let expected = expected
.into_iter()
.map(|info| table_name_id_from_table_info(&info))
.collect::<Vec<TableNameId>>();
let mut actual = vec![];
let namespace_set = expected
.iter()
.map(|t| t.table_namespace.clone())
.collect::<BTreeSet<String>>();
for ns in namespace_set {
let tmp = client.get_all_table_name_id_by_namespace(&ns).await.unwrap();
actual.extend(tmp);
}
debug!("expected:\n{:#?}", expected);
debug!("actual:\n{:#?}", actual);
assert_eq!(expected, actual);
});
}

#[test_log::test]
fn test_schema_provider() {
let rt = Arc::new(tokio::runtime::Runtime::new().unwrap());
rt.clone().block_on(async {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let client = get_client().await;
let mut config = LakeSoulIOConfigBuilder::new().build();
let sc = Arc::new(create_session_context(&mut config).unwrap());
Expand All @@ -254,14 +258,13 @@ mod catalog_tests {
&n,
);
assert_eq!(schema.table_names().len(), len)
}
)
});
}

#[test_log::test]
fn test_catalog_provider() {
let rt = Arc::new(tokio::runtime::Runtime::new().unwrap());
rt.clone().block_on(async {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let client = get_client().await;
let v = vec![
Some(String::from("lakesoul01")),
Expand All @@ -287,21 +290,19 @@ mod catalog_tests {
get_client().await,
sc.clone(),
));
assert_eq!(catalog.schema_names().len(), len);
}
)
// when cargo test in ci , it use parallel.
// so metadata may be changed
assert!(catalog.schema_names().len() >= len);
});
}

#[test]

#[test_log::test]
fn simple_sql_test() {
let rt = tokio::runtime::Runtime::new().unwrap();
let rt = Runtime::new().unwrap();
rt.block_on(async {
let client = get_client().await;
// TODO
let ret = client.meta_cleanup().await.unwrap();
assert_eq!(0, ret);
// create namespace

let np = Namespace {
namespace: "default".to_string(),
properties: serde_json::to_string(&LakeSoulTableProperty {
Expand Down
2 changes: 1 addition & 1 deletion rust/lakesoul-datafusion/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod hash_tests;

mod catalog_tests;

// #[ctor::ctor]
#[ctor::ctor]
fn init() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
Expand Down

0 comments on commit 3fb49c0

Please sign in to comment.