Skip to content

Commit

Permalink
Adding onClose behaviour for Get, Queryable and Subscribers. (#9)
Browse files Browse the repository at this point in the history
* Restructuring packages aiming for Android compatibility, using Kotlin multiplatform plugin

* Edit cargo task in build.gradle.kts

* Reenabling examples.

* Making jvmTest commonTest

* [WIP] Target building Zenoh JNI

* Editing workflows after multiplatforming project.

* Editing README and build.gradle.kts

* Publish to JVM.

* Publish to JVM.

Also:
 - fix Examples build
 - tidy up build.gradle.kts files
 - triggering compile zenoh-jni if needed when running gradle tasks.

* Fix CI + edit README

* Fix CI + edit README

* OnFinish callback feature.

 The possibility of adding an onFinish callback for Get requests, Subscribers and Queryables is added to their respective builders.
 Also the Handler interface is expanded with an "onFinish" callback (in the case of the default handler, which is ChannelHandler, the implementation consists of closing the channel upon finishing).

* Fix doc comment.

* Fix formatting of Zenoh-JNI

* Renaming 'onFinish' with 'onClose'.

* Fix cargo fmt

* Fix CI workflow
  • Loading branch information
DariusIMP authored Oct 16, 2023
1 parent 1b8af00 commit 3e0d300
Show file tree
Hide file tree
Showing 23 changed files with 297 additions and 73 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,5 @@ jobs:
working-directory: zenoh-jni
run: cargo fmt --all --check

- name: Cargo Build
working-directory: zenoh-jni
run: cargo build --verbose

- name: Gradle Test
run: gradle jvmTest --info
4 changes: 0 additions & 4 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ jobs:
working-directory: zenoh-jni
run: cargo fmt --all --check

- name: Cargo Build
working-directory: zenoh-jni
run: cargo build --verbose

- name: Gradle Test
run: gradle jvmTest --info

Expand Down
27 changes: 14 additions & 13 deletions examples/src/main/kotlin/io.zenoh/ZGet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package io.zenoh

import io.zenoh.query.Reply
import io.zenoh.selector.intoSelector
import kotlinx.coroutines.runBlocking
import java.time.Duration

fun main() {
Expand All @@ -24,23 +25,23 @@ fun main() {
session.use {
"demo/example/**".intoSelector().onSuccess { selector ->
selector.use {
session.get(selector)
.with { reply ->
if (reply is Reply.Success) {
println("Received ('${reply.sample.keyExpr}': '${reply.sample.value}')")
} else {
reply as Reply.Error
println("Received (ERROR: '${reply.error}')")
session.get(selector).timeout(timeout).res().onSuccess {
runBlocking {
val iterator = it!!.iterator()
while (iterator.hasNext()) {
val reply = iterator.next()
if (reply is Reply.Success) {
println("Received ('${reply.sample.keyExpr}': '${reply.sample.value}')")
} else {
reply as Reply.Error
println("Received (ERROR: '${reply.error}')")
}
}
}
.timeout(timeout)
.res()
.onSuccess {
// Keep the session alive for the duration of the timeout.
Thread.sleep(timeout.toMillis())
}
}
}
}
}
}
}

12 changes: 9 additions & 3 deletions zenoh-jni/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
errors::Error,
errors::Result,
query::on_query,
utils::{get_callback_global_ref, get_java_vm},
utils::{get_callback_global_ref, get_java_vm, load_on_close},
};

/// Frees the memory associated with a Zenoh queryable raw pointer via JNI.
Expand Down Expand Up @@ -61,6 +61,8 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQueryable_freePtrViaJNI(
/// - `key_expr_ptr`: Raw pointer to the [KeyExpr] to be used for the queryable.
/// - `session_ptr`: Raw pointer to the [Session] from which to declare the queryable..
/// - `callback`: The callback function as an instance of the `Callback` interface in Java/Kotlin.
/// - `on_close`: The `on_close` callback function as an instance of the `JNIOnCloseCallback` interface in
/// Java/Kotlin, to be called when Zenoh notfies that no more queries will be received.
/// - `complete`: The completeness of the queryable.
///
/// Returns:
Expand All @@ -75,11 +77,14 @@ pub(crate) unsafe fn declare_queryable(
key_expr_ptr: *const KeyExpr<'static>,
session_ptr: *const zenoh::Session,
callback: JObject,
on_close: JObject,
complete: jboolean,
) -> Result<Queryable<'static, ()>> {
let java_vm = get_java_vm(env)?;
let java_vm = Arc::new(get_java_vm(env)?);
let callback_global_ref = get_callback_global_ref(env, callback)?;
let on_close_global_ref = get_callback_global_ref(env, on_close)?;
let complete = complete != 0;
let on_close = load_on_close(&java_vm, on_close_global_ref);

let session: Arc<Session> = Arc::from_raw(session_ptr);
let key_expr = Arc::from_raw(key_expr_ptr);
Expand All @@ -88,6 +93,7 @@ pub(crate) unsafe fn declare_queryable(
let queryable = session
.declare_queryable(key_expr_clone)
.callback(move |query| {
on_close.noop(); // Does nothing, but moves `on_close` inside the closure so it gets destroyed with the closure
let env = match java_vm.attach_current_thread_as_daemon() {
Ok(env) => env,
Err(err) => {
Expand All @@ -98,7 +104,7 @@ pub(crate) unsafe fn declare_queryable(

log::debug!("Receiving query through JNI: {}", query.to_string());
match on_query(env, query, &callback_global_ref) {
Ok(_) => log::info!("Queryable callback called successfully."),
Ok(_) => log::debug!("Queryable callback called successfully."),
Err(err) => log::error!("Error calling queryable callback: {}", err),
}
})
Expand Down
36 changes: 30 additions & 6 deletions zenoh-jni/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::query::{decode_consolidation, decode_query_target};
use crate::queryable::declare_queryable;
use crate::reply::on_reply;
use crate::subscriber::declare_subscriber;
use crate::utils::{decode_string, get_callback_global_ref, get_java_vm};
use crate::utils::{decode_string, get_callback_global_ref, get_java_vm, load_on_close};
use crate::value::decode_value;

use jni::objects::{JByteArray, JClass, JObject, JString};
Expand Down Expand Up @@ -255,6 +255,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_putViaJNI(
/// - `key_expr`: The key expression for the subscriber.
/// - `ptr`: The raw pointer to the Zenoh session.
/// - `callback`: The callback function as an instance of the `JNISubscriberCallback` interface in Java/Kotlin.
/// - `on_close`: A Java/Kotlin `JNIOnCloseCallback` function interface to be called upon closing the subscriber.
/// - `reliability`: The [Reliability] value as an ordinal.
///
/// Returns:
Expand All @@ -277,9 +278,10 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_declareSubscriberViaJNI(
key_expr_ptr: *const KeyExpr<'static>,
ptr: *const zenoh::Session,
callback: JObject,
on_close: JObject,
reliability: jint,
) -> *const zenoh::subscriber::Subscriber<'static, ()> {
match declare_subscriber(&mut env, key_expr_ptr, ptr, callback, reliability) {
match declare_subscriber(&mut env, key_expr_ptr, ptr, callback, on_close, reliability) {
Ok(subscriber_ptr) => subscriber_ptr,
Err(err) => {
_ = err.throw_on_jvm(&mut env).map_err(|err| {
Expand All @@ -303,6 +305,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_declareSubscriberViaJNI(
/// - `key_expr_ptr`: A raw pointer to the [KeyExpr] to be used for the queryable.
/// - `session_ptr`: A raw pointer to the Zenoh [Session] to be used to declare the queryable.
/// - `callback`: The callback function as an instance of the `JNIQueryableCallback` interface in Java/Kotlin.
/// - `on_close`: A Java/Kotlin `JNIOnCloseCallback` function interface to be called upon closing the queryable.
/// - `complete`: The completeness of the queryable.
///
/// Returns:
Expand All @@ -325,9 +328,17 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_declareQueryableViaJNI(
key_expr_ptr: *const KeyExpr<'static>,
session_ptr: *const zenoh::Session,
callback: JObject,
on_close: JObject,
complete: jboolean,
) -> *const zenoh::queryable::Queryable<'static, ()> {
match declare_queryable(&mut env, key_expr_ptr, session_ptr, callback, complete) {
match declare_queryable(
&mut env,
key_expr_ptr,
session_ptr,
callback,
on_close,
complete,
) {
Ok(queryable) => Arc::into_raw(Arc::new(queryable)),
Err(err) => {
_ = err.throw_on_jvm(&mut env).map_err(|err| {
Expand Down Expand Up @@ -439,6 +450,8 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_undeclareKeyExprViaJNI(
/// - `selector_params`: Parameters of the selector.
/// - `session_ptr`: A raw pointer to the Zenoh session.
/// - `callback`: An instance of the Java/Kotlin `JNIGetCallback` function interface to be called upon receiving a reply.
/// - `on_close`: A Java/Kotlin `JNIOnCloseCallback` function interface to be called when the get operation won't receive
/// any more replies.
/// - `timeout_ms`: The timeout in milliseconds.
/// - `target`: The [QueryTarget] as the ordinal of the enum.
/// - `consolidation`: The [ConsolidationMode] as the ordinal of the enum.
Expand All @@ -462,6 +475,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI(
selector_params: JString,
session_ptr: *const zenoh::Session,
callback: JObject,
on_close: JObject,
timeout_ms: jlong,
target: jint,
consolidation: jint,
Expand All @@ -474,6 +488,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI(
selector_params,
&session,
callback,
on_close,
timeout_ms,
target,
consolidation,
Expand Down Expand Up @@ -504,6 +519,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI(
/// - `selector_params`: Parameters of the selector.
/// - `session_ptr`: A raw pointer to the Zenoh [Session].
/// - `callback`: A Java/Kotlin callback to be called upon receiving a reply.
/// - `on_close`: A Java/Kotlin `JNIOnCloseCallback` function interface to be called when no more replies will be received.
/// - `timeout_ms`: The timeout in milliseconds.
/// - `target`: The [QueryTarget] as the ordinal of the enum.
/// - `consolidation`: The [ConsolidationMode] as the ordinal of the enum.
Expand All @@ -529,6 +545,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getWithValueViaJNI(
selector_params: JString,
session_ptr: *const zenoh::Session,
callback: JObject,
on_close: JObject,
timeout_ms: jlong,
target: jint,
consolidation: jint,
Expand All @@ -543,6 +560,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getWithValueViaJNI(
selector_params,
&session,
callback,
on_close,
timeout_ms,
target,
consolidation,
Expand All @@ -568,7 +586,9 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getWithValueViaJNI(
/// - `env`: A mutable reference to the JNI environment.
/// - `key_expr`: The key expression for the `get` operation.
/// - `session`: An `Arc<Session>` representing the Zenoh session.
/// - `callback`: A Java/Kotlin `JNIGetCallback` interface callback to be called upon receiving a reply.
/// - `callback`: A Java/Kotlin `JNIGetCallback` function interface to be called upon receiving a reply.
/// - `on_close`: A Java/Kotlin `JNIOnCloseCallback` function interface to be called when Zenoh notifies
/// that no more replies will be received.
/// - `timeout_ms`: The timeout in milliseconds.
/// - `target`: The [QueryTarget] as the ordinal of the enum.
/// - `consolidation`: The [ConsolidationMode] as the ordinal of the enum.
Expand All @@ -585,28 +605,32 @@ fn on_get_query(
selector_params: JString,
session: &Arc<Session>,
callback: JObject,
on_close: JObject,
timeout_ms: jlong,
target: jint,
consolidation: jint,
value_params: Option<(JByteArray, jint)>,
) -> Result<()> {
let java_vm = get_java_vm(env)?;
let java_vm = Arc::new(get_java_vm(env)?);
let callback_global_ref = get_callback_global_ref(env, callback)?;
let on_close_global_ref = get_callback_global_ref(env, on_close)?;
let query_target = decode_query_target(target)?;
let consolidation = decode_consolidation(consolidation)?;
let selector_params = decode_string(env, selector_params)?;
let timeout = Duration::from_millis(timeout_ms as u64);
let on_close = load_on_close(&java_vm, on_close_global_ref);

let key_expr_clone = key_expr.deref().clone();
let selector = Selector::from(key_expr_clone).with_parameters(&selector_params);
let mut get_builder = session
.get(selector)
.callback(move |reply| {
on_close.noop(); // Does nothing, but moves `on_close` inside the closure so it gets destroyed with the closure
log::debug!("Receiving reply through JNI: {:?}", reply);
let env = match java_vm.attach_current_thread_as_daemon() {
Ok(env) => env,
Err(err) => {
log::error!("Unable to attach thread for queryable callback: {}", err);
log::error!("Unable to attach thread for GET query callback: {}", err);
return;
}
};
Expand Down
9 changes: 7 additions & 2 deletions zenoh-jni/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use zenoh::prelude::r#sync::*;
use zenoh::subscriber::Subscriber;

use crate::errors::{Error, Result};
use crate::utils::{get_callback_global_ref, get_java_vm};
use crate::utils::{get_callback_global_ref, get_java_vm, load_on_close};

/// Frees the memory associated with a Zenoh subscriber raw pointer via JNI.
///
Expand Down Expand Up @@ -57,6 +57,7 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNISubscriber_freePtrViaJNI(
/// - `key_expr_ptr`: Raw pointer to the key expression to be used for the subscriber.
/// - `session_ptr`: Raw pointer to the session to be used for the declaration..
/// - `callback`: The callback function as an instance of the `Callback` interface in Java/Kotlin.
/// - `onClose`: A Java/Kotlin `JNIOnCloseCallback` function interface to be called when the subscriber is undeclared.
/// - `reliability`: The [Reliability] configuration as an ordinal.
///
/// Returns:
Expand All @@ -71,11 +72,14 @@ pub(crate) unsafe fn declare_subscriber(
key_expr_ptr: *const KeyExpr<'static>,
session_ptr: *const zenoh::Session,
callback: JObject,
on_close: JObject,
reliability: jint,
) -> Result<*const Subscriber<'static, ()>> {
let java_vm = get_java_vm(env)?;
let java_vm = Arc::new(get_java_vm(env)?);
let callback_global_ref = get_callback_global_ref(env, callback)?;
let on_close_global_ref = get_callback_global_ref(env, on_close)?;
let reliability = decode_reliability(reliability)?;
let on_close = load_on_close(&java_vm, on_close_global_ref);

let session = Arc::from_raw(session_ptr);
let key_expr = Arc::from_raw(key_expr_ptr);
Expand All @@ -84,6 +88,7 @@ pub(crate) unsafe fn declare_subscriber(
let result = session
.declare_subscriber(key_expr_clone.to_owned())
.callback(move |sample| {
on_close.noop(); // Does nothing, but moves `on_close` inside the closure so it gets destroyed with the closure
let mut env = match java_vm.attach_current_thread_as_daemon() {
Ok(env) => env,
Err(err) => {
Expand Down
52 changes: 52 additions & 0 deletions zenoh-jni/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use std::sync::Arc;

use jni::{
objects::{JObject, JString},
JNIEnv, JavaVM,
Expand Down Expand Up @@ -46,3 +48,53 @@ pub(crate) fn get_callback_global_ref(
))
})
}

/// A type that calls a function when dropped
pub(crate) struct CallOnDrop<F: FnOnce()>(core::mem::MaybeUninit<F>);
impl<F: FnOnce()> CallOnDrop<F> {
/// Constructs a value that calls `f` when dropped.
pub fn new(f: F) -> Self {
Self(core::mem::MaybeUninit::new(f))
}
/// Does nothing, but tricks closures into moving the value inside,
/// so that the closure's destructor will call `drop(self)`.
pub fn noop(&self) {}
}
impl<F: FnOnce()> Drop for CallOnDrop<F> {
fn drop(&mut self) {
// Take ownership of the closure that is always initialized,
// since the only constructor uses `MaybeUninit::new`
let f = unsafe { self.0.assume_init_read() };
// Call the now owned function
f();
}
}

pub(crate) fn load_on_close(
java_vm: &Arc<jni::JavaVM>,
on_close_global_ref: jni::objects::GlobalRef,
) -> CallOnDrop<impl FnOnce()> {
CallOnDrop::new({
let java_vm = java_vm.clone();
move || {
let mut env = match java_vm.attach_current_thread_as_daemon() {
Ok(env) => env,
Err(err) => {
log::error!("Unable to attach thread for 'onClose' callback: {}", err);
return;
}
};
match env.call_method(on_close_global_ref, "run", "()V", &[]) {
Ok(_) => (),
Err(err) => {
_ = env.exception_describe();
_ = Error::Jni(format!("Error while running 'onClose' callback: {}", err))
.throw_on_jvm(&mut env)
.map_err(|err| {
log::error!("Unable to throw exception upon 'onClose' failure: {}", err)
});
}
}
}
})
}
Loading

0 comments on commit 3e0d300

Please sign in to comment.