Skip to content

Commit

Permalink
Scouting: closing channel when scout is closed. (#295)
Browse files Browse the repository at this point in the history
  • Loading branch information
DariusIMP authored Nov 19, 2024
1 parent ee7a866 commit c5d3350
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 6 deletions.
8 changes: 6 additions & 2 deletions zenoh-jni/src/scouting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
use std::{ptr::null, sync::Arc};

use jni::{
objects::{JClass, JList, JObject, JValue},
objects::{GlobalRef, JClass, JList, JObject, JValue},
sys::jint,
JNIEnv,
};
use zenoh::{config::WhatAmIMatcher, Wait};
use zenoh::{scouting::Scout, Config};

use crate::utils::{get_callback_global_ref, get_java_vm};
use crate::utils::{get_callback_global_ref, get_java_vm, load_on_close};
use crate::{errors::ZResult, throw_exception, zerror};

/// Start a scout.
Expand All @@ -42,11 +42,14 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNIScout_00024Companion_scoutViaJNI(
_class: JClass,
whatAmI: jint,
callback: JObject,
on_close: JObject,
config_ptr: /*nullable=*/ *const Config,
) -> *const Scout<()> {
|| -> ZResult<*const Scout<()>> {
let callback_global_ref = get_callback_global_ref(&mut env, callback)?;
let java_vm = Arc::new(get_java_vm(&mut env)?);
let on_close_global_ref: GlobalRef = get_callback_global_ref(&mut env, on_close)?;
let on_close = load_on_close(&java_vm, on_close_global_ref);
let whatAmIMatcher: WhatAmIMatcher = (whatAmI as u8).try_into().unwrap(); // The validity of the operation is guaranteed on the kotlin layer.
let config = if config_ptr.is_null() {
Config::default()
Expand All @@ -58,6 +61,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNIScout_00024Companion_scoutViaJNI(
};
zenoh::scout(whatAmIMatcher, config)
.callback(move |hello| {
on_close.noop(); // Moves `on_close` inside the closure so it gets destroyed with the closure
tracing::debug!("Received hello: {hello}");
let _ = || -> jni::errors::Result<()> {
let mut env = java_vm.attach_current_thread_as_daemon()?;
Expand Down
4 changes: 3 additions & 1 deletion zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Zenoh.kt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object Zenoh {
config: Config? = null
): Result<Scout<Unit>> {
ZenohLoad
return JNIScout.scout(whatAmI = whatAmI, callback = callback, receiver = Unit, config = config)
return JNIScout.scout(whatAmI = whatAmI, callback = callback, receiver = Unit, onClose = {}, config = config)
}

/**
Expand All @@ -78,6 +78,7 @@ object Zenoh {
whatAmI = whatAmI,
callback = { hello -> handler.handle(hello) },
receiver = handler.receiver(),
onClose = handler::onClose,
config = config
)
}
Expand All @@ -104,6 +105,7 @@ object Zenoh {
whatAmI = whatAmI,
callback = { hello -> handler.handle(hello) },
receiver = handler.receiver(),
onClose = handler::onClose,
config = config
)
}
Expand Down
5 changes: 4 additions & 1 deletion zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIScout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.zenoh.config.ZenohId
import io.zenoh.scouting.Hello
import io.zenoh.scouting.Scout
import io.zenoh.config.WhatAmI
import io.zenoh.jni.callbacks.JNIOnCloseCallback

/**
* Adapter class to handle the interactions with Zenoh through JNI for a [io.zenoh.scouting.Scout]
Expand All @@ -34,21 +35,23 @@ internal class JNIScout(private val ptr: Long) {
fun <R> scout(
whatAmI: Set<WhatAmI>,
callback: Callback<Hello>,
onClose: () -> Unit,
config: Config?,
receiver: R
): Result<Scout<R>> = runCatching {
val scoutCallback = JNIScoutCallback { whatAmI2: Int, id: ByteArray, locators: List<String> ->
callback.run(Hello(WhatAmI.fromInt(whatAmI2), ZenohId(id), locators))
}
val binaryWhatAmI: Int = whatAmI.map { it.value }.reduce { acc, it -> acc or it }
val ptr = scoutViaJNI(binaryWhatAmI, scoutCallback, config?.jniConfig?.ptr ?: 0)
val ptr = scoutViaJNI(binaryWhatAmI, scoutCallback, onClose,config?.jniConfig?.ptr ?: 0)
Scout(receiver, JNIScout(ptr))
}

@Throws(ZError::class)
private external fun scoutViaJNI(
whatAmI: Int,
callback: JNIScoutCallback,
onCloseCallback: JNIOnCloseCallback,
configPtr: Long,
): Long

Expand Down
38 changes: 36 additions & 2 deletions zenoh-kotlin/src/commonTest/kotlin/io/zenoh/ScoutTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
package io.zenoh

import io.zenoh.scouting.Hello
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.test.Test
import kotlin.test.assertNotNull
import kotlin.test.assertTrue

class ScoutTest {

Expand All @@ -28,18 +33,47 @@ class ScoutTest {
}

@Test
fun `scouting detects session test`() {
fun `scouting with callback test`() {
val session = Session.open(Config.default()).getOrThrow()

var hello: Hello? = null
Zenoh.scout(callback = {
val scout = Zenoh.scout(callback = {
hello = it
}).getOrThrow()

Thread.sleep(1000)

assertNotNull(hello)
session.close()
scout.close()
}

@Test
@OptIn(DelicateCoroutinesApi::class)
fun `scouting with channel test`() {
val session = Session.open(Config.default()).getOrThrow()

var hello: Hello? = null
val scout = Zenoh.scout(Channel()).getOrThrow()

Thread.sleep(1000)

runBlocking {
launch {
delay(1000)
scout.close()
}

// Start receiving messages
for (receivedHello in scout.receiver) {
hello = receivedHello
}
}

assertNotNull(hello)
session.close()

assertTrue { scout.receiver.isClosedForReceive }
}

@Test
Expand Down

0 comments on commit c5d3350

Please sign in to comment.