Skip to content

Commit

Permalink
Merge pull request #612 from nats-io/merge_dev_to_main
Browse files Browse the repository at this point in the history
PR to merge dev branch into main
  • Loading branch information
kozlovic authored Nov 7, 2022
2 parents 9bca84d + 76101ed commit 92316be
Show file tree
Hide file tree
Showing 11 changed files with 1,293 additions and 725 deletions.
8 changes: 4 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ endif(NATS_BUILD_WITH_TLS)
# Versionning and Doc

set(NATS_VERSION_MAJOR 3)
set(NATS_VERSION_MINOR 4)
set(NATS_VERSION_PATCH 1)
set(NATS_VERSION_SUFFIX "")
set(NATS_VERSION_MINOR 5)
set(NATS_VERSION_PATCH 0)
set(NATS_VERSION_SUFFIX "-dev")

set(NATS_VERSION_REQUIRED_NUMBER 0x030400)
set(NATS_VERSION_REQUIRED_NUMBER 0x030500)

if(NATS_UPDATE_VERSION OR NATS_UPDATE_DOC)
configure_file(
Expand Down
924 changes: 326 additions & 598 deletions doc/DoxyFile.NATS.Client

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ extern const int64_t jsDefaultRequestWait;
#define jsAckInProgress "+WPI"
#define jsAckTerm "+TERM"

// jsExtDomainT is used to create a StreamSource External APIPrefix
#define jsExtDomainT "$JS.%s.API"

// jsApiAccountInfo is for obtaining general information about JetStream.
#define jsApiAccountInfo "%.*s.INFO"

Expand Down
162 changes: 160 additions & 2 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,18 @@ js_cleanStreamState(jsStreamState *state)
_destroyStreamStateSubjects(state->Subjects);
}

static void
_destroyStreamAlternate(jsStreamAlternate *sa)
{
if (sa == NULL)
return;

NATS_FREE((char*) sa->Name);
NATS_FREE((char*) sa->Domain);
NATS_FREE((char*) sa->Cluster);
NATS_FREE(sa);
}

void
jsStreamInfo_Destroy(jsStreamInfo *si)
{
Expand All @@ -226,6 +238,9 @@ jsStreamInfo_Destroy(jsStreamInfo *si)
for (i=0; i<si->SourcesLen; i++)
_destroyStreamSourceInfo(si->Sources[i]);
NATS_FREE(si->Sources);
for (i=0; i<si->AlternatesLen; i++)
_destroyStreamAlternate(si->Alternates[i]);
NATS_FREE(si->Alternates);
NATS_FREE(si);
}

