From b7e825e177104ed8e309df1999e27a4bb21a0b0e Mon Sep 17 00:00:00 2001 From: Lev <1187448+levb@users.noreply.github.com> Date: Thu, 12 Oct 2023 06:20:45 -0700 Subject: [PATCH] [Merge from dev] nats v2.10 support (#684) * [CHANGED] Travis build changes (#680) * [CHANGED] Travis build changes - Use Ubuntu focal 20.04 - Added testing against nats-server latest release, and against the `dev` branch - Changed gcc `sanitize=address` builds to Debug to get line numbers in the output - Added `NATS_TEST_VALGRIND=yes` to `sanitize=thread` to reduce repeat counts and avoid timeouts * Update buildOnTravis.sh Co-authored-by: Ziya Suzen * Fixed flapping test_ServerPoolUpdatedOnClusterUpdate * PR feedback: test _checkPool to retry # subjects * PR feedback: refactored _checkPool --------- Co-authored-by: Ziya Suzen * [ADDED] Support for (multiple) ConsumerConfig.FilterSubjects (#679) * [CHANGED] Travis build changes - Use Ubuntu focal 20.04 - Added testing against nats-server latest release, and against the `dev` branch - Changed gcc `sanitize=address` builds to Debug to get line numbers in the output - Added `NATS_TEST_VALGRIND=yes` to `sanitize=thread` to reduce repeat counts and avoid timeouts * Update buildOnTravis.sh Co-authored-by: Ziya Suzen * Fixed flapping test_ServerPoolUpdatedOnClusterUpdate * [ADDED] Support for (multiple) ConsumerConfig.FilterSubjects * PR feedback: took out a redundant check * point travis to build with NATS main, not dev --------- Co-authored-by: Ziya Suzen * [Added] More v2.10 related changes (#682) * Added Metadata to Stream, Consumer configs Merged some other fixes from go PR * leak * Added jsStreamConfig.Compression * Added jsStreamConfig.FirstSeq * Added jsStreamConfig.SubjectTransform * Added jsStreamSourceInfo.SubjectTransforms and .FilterSubjects * Added jsStreamConfig.ConsumerLimits * PR feedback: nits * PR feedback: verify metadata values in test * PR feedback: fix a cast * Fixed the broken test (order of values) --------- Co-authored-by: Ziya Suzen --- .gitignore | 2 + .travis.yml | 36 ++++- buildOnTravis.sh | 21 ++- src/js.c | 28 +++- src/js.h | 3 + src/jsm.c | 238 ++++++++++++++++++++++++++++++-- src/micro.c | 61 ++------ src/micro_endpoint.c | 6 +- src/micro_monitoring.c | 34 +---- src/microp.h | 2 - src/nats.h | 118 +++++++++++++--- src/natsp.h | 11 ++ src/util.c | 127 +++++++++++++++++ src/util.h | 14 +- test/test.c | 306 +++++++++++++++++++++++++++++++++-------- 15 files changed, 827 insertions(+), 180 deletions(-) diff --git a/.gitignore b/.gitignore index f452f0dbe..fdedaffa8 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,8 @@ cmake-build*/ install/ html/ !doc/html/ +test/datastore_*/ +test/conf_* # Emacs *~ diff --git a/.travis.yml b/.travis.yml index 2bfb18886..999d017fb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,5 @@ language: cpp -dist: bionic +dist: focal os: linux cache: @@ -25,6 +25,34 @@ env: jobs: include: + - name: "NATS server - latest release" + compiler: gcc + addons: + apt: + sources: + - ubuntu-toolchain-r-test + - sourceline: ppa:ubuntu-toolchain-r/test + packages: + - g++-9 + env: + - NATS_TEST_SERVER_VERSION=latest + - MATRIX_EVAL="CC=gcc-9" + - BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release" DO_COVERAGE="no" + + - name: "NATS server - main" + compiler: gcc + addons: + apt: + sources: + - ubuntu-toolchain-r-test + - sourceline: ppa:ubuntu-toolchain-r/test + packages: + - g++-9 + env: + - NATS_TEST_SERVER_VERSION=main + - MATRIX_EVAL="CC=gcc-9" + - BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release" DO_COVERAGE="no" + - name: "gcc-9 - TLS OFF" compiler: gcc addons: @@ -62,7 +90,7 @@ jobs: - g++-9 env: - MATRIX_EVAL="CC=gcc-9" - - BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no" + - BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no" - name: "gcc-9 - Lib msg delivery - sanitize address" compiler: gcc @@ -75,7 +103,7 @@ jobs: - g++-9 env: - MATRIX_EVAL="CC=gcc-9" - - NATS_DEFAULT_TO_LIB_MSG_DELIVERY=yes BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no" + - NATS_DEFAULT_TO_LIB_MSG_DELIVERY=yes BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no" - name: "gcc-9 - Write deadline - sanitize address" compiler: gcc @@ -101,7 +129,7 @@ jobs: - g++-9 env: - MATRIX_EVAL="CC=gcc-9" - - BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=thread" DO_COVERAGE="no" + - BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=thread" NATS_TEST_VALGRIND=yes DO_COVERAGE="no" - name: "clang-8 - TLS OFF" compiler: clang diff --git a/buildOnTravis.sh b/buildOnTravis.sh index afe491243..fbfe5e29c 100755 --- a/buildOnTravis.sh +++ b/buildOnTravis.sh @@ -10,6 +10,23 @@ echo "coverage = " $2 echo "build opts = " $3 echo "test opts = " $4 +if [ "$NATS_TEST_SERVER_VERSION" != "" ]; then + rel=$NATS_TEST_SERVER_VERSION + mkdir -p $HOME/nats-server-$rel + if [ "$rel" = "latest" ]; then + rel=$(curl -s https://api.github.com/repos/nats-io/nats-server/releases/latest | jq -r '.tag_name') + fi + + if [ "$rel" != "${rel#v}" ] && wget https://github.com/nats-io/nats-server/releases/download/$rel/nats-server-$rel-linux-amd64.tar.gz; then + tar -xzf nats-server-$rel-linux-amd64.tar.gz + mv nats-server-$rel-linux-amd64 $HOME/nats-server-$rel + else + curl -sf "https://binaries.nats.dev/nats-io/nats-server/v2@$rel" | PREFIX=. sh + mv nats-server $HOME/nats-server-$rel + fi + PATH=$HOME/nats-server-$rel:$PATH +fi + if [ "$1" != "gcc" ]; then if [ "$2" = "coverage" ]; then # only coverage for gcc compiler @@ -35,8 +52,10 @@ res=$? if [ $res -ne 0 ]; then exit $res fi -export NATS_TEST_SERVER_VERSION="$(nats-server -v)" + export NATS_TEST_TRAVIS=yes +export NATS_TEST_SERVER_VERSION="$(nats-server -v)" +echo "Using NATS server version: $NATS_TEST_SERVER_VERSION" ctest --timeout 60 --output-on-failure $4 res=$? if [ $res -ne 0 ]; then diff --git a/src/js.c b/src/js.c index 74def9f08..2dc52d517 100644 --- a/src/js.c +++ b/src/js.c @@ -2201,18 +2201,34 @@ _processConsInfo(const char **dlvSubject, jsConsumerInfo *info, jsConsumerConfig jsConsumerConfig *ccfg = info->Config; const char *dg = NULL; natsStatus s = NATS_OK; + bool matches = false; + int i; *dlvSubject = NULL; // Make sure this new subject matches or is a subset. - if (!nats_IsStringEmpty(subj) - && !nats_IsStringEmpty(ccfg->FilterSubject) - && (strcmp(subj, ccfg->FilterSubject) != 0)) + if (!nats_IsStringEmpty(subj)) { - return nats_setError(NATS_ERR, "subject '%s' does not match consumer filter subject '%s'", - subj, ccfg->FilterSubject); + if (nats_IsStringEmpty(ccfg->FilterSubject) && (ccfg->FilterSubjectsLen == 0)) + { + matches = true; + } + else if (!nats_IsStringEmpty(ccfg->FilterSubject) && nats_HasPrefix(subj, ccfg->FilterSubject)) + { + matches = true; + } + else if (ccfg->FilterSubjectsLen > 0) + { + for (i = 0; (i < ccfg->FilterSubjectsLen) && !matches; i++) + { + matches = nats_HasPrefix(subj, ccfg->FilterSubjects[i]); + } + } + if (!matches) + { + return nats_setError(NATS_ERR, "subject '%s' does not match any consumer filter subjects.", subj); + } } - // Check that if user wants to create a queue sub, // the consumer has no HB nor FC. queue = (nats_IsStringEmpty(queue) ? NULL : queue); diff --git a/src/js.h b/src/js.h index 79ce521cd..d98fb68f4 100644 --- a/src/js.h +++ b/src/js.h @@ -80,6 +80,9 @@ extern const int64_t jsDefaultRequestWait; #define jsStorageTypeFileStr "file" #define jsStorageTypeMemStr "memory" +#define jsStorageCompressionNoneStr "none" +#define jsStorageCompressionS2Str "s2" + #define jsDeliverAllStr "all" #define jsDeliverLastStr "last" #define jsDeliverNewStr "new" diff --git a/src/jsm.c b/src/jsm.c index 2a5d5b73d..fed19665e 100644 --- a/src/jsm.c +++ b/src/jsm.c @@ -116,8 +116,7 @@ _destroyRePublish(jsRePublish *rp) NATS_FREE(rp); } -void -js_destroyStreamConfig(jsStreamConfig *cfg) +void js_destroyStreamConfig(jsStreamConfig *cfg) { int i; @@ -136,6 +135,9 @@ js_destroyStreamConfig(jsStreamConfig *cfg) _destroyStreamSource(cfg->Sources[i]); NATS_FREE(cfg->Sources); _destroyRePublish(cfg->RePublish); + nats_freeMetadata(&(cfg->Metadata)); + NATS_FREE((char *)cfg->SubjectTransform.Source); + NATS_FREE((char *)cfg->SubjectTransform.Destination); NATS_FREE(cfg); } @@ -168,10 +170,19 @@ _destroyClusterInfo(jsClusterInfo *cluster) static void _destroyStreamSourceInfo(jsStreamSourceInfo *info) { + int i; + if (info == NULL) return; NATS_FREE(info->Name); + NATS_FREE((char*)info->FilterSubject); + for (i=0; i < info->SubjectTransformsLen; i++) + { + NATS_FREE((char *)info->SubjectTransforms[i].Source); + NATS_FREE((char *)info->SubjectTransforms[i].Destination); + } + NATS_FREE(info->SubjectTransforms); _destroyExternalStream(info->External); NATS_FREE(info); } @@ -535,6 +546,113 @@ _marshalStorageType(jsStorageType storage, natsBuffer *buf) return NATS_UPDATE_ERR_STACK(s); } +static natsStatus +_unmarshalStorageCompression(nats_JSON *json, const char *fieldName, jsStorageCompression *compression) +{ + natsStatus s = NATS_OK; + const char *str = NULL; + + s = nats_JSONGetStrPtr(json, "compression", &str); + if (str == NULL) + return NATS_UPDATE_ERR_STACK(s); + + if (strcmp(str, jsStorageCompressionNoneStr) == 0) + *compression = js_StorageCompressionNone; + else if (strcmp(str, jsStorageCompressionS2Str) == 0) + *compression = js_StorageCompressionS2; + else + s = nats_setError(NATS_ERR, "unable to unmarshal storage compression '%s'", str); + + return NATS_UPDATE_ERR_STACK(s); +} + +static natsStatus +_marshalStorageCompression(jsStorageCompression compression, natsBuffer *buf) +{ + natsStatus s; + const char *st = NULL; + + s = natsBuf_Append(buf, ",\"compression\":\"", -1); + switch (compression) + { + case js_StorageCompressionNone: + st = jsStorageCompressionNoneStr; + break; + case js_StorageCompressionS2: + st = jsStorageCompressionS2Str; + break; + default: + return nats_setError(NATS_INVALID_ARG, "invalid storage type %d", (int)compression); + } + IFOK(s, natsBuf_Append(buf, st, -1)); + IFOK(s, natsBuf_AppendByte(buf, '"')); + return NATS_UPDATE_ERR_STACK(s); +} + +static natsStatus +_unmarshalSubjectTransformConfig(nats_JSON *obj, jsSubjectTransformConfig *cfg) +{ + natsStatus s = NATS_OK; + + memset(cfg, 0, sizeof(jsSubjectTransformConfig)); + if (obj == NULL) + { + return NATS_OK; + } + + IFOK(s, nats_JSONGetStr(obj, "src", (char **)&(cfg->Source))); + IFOK(s, nats_JSONGetStr(obj, "dest", (char **)&(cfg->Destination))); + return NATS_UPDATE_ERR_STACK(s); +} + +static natsStatus +_marshalSubjectTransformConfig(jsSubjectTransformConfig *cfg, natsBuffer *buf) +{ + natsStatus s; + if (cfg == NULL || (nats_IsStringEmpty(cfg->Source) && nats_IsStringEmpty(cfg->Destination))) + return NATS_OK; + + s = natsBuf_Append(buf, ",\"subject_transform\":{", -1); + IFOK(s, natsBuf_Append(buf, "\"src\":\"", -1)); + if (cfg->Source != NULL) + IFOK(s, natsBuf_Append(buf, cfg->Source, -1)); + IFOK(s, natsBuf_Append(buf, "\",\"dest\":\"", -1)); + if (cfg->Destination != NULL) + IFOK(s, natsBuf_Append(buf, cfg->Destination, -1)); + IFOK(s, natsBuf_Append(buf, "\"}", -1)); + return NATS_UPDATE_ERR_STACK(s); +} + +static natsStatus +_marshalStreamConsumerLimits(jsStreamConsumerLimits *limits, natsBuffer *buf) +{ + natsStatus s; + if (limits == NULL || (limits->InactiveThreshold == 0 && limits->MaxAckPending == 0)) + return NATS_OK; + + s = natsBuf_Append(buf, ",\"consumer_limits\":{", -1); + IFOK(s, nats_marshalLong(buf, false, "inactive_threshold", limits->InactiveThreshold)); + IFOK(s, nats_marshalLong(buf, true, "max_ack_pending", limits->MaxAckPending)); + IFOK(s, natsBuf_AppendByte(buf, '}')); + return NATS_UPDATE_ERR_STACK(s); +} + +static natsStatus +_unmarshalStreamConsumerLimits(nats_JSON *obj, jsStreamConsumerLimits *limits) +{ + natsStatus s = NATS_OK; + + memset(limits, 0, sizeof(*limits)); + if (obj == NULL) + { + return NATS_OK; + } + + IFOK(s, nats_JSONGetLong(obj, "inactive_threshold", &limits->InactiveThreshold)); + IFOK(s, nats_JSONGetInt(obj, "max_ack_pending", &limits->MaxAckPending)); + return NATS_UPDATE_ERR_STACK(s); +} + static natsStatus _unmarshalRePublish(nats_JSON *json, const char *fieldName, jsRePublish **new_republish) { @@ -570,6 +688,7 @@ js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig jsStreamConfig *cfg = NULL; nats_JSON **sources = NULL; int sourcesLen = 0; + nats_JSON *obj = NULL; natsStatus s; if (fieldName != NULL) @@ -633,6 +752,15 @@ js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig IFOK(s, nats_JSONGetBool(jcfg, "mirror_direct", &(cfg->MirrorDirect))); IFOK(s, nats_JSONGetBool(jcfg, "discard_new_per_subject", &(cfg->DiscardNewPerSubject))); + IFOK(s, nats_unmarshalMetadata(jcfg, "metadata", &(cfg->Metadata))); + IFOK(s, _unmarshalStorageCompression(jcfg, "storage", &(cfg->Compression))); + IFOK(s, nats_JSONGetULong(jcfg, "first_seq", &(cfg->FirstSeq))); + IFOK(s, nats_JSONGetObject(jcfg, "subject_transform", &obj)); + IFOK(s, _unmarshalSubjectTransformConfig(obj, &(cfg->SubjectTransform))); + obj = NULL; + IFOK(s, nats_JSONGetObject(jcfg, "consumer_limits", &obj)); + IFOK(s, _unmarshalStreamConsumerLimits(obj, &(cfg->ConsumerLimits))); + if (s == NATS_OK) *new_cfg = cfg; else @@ -754,6 +882,12 @@ js_marshalStreamConfig(natsBuffer **new_buf, jsStreamConfig *cfg) if ((s == NATS_OK) && cfg->DiscardNewPerSubject) IFOK(s, natsBuf_Append(buf, ",\"discard_new_per_subject\":true", -1)); + IFOK(s, nats_marshalMetadata(buf, true, "metadata", cfg->Metadata)); + IFOK(s, _marshalStorageCompression(cfg->Compression, buf)); + IFOK(s, nats_marshalULong(buf, true, "first_seq", cfg->FirstSeq)); + IFOK(s, _marshalSubjectTransformConfig(&cfg->SubjectTransform, buf)); + IFOK(s, _marshalStreamConsumerLimits(&cfg->ConsumerLimits, buf)); + IFOK(s, natsBuf_AppendByte(buf, '}')); if (s == NATS_OK) @@ -941,6 +1075,8 @@ _unmarshalStreamSourceInfo(nats_JSON *pjson, const char *fieldName, jsStreamSour nats_JSON *json = NULL; jsStreamSourceInfo *ssi = NULL; natsStatus s; + nats_JSON **subjectTransforms = NULL; + int subjectTransformsLen = 0; if (fieldName != NULL) { @@ -961,6 +1097,27 @@ _unmarshalStreamSourceInfo(nats_JSON *pjson, const char *fieldName, jsStreamSour IFOK(s, _unmarshalExternalStream(json, "external", &(ssi->External))); IFOK(s, nats_JSONGetULong(json, "lag", &(ssi->Lag))); IFOK(s, nats_JSONGetLong(json, "active", &(ssi->Active))); + IFOK(s, nats_JSONGetStr(json, "filter_subject", (char **)&(ssi->FilterSubject))); + + // Get the sources and unmarshal if present + IFOK(s, nats_JSONGetArrayObject(json, "subject_transforms", &subjectTransforms, &subjectTransformsLen)); + if ((s == NATS_OK) && (subjectTransforms != NULL)) + { + int i; + + ssi->SubjectTransforms = (jsSubjectTransformConfig *)NATS_CALLOC(subjectTransformsLen, sizeof(jsSubjectTransformConfig)); + if (ssi->SubjectTransforms == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + + for (i = 0; (s == NATS_OK) && (i < subjectTransformsLen); i++) + { + s = _unmarshalSubjectTransformConfig(subjectTransforms[i], &(ssi->SubjectTransforms[i])); + if (s == NATS_OK) + ssi->SubjectTransformsLen++; + } + // Free the array of JSON objects that was allocated by nats_JSONGetArrayObject. + NATS_FREE(subjectTransforms); + } if (s == NATS_OK) *new_src = ssi; @@ -1118,6 +1275,7 @@ jsStreamConfig_Init(jsStreamConfig *cfg) cfg->Storage = js_FileStorage; cfg->Discard = js_DiscardOld; cfg->Replicas = 1; + cfg->Compression = js_StorageCompressionNone; return NATS_OK; } @@ -1269,6 +1427,22 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo if (msc) _restoreMirrorAndSourcesExternal(cfg); + // Make sure the 2.10 config fields actually worked, in case the server is + // older. + if ((s == NATS_OK) && (new_si != NULL) && (*new_si != NULL) + && (cfg->Compression != (*new_si)->Config->Compression) + && (cfg->FirstSeq != (*new_si)->Config->FirstSeq) + && (cfg->Metadata.Count != (*new_si)->Config->Metadata.Count) + && nats_StringEquals(cfg->SubjectTransform.Source, (*new_si)->Config->SubjectTransform.Source) + && nats_StringEquals(cfg->SubjectTransform.Destination, (*new_si)->Config->SubjectTransform.Destination) + && (cfg->ConsumerLimits.InactiveThreshold != (*new_si)->Config->ConsumerLimits.InactiveThreshold) + && (cfg->ConsumerLimits.MaxAckPending != (*new_si)->Config->ConsumerLimits.MaxAckPending) + ) + { + // <>/<> wrong error + return nats_setError(NATS_INVALID_ARG, "%s", jsErrStreamConfigRequired); + } + natsBuf_Destroy(buf); natsMsg_Destroy(resp); NATS_FREE(subj); @@ -2756,6 +2930,23 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo IFOK(s, natsBuf_Append(buf, cfg->FilterSubject, -1)); IFOK(s, natsBuf_AppendByte(buf, '"')); } + if ((s == NATS_OK) && (cfg->FilterSubjectsLen > 0)) + { + int i; + + s = natsBuf_Append(buf, ",\"filter_subjects\":[", -1); + for (i = 0; (s == NATS_OK) && (i < cfg->FilterSubjectsLen); i++) + { + if (i > 0) + s = natsBuf_AppendByte(buf, ','); + IFOK(s, natsBuf_AppendByte(buf, '"')); + IFOK(s, natsBuf_Append(buf, cfg->FilterSubjects[i], -1)); + IFOK(s, natsBuf_AppendByte(buf, '"')); + } + + IFOK(s, natsBuf_AppendByte(buf, ']')); + } + IFOK(s, nats_marshalMetadata(buf, true, "metadata", cfg->Metadata)); IFOK(s, _marshalReplayPolicy(buf, cfg->ReplayPolicy)) if ((s == NATS_OK) && (cfg->RateLimit > 0)) s = nats_marshalULong(buf, true, "rate_limit_bps", cfg->RateLimit); @@ -2815,6 +3006,8 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo void js_destroyConsumerConfig(jsConsumerConfig *cc) { + int i; + if (cc == NULL) return; @@ -2824,7 +3017,11 @@ js_destroyConsumerConfig(jsConsumerConfig *cc) NATS_FREE((char*) cc->DeliverSubject); NATS_FREE((char*) cc->DeliverGroup); NATS_FREE((char*) cc->FilterSubject); - NATS_FREE((char*) cc->SampleFrequency); + for (i = 0; i < cc->FilterSubjectsLen; i++) + NATS_FREE((char *)cc->FilterSubjects[i]); + nats_freeMetadata(&(cc->Metadata)); + NATS_FREE((char *)cc->FilterSubjects); + NATS_FREE((char *)cc->SampleFrequency); NATS_FREE(cc->BackOff); NATS_FREE(cc); } @@ -2931,6 +3128,7 @@ _unmarshalConsumerConfig(nats_JSON *json, const char *fieldName, jsConsumerConfi IFOK(s, nats_JSONGetLong(cjson, "ack_wait", &(cc->AckWait))); IFOK(s, nats_JSONGetLong(cjson, "max_deliver", &(cc->MaxDeliver))); IFOK(s, nats_JSONGetStr(cjson, "filter_subject", (char**) &(cc->FilterSubject))); + IFOK(s, nats_JSONGetArrayStr(cjson, "filter_subjects", (char ***)&(cc->FilterSubjects), &(cc->FilterSubjectsLen))); IFOK(s, _unmarshalReplayPolicy(cjson, "replay_policy", &(cc->ReplayPolicy))); IFOK(s, nats_JSONGetULong(cjson, "rate_limit_bps", &(cc->RateLimit))); IFOK(s, nats_JSONGetStr(cjson, "sample_freq", (char**) &(cc->SampleFrequency))); @@ -2946,6 +3144,7 @@ _unmarshalConsumerConfig(nats_JSON *json, const char *fieldName, jsConsumerConfi IFOK(s, nats_JSONGetArrayLong(cjson, "backoff", &(cc->BackOff), &(cc->BackOffLen))); IFOK(s, nats_JSONGetLong(cjson, "num_replicas", &(cc->Replicas))); IFOK(s, nats_JSONGetBool(cjson, "mem_storage", &(cc->MemoryStorage))); + IFOK(s, nats_unmarshalMetadata(cjson, "metadata", &(cc->Metadata))); } if (s == NATS_OK) @@ -3083,14 +3282,14 @@ js_AddConsumer(jsConsumerInfo **new_ci, jsCtx *js, { // No subject filter, use . // otherwise, the filter subject goes at the end. - if (nats_IsStringEmpty(cfg->FilterSubject)) - res = nats_asprintf(&subj, jsApiConsumerCreateExT, - js_lenWithoutTrailingDot(o.Prefix), o.Prefix, - stream, cfg->Name); - else + if (!nats_IsStringEmpty(cfg->FilterSubject) && (cfg->FilterSubjectsLen == 0)) res = nats_asprintf(&subj, jsApiConsumerCreateExWithFilterT, js_lenWithoutTrailingDot(o.Prefix), o.Prefix, stream, cfg->Name, cfg->FilterSubject); + else + res = nats_asprintf(&subj, jsApiConsumerCreateExT, + js_lenWithoutTrailingDot(o.Prefix), o.Prefix, + stream, cfg->Name); } else if (nats_IsStringEmpty(cfg->Durable)) res = nats_asprintf(&subj, jsApiConsumerCreateT, @@ -3114,6 +3313,14 @@ js_AddConsumer(jsConsumerInfo **new_ci, jsCtx *js, // If we got a response, check for error or return the consumer info result. IFOK(s, _unmarshalConsumerCreateOrGetResp(new_ci, resp, errCode)); + if ((s == NATS_OK) + && (new_ci != NULL) + && (cfg->FilterSubjectsLen > 0) + && ((*new_ci)->Config->FilterSubjectsLen == 0)) + { + s = nats_setError(NATS_INVALID_ARG, "%s", "multiple consumer filter subjects not supported by the server"); + } + NATS_FREE(subj); natsMsg_Destroy(resp); natsBuf_Destroy(buf); @@ -3633,6 +3840,8 @@ js_cloneConsumerConfig(jsConsumerConfig *org, jsConsumerConfig **clone) c->Description = NULL; c->BackOff = NULL; c->FilterSubject = NULL; + c->FilterSubjects = NULL; + c->FilterSubjectsLen = 0; c->SampleFrequency = NULL; c->DeliverSubject = NULL; c->DeliverGroup = NULL; @@ -3652,6 +3861,19 @@ js_cloneConsumerConfig(jsConsumerConfig *org, jsConsumerConfig **clone) else memcpy(c->BackOff, org->BackOff, org->BackOffLen*sizeof(int64_t)); } + if ((s == NATS_OK) && (org->FilterSubjects != NULL) && (org->FilterSubjectsLen > 0)) + { + c->FilterSubjects = (const char **)NATS_CALLOC(org->FilterSubjectsLen, sizeof(const char *)); + if (c->FilterSubjects == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + + for (int i = 0; (s == NATS_OK) && (i < org->FilterSubjectsLen); i++) + { + IF_OK_DUP_STRING(s, c->FilterSubjects[i], org->FilterSubjects[i]); + } + c->FilterSubjectsLen = org->FilterSubjectsLen; + } + IFOK(s, nats_cloneMetadata(&(c->Metadata), org->Metadata)); if (s == NATS_OK) *clone = c; else diff --git a/src/micro.c b/src/micro.c index 0ee378536..a250dfe32 100644 --- a/src/micro.c +++ b/src/micro.c @@ -16,6 +16,7 @@ #include "microp.h" #include "conn.h" #include "opts.h" +#include "util.h" static inline void _lock_service(microService *m) { natsMutex_Lock(m->service_mu); } static inline void _unlock_service(microService *m) { natsMutex_Unlock(m->service_mu); } @@ -457,7 +458,8 @@ _clone_service_config(microServiceConfig **out, microServiceConfig *cfg) MICRO_CALL(err, micro_strdup((char **)&new_cfg->Name, cfg->Name)); MICRO_CALL(err, micro_strdup((char **)&new_cfg->Version, cfg->Version)); MICRO_CALL(err, micro_strdup((char **)&new_cfg->Description, cfg->Description)); - MICRO_CALL(err, micro_clone_metadata(&new_cfg->Metadata, &new_cfg->MetadataLen, cfg->Metadata, cfg->MetadataLen)); + MICRO_CALL(err, micro_ErrorFromStatus( + nats_cloneMetadata(&new_cfg->Metadata, cfg->Metadata))); MICRO_CALL(err, micro_clone_endpoint_config(&new_cfg->Endpoint, cfg->Endpoint)); if (err != NULL) { @@ -480,7 +482,7 @@ _free_cloned_service_config(microServiceConfig *cfg) NATS_FREE((char *)cfg->Name); NATS_FREE((char *)cfg->Version); NATS_FREE((char *)cfg->Description); - micro_free_cloned_metadata(cfg->Metadata, cfg->MetadataLen); + nats_freeMetadata(&cfg->Metadata); micro_free_cloned_endpoint_config(cfg->Endpoint); NATS_FREE(cfg); } @@ -739,7 +741,8 @@ microService_GetInfo(microServiceInfo **new_info, microService *m) MICRO_CALL(err, micro_strdup((char **)&info->Version, m->cfg->Version)); MICRO_CALL(err, micro_strdup((char **)&info->Description, m->cfg->Description)); MICRO_CALL(err, micro_strdup((char **)&info->Id, m->id)); - MICRO_CALL(err, micro_clone_metadata(&info->Metadata, &info->MetadataLen, m->cfg->Metadata, m->cfg->MetadataLen)); + MICRO_CALL(err, micro_ErrorFromStatus( + nats_cloneMetadata(&info->Metadata, m->cfg->Metadata))); if (err == NULL) { @@ -768,7 +771,8 @@ microService_GetInfo(microServiceInfo **new_info, microService *m) { MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Name, ep->name)); MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Subject, ep->subject)); - MICRO_CALL(err, micro_clone_metadata(&(info->Endpoints[len].Metadata), &info->Endpoints[len].MetadataLen, ep->config->Metadata, ep->config->MetadataLen)); + MICRO_CALL(err, micro_ErrorFromStatus( + nats_cloneMetadata(&info->Endpoints[len].Metadata, ep->config->Metadata))); if (err == NULL) { len++; @@ -801,14 +805,14 @@ void microServiceInfo_Destroy(microServiceInfo *info) { NATS_FREE((char *)info->Endpoints[i].Name); NATS_FREE((char *)info->Endpoints[i].Subject); - micro_free_cloned_metadata(info->Endpoints[i].Metadata, info->Endpoints[i].MetadataLen); + nats_freeMetadata(&info->Endpoints[i].Metadata); } NATS_FREE((char *)info->Endpoints); NATS_FREE((char *)info->Name); NATS_FREE((char *)info->Version); NATS_FREE((char *)info->Description); NATS_FREE((char *)info->Id); - micro_free_cloned_metadata(info->Metadata, info->MetadataLen); + nats_freeMetadata(&info->Metadata); NATS_FREE(info); } @@ -906,48 +910,3 @@ void microServiceStats_Destroy(microServiceStats *stats) NATS_FREE((char *)stats->Id); NATS_FREE(stats); } - -void micro_free_cloned_metadata(const char **metadata, int len) -{ - int i; - - if (metadata == NULL) - return; - - for (i = 0; i < len*2; i++) - { - NATS_FREE((char *)metadata[i]); - } - NATS_FREE((char **)metadata); -} - -microError *micro_clone_metadata(const char ***new_metadata, int *new_len, const char **metadata, int len) -{ - char **dup = NULL; - int i; - - if (new_metadata == NULL) - return micro_ErrorInvalidArg; - *new_metadata = NULL; - - if (len == 0) - return NULL; - - dup = NATS_CALLOC(len * 2, sizeof(char *)); - if (dup == NULL) - return micro_ErrorOutOfMemory; - - for (i = 0; i < len*2; i++) - { - micro_strdup(&dup[i], metadata[i]); - if (dup[i] == NULL) - { - micro_free_cloned_metadata((const char **)dup, i); - return micro_ErrorOutOfMemory; - } - } - - *new_metadata = (const char **)dup; - *new_len = len; - return NULL; -} diff --git a/src/micro_endpoint.c b/src/micro_endpoint.c index 95240989e..273ad53e0 100644 --- a/src/micro_endpoint.c +++ b/src/micro_endpoint.c @@ -14,6 +14,7 @@ #include #include "microp.h" +#include "util.h" static microError *_dup_with_prefix(char **dst, const char *prefix, const char *src); @@ -323,7 +324,8 @@ micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg) MICRO_CALL(err, micro_strdup((char **)&new_cfg->Name, cfg->Name)); MICRO_CALL(err, micro_strdup((char **)&new_cfg->Subject, cfg->Subject)); - MICRO_CALL(err, micro_clone_metadata(&new_cfg->Metadata, &new_cfg->MetadataLen, cfg->Metadata, cfg->MetadataLen)); + MICRO_CALL(err, micro_ErrorFromStatus( + nats_cloneMetadata(&new_cfg->Metadata, cfg->Metadata))); if (err != NULL) { @@ -344,7 +346,7 @@ void micro_free_cloned_endpoint_config(microEndpointConfig *cfg) // to be freed. NATS_FREE((char *)cfg->Name); NATS_FREE((char *)cfg->Subject); - micro_free_cloned_metadata(cfg->Metadata, cfg->MetadataLen); + nats_freeMetadata(&cfg->Metadata); NATS_FREE(cfg); } diff --git a/src/micro_monitoring.c b/src/micro_monitoring.c index 82608d13c..80f1188d6 100644 --- a/src/micro_monitoring.c +++ b/src/micro_monitoring.c @@ -250,30 +250,6 @@ marshal_ping(natsBuffer **new_buf, microService *m) return NULL; } -natsStatus -_marshal_metadata(natsBuffer *buf, const char **metadata, int len) -{ - natsStatus s = NATS_OK; - int i; - - if (len > 0) - { - IFOK(s, natsBuf_Append(buf, "\"metadata\":{", -1)); - for (i = 0; ((s == NATS_OK) && (i < len)); i++) - { - IFOK(s, natsBuf_AppendByte(buf, '"')); - IFOK(s, natsBuf_Append(buf, metadata[i * 2], -1)); - IFOK(s, natsBuf_Append(buf, "\":\"", 3)); - IFOK(s, natsBuf_Append(buf, metadata[i * 2 + 1], -1)); - IFOK(s, natsBuf_AppendByte(buf, '"')); - if (i != len - 1) - IFOK(s, natsBuf_AppendByte(buf, ',')); - } - IFOK(s, natsBuf_Append(buf, "},", 2)); - } - return NATS_OK; -} - static microError * marshal_info(natsBuffer **new_buf, microServiceInfo *info) { @@ -293,8 +269,9 @@ marshal_info(natsBuffer **new_buf, microServiceInfo *info) for (i = 0; ((s == NATS_OK) && (i < info->EndpointsLen)); i++) { IFOK(s, natsBuf_AppendByte(buf, '{')); - IFOK_attr("name", info->Endpoints[i].Name, ","); - IFOK(s, _marshal_metadata(buf, info->Endpoints[i].Metadata, info->Endpoints[i].MetadataLen)); + IFOK_attr("name", info->Endpoints[i].Name, ""); + IFOK(s, nats_marshalMetadata(buf, true, "metadata", info->Endpoints[i].Metadata)); + IFOK(s, natsBuf_AppendByte(buf, ',')); IFOK_attr("subject", info->Endpoints[i].Subject, ""); IFOK(s, natsBuf_AppendByte(buf, '}')); // end endpoint if (i != info->EndpointsLen - 1) @@ -303,8 +280,9 @@ marshal_info(natsBuffer **new_buf, microServiceInfo *info) IFOK(s, natsBuf_Append(buf, "],", 2)); } - IFOK_attr("id", info->Id, ","); - IFOK(s, _marshal_metadata(buf, info->Metadata, info->MetadataLen)); + IFOK_attr("id", info->Id, ""); + IFOK(s, nats_marshalMetadata(buf, true, "metadata", info->Metadata)); + IFOK(s, natsBuf_AppendByte(buf, ',')); IFOK_attr("name", info->Name, ","); IFOK_attr("type", info->Type, ","); IFOK_attr("version", info->Version, ""); diff --git a/src/microp.h b/src/microp.h index 75b90646a..8cf780c00 100644 --- a/src/microp.h +++ b/src/microp.h @@ -143,7 +143,6 @@ extern microError *micro_ErrorInvalidArg; microError *micro_add_endpoint(microEndpoint **new_ep, microService *m, const char *prefix, microEndpointConfig *cfg, bool is_internal); microError *micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg); -microError *micro_clone_metadata(const char ***new_metadata, int *new_len, const char **metadata, int len); microError *micro_init_monitoring(microService *m); microError *micro_is_error_message(natsStatus s, natsMsg *msg); microError *micro_new_control_subject(char **newSubject, const char *verb, const char *name, const char *id); @@ -153,7 +152,6 @@ microError *micro_start_endpoint(microEndpoint *ep); microError *micro_stop_endpoint(microEndpoint *ep); void micro_free_cloned_endpoint_config(microEndpointConfig *cfg); -void micro_free_cloned_metadata(const char **metadata, int len); void micro_free_endpoint(microEndpoint *ep); void micro_free_request(microRequest *req); void micro_release_endpoint(microEndpoint *ep); diff --git a/src/nats.h b/src/nats.h index d4b7c2265..15ee67442 100644 --- a/src/nats.h +++ b/src/nats.h @@ -216,6 +216,19 @@ typedef struct natsMsgList } natsMsgList; +/** \brief A type to represent user-provided metadata, a list of k=v pairs. + * + * Used in JetStream, microservice configuration. + */ + +typedef struct natsMetadata +{ + // User-provided metadata for the stream, encoded as an array of {"key", "value",...} + const char **List; + // Number of key/value pairs in Metadata, 1/2 of the length of the array. + int Count; +} natsMetadata; + /** * The JetStream context. Use for JetStream assets management and communication. * @@ -284,6 +297,15 @@ typedef enum } jsStorageType; +/** + * Determines how messages are compressed when stored for retention. + */ +typedef enum +{ + js_StorageCompressionNone = 0, ///< Specifies no compression. It's the default. + js_StorageCompressionS2, ///< Specifies S2. +} jsStorageCompression; + /** * Determines how the consumer should select the first message to deliver. */ @@ -393,6 +415,26 @@ typedef struct jsRePublish } jsRePublish; +/** + * SubjectTransformConfig is for applying a subject transform (to matching + * messages) before doing anything else when a new message is received + */ +typedef struct jsSubjectTransformConfig +{ + const char *Source; + const char *Destination; +} jsSubjectTransformConfig; + +/** + * SubjectTransformConfig is for applying a subject transform (to matching + * messages) before doing anything else when a new message is received + */ +typedef struct jsStreamConsumerLimits +{ + int64_t InactiveThreshold; + int MaxAckPending; +} jsStreamConsumerLimits; + /** * Configuration of a JetStream stream. * @@ -406,6 +448,9 @@ typedef struct jsRePublish * * \note The strings are applications owned and will not be freed by the library. * + * \note NATS server 2.10 added user-provided Metadata, storage Compression + * type, FirstSeq to specify the starting sequence number, and SubjectTransform. + * * @see jsStreamConfig_Init * * \code{.unparsed} @@ -511,6 +556,31 @@ typedef struct jsStreamConfig { // Allow KV like semantics to also discard new on a per subject basis bool DiscardNewPerSubject; + /** + * @brief Configuration options introduced in 2.10 + * + * - Metadata is a user-provided array of key/value pairs, encoded as a + * string array [n1, v1, n2, v2, ...] representing key/value pairs + * {n1:v1, n2:v2, ...}. + * + * - Compression: js_StorageCompressionNone (default) or + * js_StorageCompressionS2 + * + * - FirstSeq: the starting sequence number for the stream. + * + * - SubjectTransformConfig is for applying a subject transform (to + * matching messages) before doing anything else when a new message is + * received + * + * - ConsumerLimits is for setting the limits on certain options on all + * consumers of the stream. + */ + + natsMetadata Metadata; + jsStorageCompression Compression; + uint64_t FirstSeq; + jsSubjectTransformConfig SubjectTransform; + jsStreamConsumerLimits ConsumerLimits; } jsStreamConfig; /** @@ -632,6 +702,9 @@ typedef struct jsStreamSourceInfo jsExternalStream *External; uint64_t Lag; int64_t Active; + const char * FilterSubject; + jsSubjectTransformConfig *SubjectTransforms; + int SubjectTransformsLen; } jsStreamSourceInfo; @@ -716,6 +789,13 @@ typedef struct jsStreamNamesList * instead, it will receive only messages headers (if present) with the addition of * the header #JSMsgSize ("Nats-Msg-Size"), whose value is the payload size. * + * \note NATS server 2.10 added FilterSubjects, an array of multiple filter + * subjects. It is mutually exclusive with the previously available single + * FilterSubject. + * + * \note NATS server 2.10 added consumer Metadata which contains user-provided + * string name/value pairs. + * * @see jsConsumerConfig_Init * * \code{.unparsed} @@ -770,6 +850,14 @@ typedef struct jsConsumerConfig // Force memory storage. bool MemoryStorage; + // Configuration options introduced in 2.10 + + // Multiple filter subjects + const char **FilterSubjects; + int FilterSubjectsLen; + + // User-provided metadata for the consumer, encoded as an array of {"key", "value",...} + natsMetadata Metadata; } jsConsumerConfig; /** @@ -7366,12 +7454,10 @@ struct micro_endpoint_config_s const char *Subject; /** - * @brief Metadata for the endpoint in the form of a string array [n1, v1, - * n2, v2, ...] representing key/value pairs {n1:v1, n2:v2, ...}. - * MetadataLen contains the number of **pairs** in Metadata. + * @briefMetadata for the endpoint, a JSON-encoded user-provided object, + * e.g. `{"key":"value"}` */ - const char **Metadata; - int MetadataLen; + natsMetadata Metadata; /** * @brief The request handler for the endpoint. @@ -7401,12 +7487,10 @@ struct micro_endpoint_info_s const char *Subject; /** - * @brief The metadata for the endpoint in the form of a string array [n1, - * v1, n2, v2, ...] representing key/value pairs {n1:v1, n2:v2, ...}. - * MetadataLen contains the number of **pairs** in Metadata. + * @briefMetadata for the endpoint, a JSON-encoded user-provided object, + * e.g. `{"key":"value"}` */ - const char **Metadata; - int MetadataLen; + natsMetadata Metadata; }; /** @@ -7475,12 +7559,9 @@ struct micro_service_config_s const char *Description; /** - * @brief Metadata for the service in the form of a string array [n1, v1, - * n2, v2, ...] representing key/value pairs {n1:v1, n2:v2, ...}. - * MetadataLen contains the number of **pairs** in Metadata. + * @brief Metadata for the service, a JSON-encoded user-provided object, e.g. `{"key":"value"}` */ - const char **Metadata; - int MetadataLen; + natsMetadata Metadata; /** * @brief The "main" (aka default) endpoint configuration. @@ -7562,12 +7643,9 @@ struct micro_service_info_s const char *Id; /** - * @brief The service metadata in the form of a string array [n1, v1, n2, - * v2, ...] representing key/value pairs {n1:v1, n2:v2, ...}. MetadataLen - * contains the number of **pairs** in Metadata. + * @brief Metadata for the service, a JSON-encoded user-provided object, e.g. `{"key":"value"}` */ - const char **Metadata; - int MetadataLen; + natsMetadata Metadata; /** * @brief Endpoints. diff --git a/src/natsp.h b/src/natsp.h index a3dac236b..662fe9e07 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -103,6 +103,17 @@ #define MAX_FRAMES (50) #define nats_IsStringEmpty(s) ((((s) == NULL) || ((s)[0] == '\0')) ? true : false) +#define nats_HasPrefix(_s, _prefix) (nats_IsStringEmpty(_s) ? nats_IsStringEmpty(_prefix) : (strncmp((_s), (_prefix), strlen(_prefix)) == 0)) + +static inline bool nats_StringEquals(const char *s1, const char *s2) +{ + if (s1 == NULL) + return (s2 == NULL); + if (s2 == NULL) + return false; + + return strcmp(s1, s2); +} #define DUP_STRING(s, s1, s2) \ { \ diff --git a/src/util.c b/src/util.c index 7064f2d18..aa4902e8e 100644 --- a/src/util.c +++ b/src/util.c @@ -2428,3 +2428,130 @@ bool nats_IsSubjectValid(const char *subject, bool wcAllowed) } return true; } + +natsStatus +nats_marshalMetadata(natsBuffer *buf, bool comma, const char *fieldName, natsMetadata md) +{ + natsStatus s = NATS_OK; + int i; + const char *start = (comma ? ",\"" : "\""); + + if (md.Count <= 0) + return NATS_OK; + + IFOK(s, natsBuf_Append(buf, start, -1)); + IFOK(s, natsBuf_Append(buf, fieldName, -1)); + IFOK(s, natsBuf_Append(buf, "\":{", 3)); + for (i = 0; (s == NATS_OK) && (i < md.Count); i++) + { + IFOK(s, natsBuf_AppendByte(buf, '"')); + IFOK(s, natsBuf_Append(buf, md.List[i * 2], -1)); + IFOK(s, natsBuf_Append(buf, "\":\"", 3)); + IFOK(s, natsBuf_Append(buf, md.List[i * 2 + 1], -1)); + IFOK(s, natsBuf_AppendByte(buf, '"')); + + if (i != md.Count - 1) + IFOK(s, natsBuf_AppendByte(buf, ',')); + } + IFOK(s, natsBuf_AppendByte(buf, '}')); + return NATS_OK; +} + +static natsStatus +_addMD(void *closure, const char *fieldName, nats_JSONField *f) +{ + natsMetadata *md = (natsMetadata *)closure; + + char *name = NATS_STRDUP(fieldName); + char *value = NATS_STRDUP(f->value.vstr); + if ((name == NULL) || (value == NULL)) + { + NATS_FREE(name); + NATS_FREE(value); + return nats_setDefaultError(NATS_NO_MEMORY); + } + + md->List[md->Count * 2] = name; + md->List[md->Count * 2 + 1] = value; + md->Count++; + return NATS_OK; +} + +natsStatus +nats_unmarshalMetadata(nats_JSON *json, const char *fieldName, natsMetadata *md) +{ + natsStatus s = NATS_OK; + nats_JSON *mdJSON = NULL; + int n; + + md->List = NULL; + md->Count = 0; + if (json == NULL) + return NATS_OK; + + s = nats_JSONGetObject(json, fieldName, &mdJSON); + if ((s != NATS_OK) || (mdJSON == NULL)) + return NATS_OK; + + n = natsStrHash_Count(mdJSON->fields); + md->List = NATS_CALLOC(n * 2, sizeof(char *)); + if (md->List == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + IFOK(s, nats_JSONRange(mdJSON, TYPE_STR, 0, _addMD, md)); + + return s; +} + +natsStatus +nats_cloneMetadata(natsMetadata *clone, natsMetadata md) +{ + natsStatus s = NATS_OK; + int i = 0; + int n; + char **list = NULL; + + clone->Count = 0; + clone->List = NULL; + if (md.Count == 0) + return NATS_OK; + + n = md.Count * 2; + list = NATS_CALLOC(n, sizeof(char *)); + if (list == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + + for (i = 0; (s == NATS_OK) && (i < n); i++) + { + list[i] = NATS_STRDUP(md.List[i]); + if (list[i] == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + } + + if (s == NATS_OK) + { + clone->List = (const char **)list; + clone->Count = md.Count; + } + else + { + for (n = i, i = 0; i < n; i++) + NATS_FREE(list[i]); + NATS_FREE(list); + } + return s; +} + +void +nats_freeMetadata(natsMetadata *md) +{ + if (md == NULL) + return; + + for (int i = 0; i < md->Count * 2; i++) + { + NATS_FREE((char *)md->List[i]); + } + NATS_FREE(md->List); + md->List = NULL; + md->Count = 0; +} diff --git a/src/util.h b/src/util.h index b7c156da6..127bce0bb 100644 --- a/src/util.h +++ b/src/util.h @@ -15,6 +15,7 @@ #define UTIL_H_ #include "natsp.h" +#include "mem.h" #define JSON_MAX_NEXTED 100 @@ -45,7 +46,6 @@ typedef struct { char *str; natsStrHash *fields; - } nats_JSON; typedef struct @@ -243,6 +243,18 @@ nats_marshalULong(natsBuffer *buf, bool comma, const char *fieldName, uint64_t u natsStatus nats_marshalDuration(natsBuffer *out_buf, bool comma, const char *field_name, int64_t d); +natsStatus +nats_marshalMetadata(natsBuffer *buf, bool comma, const char *fieldName, natsMetadata md); + +natsStatus +nats_unmarshalMetadata(nats_JSON *json, const char *fieldName, natsMetadata *md); + +natsStatus +nats_cloneMetadata(natsMetadata *clone, natsMetadata md); + +void +nats_freeMetadata(natsMetadata *md); + bool nats_IsSubjectValid(const char *subject, bool wcAllowed); diff --git a/test/test.c b/test/test.c index 44d55a261..1e9ab7dfa 100644 --- a/test/test.c +++ b/test/test.c @@ -7425,46 +7425,53 @@ _checkPool(natsConnection *nc, char **expectedURLs, int expectedURLsCount) { int i, j, attempts; natsSrv *srv; - char *url; + char *url = NULL; char buf[64]; - bool ok; + bool ok = false; natsMutex_Lock(nc->mu); - if (nc->srvPool->size != expectedURLsCount) + for (attempts = 0; (!ok) && (attempts < 20); attempts++) { - printf("Expected pool size to be %d, got %d\n", expectedURLsCount, nc->srvPool->size); - natsMutex_Unlock(nc->mu); - return NATS_ERR; - } - for (attempts=0; attempts<20; attempts++) - { - for (i=0; isrvPool->size == expectedURLsCount); + for (i = 0; i < expectedURLsCount; i++) { + bool foundInPool = false; url = expectedURLs[i]; - ok = false; - for (j=0; jsrvPool->size; j++) + for (j = 0; j < nc->srvPool->size; j++) { srv = nc->srvPool->srvrs[j]; snprintf(buf, sizeof(buf), "%s:%d", srv->url->host, srv->url->port); if (strcmp(buf, url)) { - ok = true; + foundInPool = true; break; } } - if (!ok) + if (!foundInPool) { - natsMutex_Unlock(nc->mu); - nats_Sleep(100); - natsMutex_Lock(nc->mu); - continue; + ok = false; + break; } } + + if (ok) + break; + natsMutex_Unlock(nc->mu); - return NATS_OK; + nats_Sleep(100); + natsMutex_Lock(nc->mu); + } + + if (!ok) + { + if (nc->srvPool->size != expectedURLsCount) + printf("After 20 retries expected pool size to be %d, got %d\n", expectedURLsCount, nc->srvPool->size); + else if (url != NULL) + printf("After 20 retries did not find %s in pool\n", url); } + natsMutex_Unlock(nc->mu); - return NATS_ERR; + return ok ? NATS_OK : NATS_ERR; } static natsStatus @@ -22056,15 +22063,40 @@ test_JetStreamUnmarshalStreamConfig(void) json = NULL; test("Stream config with all: "); - if (snprintf(tmp, sizeof(tmp), "{\"name\":\"TEST\",\"description\":\"this is my stream\",\"subjects\":[\"foo\",\"bar\"],"\ - "\"retention\":\"workqueue\",\"max_consumers\":5,\"max_msgs\":10,\"max_bytes\":1000,"\ - "\"max_age\":20000000,\"max_msg_size\":1024,\"max_msgs_per_subject\":1,\"discard\":\"new\",\"storage\":\"memory\","\ - "\"num_replicas\":3,\"no_ack\":true,\"template_owner\":\"owner\","\ - "\"duplicate_window\":100000000000,\"placement\":{\"cluster\":\"cluster\",\"tags\":[\"tag1\",\"tag2\"]},"\ - "\"mirror\":{\"name\":\"TEST2\",\"opt_start_seq\":10,\"filter_subject\":\"foo\",\"external\":{\"api\":\"my_prefix\",\"deliver\":\"deliver_prefix\"}},"\ - "\"sources\":[{\"name\":\"TEST3\",\"opt_start_seq\":20,\"filter_subject\":\"bar\",\"external\":{\"api\":\"my_prefix2\",\"deliver\":\"deliver_prefix2\"}}],"\ - "\"sealed\":true,\"deny_delete\":true,\"deny_purge\":true,\"allow_rollup_hdrs\":true,\"republish\":{\"src\":\"foo\",\"dest\":\"bar\"},"\ - "\"allow_direct\":true,\"mirror_direct\":true}") >= (int) sizeof(tmp)) + if (snprintf(tmp, sizeof(tmp), "{" + "\"name\":\"TEST\"" + ",\"description\":\"this is my stream\"" + ",\"subjects\":[\"foo\",\"bar\"]" + ",\"retention\":\"workqueue\"" + ",\"max_consumers\":5" + ",\"max_msgs\":10" + ",\"max_bytes\":1000" + ",\"max_age\":20000000" + ",\"max_msgs_per_subject\":1" + ",\"max_msg_size\":1024" + ",\"discard\":\"new\"" + ",\"storage\":\"memory\"" + ",\"num_replicas\":3" + ",\"no_ack\":true" + ",\"template_owner\":\"owner\"" + ",\"duplicate_window\":100000000000" + ",\"placement\":{\"cluster\":\"cluster\",\"tags\":[\"tag1\",\"tag2\"]}" + ",\"mirror\":{\"name\":\"TEST2\",\"opt_start_seq\":10,\"filter_subject\":\"foo\",\"external\":{\"api\":\"my_prefix\",\"deliver\":\"deliver_prefix\"}}" + ",\"sources\":[{\"name\":\"TEST3\",\"opt_start_seq\":20,\"filter_subject\":\"bar\",\"external\":{\"api\":\"my_prefix2\",\"deliver\":\"deliver_prefix2\"}}]" + ",\"sealed\":true" + ",\"deny_delete\":true" + ",\"deny_purge\":true" + ",\"allow_rollup_hdrs\":true" + ",\"republish\":{\"src\":\"foo\",\"dest\":\"bar\"}" + ",\"allow_direct\":true" + ",\"mirror_direct\":true" + ",\"discard_new_per_subject\":true" + ",\"metadata\":{\"foo\":\"bar\"}" + ",\"compression\":\"s2\"" + ",\"first_seq\":9999" + ",\"subject_transform\":{\"src\":\"foo\",\"dest\":\"bar\"}" + ",\"consumer_limits\":{\"inactive_threshold\":1000,\"max_ack_pending\":99}" + "}") >= (int) sizeof(tmp)) { abort(); } @@ -22109,7 +22141,19 @@ test_JetStreamUnmarshalStreamConfig(void) && (strcmp(sc->RePublish->Source, "foo") == 0) && (sc->RePublish->Destination != NULL) && (strcmp(sc->RePublish->Destination, "bar") == 0)) - && sc->AllowDirect && sc->MirrorDirect); + && sc->AllowDirect && sc->MirrorDirect + && sc->DiscardNewPerSubject + && (sc->Metadata.Count == 1) + && (sc->Metadata.List != NULL) + && (strcmp(sc->Metadata.List[0], "foo") == 0) + && (strcmp(sc->Metadata.List[1], "bar") == 0) + && (sc->Compression == js_StorageCompressionS2) + && (sc->FirstSeq == 9999) + && (strcmp(sc->SubjectTransform.Source, "foo") == 0) + && (strcmp(sc->SubjectTransform.Destination, "bar") == 0) + && (sc->ConsumerLimits.InactiveThreshold == 1000) + && (sc->ConsumerLimits.MaxAckPending == 99) + ); js_destroyStreamConfig(sc); sc = NULL; nats_JSONDestroy(json); @@ -22122,18 +22166,19 @@ test_JetStreamUnmarshalStreamInfo(void) natsStatus s; nats_JSON *json = NULL; jsStreamInfo *si = NULL; - const char *good[] = { + const char *good[] = { "{\"cluster\":{\"name\":\"S1\",\"leader\":\"S2\"}}", "{\"cluster\":{\"name\":\"S1\",\"leader\":\"S2\",\"replicas\":[{\"name\":\"S1\",\"current\":true,\"offline\":false,\"active\":123,\"lag\":456},{\"name\":\"S1\",\"current\":false,\"offline\":true,\"active\":123,\"lag\":456}]}}", "{\"mirror\":{\"name\":\"M\",\"lag\":123,\"active\":456}}", "{\"mirror\":{\"name\":\"M\",\"external\":{\"api\":\"MyApi\",\"deliver\":\"deliver.prefix\"},\"lag\":123,\"active\":456}}", "{\"sources\":[{\"name\":\"S1\",\"lag\":123,\"active\":456}]}", + "{\"sources\":[{\"name\":\"S1\",\"lag\":123,\"active\":456,\"filter_subject\":\"foo\",\"subject_transforms:\":[{\"src\":\"foo\",\"dest\":\"bar\"}]}]}", "{\"sources\":[{\"name\":\"S1\",\"lag\":123,\"active\":456},{\"name\":\"S2\",\"lag\":123,\"active\":456}]}", "{\"sources\":[{\"name\":\"S1\",\"external\":{\"api\":\"MyApi\",\"deliver\":\"deliver.prefix\"},\"lag\":123,\"active\":456},{\"name\":\"S2\",\"lag\":123,\"active\":456}]}", "{\"alternates\":[{\"name\":\"S1\",\"domain\":\"domain\",\"cluster\":\"abc\"}]}", "{\"alternates\":[{\"name\":\"S1\",\"domain\":\"domain\",\"cluster\":\"abc\"},{\"name\":\"S2\",\"domain\":\"domain\",\"cluster\":\"abc\"}]}", }; - const char *bad[] = { + const char *bad[] = { "{\"config\":123}", "{\"config\":{\"retention\":\"bad_policy\"}}", "{\"config\":{\"discard\":\"bad_policy\"}}", @@ -22321,6 +22366,20 @@ test_JetStreamMarshalStreamConfig(void) rp.HeadersOnly = true; sc.RePublish = &rp; + // 2.10 options: Compression, Metadata, etc. + sc.Compression = js_StorageCompressionS2; + sc.Metadata.List = (const char *[]){"k1", "v1", "k2", "v2"}; + sc.Metadata.Count = 2; + sc.FirstSeq = 9999; + sc.SubjectTransform = (jsSubjectTransformConfig) { + .Source = "foo", + .Destination = "bar", + }; + sc.ConsumerLimits = (jsStreamConsumerLimits) { + .InactiveThreshold = 1000, + .MaxAckPending = 99, + }; + test("Marshal stream config: "); s = js_marshalStreamConfig(&buf, &sc); testCond((s == NATS_OK) && (buf != NULL) && (natsBuf_Len(buf) > 0)); @@ -22390,7 +22449,19 @@ test_JetStreamMarshalStreamConfig(void) && rsc->RePublish->HeadersOnly && rsc->AllowDirect && rsc->MirrorDirect - && rsc->DiscardNewPerSubject); + && rsc->DiscardNewPerSubject + && (rsc->Compression == js_StorageCompressionS2) + && (rsc->Metadata.Count == 2) + && (strcmp(rsc->Metadata.List[0], "k2") == 0) + && (strcmp(rsc->Metadata.List[1], "v2") == 0) + && (strcmp(rsc->Metadata.List[2], "k1") == 0) + && (strcmp(rsc->Metadata.List[3], "v1") == 0) + && (rsc->FirstSeq == 9999) + && (strcmp(rsc->SubjectTransform.Source, "foo") == 0) + && (strcmp(rsc->SubjectTransform.Destination, "bar") == 0) + && (rsc->ConsumerLimits.InactiveThreshold == 1000) + && (rsc->ConsumerLimits.MaxAckPending == 99) + ); js_destroyStreamConfig(rsc); rsc = NULL; // Check that this does not crash @@ -22478,6 +22549,7 @@ test_JetStreamUnmarshalConsumerInfo(void) "{\"config\":{\"num_replicas\":1}}", "{\"config\":{\"mem_storage\":true}}", "{\"config\":{\"name\":\"my_name\"}}", + "{\"config\":{\"name\":\"my_name\",\"metadata\":{\"k1\":\"v1\",\"k2\":\"v2\"}}}", }; const char *bad[] = { "{\"stream_name\":123}", @@ -23160,6 +23232,38 @@ test_JetStreamMgtStreams(void) && ((jerr == 0) || (jerr == JSStreamNameExistErr))); nats_clearLastError(); + if (serverVersionAtLeast(2, 10, 0)) + { + test("Create stream with 2.10 server features: "); + cfg.Name = "TEST210"; + cfg.Subjects = (const char*[]){"foo210"}; + cfg.SubjectsLen = 1; + cfg.Metadata.List = (const char *[]){"k1", "v1", "k2", "v2"}; + cfg.Metadata.Count = 2; + cfg.Compression = js_StorageCompressionS2; + cfg.FirstSeq = 9999; + cfg.SubjectTransform = (jsSubjectTransformConfig) {.Source = "foo210", .Destination = "bar210"}; + cfg.ConsumerLimits = (jsStreamConsumerLimits) {.InactiveThreshold = 1000, .MaxAckPending = 99}; + + s = js_AddStream(&si, js, &cfg, NULL, &jerr); + + testCond((s == NATS_OK) + && (si != NULL) + && (si->Config != NULL) + && (strcmp(si->Config->Name, "TEST210") == 0) + && (si->Config->Metadata.Count == 2) + && (si->Config->Compression == js_StorageCompressionS2) + && (si->Config->FirstSeq == 9999) + && (strcmp(si->Config->SubjectTransform.Source, "foo210") == 0) + && (strcmp(si->Config->SubjectTransform.Destination, "bar210") == 0) + && (si->Config->ConsumerLimits.InactiveThreshold == 1000) + && (si->Config->ConsumerLimits.MaxAckPending == 99) + && (jerr == 0) + ); + jsStreamInfo_Destroy(si); + si = NULL; + } + jerr = 0; // Reset config jsStreamConfig_Init(&cfg); @@ -24028,6 +24132,9 @@ test_JetStreamMgtConsumers(void) cfg.Heartbeat = 700; cfg.Replicas = 1; cfg.MemoryStorage = true; + cfg.Metadata.List = (const char *[]){"key1", "val1", "key2", "val2"}; + cfg.Metadata.Count = 2; + // We create a consumer with non existing stream, so we // expect this to fail. We are just checking that the config // is properly serialized. @@ -24046,6 +24153,7 @@ test_JetStreamMgtConsumers(void) "\"opt_start_seq\":100,"\ "\"opt_start_time\":\"2021-06-23T18:22:00.12345Z\",\"ack_policy\":\"explicit\","\ "\"ack_wait\":200,\"max_deliver\":300,\"filter_subject\":\"bar\","\ + "\"metadata\":{\"key1\":\"val1\",\"key2\":\"val2\"},"\ "\"replay_policy\":\"instant\",\"rate_limit_bps\":400,"\ "\"sample_freq\":\"60%%\",\"max_waiting\":500,\"max_ack_pending\":600,"\ "\"flow_control\":true,\"idle_heartbeat\":700,"\ @@ -24054,6 +24162,38 @@ test_JetStreamMgtConsumers(void) natsMsg_Destroy(resp); resp = NULL; + if (serverVersionAtLeast(2, 10, 0)) + { + test("Add consumer (non durable, filter subjects): "); + cfg.FilterSubject = NULL; + cfg.FilterSubjects = (const char *[]){"bar1", "bar2"}; + cfg.FilterSubjectsLen = 2; + s = js_AddConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr); + testCond((s = NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL)); + nats_clearLastError(); + + test("Verify config: "); + s = natsSubscription_NextMsg(&resp, sub, 1000); + testCond((s == NATS_OK) && (resp != NULL) && (strncmp(natsMsg_GetData(resp), "{\"stream_name\":\"MY_STREAM\"," + "\"config\":{\"deliver_policy\":\"last\"," + "\"description\":\"MyDescription\"," + "\"deliver_subject\":\"foo\"," + "\"opt_start_seq\":100," + "\"opt_start_time\":\"2021-06-23T18:22:00.12345Z\",\"ack_policy\":\"explicit\"," + "\"ack_wait\":200,\"max_deliver\":300,\"filter_subjects\":[\"bar1\",\"bar2\"]," + "\"metadata\":{\"key1\":\"val1\",\"key2\":\"val2\"},"\ + "\"replay_policy\":\"instant\",\"rate_limit_bps\":400," + "\"sample_freq\":\"60%%\",\"max_waiting\":500,\"max_ack_pending\":600," + "\"flow_control\":true,\"idle_heartbeat\":700," + "\"num_replicas\":1,\"mem_storage\":true}}", + natsMsg_GetDataLength(resp)) == 0)); + natsMsg_Destroy(resp); + resp = NULL; + cfg.FilterSubjects = NULL; + cfg.FilterSubjectsLen = 0; + cfg.FilterSubject = "bar"; + } + test("Create check sub: "); natsSubscription_Destroy(sub); sub = NULL; @@ -24077,6 +24217,7 @@ test_JetStreamMgtConsumers(void) "\"opt_start_seq\":100,"\ "\"opt_start_time\":\"2021-06-23T18:22:00.12345Z\",\"ack_policy\":\"explicit\","\ "\"ack_wait\":200,\"max_deliver\":300,\"filter_subject\":\"bar\","\ + "\"metadata\":{\"key1\":\"val1\",\"key2\":\"val2\"},"\ "\"replay_policy\":\"instant\",\"rate_limit_bps\":400,"\ "\"sample_freq\":\"60%%\",\"max_waiting\":500,\"max_ack_pending\":600,"\ "\"flow_control\":true,\"idle_heartbeat\":700,"\ @@ -24112,6 +24253,7 @@ test_JetStreamMgtConsumers(void) "\"opt_start_seq\":100,"\ "\"opt_start_time\":\"2021-06-23T18:22:00.12345Z\",\"ack_policy\":\"explicit\","\ "\"ack_wait\":200,\"max_deliver\":300,\"filter_subject\":\"bar\","\ + "\"metadata\":{\"key1\":\"val1\",\"key2\":\"val2\"},"\ "\"replay_policy\":\"instant\",\"rate_limit_bps\":400,"\ "\"sample_freq\":\"60%%\",\"max_waiting\":500,\"max_ack_pending\":600,"\ "\"flow_control\":true,\"idle_heartbeat\":700,"\ @@ -24348,6 +24490,32 @@ test_JetStreamMgtConsumers(void) jsConsumerInfo_Destroy(ci); ci = NULL; + if (serverVersionAtLeast(2, 10, 0)) + { + test("Update (filter subjects) works ok: "); + cfg.FilterSubject = NULL; + cfg.FilterSubjects = (const char *[]){"bar1", "bar2"}; + cfg.FilterSubjectsLen = 2; + s = js_UpdateConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0) && (ci != NULL) && (ci->Config != NULL) + && (strcmp(ci->Config->Description, "my description") == 0) + && (ci->Config->AckWait == NATS_SECONDS_TO_NANOS(2)) + && (ci->Config->MaxDeliver == 1) + && (strcmp(ci->Config->SampleFrequency, "30") == 0) + && (ci->Config->MaxAckPending == 10) + && (ci->Config->HeadersOnly) + && (ci->Config->FilterSubject == NULL) + && (ci->Config->FilterSubjectsLen == 2) + && (ci->Config->FilterSubjects != NULL) + && (strcmp(ci->Config->FilterSubjects[0], "bar1") == 0) + && (strcmp(ci->Config->FilterSubjects[1], "bar2") == 0)); + jsConsumerInfo_Destroy(ci); + ci = NULL; + cfg.FilterSubject = "bar.bat"; + cfg.FilterSubjects = NULL; + cfg.FilterSubjectsLen = 0; + } + test("Add pull consumer: "); jsConsumerConfig_Init(&cfg); cfg.Durable = "update_pull_consumer"; @@ -24659,9 +24827,11 @@ test_JetStreamMgtConsumers(void) cfg.Durable = "B"; cfg.Description = "C"; cfg.FilterSubject = "D"; - cfg.SampleFrequency = "E"; - cfg.DeliverSubject = "F"; - cfg.DeliverGroup = "G"; + cfg.FilterSubjects = (const char*[]){"E", "F"}; + cfg.FilterSubjectsLen = 2; + cfg.SampleFrequency = "G"; + cfg.DeliverSubject = "H"; + cfg.DeliverGroup = "I"; cfg.BackOff = (int64_t[]){NATS_MILLIS_TO_NANOS(50), NATS_MILLIS_TO_NANOS(250)}; cfg.BackOffLen = 2; s = js_cloneConsumerConfig(&cfg, &cloneCfg); @@ -24670,9 +24840,14 @@ test_JetStreamMgtConsumers(void) && (cloneCfg->Durable != NULL) && (strcmp(cloneCfg->Durable, "B") == 0) && (cloneCfg->Description != NULL) && (strcmp(cloneCfg->Description, "C") == 0) && (cloneCfg->FilterSubject != NULL) && (strcmp(cloneCfg->FilterSubject, "D") == 0) - && (cloneCfg->SampleFrequency != NULL) && (strcmp(cloneCfg->SampleFrequency, "E") == 0) - && (cloneCfg->DeliverSubject != NULL) && (strcmp(cloneCfg->DeliverSubject, "F") == 0) - && (cloneCfg->DeliverGroup != NULL) && (strcmp(cloneCfg->DeliverGroup, "G") == 0) + && (cloneCfg->FilterSubject != NULL) && (strcmp(cloneCfg->FilterSubject, "D") == 0) + && (cloneCfg->FilterSubjectsLen == 2) + && (cloneCfg->FilterSubjects != NULL) + && (strcmp(cloneCfg->FilterSubjects[0], "E") == 0) + && (strcmp(cloneCfg->FilterSubjects[1], "F") == 0) + && (cloneCfg->SampleFrequency != NULL) && (strcmp(cloneCfg->SampleFrequency, "G") == 0) + && (cloneCfg->DeliverSubject != NULL) && (strcmp(cloneCfg->DeliverSubject, "H") == 0) + && (cloneCfg->DeliverGroup != NULL) && (strcmp(cloneCfg->DeliverGroup, "I") == 0) && (cloneCfg->BackOffLen == 2) && (cloneCfg->BackOff != NULL) && (cloneCfg->BackOff[0] == NATS_MILLIS_TO_NANOS(50)) @@ -26198,6 +26373,29 @@ test_JetStreamSubscribe(void) && (strstr(nats_GetLastError(NULL), "filter subject") != NULL)); nats_clearLastError(); + if (serverVersionAtLeast(2, 10, 0)) + { + test("Create consumer with multiple filters: "); + jsConsumerConfig_Init(&cc); + cc.Durable = "dur-multi-filter"; + cc.DeliverSubject = "push.dur.sub.2"; + cc.FilterSubjectsLen = 2; + cc.FilterSubjects = (const char *[2]){"sub.1", "sub.2"}; + s = js_AddConsumer(NULL, js, "MULTIPLE_SUBJS", &cc, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Subscribe subj != filters: "); + so.Consumer = "dur-multi-filter"; + s = js_Subscribe(&sub, js, "foo", _jsMsgHandler, &args, NULL, &so, &jerr); + testCond((s == NATS_ERR) && (sub == NULL) + && (strstr(nats_GetLastError(NULL), "filter subject") != NULL)); + nats_clearLastError(); + cc.FilterSubject = "sub.2"; + cc.FilterSubjects = NULL; + cc.FilterSubjectsLen = 0; + so.Consumer = "dur"; + } + test("Subject not required when binding to stream/consumer: "); s = js_Subscribe(&sub, js, NULL, _jsMsgHandler, &args, NULL, &so, &jerr); testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0)); @@ -32752,21 +32950,14 @@ test_MicroBasics(void) .Subject = "svc.do", .Handler = _microHandleRequestNoisy42, }; - const char *ep_md[] = { - "key1", "value1", - "key2", "value2", - "key3", "value3", - }; - const char *service_md[] = { - "skey1", "svalue1", - "skey2", "svalue2", - }; microEndpointConfig ep2_cfg = { .Name = "unused", .Subject = "svc.unused", .Handler = _microHandleRequestNoisy42, - .MetadataLen = 3, - .Metadata = ep_md, + .Metadata = (natsMetadata){ + .List = (const char *[]){"key1", "value1", "key2", "value2", "key3", "value3"}, + .Count = 3, + }, }; microEndpointConfig *eps[] = { &ep1_cfg, @@ -32776,8 +32967,10 @@ test_MicroBasics(void) .Version = "1.0.0", .Name = "CoolService", .Description = "returns 42", - .MetadataLen = 2, - .Metadata = service_md, + .Metadata = (natsMetadata){ + .List = (const char *[]){"skey1", "svalue1", "skey2", "svalue2"}, + .Count = 2, + }, }; natsMsg *reply = NULL; microServiceInfo *info = NULL; @@ -32837,7 +33030,7 @@ test_MicroBasics(void) (strlen(info->Id) > 0) && (strcmp(info->Description, "returns 42") == 0) && (strcmp(info->Version, "1.0.0") == 0) && - (info->MetadataLen == 2)); + (info->Metadata.Count == 2)); microServiceInfo_Destroy(info); } @@ -32878,13 +33071,12 @@ test_MicroBasics(void) snprintf(buf, sizeof(buf), "Validate INFO service metadata#%d: ", i); test(buf); - md = NULL; + md = NULL; testCond( (NATS_OK == nats_JSONGetObject(js, "metadata", &md)) && (NATS_OK == nats_JSONGetStrPtr(md, "skey1", &str)) && (strcmp(str, "svalue1") == 0) && (NATS_OK == nats_JSONGetStrPtr(md, "skey2", &str)) && (strcmp(str, "svalue2") == 0) ); - test("Validate INFO has 2 endpoints: "); array = NULL; array_len = 0; @@ -32892,7 +33084,7 @@ test_MicroBasics(void) testCond((NATS_OK == s) && (array != NULL) && (array_len == 2)); test("Validate INFO svc.do endpoint: "); - md = NULL; + md = NULL; testCond( (NATS_OK == nats_JSONGetStrPtr(array[0], "name", &str)) && (strcmp(str, "do") == 0) && (NATS_OK == nats_JSONGetStrPtr(array[0], "subject", &str)) && (strcmp(str, "svc.do") == 0) @@ -32900,7 +33092,7 @@ test_MicroBasics(void) ); test("Validate INFO unused endpoint with metadata: "); - md = NULL; + md = NULL; testCond( (NATS_OK == nats_JSONGetStrPtr(array[1], "name", &str)) && (strcmp(str, "unused") == 0) && (NATS_OK == nats_JSONGetStrPtr(array[1], "subject", &str)) && (strcmp(str, "svc.unused") == 0)