From 97b2a5c75d96acc15b50aa3a355f06e8ad4fdbec Mon Sep 17 00:00:00 2001 From: Darius Maitia Date: Fri, 23 Feb 2024 15:06:48 +0100 Subject: [PATCH] feat(user_attachment): user attachment implementation (#41) --- .../src/commonMain/kotlin/io/zenoh/Session.kt | 6 +- .../kotlin/io/zenoh/jni/JNIPublisher.kt | 42 ++- .../kotlin/io/zenoh/jni/JNIQuery.kt | 4 +- .../kotlin/io/zenoh/jni/JNISession.kt | 29 +- .../kotlin/io/zenoh/jni/JNIUtils.kt | 74 +++++ .../io/zenoh/jni/callbacks/JNIGetCallback.kt | 3 +- .../jni/callbacks/JNIQueryableCallback.kt | 1 + .../jni/callbacks/JNISubscriberCallback.kt | 3 +- .../kotlin/io/zenoh/publication/Delete.kt | 2 +- .../kotlin/io/zenoh/publication/Publisher.kt | 65 ++++- .../kotlin/io/zenoh/publication/Put.kt | 13 +- .../commonMain/kotlin/io/zenoh/query/Get.kt | 12 +- .../commonMain/kotlin/io/zenoh/query/Reply.kt | 9 +- .../kotlin/io/zenoh/queryable/Query.kt | 3 + .../kotlin/io/zenoh/sample/Attachment.kt | 60 ++++ .../kotlin/io/zenoh/sample/Sample.kt | 4 +- .../kotlin/io/zenoh/UserAttachmentTest.kt | 274 ++++++++++++++++++ zenoh-jni/src/publisher.rs | 62 +++- zenoh-jni/src/put.rs | 17 +- zenoh-jni/src/query.rs | 85 +++++- zenoh-jni/src/reply.rs | 17 +- zenoh-jni/src/session.rs | 22 +- zenoh-jni/src/subscriber.rs | 22 +- zenoh-jni/src/utils.rs | 75 ++++- zenoh-jni/src/value.rs | 13 +- 25 files changed, 830 insertions(+), 87 deletions(-) create mode 100644 zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIUtils.kt create mode 100644 zenoh-java/src/commonMain/kotlin/io/zenoh/sample/Attachment.kt create mode 100644 zenoh-java/src/commonTest/kotlin/io/zenoh/UserAttachmentTest.kt diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt index 9509ba8a..719199d6 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt @@ -25,6 +25,7 @@ import io.zenoh.publication.Put import io.zenoh.query.* import io.zenoh.queryable.Query import io.zenoh.queryable.Queryable +import io.zenoh.sample.Attachment import io.zenoh.sample.Sample import io.zenoh.selector.Selector import io.zenoh.subscriber.Reliability @@ -387,12 +388,13 @@ class Session private constructor(private val config: Config) : AutoCloseable { timeout: Duration, target: QueryTarget, consolidation: ConsolidationMode, - value: Value? + value: Value?, + attachment: Attachment?, ): R? { if (jniSession == null) { throw sessionClosedException } - return jniSession?.performGet(selector, callback, onClose, receiver, timeout, target, consolidation, value) + return jniSession?.performGet(selector, callback, onClose, receiver, timeout, target, consolidation, value, attachment) } @Throws(ZenohException::class) diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIPublisher.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIPublisher.kt index fa2eebc2..08369a08 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIPublisher.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIPublisher.kt @@ -19,6 +19,7 @@ import io.zenoh.exceptions.ZenohException import io.zenoh.prelude.SampleKind import io.zenoh.publication.CongestionControl import io.zenoh.publication.Priority +import io.zenoh.sample.Attachment import io.zenoh.value.Value /** @@ -32,20 +33,39 @@ internal class JNIPublisher(private val ptr: Long) { * Put value through the publisher. * * @param value The [Value] to be put. + * @param attachment Optional [Attachment]. */ @Throws(ZenohException::class) - fun put(value: Value) { - putViaJNI(value.payload, value.encoding.knownEncoding.ordinal, ptr) + fun put(value: Value, attachment: Attachment?) { + putViaJNI(value.payload, value.encoding.knownEncoding.ordinal, attachment?.let { encodeAttachment(it) }, ptr) } + /** + * Write operation. + * + * @param kind The [SampleKind]. + * @param value The [Value] to be written. + * @param attachment Optional [Attachment]. + */ @Throws(ZenohException::class) - fun write(kind: SampleKind, value: Value) { - writeViaJNI(value.payload, value.encoding.knownEncoding.ordinal, kind.ordinal, ptr) + fun write(kind: SampleKind, value: Value, attachment: Attachment?) { + writeViaJNI( + value.payload, + value.encoding.knownEncoding.ordinal, + kind.ordinal, + attachment?.let { encodeAttachment(it) }, + ptr + ) } + /** + * Delete operation. + * + * @param attachment Optional [Attachment]. + */ @Throws(ZenohException::class) - fun delete() { - deleteViaJNI(ptr) + fun delete(attachment: Attachment?) { + deleteViaJNI(attachment?.let { encodeAttachment(it) }, ptr) } /** @@ -106,13 +126,17 @@ internal class JNIPublisher(private val ptr: Long) { /** Puts through the native Publisher. */ @Throws(ZenohException::class) - private external fun putViaJNI(valuePayload: ByteArray, valueEncoding: Int, ptr: Long) + private external fun putViaJNI( + valuePayload: ByteArray, valueEncoding: Int, encodedAttachment: ByteArray?, ptr: Long + ) @Throws(ZenohException::class) - private external fun writeViaJNI(payload: ByteArray, encoding: Int, sampleKind: Int, ptr: Long) + private external fun writeViaJNI( + payload: ByteArray, encoding: Int, sampleKind: Int, encodedAttachment: ByteArray?, ptr: Long + ) @Throws(ZenohException::class) - private external fun deleteViaJNI(ptr: Long) + private external fun deleteViaJNI(encodedAttachment: ByteArray?, ptr: Long) /** Frees the underlying native Publisher. */ private external fun freePtrViaJNI(ptr: Long) diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt index 1b3feb63..bf2d49bd 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIQuery.kt @@ -38,6 +38,7 @@ internal class JNIQuery(private val ptr: Long) { sample.kind.ordinal, timestampEnabled, if (timestampEnabled) sample.timestamp!!.ntpValue() else 0, + sample.attachment?.let { encodeAttachment(it) }, ) } @@ -58,7 +59,8 @@ internal class JNIQuery(private val ptr: Long) { valueEncoding: Int, sampleKind: Int, timestampEnabled: Boolean, - timestampNtp64: Long + timestampNtp64: Long, + attachment: ByteArray?, ) @Throws(ZenohException::class) diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt index e8f04732..db5c0df5 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt @@ -31,6 +31,7 @@ import io.zenoh.publication.Put import io.zenoh.query.* import io.zenoh.queryable.Query import io.zenoh.queryable.Queryable +import io.zenoh.sample.Attachment import io.zenoh.sample.Sample import io.zenoh.selector.Selector import io.zenoh.subscriber.Reliability @@ -80,13 +81,15 @@ internal class JNISession { keyExpr: KeyExpr, callback: Callback, onClose: () -> Unit, receiver: R?, reliability: Reliability ): Subscriber { val subCallback = - JNISubscriberCallback { keyExprPtr, payload, encoding, kind, timestampNTP64, timestampIsValid -> + JNISubscriberCallback { keyExprPtr, payload, encoding, kind, timestampNTP64, timestampIsValid, attachmentBytes -> val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null + val attachment = attachmentBytes.takeIf { it.isNotEmpty() }?.let { decodeAttachment(it) } val sample = Sample( KeyExpr(JNIKeyExpr(keyExprPtr)), Value(payload, Encoding(KnownEncoding.fromInt(encoding))), SampleKind.fromInt(kind), - timestamp + timestamp, + attachment ) callback.run(sample) } @@ -101,12 +104,13 @@ internal class JNISession { keyExpr: KeyExpr, callback: Callback, onClose: () -> Unit, receiver: R?, complete: Boolean ): Queryable { val queryCallback = - JNIQueryableCallback { keyExprPtr: Long, selectorParams: String, withValue: Boolean, payload: ByteArray?, encoding: Int, queryPtr: Long -> + JNIQueryableCallback { keyExprPtr: Long, selectorParams: String, withValue: Boolean, payload: ByteArray?, encoding: Int, attachmentBytes: ByteArray, queryPtr: Long -> val jniQuery = JNIQuery(queryPtr) val keyExpression = KeyExpr(JNIKeyExpr(keyExprPtr)) val selector = Selector(keyExpression, selectorParams) val value: Value? = if (withValue) Value(payload!!, Encoding(KnownEncoding.fromInt(encoding))) else null - val query = Query(keyExpression, selector, value, jniQuery) + val decodedAttachment = attachmentBytes.takeIf { it.isNotEmpty() }?.let { decodeAttachment(it) } + val query = Query(keyExpression, selector, value, decodedAttachment, jniQuery) callback.run(query) } val queryableRawPtr = @@ -123,17 +127,20 @@ internal class JNISession { timeout: Duration, target: QueryTarget, consolidation: ConsolidationMode, - value: Value? + value: Value?, + attachment: Attachment? ): R? { val getCallback = - JNIGetCallback { replierId: String, success: Boolean, keyExprPtr: Long, payload: ByteArray, encoding: Int, kind: Int, timestampNTP64: Long, timestampIsValid: Boolean -> + JNIGetCallback { replierId: String, success: Boolean, keyExprPtr: Long, payload: ByteArray, encoding: Int, kind: Int, timestampNTP64: Long, timestampIsValid: Boolean, attachmentBytes: ByteArray -> if (success) { val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null + val decodedAttachment = attachmentBytes.takeIf { it.isNotEmpty() }?.let { decodeAttachment(it) } val sample = Sample( KeyExpr(JNIKeyExpr(keyExprPtr)), Value(payload, Encoding(KnownEncoding.fromInt(encoding))), SampleKind.fromInt(kind), - timestamp + timestamp, + decodedAttachment ) val reply = Reply.Success(replierId, sample) callback.run(reply) @@ -153,6 +160,7 @@ internal class JNISession { timeout.toMillis(), target.ordinal, consolidation.ordinal, + attachment?.let { encodeAttachment(it) } ) } else { getWithValueViaJNI( @@ -166,6 +174,7 @@ internal class JNISession { consolidation.ordinal, value.payload, value.encoding.knownEncoding.ordinal, + attachment?.let { encodeAttachment(it) } ) } return receiver @@ -195,6 +204,7 @@ internal class JNISession { put.congestionControl.ordinal, put.priority.value, put.kind.ordinal, + put.attachment?.let { encodeAttachment(it) } ) } @@ -246,6 +256,7 @@ internal class JNISession { timeoutMs: Long, target: Int, consolidation: Int, + attachmentBytes: ByteArray?, ) @Throws(ZenohException::class) @@ -259,7 +270,8 @@ internal class JNISession { target: Int, consolidation: Int, payload: ByteArray, - encoding: Int + encoding: Int, + attachmentBytes: ByteArray?, ) @Throws(ZenohException::class) @@ -271,5 +283,6 @@ internal class JNISession { congestionControl: Int, priority: Int, kind: Int, + attachmentBytes: ByteArray?, ) } diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIUtils.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIUtils.kt new file mode 100644 index 00000000..9c9bb5ae --- /dev/null +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIUtils.kt @@ -0,0 +1,74 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.jni + +import io.zenoh.sample.Attachment + +/** + * Encode attachment as a byte array. + */ +internal fun encodeAttachment(attachment: Attachment): ByteArray { + return attachment.values.map { + val key = it.first + val keyLength = key.size.toByteArray() + val value = it.second + val valueLength = value.size.toByteArray() + keyLength + key + valueLength + value + }.reduce { acc, bytes -> acc + bytes } +} + +/** + * Decode an attachment as a byte array, recreating the original [Attachment]. + */ +internal fun decodeAttachment(attachmentBytes: ByteArray): Attachment { + var idx = 0 + var sliceSize: Int + val pairs: MutableList> = mutableListOf() + while (idx < attachmentBytes.size) { + sliceSize = attachmentBytes.sliceArray(IntRange(idx, idx + Int.SIZE_BYTES - 1)).toInt() + idx += Int.SIZE_BYTES + + val key = attachmentBytes.sliceArray(IntRange(idx, idx + sliceSize - 1)) + idx += sliceSize + + sliceSize = attachmentBytes.sliceArray(IntRange(idx, idx + Int.SIZE_BYTES - 1)).toInt() + idx += Int.SIZE_BYTES + + val value = attachmentBytes.sliceArray(IntRange(idx, idx + sliceSize - 1)) + idx += sliceSize + + pairs.add(key to value) + } + return Attachment(pairs) +} + +/** + * Converts an integer into a byte array with little endian format. + */ +fun Int.toByteArray(): ByteArray { + val result = ByteArray(UInt.SIZE_BYTES) + (0 until UInt.SIZE_BYTES).forEach { + result[it] = this.shr(Byte.SIZE_BITS * it).toByte() + } + return result +} + +/** + * To int. The byte array is expected to be in Little Endian format. + * + * @return The integer value. + */ +fun ByteArray.toInt(): Int = + (((this[3].toUInt() and 0xFFu) shl 24) or ((this[2].toUInt() and 0xFFu) shl 16) or ((this[1].toUInt() and 0xFFu) shl 8) or (this[0].toUInt() and 0xFFu)).toInt() diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIGetCallback.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIGetCallback.kt index 5adb1653..621eaf98 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIGetCallback.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIGetCallback.kt @@ -24,6 +24,7 @@ internal fun interface JNIGetCallback { encoding: Int, kind: Int, timestampNTP64: Long, - timestampIsValid: Boolean + timestampIsValid: Boolean, + attachment: ByteArray, ) } diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIQueryableCallback.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIQueryableCallback.kt index 402d1cf4..813965aa 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIQueryableCallback.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIQueryableCallback.kt @@ -20,5 +20,6 @@ internal fun interface JNIQueryableCallback { withValue: Boolean, payload: ByteArray?, encoding: Int, + attachmentBytes: ByteArray, queryPtr: Long) } diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNISubscriberCallback.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNISubscriberCallback.kt index fc0a4ca5..750888eb 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNISubscriberCallback.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNISubscriberCallback.kt @@ -21,6 +21,7 @@ internal fun interface JNISubscriberCallback { encoding: Int, kind: Int, timestampNTP64: Long, - timestampIsValid: Boolean + timestampIsValid: Boolean, + attachment: ByteArray, ) } diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/publication/Delete.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/publication/Delete.kt index 3d6702df..5d6036a4 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/publication/Delete.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/publication/Delete.kt @@ -46,7 +46,7 @@ class Delete private constructor( congestionControl: CongestionControl, priority: Priority, kind: SampleKind -) : Put(keyExpr, value, congestionControl, priority, kind) { +) : Put(keyExpr, value, congestionControl, priority, kind, null) { companion object { /** diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt index e0bd1353..a11c1ecc 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/publication/Publisher.kt @@ -20,6 +20,7 @@ import io.zenoh.exceptions.ZenohException import io.zenoh.jni.JNIPublisher import io.zenoh.keyexpr.KeyExpr import io.zenoh.prelude.SampleKind +import io.zenoh.sample.Attachment import io.zenoh.value.Value import kotlin.Throws @@ -71,16 +72,10 @@ class Publisher internal constructor( } /** Performs a PUT operation on the specified [keyExpr] with the specified [value]. */ - @Throws(ZenohException::class) - fun put(value: Value): Resolvable = Resolvable { - return@Resolvable jniPublisher?.put(value) ?: throw(sessionException) - } + fun put(value: Value) = Put(jniPublisher, value) /** Performs a PUT operation on the specified [keyExpr] with the specified string [value]. */ - @Throws(ZenohException::class) - fun put(value: String): Resolvable = Resolvable { - return@Resolvable jniPublisher?.put(Value(value)) ?: throw(sessionException) - } + fun put(value: String) = Put(jniPublisher, Value(value)) /** * Performs a WRITE operation on the specified [keyExpr] @@ -89,20 +84,16 @@ class Publisher internal constructor( * @param value The [Value] to send. * @return A [Resolvable] operation. */ - @Throws(ZenohException::class) - fun write(kind: SampleKind, value: Value): Resolvable = Resolvable { - return@Resolvable jniPublisher?.write(kind, value) ?: throw(sessionException) - } + fun write(kind: SampleKind, value: Value) = Write(jniPublisher, value, kind) + /** * Performs a DELETE operation on the specified [keyExpr] * * @return A [Resolvable] operation. */ - @Throws(ZenohException::class) - fun delete(): Resolvable = Resolvable { - return@Resolvable jniPublisher?.delete() ?: throw(sessionException) - } + fun delete() = Delete(jniPublisher) + /** Get congestion control policy. */ fun getCongestionControl(): CongestionControl { @@ -157,6 +148,48 @@ class Publisher internal constructor( jniPublisher?.close() } + class Put internal constructor( + private var jniPublisher: JNIPublisher?, + val value: Value, + var attachment: Attachment? = null + ) : Resolvable { + + fun withAttachment(attachment: Attachment) = apply { this.attachment = attachment } + + @Throws(ZenohException::class) + override fun res() { + jniPublisher?.put(value, attachment) + } + } + + class Write internal constructor( + private var jniPublisher: JNIPublisher?, + val value: Value, + val sampleKind: SampleKind, + var attachment: Attachment? = null + ) : Resolvable { + + fun withAttachment(attachment: Attachment) = apply { this.attachment = attachment } + + @Throws(ZenohException::class) + override fun res() { + jniPublisher?.write(sampleKind, value, attachment) + } + } + + class Delete internal constructor( + private var jniPublisher: JNIPublisher?, + var attachment: Attachment? = null + ) : Resolvable { + + fun withAttachment(attachment: Attachment) = apply { this.attachment = attachment } + + @Throws(ZenohException::class) + override fun res() { + jniPublisher?.delete(attachment) + } + } + /** * Publisher Builder. * diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/publication/Put.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/publication/Put.kt index 132a7025..1d27f522 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/publication/Put.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/publication/Put.kt @@ -20,6 +20,7 @@ import io.zenoh.exceptions.ZenohException import io.zenoh.keyexpr.KeyExpr import io.zenoh.prelude.Encoding import io.zenoh.prelude.SampleKind +import io.zenoh.sample.Attachment import io.zenoh.value.Value /** @@ -49,13 +50,15 @@ import io.zenoh.value.Value * @property congestionControl The [CongestionControl] to be applied when routing the data. * @property priority The [Priority] of zenoh messages. * @property kind The [SampleKind] of the sample (put or delete). + * @property attachment An optional user [Attachment]. */ open class Put protected constructor( val keyExpr: KeyExpr, val value: Value, val congestionControl: CongestionControl, val priority: Priority, - val kind: SampleKind + val kind: SampleKind, + val attachment: Attachment? ) { companion object { @@ -91,7 +94,8 @@ open class Put protected constructor( private var value: Value, private var congestionControl: CongestionControl = CongestionControl.DROP, private var priority: Priority = Priority.DATA, - private var kind: SampleKind = SampleKind.PUT + private var kind: SampleKind = SampleKind.PUT, + private var attachment: Attachment? = null ): Resolvable { /** Change the [Encoding] of the written data. */ @@ -109,10 +113,13 @@ open class Put protected constructor( /** Change the [SampleKind] of the sample. If set to [SampleKind.DELETE], performs a delete operation. */ fun kind(kind: SampleKind) = apply { this.kind = kind } + /** Set an attachment to the put operation. */ + fun withAttachment(attachment: Attachment) = apply { this.attachment = attachment } + /** Resolves the put operation. */ @Throws(ZenohException::class) override fun res() { - val put = Put(keyExpr, value, congestionControl, priority, kind) + val put = Put(keyExpr, value, congestionControl, priority, kind, attachment) session.run { resolvePut(keyExpr, put) } } } diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt index 7b1070e0..11c2fd45 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt @@ -19,6 +19,7 @@ import io.zenoh.Session import io.zenoh.exceptions.ZenohException import io.zenoh.handlers.BlockingQueueHandler import io.zenoh.handlers.Handler +import io.zenoh.sample.Attachment import io.zenoh.selector.Selector import io.zenoh.value.Value import java.time.Duration @@ -83,6 +84,7 @@ class Get private constructor() { private var target: QueryTarget = QueryTarget.BEST_MATCHING private var consolidation: ConsolidationMode = ConsolidationMode.NONE private var value: Value? = null + private var attachment: Attachment? = null private var onClose: (() -> Unit)? = null private constructor(other: Builder<*>, handler: Handler?) : this(other.session, other.selector) { @@ -100,6 +102,7 @@ class Get private constructor() { this.target = other.target this.consolidation = other.consolidation this.value = other.value + this.attachment = other.attachment this.onClose = other.onClose } @@ -136,6 +139,12 @@ class Get private constructor() { return this } + /** Specify an [Attachment]. */ + fun withAttachment(attachment: Attachment): Builder { + this.attachment = attachment + return this + } + /** * Specify an action to be invoked when the Get operation is over. * @@ -177,7 +186,8 @@ class Get private constructor() { timeout, target, consolidation, - value + value, + attachment ) } } diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Reply.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Reply.kt index e7b5ee0e..e0ba2e76 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Reply.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Reply.kt @@ -22,6 +22,7 @@ import io.zenoh.prelude.SampleKind import io.zenoh.value.Value import io.zenoh.keyexpr.KeyExpr import io.zenoh.queryable.Query +import io.zenoh.sample.Attachment import org.apache.commons.net.ntp.TimeStamp /** @@ -109,6 +110,7 @@ abstract class Reply private constructor(val replierId: String) : ZenohType { private var kind = SampleKind.PUT private var timeStamp: TimeStamp? = null + private var attachment: Attachment? = null /** * Sets the [SampleKind] of the replied [Sample]. @@ -120,12 +122,17 @@ abstract class Reply private constructor(val replierId: String) : ZenohType { */ fun withTimeStamp(timeStamp: TimeStamp) = apply { this.timeStamp = timeStamp } + /** + * Appends an [Attachment] to the reply. + */ + fun withAttachment(attachment: Attachment) = apply { this.attachment = attachment } + /** * Constructs the reply sample with the provided parameters and triggers the reply to the query. */ @Throws(ZenohException::class) override fun res() { - val sample = Sample(keyExpr, value, kind, timeStamp) + val sample = Sample(keyExpr, value, kind, timeStamp, attachment) return query.reply(Success("", sample)).res() } } diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/queryable/Query.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/queryable/Query.kt index faa27977..30d6d4a9 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/queryable/Query.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/queryable/Query.kt @@ -23,6 +23,7 @@ import io.zenoh.exceptions.ZenohException import io.zenoh.jni.JNIQuery import io.zenoh.keyexpr.KeyExpr import io.zenoh.query.Reply +import io.zenoh.sample.Attachment /** * Represents a Zenoh Query in Kotlin. @@ -32,6 +33,7 @@ import io.zenoh.query.Reply * @property keyExpr The key expression to which the query is associated. * @property selector The selector * @property value Optional value in case the received query was declared using "with query". + * @property attachment Optional [Attachment]. * @property jniQuery Delegate object in charge of communicating with the underlying native code. * @constructor Instances of Query objects are only meant to be created through the JNI upon receiving * a query request. Therefore, the constructor is private. @@ -40,6 +42,7 @@ class Query internal constructor( val keyExpr: KeyExpr, val selector: Selector, val value: Value?, + val attachment: Attachment?, private var jniQuery: JNIQuery? ) : AutoCloseable, ZenohType { diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/sample/Attachment.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/sample/Attachment.kt new file mode 100644 index 00000000..1da85f82 --- /dev/null +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/sample/Attachment.kt @@ -0,0 +1,60 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.sample + +/** + * Attachment + * + * An attachment consists of a list of non-unique ordered key value pairs, where keys are UTF-8 Strings and the values are bytes. + * Inserting at the same key multiple times leads to both values being transmitted for that key. + * + * Attachments can be added to a message sent through Zenoh while performing puts, queries and replies. + * + * Using attachments will result in performance loss. + * + * @property values + * @constructor Create empty Attachment + */ +class Attachment internal constructor(val values: List>) { + + class Builder { + + private val values: MutableList> = mutableListOf() + + fun add(key: ByteArray, value: ByteArray) = apply { + values.add(key to value) + } + + fun add(key: String, value: ByteArray) = apply { + values.add(key.toByteArray() to value) + } + + fun add(key: String, value: String) = apply { + values.add(key.toByteArray() to value.toByteArray()) + } + + fun addAll(elements: Collection>) { + values.addAll(elements) + } + + fun addAll(elements: Iterable>) { + values.addAll(elements) + } + + fun res(): Attachment { + return Attachment(values) + } + } +} diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/sample/Sample.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/sample/Sample.kt index e84cd96d..5762d4cd 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/sample/Sample.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/sample/Sample.kt @@ -30,12 +30,14 @@ import org.apache.commons.net.ntp.TimeStamp * @property value The [Value] of the sample. * @property kind The [SampleKind] of the sample. * @property timestamp Optional [TimeStamp]. + * @property attachment Optional [Attachment]. */ class Sample( val keyExpr: KeyExpr, val value: Value, val kind: SampleKind, - val timestamp: TimeStamp? + val timestamp: TimeStamp?, + val attachment: Attachment? = null ): ZenohType { override fun toString(): String { return if (kind == SampleKind.DELETE) "$kind($keyExpr)" else "$kind($keyExpr: $value)" diff --git a/zenoh-java/src/commonTest/kotlin/io/zenoh/UserAttachmentTest.kt b/zenoh-java/src/commonTest/kotlin/io/zenoh/UserAttachmentTest.kt new file mode 100644 index 00000000..380c202d --- /dev/null +++ b/zenoh-java/src/commonTest/kotlin/io/zenoh/UserAttachmentTest.kt @@ -0,0 +1,274 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh + +import io.zenoh.jni.decodeAttachment +import io.zenoh.jni.encodeAttachment +import io.zenoh.jni.toByteArray +import io.zenoh.jni.toInt +import io.zenoh.keyexpr.intoKeyExpr +import io.zenoh.prelude.Encoding +import io.zenoh.prelude.KnownEncoding +import io.zenoh.prelude.SampleKind +import io.zenoh.query.Reply +import io.zenoh.sample.Attachment +import io.zenoh.sample.Sample +import io.zenoh.value.Value +import java.time.Duration +import kotlin.test.* + +class UserAttachmentTest { + + companion object { + val value = Value("test", Encoding(KnownEncoding.TEXT_PLAIN)) + val keyExpr = "example/testing/attachment".intoKeyExpr() + val attachmentPairs = arrayListOf( + "key1" to "value1", "key2" to "value2", "key3" to "value3", "repeatedKey" to "value1", "repeatedKey" to "value2" + ) + val attachment = + Attachment(attachmentPairs.map { it.first.encodeToByteArray() to it.second.encodeToByteArray() }) + } + + private fun assertAttachmentOk(attachment: Attachment?) { + assertNotNull(attachment) + val receivedPairs = attachment.values + assertEquals(attachmentPairs.size, receivedPairs.size) + for ((index, receivedPair) in receivedPairs.withIndex()) { + assertEquals(attachmentPairs[index].first, receivedPair.first.decodeToString()) + assertEquals(attachmentPairs[index].second, receivedPair.second.decodeToString()) + } + } + + @Test + fun putWithAttachmentTest() { + var receivedSample: Sample? = null + val session = Session.open() + + val subscriber = session.declareSubscriber(keyExpr).with { sample -> receivedSample = sample }.res() + session.put(keyExpr, value).withAttachment(attachment).res() + + Thread.sleep(500) + + subscriber.close() + session.close() + + assertNotNull(receivedSample) + assertEquals(value, receivedSample!!.value) + assertAttachmentOk(receivedSample!!.attachment) + } + + @Test + fun publisherPutWithAttachmentTest() { + val session = Session.open() + + var receivedSample: Sample? = null + val publisher = session.declarePublisher(keyExpr).res() + val subscriber = session.declareSubscriber(keyExpr).with { sample -> + receivedSample = sample + }.res() + + publisher.put("test").withAttachment(attachment).res() + publisher.close() + subscriber.close() + session.close() + + assertAttachmentOk(receivedSample!!.attachment!!) + } + + @Test + fun publisherPutWithoutAttachmentTest() { + val session = Session.open() + + var receivedSample: Sample? = null + val publisher = session.declarePublisher(keyExpr).res() + val subscriber = session.declareSubscriber(keyExpr).with { sample -> + receivedSample = sample + }.res() + publisher.put("test").res() + + Thread.sleep(1000) + + subscriber.close() + publisher.close() + session.close() + + assertNotNull(receivedSample) + assertNull(receivedSample!!.attachment) + } + + @Test + fun publisherWriteWithAttachmentTest() { + val session = Session.open() + + var receivedSample: Sample? = null + val publisher = session.declarePublisher(keyExpr).res() + val subscriber = session.declareSubscriber(keyExpr).with { sample -> + receivedSample = sample + }.res() + + publisher.write(SampleKind.PUT, Value("test")).withAttachment(attachment).res() + Thread.sleep(500) + + subscriber.close() + publisher.close() + session.close() + + assertAttachmentOk(receivedSample!!.attachment!!) + } + + @Test + fun publisherWriteWithoutAttachmentTest() { + val session = Session.open() + + var receivedSample: Sample? = null + val publisher = session.declarePublisher(keyExpr).res() + val subscriber = session.declareSubscriber(keyExpr).with { sample -> + receivedSample = sample + }.res() + + publisher.write(SampleKind.PUT, Value("test")).res() + + Thread.sleep(500) + + publisher.close() + subscriber.close() + session.close() + + assertNotNull(receivedSample) + assertNull(receivedSample!!.attachment) + } + + @Test + fun publisherDeleteWithAttachmentTest() { + val session = Session.open() + + var receivedSample: Sample? = null + val publisher = session.declarePublisher(keyExpr).res() + val subscriber = session.declareSubscriber(keyExpr).with { sample -> + receivedSample = sample + }.res() + + publisher.delete().withAttachment(attachment).res() + + Thread.sleep(500) + subscriber.close() + publisher.close() + session.close() + + assertAttachmentOk(receivedSample!!.attachment!!) + } + + @Test + fun publisherDeleteWithoutAttachmentTest() { + val session = Session.open() + + var receivedSample: Sample? = null + val publisher = session.declarePublisher(keyExpr).res() + val subscriber = session.declareSubscriber(keyExpr).with { sample -> + receivedSample = sample + }.res() + + publisher.delete().res() + + subscriber.close() + publisher.close() + session.close() + + assertNotNull(receivedSample) + assertNull(receivedSample!!.attachment) + } + + @Test + fun queryWithAttachmentTest() { + val session = Session.open() + + var receivedAttachment: Attachment? = null + + val queryable = session.declareQueryable(keyExpr).with { query -> + receivedAttachment = query.attachment + query.reply(keyExpr).success("test").res() + }.res() + + session.get(keyExpr).with {}.withAttachment(attachment).timeout(Duration.ofMillis(1000)).res() + Thread.sleep(1000) + + queryable.close() + session.close() + assertAttachmentOk(receivedAttachment) + } + + @Test + fun queryReplyWithAttachmentTest() { + val session = Session.open() + + var receivedAttachment: Attachment? = null + + val queryable = session.declareQueryable(keyExpr).with { query -> + query.reply(keyExpr).success("test").withAttachment(attachment).res() + }.res() + + session.get(keyExpr).with { reply -> + (reply as Reply.Success) + receivedAttachment = reply.sample.attachment + }.timeout(Duration.ofMillis(1000)).res() + + Thread.sleep(1000) + + queryable.close() + session.close() + assertAttachmentOk(receivedAttachment) + } + + @Test + fun queryReplyWithoutAttachmentTest() { + var reply: Reply? = null + val session = Session.open() + val queryable = session.declareQueryable(keyExpr).with { query -> + query.reply(keyExpr).success("test").res() + }.res() + + session.get(keyExpr).with { + reply = it + }.timeout(Duration.ofMillis(1000)).res() + + Thread.sleep(1000) + + queryable.close() + session.close() + + assertNotNull(reply) + assertTrue(reply is Reply.Success) + assertNull((reply as Reply.Success).sample.attachment) + } + + @Test + fun encodeAndDecodeNumbersTest() { + val numbers: List = arrayListOf(0, 1, -1, 12345, -12345, 123567, 123456789, -123456789) + + for (number in numbers) { + val bytes = number.toByteArray() + val decodedNumber: Int = bytes.toInt() + assertEquals(number, decodedNumber) + } + } + + @Test + fun encodeAndDecodeAttachmentTest() { + val encodedAttachment = encodeAttachment(attachment) + val decodedAttachment = decodeAttachment(encodedAttachment) + + assertAttachmentOk(decodedAttachment) + } +} diff --git a/zenoh-jni/src/publisher.rs b/zenoh-jni/src/publisher.rs index 227929b2..525be99e 100644 --- a/zenoh-jni/src/publisher.rs +++ b/zenoh-jni/src/publisher.rs @@ -28,6 +28,7 @@ use zenoh::{ use crate::{ errors::{Error, Result}, sample::decode_sample_kind, + utils::{decode_byte_array, vec_to_attachment}, }; use crate::{ put::{decode_congestion_control, decode_priority}, @@ -44,6 +45,7 @@ use zenoh::SessionDeclarations; /// - `_class`: The JNI class. /// - `payload`: The payload to be published, represented as a Java byte array (`JByteArray`). /// - `encoding`: The encoding type of the payload. +/// - `encoded_attachment`: Optional encoded attachment. May be null. /// - `ptr`: The raw pointer to the Zenoh publisher ([Publisher]). /// /// Safety: @@ -60,10 +62,17 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNIPublisher_putViaJNI( _class: JClass, payload: JByteArray, encoding: jint, + encoded_attachment: JByteArray, ptr: *const Publisher<'static>, ) { let publisher = Arc::from_raw(ptr); - match perform_put(&env, payload, encoding, publisher.clone()) { + match perform_put( + &env, + payload, + encoding, + encoded_attachment, + publisher.clone(), + ) { Ok(_) => {} Err(err) => { _ = err.throw_on_jvm(&mut env).map_err(|err| { @@ -150,6 +159,7 @@ pub(crate) unsafe fn declare_publisher( /// - `env`: The JNI environment. /// - `payload`: The payload as a `JByteArray`. /// - `encoding`: The encoding of the payload. +/// - `encoded_attachment`: Optional encoded attachment. May be null. /// - `publisher`: The Zenoh publisher. /// /// Returns: @@ -159,11 +169,16 @@ fn perform_put( env: &JNIEnv, payload: JByteArray, encoding: jint, + encoded_attachment: JByteArray, publisher: Arc, ) -> Result<()> { let value = decode_value(env, payload, encoding)?; - publisher - .put(value) + let mut publication = publisher.put(value); + if !encoded_attachment.is_null() { + let aux = decode_byte_array(env, encoded_attachment)?; + publication = publication.with_attachment(vec_to_attachment(aux)) + }; + publication .res_sync() .map_err(|err| Error::Session(err.to_string())) } @@ -258,6 +273,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNIPublisher_setPriorityViaJNI( /// - `payload`: The payload as a `JByteArray`. /// - `encoding`: The [zenoh::Encoding] of the payload. /// - `sample_kind`: The [zenoh::SampleKind] to use. +/// - `encoded_attachment`: Optional encoded attachment. May be null. /// - `publisher`: The Zenoh [Publisher]. /// /// Returns: @@ -268,12 +284,17 @@ fn perform_write( payload: JByteArray, encoding: jint, sample_kind: jint, + encoded_attachment: JByteArray, publisher: Arc, ) -> Result<()> { let value = decode_value(env, payload, encoding)?; let sample_kind = decode_sample_kind(sample_kind)?; - publisher - .write(sample_kind, value) + let mut publication = publisher.write(sample_kind, value); + if !encoded_attachment.is_null() { + let aux = decode_byte_array(env, encoded_attachment)?; + publication = publication.with_attachment(vec_to_attachment(aux)) + }; + publication .res() .map_err(|err| Error::Session(format!("{}", err))) } @@ -288,6 +309,7 @@ fn perform_write( /// - `payload`: The payload to be published, represented as a [Java byte array](JByteArray). /// - `encoding`: The [`encoding`](zenoh::Encoding) of the payload. /// - `sample_kind`: The [`kind`](zenoh::SampleKind) to use. +/// - `encoded_attachment`: Optional encoded attachment. May be null. /// - `ptr`: The raw pointer to the Zenoh publisher ([Publisher]). /// /// Safety: @@ -305,10 +327,18 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNIPublisher_writeViaJNI( payload: JByteArray, encoding: jint, sample_kind: jint, + encoded_attachment: JByteArray, ptr: *const Publisher<'static>, ) { let publisher = Arc::from_raw(ptr); - match perform_write(&env, payload, encoding, sample_kind, publisher.clone()) { + match perform_write( + &env, + payload, + encoding, + sample_kind, + encoded_attachment, + publisher.clone(), + ) { Ok(_) => {} Err(err) => { _ = err.throw_on_jvm(&mut env).map_err(|err| { @@ -325,14 +355,24 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNIPublisher_writeViaJNI( /// Performs a DELETE operation via JNI using the specified Zenoh publisher. /// /// Parameters: +/// - `env`: The JNI environment. +/// - `encoded_attachment`: Optional encoded attachment. May be null. /// - `publisher`: The Zenoh [Publisher]. /// /// Returns: /// - A [Result] indicating the success or failure of the operation. /// -fn perform_delete(publisher: Arc) -> Result<()> { - publisher - .delete() +fn perform_delete( + env: &JNIEnv, + encoded_attachment: JByteArray, + publisher: Arc, +) -> Result<()> { + let mut delete = publisher.delete(); + if !encoded_attachment.is_null() { + let aux = decode_byte_array(env, encoded_attachment)?; + delete = delete.with_attachment(vec_to_attachment(aux)) + }; + delete .res() .map_err(|err| Error::Session(format!("{}", err))) } @@ -344,6 +384,7 @@ fn perform_delete(publisher: Arc) -> Result<()> { /// Parameters: /// - `env`: The JNI environment. /// - `_class`: The JNI class. +/// - `encoded_attachment`: Optional encoded attachment. May be null. /// - `ptr`: The raw pointer to the [Zenoh publisher](Publisher). /// /// Safety: @@ -358,10 +399,11 @@ fn perform_delete(publisher: Arc) -> Result<()> { pub unsafe extern "C" fn Java_io_zenoh_jni_JNIPublisher_deleteViaJNI( mut env: JNIEnv, _class: JClass, + encoded_attachment: JByteArray, ptr: *const Publisher<'static>, ) { let publisher = Arc::from_raw(ptr); - match perform_delete(publisher.clone()) { + match perform_delete(&env, encoded_attachment, publisher.clone()) { Ok(_) => {} Err(err) => { _ = err.throw_on_jvm(&mut env).map_err(|err| { diff --git a/zenoh-jni/src/put.rs b/zenoh-jni/src/put.rs index a370b2da..0e2cf410 100644 --- a/zenoh-jni/src/put.rs +++ b/zenoh-jni/src/put.rs @@ -14,6 +14,7 @@ use crate::errors::{Error, Result}; use crate::sample::decode_sample_kind; +use crate::utils::{decode_byte_array, vec_to_attachment}; use crate::value::decode_value; use jni::objects::JByteArray; @@ -34,6 +35,7 @@ use zenoh::prelude::r#sync::*; /// - `congestion_control`: The [CongestionControl] mechanism specified. /// - `priority`: The [Priority] mechanism specified. /// - `sample_kind`: The [SampleKind] of the put operation. +/// - `attachment`: An optional attachment, encoded into a byte array. May be null. /// /// Returns: /// - A `Result` indicating the result of the `get` operation, with an [Error] in case of failure. @@ -48,6 +50,7 @@ pub(crate) fn on_put( congestion_control: jint, priority: jint, sample_kind: jint, + attachment: JByteArray, ) -> Result<()> { let value = decode_value(env, payload, encoding)?; let sample_kind = decode_sample_kind(sample_kind)?; @@ -71,13 +74,19 @@ pub(crate) fn on_put( }; let key_expr_clone = key_expr.deref().clone(); - match session + + let mut put_builder = session .put(key_expr_clone, value.to_owned()) .kind(sample_kind) .congestion_control(congestion_control) - .priority(priority) - .res() - { + .priority(priority); + + if !attachment.is_null() { + let attachment = decode_byte_array(env, attachment)?; + put_builder = put_builder.with_attachment(vec_to_attachment(attachment)) + } + + match put_builder.res() { Ok(_) => { log::trace!("Put on '{key_expr}' with value '{value}' and encoding '{}'. Kind: '{sample_kind}', Congestion control: '{congestion_control:?}', Priority: '{priority:?}'", value.encoding); Ok(()) diff --git a/zenoh-jni/src/query.rs b/zenoh-jni/src/query.rs index af3a88bc..333b6cfc 100644 --- a/zenoh-jni/src/query.rs +++ b/zenoh-jni/src/query.rs @@ -23,15 +23,19 @@ use zenoh::{ prelude::{sync::SyncResolve, KeyExpr, SplitBuffer}, query::{ConsolidationMode, QueryTarget}, queryable::Query, - sample::Sample, + sample::{Attachment, Sample}, value::Value, }; -use crate::sample::decode_sample; use crate::{ errors::{Error, Result}, + utils::attachment_to_vec, value::decode_value, }; +use crate::{ + sample::decode_sample, + utils::{decode_byte_array, vec_to_attachment}, +}; /// Replies with success to a Zenoh query via JNI. /// @@ -47,6 +51,7 @@ use crate::{ /// - `sample_kind`: The kind of sample. /// - `timestamp_enabled`: A boolean indicating whether the timestamp is enabled. /// - `timestamp_ntp_64`: The NTP64 timestamp value. +/// - `attachment`: Optional user attachment encoded as a byte array. May be null. /// /// Safety: /// - This function is marked as unsafe due to raw pointer manipulation and JNI interaction. @@ -66,6 +71,7 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuery_replySuccessViaJNI( sample_kind: jint, timestamp_enabled: jboolean, timestamp_ntp_64: jlong, + attachment: JByteArray, ) { let key_expr = Arc::from_raw(key_expr_ptr); let key_expr_clone = key_expr.deref().clone(); @@ -87,9 +93,22 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuery_replySuccessViaJNI( return; } }; + let attachment: Option = if !attachment.is_null() { + match decode_byte_array(&env, attachment) { + Ok(attachment_bytes) => Some(vec_to_attachment(attachment_bytes)), + Err(err) => { + _ = err.throw_on_jvm(&mut env).map_err(|err| { + log::error!("Unable to throw exception on query reply failure. {}", err) + }); + return; + } + } + } else { + None + }; let query = Arc::from_raw(query_ptr); - query_reply(&query, Ok(sample), env); + query_reply(env, &query, Ok(sample), attachment); mem::forget(query) } @@ -109,6 +128,7 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuery_replySuccessViaJNI( /// - `key_expr`: The key expression associated with the query result. /// - `payload`: The payload as a `JByteArray`. /// - `encoding`: The encoding of the payload as a jint. +/// - `attachment`: The user attachment bytes. /// /// Safety: /// - This function is marked as unsafe due to raw pointer manipulation and JNI interaction. @@ -124,6 +144,7 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuery_replyErrorViaJNI( ptr: *const zenoh::queryable::Query, payload: JByteArray, encoding: jint, + attachment: JByteArray, ) { let errorValue = match decode_value(&env, payload, encoding) { Ok(value) => value, @@ -134,9 +155,22 @@ pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuery_replyErrorViaJNI( return; } }; + let attachment: Option = if !attachment.is_null() { + match decode_byte_array(&env, attachment) { + Ok(attachment_bytes) => Some(vec_to_attachment(attachment_bytes)), + Err(err) => { + _ = err.throw_on_jvm(&mut env).map_err(|err| { + log::error!("Unable to throw exception on query reply failure. {}", err) + }); + return; + } + } + } else { + None + }; let query = Arc::from_raw(ptr); - query_reply(&query, Err(errorValue), env); + query_reply(env, &query, Err(errorValue), attachment); mem::forget(query) } @@ -209,6 +243,17 @@ pub(crate) fn on_query( None => (false, JPrimitiveArray::default(), 0), }; + let attachment_bytes = match query.attachment().map_or_else( + || env.byte_array_from_slice(&[]), + |attachment| env.byte_array_from_slice(attachment_to_vec(attachment.clone()).as_slice()), + ) { + Ok(byte_array) => Ok(byte_array), + Err(err) => Err(Error::Jni(format!( + "Error processing attachment of reply: {}.", + err + ))), + }?; + let key_expr = query.key_expr().clone(); let key_expr_ptr = Arc::into_raw(Arc::new(key_expr)); let query_ptr = Arc::into_raw(Arc::new(query)); @@ -217,13 +262,14 @@ pub(crate) fn on_query( .call_method( callback_global_ref, "run", - "(JLjava/lang/String;Z[BIJ)V", + "(JLjava/lang/String;Z[BI[BJ)V", &[ JValue::from(key_expr_ptr as jlong), JValue::from(&selector_params_jstr), JValue::from(with_value), JValue::from(&payload), JValue::from(encoding), + JValue::from(&attachment_bytes), JValue::from(query_ptr as jlong), ], ) @@ -251,12 +297,29 @@ pub(crate) fn on_query( } /// Helper function to perform a reply to a query. -fn query_reply(query: &Arc, reply: core::result::Result, mut env: JNIEnv) { - match query - .reply(reply) - .res() - .map_err(|err| Error::Session(err.to_string())) - { +fn query_reply( + mut env: JNIEnv, + query: &Arc, + reply: core::result::Result, + attachment: Option, +) { + let result = if let Some(attachment) = attachment { + query + .reply(reply) + .with_attachment(attachment) + .unwrap_or_else(|(builder, _)| { + log::warn!("Unable to append attachment to query reply"); + builder + }) + .res() + .map_err(|err| Error::Session(err.to_string())) + } else { + query + .reply(reply) + .res() + .map_err(|err| Error::Session(err.to_string())) + }; + match result { Ok(_) => {} Err(err) => { _ = err.throw_on_jvm(&mut env).map_err(|err| { diff --git a/zenoh-jni/src/reply.rs b/zenoh-jni/src/reply.rs index 753dd178..f3660a03 100644 --- a/zenoh-jni/src/reply.rs +++ b/zenoh-jni/src/reply.rs @@ -26,8 +26,8 @@ use zenoh::{ value::Value, }; -use crate::errors::Error; use crate::errors::Result; +use crate::{errors::Error, utils::attachment_to_vec}; pub(crate) fn on_reply( mut env: JNIEnv, @@ -58,11 +58,23 @@ fn on_reply_success( || (0, false), |timestamp| (timestamp.get_time().as_u64(), true), ); + + let attachment_bytes = match sample.attachment.map_or_else( + || env.byte_array_from_slice(&[]), + |attachment| env.byte_array_from_slice(attachment_to_vec(attachment).as_slice()), + ) { + Ok(byte_array) => Ok(byte_array), + Err(err) => Err(Error::Jni(format!( + "Error processing attachment of reply: {}.", + err + ))), + }?; + let key_expr_ptr = Arc::into_raw(Arc::new(sample.key_expr)); let result = match env.call_method( callback_global_ref, "run", - "(Ljava/lang/String;ZJ[BIIJZ)V", + "(Ljava/lang/String;ZJ[BIIJZ[B)V", &[ JValue::from(&zenoh_id), JValue::from(true), @@ -72,6 +84,7 @@ fn on_reply_success( JValue::from(kind), JValue::from(timestamp as i64), JValue::from(is_valid), + JValue::from(&attachment_bytes), ], ) { Ok(_) => Ok(()), diff --git a/zenoh-jni/src/session.rs b/zenoh-jni/src/session.rs index 02d498e1..cb4803e6 100644 --- a/zenoh-jni/src/session.rs +++ b/zenoh-jni/src/session.rs @@ -19,7 +19,10 @@ use crate::query::{decode_consolidation, decode_query_target}; use crate::queryable::declare_queryable; use crate::reply::on_reply; use crate::subscriber::declare_subscriber; -use crate::utils::{decode_string, get_callback_global_ref, get_java_vm, load_on_close}; +use crate::utils::{ + decode_byte_array, decode_string, get_callback_global_ref, get_java_vm, load_on_close, + vec_to_attachment, +}; use crate::value::decode_value; use jni::objects::{JByteArray, JClass, JObject, JString}; @@ -262,6 +265,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_declarePublisherViaJNI( /// - `congestion_control`: The [CongestionControl] mechanism specified. /// - `priority`: The [Priority] mechanism specified. /// - `sample_kind`: The [SampleKind] of the put operation. +/// - `attachment`: Optional attachment encoded into a byte array. May be null. /// /// Safety: /// - The function is marked as unsafe due to raw pointer manipulation and JNI interaction. @@ -283,6 +287,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_putViaJNI( congestion_control: jint, priority: jint, sample_kind: jint, + attachment: JByteArray, ) { let session = Arc::from_raw(session_ptr); let key_expr = Arc::from_raw(key_expr_ptr); @@ -295,6 +300,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_putViaJNI( congestion_control, priority, sample_kind, + attachment, ) { Ok(_) => {} Err(err) => { @@ -520,6 +526,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_undeclareKeyExprViaJNI( /// - `timeout_ms`: The timeout in milliseconds. /// - `target`: The [QueryTarget] as the ordinal of the enum. /// - `consolidation`: The [ConsolidationMode] as the ordinal of the enum. +/// - `attachment`: An optional attachment encoded into a byte array. /// /// Safety: /// - The function is marked as unsafe due to raw pointer manipulation and JNI interaction. @@ -544,6 +551,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI( timeout_ms: jlong, target: jint, consolidation: jint, + attachment: JByteArray, ) { let session = Arc::from_raw(session_ptr); let key_expr = Arc::from_raw(key_expr); @@ -558,6 +566,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI( target, consolidation, None, + attachment, ) { Ok(_) => {} Err(err) => { @@ -590,6 +599,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI( /// - `consolidation`: The [ConsolidationMode] as the ordinal of the enum. /// - `payload`: The payload of the [Value] /// - `encoding`: The [Encoding] as the ordinal of the enum. +/// - `attachment`: An optional attachment encoded into a byte array. /// /// Safety: /// - The function is marked as unsafe due to raw pointer manipulation and JNI interaction. @@ -616,6 +626,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getWithValueViaJNI( consolidation: jint, payload: JByteArray, encoding: jint, + attachment: JByteArray, ) { let session = Arc::from_raw(session_ptr); let key_expr = Arc::from_raw(key_expr_ptr); @@ -630,6 +641,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getWithValueViaJNI( target, consolidation, Some((payload, encoding)), + attachment, ) { Ok(_) => {} Err(err) => { @@ -659,6 +671,7 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getWithValueViaJNI( /// - `consolidation`: The [ConsolidationMode] as the ordinal of the enum. /// - `value_params`: Parameters of the value (payload as [JByteArray] and encoding as [jint]) to /// be set in case the get is performed "with value". +/// - `encoded_attachment`: An optional attachment encoded into a byte array. /// /// Returns: /// - A `Result` indicating the result of the `get` operation. @@ -675,6 +688,7 @@ fn on_get_query( target: jint, consolidation: jint, value_params: Option<(JByteArray, jint)>, + encoded_attachment: JByteArray, ) -> Result<()> { let java_vm = Arc::new(get_java_vm(env)?); let callback_global_ref = get_callback_global_ref(env, callback)?; @@ -715,6 +729,12 @@ fn on_get_query( binding = Some(value) } + if !encoded_attachment.is_null() { + let aux = decode_byte_array(env, encoded_attachment)?; + let attachment = vec_to_attachment(aux); + get_builder = get_builder.with_attachment(attachment); + } + get_builder .res() .map(|_| { diff --git a/zenoh-jni/src/subscriber.rs b/zenoh-jni/src/subscriber.rs index a1309add..ae99f351 100644 --- a/zenoh-jni/src/subscriber.rs +++ b/zenoh-jni/src/subscriber.rs @@ -22,8 +22,11 @@ use jni::{ use zenoh::prelude::r#sync::*; use zenoh::subscriber::Subscriber; -use crate::errors::{Error, Result}; use crate::utils::{get_callback_global_ref, get_java_vm, load_on_close}; +use crate::{ + errors::{Error, Result}, + utils::attachment_to_vec, +}; /// Frees the memory associated with a Zenoh subscriber raw pointer via JNI. /// @@ -113,11 +116,25 @@ pub(crate) unsafe fn declare_subscriber( |timestamp| (timestamp.get_time().as_u64(), true), ); + let attachment_bytes = match sample.attachment.map_or_else( + || env.byte_array_from_slice(&[]), + |attachment| env.byte_array_from_slice(attachment_to_vec(attachment).as_slice()), + ) { + Ok(byte_array) => byte_array, + Err(err) => { + log::error!( + "On subscriber callback error. Error processing attachment: {}.", + err.to_string() + ); + return; + } + }; + let key_expr_ptr = Arc::into_raw(Arc::new(sample.key_expr)); match env.call_method( &callback_global_ref, "run", - "(J[BIIJZ)V", + "(J[BIIJZ[B)V", &[ JValue::from(key_expr_ptr as jlong), JValue::from(&byte_array), @@ -125,6 +142,7 @@ pub(crate) unsafe fn declare_subscriber( JValue::from(kind), JValue::from(timestamp as i64), JValue::from(is_valid), + JValue::from(&attachment_bytes), ], ) { Ok(_) => {} diff --git a/zenoh-jni/src/utils.rs b/zenoh-jni/src/utils.rs index 03e886cd..71eebaf1 100644 --- a/zenoh-jni/src/utils.rs +++ b/zenoh-jni/src/utils.rs @@ -15,9 +15,10 @@ use std::sync::Arc; use jni::{ - objects::{JObject, JString}, + objects::{JByteArray, JObject, JString}, JNIEnv, JavaVM, }; +use zenoh::sample::{Attachment, AttachmentBuilder}; use crate::errors::{Error, Result}; @@ -49,6 +50,19 @@ pub(crate) fn get_callback_global_ref( }) } +/// Helper function to convert a JByteArray into a Vec. +pub(crate) fn decode_byte_array(env: &JNIEnv<'_>, payload: JByteArray) -> Result> { + let payload_len = env + .get_array_length(&payload) + .map(|length| length as usize) + .map_err(|err| Error::Jni(err.to_string()))?; + let mut buff = vec![0; payload_len]; + env.get_byte_array_region(payload, 0, &mut buff[..]) + .map_err(|err| Error::Jni(err.to_string()))?; + let buff: Vec = unsafe { std::mem::transmute::, Vec>(buff) }; + Ok(buff) +} + /// A type that calls a function when dropped pub(crate) struct CallOnDrop(core::mem::MaybeUninit); impl CallOnDrop { @@ -98,3 +112,62 @@ pub(crate) fn load_on_close( } }) } + +/// This function is used in conjunction with the Kotlin function +/// `decodeAttachment(attachmentBytes: ByteArray): Attachment` which takes a byte array with the +/// format , repeating this +/// pattern for as many pairs there are in the attachment. +/// +/// The kotlin function expects both key size and value size to be i32 integers expressed with +/// little endian format. +/// +pub(crate) fn attachment_to_vec(attachment: Attachment) -> Vec { + let mut buffer: Vec = Vec::new(); + for (key, value) in attachment.iter() { + buffer.extend((key.len() as i32).to_le_bytes()); + buffer.extend(&key[..]); + buffer.extend((value.len() as i32).to_le_bytes()); + buffer.extend(&value[..]); + } + buffer +} + +/// This function is used in conjunction with the Kotlin function +/// `encodeAttachment(attachment: Attachment): ByteArray` which converts the attachment into a +/// ByteArray with the format , repeating this +/// pattern for as many pairs there are in the attachment. +/// +/// Both key size and value size are i32 integers with little endian format. +/// +pub(crate) fn vec_to_attachment(bytes: Vec) -> Attachment { + let mut builder = AttachmentBuilder::new(); + let mut idx = 0; + let i32_size = std::mem::size_of::(); + let mut slice_size; + + while idx < bytes.len() { + slice_size = i32::from_le_bytes( + bytes[idx..idx + i32_size] + .try_into() + .expect("Error decoding i32 while processing attachment."), //This error should never happen. + ); + idx += i32_size; + + let key = &bytes[idx..idx + slice_size as usize]; + idx += slice_size as usize; + + slice_size = i32::from_le_bytes( + bytes[idx..idx + i32_size] + .try_into() + .expect("Error decoding i32 while processing attachment."), //This error should never happen. + ); + idx += i32_size; + + let value = &bytes[idx..idx + slice_size as usize]; + idx += slice_size as usize; + + builder.insert(key, value); + } + + builder.build() +} diff --git a/zenoh-jni/src/value.rs b/zenoh-jni/src/value.rs index 3611c45b..d4fa030e 100644 --- a/zenoh-jni/src/value.rs +++ b/zenoh-jni/src/value.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // -use crate::errors::{Error, Result}; +use crate::{errors::Result, utils::decode_byte_array}; use jni::{objects::JByteArray, JNIEnv}; use zenoh::{ buffers::{writer::Writer, ZBuf}, @@ -28,16 +28,7 @@ pub(crate) fn build_value(payload: &[u8], encoding: KnownEncoding) -> Value { } pub(crate) fn decode_value(env: &JNIEnv<'_>, payload: JByteArray, encoding: i32) -> Result { - let payload_len = env - .get_array_length(&payload) - .map(|length| length as usize) - .map_err(|err| Error::Jni(err.to_string()))?; - - let mut buff = vec![0; payload_len]; - env.get_byte_array_region(payload, 0, &mut buff[..]) - .map_err(|err| Error::Jni(err.to_string()))?; - - let buff: Vec = buff.iter().map(|&x| x as u8).collect(); + let buff = decode_byte_array(env, payload)?; let encoding = match KnownEncoding::try_from(encoding as u8) { Ok(encoding) => encoding, Err(_) => {