[Python][Native-Metadata] Python interface of lakesoul metadata #305

Merged
merged 4 commits into from Aug 24, 2023
1 change: 1 addition & 0 deletions .gitignore
@@ -20,3 +20,4 @@ dependency-reduced-pom.xml
/script/benchmark/work-dir/*.jar
*.DS_Store
native-io/lakesoul-io-c/lakesoul_c_bindings.h
python/metadata/generated
5 changes: 3 additions & 2 deletions native-metadata/lakesoul-metadata-c/src/lib.rs
@@ -202,8 +202,9 @@ pub extern "C" fn execute_query(
|(idx, byte)|
unsafe{std::ptr::write::<c_uchar>(addr.wrapping_add(idx), *byte)})
.collect::<Vec<_>>();
-        let len = u8_vec.len() as i32;
-        callback( len, CString::new("").unwrap().into_raw());
+        let len = u8_vec.len();
+        unsafe{std::ptr::write::<c_uchar>(addr.wrapping_add(len), 0u8)}
+        callback( len as i32, CString::new("").unwrap().into_raw());
}
Err(e) => {
callback(0, CString::new(e.to_string().as_str()).unwrap().into_raw());
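For context, the added line writes a terminating 0 byte right after the serialized payload; the Python client reads the buffer through ctypes' `.value`, which stops at the first NUL. A minimal sketch (not part of the PR, plain ctypes only) of why that terminator matters:

```python
# ctypes reads a char buffer's .value only up to the first NUL byte, so the native
# side must terminate the freshly written payload or stale bytes from an earlier,
# longer result would leak into the next read.
from ctypes import create_string_buffer

buf = create_string_buffer(16)
buf.value = b"previous-result"          # older, longer payload already in the buffer
buf.raw = b"new" + buf.raw[3:]          # write 3 new bytes without a terminator
assert buf.value == b"newvious-result"  # stale tail is still visible
buf.raw = b"new\x00" + buf.raw[4:]      # emulate the added std::ptr::write(..., 0u8)
assert buf.value == b"new"              # only the fresh payload is read now
```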
1 change: 1 addition & 0 deletions python/compile_pb.sh
@@ -0,0 +1 @@
protoc native-metadata/proto/src/entity.proto --python_out python/metadata --proto_path generated=./native-metadata/proto/src/
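This command writes `python/metadata/generated/entity_pb2.py` (the directory added to `.gitignore` above). A hedged sketch of consuming the generated module, matching the import in `native_client.py`; the field names are the ones used in `dao.py`:

```python
# Assumes compile_pb.sh has been run from the repository root and python/metadata
# is on sys.path, as the flat imports in this package expect.
import generated.entity_pb2 as entity_pb2

wrapper = entity_pb2.JniWrapper()
wrapper.ParseFromString(b"")      # an empty payload parses to an empty wrapper
print(list(wrapper.table_info))   # repeated fields start out empty
```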
3 changes: 3 additions & 0 deletions python/metadata/__init__.py
@@ -0,0 +1,3 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0
25 changes: 25 additions & 0 deletions python/metadata/dao.py
@@ -0,0 +1,25 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0

from lib.const import DaoType
from native_client import *


def select_table_info_by_table_name(table_name, namespace="default"):
wrapper = query(DaoType.SelectTableInfoByTableNameAndNameSpace, [table_name, namespace])
return wrapper.table_info[0]


def get_partition_desc_by_table_id(table_id):
wrapper = query(DaoType.ListPartitionByTableId, [table_id])
return wrapper.partition_info


def list_data_commit_info_by_table_id_and_partition_desc_and_commit_list(table_id, partition_desc, commit_id_list):
joined_commit_id = ""
for commit_id in commit_id_list:
joined_commit_id += "{:016x}{:016x}".format(commit_id.high, commit_id.low)
wrapper = query(DaoType.ListDataCommitInfoByTableIdAndPartitionDescAndCommitList,
[table_id, partition_desc, joined_commit_id])
return wrapper.data_commit_info
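A hedged usage sketch of the DAO helpers above; the table name is hypothetical, and a reachable LakeSoul metadata database plus the flat import layout of `python/metadata` are assumed:

```python
# Hypothetical table name; requires a running LakeSoul metadata database and the
# script being run from python/metadata (matching the package's flat imports).
from dao import *

table_info = select_table_info_by_table_name("my_table", namespace="default")
partitions = get_partition_desc_by_table_id(table_info.table_id)
for partition in partitions:
    commits = list_data_commit_info_by_table_id_and_partition_desc_and_commit_list(
        partition.table_id, partition.partition_desc, partition.snapshot)
    for commit in commits:
        print([file_op.path for file_op in commit.file_ops])
```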
61 changes: 61 additions & 0 deletions python/metadata/db_manager.py
@@ -0,0 +1,61 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0

from dao import *
from utils import to_arrow_schema


class DBManager:
def __init__(self):
pass

def get_table_info_by_name(self, table_name, namespace):
return select_table_info_by_table_name(table_name, namespace)

def get_all_partition_info(self, table_id):
return get_partition_desc_by_table_id(table_id)

def get_table_single_partition_data_info(self, partition_info):
return list_data_commit_info_by_table_id_and_partition_desc_and_commit_list(partition_info.table_id,
partition_info.partition_desc,
partition_info.snapshot)

def get_table_info_by_id(self, table_id):
pass

def get_data_file_info_list(self, table_info):
pass

def split_data_info_list_to_range_and_hash_partition(self, table_id, data_file_info_list):
pass

def get_data_files_by_table_name(self, table_name, partitions={}, namespace="default"):
part_filter = []
for part_key, part_value in partitions.items():
part_filter.append("{}={}".format(part_key, part_value))
table_info = self.get_table_info_by_name(table_name, namespace)
partition_list = self.get_all_partition_info(table_info.table_id)
data_files = []
for partition in partition_list:
partition_desc = partition.partition_desc
filtered = False
for part in part_filter:
if part not in partition_desc:
filtered = True
break
if filtered:
continue
data_commit_info_list = self.get_table_single_partition_data_info(partition)
for data_commit_info in data_commit_info_list:
for file_op in data_commit_info.file_ops:
data_files.append(file_op.path)
return data_files

def get_spark_schema_by_table_name(self, table_name, namespace="default"):
table_info = self.get_table_info_by_name(table_name, namespace)
return table_info.table_schema

def get_arrow_schema_by_table_name(self, table_name, namespace="default"):
schema = self.get_spark_schema_by_table_name(table_name, namespace)
return to_arrow_schema(schema)
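A hedged sketch of the intended call pattern; the table name and partition values are hypothetical, a live metadata database is assumed, and `to_arrow_schema` comes from a `utils` module not shown in this diff:

```python
# Hypothetical table and partition values.
from db_manager import DBManager

db_manager = DBManager()
data_files = db_manager.get_data_files_by_table_name(
    "my_table", partitions={"date": "2023-08-24"}, namespace="default")
arrow_schema = db_manager.get_arrow_schema_by_table_name("my_table")
print(len(data_files), arrow_schema)
```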
5 changes: 5 additions & 0 deletions python/metadata/lib/__init__.py
@@ -0,0 +1,5 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0

from .lakesoul_metadata_c import reload_lib
89 changes: 89 additions & 0 deletions python/metadata/lib/const.py
@@ -0,0 +1,89 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0

PARAM_DELIM = "__DELIM__"

DAO_TYPE_QUERY_ONE_OFFSET: int = 0
DAO_TYPE_QUERY_LIST_OFFSET = 100
DAO_TYPE_INSERT_ONE_OFFSET = 200
DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET = 300
DAO_TYPE_QUERY_SCALAR_OFFSET = 400
DAO_TYPE_UPDATE_OFFSET = 500


class DaoType:
SelectNamespaceByNamespace = DAO_TYPE_QUERY_ONE_OFFSET,
SelectTablePathIdByTablePath = DAO_TYPE_QUERY_ONE_OFFSET + 1,
SelectTableInfoByTableId = DAO_TYPE_QUERY_ONE_OFFSET + 2,
SelectTableNameIdByTableName = DAO_TYPE_QUERY_ONE_OFFSET + 3,
SelectTableInfoByTableNameAndNameSpace = DAO_TYPE_QUERY_ONE_OFFSET + 4,
SelectTableInfoByTablePath = DAO_TYPE_QUERY_ONE_OFFSET + 5,
SelectTableInfoByIdAndTablePath = DAO_TYPE_QUERY_ONE_OFFSET + 6,

SelectOnePartitionVersionByTableIdAndDesc = DAO_TYPE_QUERY_ONE_OFFSET + 7,
SelectPartitionVersionByTableIdAndDescAndVersion = DAO_TYPE_QUERY_ONE_OFFSET + 8,

SelectOneDataCommitInfoByTableIdAndPartitionDescAndCommitId = DAO_TYPE_QUERY_ONE_OFFSET + 9,

# // ==== Query List ====

ListNamespaces = DAO_TYPE_QUERY_LIST_OFFSET,
ListTableNameByNamespace = DAO_TYPE_QUERY_LIST_OFFSET + 1,
ListAllTablePath = DAO_TYPE_QUERY_LIST_OFFSET + 2,
ListAllPathTablePathByNamespace = DAO_TYPE_QUERY_LIST_OFFSET + 3,

# // Query Partition List
ListPartitionByTableId = DAO_TYPE_QUERY_LIST_OFFSET + 4,
ListPartitionDescByTableIdAndParList = DAO_TYPE_QUERY_LIST_OFFSET + 5,
ListPartitionByTableIdAndDesc = DAO_TYPE_QUERY_LIST_OFFSET + 6,
ListPartitionVersionByTableIdAndPartitionDescAndVersionRange = DAO_TYPE_QUERY_LIST_OFFSET + 7,
ListPartitionVersionByTableIdAndPartitionDescAndTimestampRange = DAO_TYPE_QUERY_LIST_OFFSET + 8,
ListCommitOpsBetweenVersions = DAO_TYPE_QUERY_LIST_OFFSET + 9,

# // Query DataCommitInfo List
ListDataCommitInfoByTableIdAndPartitionDescAndCommitList = DAO_TYPE_QUERY_LIST_OFFSET + 10,

# // ==== Insert One ====
InsertNamespace = DAO_TYPE_INSERT_ONE_OFFSET,
InsertTablePathId = DAO_TYPE_INSERT_ONE_OFFSET + 1,
InsertTableNameId = DAO_TYPE_INSERT_ONE_OFFSET + 2,
InsertTableInfo = DAO_TYPE_INSERT_ONE_OFFSET + 3,
InsertPartitionInfo = DAO_TYPE_INSERT_ONE_OFFSET + 4,
InsertDataCommitInfo = DAO_TYPE_INSERT_ONE_OFFSET + 5,

# // ==== Transaction Insert List ====
TransactionInsertPartitionInfo = DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET,
TransactionInsertDataCommitInfo = DAO_TYPE_TRANSACTION_INSERT_LIST_OFFSET + 1,

# // ==== Query SCALAR ====
GetLatestTimestampFromPartitionInfo = DAO_TYPE_QUERY_SCALAR_OFFSET,
GetLatestTimestampFromPartitionInfoWithoutPartitionDesc = DAO_TYPE_QUERY_SCALAR_OFFSET + 1,
GetLatestVersionUpToTimeFromPartitionInfo = DAO_TYPE_QUERY_SCALAR_OFFSET + 2,
GetLatestVersionTimestampUpToTimeFromPartitionInfo = DAO_TYPE_QUERY_SCALAR_OFFSET + 3,

# // ==== Update ====
# // Update Namespace
DeleteNamespaceByNamespace = DAO_TYPE_UPDATE_OFFSET,
UpdateNamespacePropertiesByNamespace = DAO_TYPE_UPDATE_OFFSET + 1,

# // Update TableInfo
DeleteTableInfoByIdAndPath = DAO_TYPE_UPDATE_OFFSET + 2,
UpdateTableInfoPropertiesById = DAO_TYPE_UPDATE_OFFSET + 3,
UpdateTableInfoById = DAO_TYPE_UPDATE_OFFSET + 4,

# // Update TablePathId
DeleteTablePathIdByTablePath = DAO_TYPE_UPDATE_OFFSET + 5,
DeleteTablePathIdByTableId = DAO_TYPE_UPDATE_OFFSET + 6,
# // Update TableNameId
DeleteTableNameIdByTableNameAndNamespace = DAO_TYPE_UPDATE_OFFSET + 7,
DeleteTableNameIdByTableId = DAO_TYPE_UPDATE_OFFSET + 8,
# // Update PartitionInfo
DeletePartitionInfoByTableIdAndPartitionDesc = DAO_TYPE_UPDATE_OFFSET + 9,
DeletePartitionInfoByTableId = DAO_TYPE_UPDATE_OFFSET + 10,
DeletePreviousVersionPartition = DAO_TYPE_UPDATE_OFFSET + 11,
# // Update DataCommitInfo
DeleteOneDataCommitInfoByTableIdAndPartitionDescAndCommitId = DAO_TYPE_UPDATE_OFFSET + 12,
DeleteDataCommitInfoByTableIdAndPartitionDescAndCommitIdList = DAO_TYPE_UPDATE_OFFSET + 13,
DeleteDataCommitInfoByTableIdAndPartitionDesc = DAO_TYPE_UPDATE_OFFSET + 14,
DeleteDataCommitInfoByTableId = DAO_TYPE_UPDATE_OFFSET + 15,
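Worth noting: the trailing commas make every `DaoType` attribute a one-element tuple rather than a bare int, which is why `query()` in `native_client.py` passes `query_type[0]` to the native library. A quick illustration:

```python
# Illustration only: the trailing commas above turn the values into 1-tuples.
from lib.const import DaoType, DAO_TYPE_QUERY_ONE_OFFSET, DAO_TYPE_QUERY_LIST_OFFSET

assert DaoType.SelectTableInfoByTableNameAndNameSpace == (DAO_TYPE_QUERY_ONE_OFFSET + 4,)
assert DaoType.ListPartitionByTableId == (DAO_TYPE_QUERY_LIST_OFFSET + 4,)
assert DaoType.ListPartitionByTableId[0] == 104   # the integer handed to execute_query
```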
65 changes: 65 additions & 0 deletions python/metadata/lib/lakesoul_metadata_c.py
@@ -0,0 +1,65 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0

from ctypes import *


class NonNull(Structure):
pass


I32Callback = CFUNCTYPE(c_int, c_char_p)


def reload_lib(path):
global lib, execute_query, create_tokio_runtime, free_tokio_runtime, create_tokio_postgres_client, free_tokio_postgres_client, create_prepared_statement, free_prepared_statement
lib = CDLL(path)
# pub extern "C" fn execute_query(
# callback: extern "C" fn(i32, *const c_char),
# runtime: NonNull<Result<TokioRuntime>>,
# client: NonNull<Result<TokioPostgresClient>>,
# prepared: NonNull<Result<PreparedStatement>>,
# query_type: i32,
# joined_string: *const c_char,
# addr: c_ptrdiff_t,
# )
execute_query = lib.execute_query
execute_query.restype = c_void_p
execute_query.argtypes = [CFUNCTYPE(c_void_p, c_int, c_char_p), POINTER(NonNull), POINTER(NonNull),
POINTER(NonNull),
c_int, c_char_p, c_char_p]

# pub extern "C" fn create_tokio_runtime() -> NonNull<Result<TokioRuntime>>
create_tokio_runtime = lib.create_tokio_runtime
create_tokio_runtime.restype = POINTER(NonNull)
create_tokio_runtime.argtypes = []

# pub extern "C" fn free_tokio_runtime(runtime: NonNull<Result<TokioRuntime>>)
free_tokio_runtime = lib.free_tokio_runtime
free_tokio_runtime.restype = c_void_p
free_tokio_runtime.argtypes = [POINTER(NonNull)]

# pub extern "C" fn create_tokio_postgres_client(
# callback: extern "C" fn(bool, *const c_char),
# config: *const c_char,
# runtime: NonNull<Result<TokioRuntime>>,
# ) -> NonNull<Result<TokioPostgresClient>>
create_tokio_postgres_client = lib.create_tokio_postgres_client
create_tokio_postgres_client.restype = POINTER(NonNull)
create_tokio_postgres_client.argtypes = [CFUNCTYPE(c_void_p, c_bool, c_char_p), c_char_p, POINTER(NonNull)]

# pub extern "C" fn free_tokio_postgres_client(client: NonNull<Result<TokioPostgresClient>>)
free_tokio_postgres_client = lib.free_tokio_postgres_client
free_tokio_postgres_client.restype = c_void_p
free_tokio_postgres_client.argtypes = [POINTER(NonNull)]

# pub extern "C" fn create_prepared_statement() -> NonNull<Result<PreparedStatement>>
create_prepared_statement = lib.create_prepared_statement
create_prepared_statement.restype = POINTER(NonNull)
create_prepared_statement.argtypes = []

# pub extern "C" fn free_prepared_statement(prepared: NonNull<Result<PreparedStatement>>)
free_prepared_statement = lib.free_prepared_statement
free_prepared_statement.restype = c_void_p
free_prepared_statement.argtypes = [POINTER(NonNull)]
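A hedged sketch of loading the bindings; the shared-library path is hypothetical and depends on where the Rust `lakesoul-metadata-c` cdylib is built:

```python
# Hypothetical library path; build the Rust crate first and point reload_lib at
# the resulting cdylib.
import lib.lakesoul_metadata_c as lakesoul_metadata_c

lakesoul_metadata_c.reload_lib("native-metadata/target/release/liblakesoul_metadata_c.so")
runtime = lakesoul_metadata_c.create_tokio_runtime()
prepared = lakesoul_metadata_c.create_prepared_statement()
# ... hand the runtime/client/prepared pointers to execute_query, as native_client.py does ...
lakesoul_metadata_c.free_prepared_statement(prepared)
lakesoul_metadata_c.free_tokio_runtime(runtime)
```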
99 changes: 99 additions & 0 deletions python/metadata/native_client.py
@@ -0,0 +1,99 @@
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0

import threading
import concurrent.futures
import importlib
from ctypes import *

from lib.const import PARAM_DELIM, DAO_TYPE_QUERY_LIST_OFFSET
import lib
import generated.entity_pb2 as entity_pb2

global config
config = "host={} port={} dbname={} user={} password={}".format("localhost", "5433", "test_lakesoul_meta",
"yugabyte", "yugabyte")


def reset_pg_conf(conf):
global config
config = " ".join(conf)


class NativeMetadataClient:
def __init__(self):
self._lock = threading.Lock()
importlib.reload(lib)
self._buffer = create_string_buffer(4096)
self._large_buffer = create_string_buffer(65536)
self._runtime = lib.lakesoul_metadata_c.create_tokio_runtime()

def callback(bool, msg):
print("create connection callback: status={} msg={}".format(bool, msg.decode("utf-8")))
if not bool:
exit(1)

def target():
global config
return lib.lakesoul_metadata_c.create_tokio_postgres_client(CFUNCTYPE(c_void_p, c_bool, c_char_p)(callback),
config.encode("utf-8"),
self._runtime)

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(target)
self._client = future.result(5)

self._prepared = lib.lakesoul_metadata_c.create_prepared_statement()

def __del__(self):
lib.lakesoul_metadata_c.free_tokio_runtime(self._runtime)
lib.lakesoul_metadata_c.free_tokio_postgres_client(self._client)
lib.lakesoul_metadata_c.free_prepared_statement(self._prepared)

def execute_query(self, query_type, params):
joined_params = PARAM_DELIM.join(params).encode("utf-8")
buffer = self._buffer
if query_type >= DAO_TYPE_QUERY_LIST_OFFSET:
buffer = self._large_buffer
buffer.value = b''

def callback(len, msg):
print("execute_query query_type={} callback: len={} msg={}".format(query_type, len, msg.decode("utf-8")))

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(lib.lakesoul_metadata_c.execute_query,
CFUNCTYPE(c_void_p, c_int, c_char_p)(callback), self._runtime, self._client,
self._prepared, query_type, joined_params, buffer)
future.result(2.0)

if len(buffer.value) == 0:
return None
else:
wrapper = entity_pb2.JniWrapper()
wrapper.ParseFromString(buffer.value)
return wrapper

def get_lock(self):
return self._lock


global INSTANCE

INSTANCE = None


def get_instance():
global INSTANCE
if INSTANCE is None:
INSTANCE = NativeMetadataClient()
return INSTANCE
else:
return INSTANCE


def query(query_type, params):
instance = get_instance()
lock = instance.get_lock()
with lock:
return instance.execute_query(query_type[0], params)
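A hedged end-to-end sketch: the connection parameters and table name are placeholders for a local metadata database, and the native library is assumed to have been loaded via `reload_lib` beforehand:

```python
# Placeholder connection values and table name; assumes reload_lib() was already
# called with the path to the compiled lakesoul-metadata-c library.
from lib.const import DaoType
from native_client import reset_pg_conf, query

reset_pg_conf([
    "host=localhost", "port=5432", "dbname=lakesoul_test",
    "user=lakesoul_test", "password=lakesoul_test",
])
wrapper = query(DaoType.SelectTableInfoByTableNameAndNameSpace, ["my_table", "default"])
if wrapper is not None:
    print(wrapper.table_info[0].table_schema)
```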