From c3c0c5fb0cfc357771cdf579305caeaedad61cdc Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 29 Sep 2022 18:07:30 -0600 Subject: [PATCH 1/6] Start v3.5.0 dev branch Signed-off-by: Ivan Kozlovic --- CMakeLists.txt | 6 +++--- doc/DoxyFile.NATS.Client | 2 +- src/version.h | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 35bfa1975..0d90ee04e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -243,11 +243,11 @@ endif(NATS_BUILD_WITH_TLS) # Versionning and Doc set(NATS_VERSION_MAJOR 3) -set(NATS_VERSION_MINOR 4) +set(NATS_VERSION_MINOR 5) set(NATS_VERSION_PATCH 0) -set(NATS_VERSION_SUFFIX "") +set(NATS_VERSION_SUFFIX "-dev") -set(NATS_VERSION_REQUIRED_NUMBER 0x030400) +set(NATS_VERSION_REQUIRED_NUMBER 0x030500) if(NATS_UPDATE_VERSION OR NATS_UPDATE_DOC) configure_file( diff --git a/doc/DoxyFile.NATS.Client b/doc/DoxyFile.NATS.Client index 8f0c9e09d..7489cf2dc 100644 --- a/doc/DoxyFile.NATS.Client +++ b/doc/DoxyFile.NATS.Client @@ -38,7 +38,7 @@ PROJECT_NAME = "NATS C Client with JetStream and Streaming support" # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 3.4.0 +PROJECT_NUMBER = 3.5.0-dev # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a diff --git a/src/version.h b/src/version.h index e1e75ede2..a0e6ebdef 100644 --- a/src/version.h +++ b/src/version.h @@ -22,16 +22,16 @@ extern "C" { #endif #define NATS_VERSION_MAJOR 3 -#define NATS_VERSION_MINOR 4 +#define NATS_VERSION_MINOR 5 #define NATS_VERSION_PATCH 0 -#define NATS_VERSION_STRING "3.4.0" +#define NATS_VERSION_STRING "3.5.0-dev" #define NATS_VERSION_NUMBER ((NATS_VERSION_MAJOR << 16) | \ (NATS_VERSION_MINOR << 8) | \ NATS_VERSION_PATCH) -#define NATS_VERSION_REQUIRED_NUMBER 0x030400 +#define NATS_VERSION_REQUIRED_NUMBER 0x030500 #ifdef __cplusplus } From 57dd765351ba385685bd21d43102839cbe076d20 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 29 Sep 2022 18:26:14 -0600 Subject: [PATCH 2/6] [ADDED] KeyValue: kvStatus_Bytes() to return size of the bucket Signed-off-by: Ivan Kozlovic --- src/kv.c | 6 ++++++ src/nats.h | 9 +++++++++ test/test.c | 24 +++++++++++++++++++++++- 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/src/kv.c b/src/kv.c index 38539f2ff..b6930fa00 100644 --- a/src/kv.c +++ b/src/kv.c @@ -1312,6 +1312,12 @@ kvStatus_Replicas(kvStatus *sts) return (sts == NULL || sts->si->Config == NULL ? 0 : sts->si->Config->Replicas); } +uint64_t +kvStatus_Bytes(kvStatus *sts) +{ + return (sts == NULL ? 0 : sts->si->State.Bytes); +} + void kvStatus_Destroy(kvStatus *sts) { diff --git a/src/nats.h b/src/nats.h index 2e046ace9..00112540e 100644 --- a/src/nats.h +++ b/src/nats.h @@ -7028,6 +7028,15 @@ kvStatus_TTL(kvStatus *sts); NATS_EXTERN int64_t kvStatus_Replicas(kvStatus *sts); +/** \brief Returns the size (in bytes) of this bucket. + * + * Returns the size (in bytes) of this bucket, or `0` if `sts` itself is `NULL`. + * + * @param sts the pointer to the #kvStatus object. + */ +NATS_EXTERN uint64_t +kvStatus_Bytes(kvStatus *sts); + /** \brief Destroys the KeyValue status object. * * Releases memory allocated for this #kvStatus object. diff --git a/test/test.c b/test/test.c index 7b7cc9fd3..a47ac7156 100644 --- a/test/test.c +++ b/test/test.c @@ -29538,6 +29538,13 @@ test_KeyValueBasics(void) s = (kvStore_Bucket(NULL) == NULL ? NATS_OK : NATS_ERR); testCond(s == NATS_OK); + test("Check bytes is 0: "); + s = kvStore_Status(&sts, kv); + IFOK(s, (kvStatus_Bytes(sts) == 0 ? NATS_OK : NATS_ERR)); + testCond(s == NATS_OK); + kvStatus_Destroy(sts); + sts = NULL; + test("Simple put (bad args): "); rev = 1234; s = kvStore_Put(&rev, NULL, "key", (const void*) "value", 5); @@ -29763,9 +29770,24 @@ test_KeyValueBasics(void) s = (kvStatus_Replicas(sts) == 1 ? NATS_OK : NATS_ERR); testCond(s == NATS_OK); + test("Check bytes: "); + { + jsStreamInfo *si = NULL; + + if (i == 0) + s = js_GetStreamInfo(&si, js, "KV_TEST0", NULL, NULL); + else + s = js_GetStreamInfo(&si, js, "KV_TEST1", NULL, NULL); + IFOK(s, (kvStatus_Bytes(sts) == si->State.Bytes ? NATS_OK : NATS_ERR)); + + jsStreamInfo_Destroy(si); + } + testCond(s == NATS_OK); + test("Check status with NULL: "); if ((kvStatus_History(NULL) != 0) || (kvStatus_Bucket(NULL) != NULL) - || (kvStatus_TTL(NULL) != 0) || (kvStatus_Values(NULL) != 0)) + || (kvStatus_TTL(NULL) != 0) || (kvStatus_Values(NULL) != 0) + || (kvStatus_Bytes(NULL) != 0)) { s = NATS_ERR; } From 6c5c6ba1d26e89d535218f5be1fc2f54f81ed522 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 26 Oct 2022 12:28:40 -0600 Subject: [PATCH 3/6] [CHANGED] KeyValue: library will no longer try to update the stream Until now, the js_CreateKeyValue() call would possibly attempt to update the underlying stream to set some properties that were added from release to release. This had an unexpected side effect when a stream in a newer server has new properties that the application linked to an older library was not aware of and would not unmarshal. Even if a stream comparison (minus the property we would want to update) would return OK, it could be wrong because we would disregard new properties not known by the old client library. Signed-off-by: Ivan Kozlovic --- src/kv.c | 25 +----------------- test/test.c | 74 ++++++++++++++++++++++++++--------------------------- 2 files changed, 37 insertions(+), 62 deletions(-) diff --git a/src/kv.c b/src/kv.c index b6930fa00..9a798a538 100644 --- a/src/kv.c +++ b/src/kv.c @@ -296,32 +296,9 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg) sc.Discard = js_DiscardNew; s = js_AddStream(&si, js, &sc, NULL, &jerr); - if ((s != NATS_OK) && (jerr == JSStreamNameExistErr)) - { - jsStreamInfo_Destroy(si); - si = NULL; - - nats_clearLastError(); - s = js_GetStreamInfo(&si, js, sc.Name, NULL, NULL); - if (s == NATS_OK) - { - si->Config->Discard = sc.Discard; - if (_sameStreamCfg(si->Config, &sc)) - { - jsStreamInfo_Destroy(si); - si = NULL; - s = js_UpdateStream(&si, js, &sc, NULL, NULL); - } - else - s = nats_setError(NATS_ERR, "%s", - "Existing configuration is different"); - } - } + // If the stream allow direct get message calls, then we will do so. if (s == NATS_OK) - { - // If the stream allow direct get message calls, then we will do so. kv->useDirect = si->Config->AllowDirect; - } jsStreamInfo_Destroy(si); } if (s == NATS_OK) diff --git a/test/test.c b/test/test.c index 09db1e6ef..c1834a502 100644 --- a/test/test.c +++ b/test/test.c @@ -31039,50 +31039,48 @@ static void test_KeyValueDiscardOldToNew(void) { kvStore *kv = NULL; - kvEntry *e = NULL; kvConfig kvc; natsStatus s; - int i; JS_SETUP(2, 7, 2); - // We are going to go from 2.7.1->2.7.2->2.7.1 and 2.7.2 again. - for (i=0; i<2; i++) - { - // Change the server version in the connection to - // create as-if we were connecting to a v2.7.1 server. - natsConn_Lock(nc); - nc->srvVersion.ma = 2; - nc->srvVersion.mi = 7; - nc->srvVersion.up = 1; - natsConn_Unlock(nc); + // Change the server version in the connection to + // create as-if we were connecting to a v2.7.1 server. + natsConn_Lock(nc); + nc->srvVersion.ma = 2; + nc->srvVersion.mi = 7; + nc->srvVersion.up = 1; + natsConn_Unlock(nc); - test("Check discard (old): "); - s = _checkDiscard(js, js_DiscardOld, &kv); - if ((s == NATS_OK) && (i == 0)) - s = kvStore_PutString(NULL, kv, "foo", "value"); - testCond(s == NATS_OK); - kvStore_Destroy(kv); - kv = NULL; + test("Check discard (old): "); + s = _checkDiscard(js, js_DiscardOld, &kv); + testCond(s == NATS_OK); + kvStore_Destroy(kv); + kv = NULL; - // Now change version to 2.7.2 - natsConn_Lock(nc); - nc->srvVersion.ma = 2; - nc->srvVersion.mi = 7; - nc->srvVersion.up = 2; - natsConn_Unlock(nc); + // Now change version to 2.7.2 + natsConn_Lock(nc); + nc->srvVersion.ma = 2; + nc->srvVersion.mi = 7; + nc->srvVersion.up = 2; + natsConn_Unlock(nc); - test("Check discard (new): "); - s = _checkDiscard(js, js_DiscardNew, &kv); - IFOK(s, kvStore_Get(&e, kv, "foo")); - if ((s == NATS_OK) && (strcmp(kvEntry_ValueString(e), "value") != 0)) - s = NATS_ERR; - testCond(s == NATS_OK); - kvEntry_Destroy(e); - e = NULL; - kvStore_Destroy(kv); - kv = NULL; - } + test("Check discard (old, no auto-update): "); + s = _checkDiscard(js, js_DiscardOld, &kv); + testCond((s == NATS_ERR) && (kv == NULL) + && (strstr(nats_GetLastError(NULL), "different configuration") != NULL)); + nats_clearLastError(); + + // Now delete the kv store and create against 2.7.2+ + test("Delete KV: "); + s = js_DeleteStream(js, "KV_TEST", NULL, NULL); + testCond(s == NATS_OK); + + test("Check discard (new): "); + s = _checkDiscard(js, js_DiscardNew, &kv); + testCond(s == NATS_OK); + kvStore_Destroy(kv); + kv = NULL; test("Check that other changes are rejected: "); kvConfig_Init(&kvc); @@ -31090,7 +31088,7 @@ test_KeyValueDiscardOldToNew(void) kvc.MaxBytes = 1024*1024; s = js_CreateKeyValue(&kv, js, &kvc); testCond((s == NATS_ERR) - && (strstr(nats_GetLastError(NULL), "configuration is different") != NULL)); + && (strstr(nats_GetLastError(NULL), "different configuration") != NULL)); kvStore_Destroy(kv); JS_TEARDOWN; @@ -31124,7 +31122,7 @@ test_KeyValueRePublish(void) rp.Destination = "bar.>"; kvc.RePublish =&rp; s = js_CreateKeyValue(&kv, js, &kvc); - testCond((s == NATS_ERR) && (strstr(nats_GetLastError(NULL), "can not change RePublish") != NULL)); + testCond((s == NATS_ERR) && (strstr(nats_GetLastError(NULL), "different configuration") != NULL)); nats_clearLastError(); test("Create with repub: "); From 2c872735246e64ee3683403dce85116961a251a6 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 26 Oct 2022 15:42:52 -0600 Subject: [PATCH 4/6] [ADDED] KeyValue: Support for Mirror and Sources Also updated state for proper Put() and Get() semantics with mirrors and across domains, e.g. leafnodes. In practice when a mirror is across a domain, should be named the same as origin. That allows an app to run anywhere without anything special in terms of domains when binding to the KV itself. (Removed dead-code following previous PR #607) Signed-off-by: Ivan Kozlovic --- src/js.h | 3 + src/jsm.c | 105 +++++++++++++++++- src/kv.c | 190 +++++++++++++++++++++++--------- src/nats.h | 7 ++ src/natsp.h | 2 + test/list.txt | 1 + test/test.c | 296 ++++++++++++++++++++++++++++++++++++++++++++++++-- 7 files changed, 544 insertions(+), 60 deletions(-) diff --git a/src/js.h b/src/js.h index 8f0f26044..79ce521cd 100644 --- a/src/js.h +++ b/src/js.h @@ -103,6 +103,9 @@ extern const int64_t jsDefaultRequestWait; #define jsAckInProgress "+WPI" #define jsAckTerm "+TERM" +// jsExtDomainT is used to create a StreamSource External APIPrefix +#define jsExtDomainT "$JS.%s.API" + // jsApiAccountInfo is for obtaining general information about JetStream. #define jsApiAccountInfo "%.*s.INFO" diff --git a/src/jsm.c b/src/jsm.c index 1fd25db59..c62fb3f83 100644 --- a/src/jsm.c +++ b/src/jsm.c @@ -264,8 +264,11 @@ _marshalExternalStream(jsExternalStream *external, const char *fieldName, natsBu IFOK(s, natsBuf_Append(buf, fieldName, -1)); IFOK(s, natsBuf_Append(buf, "\":{\"api\":\"", -1)); IFOK(s, natsBuf_Append(buf, external->APIPrefix, -1)); - IFOK(s, natsBuf_Append(buf, "\",\"deliver\":\"", -1)); - IFOK(s, natsBuf_Append(buf, external->DeliverPrefix, -1)); + if ((s == NATS_OK) && !nats_IsStringEmpty(external->DeliverPrefix)) + { + IFOK(s, natsBuf_Append(buf, "\",\"deliver\":\"", -1)); + IFOK(s, natsBuf_Append(buf, external->DeliverPrefix, -1)); + } IFOK(s, natsBuf_Append(buf, "\"}", -1)); return NATS_UPDATE_ERR_STACK(s); @@ -1061,6 +1064,95 @@ jsStreamConfig_Init(jsStreamConfig *cfg) return NATS_OK; } +static void +_restoreMirrorAndSourcesExternal(jsStreamConfig *cfg) +{ + int i; + + // We are guaranteed that if a source's Domain is set, there was originally + // no External value. So free any External value and reset to NULL to + // restore the original setting. + if ((cfg->Mirror != NULL) && !nats_IsStringEmpty(cfg->Mirror->Domain)) + { + _destroyExternalStream(cfg->Mirror->External); + cfg->Mirror->External = NULL; + } + for (i=0; iSourcesLen; i++) + { + jsStreamSource *src = cfg->Sources[i]; + if ((src != NULL) && !nats_IsStringEmpty(src->Domain)) + { + _destroyExternalStream(src->External); + src->External = NULL; + } + } +} + +static natsStatus +_convertDomain(jsStreamSource *src) +{ + jsExternalStream *e = NULL; + + e = (jsExternalStream*) NATS_CALLOC(1, sizeof(jsExternalStream)); + if (e == NULL) + return nats_setDefaultError(NATS_NO_MEMORY); + + if (nats_asprintf((char**) &(e->APIPrefix), jsExtDomainT, src->Domain) < 0) + { + NATS_FREE(e); + return nats_setDefaultError(NATS_NO_MEMORY); + } + src->External = e; + return NATS_OK; +} + +static natsStatus +_convertMirrorAndSourcesDomain(bool *converted, jsStreamConfig *cfg) +{ + natsStatus s = NATS_OK; + bool cm = false; + bool cs = false; + int i; + + *converted = false; + + if ((cfg->Mirror != NULL) && !nats_IsStringEmpty(cfg->Mirror->Domain)) + { + if (cfg->Mirror->External != NULL) + return nats_setError(NATS_INVALID_ARG, "%s", "mirror's domain and external are both set"); + cm = true; + } + for (i=0; iSourcesLen; i++) + { + jsStreamSource *src = cfg->Sources[i]; + if ((src != NULL) && !nats_IsStringEmpty(src->Domain)) + { + if (src->External != NULL) + return nats_setError(NATS_INVALID_ARG, "%s", "source's domain and external are both set"); + cs = true; + } + } + if (!cm && !cs) + return NATS_OK; + + if (cm) + s = _convertDomain(cfg->Mirror); + if ((s == NATS_OK) && cs) + { + for (i=0; (s == NATS_OK) && (iSourcesLen); i++) + { + jsStreamSource *src = cfg->Sources[i]; + if ((src != NULL) && !nats_IsStringEmpty(src->Domain)) + s = _convertDomain(src); + } + } + if (s == NATS_OK) + *converted = true; + else + _restoreMirrorAndSourcesExternal(cfg); + return NATS_UPDATE_ERR_STACK(s); +} + static natsStatus _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamConfig *cfg, jsOptions *opts, jsErrCode *errCode) { @@ -1071,6 +1163,7 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo natsConnection *nc = NULL; const char *apiT = NULL; bool freePfx = false; + bool msc = false; jsOptions o; if (errCode != NULL) @@ -1101,6 +1194,8 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo if (freePfx) NATS_FREE((char*) o.Prefix); } + if ((s == NATS_OK) && (action == jsStreamActionCreate)) + s = _convertMirrorAndSourcesDomain(&msc, cfg); // Marshal the stream create/update request IFOK(s, js_marshalStreamConfig(&buf, cfg)); @@ -1111,6 +1206,12 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo // If we got a response, check for error or return the stream info result. IFOK(s, _unmarshalStreamCreateResp(new_si, NULL, resp, errCode)); + // If mirror and/or sources were converted for the domain, then we need + // to restore the original values (which will free the memory that was + // allocated for the conversion). + if (msc) + _restoreMirrorAndSourcesExternal(cfg); + natsBuf_Destroy(buf); natsMsg_Destroy(resp); NATS_FREE(subj); diff --git a/src/kv.c b/src/kv.c index 9a798a538..50a28a659 100644 --- a/src/kv.c +++ b/src/kv.c @@ -21,9 +21,11 @@ #include "conn.h" #include "sub.h" -static const char *kvBucketNameTmpl = "KV_%s"; -static const char *kvSubjectsTmpl = "$KV.%s.>"; -static const char *kvSubjectsPreTmpl = "$KV.%s."; +static const char *kvBucketNamePre = "KV_"; +static const char *kvBucketNameTmpl = "KV_%s"; +static const char *kvSubjectsTmpl = "$KV.%s.>"; +static const char *kvSubjectsPreTmpl = "$KV.%s."; +static const char *kvSubjectsPreDomainTmpl = "%s.$KV.%s."; #define KV_WATCH_FOR_EVER (int64_t)(0x7FFFFFFFFFFFFFFF) @@ -119,6 +121,7 @@ _freeKV(kvStore *kv) NATS_FREE(kv->bucket); NATS_FREE(kv->stream); NATS_FREE(kv->pre); + NATS_FREE(kv->putPre); natsMutex_Destroy(kv->mu); NATS_FREE(kv); js_release(js); @@ -188,52 +191,38 @@ _createKV(kvStore **new_kv, jsCtx *js, const char *bucket) return NATS_UPDATE_ERR_STACK(s); } -static bool -_sameStrings(const char *s1, const char *s2) +static natsStatus +_changeBucketNameAndPutPrefixIfMirrorPresent(kvStore *kv, jsStreamInfo *si) { - bool s1Empty = nats_IsStringEmpty(s1); - bool s2Empty = nats_IsStringEmpty(s2); + natsStatus s = NATS_OK; + const char *bucket = NULL; + jsStreamSource *m = si->Config->Mirror; - // Same if both empty. - if (s1Empty && s2Empty) - return true; + if (m == NULL) + return NATS_OK; - // Not same if one is empty while other is not. - if ((s1Empty && !s2Empty) || (!s1Empty && s2Empty)) - return false; + bucket = m->Name; + if (strstr(m->Name, kvBucketNamePre) == m->Name) + bucket = m->Name + strlen(kvBucketNamePre); - // Return result of comparison of s1 and s2 - return (strcmp(s1, s2) == 0 ? true : false); -} + if ((m->External != NULL) && !nats_IsStringEmpty(m->External->APIPrefix)) + { + kv->useJSPrefix = false; -static bool -_sameStreamCfg(jsStreamConfig *oc, jsStreamConfig *nc) -{ - // Check some of the stream's configuration properties only, - // the ones that we set when creating a KV stream. - if (!_sameStrings(oc->Description, nc->Description)) - return false; - if (oc->SubjectsLen != nc->SubjectsLen) - return false; - if (!_sameStrings(oc->Subjects[0], nc->Subjects[0])) - return false; - if (oc->MaxMsgsPerSubject != nc->MaxMsgsPerSubject) - return false; - if (oc->MaxBytes != nc->MaxBytes) - return false; - if (oc->MaxAge != nc->MaxAge) - return false; - if (oc->MaxMsgSize != nc->MaxMsgSize) - return false; - if (oc->Storage != nc->Storage) - return false; - if (oc->Replicas != nc->Replicas) - return false; - if (oc->AllowRollup != nc->AllowRollup) - return false; - if (oc->DenyDelete != nc->DenyDelete) - return false; - return true; + NATS_FREE(kv->pre); + kv->pre = NULL; + if (nats_asprintf(&(kv->pre), kvSubjectsPreTmpl, bucket) < 0) + s = nats_setDefaultError(NATS_NO_MEMORY); + else if (nats_asprintf(&(kv->putPre), kvSubjectsPreDomainTmpl, m->External->APIPrefix, bucket) < 0) + s = nats_setDefaultError(NATS_NO_MEMORY); + } + else if (nats_asprintf(&(kv->putPre), kvSubjectsPreTmpl, bucket) < 0) + s = nats_setDefaultError(NATS_NO_MEMORY); + + if (s == NATS_OK) + kv->usePutPre = true; + + return NATS_UPDATE_ERR_STACK(s); } natsStatus @@ -245,6 +234,8 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg) kvStore *kv = NULL; char *subject= NULL; jsStreamInfo *si = NULL; + const char *omn = NULL; + const char **osn = NULL; jsStreamConfig sc; if ((new_kv == NULL) || (js == NULL) || (cfg == NULL)) @@ -274,12 +265,11 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg) int64_t maxBytes = (cfg->MaxBytes == 0 ? -1 : cfg->MaxBytes); int32_t maxMsgSize = (cfg->MaxValueSize == 0 ? -1 : cfg->MaxValueSize); jsErrCode jerr = 0; + const char **subjects = (const char*[1]){subject}; jsStreamConfig_Init(&sc); sc.Name = kv->stream; sc.Description = cfg->Description; - sc.Subjects = (const char*[1]){subject}; - sc.SubjectsLen = 1; sc.MaxMsgsPerSubject = history; sc.MaxBytes = maxBytes; sc.MaxAge = cfg->TTL; @@ -291,15 +281,106 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg) sc.AllowDirect = true; sc.RePublish = cfg->RePublish; + if (cfg->Mirror != NULL) + { + jsStreamSource *m = cfg->Mirror; + + if (!nats_IsStringEmpty(m->Name) + && (strstr(m->Name, kvBucketNamePre) != m->Name)) + { + char *newName = NULL; + if (nats_asprintf(&newName, kvBucketNameTmpl, m->Name) < 0) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + { + omn = m->Name; + m->Name = newName; + } + } + sc.Mirror = m; + sc.MirrorDirect = true; + } + else if (cfg->SourcesLen > 0) + { + osn = (const char**) NATS_CALLOC(cfg->SourcesLen, sizeof(char*)); + if (osn == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + + if (s == NATS_OK) + { + int i; + + for (i=0; iSourcesLen; i++) + { + jsStreamSource *ss = cfg->Sources[i]; + + if (ss == NULL) + continue; + + // Set this regardless of error in the loop. We need it for + // proper cleanup at the end. + osn[i] = ss->Name; + + if ((s == NATS_OK) && !nats_IsStringEmpty(ss->Name) + && (strstr(ss->Name, kvBucketNamePre) != ss->Name)) + { + char *newName = NULL; + + if (nats_asprintf(&newName, kvBucketNameTmpl, ss->Name) < 0) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + ss->Name = newName; + } + } + if (s == NATS_OK) + { + sc.Sources = cfg->Sources; + sc.SourcesLen = cfg->SourcesLen; + } + } + } + else + { + sc.Subjects = subjects; + sc.SubjectsLen = 1; + } + // If connecting to a v2.7.2+, create with discard new policy if (natsConn_srvVersionAtLeast(kv->js->nc, 2, 7, 2)) sc.Discard = js_DiscardNew; s = js_AddStream(&si, js, &sc, NULL, &jerr); - // If the stream allow direct get message calls, then we will do so. if (s == NATS_OK) + { + // If the stream allow direct get message calls, then we will do so. kv->useDirect = si->Config->AllowDirect; + + s = _changeBucketNameAndPutPrefixIfMirrorPresent(kv, si); + } jsStreamInfo_Destroy(si); + + // Restore original mirror/source names + if (omn != NULL) + { + NATS_FREE((char*) cfg->Mirror->Name); + cfg->Mirror->Name = omn; + } + if (osn != NULL) + { + int i; + + for (i=0; iSourcesLen; i++) + { + jsStreamSource *ss = cfg->Sources[i]; + + if ((ss != NULL) && (ss->Name != osn[i])) + { + NATS_FREE((char*) ss->Name); + ss->Name = osn[i]; + } + } + NATS_FREE((char**) osn); + } } if (s == NATS_OK) *new_kv = kv; @@ -336,6 +417,8 @@ js_KeyValue(kvStore **new_kv, jsCtx *js, const char *bucket) if (si->Config->MaxMsgsPerSubject < 1) s = nats_setError(NATS_INVALID_ARG, "%s", kvErrBadBucket); + IFOK(s, _changeBucketNameAndPutPrefixIfMirrorPresent(kv, si)); + jsStreamInfo_Destroy(si); } @@ -563,7 +646,8 @@ _putEntry(uint64_t *rev, kvStore *kv, jsPubOptions *po, const char *key, const v natsStatus s = NATS_OK; jsPubAck *pa = NULL; jsPubAck **ppa = NULL; - DEFINE_BUF_FOR_SUBJECT; + char buffer[128]; + natsBuffer buf; if (rev != NULL) { @@ -577,7 +661,15 @@ _putEntry(uint64_t *rev, kvStore *kv, jsPubOptions *po, const char *key, const v if (!validKey(key)) return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey); - BUILD_SUBJECT(USE_JS_PREFIX); + s = natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer)); + if (kv->useJSPrefix) + { + IFOK(s, natsBuf_Append(&buf, kv->js->opts.Prefix, -1)); + IFOK(s, natsBuf_AppendByte(&buf, '.')); + } + IFOK(s, natsBuf_Append(&buf, (kv->usePutPre ? kv->putPre : kv->pre), -1)); + IFOK(s, natsBuf_Append(&buf, key, -1)); + IFOK(s, natsBuf_AppendByte(&buf, 0)); IFOK(s, js_Publish(ppa, kv->js, natsBuf_Data(&buf), data, len, po, NULL)); if ((s == NATS_OK) && (rev != NULL)) diff --git a/src/nats.h b/src/nats.h index 00112540e..bfb1c90cd 100644 --- a/src/nats.h +++ b/src/nats.h @@ -375,6 +375,10 @@ typedef struct jsStreamSource int64_t OptStartTime; ///< UTC time expressed as number of nanoseconds since epoch. const char *FilterSubject; jsExternalStream *External; + // Domain and External are mutually exclusive. + // If Domain is set, an External value will be created with + // the APIPrefix constructed based on the Domain value. + const char *Domain; } jsStreamSource; @@ -1189,6 +1193,9 @@ typedef struct kvConfig jsStorageType StorageType; int Replicas; jsRePublish *RePublish; + jsStreamSource *Mirror; + jsStreamSource **Sources; + int SourcesLen; } kvConfig; diff --git a/src/natsp.h b/src/natsp.h index 8385e4844..fb6dde262 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -433,6 +433,8 @@ struct __kvStore char *bucket; char *stream; char *pre; + char *putPre; + bool usePutPre; bool useJSPrefix; bool useDirect; diff --git a/test/list.txt b/test/list.txt index 38f5138ef..83690fdd8 100644 --- a/test/list.txt +++ b/test/list.txt @@ -253,6 +253,7 @@ KeyValueCrossAccount KeyValueDiscardOldToNew KeyValueRePublish KeyValueMirrorDirectGet +KeyValueMirrorCrossDomains StanPBufAllocator StanConnOptions StanSubOptions diff --git a/test/test.c b/test/test.c index c1834a502..c8982fc22 100644 --- a/test/test.c +++ b/test/test.c @@ -5509,7 +5509,8 @@ _stopServer(natsPid pid) CloseHandle(pid->hThread); natsMutex_Lock(slMu); - natsHash_Remove(slMap, (int64_t) pid); + if (slMap != NULL) + natsHash_Remove(slMap, (int64_t) pid); natsMutex_Unlock(slMu); free(pid); @@ -5614,7 +5615,8 @@ _startServerImpl(const char *serverExe, const char *url, const char *cmdLineOpts } natsMutex_Lock(slMu); - natsHash_Set(slMap, (int64_t) pid, NULL, NULL); + if (slMap != NULL) + natsHash_Set(slMap, (int64_t) pid, NULL, NULL); natsMutex_Unlock(slMu); return (natsPid) pid; @@ -5644,7 +5646,8 @@ _stopServer(natsPid pid) waitpid(pid, &status, 0); natsMutex_Lock(slMu); - natsHash_Remove(slMap, (int64_t) pid); + if (slMap != NULL) + natsHash_Remove(slMap, (int64_t) pid); natsMutex_Unlock(slMu); } @@ -5720,7 +5723,8 @@ _startServerImpl(const char *serverExe, const char *url, const char *cmdLineOpts } natsMutex_Lock(slMu); - natsHash_Set(slMap, (int64_t) pid, NULL, NULL); + if (slMap != NULL) + natsHash_Set(slMap, (int64_t) pid, NULL, NULL); natsMutex_Unlock(slMu); // parent, return the child's PID back. @@ -22683,6 +22687,8 @@ test_JetStreamMgtStreams(void) jsStreamInfoList *siList = NULL; jsStreamNamesList *snList = NULL; int count = 0; + jsStreamSource ss; + jsExternalStream se; jsOptions o; int i; @@ -23107,6 +23113,7 @@ test_JetStreamMgtStreams(void) natsMsg_GetData(resp), natsMsg_GetDataLength(resp)) == 0)); jsStreamInfo_Destroy(si); + si = NULL; natsMsg_Destroy(resp); resp = NULL; @@ -23150,6 +23157,7 @@ test_JetStreamMgtStreams(void) && (strcmp(si->Config->Subjects[0], "foo.>") == 0) && (strcmp(si->Config->Subjects[1], "bar.*") == 0)); jsStreamInfo_Destroy(si); + si = NULL; test("List stream infos (bad args): "); s = js_Streams(NULL, js, NULL, NULL); @@ -23286,6 +23294,35 @@ test_JetStreamMgtStreams(void) s = js_StreamNames(&snList, js, &o, &jerr); testCond((s == NATS_NOT_FOUND) && (snList == NULL)); + test("Mirror domain and external set error: "); + jsStreamConfig_Init(&cfg); + cfg.Name = "MDESET"; + jsStreamSource_Init(&ss); + ss.Domain = "Domain"; + jsExternalStream_Init(&se); + se.DeliverPrefix = "some.prefix"; + ss.External = &se; + cfg.Mirror = &ss; + s = js_AddStream(&si, js, &cfg, NULL, NULL); + testCond((s == NATS_INVALID_ARG) && (si == NULL) + && (strstr(nats_GetLastError(NULL), "domain and external are both set") != NULL)); + nats_clearLastError(); + + test("Source domain and external set error: "); + jsStreamConfig_Init(&cfg); + cfg.Name = "SDESET"; + jsStreamSource_Init(&ss); + ss.Domain = "Domain"; + jsExternalStream_Init(&se); + se.DeliverPrefix = "some.prefix"; + ss.External = &se; + cfg.Sources = (jsStreamSource*[1]){&ss}; + cfg.SourcesLen = 1; + s = js_AddStream(&si, js, &cfg, NULL, NULL); + testCond((s == NATS_INVALID_ARG) && (si == NULL) + && (strstr(nats_GetLastError(NULL), "domain and external are both set") != NULL)); + nats_clearLastError(); + JS_TEARDOWN; } @@ -31222,6 +31259,228 @@ test_KeyValueMirrorDirectGet(void) JS_TEARDOWN; } +static void +test_KeyValueMirrorCrossDomains(void) +{ + natsStatus s; + natsConnection *nc = NULL; + natsConnection *lnc= NULL; + jsCtx *js = NULL; + jsCtx *ljs= NULL; + jsCtx *rjs= NULL; + natsPid pid = NATS_INVALID_PID; + natsPid pid2= NATS_INVALID_PID; + jsOptions o; + jsErrCode jerr = 0; + char datastore[256] = {'\0'}; + char datastore2[256] = {'\0'}; + char cmdLine[1024] = {'\0'}; + char confFile[256] = {'\0'}; + char lconfFile[256] = {'\0'}; + kvStore *kv = NULL; + kvStore *lkv = NULL; + kvStore *mkv = NULL; + kvStore *rkv = NULL; + kvEntry *e = NULL; + jsStreamInfo *si = NULL; + natsSubscription *sub = NULL; + kvConfig kvc; + jsStreamSource src; + int i; + + ENSURE_JS_VERSION(2, 9, 0); + + _makeUniqueDir(datastore, sizeof(datastore), "datastore_"); + _createConfFile(confFile, sizeof(confFile), + "server_name: HUB\n"\ + "listen: 127.0.0.1:4222\n"\ + "jetstream: { domain: HUB }\n"\ + "leafnodes { listen: 127.0.0.1:7422 }\n"); + + test("Start hub: "); + snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -c %s", datastore, confFile); + pid = _startServer("nats://127.0.0.1:4222", cmdLine, true); + CHECK_SERVER_STARTED(pid); + testCond(true); + + _makeUniqueDir(datastore2, sizeof(datastore2), "datastore_"); + _createConfFile(lconfFile, sizeof(lconfFile), + "server_name: LEAF\n"\ + "listen: 127.0.0.1:4223\n"\ + "jetstream: { domain: LEAF }\n"\ + "leafnodes {\n"\ + " remotes = [ { url: leaf://127.0.0.1:7422 } ]\n"\ + "}\n"); + + test("Start leaf: "); + snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -c %s", datastore, lconfFile); + pid2 = _startServer("nats://127.0.0.1:4223", cmdLine, true); + CHECK_SERVER_STARTED(pid2); + testCond(true); + + test("Connect to hub: "); + s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); + testCond(s == NATS_OK); + + test("Sub to check LF connectivity: "); + s = natsConnection_SubscribeSync(&sub, nc, "check"); + IFOK(s, natsConnection_Flush(nc)); + testCond(s == NATS_OK); + + test("Get context: "); + s = natsConnection_JetStream(&js, nc, NULL); + testCond(s == NATS_OK); + + test("Create KV value: "); + kvConfig_Init(&kvc); + kvc.Bucket = "TEST"; + s = js_CreateKeyValue(&kv, js, &kvc); + testCond(s == NATS_OK); + + test("Put keys: "); + s = kvStore_PutString(NULL, kv, "name", "derek"); + IFOK(s, kvStore_PutString(NULL, kv, "age", "22")); + testCond(s == NATS_OK); + + test("Connect to leaf: "); + s = natsConnection_ConnectTo(&lnc, "nats://127.0.0.1:4223"); + testCond(s == NATS_OK); + + test("Check connectivity: "); + for (i=0; i<10; i++) + { + s = natsConnection_PublishString(lnc, "check", "hello"); + if (s == NATS_OK) + { + natsMsg *msg = NULL; + s = natsSubscription_NextMsg(&msg, sub, 500); + natsMsg_Destroy(msg); + if (s == NATS_OK) + break; + } + } + testCond(s == NATS_OK); + + test("Get context: "); + s = natsConnection_JetStream(&ljs, lnc, NULL); + testCond(s == NATS_OK); + + test("Create KV: "); + kvConfig_Init(&kvc); + kvc.Bucket = "MIRROR"; + jsStreamSource_Init(&src); + src.Name = "TEST"; + src.Domain = "HUB"; + kvc.Mirror = &src; + s = js_CreateKeyValue(&lkv, ljs, &kvc); + testCond(s == NATS_OK); + + test("Check config not changed: "); + testCond((strcmp(kvc.Bucket, "MIRROR") == 0) + && (kvc.Mirror != NULL) + && (strcmp(kvc.Mirror->Name, "TEST") == 0) + && (strcmp(kvc.Mirror->Domain, "HUB") == 0) + && (kvc.Mirror->External == NULL)); + + test("Get stream info: "); + s = js_GetStreamInfo(&si, ljs, "KV_MIRROR", NULL, &jerr); + testCond((s == NATS_OK) && (si != NULL) && (jerr == 0)); + + test("Check mirror direct: "); + testCond(si->Config->MirrorDirect); + jsStreamInfo_Destroy(si); + si = NULL; + + test("Check mirror syncs: "); + for (i=0; i<10; i++) + { + s = js_GetStreamInfo(&si, ljs, "KV_MIRROR", NULL, NULL); + if (s != NATS_OK) + break; + + if (si->State.Msgs != 2) + s = NATS_ERR; + + jsStreamInfo_Destroy(si); + si = NULL; + if (s == NATS_OK) + break; + nats_Sleep(250); + } + testCond(s == NATS_OK); + + // Bind locally from leafnode and make sure both get and put work. + test("Leaf KV: "); + s = js_KeyValue(&mkv, ljs, "MIRROR"); + testCond(s == NATS_OK); + + test("Put key: "); + s = kvStore_PutString(NULL, mkv, "name", "rip"); + testCond(s == NATS_OK); + + test("Get key: "); + s = kvStore_Get(&e, mkv, "name"); + if ((s == NATS_OK) && (e != NULL)) + s = (strcmp(kvEntry_ValueString(e), "rip") == 0 ? NATS_OK : NATS_ERR); + testCond(s == NATS_OK); + kvEntry_Destroy(e); + e = NULL; + + test("Get context for HUB: "); + jsOptions_Init(&o); + o.Domain = "HUB"; + s = natsConnection_JetStream(&rjs, lnc, &o); + testCond(s == NATS_OK); + + test("Get KV: "); + s = js_KeyValue(&rkv, rjs, "TEST"); + testCond(s == NATS_OK); + + test("Put key: "); + s = kvStore_PutString(NULL, rkv, "name", "ivan"); + testCond(s == NATS_OK); + + test("Get key: "); + s = kvStore_Get(&e, rkv, "name"); + if ((s == NATS_OK) && (e != NULL)) + s = (strcmp(kvEntry_ValueString(e), "ivan") == 0 ? NATS_OK : NATS_ERR); + testCond(s == NATS_OK); + kvEntry_Destroy(e); + e = NULL; + + test("Shutdown hub: "); + jsCtx_Destroy(js); + kvStore_Destroy(kv); + natsConnection_Destroy(nc); + _stopServer(pid); + pid = NATS_INVALID_PID; + testCond(true); + nats_Sleep(500); + + test("Get key: "); + // Use mkv here, not rkv. + s = kvStore_Get(&e, mkv, "name"); + if ((s == NATS_OK) && (e != NULL)) + s = (strcmp(kvEntry_ValueString(e), "ivan") == 0 ? NATS_OK : NATS_ERR); + testCond(s == NATS_OK); + kvEntry_Destroy(e); + e = NULL; + + natsSubscription_Destroy(sub); + kvStore_Destroy(rkv); + kvStore_Destroy(mkv); + kvStore_Destroy(lkv); + jsCtx_Destroy(rjs); + jsCtx_Destroy(ljs); + natsConnection_Destroy(lnc); + _stopServer(pid2); + rmtree(datastore2); + _stopServer(pid); + rmtree(datastore); + remove(confFile); + remove(lconfFile); +} + #if defined(NATS_HAS_STREAMING) static int @@ -33690,6 +33949,7 @@ static testInfo allTests[] = {"KeyValueDiscardOldToNew", test_KeyValueDiscardOldToNew}, {"KeyValueRePublish", test_KeyValueRePublish}, {"KeyValueMirrorDirectGet", test_KeyValueMirrorDirectGet}, + {"KeyValueMirrorCrossDomains", test_KeyValueMirrorCrossDomains}, #if defined(NATS_HAS_STREAMING) {"StanPBufAllocator", test_StanPBufAllocator}, @@ -33855,16 +34115,34 @@ int main(int argc, char **argv) // Shutdown servers that are still running likely due to failed test { + natsHash *pids = NULL; natsHashIter iter; int64_t key; - natsMutex_Lock(slMu); - natsHashIter_Init(&iter, slMap); + if (natsHash_Create(&pids, 16) == NATS_OK) + { + natsMutex_Lock(slMu); + natsHashIter_Init(&iter, slMap); + while (natsHashIter_Next(&iter, &key, NULL)) + { + natsHash_Set(pids, key, NULL, NULL); + natsHashIter_RemoveCurrent(&iter); + } + natsHashIter_Done(&iter); + natsHash_Destroy(slMap); + slMap = NULL; + natsMutex_Unlock(slMu); - while (natsHashIter_Next(&iter, &key, NULL)) - _stopServer((natsPid) key); + natsHashIter_Init(&iter, pids); + while (natsHashIter_Next(&iter, &key, NULL)) + _stopServer((natsPid) key); - natsHash_Destroy(slMap); + natsHash_Destroy(pids); + } + else + { + natsHash_Destroy(slMap); + } natsMutex_Destroy(slMu); } From 4613e1f7704db143da040c61a7cd7222cab9cc21 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 27 Oct 2022 18:58:50 -0600 Subject: [PATCH 5/6] Add test for watcher/delete/purge deletes Needed to have delete use the put prefix and bind to the stream when creating the watcher's subscription because in case of mirror the subject would not allow to find the stream. Signed-off-by: Ivan Kozlovic --- src/kv.c | 37 +++++------ test/test.c | 188 +++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 176 insertions(+), 49 deletions(-) diff --git a/src/kv.c b/src/kv.c index 50a28a659..a82826876 100644 --- a/src/kv.c +++ b/src/kv.c @@ -36,14 +36,17 @@ natsBuffer buf; #define USE_JS_PREFIX true #define KEY_NAME_ONLY false -#define BUILD_SUBJECT(p) \ +#define FOR_A_PUT true +#define NOT_FOR_A_PUT false + +#define BUILD_SUBJECT(p, fp) \ s = natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer)); \ if ((p) && kv->useJSPrefix) \ { \ IFOK(s, natsBuf_Append(&buf, kv->js->opts.Prefix, -1)); \ IFOK(s, natsBuf_AppendByte(&buf, '.')); \ } \ -IFOK(s, natsBuf_Append(&buf, kv->pre, -1)); \ +IFOK(s, natsBuf_Append(&buf, ((fp) ? (kv->usePutPre ? kv->putPre : kv->pre) : kv->pre), -1)); \ IFOK(s, natsBuf_Append(&buf, key, -1)); \ IFOK(s, natsBuf_AppendByte(&buf, 0)); @@ -192,7 +195,7 @@ _createKV(kvStore **new_kv, jsCtx *js, const char *bucket) } static natsStatus -_changeBucketNameAndPutPrefixIfMirrorPresent(kvStore *kv, jsStreamInfo *si) +_changePutPrefixIfMirrorPresent(kvStore *kv, jsStreamInfo *si) { natsStatus s = NATS_OK; const char *bucket = NULL; @@ -355,7 +358,7 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg) // If the stream allow direct get message calls, then we will do so. kv->useDirect = si->Config->AllowDirect; - s = _changeBucketNameAndPutPrefixIfMirrorPresent(kv, si); + s = _changePutPrefixIfMirrorPresent(kv, si); } jsStreamInfo_Destroy(si); @@ -417,7 +420,7 @@ js_KeyValue(kvStore **new_kv, jsCtx *js, const char *bucket) if (si->Config->MaxMsgsPerSubject < 1) s = nats_setError(NATS_INVALID_ARG, "%s", kvErrBadBucket); - IFOK(s, _changeBucketNameAndPutPrefixIfMirrorPresent(kv, si)); + IFOK(s, _changePutPrefixIfMirrorPresent(kv, si)); jsStreamInfo_Destroy(si); } @@ -540,7 +543,7 @@ _getEntry(kvEntry **new_entry, bool *deleted, kvStore *kv, const char *key, uint if (!validKey(key)) return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey); - BUILD_SUBJECT(KEY_NAME_ONLY); + BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT); if (kv->useDirect) { @@ -646,8 +649,7 @@ _putEntry(uint64_t *rev, kvStore *kv, jsPubOptions *po, const char *key, const v natsStatus s = NATS_OK; jsPubAck *pa = NULL; jsPubAck **ppa = NULL; - char buffer[128]; - natsBuffer buf; + DEFINE_BUF_FOR_SUBJECT; if (rev != NULL) { @@ -661,15 +663,7 @@ _putEntry(uint64_t *rev, kvStore *kv, jsPubOptions *po, const char *key, const v if (!validKey(key)) return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey); - s = natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer)); - if (kv->useJSPrefix) - { - IFOK(s, natsBuf_Append(&buf, kv->js->opts.Prefix, -1)); - IFOK(s, natsBuf_AppendByte(&buf, '.')); - } - IFOK(s, natsBuf_Append(&buf, (kv->usePutPre ? kv->putPre : kv->pre), -1)); - IFOK(s, natsBuf_Append(&buf, key, -1)); - IFOK(s, natsBuf_AppendByte(&buf, 0)); + BUILD_SUBJECT(USE_JS_PREFIX, FOR_A_PUT); IFOK(s, js_Publish(ppa, kv->js, natsBuf_Data(&buf), data, len, po, NULL)); if ((s == NATS_OK) && (rev != NULL)) @@ -772,7 +766,7 @@ _delete(kvStore *kv, const char *key, bool purge, kvPurgeOptions *opts) if (!validKey(key)) return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey); - BUILD_SUBJECT(USE_JS_PREFIX); + BUILD_SUBJECT(USE_JS_PREFIX, FOR_A_PUT); IFOK(s, natsMsg_Create(&msg, natsBuf_Data(&buf), NULL, NULL, 0)); if (s == NATS_OK) { @@ -885,6 +879,7 @@ kvStore_PurgeDeletes(kvStore *kv, kvPurgeOptions *opts) for (; h != NULL; ) { natsBuf_Reset(&buf); + // Use kv->pre here, always. IFOK(s, natsBuf_Append(&buf, kv->pre, -1)); IFOK(s, natsBuf_Append(&buf, h->key, -1)); IFOK(s, natsBuf_AppendByte(&buf, '\0')); @@ -991,6 +986,7 @@ kvWatcher_Next(kvEntry **new_entry, kvWatcher *w, int64_t timeout) } w->refs--; + // Use kv->pre here, always. if ((s == NATS_OK) && (strlen(msg->subject) <= strlen(w->kv->pre))) s = nats_setError(NATS_ERR, "invalid update's subject '%s'", msg->subject); @@ -1080,7 +1076,7 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti w->kv = kv; w->refs = 1; - BUILD_SUBJECT(KEY_NAME_ONLY); + BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT); IFOK(s, natsMutex_Create(&(w->mu))); if (s == NATS_OK) { @@ -1097,6 +1093,9 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti if (opts->IgnoreDeletes) w->ignoreDel = true; } + // Need to explicitly bind to the stream here because the subject + // we construct may not help find the stream when using mirrors. + so.Stream = kv->stream; s = js_SubscribeSync(&(w->sub), kv->js, natsBuf_Data(&buf), NULL, &so, NULL); IFOK(s, natsSubscription_SetPendingLimits(w->sub, -1, -1)); if (s == NATS_OK) diff --git a/test/test.c b/test/test.c index c8982fc22..36c117ce9 100644 --- a/test/test.c +++ b/test/test.c @@ -31259,6 +31259,40 @@ test_KeyValueMirrorDirectGet(void) JS_TEARDOWN; } +static natsStatus +_connectToHubAndCheckLeaf(natsConnection **hub, natsConnection *lnc) +{ + natsStatus s = NATS_OK; + natsConnection *nc = NULL; + natsSubscription *sub = NULL; + int i; + + s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); + IFOK(s, natsConnection_SubscribeSync(&sub, nc, "check")); + IFOK(s, natsConnection_Flush(nc)); + if (s == NATS_OK) + { + for (i=0; i<10; i++) + { + s = natsConnection_PublishString(lnc, "check", "hello"); + if (s == NATS_OK) + { + natsMsg *msg = NULL; + s = natsSubscription_NextMsg(&msg, sub, 500); + natsMsg_Destroy(msg); + if (s == NATS_OK) + break; + } + } + } + natsSubscription_Destroy(sub); + if (s == NATS_OK) + *hub = nc; + else + natsConnection_Destroy(nc); + return s; +} + static void test_KeyValueMirrorCrossDomains(void) { @@ -31283,7 +31317,9 @@ test_KeyValueMirrorCrossDomains(void) kvStore *rkv = NULL; kvEntry *e = NULL; jsStreamInfo *si = NULL; - natsSubscription *sub = NULL; + kvWatcher *w = NULL; + int ok = 0; + kvPurgeOptions po; kvConfig kvc; jsStreamSource src; int i; @@ -31318,13 +31354,16 @@ test_KeyValueMirrorCrossDomains(void) CHECK_SERVER_STARTED(pid2); testCond(true); - test("Connect to hub: "); - s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); + test("Connect to leaf: "); + s = natsConnection_ConnectTo(&lnc, "nats://127.0.0.1:4223"); testCond(s == NATS_OK); - test("Sub to check LF connectivity: "); - s = natsConnection_SubscribeSync(&sub, nc, "check"); - IFOK(s, natsConnection_Flush(nc)); + test("Get context: "); + s = natsConnection_JetStream(&ljs, lnc, NULL); + testCond(s == NATS_OK); + + test("Connect to hub and check connectivity through leaf: "); + s = _connectToHubAndCheckLeaf(&nc, lnc); testCond(s == NATS_OK); test("Get context: "); @@ -31342,29 +31381,6 @@ test_KeyValueMirrorCrossDomains(void) IFOK(s, kvStore_PutString(NULL, kv, "age", "22")); testCond(s == NATS_OK); - test("Connect to leaf: "); - s = natsConnection_ConnectTo(&lnc, "nats://127.0.0.1:4223"); - testCond(s == NATS_OK); - - test("Check connectivity: "); - for (i=0; i<10; i++) - { - s = natsConnection_PublishString(lnc, "check", "hello"); - if (s == NATS_OK) - { - natsMsg *msg = NULL; - s = natsSubscription_NextMsg(&msg, sub, 500); - natsMsg_Destroy(msg); - if (s == NATS_OK) - break; - } - } - testCond(s == NATS_OK); - - test("Get context: "); - s = natsConnection_JetStream(&ljs, lnc, NULL); - testCond(s == NATS_OK); - test("Create KV: "); kvConfig_Init(&kvc); kvc.Bucket = "MIRROR"; @@ -31452,6 +31468,7 @@ test_KeyValueMirrorCrossDomains(void) jsCtx_Destroy(js); kvStore_Destroy(kv); natsConnection_Destroy(nc); + nc = NULL; _stopServer(pid); pid = NATS_INVALID_PID; testCond(true); @@ -31466,7 +31483,118 @@ test_KeyValueMirrorCrossDomains(void) kvEntry_Destroy(e); e = NULL; - natsSubscription_Destroy(sub); + test("Create watcher (name): "); + s = kvStore_Watch(&w, mkv, "name", NULL); + testCond(s == NATS_OK); + + test("Check watcher: "); + s = kvWatcher_Next(&e, w, 1000); + if (s == NATS_OK) + { + if ((strcmp(kvEntry_Key(e), "name") != 0) || (strcmp(kvEntry_ValueString(e), "ivan") != 0)) + s = NATS_ERR; + kvEntry_Destroy(e); + e = NULL; + } + IFOK(s, kvWatcher_Next(&e, w, 1000)); + if (s == NATS_OK) + { + if ((kvEntry_Key(e) != NULL) || (kvEntry_ValueString(e) != NULL)) + s = NATS_ERR; + kvEntry_Destroy(e); + e = NULL; + } + testCond(s == NATS_OK); + + test("No more: "); + s = kvWatcher_Next(&e, w, 250); + testCond((s == NATS_TIMEOUT) && (e == NULL)); + nats_clearLastError(); + + kvWatcher_Destroy(w); + w = NULL; + + test("Create watcher (all): ") + s = kvStore_WatchAll(&w, mkv, NULL); + testCond((s == NATS_OK) && (w != NULL)); + + test("Check watcher: "); + s = kvWatcher_Next(&e, w, 1000); + if (s == NATS_OK) + { + if ((strcmp(kvEntry_Key(e), "age") != 0) || (strcmp(kvEntry_ValueString(e), "22") != 0)) + s = NATS_ERR; + kvEntry_Destroy(e); + e = NULL; + } + IFOK(s, kvWatcher_Next(&e, w, 1000)); + if (s == NATS_OK) + { + if ((strcmp(kvEntry_Key(e), "name") != 0) || (strcmp(kvEntry_ValueString(e), "ivan") != 0)) + s = NATS_ERR; + kvEntry_Destroy(e); + e = NULL; + } + IFOK(s, kvWatcher_Next(&e, w, 1000)); + if (s == NATS_OK) + { + if ((kvEntry_Key(e) != NULL) || (kvEntry_ValueString(e) != NULL)) + s = NATS_ERR; + kvEntry_Destroy(e); + e = NULL; + } + testCond(s == NATS_OK); + + test("No more: "); + s = kvWatcher_Next(&e, w, 250); + testCond((s == NATS_TIMEOUT) && (e == NULL)); + nats_clearLastError(); + + test("Restart hub: "); + snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -c %s", datastore, confFile); + pid = _startServer("nats://127.0.0.1:4222", cmdLine, true); + CHECK_SERVER_STARTED(pid); + testCond(true); + + test("Connect to hub and check connectivity through leaf: "); + s = _connectToHubAndCheckLeaf(&nc, lnc); + testCond(s == NATS_OK); + + test("Delete keys: "); + s = kvStore_Delete(mkv, "age"); + IFOK(s, kvStore_Delete(mkv, "name")); + testCond(s == NATS_OK); + + test("Check mirror syncs: "); + for (i=0; (ok != 2) && (i < 10); i++) + { + if (kvWatcher_Next(&e, w, 1000) == NATS_OK) + { + if (((strcmp(kvEntry_Key(e), "age") == 0) || (strcmp(kvEntry_Key(e), "name") == 0)) + && (kvEntry_Operation(e) == kvOp_Delete)) + { + ok++; + } + kvEntry_Destroy(e); + e = NULL; + } + } + testCond((s == NATS_OK) && (ok == 2)); + + test("Purge deletes: "); + kvPurgeOptions_Init(&po); + po.DeleteMarkersOlderThan = -1; + s = kvStore_PurgeDeletes(mkv, &po); + testCond(s == NATS_OK); + + nats_clearLastError(); + test("Check stream: "); + s = js_GetStreamInfo(&si, ljs, "KV_MIRROR", NULL, NULL); + testCond((s == NATS_OK) && (si != NULL) && (si->State.Msgs == 0)); + jsStreamInfo_Destroy(si); + + kvWatcher_Destroy(w); + natsConnection_Destroy(nc); kvStore_Destroy(rkv); kvStore_Destroy(mkv); kvStore_Destroy(lkv); From 1e8a57c4ea548f9079b4dc80cae41908b6766c93 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 4 Nov 2022 13:51:56 -0600 Subject: [PATCH 6/6] [ADDED] Stream alternates in `jsStreamInfo` `jsStreamInfo` will now possibly have a list of stream alternates represented by a mirror: ``` typedef struct jsStreamInfo { (...) jsStreamAlternate **Alternates; int AlternatesLen; } jsStreamInfo; The definition of a stream alternate is: ``` typedef struct jsStreamAlternate { const char *Name; const char *Domain; const char *Cluster; } jsStreamAlternate; ``` Also added some of the new JS specific error codes. Signed-off-by: Ivan Kozlovic --- src/jsm.c | 57 +++++++++++++++++++++ src/nats.h | 13 +++++ src/status.h | 7 +++ test/list.txt | 1 + test/test.c | 139 ++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 217 insertions(+) diff --git a/src/jsm.c b/src/jsm.c index c62fb3f83..2a5d5b73d 100644 --- a/src/jsm.c +++ b/src/jsm.c @@ -211,6 +211,18 @@ js_cleanStreamState(jsStreamState *state) _destroyStreamStateSubjects(state->Subjects); } +static void +_destroyStreamAlternate(jsStreamAlternate *sa) +{ + if (sa == NULL) + return; + + NATS_FREE((char*) sa->Name); + NATS_FREE((char*) sa->Domain); + NATS_FREE((char*) sa->Cluster); + NATS_FREE(sa); +} + void jsStreamInfo_Destroy(jsStreamInfo *si) { @@ -226,6 +238,9 @@ jsStreamInfo_Destroy(jsStreamInfo *si) for (i=0; iSourcesLen; i++) _destroyStreamSourceInfo(si->Sources[i]); NATS_FREE(si->Sources); + for (i=0; iAlternatesLen; i++) + _destroyStreamAlternate(si->Alternates[i]); + NATS_FREE(si->Alternates); NATS_FREE(si); } @@ -955,12 +970,36 @@ _unmarshalStreamSourceInfo(nats_JSON *pjson, const char *fieldName, jsStreamSour return NATS_UPDATE_ERR_STACK(s); } +static natsStatus +_unmarshalStreamAlternate(nats_JSON *json, jsStreamAlternate **new_alt) +{ + jsStreamAlternate *sa = NULL; + natsStatus s = NATS_OK; + + sa = (jsStreamAlternate*) NATS_CALLOC(1, sizeof(jsStreamAlternate)); + if (sa == NULL) + return nats_setDefaultError(NATS_NO_MEMORY); + + s = nats_JSONGetStr(json, "name", (char**) &(sa->Name)); + IFOK(s, nats_JSONGetStr(json, "domain", (char**) &(sa->Domain))); + IFOK(s, nats_JSONGetStr(json, "cluster", (char**) &(sa->Cluster))); + + if (s == NATS_OK) + *new_alt = sa; + else + _destroyStreamAlternate(sa); + + return NATS_UPDATE_ERR_STACK(s); +} + static natsStatus _unmarshalStreamInfoPaged(nats_JSON *json, jsStreamInfo **new_si, apiPaged *page) { jsStreamInfo *si = NULL; nats_JSON **sources = NULL; int sourcesLen = 0; + nats_JSON **alts = NULL; + int altsLen = 0; natsStatus s; si = (jsStreamInfo*) NATS_CALLOC(1, sizeof(jsStreamInfo)); @@ -991,6 +1030,24 @@ _unmarshalStreamInfoPaged(nats_JSON *json, jsStreamInfo **new_si, apiPaged *page // Free the array of JSON objects that was allocated by nats_JSONGetArrayObject. NATS_FREE(sources); } + IFOK(s, nats_JSONGetArrayObject(json, "alternates", &alts, &altsLen)); + if ((s == NATS_OK) && (alts != NULL)) + { + int i; + + si->Alternates = (jsStreamAlternate**) NATS_CALLOC(altsLen, sizeof(jsStreamAlternate*)); + if (si->Alternates == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + + for (i=0; (s == NATS_OK) && (iAlternates[i])); + if (s == NATS_OK) + si->AlternatesLen++; + } + // Free the array of JSON objects that was allocated by nats_JSONGetArrayObject. + NATS_FREE(alts); + } if ((s == NATS_OK) && (page != NULL)) { IFOK(s, nats_JSONGetLong(json, "total", &page->total)); diff --git a/src/nats.h b/src/nats.h index bfb1c90cd..ca3bab9ec 100644 --- a/src/nats.h +++ b/src/nats.h @@ -635,6 +635,17 @@ typedef struct jsStreamSourceInfo } jsStreamSourceInfo; +/** + * Information about an alternate stream represented by a mirror. + */ +typedef struct jsStreamAlternate +{ + const char *Name; + const char *Domain; + const char *Cluster; + +} jsStreamAlternate; + /** * Configuration and current state for this stream. * @@ -650,6 +661,8 @@ typedef struct jsStreamInfo jsStreamSourceInfo *Mirror; jsStreamSourceInfo **Sources; int SourcesLen; + jsStreamAlternate **Alternates; + int AlternatesLen; } jsStreamInfo; diff --git a/src/status.h b/src/status.h index 729d52f01..ee8bfe703 100644 --- a/src/status.h +++ b/src/status.h @@ -258,6 +258,13 @@ typedef enum { JSStreamMoveInProgressErr = 10124, ///< Stream move already in progress JSConsumerMaxRequestBatchExceededErr = 10125, ///< Consumer max request batch exceeds server limit JSConsumerReplicasExceedsStreamErr = 10126, ///< Consumer config replica count exceeds parent stream + JSConsumerNameContainsPathSeparatorsErr = 10127, ///< Consumer name can not contain path separators + JSStreamNameContainsPathSeparatorsErr = 10128, ///< Stream name can not contain path separators + JSStreamMoveNotInProgressErr = 10129, ///< Stream move not in progress + JSStreamNameExistRestoreFailedErr = 10130, ///< Stream name already in use, cannot restore + JSConsumerCreateFilterSubjectMismatchErr = 10131, ///< Consumer create request did not match filtered subject from create subject + JSConsumerCreateDurableAndNameMismatchErr = 10132, ///< Consumer Durable and Name have to be equal if both are provided + JSReplicasCountCannotBeNegativeErr = 10133, ///< Replicas count cannot be negative } jsErrCode; diff --git a/test/list.txt b/test/list.txt index 83690fdd8..ebc513c84 100644 --- a/test/list.txt +++ b/test/list.txt @@ -241,6 +241,7 @@ JetStreamDirectGetMsg JetStreamNakWithDelay JetStreamBackOffRedeliveries JetStreamInfoWithSubjects +JetStreamInfoAlternates KeyValueManager KeyValueBasics KeyValueWatch diff --git a/test/test.c b/test/test.c index d4efa130c..3d5dc66ce 100644 --- a/test/test.c +++ b/test/test.c @@ -21732,8 +21732,11 @@ test_JetStreamUnmarshalStreamInfo(void) "{\"cluster\":{\"name\":\"S1\",\"leader\":\"S2\",\"replicas\":[{\"name\":\"S1\",\"current\":true,\"offline\":false,\"active\":123,\"lag\":456},{\"name\":\"S1\",\"current\":false,\"offline\":true,\"active\":123,\"lag\":456}]}}", "{\"mirror\":{\"name\":\"M\",\"lag\":123,\"active\":456}}", "{\"mirror\":{\"name\":\"M\",\"external\":{\"api\":\"MyApi\",\"deliver\":\"deliver.prefix\"},\"lag\":123,\"active\":456}}", + "{\"sources\":[{\"name\":\"S1\",\"lag\":123,\"active\":456}]}", "{\"sources\":[{\"name\":\"S1\",\"lag\":123,\"active\":456},{\"name\":\"S2\",\"lag\":123,\"active\":456}]}", "{\"sources\":[{\"name\":\"S1\",\"external\":{\"api\":\"MyApi\",\"deliver\":\"deliver.prefix\"},\"lag\":123,\"active\":456},{\"name\":\"S2\",\"lag\":123,\"active\":456}]}", + "{\"alternates\":[{\"name\":\"S1\",\"domain\":\"domain\",\"cluster\":\"abc\"}]}", + "{\"alternates\":[{\"name\":\"S1\",\"domain\":\"domain\",\"cluster\":\"abc\"},{\"name\":\"S2\",\"domain\":\"domain\",\"cluster\":\"abc\"}]}", }; const char *bad[] = { "{\"config\":123}", @@ -21781,6 +21784,10 @@ test_JetStreamUnmarshalStreamInfo(void) "{\"sources\":[{\"name\":123}]}", "{\"sources\":[{\"name\":\"S1\",\"external\":123}]}", "{\"sources\":[{\"name\":\"S1\",\"external\":{\"deliver\":123}}]}", + "{\"alternates\":123}", + "{\"alternates\":[{\"name\":123}]}", + "{\"alternates\":[{\"name\":\"S1\",\"domain\":123}]}", + "{\"alternates\":[{\"name\":\"S1\",\"domain\":\"domain\",\"cluster\":123}]}", }; int i; char tmp[64]; @@ -29738,6 +29745,137 @@ test_JetStreamInfoWithSubjects(void) JS_TEARDOWN; } +static natsStatus +_checkJSClusterReady(const char *url) +{ + natsStatus s = NATS_OK; + natsConnection *nc = NULL; + jsCtx *js = NULL; + jsErrCode jerr= 0; + int i; + jsOptions jo; + + jsOptions_Init(&jo); + jo.Wait = 1000; + + s = natsConnection_ConnectTo(&nc, url); + IFOK(s, natsConnection_JetStream(&js, nc, &jo)); + for (i=0; (s == NATS_OK) && (i<10); i++) + { + jsStreamInfo *si = NULL; + + s = js_GetStreamInfo(&si, js, "CHECK_CLUSTER", &jo, &jerr); + if (jerr == JSStreamNotFoundErr) + { + nats_clearLastError(); + s = NATS_OK; + break; + } + if ((s != NATS_OK) && (i < 9)) + { + s = NATS_OK; + nats_Sleep(500); + } + } + jsCtx_Destroy(js); + natsConnection_Destroy(nc); + return s; +} + +static void +test_JetStreamInfoAlternates(void) +{ + char datastore1[256] = {'\0'}; + char datastore2[256] = {'\0'}; + char datastore3[256] = {'\0'}; + char cmdLine[1024] = {'\0'}; + natsPid pid1 = NATS_INVALID_PID; + natsPid pid2 = NATS_INVALID_PID; + natsPid pid3 = NATS_INVALID_PID; + natsConnection *nc = NULL; + jsCtx *js = NULL; + jsStreamInfo *si = NULL; + jsStreamConfig sc; + jsStreamSource ss; + natsStatus s; + + ENSURE_JS_VERSION(2, 9, 0); + + test("Start cluster: "); + _makeUniqueDir(datastore1, sizeof(datastore1), "datastore_"); + snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -cluster_name abc -server_name A -cluster nats://127.0.0.1:6222 -routes nats://127.0.0.1:6222,nats://127.0.0.1:6223,nats://127.0.0.1:6224 -p 4222", datastore1); + pid1 = _startServer("nats://127.0.0.1:4222", cmdLine, true); + CHECK_SERVER_STARTED(pid1); + + _makeUniqueDir(datastore2, sizeof(datastore2), "datastore_"); + snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -cluster_name abc -server_name B -cluster nats://127.0.0.1:6223 -routes nats://127.0.0.1:6222,nats://127.0.0.1:6223,nats://127.0.0.1:6224 -p 4223", datastore2); + pid2 = _startServer("nats://127.0.0.1:4223", cmdLine, true); + CHECK_SERVER_STARTED(pid1); + + _makeUniqueDir(datastore3, sizeof(datastore3), "datastore_"); + snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -cluster_name abc -server_name C -cluster nats://127.0.0.1:6224 -routes nats://127.0.0.1:6222,nats://127.0.0.1:6223,nats://127.0.0.1:6224 -p 4224", datastore3); + pid3 = _startServer("nats://127.0.0.1:4224", cmdLine, true); + CHECK_SERVER_STARTED(pid1); + testCond(true); + + test("Check cluster: "); + s = _checkJSClusterReady("nats://127.0.0.1:4224"); + testCond(s == NATS_OK); + + test("Connect: "); + s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); + testCond(s == NATS_OK); + + test("Get context: "); + s = natsConnection_JetStream(&js, nc, NULL); + testCond(s == NATS_OK); + + test("Create stream: "); + jsStreamConfig_Init(&sc); + sc.Name = "TEST"; + sc.Subjects = (const char*[1]){"foo"}; + sc.SubjectsLen = 1; + s = js_AddStream(NULL, js, &sc, NULL, NULL); + testCond(s == NATS_OK); + + test("Create mirror: "); + jsStreamConfig_Init(&sc); + sc.Name = "MIRROR"; + jsStreamSource_Init(&ss); + ss.Name = "TEST"; + sc.Mirror = &ss; + s = js_AddStream(NULL, js, &sc, NULL, NULL); + testCond(s == NATS_OK); + + test("Check for alternate: "); + s = js_GetStreamInfo(&si, js, "TEST", NULL, NULL); + testCond((s == NATS_OK) && (si != NULL) && (si->AlternatesLen == 2)); + + test("Check alternate content: "); + if ((strcmp(si->Alternates[0]->Cluster, "abc") != 0) + || (strcmp(si->Alternates[1]->Cluster, "abc") != 0)) + { + s = NATS_ERR; + } + else if (((strcmp(si->Alternates[0]->Name, "TEST") == 0) && (strcmp(si->Alternates[1]->Name, "MIRROR") != 0)) + || ((strcmp(si->Alternates[0]->Name, "MIRROR") == 0) && (strcmp(si->Alternates[1]->Name, "TEST") != 0))) + { + s = NATS_ERR; + } + testCond(s == NATS_OK); + jsStreamInfo_Destroy(si); + + jsCtx_Destroy(js); + natsConnection_Destroy(nc); + + _stopServer(pid3); + _stopServer(pid2); + _stopServer(pid1); + rmtree(datastore1); + rmtree(datastore2); + rmtree(datastore3); +} + static void test_KeyValueManager(void) { @@ -34077,6 +34215,7 @@ static testInfo allTests[] = {"JetStreamNakWithDelay", test_JetStreamNakWithDelay}, {"JetStreamBackOffRedeliveries", test_JetStreamBackOffRedeliveries}, {"JetStreamInfoWithSubjects", test_JetStreamInfoWithSubjects}, + {"JetStreamInfoAlternates", test_JetStreamInfoAlternates}, {"KeyValueManager", test_KeyValueManager}, {"KeyValueBasics", test_KeyValueBasics},