From 7fe1fdf8bcb23e2b02af0ea282e6dfd9372f8655 Mon Sep 17 00:00:00 2001 From: Darius Maitia Date: Thu, 1 Aug 2024 12:24:02 +0200 Subject: [PATCH 1/2] feat(background declarations): Allowing session declarations to stay alive for the lifespan of a session. Zenoh declarations ran for as long as the Kotlin variable representing them was kept alive. This meant that whenever the user lost track of the variable, it got garbage collected and undeclared in the process. This behavior seems to be counterintuitive for programmers used to garbage collected languages (see #43). Therefore in this PR we provide the following change: we keep track of session declarations in a list inside the session, allowing users to keep running them despite losing their references. When the session is finalized, the associated declarations are undeclared. In case the user needs to close a declaration earlier, they need to keep the variable in order to undeclare it. --- .../src/commonMain/kotlin/io/zenoh/Session.kt | 17 +++++++------ .../kotlin/io/zenoh/SessionDeclaration.kt | 5 +--- .../kotlin/io/zenoh/keyexpr/KeyExpr.kt | 7 +++++- .../kotlin/io/zenoh/publication/Publisher.kt | 14 +++++++---- .../kotlin/io/zenoh/queryable/Queryable.kt | 25 +++++++++++++------ .../kotlin/io/zenoh/subscriber/Subscriber.kt | 14 +++++++---- 6 files changed, 53 insertions(+), 29 deletions(-) diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt index 79f6e596..752a1c09 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt @@ -37,9 +37,10 @@ import java.time.Duration * A Zenoh Session, the core interaction point with a Zenoh network. * * A session is typically associated with declarations such as [Publisher]s, [Subscriber]s, or [Queryable]s, which are - * declared using [declarePublisher], [declareSubscriber], and [declareQueryable], respectively. + * declared using [declarePublisher], [declareSubscriber], and [declareQueryable], respectively. It is also possible to + * declare key expressions ([KeyExpr]) as well with [declareKeyExpr] for optimization purposes. + * * Other operations such as simple Put, Get or Delete can be performed from a session using [put], [get] and [delete]. - * Finally, it's possible to declare key expressions ([KeyExpr]) as well. * * Sessions are open upon creation and can be closed manually by calling [close]. Alternatively, the session will be * automatically closed when used with Java's try-with-resources statement or its Kotlin counterpart, [use]. @@ -52,6 +53,8 @@ class Session private constructor(private val config: Config) : AutoCloseable { private var jniSession: JNISession? = JNISession() + private var declarations = mutableListOf() + companion object { private val sessionClosedException = SessionException("Session is closed.") @@ -84,8 +87,8 @@ class Session private constructor(private val config: Config) : AutoCloseable { jniSession = null } - @Suppress("removal") protected fun finalize() { + declarations.forEach { it.undeclare() } jniSession?.close() } @@ -228,7 +231,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { */ fun declareKeyExpr(keyExpr: String): Resolvable = Resolvable { return@Resolvable jniSession?.run { - declareKeyExpr(keyExpr) + declareKeyExpr(keyExpr).onSuccess { declarations.add(it) } } ?: Result.failure(sessionClosedException) } @@ -379,7 +382,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { internal fun resolvePublisher(keyExpr: KeyExpr, qos: QoS): Result { return jniSession?.run { - declarePublisher(keyExpr, qos) + declarePublisher(keyExpr, qos).onSuccess { declarations.add(it) } } ?: Result.failure(sessionClosedException) } @@ -391,7 +394,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { reliability: Reliability ): Result> { return jniSession?.run { - declareSubscriber(keyExpr, callback, onClose, receiver, reliability) + declareSubscriber(keyExpr, callback, onClose, receiver, reliability).onSuccess { declarations.add(it) } } ?: Result.failure(sessionClosedException) } @@ -403,7 +406,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { complete: Boolean ): Result> { return jniSession?.run { - declareQueryable(keyExpr, callback, onClose, receiver, complete) + declareQueryable(keyExpr, callback, onClose, receiver, complete).onSuccess { declarations.add(it) } } ?: Result.failure(sessionClosedException) } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/SessionDeclaration.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/SessionDeclaration.kt index 1c747e40..ecdfd860 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/SessionDeclaration.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/SessionDeclaration.kt @@ -18,13 +18,10 @@ package io.zenoh * Session declaration. * * A session declaration is either a [io.zenoh.publication.Publisher], - * a [io.zenoh.subscriber.Subscriber] or a [io.zenoh.queryable.Queryable] declared from a [Session]. + * a [io.zenoh.subscriber.Subscriber], a [io.zenoh.queryable.Queryable] or a [io.zenoh.keyexpr.KeyExpr] declared from a [Session]. */ interface SessionDeclaration { - /** Returns true if the declaration has not been undeclared. */ - fun isValid(): Boolean - /** Undeclare a declaration. No further operations should be performed after calling this function. */ fun undeclare() } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/keyexpr/KeyExpr.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/keyexpr/KeyExpr.kt index 17244bdd..41c7f6e1 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/keyexpr/KeyExpr.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/keyexpr/KeyExpr.kt @@ -16,6 +16,7 @@ package io.zenoh.keyexpr import io.zenoh.Resolvable import io.zenoh.Session +import io.zenoh.SessionDeclaration import io.zenoh.jni.JNIKeyExpr /** @@ -59,7 +60,7 @@ import io.zenoh.jni.JNIKeyExpr * @param jniKeyExpr An optional [JNIKeyExpr] instance, present when the key expression was declared through [Session.declareKeyExpr], * it represents the native instance of the key expression. */ -class KeyExpr internal constructor(internal val keyExpr: String, internal var jniKeyExpr: JNIKeyExpr? = null): AutoCloseable { +class KeyExpr internal constructor(internal val keyExpr: String, internal var jniKeyExpr: JNIKeyExpr? = null): AutoCloseable, SessionDeclaration { companion object { @@ -139,6 +140,10 @@ class KeyExpr internal constructor(internal val keyExpr: String, internal var jn jniKeyExpr = null } + override fun undeclare() { + close() + } + override fun equals(other: Any?): Boolean { if (this === other) return true if (javaClass != other?.javaClass) return false diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt index 4a60a257..2aee4315 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt @@ -24,6 +24,8 @@ import io.zenoh.prelude.QoS import io.zenoh.value.Value /** + * # Publisher + * * A Zenoh Publisher. * * A publisher is automatically dropped when using it with the 'try-with-resources' statement (i.e. 'use' in Kotlin). @@ -58,6 +60,12 @@ import io.zenoh.value.Value * * The publisher configuration parameters can be later changed using the setter functions. * + * ## Lifespan + * + * Internally, the [Session] from which the [Publisher] was declared keeps a reference to it, therefore keeping it alive + * until the session is closed. For the cases where we want to stop the publisher earlier, it's necessary + * to keep a reference to it in order to undeclare it later. + * * @property keyExpr The key expression the publisher will be associated to. * @property qos [QoS] configuration of the publisher. * @property jniPublisher Delegate class handling the communication with the native code. @@ -96,7 +104,7 @@ class Publisher internal constructor( return qos.priority() } - override fun isValid(): Boolean { + fun isValid(): Boolean { return jniPublisher != null } @@ -109,10 +117,6 @@ class Publisher internal constructor( jniPublisher = null } - protected fun finalize() { - jniPublisher?.close() - } - class Put internal constructor( private var jniPublisher: JNIPublisher?, val value: Value, diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/queryable/Queryable.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/queryable/Queryable.kt index ef48f43f..5e07f090 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/queryable/Queryable.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/queryable/Queryable.kt @@ -23,6 +23,8 @@ import io.zenoh.keyexpr.KeyExpr import kotlinx.coroutines.channels.Channel /** + * # Queryable + * * A queryable that allows to perform multiple queries on the specified [KeyExpr]. * * Its main purpose is to keep the queryable active as long as it exists. @@ -56,6 +58,11 @@ import kotlinx.coroutines.channels.Channel * } * }} * ``` + * ## Lifespan + * + * Internally, the [Session] from which the [Queryable] was declared keeps a reference to it, therefore keeping it alive + * until the session is closed. For the cases where we want to stop the queryable earlier, it's necessary + * to keep a reference to it in order to undeclare it later. * * @param R Receiver type of the [Handler] implementation. If no handler is provided to the builder, [R] will be [Unit]. * @property keyExpr The [KeyExpr] to which the subscriber is associated. @@ -68,7 +75,7 @@ class Queryable internal constructor( val keyExpr: KeyExpr, val receiver: R?, private var jniQueryable: JNIQueryable? ) : AutoCloseable, SessionDeclaration { - override fun isValid(): Boolean { + fun isValid(): Boolean { return jniQueryable != null } @@ -81,10 +88,6 @@ class Queryable internal constructor( undeclare() } - protected fun finalize() { - jniQueryable?.close() - } - companion object { /** @@ -119,7 +122,7 @@ class Queryable internal constructor( private val keyExpr: KeyExpr, private var callback: Callback? = null, private var handler: Handler? = null - ): Resolvable> { + ) : Resolvable> { private var complete: Boolean = false private var onClose: (() -> Unit)? = null @@ -165,7 +168,15 @@ class Queryable internal constructor( handler?.onClose() onClose?.invoke() } - return session.run { resolveQueryable(keyExpr, resolvedCallback, resolvedOnClose, handler?.receiver(), complete) } + return session.run { + resolveQueryable( + keyExpr, + resolvedCallback, + resolvedOnClose, + handler?.receiver(), + complete + ) + } } } } diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/subscriber/Subscriber.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/subscriber/Subscriber.kt index 65d5858b..6670c38d 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/subscriber/Subscriber.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/subscriber/Subscriber.kt @@ -25,6 +25,8 @@ import io.zenoh.sample.Sample import kotlinx.coroutines.channels.Channel /** + * # Subscriber + * * A subscriber that allows listening to updates on a key expression and reacting to changes. * * Its main purpose is to keep the subscription active as long as it exists. @@ -56,6 +58,12 @@ import kotlinx.coroutines.channels.Channel * } * ``` * + * ## Lifespan + * + * Internally, the [Session] from which the [Subscriber] was declared keeps a reference to it, therefore keeping it alive + * until the session is closed. For the cases where we want to stop the subscriber earlier, it's necessary + * to keep a reference to it in order to undeclare it later. + * * @param R Receiver type of the [Handler] implementation. If no handler is provided to the builder, R will be [Unit]. * @property keyExpr The [KeyExpr] to which the subscriber is associated. * @property receiver Optional [R] that is provided when specifying a [Handler] for the subscriber. @@ -67,7 +75,7 @@ class Subscriber internal constructor( val keyExpr: KeyExpr, val receiver: R?, private var jniSubscriber: JNISubscriber? ) : AutoCloseable, SessionDeclaration { - override fun isValid(): Boolean { + fun isValid(): Boolean { return jniSubscriber != null } @@ -80,10 +88,6 @@ class Subscriber internal constructor( undeclare() } - protected fun finalize() { - jniSubscriber?.close() - } - companion object { /** From ca1ea9217dd064f60c00e3188c8f9ec66cd79ad0 Mon Sep 17 00:00:00 2001 From: Darius Maitia Date: Thu, 1 Aug 2024 12:52:09 +0200 Subject: [PATCH 2/2] feat(background declarations): closing all session declarations immediately after doing `session.close()` --- .../src/commonMain/kotlin/io/zenoh/Session.kt | 8 ++++++-- .../commonTest/kotlin/io/zenoh/KeyExprTest.kt | 10 ---------- .../commonTest/kotlin/io/zenoh/SessionTest.kt | 16 +++++----------- 3 files changed, 11 insertions(+), 23 deletions(-) diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt index 752a1c09..c8a55a8b 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt @@ -83,13 +83,17 @@ class Session private constructor(private val config: Config) : AutoCloseable { /** Close the session. */ override fun close() { + declarations.removeIf { + it.undeclare() + true + } + jniSession?.close() jniSession = null } protected fun finalize() { - declarations.forEach { it.undeclare() } - jniSession?.close() + close() } /** diff --git a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/KeyExprTest.kt b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/KeyExprTest.kt index 37116af9..ee589caa 100644 --- a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/KeyExprTest.kt +++ b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/KeyExprTest.kt @@ -127,14 +127,4 @@ class KeyExprTest { keyExpr.close() keyExpr2.close() } - - @Test - fun keyExprIsValidAfterClosingSession() { - val session = Session.open().getOrThrow() - val keyExpr = session.declareKeyExpr("a/b/c").res().getOrThrow() - session.close() - - assertTrue(keyExpr.isValid()) - assertFalse(keyExpr.toString().isEmpty()) // An operation such as toString that goes through JNI is still valid. - } } diff --git a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SessionTest.kt b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SessionTest.kt index af41ca49..02a8f544 100644 --- a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SessionTest.kt +++ b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SessionTest.kt @@ -56,23 +56,17 @@ class SessionTest { } @Test - fun sessionClose_declarationsAreAliveAfterClosingSessionTest() = runBlocking { + fun sessionClose_declarationsAreUndeclaredAfterClosingSessionTest() = runBlocking { val session = Session.open().getOrThrow() - var receivedSample: Sample? = null val publisher = session.declarePublisher(testKeyExpr).res().getOrThrow() - val subscriber = session.declareSubscriber(testKeyExpr).with { sample -> receivedSample = sample }.res().getOrThrow() + val subscriber = session.declareSubscriber(testKeyExpr).res().getOrThrow() session.close() - assertTrue(publisher.isValid()) - assertTrue(subscriber.isValid()) + assertFalse(publisher.isValid()) + assertFalse(subscriber.isValid()) - publisher.put("Test").res() - assertNotNull(receivedSample) - assertEquals("Test", receivedSample!!.value.payload.decodeToString()) - - subscriber.close() - publisher.close() + assertTrue(publisher.put("Test").res().isFailure) } @Test