Skip to content

Commit

Permalink
Examples refactor. (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
DariusIMP authored Sep 26, 2023
1 parent ad9d626 commit 00a9e94
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 73 deletions.
6 changes: 4 additions & 2 deletions examples/src/main/kotlin/io.zenoh/ZDelete.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ fun main() {
Session.open().onSuccess { session ->
session.use {
"demo/example/zenoh-kotlin-put".intoKeyExpr().onSuccess { keyExpr ->
println("Deleting resources matching '$keyExpr'...")
session.delete(keyExpr).res()
keyExpr.use {
println("Deleting resources matching '$keyExpr'...")
session.delete(keyExpr).res()
}
}
}
}
Expand Down
14 changes: 6 additions & 8 deletions examples/src/main/kotlin/io.zenoh/ZGet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ fun main() {
val timeout = Duration.ofMillis(1000)
Session.open().onSuccess { session ->
session.use {
val selectorResult = "demo/example/**".intoSelector()
selectorResult.onSuccess { selector ->
"demo/example/**".intoSelector().onSuccess { selector ->
selector.use {
val request = session.get(selector)
session.get(selector)
.with { reply ->
if (reply is Reply.Success) {
println("Received ('${reply.sample.keyExpr}': '${reply.sample.value}')")
Expand All @@ -36,11 +35,10 @@ fun main() {
}
.timeout(timeout)
.res()

request.onSuccess {
// Keep the session alive for the duration of the timeout.
Thread.sleep(timeout.toMillis())
}
.onSuccess {
// Keep the session alive for the duration of the timeout.
Thread.sleep(timeout.toMillis())
}
}
}
}
Expand Down
38 changes: 17 additions & 21 deletions examples/src/main/kotlin/io.zenoh/ZPub.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,30 @@
package io.zenoh

import io.zenoh.keyexpr.intoKeyExpr
import io.zenoh.publication.CongestionControl
import io.zenoh.publication.Priority

fun main() {
println("Opening session...")
Session.open().onSuccess {
it.use { session ->
val keyExpressionResult = "demo/example/zenoh-kotlin-pub".intoKeyExpr()
keyExpressionResult.onSuccess { keyExpr ->
Session.open().onSuccess { session ->
session.use {
"demo/example/zenoh-kotlin-pub".intoKeyExpr().onSuccess { keyExpr ->
keyExpr.use {
println("Declaring publisher on '$keyExpr'...")
session.declarePublisher(keyExpr).priority(Priority.REALTIME)
.congestionControl(CongestionControl.DROP).res().onSuccess { pub ->
pub.use {
var idx = 0
while (true) {
Thread.sleep(1000)
val payload = "Pub from Kotlin!"
println(
"Putting Data ('$keyExpr': '[${
idx.toString().padStart(4, ' ')
}] $payload')..."
)
pub.put(payload).res()
idx++
}
session.declarePublisher(keyExpr).res().onSuccess { pub ->
pub.use {
var idx = 0
while (true) {
Thread.sleep(1000)
val payload = "Pub from Kotlin!"
println(
"Putting Data ('$keyExpr': '[${
idx.toString().padStart(4, ' ')
}] $payload')..."
)
pub.put(payload).res()
idx++
}
}
}
}
}
}
Expand Down
42 changes: 25 additions & 17 deletions examples/src/main/kotlin/io.zenoh/ZQueryable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,25 @@

package io.zenoh

import io.zenoh.keyexpr.KeyExpr
import io.zenoh.keyexpr.intoKeyExpr
import io.zenoh.prelude.SampleKind
import io.zenoh.queryable.Query
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import org.apache.commons.net.ntp.TimeStamp

fun main() {
Session.open().onSuccess { session ->
session.use {
"demo/example/zenoh-kotlin-queryable".intoKeyExpr().onSuccess { keyExpr ->
println("Declaring Queryable")
session.declareQueryable(keyExpr).res().onSuccess { queryable ->
queryable.use {
it.receiver?.let { receiverChannel -> // The default receiver is a Channel we can process on a coroutine.
runBlocking {
val iterator = receiverChannel.iterator()
while (iterator.hasNext()) {
iterator.next().use { query ->
val valueInfo = query.value?.let { value -> " with value '$value'" } ?: ""
println(">> [Queryable] Received Query '${query.selector}' $valueInfo")
query.reply(keyExpr)
.success("Queryable from Kotlin!")
.withKind(SampleKind.PUT)
.withTimeStamp(TimeStamp.getCurrentTime())
.res()
.onFailure { println(">> [Queryable ] Error sending reply: $it") }
}
keyExpr.use {
println("Declaring Queryable")
session.declareQueryable(keyExpr).res().onSuccess { queryable ->
queryable.use {
queryable.receiver?.let { receiverChannel -> // The default receiver is a Channel we can process on a coroutine.
runBlocking {
handleRequests(receiverChannel, keyExpr)
}
}
}
Expand All @@ -49,3 +42,18 @@ fun main() {
}
}
}

private suspend fun handleRequests(
receiverChannel: Channel<Query>, keyExpr: KeyExpr
) {
val iterator = receiverChannel.iterator()
while (iterator.hasNext()) {
iterator.next().use { query ->
val valueInfo = query.value?.let { value -> " with value '$value'" } ?: ""
println(">> [Queryable] Received Query '${query.selector}' $valueInfo")
query.reply(keyExpr).success("Queryable from Kotlin!").withKind(SampleKind.PUT)
.withTimeStamp(TimeStamp.getCurrentTime()).res()
.onFailure { println(">> [Queryable ] Error sending reply: $it") }
}
}
}
14 changes: 6 additions & 8 deletions examples/src/main/kotlin/io.zenoh/ZSub.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,16 @@
package io.zenoh

import io.zenoh.keyexpr.intoKeyExpr
import io.zenoh.subscriber.Reliability
import kotlinx.coroutines.runBlocking

fun main() {
println("Opening session...")
Session.open().onSuccess { session ->
session.use {
"demo/example/**".intoKeyExpr().onSuccess { keyExpr ->
println("Declaring Subscriber on '$keyExpr'...")
session.declareSubscriber(keyExpr)
.bestEffort()
.reliability(Reliability.RELIABLE)
.res()
.onSuccess { subscriber ->
keyExpr.use {
println("Declaring Subscriber on '$keyExpr'...")
session.declareSubscriber(keyExpr).bestEffort().res().onSuccess { subscriber ->
subscriber.use {
runBlocking {
val receiver = subscriber.receiver!!
Expand All @@ -38,9 +34,11 @@ fun main() {
println(">> [Subscriber] Received ${sample.kind} ('${sample.keyExpr}': '${sample.value}')")
}
}
}
}
}
}
}
}
}
}

25 changes: 11 additions & 14 deletions examples/src/main/kotlin/io.zenoh/ZSubThr.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package io.zenoh

import io.zenoh.keyexpr.intoKeyExpr
import io.zenoh.subscriber.Reliability

const val NANOS_TO_SEC = 1_000_000_000L
var n = 50000L
Expand Down Expand Up @@ -53,21 +52,19 @@ fun report() {
}

fun main() {
val keyExpr = "test/thr".intoKeyExpr().getOrThrow()
println("Opening Session")
Session.open().onSuccess { session ->
session.use {
session.declareSubscriber(keyExpr)
.reliability(Reliability.RELIABLE)
.with {
listener()
}
.res()
.onSuccess {
while (readlnOrNull() != "q") {
// Do nothing
"test/thr".intoKeyExpr().onSuccess {
it.use { keyExpr ->
println("Opening Session")
Session.open().onSuccess { it.use {
session -> session.declareSubscriber(keyExpr)
.reliable()
.with { listener() }
.res()
.onSuccess {
while (readlnOrNull() != "q") { /* Do nothing */ }
}
}
}
}
}
report()
Expand Down
1 change: 0 additions & 1 deletion zenoh-kotlin/src/main/kotlin/io/zenoh/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ class Session private constructor(private val config: Config) : AutoCloseable {
* "demo/kotlin/sub".intoKeyExpr().onSuccess { keyExpr ->
* session.declareSubscriber(keyExpr)
* .bestEffort()
* .reliability(Reliability.RELIABLE)
* .res()
* .onSuccess { subscriber ->
* subscriber.use {
Expand Down
2 changes: 1 addition & 1 deletion zenoh-kotlin/src/main/kotlin/io/zenoh/jni/JNISession.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import java.time.Duration
import java.util.concurrent.atomic.AtomicLong

/** Adapter class to handle the communication with the Zenoh JNI code for a [Session]. */
internal class JNISession() {
internal class JNISession {

/* Pointer to the underlying Rust zenoh session. */
private var sessionPtr: AtomicLong = AtomicLong(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import kotlinx.coroutines.channels.Channel
* "demo/kotlin/sub".intoKeyExpr().onSuccess { keyExpr ->
* session.declareSubscriber(keyExpr)
* .bestEffort()
* .reliability(Reliability.RELIABLE)
* .res()
* .onSuccess { subscriber ->
* subscriber.use {
Expand Down

0 comments on commit 00a9e94

Please sign in to comment.