Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allowing users to use Selectors for performing queries. #2

Merged
merged 2 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,7 @@ aimed to make the design as stable as possible from the very beginning, so chang

### Missing features

There are some missing features we will implement soon. The most notorious are:

* Missing pull subscriber.
* Specifying parameters to a query through a selector.
There are some missing features we will implement soon. The most notorious is the Pull Subscriber feature.

### Performance

Expand Down
15 changes: 5 additions & 10 deletions examples/src/main/kotlin/io.zenoh/ZGet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,18 @@

package io.zenoh

import io.zenoh.keyexpr.intoKeyExpr
import io.zenoh.query.ConsolidationMode
import io.zenoh.query.QueryTarget
import io.zenoh.query.Reply
import io.zenoh.selector.intoSelector
import java.time.Duration

fun main() {
val timeout = Duration.ofMillis(1000)
Session.open().onSuccess { session ->
session.use {
val keyExpressionResult = "demo/example/**".intoKeyExpr()
keyExpressionResult.onSuccess { keyExpr ->
keyExpr.use {
val request = session.get(keyExpr)
.consolidation(ConsolidationMode.NONE)
.target(QueryTarget.BEST_MATCHING)
.withValue("Get value example")
val selectorResult = "demo/example/**".intoSelector()
selectorResult.onSuccess { selector ->
selector.use {
val request = session.get(selector)
.with { reply ->
if (reply is Reply.Success) {
println("Received ('${reply.sample.keyExpr}': '${reply.sample.value}')")
Expand Down
17 changes: 13 additions & 4 deletions zenoh-jni/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,9 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_undeclareKeyExprViaJNI(
/// Parameters:
/// - `env`: The JNI environment.
/// - `_class`: The JNI class.
/// - `key_expr`: The key expression for the `get` operation as a `JString`.
/// - `ptr`: A raw pointer to the Zenoh session.
/// - `key_expr`: Pointer to the key expression for the `get`.
/// - `selector_params`: Parameters of the selector.
/// - `session_ptr`: A raw pointer to the Zenoh session.
/// - `callback`: An instance of the Java/Kotlin `JNIGetCallback` function interface to be called upon receiving a reply.
/// - `timeout_ms`: The timeout in milliseconds.
/// - `target`: The [QueryTarget] as the ordinal of the enum.
Expand All @@ -458,6 +459,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI(
mut env: JNIEnv,
_class: JClass,
key_expr: *const KeyExpr<'static>,
selector_params: JString,
session_ptr: *const zenoh::Session,
callback: JObject,
timeout_ms: jlong,
Expand All @@ -469,6 +471,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI(
match on_get_query(
&mut env,
&key_expr,
selector_params,
&session,
callback,
timeout_ms,
Expand Down Expand Up @@ -498,6 +501,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI(
/// - `env`: The JNI environment.
/// - `_class`: The JNI class.
/// - `key_expr_ptr`: A raw pointer to the [KeyExpr] to be used for the operation.
/// - `selector_params`: Parameters of the selector.
/// - `session_ptr`: A raw pointer to the Zenoh [Session].
/// - `callback`: A Java/Kotlin callback to be called upon receiving a reply.
/// - `timeout_ms`: The timeout in milliseconds.
Expand All @@ -522,6 +526,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getWithValueViaJNI(
mut env: JNIEnv,
_class: JClass,
key_expr_ptr: *const KeyExpr<'static>,
selector_params: JString,
session_ptr: *const zenoh::Session,
callback: JObject,
timeout_ms: jlong,
Expand All @@ -535,6 +540,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getWithValueViaJNI(
match on_get_query(
&mut env,
&key_expr,
selector_params,
&session,
callback,
timeout_ms,
Expand All @@ -560,7 +566,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getWithValueViaJNI(
///
/// Parameters:
/// - `env`: A mutable reference to the JNI environment.
/// - `key_expr`: The key expression for the `get` operation as a `JString`.
/// - `key_expr`: The key expression for the `get` operation.
/// - `session`: An `Arc<Session>` representing the Zenoh session.
/// - `callback`: A Java/Kotlin `JNIGetCallback` interface callback to be called upon receiving a reply.
/// - `timeout_ms`: The timeout in milliseconds.
Expand All @@ -576,6 +582,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getWithValueViaJNI(
fn on_get_query(
env: &mut JNIEnv,
key_expr: &Arc<KeyExpr<'static>>,
selector_params: JString,
session: &Arc<Session>,
callback: JObject,
timeout_ms: jlong,
Expand All @@ -587,11 +594,13 @@ fn on_get_query(
let callback_global_ref = get_callback_global_ref(env, callback)?;
let query_target = decode_query_target(target)?;
let consolidation = decode_consolidation(consolidation)?;
let selector_params = decode_string(env, selector_params)?;
let timeout = Duration::from_millis(timeout_ms as u64);

let key_expr_clone = key_expr.deref().clone();
let selector = Selector::from(key_expr_clone).with_parameters(&selector_params);
let mut get_builder = session
.get(key_expr_clone)
.get(selector)
.callback(move |reply| {
log::debug!("Receiving reply through JNI: {:?}", reply);
let env = match java_vm.attach_current_thread_as_daemon() {
Expand Down
35 changes: 32 additions & 3 deletions zenoh-kotlin/src/main/kotlin/io/zenoh/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.zenoh.query.*
import io.zenoh.queryable.Query
import io.zenoh.queryable.Queryable
import io.zenoh.sample.Sample
import io.zenoh.selector.Selector
import io.zenoh.subscriber.Reliability
import io.zenoh.subscriber.Subscriber
import io.zenoh.value.Value
Expand Down Expand Up @@ -250,6 +251,34 @@ class Session private constructor(private val config: Config) : AutoCloseable {
} ?: Result.failure(sessionClosedException)
}

/**
* Declare a [Get] with a [Channel] receiver.
*
* ```kotlin
* val timeout = Duration.ofMillis(10000)
* println("Opening Session")
* Session.open().onSuccess { session -> session.use {
* "demo/kotlin/example".intoKeyExpr().onSuccess { keyExpr ->
* session.get(keyExpr)
* .consolidation(ConsolidationMode.NONE)
* .target(QueryTarget.BEST_MATCHING)
* .withValue("Get value example")
* .with { reply -> println("Received reply $reply") }
* .timeout(timeout)
* .res()
* .onSuccess {
* // Leaving the session alive the same duration as the timeout for the sake of this example.
* Thread.sleep(timeout.toMillis())
* }
* }
* }
* }
* ```
* @param selector The [KeyExpr] to be used for the get operation.
* @return a resolvable [Get.Builder] with a [Channel] receiver.
*/
fun get(selector: Selector): Get.Builder<Channel<Reply>> = Get.newBuilder(this, selector)

/**
* Declare a [Get] with a [Channel] receiver.
*
Expand All @@ -276,7 +305,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
* @param keyExpr The [KeyExpr] to be used for the get operation.
* @return a resolvable [Get.Builder] with a [Channel] receiver.
*/
fun get(keyExpr: KeyExpr): Get.Builder<Channel<Reply>> = Get.newBuilder(this, keyExpr)
fun get(keyExpr: KeyExpr): Get.Builder<Channel<Reply>> = Get.newBuilder(this, Selector(keyExpr))

/**
* Declare a [Put] with the provided value on the specified key expression.
Expand Down Expand Up @@ -383,7 +412,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
}

internal fun <R> resolveGet(
keyExpr: KeyExpr,
selector: Selector,
callback: Callback<Reply>,
receiver: R?,
timeout: Duration,
Expand All @@ -392,7 +421,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
value: Value?
): Result<R?> {
return jniSession?.run {
performGet(keyExpr, callback, receiver, timeout, target, consolidation, value)
performGet(selector, callback, receiver, timeout, target, consolidation, value)
} ?: Result.failure(sessionClosedException)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ package io.zenoh.exceptions
* for instance when attempting to create a [io.zenoh.keyexpr.KeyExpr] from a string that does not respect the
* key expression conventions.
*/
class KeyExprException : Exception()
class KeyExprException(val msg: String) : Exception()
10 changes: 7 additions & 3 deletions zenoh-kotlin/src/main/kotlin/io/zenoh/jni/JNISession.kt
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ internal class JNISession() {
}

fun <R> performGet(
keyExpr: KeyExpr,
selector: Selector,
callback: Callback<Reply>,
receiver: R?,
timeout: Duration,
Expand All @@ -118,7 +118,8 @@ internal class JNISession() {
}
if (value == null) {
getViaJNI(
keyExpr.jniKeyExpr!!.ptr,
selector.keyExpr.jniKeyExpr!!.ptr,
selector.parameters,
sessionPtr.get(),
getCallback,
timeout.toMillis(),
Expand All @@ -127,7 +128,8 @@ internal class JNISession() {
)
} else {
getWithValueViaJNI(
keyExpr.jniKeyExpr!!.ptr,
selector.keyExpr.jniKeyExpr!!.ptr,
selector.parameters,
sessionPtr.get(),
getCallback,
timeout.toMillis(),
Expand Down Expand Up @@ -195,6 +197,7 @@ internal class JNISession() {
@Throws(Exception::class)
private external fun getViaJNI(
keyExpr: Long,
selectorParams: String,
sessionPtr: Long,
callback: JNIGetCallback,
timeoutMs: Long,
Expand All @@ -205,6 +208,7 @@ internal class JNISession() {
@Throws(Exception::class)
private external fun getWithValueViaJNI(
keyExpr: Long,
selectorParams: String,
sessionPtr: Long,
callback: JNIGetCallback,
timeoutMs: Long,
Expand Down
9 changes: 8 additions & 1 deletion zenoh-kotlin/src/main/kotlin/io/zenoh/keyexpr/IntoKeyExpr.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,12 @@

package io.zenoh.keyexpr

fun String.intoKeyExpr(): Result<KeyExpr> = KeyExpr.autocanonize(this)
import io.zenoh.exceptions.KeyExprException

fun String.intoKeyExpr(): Result<KeyExpr> = runCatching {
if (this.isEmpty()) {
return Result.failure(KeyExprException("Attempting to create a KeyExpr from an empty string."))
}
return KeyExpr.autocanonize(this)
}

30 changes: 13 additions & 17 deletions zenoh-kotlin/src/main/kotlin/io/zenoh/query/Get.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.zenoh.Session
import io.zenoh.handlers.ChannelHandler
import io.zenoh.handlers.Handler
import io.zenoh.keyexpr.KeyExpr
import io.zenoh.selector.Selector
import io.zenoh.value.Value
import kotlinx.coroutines.channels.Channel
import java.time.Duration
Expand All @@ -30,8 +31,8 @@ import java.time.Duration
* ```
* println("Opening Session")
* Session.open().onSuccess { session -> session.use {
* "demo/kotlin/example".intoKeyExpr().onSuccess { keyExpr ->
* session.get(keyExpr)
* "demo/kotlin/example".intoSelector().onSuccess { selector ->
* session.get(selector)
* .consolidation(ConsolidationMode.NONE)
* .target(QueryTarget.BEST_MATCHING)
* .withValue("Get value example")
Expand All @@ -45,24 +46,19 @@ import java.time.Duration
* ```
*
* @param R Receiver type of the [Handler] implementation. If no handler is provided to the builder, R will be [Unit].
* @property keyExpr The [KeyExpr] upon which the get operation will be performed.
* @property receiver The receiver, of the handler, if no handler is specified then the receiver is set as null and
* [R] is set as [Unit].
* @constructor Internal constructor. A Get operation must be created through the [Builder] obtained after calling
* [Session.get] or alternatively through [newBuilder].
*/
class Get<R> internal constructor(val keyExpr: KeyExpr, val receiver: R?) {
class Get<R> private constructor() {

companion object {
/**
* Creates a bew [Builder] associated to the specified [session] and [keyExpr].
*
* @param session The [Session] from which the query will be triggered.
* @param keyExpr The [KeyExpr] upon which the query will be performed.
* @param selector The [Selector] with which the query will be performed.
* @return A [Builder] with a default [ChannelHandler] to handle any incoming [Reply].
*/
fun newBuilder(session: Session, keyExpr: KeyExpr): Builder<Channel<Reply>> {
return Builder(session, keyExpr, handler = ChannelHandler(Channel()))
fun newBuilder(session: Session, selector: Selector): Builder<Channel<Reply>> {
return Builder(session, selector, handler = ChannelHandler(Channel()))
}
}

Expand All @@ -75,28 +71,28 @@ class Get<R> internal constructor(val keyExpr: KeyExpr, val receiver: R?) {
*
* @param R The receiver type of the [Handler] implementation, defaults to [Unit] when no handler is specified.
* @property session The [Session] from which the query will be performed.
* @property keyExpr The [KeyExpr] upon which the get query will be performed.
* @property selector The [Selector] with which the get query will be performed.
* @constructor Creates a Builder. This constructor is internal and should not be called directly. Instead, this
* builder should be obtained through the [Session] after calling [Session.get].
*/
class Builder<R> internal constructor(
private val session: Session,
private val keyExpr: KeyExpr,
private val selector: Selector,
private var callback: Callback<Reply>? = null,
private var handler: Handler<Reply, R>? = null,
) {

private var timeout = Duration.ofMillis(10000)
private var target: QueryTarget = QueryTarget.BEST_MATCHING
private var consolidation: ConsolidationMode = ConsolidationMode.NONE // None
private var consolidation: ConsolidationMode = ConsolidationMode.NONE
private var value: Value? = null

private constructor(other: Builder<*>, handler: Handler<Reply, R>?) : this(other.session, other.keyExpr) {
private constructor(other: Builder<*>, handler: Handler<Reply, R>?) : this(other.session, other.selector) {
this.handler = handler
copyParams(other)
}

private constructor(other: Builder<*>, callback: Callback<Reply>?) : this(other.session, other.keyExpr) {
private constructor(other: Builder<*>, callback: Callback<Reply>?) : this(other.session, other.selector) {
this.callback = callback
copyParams(other)
}
Expand Down Expand Up @@ -160,7 +156,7 @@ class Get<R> internal constructor(val keyExpr: KeyExpr, val receiver: R?) {
val resolvedCallback = callback ?: Callback { t: Reply -> handler?.handle(t) }
return session.run {
resolveGet(
keyExpr,
selector,
resolvedCallback,
handler?.receiver(),
timeout,
Expand Down
29 changes: 29 additions & 0 deletions zenoh-kotlin/src/main/kotlin/io/zenoh/selector/IntoSelector.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh.selector

import io.zenoh.exceptions.KeyExprException
import io.zenoh.keyexpr.KeyExpr

fun String.intoSelector(): Result<Selector> = runCatching {
if (this.isEmpty()) {
return Result.failure(KeyExprException("Attempting to create a KeyExpr from an empty string."))
}
val result = this.split('?', limit = 2)
val keyExpr = KeyExpr.autocanonize(result[0]).getOrThrow()
val params = if (result.size == 2) result[1] else ""
return Result.success(Selector(keyExpr, params))
}

Loading
Loading