From 00a9e94e23edc9facf3714535baf32dceabc97de Mon Sep 17 00:00:00 2001 From: Darius Maitia Date: Tue, 26 Sep 2023 12:21:52 +0200 Subject: [PATCH] Examples refactor. (#7) --- examples/src/main/kotlin/io.zenoh/ZDelete.kt | 6 ++- examples/src/main/kotlin/io.zenoh/ZGet.kt | 14 +++---- examples/src/main/kotlin/io.zenoh/ZPub.kt | 38 ++++++++--------- .../src/main/kotlin/io.zenoh/ZQueryable.kt | 42 +++++++++++-------- examples/src/main/kotlin/io.zenoh/ZSub.kt | 14 +++---- examples/src/main/kotlin/io.zenoh/ZSubThr.kt | 25 +++++------ .../src/main/kotlin/io/zenoh/Session.kt | 1 - .../main/kotlin/io/zenoh/jni/JNISession.kt | 2 +- .../kotlin/io/zenoh/subscriber/Subscriber.kt | 1 - 9 files changed, 70 insertions(+), 73 deletions(-) diff --git a/examples/src/main/kotlin/io.zenoh/ZDelete.kt b/examples/src/main/kotlin/io.zenoh/ZDelete.kt index 66fb400e..2584f3b2 100644 --- a/examples/src/main/kotlin/io.zenoh/ZDelete.kt +++ b/examples/src/main/kotlin/io.zenoh/ZDelete.kt @@ -21,8 +21,10 @@ fun main() { Session.open().onSuccess { session -> session.use { "demo/example/zenoh-kotlin-put".intoKeyExpr().onSuccess { keyExpr -> - println("Deleting resources matching '$keyExpr'...") - session.delete(keyExpr).res() + keyExpr.use { + println("Deleting resources matching '$keyExpr'...") + session.delete(keyExpr).res() + } } } } diff --git a/examples/src/main/kotlin/io.zenoh/ZGet.kt b/examples/src/main/kotlin/io.zenoh/ZGet.kt index 0aaa5841..29a39b96 100644 --- a/examples/src/main/kotlin/io.zenoh/ZGet.kt +++ b/examples/src/main/kotlin/io.zenoh/ZGet.kt @@ -22,10 +22,9 @@ fun main() { val timeout = Duration.ofMillis(1000) Session.open().onSuccess { session -> session.use { - val selectorResult = "demo/example/**".intoSelector() - selectorResult.onSuccess { selector -> + "demo/example/**".intoSelector().onSuccess { selector -> selector.use { - val request = session.get(selector) + session.get(selector) .with { reply -> if (reply is Reply.Success) { println("Received ('${reply.sample.keyExpr}': '${reply.sample.value}')") @@ -36,11 +35,10 @@ fun main() { } .timeout(timeout) .res() - - request.onSuccess { - // Keep the session alive for the duration of the timeout. - Thread.sleep(timeout.toMillis()) - } + .onSuccess { + // Keep the session alive for the duration of the timeout. + Thread.sleep(timeout.toMillis()) + } } } } diff --git a/examples/src/main/kotlin/io.zenoh/ZPub.kt b/examples/src/main/kotlin/io.zenoh/ZPub.kt index 526cc826..d8a78ed5 100644 --- a/examples/src/main/kotlin/io.zenoh/ZPub.kt +++ b/examples/src/main/kotlin/io.zenoh/ZPub.kt @@ -15,34 +15,30 @@ package io.zenoh import io.zenoh.keyexpr.intoKeyExpr -import io.zenoh.publication.CongestionControl -import io.zenoh.publication.Priority fun main() { println("Opening session...") - Session.open().onSuccess { - it.use { session -> - val keyExpressionResult = "demo/example/zenoh-kotlin-pub".intoKeyExpr() - keyExpressionResult.onSuccess { keyExpr -> + Session.open().onSuccess { session -> + session.use { + "demo/example/zenoh-kotlin-pub".intoKeyExpr().onSuccess { keyExpr -> keyExpr.use { println("Declaring publisher on '$keyExpr'...") - session.declarePublisher(keyExpr).priority(Priority.REALTIME) - .congestionControl(CongestionControl.DROP).res().onSuccess { pub -> - pub.use { - var idx = 0 - while (true) { - Thread.sleep(1000) - val payload = "Pub from Kotlin!" - println( - "Putting Data ('$keyExpr': '[${ - idx.toString().padStart(4, ' ') - }] $payload')..." - ) - pub.put(payload).res() - idx++ - } + session.declarePublisher(keyExpr).res().onSuccess { pub -> + pub.use { + var idx = 0 + while (true) { + Thread.sleep(1000) + val payload = "Pub from Kotlin!" + println( + "Putting Data ('$keyExpr': '[${ + idx.toString().padStart(4, ' ') + }] $payload')..." + ) + pub.put(payload).res() + idx++ } } + } } } } diff --git a/examples/src/main/kotlin/io.zenoh/ZQueryable.kt b/examples/src/main/kotlin/io.zenoh/ZQueryable.kt index 687fd72c..57c4136a 100644 --- a/examples/src/main/kotlin/io.zenoh/ZQueryable.kt +++ b/examples/src/main/kotlin/io.zenoh/ZQueryable.kt @@ -14,8 +14,11 @@ package io.zenoh +import io.zenoh.keyexpr.KeyExpr import io.zenoh.keyexpr.intoKeyExpr import io.zenoh.prelude.SampleKind +import io.zenoh.queryable.Query +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.runBlocking import org.apache.commons.net.ntp.TimeStamp @@ -23,23 +26,13 @@ fun main() { Session.open().onSuccess { session -> session.use { "demo/example/zenoh-kotlin-queryable".intoKeyExpr().onSuccess { keyExpr -> - println("Declaring Queryable") - session.declareQueryable(keyExpr).res().onSuccess { queryable -> - queryable.use { - it.receiver?.let { receiverChannel -> // The default receiver is a Channel we can process on a coroutine. - runBlocking { - val iterator = receiverChannel.iterator() - while (iterator.hasNext()) { - iterator.next().use { query -> - val valueInfo = query.value?.let { value -> " with value '$value'" } ?: "" - println(">> [Queryable] Received Query '${query.selector}' $valueInfo") - query.reply(keyExpr) - .success("Queryable from Kotlin!") - .withKind(SampleKind.PUT) - .withTimeStamp(TimeStamp.getCurrentTime()) - .res() - .onFailure { println(">> [Queryable ] Error sending reply: $it") } - } + keyExpr.use { + println("Declaring Queryable") + session.declareQueryable(keyExpr).res().onSuccess { queryable -> + queryable.use { + queryable.receiver?.let { receiverChannel -> // The default receiver is a Channel we can process on a coroutine. + runBlocking { + handleRequests(receiverChannel, keyExpr) } } } @@ -49,3 +42,18 @@ fun main() { } } } + +private suspend fun handleRequests( + receiverChannel: Channel, keyExpr: KeyExpr +) { + val iterator = receiverChannel.iterator() + while (iterator.hasNext()) { + iterator.next().use { query -> + val valueInfo = query.value?.let { value -> " with value '$value'" } ?: "" + println(">> [Queryable] Received Query '${query.selector}' $valueInfo") + query.reply(keyExpr).success("Queryable from Kotlin!").withKind(SampleKind.PUT) + .withTimeStamp(TimeStamp.getCurrentTime()).res() + .onFailure { println(">> [Queryable ] Error sending reply: $it") } + } + } +} diff --git a/examples/src/main/kotlin/io.zenoh/ZSub.kt b/examples/src/main/kotlin/io.zenoh/ZSub.kt index 517ba4cf..ed77a173 100644 --- a/examples/src/main/kotlin/io.zenoh/ZSub.kt +++ b/examples/src/main/kotlin/io.zenoh/ZSub.kt @@ -15,7 +15,6 @@ package io.zenoh import io.zenoh.keyexpr.intoKeyExpr -import io.zenoh.subscriber.Reliability import kotlinx.coroutines.runBlocking fun main() { @@ -23,12 +22,9 @@ fun main() { Session.open().onSuccess { session -> session.use { "demo/example/**".intoKeyExpr().onSuccess { keyExpr -> - println("Declaring Subscriber on '$keyExpr'...") - session.declareSubscriber(keyExpr) - .bestEffort() - .reliability(Reliability.RELIABLE) - .res() - .onSuccess { subscriber -> + keyExpr.use { + println("Declaring Subscriber on '$keyExpr'...") + session.declareSubscriber(keyExpr).bestEffort().res().onSuccess { subscriber -> subscriber.use { runBlocking { val receiver = subscriber.receiver!! @@ -38,9 +34,11 @@ fun main() { println(">> [Subscriber] Received ${sample.kind} ('${sample.keyExpr}': '${sample.value}')") } } + } } - } + } } } } } + diff --git a/examples/src/main/kotlin/io.zenoh/ZSubThr.kt b/examples/src/main/kotlin/io.zenoh/ZSubThr.kt index 2275a3d9..c3aaf7e3 100644 --- a/examples/src/main/kotlin/io.zenoh/ZSubThr.kt +++ b/examples/src/main/kotlin/io.zenoh/ZSubThr.kt @@ -15,7 +15,6 @@ package io.zenoh import io.zenoh.keyexpr.intoKeyExpr -import io.zenoh.subscriber.Reliability const val NANOS_TO_SEC = 1_000_000_000L var n = 50000L @@ -53,21 +52,19 @@ fun report() { } fun main() { - val keyExpr = "test/thr".intoKeyExpr().getOrThrow() - println("Opening Session") - Session.open().onSuccess { session -> - session.use { - session.declareSubscriber(keyExpr) - .reliability(Reliability.RELIABLE) - .with { - listener() - } - .res() - .onSuccess { - while (readlnOrNull() != "q") { - // Do nothing + "test/thr".intoKeyExpr().onSuccess { + it.use { keyExpr -> + println("Opening Session") + Session.open().onSuccess { it.use { + session -> session.declareSubscriber(keyExpr) + .reliable() + .with { listener() } + .res() + .onSuccess { + while (readlnOrNull() != "q") { /* Do nothing */ } } } + } } } report() diff --git a/zenoh-kotlin/src/main/kotlin/io/zenoh/Session.kt b/zenoh-kotlin/src/main/kotlin/io/zenoh/Session.kt index f852071e..fedcb18a 100644 --- a/zenoh-kotlin/src/main/kotlin/io/zenoh/Session.kt +++ b/zenoh-kotlin/src/main/kotlin/io/zenoh/Session.kt @@ -139,7 +139,6 @@ class Session private constructor(private val config: Config) : AutoCloseable { * "demo/kotlin/sub".intoKeyExpr().onSuccess { keyExpr -> * session.declareSubscriber(keyExpr) * .bestEffort() - * .reliability(Reliability.RELIABLE) * .res() * .onSuccess { subscriber -> * subscriber.use { diff --git a/zenoh-kotlin/src/main/kotlin/io/zenoh/jni/JNISession.kt b/zenoh-kotlin/src/main/kotlin/io/zenoh/jni/JNISession.kt index 20dcd546..ad48fc47 100644 --- a/zenoh-kotlin/src/main/kotlin/io/zenoh/jni/JNISession.kt +++ b/zenoh-kotlin/src/main/kotlin/io/zenoh/jni/JNISession.kt @@ -38,7 +38,7 @@ import java.time.Duration import java.util.concurrent.atomic.AtomicLong /** Adapter class to handle the communication with the Zenoh JNI code for a [Session]. */ -internal class JNISession() { +internal class JNISession { /* Pointer to the underlying Rust zenoh session. */ private var sessionPtr: AtomicLong = AtomicLong(0) diff --git a/zenoh-kotlin/src/main/kotlin/io/zenoh/subscriber/Subscriber.kt b/zenoh-kotlin/src/main/kotlin/io/zenoh/subscriber/Subscriber.kt index ef551faa..fe1269f6 100644 --- a/zenoh-kotlin/src/main/kotlin/io/zenoh/subscriber/Subscriber.kt +++ b/zenoh-kotlin/src/main/kotlin/io/zenoh/subscriber/Subscriber.kt @@ -37,7 +37,6 @@ import kotlinx.coroutines.channels.Channel * "demo/kotlin/sub".intoKeyExpr().onSuccess { keyExpr -> * session.declareSubscriber(keyExpr) * .bestEffort() - * .reliability(Reliability.RELIABLE) * .res() * .onSuccess { subscriber -> * subscriber.use {