diff --git a/zenoh-jni/src/scouting.rs b/zenoh-jni/src/scouting.rs index e334e954..b0a665c1 100644 --- a/zenoh-jni/src/scouting.rs +++ b/zenoh-jni/src/scouting.rs @@ -15,14 +15,14 @@ use std::{ptr::null, sync::Arc}; use jni::{ - objects::{JClass, JList, JObject, JValue}, + objects::{GlobalRef, JClass, JList, JObject, JValue}, sys::jint, JNIEnv, }; use zenoh::{config::WhatAmIMatcher, Wait}; use zenoh::{scouting::Scout, Config}; -use crate::utils::{get_callback_global_ref, get_java_vm}; +use crate::utils::{get_callback_global_ref, get_java_vm, load_on_close}; use crate::{errors::ZResult, throw_exception, zerror}; /// Start a scout. @@ -42,11 +42,14 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNIScout_00024Companion_scoutViaJNI( _class: JClass, whatAmI: jint, callback: JObject, + on_close: JObject, config_ptr: /*nullable=*/ *const Config, ) -> *const Scout<()> { || -> ZResult<*const Scout<()>> { let callback_global_ref = get_callback_global_ref(&mut env, callback)?; let java_vm = Arc::new(get_java_vm(&mut env)?); + let on_close_global_ref: GlobalRef = get_callback_global_ref(&mut env, on_close)?; + let on_close = load_on_close(&java_vm, on_close_global_ref); let whatAmIMatcher: WhatAmIMatcher = (whatAmI as u8).try_into().unwrap(); // The validity of the operation is guaranteed on the kotlin layer. let config = if config_ptr.is_null() { Config::default() @@ -58,6 +61,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNIScout_00024Companion_scoutViaJNI( }; zenoh::scout(whatAmIMatcher, config) .callback(move |hello| { + on_close.noop(); // Moves `on_close` inside the closure so it gets destroyed with the closure tracing::debug!("Received hello: {hello}"); let _ = || -> jni::errors::Result<()> { let mut env = java_vm.attach_current_thread_as_daemon()?; diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Zenoh.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Zenoh.kt index 370596ba..12049903 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Zenoh.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Zenoh.kt @@ -54,7 +54,7 @@ object Zenoh { config: Config? = null ): Result> { ZenohLoad - return JNIScout.scout(whatAmI = whatAmI, callback = callback, receiver = Unit, config = config) + return JNIScout.scout(whatAmI = whatAmI, callback = callback, receiver = Unit, onClose = {}, config = config) } /** @@ -78,6 +78,7 @@ object Zenoh { whatAmI = whatAmI, callback = { hello -> handler.handle(hello) }, receiver = handler.receiver(), + onClose = handler::onClose, config = config ) } @@ -104,6 +105,7 @@ object Zenoh { whatAmI = whatAmI, callback = { hello -> handler.handle(hello) }, receiver = handler.receiver(), + onClose = handler::onClose, config = config ) } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIScout.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIScout.kt index 6e2019cd..cdc3f5e2 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIScout.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIScout.kt @@ -22,6 +22,7 @@ import io.zenoh.config.ZenohId import io.zenoh.scouting.Hello import io.zenoh.scouting.Scout import io.zenoh.config.WhatAmI +import io.zenoh.jni.callbacks.JNIOnCloseCallback /** * Adapter class to handle the interactions with Zenoh through JNI for a [io.zenoh.scouting.Scout] @@ -34,6 +35,7 @@ internal class JNIScout(private val ptr: Long) { fun scout( whatAmI: Set, callback: Callback, + onClose: () -> Unit, config: Config?, receiver: R ): Result> = runCatching { @@ -41,7 +43,7 @@ internal class JNIScout(private val ptr: Long) { callback.run(Hello(WhatAmI.fromInt(whatAmI2), ZenohId(id), locators)) } val binaryWhatAmI: Int = whatAmI.map { it.value }.reduce { acc, it -> acc or it } - val ptr = scoutViaJNI(binaryWhatAmI, scoutCallback, config?.jniConfig?.ptr ?: 0) + val ptr = scoutViaJNI(binaryWhatAmI, scoutCallback, onClose,config?.jniConfig?.ptr ?: 0) Scout(receiver, JNIScout(ptr)) } @@ -49,6 +51,7 @@ internal class JNIScout(private val ptr: Long) { private external fun scoutViaJNI( whatAmI: Int, callback: JNIScoutCallback, + onCloseCallback: JNIOnCloseCallback, configPtr: Long, ): Long diff --git a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/ScoutTest.kt b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/ScoutTest.kt index 80754efd..26c41c1d 100644 --- a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/ScoutTest.kt +++ b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/ScoutTest.kt @@ -15,9 +15,14 @@ package io.zenoh import io.zenoh.scouting.Hello +import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import kotlin.test.Test import kotlin.test.assertNotNull +import kotlin.test.assertTrue class ScoutTest { @@ -28,11 +33,11 @@ class ScoutTest { } @Test - fun `scouting detects session test`() { + fun `scouting with callback test`() { val session = Session.open(Config.default()).getOrThrow() var hello: Hello? = null - Zenoh.scout(callback = { + val scout = Zenoh.scout(callback = { hello = it }).getOrThrow() @@ -40,6 +45,35 @@ class ScoutTest { assertNotNull(hello) session.close() + scout.close() + } + + @Test + @OptIn(DelicateCoroutinesApi::class) + fun `scouting with channel test`() { + val session = Session.open(Config.default()).getOrThrow() + + var hello: Hello? = null + val scout = Zenoh.scout(Channel()).getOrThrow() + + Thread.sleep(1000) + + runBlocking { + launch { + delay(1000) + scout.close() + } + + // Start receiving messages + for (receivedHello in scout.receiver) { + hello = receivedHello + } + } + + assertNotNull(hello) + session.close() + + assertTrue { scout.receiver.isClosedForReceive } } @Test