Skip to content

Commit

Permalink
Adding Querier (unstable) to Zenoh-Kotlin (#316)
Browse files Browse the repository at this point in the history
* feat(querier): Adding Querier (missing docs)

* feat(querier): Adding docs and fix undeclare querier on drop

* Cargo fmt

* Setting get consolidation mode default to AUTO

* feat(querier): Adding 'Unstable' annotation to the querier feature.
  • Loading branch information
DariusIMP authored Dec 9, 2024
1 parent 39dfea5 commit 7b1c940
Show file tree
Hide file tree
Showing 11 changed files with 785 additions and 3 deletions.
1 change: 1 addition & 0 deletions examples/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tasks {
"ZPub",
"ZPubThr",
"ZPut",
"ZQuerier",
"ZQueryable",
"ZScout",
"ZSub",
Expand Down
93 changes: 93 additions & 0 deletions examples/src/main/kotlin/io.zenoh/ZQuerier.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh

import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.long
import io.zenoh.annotations.Unstable
import io.zenoh.bytes.ZBytes
import io.zenoh.query.QueryTarget
import io.zenoh.query.intoSelector
import java.time.Duration

class ZQuerier(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh Querier example"
) {

@OptIn(Unstable::class)
override fun run() {
val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode)

Zenoh.initLogFromEnvOr("error")

val session = Zenoh.open(config).getOrThrow()
val selector = selector.intoSelector().getOrThrow()

val target = target ?.let{ QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING
val timeout = Duration.ofMillis(timeout)
val querier = session.declareQuerier(selector.keyExpr, target, timeout = timeout).getOrThrow()

for (idx in 0..Int.MAX_VALUE) {
Thread.sleep(1000)
val payload = "[${idx.toString().padStart(4, ' ')}] ${payload ?: ""}"
println("Querying '$selector' with payload: '$payload'...")
querier.get(callback = {
it.result.onSuccess { sample ->
println(">> Received ('${sample.keyExpr}': '${sample.payload}')")
}.onFailure { error ->
println(">> Received (ERROR: '${error.message}')")
}
}, payload = ZBytes.from(payload), parameters = selector.parameters)
}
}

private val selector by option(
"-s",
"--selector",
help = "The selection of resources to query [default: demo/example/**]",
metavar = "selector"
).default("demo/example/**")
private val payload by option(
"-p", "--payload", help = "An optional payload to put in the query.", metavar = "payload"
)
private val target by option(
"-t",
"--target",
help = "The target queryables of the query. Default: BEST_MATCHING. " + "[possible values: BEST_MATCHING, ALL, ALL_COMPLETE]",
metavar = "target"
)
private val timeout by option(
"-o", "--timeout", help = "The query timeout in milliseconds [default: 10000]", metavar = "timeout"
).long().default(10000)
private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config")
private val mode by option(
"-m",
"--mode",
help = "The session mode. Default: peer. Possible values: [peer, client, router]",
metavar = "mode"
).default("peer")
private val connect: List<String> by option(
"-e", "--connect", help = "Endpoints to connect to.", metavar = "connect"
).multiple()
private val listen: List<String> by option(
"-l", "--listen", help = "Endpoints to listen on.", metavar = "listen"
).multiple()
private val noMulticastScouting: Boolean by option(
"--no-multicast-scouting", help = "Disable the multicast-based scouting mechanism."
).flag(default = false)
}

fun main(args: Array<String>) = ZQuerier(args.isEmpty()).main(args)
1 change: 1 addition & 0 deletions zenoh-jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod key_expr;
mod liveliness;
mod logger;
mod publisher;
mod querier;
mod query;
mod queryable;
mod scouting;
Expand Down
137 changes: 137 additions & 0 deletions zenoh-jni/src/querier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use std::sync::Arc;

use jni::{
objects::{JByteArray, JClass, JObject, JString},
sys::jint,
JNIEnv,
};
use zenoh::{key_expr::KeyExpr, query::Querier, Wait};

use crate::{
errors::ZResult,
key_expr::process_kotlin_key_expr,
session::{on_reply_error, on_reply_success},
throw_exception,
utils::{
decode_byte_array, decode_encoding, decode_string, get_callback_global_ref, get_java_vm,
load_on_close,
},
zerror,
};

/// Perform a Zenoh GET through a querier.
///
/// This function is meant to be called from Java/Kotlin code through JNI.
///
/// Parameters:
/// - `env`: The JNI environment.
/// - `_class`: The JNI class.
/// - `querier_ptr`: The raw pointer to the querier.
/// - `key_expr_ptr`: A raw pointer to the [KeyExpr] provided to the kotlin querier. May be null in case of using an
/// undeclared key expression.
/// - `key_expr_str`: String representation of the key expression used during the querier declaration.
/// It won't be considered in case a key_expr_ptr to a declared key expression is provided.
/// - `selector_params`: Optional selector parameters for the query.
/// - `callback`: Reference to the Kotlin callback to be run upon receiving a reply.
/// - `on_close`: Reference to a kotlin callback to be run upon finishing the get operation, mostly used for closing a provided channel.
/// - `attachment`: Optional attachment.
/// - `payload`: Optional payload for the query.
/// - `encoding_id`: Encoding id of the payload provided.
/// - `encoding_schema`: Encoding schema of the payload provided.
///
#[no_mangle]
#[allow(non_snake_case)]
pub unsafe extern "C" fn Java_io_zenoh_jni_JNIQuerier_getViaJNI(
mut env: JNIEnv,
_class: JClass,
querier_ptr: *const Querier,
key_expr_ptr: /*nullable*/ *const KeyExpr<'static>,
key_expr_str: JString,
selector_params: /*nullable*/ JString,
callback: JObject,
on_close: JObject,
attachment: /*nullable*/ JByteArray,
payload: /*nullable*/ JByteArray,
encoding_id: jint,
encoding_schema: /*nullable*/ JString,
) {
let querier = Arc::from_raw(querier_ptr);
let _ = || -> ZResult<()> {
let key_expr = process_kotlin_key_expr(&mut env, &key_expr_str, key_expr_ptr)?;
let java_vm = Arc::new(get_java_vm(&mut env)?);
let callback_global_ref = get_callback_global_ref(&mut env, callback)?;
let on_close_global_ref = get_callback_global_ref(&mut env, on_close)?;
let on_close = load_on_close(&java_vm, on_close_global_ref);
let mut get_builder = querier.get().callback(move |reply| {
|| -> ZResult<()> {
on_close.noop(); // Does nothing, but moves `on_close` inside the closure so it gets destroyed with the closure
tracing::debug!("Receiving reply through JNI: {:?}", reply);
let mut env = java_vm.attach_current_thread_as_daemon().map_err(|err| {
zerror!("Unable to attach thread for GET query callback: {}.", err)
})?;

match reply.result() {
Ok(sample) => {
on_reply_success(&mut env, reply.replier_id(), sample, &callback_global_ref)
}
Err(error) => {
on_reply_error(&mut env, reply.replier_id(), error, &callback_global_ref)
}
}
}()
.unwrap_or_else(|err| tracing::error!("Error on get callback: {err}"));
});

if !selector_params.is_null() {
let params = decode_string(&mut env, &selector_params)?;
get_builder = get_builder.parameters(params)
};

if !payload.is_null() {
let encoding = decode_encoding(&mut env, encoding_id, &encoding_schema)?;
get_builder = get_builder.encoding(encoding);
get_builder = get_builder.payload(decode_byte_array(&env, payload)?);
}

if !attachment.is_null() {
let attachment = decode_byte_array(&env, attachment)?;
get_builder = get_builder.attachment::<Vec<u8>>(attachment);
}

get_builder
.wait()
.map(|_| tracing::trace!("Performing get on '{key_expr}'.",))
.map_err(|err| zerror!(err))
}()
.map_err(|err| throw_exception!(env, err));
std::mem::forget(querier);
}

///
/// Frees the pointer of the querier.
///
/// After a call to this function, no further jni operations should be performed using the querier associated to the raw pointer provided.
///
#[no_mangle]
#[allow(non_snake_case)]
pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuerier_freePtrViaJNI(
_env: JNIEnv,
_: JClass,
querier_ptr: *const Querier<'static>,
) {
Arc::from_raw(querier_ptr);
}
65 changes: 64 additions & 1 deletion zenoh-jni/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use zenoh::{
config::Config,
key_expr::KeyExpr,
pubsub::{Publisher, Subscriber},
query::{Query, Queryable, ReplyError, Selector},
query::{Querier, Query, Queryable, ReplyError, Selector},
sample::Sample,
session::{Session, ZenohId},
Wait,
Expand Down Expand Up @@ -514,6 +514,69 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_declareSubscriberViaJNI(
})
}

/// Declare a Zenoh querier via JNI.
///
/// This function is meant to be called from Java/Kotlin code through JNI.
///
/// Parameters:
/// - `env`: The JNI environment.
/// - `_class`: The JNI class.
/// - `key_expr_ptr`: A raw pointer to the [KeyExpr] to be used for the querier. May be null in case of using an
/// undeclared key expression.
/// - `key_expr_str`: String representation of the key expression to be used to declare the querier.
/// It won't be considered in case a key_expr_ptr to a declared key expression is provided.
/// - `target`: The ordinal value of the query target enum value.
/// - `consolidation`: The ordinal value of the consolidation enum value.
/// - `congestion_control`: The ordinal value of the congestion control enum value.
/// - `priority`: The ordinal value of the priority enum value.
/// - `is_express`: The boolean express value of the QoS provided.
/// - `timeout_ms`: The timeout in milliseconds.
#[no_mangle]
#[allow(non_snake_case)]
pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_declareQuerierViaJNI(
mut env: JNIEnv,
_class: JClass,
key_expr_ptr: /*nullable*/ *const KeyExpr<'static>,
key_expr_str: JString,
session_ptr: *const Session,
target: jint,
consolidation: jint,
congestion_control: jint,
priority: jint,
is_express: jboolean,
timeout_ms: jlong,
) -> *const Querier<'static> {
let session = Arc::from_raw(session_ptr);
|| -> ZResult<*const Querier<'static>> {
let key_expr = process_kotlin_key_expr(&mut env, &key_expr_str, key_expr_ptr)?;
let query_target = decode_query_target(target)?;
let consolidation = decode_consolidation(consolidation)?;
let congestion_control = decode_congestion_control(congestion_control)?;
let timeout = Duration::from_millis(timeout_ms as u64);
let priority = decode_priority(priority)?;
tracing::debug!("Declaring querier on '{}'...", key_expr);

let querier = session
.declare_querier(key_expr.to_owned())
.congestion_control(congestion_control)
.consolidation(consolidation)
.express(is_express != 0)
.target(query_target)
.priority(priority)
.timeout(timeout)
.wait()
.map_err(|err| zerror!(err))?;

tracing::debug!("Querier declared on '{}'.", key_expr);
std::mem::forget(session);
Ok(Arc::into_raw(Arc::new(querier)))
}()
.unwrap_or_else(|err| {
throw_exception!(env, err);
null()
})
}

/// Declare a Zenoh queryable via JNI.
///
/// This function is meant to be called from Java/Kotlin code through JNI.
Expand Down
Loading

0 comments on commit 7b1c940

Please sign in to comment.