diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt index 79f6e596..c8a55a8b 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt @@ -37,9 +37,10 @@ import java.time.Duration * A Zenoh Session, the core interaction point with a Zenoh network. * * A session is typically associated with declarations such as [Publisher]s, [Subscriber]s, or [Queryable]s, which are - * declared using [declarePublisher], [declareSubscriber], and [declareQueryable], respectively. + * declared using [declarePublisher], [declareSubscriber], and [declareQueryable], respectively. It is also possible to + * declare key expressions ([KeyExpr]) as well with [declareKeyExpr] for optimization purposes. + * * Other operations such as simple Put, Get or Delete can be performed from a session using [put], [get] and [delete]. - * Finally, it's possible to declare key expressions ([KeyExpr]) as well. * * Sessions are open upon creation and can be closed manually by calling [close]. Alternatively, the session will be * automatically closed when used with Java's try-with-resources statement or its Kotlin counterpart, [use]. @@ -52,6 +53,8 @@ class Session private constructor(private val config: Config) : AutoCloseable { private var jniSession: JNISession? = JNISession() + private var declarations = mutableListOf() + companion object { private val sessionClosedException = SessionException("Session is closed.") @@ -80,13 +83,17 @@ class Session private constructor(private val config: Config) : AutoCloseable { /** Close the session. */ override fun close() { + declarations.removeIf { + it.undeclare() + true + } + jniSession?.close() jniSession = null } - @Suppress("removal") protected fun finalize() { - jniSession?.close() + close() } /** @@ -228,7 +235,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { */ fun declareKeyExpr(keyExpr: String): Resolvable = Resolvable { return@Resolvable jniSession?.run { - declareKeyExpr(keyExpr) + declareKeyExpr(keyExpr).onSuccess { declarations.add(it) } } ?: Result.failure(sessionClosedException) } @@ -379,7 +386,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { internal fun resolvePublisher(keyExpr: KeyExpr, qos: QoS): Result { return jniSession?.run { - declarePublisher(keyExpr, qos) + declarePublisher(keyExpr, qos).onSuccess { declarations.add(it) } } ?: Result.failure(sessionClosedException) } @@ -391,7 +398,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { reliability: Reliability ): Result> { return jniSession?.run { - declareSubscriber(keyExpr, callback, onClose, receiver, reliability) + declareSubscriber(keyExpr, callback, onClose, receiver, reliability).onSuccess { declarations.add(it) } } ?: Result.failure(sessionClosedException) } @@ -403,7 +410,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { complete: Boolean ): Result> { return jniSession?.run { - declareQueryable(keyExpr, callback, onClose, receiver, complete) + declareQueryable(keyExpr, callback, onClose, receiver, complete).onSuccess { declarations.add(it) } } ?: Result.failure(sessionClosedException) } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/SessionDeclaration.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/SessionDeclaration.kt index 1c747e40..ecdfd860 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/SessionDeclaration.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/SessionDeclaration.kt @@ -18,13 +18,10 @@ package io.zenoh * Session declaration. * * A session declaration is either a [io.zenoh.publication.Publisher], - * a [io.zenoh.subscriber.Subscriber] or a [io.zenoh.queryable.Queryable] declared from a [Session]. + * a [io.zenoh.subscriber.Subscriber], a [io.zenoh.queryable.Queryable] or a [io.zenoh.keyexpr.KeyExpr] declared from a [Session]. */ interface SessionDeclaration { - /** Returns true if the declaration has not been undeclared. */ - fun isValid(): Boolean - /** Undeclare a declaration. No further operations should be performed after calling this function. */ fun undeclare() } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/keyexpr/KeyExpr.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/keyexpr/KeyExpr.kt index 17244bdd..41c7f6e1 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/keyexpr/KeyExpr.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/keyexpr/KeyExpr.kt @@ -16,6 +16,7 @@ package io.zenoh.keyexpr import io.zenoh.Resolvable import io.zenoh.Session +import io.zenoh.SessionDeclaration import io.zenoh.jni.JNIKeyExpr /** @@ -59,7 +60,7 @@ import io.zenoh.jni.JNIKeyExpr * @param jniKeyExpr An optional [JNIKeyExpr] instance, present when the key expression was declared through [Session.declareKeyExpr], * it represents the native instance of the key expression. */ -class KeyExpr internal constructor(internal val keyExpr: String, internal var jniKeyExpr: JNIKeyExpr? = null): AutoCloseable { +class KeyExpr internal constructor(internal val keyExpr: String, internal var jniKeyExpr: JNIKeyExpr? = null): AutoCloseable, SessionDeclaration { companion object { @@ -139,6 +140,10 @@ class KeyExpr internal constructor(internal val keyExpr: String, internal var jn jniKeyExpr = null } + override fun undeclare() { + close() + } + override fun equals(other: Any?): Boolean { if (this === other) return true if (javaClass != other?.javaClass) return false diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt index 4a60a257..2aee4315 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt @@ -24,6 +24,8 @@ import io.zenoh.prelude.QoS import io.zenoh.value.Value /** + * # Publisher + * * A Zenoh Publisher. * * A publisher is automatically dropped when using it with the 'try-with-resources' statement (i.e. 'use' in Kotlin). @@ -58,6 +60,12 @@ import io.zenoh.value.Value * * The publisher configuration parameters can be later changed using the setter functions. * + * ## Lifespan + * + * Internally, the [Session] from which the [Publisher] was declared keeps a reference to it, therefore keeping it alive + * until the session is closed. For the cases where we want to stop the publisher earlier, it's necessary + * to keep a reference to it in order to undeclare it later. + * * @property keyExpr The key expression the publisher will be associated to. * @property qos [QoS] configuration of the publisher. * @property jniPublisher Delegate class handling the communication with the native code. @@ -96,7 +104,7 @@ class Publisher internal constructor( return qos.priority() } - override fun isValid(): Boolean { + fun isValid(): Boolean { return jniPublisher != null } @@ -109,10 +117,6 @@ class Publisher internal constructor( jniPublisher = null } - protected fun finalize() { - jniPublisher?.close() - } - class Put internal constructor( private var jniPublisher: JNIPublisher?, val value: Value, diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/queryable/Queryable.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/queryable/Queryable.kt index ef48f43f..5e07f090 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/queryable/Queryable.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/queryable/Queryable.kt @@ -23,6 +23,8 @@ import io.zenoh.keyexpr.KeyExpr import kotlinx.coroutines.channels.Channel /** + * # Queryable + * * A queryable that allows to perform multiple queries on the specified [KeyExpr]. * * Its main purpose is to keep the queryable active as long as it exists. @@ -56,6 +58,11 @@ import kotlinx.coroutines.channels.Channel * } * }} * ``` + * ## Lifespan + * + * Internally, the [Session] from which the [Queryable] was declared keeps a reference to it, therefore keeping it alive + * until the session is closed. For the cases where we want to stop the queryable earlier, it's necessary + * to keep a reference to it in order to undeclare it later. * * @param R Receiver type of the [Handler] implementation. If no handler is provided to the builder, [R] will be [Unit]. * @property keyExpr The [KeyExpr] to which the subscriber is associated. @@ -68,7 +75,7 @@ class Queryable internal constructor( val keyExpr: KeyExpr, val receiver: R?, private var jniQueryable: JNIQueryable? ) : AutoCloseable, SessionDeclaration { - override fun isValid(): Boolean { + fun isValid(): Boolean { return jniQueryable != null } @@ -81,10 +88,6 @@ class Queryable internal constructor( undeclare() } - protected fun finalize() { - jniQueryable?.close() - } - companion object { /** @@ -119,7 +122,7 @@ class Queryable internal constructor( private val keyExpr: KeyExpr, private var callback: Callback? = null, private var handler: Handler? = null - ): Resolvable> { + ) : Resolvable> { private var complete: Boolean = false private var onClose: (() -> Unit)? = null @@ -165,7 +168,15 @@ class Queryable internal constructor( handler?.onClose() onClose?.invoke() } - return session.run { resolveQueryable(keyExpr, resolvedCallback, resolvedOnClose, handler?.receiver(), complete) } + return session.run { + resolveQueryable( + keyExpr, + resolvedCallback, + resolvedOnClose, + handler?.receiver(), + complete + ) + } } } } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/subscriber/Subscriber.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/subscriber/Subscriber.kt index 65d5858b..6670c38d 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/subscriber/Subscriber.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/subscriber/Subscriber.kt @@ -25,6 +25,8 @@ import io.zenoh.sample.Sample import kotlinx.coroutines.channels.Channel /** + * # Subscriber + * * A subscriber that allows listening to updates on a key expression and reacting to changes. * * Its main purpose is to keep the subscription active as long as it exists. @@ -56,6 +58,12 @@ import kotlinx.coroutines.channels.Channel * } * ``` * + * ## Lifespan + * + * Internally, the [Session] from which the [Subscriber] was declared keeps a reference to it, therefore keeping it alive + * until the session is closed. For the cases where we want to stop the subscriber earlier, it's necessary + * to keep a reference to it in order to undeclare it later. + * * @param R Receiver type of the [Handler] implementation. If no handler is provided to the builder, R will be [Unit]. * @property keyExpr The [KeyExpr] to which the subscriber is associated. * @property receiver Optional [R] that is provided when specifying a [Handler] for the subscriber. @@ -67,7 +75,7 @@ class Subscriber internal constructor( val keyExpr: KeyExpr, val receiver: R?, private var jniSubscriber: JNISubscriber? ) : AutoCloseable, SessionDeclaration { - override fun isValid(): Boolean { + fun isValid(): Boolean { return jniSubscriber != null } @@ -80,10 +88,6 @@ class Subscriber internal constructor( undeclare() } - protected fun finalize() { - jniSubscriber?.close() - } - companion object { /** diff --git a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/KeyExprTest.kt b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/KeyExprTest.kt index 37116af9..ee589caa 100644 --- a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/KeyExprTest.kt +++ b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/KeyExprTest.kt @@ -127,14 +127,4 @@ class KeyExprTest { keyExpr.close() keyExpr2.close() } - - @Test - fun keyExprIsValidAfterClosingSession() { - val session = Session.open().getOrThrow() - val keyExpr = session.declareKeyExpr("a/b/c").res().getOrThrow() - session.close() - - assertTrue(keyExpr.isValid()) - assertFalse(keyExpr.toString().isEmpty()) // An operation such as toString that goes through JNI is still valid. - } } diff --git a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SessionTest.kt b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SessionTest.kt index af41ca49..02a8f544 100644 --- a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SessionTest.kt +++ b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SessionTest.kt @@ -56,23 +56,17 @@ class SessionTest { } @Test - fun sessionClose_declarationsAreAliveAfterClosingSessionTest() = runBlocking { + fun sessionClose_declarationsAreUndeclaredAfterClosingSessionTest() = runBlocking { val session = Session.open().getOrThrow() - var receivedSample: Sample? = null val publisher = session.declarePublisher(testKeyExpr).res().getOrThrow() - val subscriber = session.declareSubscriber(testKeyExpr).with { sample -> receivedSample = sample }.res().getOrThrow() + val subscriber = session.declareSubscriber(testKeyExpr).res().getOrThrow() session.close() - assertTrue(publisher.isValid()) - assertTrue(subscriber.isValid()) + assertFalse(publisher.isValid()) + assertFalse(subscriber.isValid()) - publisher.put("Test").res() - assertNotNull(receivedSample) - assertEquals("Test", receivedSample!!.value.payload.decodeToString()) - - subscriber.close() - publisher.close() + assertTrue(publisher.put("Test").res().isFailure) } @Test