Skip to content

Commit

Permalink
Allowing session declarations to stay alive for the lifespan of a ses…
Browse files Browse the repository at this point in the history
…sion. (#96)

* feat(background declarations): Allowing session declarations to stay alive for the lifespan of a session.

Zenoh declarations ran for as long as the Kotlin variable representing them was kept alive. This
meant that whenever the user lost track of the variable, it got garbage collected and undeclared
in the process. This behavior seems to be counterintuitive for programmers used to garbage collected
languages (see #43).

Therefore in this PR we provide the following change: we keep track of session declarations in a list
inside the session, allowing users to keep running them despite losing their references.
When the session is finalized, the associated declarations are undeclared.
In case the user needs to close a declaration earlier, they need to keep the variable in order
to undeclare it.

* feat(background declarations): closing all session declarations immediately after doing `session.close()`
  • Loading branch information
DariusIMP authored Aug 1, 2024
1 parent 9c79835 commit 19171df
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 51 deletions.
23 changes: 15 additions & 8 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ import java.time.Duration
* A Zenoh Session, the core interaction point with a Zenoh network.
*
* A session is typically associated with declarations such as [Publisher]s, [Subscriber]s, or [Queryable]s, which are
* declared using [declarePublisher], [declareSubscriber], and [declareQueryable], respectively.
* declared using [declarePublisher], [declareSubscriber], and [declareQueryable], respectively. It is also possible to
* declare key expressions ([KeyExpr]) as well with [declareKeyExpr] for optimization purposes.
*
* Other operations such as simple Put, Get or Delete can be performed from a session using [put], [get] and [delete].
* Finally, it's possible to declare key expressions ([KeyExpr]) as well.
*
* Sessions are open upon creation and can be closed manually by calling [close]. Alternatively, the session will be
* automatically closed when used with Java's try-with-resources statement or its Kotlin counterpart, [use].
Expand All @@ -52,6 +53,8 @@ class Session private constructor(private val config: Config) : AutoCloseable {

private var jniSession: JNISession? = JNISession()

private var declarations = mutableListOf<SessionDeclaration>()

companion object {

private val sessionClosedException = SessionException("Session is closed.")
Expand Down Expand Up @@ -80,13 +83,17 @@ class Session private constructor(private val config: Config) : AutoCloseable {

/** Close the session. */
override fun close() {
declarations.removeIf {
it.undeclare()
true
}

jniSession?.close()
jniSession = null
}

@Suppress("removal")
protected fun finalize() {
jniSession?.close()
close()
}

/**
Expand Down Expand Up @@ -228,7 +235,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
*/
fun declareKeyExpr(keyExpr: String): Resolvable<KeyExpr> = Resolvable {
return@Resolvable jniSession?.run {
declareKeyExpr(keyExpr)
declareKeyExpr(keyExpr).onSuccess { declarations.add(it) }
} ?: Result.failure(sessionClosedException)
}

Expand Down Expand Up @@ -379,7 +386,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {

internal fun resolvePublisher(keyExpr: KeyExpr, qos: QoS): Result<Publisher> {
return jniSession?.run {
declarePublisher(keyExpr, qos)
declarePublisher(keyExpr, qos).onSuccess { declarations.add(it) }
} ?: Result.failure(sessionClosedException)
}

Expand All @@ -391,7 +398,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
reliability: Reliability
): Result<Subscriber<R>> {
return jniSession?.run {
declareSubscriber(keyExpr, callback, onClose, receiver, reliability)
declareSubscriber(keyExpr, callback, onClose, receiver, reliability).onSuccess { declarations.add(it) }
} ?: Result.failure(sessionClosedException)
}

Expand All @@ -403,7 +410,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
complete: Boolean
): Result<Queryable<R>> {
return jniSession?.run {
declareQueryable(keyExpr, callback, onClose, receiver, complete)
declareQueryable(keyExpr, callback, onClose, receiver, complete).onSuccess { declarations.add(it) }
} ?: Result.failure(sessionClosedException)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ package io.zenoh
* Session declaration.
*
* A session declaration is either a [io.zenoh.publication.Publisher],
* a [io.zenoh.subscriber.Subscriber] or a [io.zenoh.queryable.Queryable] declared from a [Session].
* a [io.zenoh.subscriber.Subscriber], a [io.zenoh.queryable.Queryable] or a [io.zenoh.keyexpr.KeyExpr] declared from a [Session].
*/
interface SessionDeclaration {

/** Returns true if the declaration has not been undeclared. */
fun isValid(): Boolean

/** Undeclare a declaration. No further operations should be performed after calling this function. */
fun undeclare()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package io.zenoh.keyexpr

import io.zenoh.Resolvable
import io.zenoh.Session
import io.zenoh.SessionDeclaration
import io.zenoh.jni.JNIKeyExpr

/**
Expand Down Expand Up @@ -59,7 +60,7 @@ import io.zenoh.jni.JNIKeyExpr
* @param jniKeyExpr An optional [JNIKeyExpr] instance, present when the key expression was declared through [Session.declareKeyExpr],
* it represents the native instance of the key expression.
*/
class KeyExpr internal constructor(internal val keyExpr: String, internal var jniKeyExpr: JNIKeyExpr? = null): AutoCloseable {
class KeyExpr internal constructor(internal val keyExpr: String, internal var jniKeyExpr: JNIKeyExpr? = null): AutoCloseable, SessionDeclaration {

companion object {

Expand Down Expand Up @@ -139,6 +140,10 @@ class KeyExpr internal constructor(internal val keyExpr: String, internal var jn
jniKeyExpr = null
}

override fun undeclare() {
close()
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import io.zenoh.prelude.QoS
import io.zenoh.value.Value

/**
* # Publisher
*
* A Zenoh Publisher.
*
* A publisher is automatically dropped when using it with the 'try-with-resources' statement (i.e. 'use' in Kotlin).
Expand Down Expand Up @@ -58,6 +60,12 @@ import io.zenoh.value.Value
*
* The publisher configuration parameters can be later changed using the setter functions.
*
* ## Lifespan
*
* Internally, the [Session] from which the [Publisher] was declared keeps a reference to it, therefore keeping it alive
* until the session is closed. For the cases where we want to stop the publisher earlier, it's necessary
* to keep a reference to it in order to undeclare it later.
*
* @property keyExpr The key expression the publisher will be associated to.
* @property qos [QoS] configuration of the publisher.
* @property jniPublisher Delegate class handling the communication with the native code.
Expand Down Expand Up @@ -96,7 +104,7 @@ class Publisher internal constructor(
return qos.priority()
}

override fun isValid(): Boolean {
fun isValid(): Boolean {
return jniPublisher != null
}

Expand All @@ -109,10 +117,6 @@ class Publisher internal constructor(
jniPublisher = null
}

protected fun finalize() {
jniPublisher?.close()
}

class Put internal constructor(
private var jniPublisher: JNIPublisher?,
val value: Value,
Expand Down
25 changes: 18 additions & 7 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/queryable/Queryable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import io.zenoh.keyexpr.KeyExpr
import kotlinx.coroutines.channels.Channel

/**
* # Queryable
*
* A queryable that allows to perform multiple queries on the specified [KeyExpr].
*
* Its main purpose is to keep the queryable active as long as it exists.
Expand Down Expand Up @@ -56,6 +58,11 @@ import kotlinx.coroutines.channels.Channel
* }
* }}
* ```
* ## Lifespan
*
* Internally, the [Session] from which the [Queryable] was declared keeps a reference to it, therefore keeping it alive
* until the session is closed. For the cases where we want to stop the queryable earlier, it's necessary
* to keep a reference to it in order to undeclare it later.
*
* @param R Receiver type of the [Handler] implementation. If no handler is provided to the builder, [R] will be [Unit].
* @property keyExpr The [KeyExpr] to which the subscriber is associated.
Expand All @@ -68,7 +75,7 @@ class Queryable<R> internal constructor(
val keyExpr: KeyExpr, val receiver: R?, private var jniQueryable: JNIQueryable?
) : AutoCloseable, SessionDeclaration {

override fun isValid(): Boolean {
fun isValid(): Boolean {
return jniQueryable != null
}

Expand All @@ -81,10 +88,6 @@ class Queryable<R> internal constructor(
undeclare()
}

protected fun finalize() {
jniQueryable?.close()
}

companion object {

/**
Expand Down Expand Up @@ -119,7 +122,7 @@ class Queryable<R> internal constructor(
private val keyExpr: KeyExpr,
private var callback: Callback<Query>? = null,
private var handler: Handler<Query, R>? = null
): Resolvable<Queryable<R>> {
) : Resolvable<Queryable<R>> {
private var complete: Boolean = false
private var onClose: (() -> Unit)? = null

Expand Down Expand Up @@ -165,7 +168,15 @@ class Queryable<R> internal constructor(
handler?.onClose()
onClose?.invoke()
}
return session.run { resolveQueryable(keyExpr, resolvedCallback, resolvedOnClose, handler?.receiver(), complete) }
return session.run {
resolveQueryable(
keyExpr,
resolvedCallback,
resolvedOnClose,
handler?.receiver(),
complete
)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import io.zenoh.sample.Sample
import kotlinx.coroutines.channels.Channel

/**
* # Subscriber
*
* A subscriber that allows listening to updates on a key expression and reacting to changes.
*
* Its main purpose is to keep the subscription active as long as it exists.
Expand Down Expand Up @@ -56,6 +58,12 @@ import kotlinx.coroutines.channels.Channel
* }
* ```
*
* ## Lifespan
*
* Internally, the [Session] from which the [Subscriber] was declared keeps a reference to it, therefore keeping it alive
* until the session is closed. For the cases where we want to stop the subscriber earlier, it's necessary
* to keep a reference to it in order to undeclare it later.
*
* @param R Receiver type of the [Handler] implementation. If no handler is provided to the builder, R will be [Unit].
* @property keyExpr The [KeyExpr] to which the subscriber is associated.
* @property receiver Optional [R] that is provided when specifying a [Handler] for the subscriber.
Expand All @@ -67,7 +75,7 @@ class Subscriber<R> internal constructor(
val keyExpr: KeyExpr, val receiver: R?, private var jniSubscriber: JNISubscriber?
) : AutoCloseable, SessionDeclaration {

override fun isValid(): Boolean {
fun isValid(): Boolean {
return jniSubscriber != null
}

Expand All @@ -80,10 +88,6 @@ class Subscriber<R> internal constructor(
undeclare()
}

protected fun finalize() {
jniSubscriber?.close()
}

companion object {

/**
Expand Down
10 changes: 0 additions & 10 deletions zenoh-kotlin/src/commonTest/kotlin/io/zenoh/KeyExprTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,4 @@ class KeyExprTest {
keyExpr.close()
keyExpr2.close()
}

@Test
fun keyExprIsValidAfterClosingSession() {
val session = Session.open().getOrThrow()
val keyExpr = session.declareKeyExpr("a/b/c").res().getOrThrow()
session.close()

assertTrue(keyExpr.isValid())
assertFalse(keyExpr.toString().isEmpty()) // An operation such as toString that goes through JNI is still valid.
}
}
16 changes: 5 additions & 11 deletions zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SessionTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,17 @@ class SessionTest {
}

@Test
fun sessionClose_declarationsAreAliveAfterClosingSessionTest() = runBlocking {
fun sessionClose_declarationsAreUndeclaredAfterClosingSessionTest() = runBlocking {
val session = Session.open().getOrThrow()
var receivedSample: Sample? = null

val publisher = session.declarePublisher(testKeyExpr).res().getOrThrow()
val subscriber = session.declareSubscriber(testKeyExpr).with { sample -> receivedSample = sample }.res().getOrThrow()
val subscriber = session.declareSubscriber(testKeyExpr).res().getOrThrow()
session.close()

assertTrue(publisher.isValid())
assertTrue(subscriber.isValid())
assertFalse(publisher.isValid())
assertFalse(subscriber.isValid())

publisher.put("Test").res()
assertNotNull(receivedSample)
assertEquals("Test", receivedSample!!.value.payload.decodeToString())

subscriber.close()
publisher.close()
assertTrue(publisher.put("Test").res().isFailure)
}

@Test
Expand Down

0 comments on commit 19171df

Please sign in to comment.