Skip to content

Commit

Permalink
write query result buffer by bytes::buf::writer
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
  • Loading branch information
zenghua committed Oct 11, 2023
1 parent d941d1f commit 8087ec9
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ public static void setDataBaseProperty(DataBaseProperty dataBaseProperty) {
}

public NativeMetadataJavaClient() {
this(5000L, 1 << 12, 1 << 16);
this(5000L, 1 << 12);
}

public NativeMetadataJavaClient(long timeout, int bufferSize, int largeBufferSize) {
public NativeMetadataJavaClient(long timeout, int bufferSize) {
this.timeout = timeout;
libLakeSoulMetaData = JnrLoader.get();
booleanCallbackObjectReferenceManager = Runtime.getRuntime(libLakeSoulMetaData).newObjectReferenceManager();
Expand Down Expand Up @@ -245,15 +245,15 @@ public JniWrapper executeQuery(Integer queryType, List<String> params) {
preparedStatement,
queryType,
String.join(PARAM_DELIM, params)
// queryType < DAO_TYPE_QUERY_LIST_OFFSET ? sharedBuffer.address() : largeSharedBuffer.address()
);
Integer len = queryFuture.get(timeout, TimeUnit.MILLISECONDS);
if (len < 0) return null;
Integer lenWithTail = len + 1;

Pointer buffer = fixedBuffer;
if (len > fixedBuffer.size()) {
if (len > mutableBuffer.size()) {
mutableBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(len);
if (lenWithTail > fixedBuffer.size()) {
if (lenWithTail > mutableBuffer.size()) {
mutableBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(lenWithTail);
}
buffer = mutableBuffer;
}
Expand Down Expand Up @@ -335,11 +335,13 @@ public Integer executeInsert(Integer insertType, JniWrapper jniWrapper) {
Pointer buffer = fixedBuffer;
if (bytes.length < fixedBuffer.size())
fixedBuffer.put(0, bytes, 0, bytes.length);
else if (bytes.length < mutableBuffer.size())
else if (bytes.length < mutableBuffer.size()) {
mutableBuffer.put(0, bytes, 0, bytes.length);
else {
buffer = mutableBuffer;
} else {
mutableBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(bytes.length);
mutableBuffer.put(0, bytes, 0, bytes.length);
buffer = mutableBuffer;
}

getLibLakeSoulMetaData().execute_insert(
Expand Down
26 changes: 12 additions & 14 deletions rust/lakesoul-metadata-c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
extern crate core;

use core::ffi::c_ptrdiff_t;
use std::io::Write;
use std::ptr::NonNull;
use std::ffi::{c_char, c_uchar, CString, CStr};

use lakesoul_metadata::{Runtime, Builder, Client, PreparedStatementMap};
use prost::bytes::BufMut;
use proto::proto::entity;
use prost::Message;

Expand Down Expand Up @@ -185,7 +187,6 @@ pub extern "C" fn execute_query(
prepared: NonNull<Result<PreparedStatement>>,
query_type: i32,
joined_string: *const c_char,
// addr: c_ptrdiff_t,
) -> NonNull<Result<BytesResult>> {
let runtime = unsafe {NonNull::new_unchecked(runtime.as_ref().ptr as *mut Runtime).as_ref()};
let client = unsafe {NonNull::new_unchecked(client.as_ref().ptr as *mut Client).as_ref()};
Expand Down Expand Up @@ -218,25 +219,22 @@ pub extern "C" fn export_bytes_result(
len: i32,
addr: c_ptrdiff_t,
) {
let bytes = unsafe {NonNull::new_unchecked(bytes.as_ref().ptr as *mut Vec<u8>).as_ref()};
let len = len as usize;
let bytes = unsafe {NonNull::new_unchecked(bytes.as_ref().ptr as *mut Vec<c_uchar>).as_mut()};

if bytes.len() != len as usize {
if bytes.len() != len {
callback( false, CString::new("Size of buffer and result mismatch at export_bytes_result.").unwrap().into_raw());
return;
}
bytes.push(0u8);
bytes.shrink_to_fit();

let addr = addr as *mut c_uchar;
let _ = bytes
.iter()
.enumerate()
.map(
|(idx, byte)|
unsafe{std::ptr::write::<c_uchar>(addr.wrapping_add(idx), *byte)})
.collect::<Vec<_>>();
let dst = unsafe {
std::slice::from_raw_parts_mut(addr as *mut u8, len + 1)
};
let mut writer = dst.writer();
let _ = writer.write_all(bytes.as_slice());

unsafe{
std::ptr::write::<c_uchar>(addr.wrapping_add(len as usize), 0u8)
}
callback( true, CString::new("").unwrap().into_raw());
}

Expand Down

0 comments on commit 8087ec9

Please sign in to comment.