
[Python][Native-Metadata] Python interface of lakesoul metadata (#305)
* init python metadata

Signed-off-by: zenghua <[email protected]>

* write trailing 0 when querying

Signed-off-by: zenghua <[email protected]>

* add query_data_files with partition

Signed-off-by: zenghua <[email protected]>

* add copyright header && requirement.txt

Signed-off-by: zenghua <[email protected]>

---------

Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
Ceng23333 and zenghua authored Aug 24, 2023
1 parent f1328cf commit 9d3fd86
Showing 13 changed files with 419 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -20,3 +20,4 @@ dependency-reduced-pom.xml
/script/benchmark/work-dir/*.jar
*.DS_Store
native-io/lakesoul-io-c/lakesoul_c_bindings.h
python/metadata/generated
5 changes: 3 additions & 2 deletions native-metadata/lakesoul-metadata-c/src/lib.rs
@@ -202,8 +202,9 @@ pub extern "C" fn execute_query(
|(idx, byte)|
unsafe{std::ptr::write::<c_uchar>(addr.wrapping_add(idx), *byte)})
.collect::<Vec<_>>();
-let len = u8_vec.len() as i32;
-callback( len, CString::new("").unwrap().into_raw());
+let len = u8_vec.len();
+unsafe{std::ptr::write::<c_uchar>(addr.wrapping_add(len), 0u8)}
+callback( len as i32, CString::new("").unwrap().into_raw());
}
Err(e) => {
callback(0, CString::new(e.to_string().as_str()).unwrap().into_raw());
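The added std::ptr::write(addr.wrapping_add(len), 0u8) places a NUL terminator right after the serialized protobuf bytes. The Python caller reuses one buffer and only resets its first byte (buffer.value = b'' in native_client.py), so under that reading the terminator keeps a shorter response from picking up stale bytes left by an earlier, longer one. A minimal ctypes sketch of the behavior this guards against:

# Sketch of why the trailing zero matters: a ctypes char buffer's .value reads up
# to the first NUL byte, so bytes left over from an earlier, longer response would
# leak into the result if a shorter response were written without a terminator.
from ctypes import create_string_buffer, memmove

buf = create_string_buffer(16)
memmove(buf, b"OLD_LONG_PAYLOAD", 16)      # leftover bytes from a previous query
memmove(buf, b"NEW", 3)                    # shorter response, no terminator yet
assert buf.value == b"NEW_LONG_PAYLOAD"    # the stale tail leaks into the read
buf[3] = b"\x00"                           # the terminator the Rust side now writes
assert buf.value == b"NEW"                 # the read stops at the NUL, as intended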
1 change: 1 addition & 0 deletions python/compile_pb.sh
@@ -0,0 +1 @@
protoc native-metadata/proto/src/entity.proto --python_out python/metadata --proto_path generated=./native-metadata/proto/src/
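Run from the repository root, this should emit the protobuf bindings under python/metadata/generated/ (the path just added to .gitignore), including generated/entity_pb2.py, which native_client.py imports below. A minimal sketch of consuming the generated module, assuming compile_pb.sh has been run and python/metadata is on sys.path:

# Sketch; assumes the bindings were generated and python/metadata is on sys.path.
import generated.entity_pb2 as entity_pb2

wrapper = entity_pb2.JniWrapper()   # container message returned by metadata queries
wrapper.ParseFromString(b"")        # an empty payload parses to an empty wrapper
print(list(wrapper.table_info), list(wrapper.partition_info))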
3 changes: 3 additions & 0 deletions python/metadata/__init__.py
@@ -0,0 +1,3 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0
25 changes: 25 additions & 0 deletions python/metadata/dao.py
@@ -0,0 +1,25 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0

from lib.const import DaoType
from native_client import *


def select_table_info_by_table_name(table_name, namespace="default"):
wrapper = query(DaoType.SelectTableInfoByTableNameAndNameSpace, [table_name, namespace])
return wrapper.table_info[0]


def get_partition_desc_by_table_id(table_id):
wrapper = query(DaoType.ListPartitionByTableId, [table_id])
return wrapper.partition_info


def list_data_commit_info_by_table_id_and_partition_desc_and_commit_list(table_id, partition_desc, commit_id_list):
joined_commit_id = ""
for commit_id in commit_id_list:
joined_commit_id += "{:016x}{:016x}".format(commit_id.high, commit_id.low)
wrapper = query(DaoType.ListDataCommitInfoByTableIdAndPartitionDescAndCommitList,
[table_id, partition_desc, joined_commit_id])
return wrapper.data_commit_info
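A hedged usage sketch of these DAO helpers; the table name is a placeholder, and a reachable metadata database (configured through native_client.py below) is assumed:

# Sketch; "my_table" is hypothetical and the metadata DB must be reachable.
from dao import select_table_info_by_table_name, get_partition_desc_by_table_id

table_info = select_table_info_by_table_name("my_table")  # namespace defaults to "default"
print(table_info.table_id, table_info.table_schema)

for partition in get_partition_desc_by_table_id(table_info.table_id):
    print(partition.partition_desc)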
61 changes: 61 additions & 0 deletions python/metadata/db_manager.py
@@ -0,0 +1,61 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0

from dao import *
from utils import to_arrow_schema


class DBManager:
def __init__(self):
pass

def get_table_info_by_name(self, table_name, namespace):
return select_table_info_by_table_name(table_name, namespace)

def get_all_partition_info(self, table_id):
return get_partition_desc_by_table_id(table_id)

def get_table_single_partition_data_info(self, partition_info):
return list_data_commit_info_by_table_id_and_partition_desc_and_commit_list(partition_info.table_id,
partition_info.partition_desc,
partition_info.snapshot)

def get_table_info_by_id(self, table_id):
pass

def get_data_file_info_list(self, table_info):
pass

def split_data_info_list_to_range_and_hash_partition(self, table_id, data_file_info_list):
pass

def get_data_files_by_table_name(self, table_name, partitions={}, namespace="default"):
part_filter = []
for part_key, part_value in partitions.items():
part_filter.append("{}={}".format(part_key, part_value))
table_info = self.get_table_info_by_name(table_name, namespace)
partition_list = self.get_all_partition_info(table_info.table_id)
data_files = []
for partition in partition_list:
partition_desc = partition.partition_desc
filtered = False
for part in part_filter:
if part not in partition_desc:
filtered = True
break
if filtered:
continue
data_commit_info_list = self.get_table_single_partition_data_info(partition)
for data_commit_info in data_commit_info_list:
for file_op in data_commit_info.file_ops:
data_files.append(file_op.path)
return data_files

def get_spark_schema_by_table_name(self, table_name, namespace="default"):
table_info = self.get_table_info_by_name(table_name, namespace)
return table_info.table_schema

def get_arrow_schema_by_table_name(self, table_name, namespace="default"):
schema = self.get_spark_schema_by_table_name(table_name, namespace)
return to_arrow_schema(schema)
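A hedged end-to-end sketch of the DBManager API; the table name and partition filter are placeholders:

# Sketch; "my_table" and the partition value are hypothetical.
from db_manager import DBManager

db = DBManager()

# Collect the data files of one range partition of the table.
files = db.get_data_files_by_table_name("my_table",
                                        partitions={"date": "2023-08-24"},
                                        namespace="default")
for path in files:
    print(path)

# Arrow schema converted from the stored Spark schema via utils.to_arrow_schema.
print(db.get_arrow_schema_by_table_name("my_table"))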
5 changes: 5 additions & 0 deletions python/metadata/lib/__init__.py
@@ -0,0 +1,5 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0

from .lakesoul_metadata_c import reload_lib
89 changes: 89 additions & 0 deletions python/metadata/lib/const.py
@@ -0,0 +1,89 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0

PARAM_DELIM = "__DELIM__"

DAO_TYPE_QUERY_ONE_OFFSET: int = 0
DAO_TYPE_QUERY_LIST_OFFSET = 100
DAO_TYPE_INSERT_ONE_OFFSET = 200
DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET = 300
DAO_TYPE_QUERY_SCALAR_OFFSET = 400
DAO_TYPE_UPDATE_OFFSET = 500


class DaoType:
SelectNamespaceByNamespace = DAO_TYPE_QUERY_ONE_OFFSET,
SelectTablePathIdByTablePath = DAO_TYPE_QUERY_ONE_OFFSET + 1,
SelectTableInfoByTableId = DAO_TYPE_QUERY_ONE_OFFSET + 2,
SelectTableNameIdByTableName = DAO_TYPE_QUERY_ONE_OFFSET + 3,
SelectTableInfoByTableNameAndNameSpace = DAO_TYPE_QUERY_ONE_OFFSET + 4,
SelectTableInfoByTablePath = DAO_TYPE_QUERY_ONE_OFFSET + 5,
SelectTableInfoByIdAndTablePath = DAO_TYPE_QUERY_ONE_OFFSET + 6,

SelectOnePartitionVersionByTableIdAndDesc = DAO_TYPE_QUERY_ONE_OFFSET + 7,
SelectPartitionVersionByTableIdAndDescAndVersion = DAO_TYPE_QUERY_ONE_OFFSET + 8,

SelectOneDataCommitInfoByTableIdAndPartitionDescAndCommitId = DAO_TYPE_QUERY_ONE_OFFSET + 9,

# // ==== Query List ====

ListNamespaces = DAO_TYPE_QUERY_LIST_OFFSET,
ListTableNameByNamespace = DAO_TYPE_QUERY_LIST_OFFSET + 1,
ListAllTablePath = DAO_TYPE_QUERY_LIST_OFFSET + 2,
ListAllPathTablePathByNamespace = DAO_TYPE_QUERY_LIST_OFFSET + 3,

# // Query Partition List
ListPartitionByTableId = DAO_TYPE_QUERY_LIST_OFFSET + 4,
ListPartitionDescByTableIdAndParList = DAO_TYPE_QUERY_LIST_OFFSET + 5,
ListPartitionByTableIdAndDesc = DAO_TYPE_QUERY_LIST_OFFSET + 6,
ListPartitionVersionByTableIdAndPartitionDescAndVersionRange = DAO_TYPE_QUERY_LIST_OFFSET + 7,
ListPartitionVersionByTableIdAndPartitionDescAndTimestampRange = DAO_TYPE_QUERY_LIST_OFFSET + 8,
ListCommitOpsBetweenVersions = DAO_TYPE_QUERY_LIST_OFFSET + 9,

# // Query DataCommitInfo List
ListDataCommitInfoByTableIdAndPartitionDescAndCommitList = DAO_TYPE_QUERY_LIST_OFFSET + 10,

# // ==== Insert One ====
InsertNamespace = DAO_TYPE_INSERT_ONE_OFFSET,
InsertTablePathId = DAO_TYPE_INSERT_ONE_OFFSET + 1,
InsertTableNameId = DAO_TYPE_INSERT_ONE_OFFSET + 2,
InsertTableInfo = DAO_TYPE_INSERT_ONE_OFFSET + 3,
InsertPartitionInfo = DAO_TYPE_INSERT_ONE_OFFSET + 4,
InsertDataCommitInfo = DAO_TYPE_INSERT_ONE_OFFSET + 5,

# // ==== Transaction Insert List ====
TransactionInsertPartitionInfo = DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET,
TransactionInsertDataCommitInfo = DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET + 1,

# // ==== Query SCALAR ====
GetLatestTimestampFromPartitionInfo = DAO_TYPE_QUERY_SCALAR_OFFSET,
GetLatestTimestampFromPartitionInfoWithoutPartitionDesc = DAO_TYPE_QUERY_SCALAR_OFFSET + 1,
GetLatestVersionUpToTimeFromPartitionInfo = DAO_TYPE_QUERY_SCALAR_OFFSET + 2,
GetLatestVersionTimestampUpToTimeFromPartitionInfo = DAO_TYPE_QUERY_SCALAR_OFFSET + 3,

# // ==== Update ====
# // Update Namespace
DeleteNamespaceByNamespace = DAO_TYPE_UPDATE_OFFSET,
UpdateNamespacePropertiesByNamespace = DAO_TYPE_UPDATE_OFFSET + 1,

# // Update TableInfo
DeleteTableInfoByIdAndPath = DAO_TYPE_UPDATE_OFFSET + 2,
UpdateTableInfoPropertiesById = DAO_TYPE_UPDATE_OFFSET + 3,
UpdateTableInfoById = DAO_TYPE_UPDATE_OFFSET + 4,

# // Update TablePathId
DeleteTablePathIdByTablePath = DAO_TYPE_UPDATE_OFFSET + 5,
DeleteTablePathIdByTableId = DAO_TYPE_UPDATE_OFFSET + 6,
# // Update TableNameId
DeleteTableNameIdByTableNameAndNamespace = DAO_TYPE_UPDATE_OFFSET + 7,
DeleteTableNameIdByTableId = DAO_TYPE_UPDATE_OFFSET + 8,
# // Update PartitionInfo
DeletePartitionInfoByTableIdAndPartitionDesc = DAO_TYPE_UPDATE_OFFSET + 9,
DeletePartitionInfoByTableId = DAO_TYPE_UPDATE_OFFSET + 10,
DeletePreviousVersionPartition = DAO_TYPE_UPDATE_OFFSET + 11,
# // Update DataCommitInfo
DeleteOneDataCommitInfoByTableIdAndPartitionDescAndCommitId = DAO_TYPE_UPDATE_OFFSET + 12,
DeleteDataCommitInfoByTableIdAndPartitionDescAndCommitIdList = DAO_TYPE_UPDATE_OFFSET + 13,
DeleteDataCommitInfoByTableIdAndPartitionDesc = DAO_TYPE_UPDATE_OFFSET + 14,
DeleteDataCommitInfoByTableId = DAO_TYPE_UPDATE_OFFSET + 15,
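Note that the trailing comma on every assignment makes each DaoType attribute a one-element tuple; native_client.query() below unwraps it with query_type[0]. A minimal check:

from lib.const import DaoType, DAO_TYPE_QUERY_LIST_OFFSET

# Each member is a 1-tuple because of the trailing comma in the class body.
assert DaoType.ListNamespaces == (DAO_TYPE_QUERY_LIST_OFFSET,)
assert DaoType.ListNamespaces[0] == 100   # the integer actually sent across the FFI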
65 changes: 65 additions & 0 deletions python/metadata/lib/lakesoul_metadata_c.py
@@ -0,0 +1,65 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0

from ctypes import *


class NonNull(Structure):
pass


I32Callback = CFUNCTYPE(c_int, c_char_p)


def reload_lib(path):
global lib, execute_query, create_tokio_runtime, free_tokio_runtime, create_tokio_postgres_client, free_tokio_postgres_client, create_prepared_statement, free_prepared_statement
lib = CDLL(path)
# pub extern "C" fn execute_query(
# callback: extern "C" fn(i32, *const c_char),
# runtime: NonNull<Result<TokioRuntime>>,
# client: NonNull<Result<TokioPostgresClient>>,
# prepared: NonNull<Result<PreparedStatement>>,
# query_type: i32,
# joined_string: *const c_char,
# addr: c_ptrdiff_t,
# )
execute_query = lib.execute_query
execute_query.restype = c_void_p
execute_query.argtypes = [CFUNCTYPE(c_void_p, c_int, c_char_p), POINTER(NonNull), POINTER(NonNull),
POINTER(NonNull),
c_int, c_char_p, c_char_p]

# pub extern "C" fn create_tokio_runtime() -> NonNull<Result<TokioRuntime>>
create_tokio_runtime = lib.create_tokio_runtime
create_tokio_runtime.restype = POINTER(NonNull)
create_tokio_runtime.argtypes = []

# pub extern "C" fn free_tokio_runtime(runtime: NonNull<Result<TokioRuntime>>)
free_tokio_runtime = lib.free_tokio_runtime
free_tokio_runtime.restype = c_void_p
free_tokio_runtime.argtypes = [POINTER(NonNull)]

# pub extern "C" fn create_tokio_postgres_client(
# callback: extern "C" fn(bool, *const c_char),
# config: *const c_char,
# runtime: NonNull<Result<TokioRuntime>>,
# ) -> NonNull<Result<TokioPostgresClient>>
create_tokio_postgres_client = lib.create_tokio_postgres_client
create_tokio_postgres_client.restype = POINTER(NonNull)
create_tokio_postgres_client.argtypes = [CFUNCTYPE(c_void_p, c_bool, c_char_p), c_char_p, POINTER(NonNull)]

# pub extern "C" fn free_tokio_postgres_client(client: NonNull<Result<TokioPostgresClient>>)
free_tokio_postgres_client = lib.free_tokio_postgres_client
free_tokio_postgres_client.restype = c_void_p
free_tokio_postgres_client.argtypes = [POINTER(NonNull)]

# pub extern "C" fn create_prepared_statement() -> NonNull<Result<PreparedStatement>>
create_prepared_statement = lib.create_prepared_statement
create_prepared_statement.restype = POINTER(NonNull)
create_prepared_statement.argtypes = []

# pub extern "C" fn free_prepared_statement(prepared: NonNull<Result<PreparedStatement>>)
free_prepared_statement = lib.free_prepared_statement
free_prepared_statement.restype = c_void_p
free_prepared_statement.argtypes = [POINTER(NonNull)]
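A hedged sketch of loading the native library and managing the Tokio runtime directly through these bindings; the shared-library path is an assumption and depends on how native-metadata is built and on the platform:

# Sketch; the library path is a placeholder for the native-metadata build artifact.
from lib import lakesoul_metadata_c

lakesoul_metadata_c.reload_lib("path/to/liblakesoul_metadata_c.so")  # hypothetical path

runtime = lakesoul_metadata_c.create_tokio_runtime()
try:
    ...  # create a client with create_tokio_postgres_client, run queries, etc.
finally:
    lakesoul_metadata_c.free_tokio_runtime(runtime)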
99 changes: 99 additions & 0 deletions python/metadata/native_client.py
@@ -0,0 +1,99 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0

import threading
import concurrent.futures
import importlib
from ctypes import *

from lib.const import PARAM_DELIM, DAO_TYPE_QUERY_LIST_OFFSET
import lib
import generated.entity_pb2 as entity_pb2

global config
config = "host={} port={} dbname={} user={} password={}".format("localhost", "5433", "test_lakesoul_meta",
"yugabyte", "yugabyte")


def reset_pg_conf(conf):
global config
config = " ".join(conf)


class NativeMetadataClient:
def __init__(self):
self._lock = threading.Lock()
importlib.reload(lib)
self._buffer = create_string_buffer(4096)
self._large_buffer = create_string_buffer(65536)
self._runtime = lib.lakesoul_metadata_c.create_tokio_runtime()

def callback(bool, msg):
print("create connection callback: status={} msg={}".format(bool, msg.decode("utf-8")))
if not bool:
exit(1)

def target():
global config
return lib.lakesoul_metadata_c.create_tokio_postgres_client(CFUNCTYPE(c_void_p, c_bool, c_char_p)(callback),
config.encode("utf-8"),
self._runtime)

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(target)
self._client = future.result(5)

self._prepared = lib.lakesoul_metadata_c.create_prepared_statement()

def __del__(self):
lib.lakesoul_metadata_c.free_tokio_runtime(self._runtime)
lib.lakesoul_metadata_c.free_tokio_postgres_client(self._client)
lib.lakesoul_metadata_c.free_prepared_statement(self._prepared)

def execute_query(self, query_type, params):
joined_params = PARAM_DELIM.join(params).encode("utf-8")
buffer = self._buffer
if query_type >= DAO_TYPE_QUERY_LIST_OFFSET:
buffer = self._large_buffer
buffer.value = b''

def callback(len, msg):
print("execute_query query_type={} callback: len={} msg={}".format(query_type, len, msg.decode("utf-8")))

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(lib.lakesoul_metadata_c.execute_query,
CFUNCTYPE(c_void_p, c_int, c_char_p)(callback), self._runtime, self._client,
self._prepared, query_type, joined_params, buffer)
future.result(2.0)

if len(buffer.value) == 0:
return None
else:
wrapper = entity_pb2.JniWrapper()
wrapper.ParseFromString(buffer.value)
return wrapper

def get_lock(self):
return self._lock


global INSTANCE

INSTANCE = None


def get_instance():
global INSTANCE
if INSTANCE is None:
INSTANCE = NativeMetadataClient()
return INSTANCE
else:
return INSTANCE


def query(query_type, params):
instance = get_instance()
lock = instance.get_lock()
with lock:
return instance.execute_query(query_type[0], params)
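A hedged end-to-end sketch of the query path; the library path, connection settings, and table name are placeholders, the protobuf bindings must have been generated, and reload_lib must run before the first query so the FFI symbols are bound:

# Sketch; library path, connection settings, and table name are hypothetical.
from lib import reload_lib
from lib.const import DaoType
import native_client

reload_lib("path/to/liblakesoul_metadata_c.so")
native_client.reset_pg_conf(["host=localhost", "port=5432", "dbname=lakesoul_test",
                             "user=lakesoul", "password=lakesoul"])

wrapper = native_client.query(DaoType.SelectTableInfoByTableNameAndNameSpace,
                              ["my_table", "default"])
if wrapper is not None:
    print(wrapper.table_info[0].table_id)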
(Diffs for the remaining changed files are not shown.)
