From bd1364b99263b0e61a5fe88b3292dd3497f95094 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Tue, 18 Feb 2020 11:22:43 +0100 Subject: [PATCH] support editing streams --- README.md | 43 ++++++++++++++++++++++++ go.mod | 2 +- go.sum | 8 +++-- internal/jsch/consumers.go | 8 ++--- internal/jsch/jsch.go | 8 ++--- internal/jsch/streams.go | 38 ++++++++++++++++++---- internal/jsch/streams_test.go | 38 ++++++++++++++++++++++ internal/jsch/templates.go | 6 ++-- jsm/jsm_test.go | 23 +++++++++++++ jsm/str_command.go | 61 ++++++++++++++++++++++++++--------- 10 files changed, 198 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index a7e9328..137abf5 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,8 @@ JetStream is the [NATS.io](https://nats.io) persistence engine that will support + [Creating](#creating) + [Listing](#listing) + [Querying](#querying) + + [Copying](#copying) + + [Editing](#editing) + [Publishing Into a Stream](#publishing-into-a-stream) + [Deleting Data](#deleting-data) + [Deleting Sets](#deleting-sets) @@ -431,6 +433,45 @@ $ jsm str info ORDERS -j This is the general pattern for the entire `jsm` utility - prompting for needed information but every action can be run non-interactively making it usable as a cli api. All information output like seen above can be turned into JSON using `-j`. +#### Copying + +A stream can be copied into another, which also allows the configuration of the new one to be adjusted via CLI flags: + +```nohighlight +$ jsm str cp ORDERS ARCHIVE --subjects "ORDERS_ARCVHIVE.*" --max-age 2y +Stream ORDERS was created + +Information for Stream ARCHIVE + +Configuration: + + Subjects: ORDERS_ARCVHIVE.* +... + Maximum Age: 17520h0m0s +... +``` + +#### Editing + +A stream configuration can be edited, which allows the configuration to be adjusted via CLI flags. Here I have a incorrectly created ORDERS stream that I fix: + +```nohighlight +$ jsm str info ORDERS -j | jq .config.subjects +[ + "ORDERS.new" +] + +$ jsm str edit ORDERS --subjects "ORDERS.*" +Stream ORDERS was updated + +Information for Stream ORDERS + +Configuration: + + Subjects: ORDERS.* +.... +``` + #### Publishing Into a Stream Now let's add in some messages to our Stream. You can use `nats-pub` or `nats-bench`. Or even `nats-req` to see the publish ack being returned (these are included in the `synadia/jsm:latest` docker image). @@ -1167,6 +1208,7 @@ The command `jsm events` will show you an audit log of all API access events whi |`server.JetStreamDeleteStreamT`|Deletes a Stream and all its data|empty payload, Stream name in subject|Standard OK/ERR| |`server.JetStreamPurgeStreamT`|Purges all of the data in a Stream, leaves the Stream|empty payload, Stream name in subject|Standard OK/ERR| |`server.JetStreamDeleteMsgT`|Deletes a specific message in the Stream by sequence, useful for GDPR compliance|`stream_seq`, Stream name in subject|Standard OK/ERR| +|`server.JetStreamUpdateStreamT`|Updates the configuration of an existing stream|Stream name in subject|Standard OK/ERR| #### Stream Templates @@ -1202,6 +1244,7 @@ Stream and Consumer Admin ``` $JS.STREAM.LIST $JS.STREAM..CREATE +$JS.STREAM..UPDATE $JS.STREAM..INFO $JS.STREAM..DELETE $JS.STREAM..PURGE diff --git a/go.mod b/go.mod index ff44a79..44e7f51 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/dustin/go-humanize v1.0.0 github.com/google/go-cmp v0.4.0 - github.com/nats-io/nats-server/v2 v2.1.5-0.20200211052250-9c6efee1d171 + github.com/nats-io/nats-server/v2 v2.1.5-0.20200218040645-a9bebc145396 github.com/nats-io/nats.go v1.9.1 github.com/xlab/tablewriter v0.0.0-20160610135559-80b567a11ad5 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 // indirect diff --git a/go.sum b/go.sum index adc1cb0..8f5783d 100644 --- a/go.sum +++ b/go.sum @@ -34,8 +34,12 @@ github.com/nats-io/jwt v0.3.0 h1:xdnzwFETV++jNc4W1mw//qFyJGb2ABOombmZJQS4+Qo= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= -github.com/nats-io/nats-server/v2 v2.1.5-0.20200211052250-9c6efee1d171 h1:OkWGiB384FUD3Cj2Pz96si1sOYzlp+1QNmlrKdwaz/E= -github.com/nats-io/nats-server/v2 v2.1.5-0.20200211052250-9c6efee1d171/go.mod h1:FBtPk+0dj8JdsLJ1eiXh7OLXnJiwjO6VpJkzjYo7lro= +github.com/nats-io/nats-server/v2 v2.1.5-0.20200211194714-a27d1068905e h1:Oc2qVue0d8CiGzm8ZI66J4BeotA5PbtM18RRIr7Dvu0= +github.com/nats-io/nats-server/v2 v2.1.5-0.20200211194714-a27d1068905e/go.mod h1:FBtPk+0dj8JdsLJ1eiXh7OLXnJiwjO6VpJkzjYo7lro= +github.com/nats-io/nats-server/v2 v2.1.5-0.20200217135231-ab456c13f897 h1:0fD4rgxs5FrQdbsbaUiJ3JmkRnQqFKnwtra1RbXZZ0I= +github.com/nats-io/nats-server/v2 v2.1.5-0.20200217135231-ab456c13f897/go.mod h1:FBtPk+0dj8JdsLJ1eiXh7OLXnJiwjO6VpJkzjYo7lro= +github.com/nats-io/nats-server/v2 v2.1.5-0.20200218040645-a9bebc145396 h1:/6bZkgGacqNEU5jkDTn1ZCFj/awvYyRPUfXSDmLBW2U= +github.com/nats-io/nats-server/v2 v2.1.5-0.20200218040645-a9bebc145396/go.mod h1:FBtPk+0dj8JdsLJ1eiXh7OLXnJiwjO6VpJkzjYo7lro= github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4= diff --git a/internal/jsch/consumers.go b/internal/jsch/consumers.go index 9ae8a28..f953376 100644 --- a/internal/jsch/consumers.go +++ b/internal/jsch/consumers.go @@ -94,7 +94,7 @@ func createDurableConsumer(request server.CreateConsumerRequest) (name string, e } if IsErrorResponse(response) { - return "", fmt.Errorf("%s", string(response.Data)) + return "", fmt.Errorf(string(response.Data)) } return request.Config.Durable, nil @@ -112,7 +112,7 @@ func createEphemeralConsumer(request server.CreateConsumerRequest) (name string, } if IsErrorResponse(response) { - return "", fmt.Errorf("%s", string(response.Data)) + return "", fmt.Errorf(string(response.Data)) } parts := strings.Split(string(response.Data), " ") @@ -188,7 +188,7 @@ func loadConsumerInfo(s string, c string) (info server.ConsumerInfo, err error) } if IsErrorResponse(response) { - return info, fmt.Errorf("%s", string(response.Data)) + return info, fmt.Errorf(string(response.Data)) } info = server.ConsumerInfo{} @@ -467,7 +467,7 @@ func (c *Consumer) Delete() (err error) { } if IsErrorResponse(response) { - return fmt.Errorf("%s", string(response.Data)) + return fmt.Errorf(string(response.Data)) } if IsOKResponse(response) { diff --git a/internal/jsch/jsch.go b/internal/jsch/jsch.go index 5e1b6fd..45f948f 100644 --- a/internal/jsch/jsch.go +++ b/internal/jsch/jsch.go @@ -118,7 +118,7 @@ func JetStreamAccountInfo() (info server.JetStreamAccountStats, err error) { } if IsErrorResponse(response) { - return info, fmt.Errorf("%s", string(response.Data)) + return info, fmt.Errorf(string(response.Data)) } err = json.Unmarshal(response.Data, &info) @@ -139,7 +139,7 @@ func StreamNames() (streams []string, err error) { } if IsErrorResponse(response) { - return streams, fmt.Errorf("%s", string(response.Data)) + return streams, fmt.Errorf(string(response.Data)) } err = json.Unmarshal(response.Data, &streams) @@ -162,7 +162,7 @@ func StreamTemplateNames() (templates []string, err error) { } if IsErrorResponse(response) { - return templates, fmt.Errorf("%s", string(response.Data)) + return templates, fmt.Errorf(string(response.Data)) } err = json.Unmarshal(response.Data, &templates) @@ -185,7 +185,7 @@ func ConsumerNames(stream string) (consumers []string, err error) { } if IsErrorResponse(response) { - return consumers, fmt.Errorf("%s", string(response.Data)) + return consumers, fmt.Errorf(string(response.Data)) } err = json.Unmarshal(response.Data, &consumers) diff --git a/internal/jsch/streams.go b/internal/jsch/streams.go index c06f050..1f53312 100644 --- a/internal/jsch/streams.go +++ b/internal/jsch/streams.go @@ -78,7 +78,7 @@ func NewStreamFromDefault(name string, dflt server.StreamConfig, opts ...StreamO } if IsErrorResponse(response) { - return nil, fmt.Errorf("%s", string(response.Data)) + return nil, fmt.Errorf(string(response.Data)) } return LoadStream(name) @@ -150,7 +150,7 @@ func loadStreamInfo(stream string) (info *server.StreamInfo, err error) { } if IsErrorResponse(response) { - return nil, fmt.Errorf("%s", string(response.Data)) + return nil, fmt.Errorf(string(response.Data)) } info = &server.StreamInfo{} @@ -253,6 +253,30 @@ func NoAck() StreamOption { } } +// UpdateConfiguration updates the stream using cfg modified by opts, reloads configuration from the server post update +func (s *Stream) UpdateConfiguration(cfg server.StreamConfig, opts ...StreamOption) error { + ncfg, err := NewStreamConfiguration(cfg, opts...) + if err != nil { + return err + } + + jcfg, err := json.Marshal(ncfg) + if err != nil { + return err + } + + response, err := nc.Request(fmt.Sprintf(server.JetStreamUpdateStreamT, s.Name()), jcfg, Timeout) + if err != nil { + return err + } + + if IsErrorResponse(response) { + return fmt.Errorf(string(response.Data)) + } + + return s.Reset() +} + // Reset reloads the Stream configuration from the JetStream server func (s *Stream) Reset() error { return loadConfigForStream(s) @@ -291,7 +315,7 @@ func (s *Stream) ConsumerNames() (names []string, err error) { } if IsErrorResponse(response) { - return names, fmt.Errorf("%s", string(response.Data)) + return names, fmt.Errorf(string(response.Data)) } err = json.Unmarshal(response.Data, &names) @@ -345,7 +369,7 @@ func (s *Stream) Delete() error { } if IsErrorResponse(response) { - return fmt.Errorf("%s", string(response.Data)) + return fmt.Errorf(string(response.Data)) } return nil @@ -359,7 +383,7 @@ func (s *Stream) Purge() error { } if IsErrorResponse(response) { - return fmt.Errorf("%s", string(response.Data)) + return fmt.Errorf(string(response.Data)) } return nil @@ -373,7 +397,7 @@ func (s *Stream) LoadMessage(seq int) (msg server.StoredMsg, err error) { } if IsErrorResponse(response) { - return server.StoredMsg{}, fmt.Errorf("%s", string(response.Data)) + return server.StoredMsg{}, fmt.Errorf(string(response.Data)) } msg = server.StoredMsg{} @@ -393,7 +417,7 @@ func (s *Stream) DeleteMessage(seq int) (err error) { } if IsErrorResponse(response) { - return fmt.Errorf("%s", string(response.Data)) + return fmt.Errorf(string(response.Data)) } return nil diff --git a/internal/jsch/streams_test.go b/internal/jsch/streams_test.go index b96c5f5..5e4512a 100644 --- a/internal/jsch/streams_test.go +++ b/internal/jsch/streams_test.go @@ -221,6 +221,44 @@ func TestStream_LoadOrNewConsumerFromDefault(t *testing.T) { } } +func TestStream_UpdateConfiguration(t *testing.T) { + srv, nc := startJSServer(t) + defer srv.Shutdown() + defer nc.Flush() + + stream, err := jsch.NewStream("q1", jsch.FileStorage(), jsch.Subjects("ORDERS.new")) + checkErr(t, err, "create failed") + + if stream.Configuration().Subjects[0] != "ORDERS.new" { + t.Fatalf("expected [ORDERS.new], got %v", stream.Configuration().Subjects) + } + + cfg := stream.Configuration() + cfg.Subjects = []string{"ORDERS.*"} + + err = stream.UpdateConfiguration(cfg) + checkErr(t, err, "update failed") + + if len(stream.Configuration().Subjects) != 1 { + t.Fatalf("expected [ORDERS.*], got %v", stream.Configuration().Subjects) + } + + if stream.Configuration().Subjects[0] != "ORDERS.*" { + t.Fatalf("expected [ORDERS.*], got %v", stream.Configuration().Subjects) + } + + err = stream.UpdateConfiguration(stream.Configuration(), jsch.Subjects("ARCHIVE.*")) + checkErr(t, err, "update failed") + + if len(stream.Configuration().Subjects) != 1 { + t.Fatalf("expected [ARCHIVE.*], got %v", stream.Configuration().Subjects) + } + + if stream.Configuration().Subjects[0] != "ARCHIVE.*" { + t.Fatalf("expected [ARCHIVE.*], got %v", stream.Configuration().Subjects) + } +} + func TestStream_ConsumerNames(t *testing.T) { srv, nc := startJSServer(t) defer srv.Shutdown() diff --git a/internal/jsch/templates.go b/internal/jsch/templates.go index d1b4b6e..9dbff6b 100644 --- a/internal/jsch/templates.go +++ b/internal/jsch/templates.go @@ -36,7 +36,7 @@ func NewStreamTemplate(name string, maxStreams uint32, config server.StreamConfi } if IsErrorResponse(response) { - return nil, fmt.Errorf("%s", string(response.Data)) + return nil, fmt.Errorf(string(response.Data)) } return LoadStreamTemplate(name) @@ -70,7 +70,7 @@ func loadConfigForStreamTemplate(template *StreamTemplate) (err error) { } if IsErrorResponse(response) { - return fmt.Errorf("%s", string(response.Data)) + return fmt.Errorf(string(response.Data)) } info := server.StreamTemplateInfo{} @@ -93,7 +93,7 @@ func (t *StreamTemplate) Delete() error { } if IsErrorResponse(response) { - return fmt.Errorf("%s", string(response.Data)) + return fmt.Errorf(string(response.Data)) } return nil diff --git a/jsm/jsm_test.go b/jsm/jsm_test.go index 95ad38a..4ea26a0 100644 --- a/jsm/jsm_test.go +++ b/jsm/jsm_test.go @@ -365,6 +365,29 @@ func TestCLIConsumerNext(t *testing.T) { } } +func TestCLIStreamEdit(t *testing.T) { + srv, _ := setupJStreamTest(t) + defer srv.Shutdown() + + mem1, err := jsch.NewStreamFromDefault("mem1", mem1Stream()) + checkErr(t, err, "could not create stream: %v", err) + streamShouldExist(t, "mem1") + + runJsmCli(t, fmt.Sprintf("--server='%s' str edit mem1 --subjects other", srv.ClientURL())) + + err = mem1.Reset() + checkErr(t, err, "could not reset stream: %v", err) + + if len(mem1.Subjects()) != 1 { + t.Fatalf("expected [other] got %v", mem1.Subjects()) + } + + if mem1.Subjects()[0] != "other" { + t.Fatalf("expected [other] got %v", mem1.Subjects()) + } + +} + func TestCLIStreamCopy(t *testing.T) { srv, _ := setupJStreamTest(t) defer srv.Shutdown() diff --git a/jsm/str_command.go b/jsm/str_command.go index 092caf6..67d6c0f 100644 --- a/jsm/str_command.go +++ b/jsm/str_command.go @@ -70,15 +70,19 @@ func configureStreamCommand(app *kingpin.Application) { f.Flag("max-bytes", "Maximum bytes to keep").Default("0").Int64Var(&c.maxBytesLimit) f.Flag("max-age", "Maximum age of messages to keep").Default("").StringVar(&c.maxAgeLimit) f.Flag("storage", "Storage backend to use (file, memory)").EnumVar(&c.storage, "file", "f", "memory", "m") - f.Flag("json", "Produce JSON output").Short('j').BoolVar(&c.json) f.Flag("retention", "Defines a retention policy (limits, interest, work)").EnumVar(&c.retentionPolicyS, "limits", "interest", "workq", "work") f.Flag("max-msg-size", "Maximum size any 1 message may be").Int32Var(&c.maxMsgSize) + f.Flag("json", "Produce JSON output").Short('j').BoolVar(&c.json) } strAdd := str.Command("create", "Create a new Stream").Alias("add").Alias("new").Action(c.addAction) strAdd.Arg("stream", "Stream name").StringVar(&c.stream) addCreateFlags(strAdd) + strEdit := str.Command("edit", "Edits an existing stream").Action(c.editAction) + strEdit.Arg("stream", "Stream to retrieve edit").StringVar(&c.stream) + addCreateFlags(strEdit) + strCopy := str.Command("copy", "Creates a new Stream based on the configuration of another").Alias("cp").Action(c.cpAction) strCopy.Arg("source", "Source Stream to copy").Required().StringVar(&c.stream) strCopy.Arg("destination", "New Stream to create").Required().StringVar(&c.destination) @@ -310,22 +314,10 @@ func (c *streamCmd) reportAction(pc *kingpin.ParseContext) error { return nil } -func (c *streamCmd) cpAction(pc *kingpin.ParseContext) error { - if c.stream == c.destination { - kingpin.Fatalf("source and destination Stream names cannot be the same") - } - - c.connectAndAskStream() - - sourceStream, err := jsch.LoadStream(c.stream) - kingpin.FatalIfError(err, "could not request Stream %s configuration", c.stream) - - source, err := sourceStream.Information() - kingpin.FatalIfError(err, "could not request Stream %s configuration", c.stream) +func (c *streamCmd) copyAndEditStream(cfg api.StreamConfig) (api.StreamConfig, error) { + var err error - cfg := source.Config cfg.NoAck = !c.ack - cfg.Name = c.destination if len(c.subjects) > 0 { cfg.Subjects = c.splitCLISubjects() @@ -349,13 +341,50 @@ func (c *streamCmd) cpAction(pc *kingpin.ParseContext) error { if c.maxAgeLimit != "" { cfg.MaxAge, err = parseDurationString(c.maxAgeLimit) - kingpin.FatalIfError(err, "invalid maximum age limit format") + if err != nil { + return api.StreamConfig{}, fmt.Errorf("invalid maximum age limit format: %v", err) + } } if c.maxMsgSize != 0 { cfg.MaxMsgSize = c.maxMsgSize } + return cfg, nil +} + +func (c *streamCmd) editAction(pc *kingpin.ParseContext) error { + c.connectAndAskStream() + + sourceStream, err := jsch.LoadStream(c.stream) + kingpin.FatalIfError(err, "could not request Stream %s configuration", c.stream) + + cfg, err := c.copyAndEditStream(sourceStream.Configuration()) + kingpin.FatalIfError(err, "could not create new configuration for Stream %s", c.stream) + + err = sourceStream.UpdateConfiguration(cfg) + kingpin.FatalIfError(err, "could not edit Stream %s", c.stream) + + fmt.Printf("Stream %s was updated\n\n", c.stream) + + return c.infoAction(pc) +} + +func (c *streamCmd) cpAction(pc *kingpin.ParseContext) error { + if c.stream == c.destination { + kingpin.Fatalf("source and destination Stream names cannot be the same") + } + + c.connectAndAskStream() + + sourceStream, err := jsch.LoadStream(c.stream) + kingpin.FatalIfError(err, "could not request Stream %s configuration", c.stream) + + cfg, err := c.copyAndEditStream(sourceStream.Configuration()) + kingpin.FatalIfError(err, "could not copy Stream %s", c.stream) + + cfg.Name = c.destination + _, err = jsch.NewStreamFromDefault(cfg.Name, cfg) kingpin.FatalIfError(err, "could not create Stream")