From 105dff576ada9f16dc66bba5d9c79ba3ccf0a482 Mon Sep 17 00:00:00 2001 From: zenghua Date: Thu, 17 Aug 2023 11:31:50 +0800 Subject: [PATCH 1/4] init python metadata Signed-off-by: zenghua --- .gitignore | 1 + python/metadata/__init__.py | 0 python/metadata/dao.py | 21 ++++++ python/metadata/db_manager.py | 46 ++++++++++++ python/metadata/lib/__init__.py | 0 python/metadata/lib/const.py | 85 ++++++++++++++++++++++ python/metadata/lib/lakesoul_metadata_c.py | 58 +++++++++++++++ python/metadata/native_client.py | 82 +++++++++++++++++++++ python/metadata/poc.py | 10 +++ python/metadata/utils.py | 34 +++++++++ 10 files changed, 337 insertions(+) create mode 100644 python/metadata/__init__.py create mode 100644 python/metadata/dao.py create mode 100644 python/metadata/db_manager.py create mode 100644 python/metadata/lib/__init__.py create mode 100644 python/metadata/lib/const.py create mode 100644 python/metadata/lib/lakesoul_metadata_c.py create mode 100644 python/metadata/native_client.py create mode 100644 python/metadata/poc.py create mode 100644 python/metadata/utils.py diff --git a/.gitignore b/.gitignore index 3135709fb..e90bc15d4 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ dependency-reduced-pom.xml /script/benchmark/work-dir/*.jar *.DS_Store native-io/lakesoul-io-c/lakesoul_c_bindings.h +python/metadata/generated diff --git a/python/metadata/__init__.py b/python/metadata/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/metadata/dao.py b/python/metadata/dao.py new file mode 100644 index 000000000..b05830afa --- /dev/null +++ b/python/metadata/dao.py @@ -0,0 +1,21 @@ +from lib.const import DaoType +from native_client import * + + +def select_table_info_by_table_name(table_name, namespace="default"): + wrapper = query(DaoType.SelectTableInfoByTableNameAndNameSpace, [table_name, namespace]) + return wrapper.table_info[0] + + +def get_partition_desc_by_table_id(table_id): + wrapper = query(DaoType.ListPartitionByTableId, [table_id]) + return wrapper.partition_info + + +def list_data_commit_info_by_table_id_and_partition_desc_and_commit_list(table_id, partition_desc, commit_id_list): + joined_commit_id = "" + for commit_id in commit_id_list: + joined_commit_id += "{:016x}{:016x}".format(commit_id.high, commit_id.low) + wrapper = query(DaoType.ListDataCommitInfoByTableIdAndPartitionDescAndCommitList, + [table_id, partition_desc, joined_commit_id]) + return wrapper.data_commit_info diff --git a/python/metadata/db_manager.py b/python/metadata/db_manager.py new file mode 100644 index 000000000..7647f21cb --- /dev/null +++ b/python/metadata/db_manager.py @@ -0,0 +1,46 @@ +from dao import * +from utils import to_arrow_schema + + +class DBManager(): + def __init__(self): + pass + + def get_table_info_by_name(self, table_name, namespace): + return select_table_info_by_table_name(table_name, namespace) + + def get_all_partition_info(self, table_id): + return get_partition_desc_by_table_id(table_id) + + def get_table_single_partition_data_info(self, partition_info): + return list_data_commit_info_by_table_id_and_partition_desc_and_commit_list(partition_info.table_id, + partition_info.partition_desc, + partition_info.snapshot) + + def get_table_info_by_id(self, table_id): + pass + + def get_data_file_info_list(self, table_info): + pass + + def split_data_info_list_to_range_and_hash_partition(self, table_id, data_file_info_list): + pass + + def get_data_files_by_table_name(self, table_name, namespace="default"): + table_info = self.get_table_info_by_name(table_name, namespace) + partition_list = self.get_all_partition_info(table_info.table_id) + data_files = [] + for partition in partition_list: + data_commit_info_list = self.get_table_single_partition_data_info(partition) + for data_commit_info in data_commit_info_list: + for file_op in data_commit_info.file_ops: + data_files.append(file_op.path) + return data_files + + def get_spark_schema_by_table_name(self, table_name, namespace="default"): + table_info = self.get_table_info_by_name(table_name, namespace) + return table_info.table_schema + + def get_arrow_schema_by_table_name(self, table_name, namespace="default"): + schema = self.get_spark_schema_by_table_name(table_name, namespace) + return to_arrow_schema(schema) diff --git a/python/metadata/lib/__init__.py b/python/metadata/lib/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/metadata/lib/const.py b/python/metadata/lib/const.py new file mode 100644 index 000000000..bf5a8b154 --- /dev/null +++ b/python/metadata/lib/const.py @@ -0,0 +1,85 @@ +PARAM_DELIM = "__DELIM__" + +DAO_TYPE_QUERY_ONE_OFFSET: int = 0 +DAO_TYPE_QUERY_LIST_OFFSET = 100 +DAO_TYPE_INSERT_ONE_OFFSET = 200 +DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET = 300 +DAO_TYPE_QUERY_SCALAR_OFFSET = 400 +DAO_TYPE_UPDATE_OFFSET = 500 + + +class DaoType: + SelectNamespaceByNamespace = DAO_TYPE_QUERY_ONE_OFFSET, + SelectTablePathIdByTablePath = DAO_TYPE_QUERY_ONE_OFFSET + 1, + SelectTableInfoByTableId = DAO_TYPE_QUERY_ONE_OFFSET + 2, + SelectTableNameIdByTableName = DAO_TYPE_QUERY_ONE_OFFSET + 3, + SelectTableInfoByTableNameAndNameSpace = DAO_TYPE_QUERY_ONE_OFFSET + 4, + SelectTableInfoByTablePath = DAO_TYPE_QUERY_ONE_OFFSET + 5, + SelectTableInfoByIdAndTablePath = DAO_TYPE_QUERY_ONE_OFFSET + 6, + + SelectOnePartitionVersionByTableIdAndDesc = DAO_TYPE_QUERY_ONE_OFFSET + 7, + SelectPartitionVersionByTableIdAndDescAndVersion = DAO_TYPE_QUERY_ONE_OFFSET + 8, + + SelectOneDataCommitInfoByTableIdAndPartitionDescAndCommitId = DAO_TYPE_QUERY_ONE_OFFSET + 9, + + # // ==== Query List ==== + + ListNamespaces = DAO_TYPE_QUERY_LIST_OFFSET, + ListTableNameByNamespace = DAO_TYPE_QUERY_LIST_OFFSET + 1, + ListAllTablePath = DAO_TYPE_QUERY_LIST_OFFSET + 2, + ListAllPathTablePathByNamespace = DAO_TYPE_QUERY_LIST_OFFSET + 3, + + # // Query Partition List + ListPartitionByTableId = DAO_TYPE_QUERY_LIST_OFFSET + 4, + ListPartitionDescByTableIdAndParList = DAO_TYPE_QUERY_LIST_OFFSET + 5, + ListPartitionByTableIdAndDesc = DAO_TYPE_QUERY_LIST_OFFSET + 6, + ListPartitionVersionByTableIdAndPartitionDescAndVersionRange = DAO_TYPE_QUERY_LIST_OFFSET + 7, + ListPartitionVersionByTableIdAndPartitionDescAndTimestampRange = DAO_TYPE_QUERY_LIST_OFFSET + 8, + ListCommitOpsBetweenVersions = DAO_TYPE_QUERY_LIST_OFFSET + 9, + + # // Query DataCommitInfo List + ListDataCommitInfoByTableIdAndPartitionDescAndCommitList = DAO_TYPE_QUERY_LIST_OFFSET + 10, + + # // ==== Insert One ==== + InsertNamespace = DAO_TYPE_INSERT_ONE_OFFSET, + InsertTablePathId = DAO_TYPE_INSERT_ONE_OFFSET + 1, + InsertTableNameId = DAO_TYPE_INSERT_ONE_OFFSET + 2, + InsertTableInfo = DAO_TYPE_INSERT_ONE_OFFSET + 3, + InsertPartitionInfo = DAO_TYPE_INSERT_ONE_OFFSET + 4, + InsertDataCommitInfo = DAO_TYPE_INSERT_ONE_OFFSET + 5, + + # // ==== Transaction Insert List ==== + TransactionInsertPartitionInfo = DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET, + TransactionInsertDataCommitInfo = DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET + 1, + + # // ==== Query SCALAR ==== + GetLatestTimestampFromPartitionInfo = DAO_TYPE_QUERY_SCALAR_OFFSET, + GetLatestTimestampFromPartitionInfoWithoutPartitionDesc = DAO_TYPE_QUERY_SCALAR_OFFSET + 1, + GetLatestVersionUpToTimeFromPartitionInfo = DAO_TYPE_QUERY_SCALAR_OFFSET + 2, + GetLatestVersionTimestampUpToTimeFromPartitionInfo = DAO_TYPE_QUERY_SCALAR_OFFSET + 3, + + # // ==== Update ==== + # // Update Namespace + DeleteNamespaceByNamespace = DAO_TYPE_UPDATE_OFFSET, + UpdateNamespacePropertiesByNamespace = DAO_TYPE_UPDATE_OFFSET + 1, + + # // Update TableInfo + DeleteTableInfoByIdAndPath = DAO_TYPE_UPDATE_OFFSET + 2, + UpdateTableInfoPropertiesById = DAO_TYPE_UPDATE_OFFSET + 3, + UpdateTableInfoById = DAO_TYPE_UPDATE_OFFSET + 4, + + # // Update TablePathId + DeleteTablePathIdByTablePath = DAO_TYPE_UPDATE_OFFSET + 5, + DeleteTablePathIdByTableId = DAO_TYPE_UPDATE_OFFSET + 6, + # // Update TableNameId + DeleteTableNameIdByTableNameAndNamespace = DAO_TYPE_UPDATE_OFFSET + 7, + DeleteTableNameIdByTableId = DAO_TYPE_UPDATE_OFFSET + 8, + # // Update PartitionInfo + DeletePartitionInfoByTableIdAndPartitionDesc = DAO_TYPE_UPDATE_OFFSET + 9, + DeletePartitionInfoByTableId = DAO_TYPE_UPDATE_OFFSET + 10, + DeletePreviousVersionPartition = DAO_TYPE_UPDATE_OFFSET + 11, + # // Update DataCommitInfo + DeleteOneDataCommitInfoByTableIdAndPartitionDescAndCommitId = DAO_TYPE_UPDATE_OFFSET + 12, + DeleteDataCommitInfoByTableIdAndPartitionDescAndCommitIdList = DAO_TYPE_UPDATE_OFFSET + 13, + DeleteDataCommitInfoByTableIdAndPartitionDesc = DAO_TYPE_UPDATE_OFFSET + 14, + DeleteDataCommitInfoByTableId = DAO_TYPE_UPDATE_OFFSET + 15, diff --git a/python/metadata/lib/lakesoul_metadata_c.py b/python/metadata/lib/lakesoul_metadata_c.py new file mode 100644 index 000000000..2941f4783 --- /dev/null +++ b/python/metadata/lib/lakesoul_metadata_c.py @@ -0,0 +1,58 @@ +from ctypes import * + +lib = CDLL("/Users/ceng/Documents/GitHub/LakeSoul/native-metadata/target/release/liblakesoul_metadata_c.dylib") + + +class NonNull(Structure): + pass + + +I32Callback = CFUNCTYPE(c_int, c_char_p) + +# pub extern "C" fn execute_query( +# callback: extern "C" fn(i32, *const c_char), +# runtime: NonNull>, +# client: NonNull>, +# prepared: NonNull>, +# query_type: i32, +# joined_string: *const c_char, +# addr: c_ptrdiff_t, +# ) +execute_query = lib.execute_query +execute_query.restype = c_void_p +execute_query.argtypes = [CFUNCTYPE(c_void_p, c_int, c_char_p), POINTER(NonNull), POINTER(NonNull), POINTER(NonNull), + c_int, c_char_p, c_char_p] + +# pub extern "C" fn create_tokio_runtime() -> NonNull> +create_tokio_runtime = lib.create_tokio_runtime +create_tokio_runtime.restype = POINTER(NonNull) +create_tokio_runtime.argtypes = [] + +# pub extern "C" fn free_tokio_runtime(runtime: NonNull>) +free_tokio_runtime = lib.free_tokio_runtime +free_tokio_runtime.restype = c_void_p +free_tokio_runtime.argtypes = [POINTER(NonNull)] + +# pub extern "C" fn create_tokio_postgres_client( +# callback: extern "C" fn(bool, *const c_char), +# config: *const c_char, +# runtime: NonNull>, +# ) -> NonNull> +create_tokio_postgres_client = lib.create_tokio_postgres_client +create_tokio_postgres_client.restype = POINTER(NonNull) +create_tokio_postgres_client.argtypes = [CFUNCTYPE(c_void_p, c_bool, c_char_p), c_char_p, POINTER(NonNull)] + +# pub extern "C" fn free_tokio_postgres_client(client: NonNull>) +free_tokio_postgres_client = lib.free_tokio_postgres_client +free_tokio_postgres_client.restype = c_void_p +free_tokio_postgres_client.argtypes = [POINTER(NonNull)] + +# pub extern "C" fn create_prepared_statement() -> NonNull> +create_prepared_statement = lib.create_prepared_statement +create_prepared_statement.restype = POINTER(NonNull) +create_prepared_statement.argtypes = [] + +# pub extern "C" fn free_prepared_statement(prepared: NonNull>) +free_prepared_statement = lib.free_prepared_statement +free_prepared_statement.restype = c_void_p +free_prepared_statement.argtypes = [POINTER(NonNull)] diff --git a/python/metadata/native_client.py b/python/metadata/native_client.py new file mode 100644 index 000000000..c1e0c65c3 --- /dev/null +++ b/python/metadata/native_client.py @@ -0,0 +1,82 @@ +import threading +import concurrent.futures +from lib.const import PARAM_DELIM, DAO_TYPE_QUERY_LIST_OFFSET +from lib.lakesoul_metadata_c import * +import generated.entity_pb2 as entity_pb2 + + +class NativeMetadataClient: + def __init__(self): + self._lock = threading.Lock() + self._buffer = create_string_buffer(1024) + self._large_buffer = create_string_buffer(65536) + self._runtime = create_tokio_runtime() + + def callback(bool, msg): + print("create connection callback: status={} msg={}".format(bool, msg.decode("utf-8"))) + + config = "host={} port={} dbname={} user={} password={}".format("localhost", "5433", "test_lakesoul_meta", + "yugabyte", "yugabyte") + + def target(): + return create_tokio_postgres_client(CFUNCTYPE(c_void_p, c_bool, c_char_p)(callback), + config.encode("utf-8"), + self._runtime) + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(target) + self._client = future.result(5) + + self._prepared = create_prepared_statement() + + def __del__(self): + free_tokio_runtime(self._runtime) + free_tokio_postgres_client(self._client) + free_prepared_statement(self._prepared) + + def execute_query(self, query_type, params): + joined_params = PARAM_DELIM.join(params).encode("utf-8") + buffer = self._buffer + if query_type >= DAO_TYPE_QUERY_LIST_OFFSET: + buffer = self._large_buffer + buffer.value = b'' + + def callback(len, msg): + print("execute_query query_type={} callback: len={} msg={}".format(query_type, len, msg.decode("utf-8"))) + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(execute_query, + CFUNCTYPE(c_void_p, c_int, c_char_p)(callback), self._runtime, self._client, + self._prepared, query_type, joined_params, buffer) + future.result(2.0) + + if len(buffer.value) == 0: + return None + else: + wrapper = entity_pb2.JniWrapper() + wrapper.ParseFromString(buffer.value) + return wrapper + + def get_lock(self): + return self._lock + + +global INSTANCE + +INSTANCE = None + + +def get_instance(): + global INSTANCE + if INSTANCE is None: + INSTANCE = NativeMetadataClient() + return INSTANCE + else: + return INSTANCE + + +def query(query_type, params): + instance = get_instance() + lock = instance.get_lock() + with lock: + return instance.execute_query(query_type[0], params) diff --git a/python/metadata/poc.py b/python/metadata/poc.py new file mode 100644 index 000000000..85e924bbb --- /dev/null +++ b/python/metadata/poc.py @@ -0,0 +1,10 @@ +import json + +from db_manager import DBManager + +if __name__ == '__main__': + db_manager = DBManager() + data_files = db_manager.get_data_files_by_table_name("tbl1692096521988") + arrow_schema = db_manager.get_arrow_schema_by_table_name("tbl1692096521988") + print(data_files) + print(arrow_schema) diff --git a/python/metadata/utils.py b/python/metadata/utils.py new file mode 100644 index 000000000..976cbda5e --- /dev/null +++ b/python/metadata/utils.py @@ -0,0 +1,34 @@ +import json +import pyarrow + + +def to_arrow_field(spark_field_json): + spark_type = spark_field_json['type'] + arrow_type = None + if spark_type == 'long': + arrow_type = pyarrow.int64() + elif spark_type == 'int': + arrow_type = pyarrow.int32() + elif spark_type == 'string': + arrow_type = pyarrow.utf8() + elif spark_type == 'float': + arrow_type = pyarrow.float32() + elif spark_type == 'double': + arrow_type = pyarrow.float64() + elif spark_type == 'struct': + fields = spark_field_json['fields'] + arrow_fields = [] + for field in fields: + arrow_fields.append(to_arrow_field(field)) + arrow_type = pyarrow.struct(arrow_fields) + else: + raise IOError("Not supported spark type " + str(spark_type)) + return pyarrow.field(spark_field_json['name'], arrow_type, spark_field_json['nullable']) + + +def to_arrow_schema(spark_schema_str): + fields = json.loads(spark_schema_str)['fields'] + arrow_fields = [] + for field in fields: + arrow_fields.append(to_arrow_field(field)) + return pyarrow.schema(arrow_fields) From 3f3dee6e81de1bc4d359662180fc5cc3622740a0 Mon Sep 17 00:00:00 2001 From: zenghua Date: Thu, 17 Aug 2023 16:09:37 +0800 Subject: [PATCH 2/4] write tailed 0 while query Signed-off-by: zenghua --- native-metadata/lakesoul-metadata-c/src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/native-metadata/lakesoul-metadata-c/src/lib.rs b/native-metadata/lakesoul-metadata-c/src/lib.rs index cca63828d..457af3426 100644 --- a/native-metadata/lakesoul-metadata-c/src/lib.rs +++ b/native-metadata/lakesoul-metadata-c/src/lib.rs @@ -202,8 +202,9 @@ pub extern "C" fn execute_query( |(idx, byte)| unsafe{std::ptr::write::(addr.wrapping_add(idx), *byte)}) .collect::>(); - let len = u8_vec.len() as i32; - callback( len, CString::new("").unwrap().into_raw()); + let len = u8_vec.len(); + unsafe{std::ptr::write::(addr.wrapping_add(len), 0u8)} + callback( len as i32, CString::new("").unwrap().into_raw()); } Err(e) => { callback(0, CString::new(e.to_string().as_str()).unwrap().into_raw()); From 568d1e5c37b83e5bf843a1b67191943b8759a734 Mon Sep 17 00:00:00 2001 From: zenghua Date: Mon, 21 Aug 2023 15:08:28 +0800 Subject: [PATCH 3/4] add query_data_files with partition Signed-off-by: zenghua --- python/compile_pb.sh | 1 + python/metadata/db_manager.py | 15 ++- python/metadata/lib/__init__.py | 1 + python/metadata/lib/lakesoul_metadata_c.py | 105 +++++++++++---------- python/metadata/native_client.py | 41 +++++--- python/metadata/poc.py | 20 +++- 6 files changed, 114 insertions(+), 69 deletions(-) create mode 100755 python/compile_pb.sh diff --git a/python/compile_pb.sh b/python/compile_pb.sh new file mode 100755 index 000000000..2bbdc215c --- /dev/null +++ b/python/compile_pb.sh @@ -0,0 +1 @@ +protoc native-metadata/proto/src/entity.proto --python_out python/metadata --proto_path generated=./native-metadata/proto/src/ \ No newline at end of file diff --git a/python/metadata/db_manager.py b/python/metadata/db_manager.py index 7647f21cb..2d4a2e8c3 100644 --- a/python/metadata/db_manager.py +++ b/python/metadata/db_manager.py @@ -2,7 +2,7 @@ from utils import to_arrow_schema -class DBManager(): +class DBManager: def __init__(self): pass @@ -26,11 +26,22 @@ def get_data_file_info_list(self, table_info): def split_data_info_list_to_range_and_hash_partition(self, table_id, data_file_info_list): pass - def get_data_files_by_table_name(self, table_name, namespace="default"): + def get_data_files_by_table_name(self, table_name, partitions={}, namespace="default"): + part_filter = [] + for part_key, part_value in partitions.items(): + part_filter.append("{}={}".format(part_key, part_value)) table_info = self.get_table_info_by_name(table_name, namespace) partition_list = self.get_all_partition_info(table_info.table_id) data_files = [] for partition in partition_list: + partition_desc = partition.partition_desc + filtered = False + for part in part_filter: + if part not in partition_desc: + filtered = True + break + if filtered: + continue data_commit_info_list = self.get_table_single_partition_data_info(partition) for data_commit_info in data_commit_info_list: for file_op in data_commit_info.file_ops: diff --git a/python/metadata/lib/__init__.py b/python/metadata/lib/__init__.py index e69de29bb..e655df38d 100644 --- a/python/metadata/lib/__init__.py +++ b/python/metadata/lib/__init__.py @@ -0,0 +1 @@ +from .lakesoul_metadata_c import reload_lib diff --git a/python/metadata/lib/lakesoul_metadata_c.py b/python/metadata/lib/lakesoul_metadata_c.py index 2941f4783..1accb8e53 100644 --- a/python/metadata/lib/lakesoul_metadata_c.py +++ b/python/metadata/lib/lakesoul_metadata_c.py @@ -1,7 +1,5 @@ from ctypes import * -lib = CDLL("/Users/ceng/Documents/GitHub/LakeSoul/native-metadata/target/release/liblakesoul_metadata_c.dylib") - class NonNull(Structure): pass @@ -9,50 +7,59 @@ class NonNull(Structure): I32Callback = CFUNCTYPE(c_int, c_char_p) -# pub extern "C" fn execute_query( -# callback: extern "C" fn(i32, *const c_char), -# runtime: NonNull>, -# client: NonNull>, -# prepared: NonNull>, -# query_type: i32, -# joined_string: *const c_char, -# addr: c_ptrdiff_t, -# ) -execute_query = lib.execute_query -execute_query.restype = c_void_p -execute_query.argtypes = [CFUNCTYPE(c_void_p, c_int, c_char_p), POINTER(NonNull), POINTER(NonNull), POINTER(NonNull), - c_int, c_char_p, c_char_p] - -# pub extern "C" fn create_tokio_runtime() -> NonNull> -create_tokio_runtime = lib.create_tokio_runtime -create_tokio_runtime.restype = POINTER(NonNull) -create_tokio_runtime.argtypes = [] - -# pub extern "C" fn free_tokio_runtime(runtime: NonNull>) -free_tokio_runtime = lib.free_tokio_runtime -free_tokio_runtime.restype = c_void_p -free_tokio_runtime.argtypes = [POINTER(NonNull)] - -# pub extern "C" fn create_tokio_postgres_client( -# callback: extern "C" fn(bool, *const c_char), -# config: *const c_char, -# runtime: NonNull>, -# ) -> NonNull> -create_tokio_postgres_client = lib.create_tokio_postgres_client -create_tokio_postgres_client.restype = POINTER(NonNull) -create_tokio_postgres_client.argtypes = [CFUNCTYPE(c_void_p, c_bool, c_char_p), c_char_p, POINTER(NonNull)] - -# pub extern "C" fn free_tokio_postgres_client(client: NonNull>) -free_tokio_postgres_client = lib.free_tokio_postgres_client -free_tokio_postgres_client.restype = c_void_p -free_tokio_postgres_client.argtypes = [POINTER(NonNull)] - -# pub extern "C" fn create_prepared_statement() -> NonNull> -create_prepared_statement = lib.create_prepared_statement -create_prepared_statement.restype = POINTER(NonNull) -create_prepared_statement.argtypes = [] - -# pub extern "C" fn free_prepared_statement(prepared: NonNull>) -free_prepared_statement = lib.free_prepared_statement -free_prepared_statement.restype = c_void_p -free_prepared_statement.argtypes = [POINTER(NonNull)] + +def reload_lib(path): + global lib, execute_query, create_tokio_runtime, free_tokio_runtime, create_tokio_postgres_client, free_tokio_postgres_client, create_prepared_statement, free_prepared_statement + lib = CDLL(path) + # pub extern "C" fn execute_query( + # callback: extern "C" fn(i32, *const c_char), + # runtime: NonNull>, + # client: NonNull>, + # prepared: NonNull>, + # query_type: i32, + # joined_string: *const c_char, + # addr: c_ptrdiff_t, + # ) + execute_query = lib.execute_query + execute_query.restype = c_void_p + execute_query.argtypes = [CFUNCTYPE(c_void_p, c_int, c_char_p), POINTER(NonNull), POINTER(NonNull), + POINTER(NonNull), + c_int, c_char_p, c_char_p] + + # pub extern "C" fn create_tokio_runtime() -> NonNull> + create_tokio_runtime = lib.create_tokio_runtime + create_tokio_runtime.restype = POINTER(NonNull) + create_tokio_runtime.argtypes = [] + + # pub extern "C" fn free_tokio_runtime(runtime: NonNull>) + free_tokio_runtime = lib.free_tokio_runtime + free_tokio_runtime.restype = c_void_p + free_tokio_runtime.argtypes = [POINTER(NonNull)] + + # pub extern "C" fn create_tokio_postgres_client( + # callback: extern "C" fn(bool, *const c_char), + # config: *const c_char, + # runtime: NonNull>, + # ) -> NonNull> + create_tokio_postgres_client = lib.create_tokio_postgres_client + create_tokio_postgres_client.restype = POINTER(NonNull) + create_tokio_postgres_client.argtypes = [CFUNCTYPE(c_void_p, c_bool, c_char_p), c_char_p, POINTER(NonNull)] + + # pub extern "C" fn free_tokio_postgres_client(client: NonNull>) + free_tokio_postgres_client = lib.free_tokio_postgres_client + free_tokio_postgres_client.restype = c_void_p + free_tokio_postgres_client.argtypes = [POINTER(NonNull)] + + # pub extern "C" fn create_prepared_statement() -> NonNull> + create_prepared_statement = lib.create_prepared_statement + create_prepared_statement.restype = POINTER(NonNull) + create_prepared_statement.argtypes = [] + + # pub extern "C" fn free_prepared_statement(prepared: NonNull>) + free_prepared_statement = lib.free_prepared_statement + free_prepared_statement.restype = c_void_p + free_prepared_statement.argtypes = [POINTER(NonNull)] + +# lib = CDLL("/Users/ceng/Documents/GitHub/LakeSoul/native-metadata/target/release/liblakesoul_metadata_c.dylib") + +# reload_lib("/Users/ceng/Documents/GitHub/LakeSoul/native-metadata/target/release/liblakesoul_metadata_c.dylib") diff --git a/python/metadata/native_client.py b/python/metadata/native_client.py index c1e0c65c3..3acc866cb 100644 --- a/python/metadata/native_client.py +++ b/python/metadata/native_client.py @@ -1,38 +1,51 @@ import threading import concurrent.futures +import importlib +from ctypes import * + from lib.const import PARAM_DELIM, DAO_TYPE_QUERY_LIST_OFFSET -from lib.lakesoul_metadata_c import * +import lib import generated.entity_pb2 as entity_pb2 +global config +config = "host={} port={} dbname={} user={} password={}".format("localhost", "5433", "test_lakesoul_meta", + "yugabyte", "yugabyte") + + +def reset_pg_conf(conf): + global config + config = " ".join(conf) + class NativeMetadataClient: def __init__(self): self._lock = threading.Lock() - self._buffer = create_string_buffer(1024) + importlib.reload(lib) + self._buffer = create_string_buffer(4096) self._large_buffer = create_string_buffer(65536) - self._runtime = create_tokio_runtime() + self._runtime = lib.lakesoul_metadata_c.create_tokio_runtime() def callback(bool, msg): print("create connection callback: status={} msg={}".format(bool, msg.decode("utf-8"))) - - config = "host={} port={} dbname={} user={} password={}".format("localhost", "5433", "test_lakesoul_meta", - "yugabyte", "yugabyte") + if not bool: + exit(1) def target(): - return create_tokio_postgres_client(CFUNCTYPE(c_void_p, c_bool, c_char_p)(callback), - config.encode("utf-8"), - self._runtime) + global config + return lib.lakesoul_metadata_c.create_tokio_postgres_client(CFUNCTYPE(c_void_p, c_bool, c_char_p)(callback), + config.encode("utf-8"), + self._runtime) with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(target) self._client = future.result(5) - self._prepared = create_prepared_statement() + self._prepared = lib.lakesoul_metadata_c.create_prepared_statement() def __del__(self): - free_tokio_runtime(self._runtime) - free_tokio_postgres_client(self._client) - free_prepared_statement(self._prepared) + lib.lakesoul_metadata_c.free_tokio_runtime(self._runtime) + lib.lakesoul_metadata_c.free_tokio_postgres_client(self._client) + lib.lakesoul_metadata_c.free_prepared_statement(self._prepared) def execute_query(self, query_type, params): joined_params = PARAM_DELIM.join(params).encode("utf-8") @@ -45,7 +58,7 @@ def callback(len, msg): print("execute_query query_type={} callback: len={} msg={}".format(query_type, len, msg.decode("utf-8"))) with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: - future = executor.submit(execute_query, + future = executor.submit(lib.lakesoul_metadata_c.execute_query, CFUNCTYPE(c_void_p, c_int, c_char_p)(callback), self._runtime, self._client, self._prepared, query_type, joined_params, buffer) future.result(2.0) diff --git a/python/metadata/poc.py b/python/metadata/poc.py index 85e924bbb..dba928cd1 100644 --- a/python/metadata/poc.py +++ b/python/metadata/poc.py @@ -1,10 +1,22 @@ -import json - from db_manager import DBManager +from lib import reload_lib +from native_client import reset_pg_conf if __name__ == '__main__': + reload_lib("/home/huazeng/liblakesoul_metadata_c.so") + reset_pg_conf( + ["host=localhost", "port=5432", " dbname=lakesoul_test", " user=lakesoul_test", "password=lakesoul_test"]) + db_manager = DBManager() - data_files = db_manager.get_data_files_by_table_name("tbl1692096521988") - arrow_schema = db_manager.get_arrow_schema_by_table_name("tbl1692096521988") + data_files = db_manager.get_data_files_by_table_name("titanic") + print(data_files) + data_files = db_manager.get_data_files_by_table_name("titanic", partitions={"split": "train"}) + print(data_files) + arrow_schema = db_manager.get_arrow_schema_by_table_name("titanic") + print(arrow_schema) + data_files = db_manager.get_data_files_by_table_name("imdb") + print(data_files) + data_files = db_manager.get_data_files_by_table_name("imdb", partitions={"split": "train"}) print(data_files) + arrow_schema = db_manager.get_arrow_schema_by_table_name("imdb") print(arrow_schema) From 717ed9f1b9b8a9154290053b7eac222049a54263 Mon Sep 17 00:00:00 2001 From: zenghua Date: Thu, 24 Aug 2023 10:32:33 +0800 Subject: [PATCH 4/4] add copyright header && requirement.txt Signed-off-by: zenghua --- python/metadata/__init__.py | 3 +++ python/metadata/dao.py | 4 ++++ python/metadata/db_manager.py | 4 ++++ python/metadata/lib/__init__.py | 4 ++++ python/metadata/lib/const.py | 4 ++++ python/metadata/lib/lakesoul_metadata_c.py | 8 ++++---- python/metadata/native_client.py | 4 ++++ python/metadata/poc.py | 4 ++++ python/metadata/requirement.txt | 3 +++ python/metadata/utils.py | 4 ++++ 10 files changed, 38 insertions(+), 4 deletions(-) create mode 100644 python/metadata/requirement.txt diff --git a/python/metadata/__init__.py b/python/metadata/__init__.py index e69de29bb..5f067a07d 100644 --- a/python/metadata/__init__.py +++ b/python/metadata/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2023 LakeSoul Contributors +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/python/metadata/dao.py b/python/metadata/dao.py index b05830afa..51e2dfe06 100644 --- a/python/metadata/dao.py +++ b/python/metadata/dao.py @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: 2023 LakeSoul Contributors +# +# SPDX-License-Identifier: Apache-2.0 + from lib.const import DaoType from native_client import * diff --git a/python/metadata/db_manager.py b/python/metadata/db_manager.py index 2d4a2e8c3..75d3adb61 100644 --- a/python/metadata/db_manager.py +++ b/python/metadata/db_manager.py @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: 2023 LakeSoul Contributors +# +# SPDX-License-Identifier: Apache-2.0 + from dao import * from utils import to_arrow_schema diff --git a/python/metadata/lib/__init__.py b/python/metadata/lib/__init__.py index e655df38d..405d4dc4f 100644 --- a/python/metadata/lib/__init__.py +++ b/python/metadata/lib/__init__.py @@ -1 +1,5 @@ +# SPDX-FileCopyrightText: 2023 LakeSoul Contributors +# +# SPDX-License-Identifier: Apache-2.0 + from .lakesoul_metadata_c import reload_lib diff --git a/python/metadata/lib/const.py b/python/metadata/lib/const.py index bf5a8b154..71efe1874 100644 --- a/python/metadata/lib/const.py +++ b/python/metadata/lib/const.py @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: 2023 LakeSoul Contributors +# +# SPDX-License-Identifier: Apache-2.0 + PARAM_DELIM = "__DELIM__" DAO_TYPE_QUERY_ONE_OFFSET: int = 0 diff --git a/python/metadata/lib/lakesoul_metadata_c.py b/python/metadata/lib/lakesoul_metadata_c.py index 1accb8e53..89dbb51a8 100644 --- a/python/metadata/lib/lakesoul_metadata_c.py +++ b/python/metadata/lib/lakesoul_metadata_c.py @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: 2023 LakeSoul Contributors +# +# SPDX-License-Identifier: Apache-2.0 + from ctypes import * @@ -59,7 +63,3 @@ def reload_lib(path): free_prepared_statement = lib.free_prepared_statement free_prepared_statement.restype = c_void_p free_prepared_statement.argtypes = [POINTER(NonNull)] - -# lib = CDLL("/Users/ceng/Documents/GitHub/LakeSoul/native-metadata/target/release/liblakesoul_metadata_c.dylib") - -# reload_lib("/Users/ceng/Documents/GitHub/LakeSoul/native-metadata/target/release/liblakesoul_metadata_c.dylib") diff --git a/python/metadata/native_client.py b/python/metadata/native_client.py index 3acc866cb..b084d1de5 100644 --- a/python/metadata/native_client.py +++ b/python/metadata/native_client.py @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: 2023 LakeSoul Contributors +# +# SPDX-License-Identifier: Apache-2.0 + import threading import concurrent.futures import importlib diff --git a/python/metadata/poc.py b/python/metadata/poc.py index dba928cd1..9d82e7ea0 100644 --- a/python/metadata/poc.py +++ b/python/metadata/poc.py @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: 2023 LakeSoul Contributors +# +# SPDX-License-Identifier: Apache-2.0 + from db_manager import DBManager from lib import reload_lib from native_client import reset_pg_conf diff --git a/python/metadata/requirement.txt b/python/metadata/requirement.txt new file mode 100644 index 000000000..2b7d68b8a --- /dev/null +++ b/python/metadata/requirement.txt @@ -0,0 +1,3 @@ +numpy +pyarrow +protobuf \ No newline at end of file diff --git a/python/metadata/utils.py b/python/metadata/utils.py index 976cbda5e..490892c12 100644 --- a/python/metadata/utils.py +++ b/python/metadata/utils.py @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: 2023 LakeSoul Contributors +# +# SPDX-License-Identifier: Apache-2.0 + import json import pyarrow