Expand Down Expand Up @@ -264,8 +279,11 @@ _marshalExternalStream(jsExternalStream *external, const char *fieldName, natsBu
IFOK(s, natsBuf_Append(buf, fieldName, -1));
IFOK(s, natsBuf_Append(buf, "\":{\"api\":\"", -1));
IFOK(s, natsBuf_Append(buf, external->APIPrefix, -1));
IFOK(s, natsBuf_Append(buf, "\",\"deliver\":\"", -1));
IFOK(s, natsBuf_Append(buf, external->DeliverPrefix, -1));
if ((s == NATS_OK) && !nats_IsStringEmpty(external->DeliverPrefix))
{
IFOK(s, natsBuf_Append(buf, "\",\"deliver\":\"", -1));
IFOK(s, natsBuf_Append(buf, external->DeliverPrefix, -1));
}
IFOK(s, natsBuf_Append(buf, "\"}", -1));

return NATS_UPDATE_ERR_STACK(s);
Expand Down Expand Up @@ -952,12 +970,36 @@ _unmarshalStreamSourceInfo(nats_JSON *pjson, const char *fieldName, jsStreamSour
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalStreamAlternate(nats_JSON *json, jsStreamAlternate **new_alt)
{
jsStreamAlternate *sa = NULL;
natsStatus s = NATS_OK;

sa = (jsStreamAlternate*) NATS_CALLOC(1, sizeof(jsStreamAlternate));
if (sa == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

s = nats_JSONGetStr(json, "name", (char**) &(sa->Name));
IFOK(s, nats_JSONGetStr(json, "domain", (char**) &(sa->Domain)));
IFOK(s, nats_JSONGetStr(json, "cluster", (char**) &(sa->Cluster)));

if (s == NATS_OK)
*new_alt = sa;
else
_destroyStreamAlternate(sa);

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalStreamInfoPaged(nats_JSON *json, jsStreamInfo **new_si, apiPaged *page)
{
jsStreamInfo *si = NULL;
nats_JSON **sources = NULL;
int sourcesLen = 0;
nats_JSON **alts = NULL;
int altsLen = 0;
natsStatus s;

si = (jsStreamInfo*) NATS_CALLOC(1, sizeof(jsStreamInfo));
Expand Down Expand Up @@ -988,6 +1030,24 @@ _unmarshalStreamInfoPaged(nats_JSON *json, jsStreamInfo **new_si, apiPaged *page
// Free the array of JSON objects that was allocated by nats_JSONGetArrayObject.
NATS_FREE(sources);
}
IFOK(s, nats_JSONGetArrayObject(json, "alternates", &alts, &altsLen));
if ((s == NATS_OK) && (alts != NULL))
{
int i;

si->Alternates = (jsStreamAlternate**) NATS_CALLOC(altsLen, sizeof(jsStreamAlternate*));
if (si->Alternates == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);

for (i=0; (s == NATS_OK) && (i<altsLen); i++)
{
s = _unmarshalStreamAlternate(alts[i], &(si->Alternates[i]));
if (s == NATS_OK)
si->AlternatesLen++;
}
// Free the array of JSON objects that was allocated by nats_JSONGetArrayObject.
NATS_FREE(alts);
}
if ((s == NATS_OK) && (page != NULL))
{
IFOK(s, nats_JSONGetLong(json, "total", &page->total));
Expand Down Expand Up @@ -1061,6 +1121,95 @@ jsStreamConfig_Init(jsStreamConfig *cfg)
return NATS_OK;
}

static void
_restoreMirrorAndSourcesExternal(jsStreamConfig *cfg)
{
int i;

// We are guaranteed that if a source's Domain is set, there was originally
// no External value. So free any External value and reset to NULL to
// restore the original setting.
if ((cfg->Mirror != NULL) && !nats_IsStringEmpty(cfg->Mirror->Domain))
{
_destroyExternalStream(cfg->Mirror->External);
cfg->Mirror->External = NULL;
}
for (i=0; i<cfg->SourcesLen; i++)
{
jsStreamSource *src = cfg->Sources[i];
if ((src != NULL) && !nats_IsStringEmpty(src->Domain))
{
_destroyExternalStream(src->External);
src->External = NULL;
}
}
}

static natsStatus
_convertDomain(jsStreamSource *src)
{
jsExternalStream *e = NULL;

e = (jsExternalStream*) NATS_CALLOC(1, sizeof(jsExternalStream));
if (e == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

if (nats_asprintf((char**) &(e->APIPrefix), jsExtDomainT, src->Domain) < 0)
{
NATS_FREE(e);
return nats_setDefaultError(NATS_NO_MEMORY);
}
src->External = e;
return NATS_OK;
}

static natsStatus
_convertMirrorAndSourcesDomain(bool *converted, jsStreamConfig *cfg)
{
natsStatus s = NATS_OK;
bool cm = false;
bool cs = false;
int i;

*converted = false;

if ((cfg->Mirror != NULL) && !nats_IsStringEmpty(cfg->Mirror->Domain))
{
if (cfg->Mirror->External != NULL)
return nats_setError(NATS_INVALID_ARG, "%s", "mirror's domain and external are both set");
cm = true;
}
for (i=0; i<cfg->SourcesLen; i++)
{
jsStreamSource *src = cfg->Sources[i];
if ((src != NULL) && !nats_IsStringEmpty(src->Domain))
{
if (src->External != NULL)
return nats_setError(NATS_INVALID_ARG, "%s", "source's domain and external are both set");
cs = true;
}
}
if (!cm && !cs)
return NATS_OK;

if (cm)
s = _convertDomain(cfg->Mirror);
if ((s == NATS_OK) && cs)
{
for (i=0; (s == NATS_OK) && (i<cfg->SourcesLen); i++)
{
jsStreamSource *src = cfg->Sources[i];
if ((src != NULL) && !nats_IsStringEmpty(src->Domain))
s = _convertDomain(src);
}
}
if (s == NATS_OK)
*converted = true;
else
_restoreMirrorAndSourcesExternal(cfg);
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamConfig *cfg, jsOptions *opts, jsErrCode *errCode)
{
Expand All @@ -1071,6 +1220,7 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo
natsConnection *nc = NULL;
const char *apiT = NULL;
bool freePfx = false;
bool msc = false;
jsOptions o;

if (errCode != NULL)
Expand Down Expand Up @@ -1101,6 +1251,8 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo
if (freePfx)
NATS_FREE((char*) o.Prefix);
}
if ((s == NATS_OK) && (action == jsStreamActionCreate))
s = _convertMirrorAndSourcesDomain(&msc, cfg);

// Marshal the stream create/update request
IFOK(s, js_marshalStreamConfig(&buf, cfg));
Expand All @@ -1111,6 +1263,12 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo
// If we got a response, check for error or return the stream info result.
IFOK(s, _unmarshalStreamCreateResp(new_si, NULL, resp, errCode));

// If mirror and/or sources were converted for the domain, then we need
// to restore the original values (which will free the memory that was
// allocated for the conversion).
if (msc)
_restoreMirrorAndSourcesExternal(cfg);

natsBuf_Destroy(buf);
natsMsg_Destroy(resp);
NATS_FREE(subj);
Expand Down
Loading

0 comments on commit 92316be

Please sign in to comment.