Skip to content
This repository has been archived by the owner on Jul 28, 2021. It is now read-only.

Commit

Permalink
Merge pull request #97 from nats-io/edit
Browse files Browse the repository at this point in the history
support editing streams
  • Loading branch information
ripienaar authored Feb 18, 2020
2 parents ce4a3ec + bd1364b commit 2b441b8
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 37 deletions.
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ JetStream is the [NATS.io](https://nats.io) persistence engine that will support
+ [Creating](#creating)
+ [Listing](#listing)
+ [Querying](#querying)
+ [Copying](#copying)
+ [Editing](#editing)
+ [Publishing Into a Stream](#publishing-into-a-stream)
+ [Deleting Data](#deleting-data)
+ [Deleting Sets](#deleting-sets)
Expand Down Expand Up @@ -431,6 +433,45 @@ $ jsm str info ORDERS -j

This is the general pattern for the entire `jsm` utility - prompting for needed information but every action can be run non-interactively making it usable as a cli api. All information output like seen above can be turned into JSON using `-j`.

#### Copying

A stream can be copied into another, which also allows the configuration of the new one to be adjusted via CLI flags:

```nohighlight
$ jsm str cp ORDERS ARCHIVE --subjects "ORDERS_ARCVHIVE.*" --max-age 2y
Stream ORDERS was created
Information for Stream ARCHIVE
Configuration:
Subjects: ORDERS_ARCVHIVE.*
...
Maximum Age: 17520h0m0s
...
```

#### Editing

A stream configuration can be edited, which allows the configuration to be adjusted via CLI flags. Here I have a incorrectly created ORDERS stream that I fix:

```nohighlight
$ jsm str info ORDERS -j | jq .config.subjects
[
"ORDERS.new"
]
$ jsm str edit ORDERS --subjects "ORDERS.*"
Stream ORDERS was updated
Information for Stream ORDERS
Configuration:
Subjects: ORDERS.*
....
```

#### Publishing Into a Stream

Now let's add in some messages to our Stream. You can use `nats-pub` or `nats-bench`. Or even `nats-req` to see the publish ack being returned (these are included in the `synadia/jsm:latest` docker image).
Expand Down Expand Up @@ -1167,6 +1208,7 @@ The command `jsm events` will show you an audit log of all API access events whi
|`server.JetStreamDeleteStreamT`|Deletes a Stream and all its data|empty payload, Stream name in subject|Standard OK/ERR|
|`server.JetStreamPurgeStreamT`|Purges all of the data in a Stream, leaves the Stream|empty payload, Stream name in subject|Standard OK/ERR|
|`server.JetStreamDeleteMsgT`|Deletes a specific message in the Stream by sequence, useful for GDPR compliance|`stream_seq`, Stream name in subject|Standard OK/ERR|
|`server.JetStreamUpdateStreamT`|Updates the configuration of an existing stream|Stream name in subject|Standard OK/ERR|

#### Stream Templates

Expand Down Expand Up @@ -1202,6 +1244,7 @@ Stream and Consumer Admin
```
$JS.STREAM.LIST
$JS.STREAM.<stream>.CREATE
$JS.STREAM.<stream>.UPDATE
$JS.STREAM.<stream>.INFO
$JS.STREAM.<stream>.DELETE
$JS.STREAM.<stream>.PURGE
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/dustin/go-humanize v1.0.0
github.com/google/go-cmp v0.4.0
github.com/nats-io/nats-server/v2 v2.1.5-0.20200211052250-9c6efee1d171
github.com/nats-io/nats-server/v2 v2.1.5-0.20200218040645-a9bebc145396
github.com/nats-io/nats.go v1.9.1
github.com/xlab/tablewriter v0.0.0-20160610135559-80b567a11ad5
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 // indirect
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ github.com/nats-io/jwt v0.3.0 h1:xdnzwFETV++jNc4W1mw//qFyJGb2ABOombmZJQS4+Qo=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/nats-server/v2 v2.1.5-0.20200211052250-9c6efee1d171 h1:OkWGiB384FUD3Cj2Pz96si1sOYzlp+1QNmlrKdwaz/E=
github.com/nats-io/nats-server/v2 v2.1.5-0.20200211052250-9c6efee1d171/go.mod h1:FBtPk+0dj8JdsLJ1eiXh7OLXnJiwjO6VpJkzjYo7lro=
github.com/nats-io/nats-server/v2 v2.1.5-0.20200211194714-a27d1068905e h1:Oc2qVue0d8CiGzm8ZI66J4BeotA5PbtM18RRIr7Dvu0=
github.com/nats-io/nats-server/v2 v2.1.5-0.20200211194714-a27d1068905e/go.mod h1:FBtPk+0dj8JdsLJ1eiXh7OLXnJiwjO6VpJkzjYo7lro=
github.com/nats-io/nats-server/v2 v2.1.5-0.20200217135231-ab456c13f897 h1:0fD4rgxs5FrQdbsbaUiJ3JmkRnQqFKnwtra1RbXZZ0I=
github.com/nats-io/nats-server/v2 v2.1.5-0.20200217135231-ab456c13f897/go.mod h1:FBtPk+0dj8JdsLJ1eiXh7OLXnJiwjO6VpJkzjYo7lro=
github.com/nats-io/nats-server/v2 v2.1.5-0.20200218040645-a9bebc145396 h1:/6bZkgGacqNEU5jkDTn1ZCFj/awvYyRPUfXSDmLBW2U=
github.com/nats-io/nats-server/v2 v2.1.5-0.20200218040645-a9bebc145396/go.mod h1:FBtPk+0dj8JdsLJ1eiXh7OLXnJiwjO6VpJkzjYo7lro=
github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4=
Expand Down
8 changes: 4 additions & 4 deletions internal/jsch/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func createDurableConsumer(request server.CreateConsumerRequest) (name string, e
}

if IsErrorResponse(response) {
return "", fmt.Errorf("%s", string(response.Data))
return "", fmt.Errorf(string(response.Data))
}

return request.Config.Durable, nil
Expand All @@ -112,7 +112,7 @@ func createEphemeralConsumer(request server.CreateConsumerRequest) (name string,
}

if IsErrorResponse(response) {
return "", fmt.Errorf("%s", string(response.Data))
return "", fmt.Errorf(string(response.Data))
}

parts := strings.Split(string(response.Data), " ")
Expand Down Expand Up @@ -188,7 +188,7 @@ func loadConsumerInfo(s string, c string) (info server.ConsumerInfo, err error)
}

if IsErrorResponse(response) {
return info, fmt.Errorf("%s", string(response.Data))
return info, fmt.Errorf(string(response.Data))
}

info = server.ConsumerInfo{}
Expand Down Expand Up @@ -467,7 +467,7 @@ func (c *Consumer) Delete() (err error) {
}

if IsErrorResponse(response) {
return fmt.Errorf("%s", string(response.Data))
return fmt.Errorf(string(response.Data))
}

if IsOKResponse(response) {
Expand Down
8 changes: 4 additions & 4 deletions internal/jsch/jsch.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func JetStreamAccountInfo() (info server.JetStreamAccountStats, err error) {
}

if IsErrorResponse(response) {
return info, fmt.Errorf("%s", string(response.Data))
return info, fmt.Errorf(string(response.Data))
}

err = json.Unmarshal(response.Data, &info)
Expand All @@ -139,7 +139,7 @@ func StreamNames() (streams []string, err error) {
}

if IsErrorResponse(response) {
return streams, fmt.Errorf("%s", string(response.Data))
return streams, fmt.Errorf(string(response.Data))
}

err = json.Unmarshal(response.Data, &streams)
Expand All @@ -162,7 +162,7 @@ func StreamTemplateNames() (templates []string, err error) {
}

if IsErrorResponse(response) {
return templates, fmt.Errorf("%s", string(response.Data))
return templates, fmt.Errorf(string(response.Data))
}

err = json.Unmarshal(response.Data, &templates)
Expand All @@ -185,7 +185,7 @@ func ConsumerNames(stream string) (consumers []string, err error) {
}

if IsErrorResponse(response) {
return consumers, fmt.Errorf("%s", string(response.Data))
return consumers, fmt.Errorf(string(response.Data))
}

err = json.Unmarshal(response.Data, &consumers)
Expand Down
38 changes: 31 additions & 7 deletions internal/jsch/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewStreamFromDefault(name string, dflt server.StreamConfig, opts ...StreamO
}

if IsErrorResponse(response) {
return nil, fmt.Errorf("%s", string(response.Data))
return nil, fmt.Errorf(string(response.Data))
}

return LoadStream(name)
Expand Down Expand Up @@ -150,7 +150,7 @@ func loadStreamInfo(stream string) (info *server.StreamInfo, err error) {
}

if IsErrorResponse(response) {
return nil, fmt.Errorf("%s", string(response.Data))
return nil, fmt.Errorf(string(response.Data))
}

info = &server.StreamInfo{}
Expand Down Expand Up @@ -253,6 +253,30 @@ func NoAck() StreamOption {
}
}

// UpdateConfiguration updates the stream using cfg modified by opts, reloads configuration from the server post update
func (s *Stream) UpdateConfiguration(cfg server.StreamConfig, opts ...StreamOption) error {
ncfg, err := NewStreamConfiguration(cfg, opts...)
if err != nil {
return err
}

jcfg, err := json.Marshal(ncfg)
if err != nil {
return err
}

response, err := nc.Request(fmt.Sprintf(server.JetStreamUpdateStreamT, s.Name()), jcfg, Timeout)
if err != nil {
return err
}

if IsErrorResponse(response) {
return fmt.Errorf(string(response.Data))
}

return s.Reset()
}

// Reset reloads the Stream configuration from the JetStream server
func (s *Stream) Reset() error {
return loadConfigForStream(s)
Expand Down Expand Up @@ -291,7 +315,7 @@ func (s *Stream) ConsumerNames() (names []string, err error) {
}

if IsErrorResponse(response) {
return names, fmt.Errorf("%s", string(response.Data))
return names, fmt.Errorf(string(response.Data))
}

err = json.Unmarshal(response.Data, &names)
Expand Down Expand Up @@ -345,7 +369,7 @@ func (s *Stream) Delete() error {
}

if IsErrorResponse(response) {
return fmt.Errorf("%s", string(response.Data))
return fmt.Errorf(string(response.Data))
}

return nil
Expand All @@ -359,7 +383,7 @@ func (s *Stream) Purge() error {
}

if IsErrorResponse(response) {
return fmt.Errorf("%s", string(response.Data))
return fmt.Errorf(string(response.Data))
}

return nil
Expand All @@ -373,7 +397,7 @@ func (s *Stream) LoadMessage(seq int) (msg server.StoredMsg, err error) {
}

if IsErrorResponse(response) {
return server.StoredMsg{}, fmt.Errorf("%s", string(response.Data))
return server.StoredMsg{}, fmt.Errorf(string(response.Data))
}

msg = server.StoredMsg{}
Expand All @@ -393,7 +417,7 @@ func (s *Stream) DeleteMessage(seq int) (err error) {
}

if IsErrorResponse(response) {
return fmt.Errorf("%s", string(response.Data))
return fmt.Errorf(string(response.Data))
}

return nil
Expand Down
38 changes: 38 additions & 0 deletions internal/jsch/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,44 @@ func TestStream_LoadOrNewConsumerFromDefault(t *testing.T) {
}
}

func TestStream_UpdateConfiguration(t *testing.T) {
srv, nc := startJSServer(t)
defer srv.Shutdown()
defer nc.Flush()

stream, err := jsch.NewStream("q1", jsch.FileStorage(), jsch.Subjects("ORDERS.new"))
checkErr(t, err, "create failed")

if stream.Configuration().Subjects[0] != "ORDERS.new" {
t.Fatalf("expected [ORDERS.new], got %v", stream.Configuration().Subjects)
}

cfg := stream.Configuration()
cfg.Subjects = []string{"ORDERS.*"}

err = stream.UpdateConfiguration(cfg)
checkErr(t, err, "update failed")

if len(stream.Configuration().Subjects) != 1 {
t.Fatalf("expected [ORDERS.*], got %v", stream.Configuration().Subjects)
}

if stream.Configuration().Subjects[0] != "ORDERS.*" {
t.Fatalf("expected [ORDERS.*], got %v", stream.Configuration().Subjects)
}

err = stream.UpdateConfiguration(stream.Configuration(), jsch.Subjects("ARCHIVE.*"))
checkErr(t, err, "update failed")

if len(stream.Configuration().Subjects) != 1 {
t.Fatalf("expected [ARCHIVE.*], got %v", stream.Configuration().Subjects)
}

if stream.Configuration().Subjects[0] != "ARCHIVE.*" {
t.Fatalf("expected [ARCHIVE.*], got %v", stream.Configuration().Subjects)
}
}

func TestStream_ConsumerNames(t *testing.T) {
srv, nc := startJSServer(t)
defer srv.Shutdown()
Expand Down
6 changes: 3 additions & 3 deletions internal/jsch/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewStreamTemplate(name string, maxStreams uint32, config server.StreamConfi
}

if IsErrorResponse(response) {
return nil, fmt.Errorf("%s", string(response.Data))
return nil, fmt.Errorf(string(response.Data))
}

return LoadStreamTemplate(name)
Expand Down Expand Up @@ -70,7 +70,7 @@ func loadConfigForStreamTemplate(template *StreamTemplate) (err error) {
}

if IsErrorResponse(response) {
return fmt.Errorf("%s", string(response.Data))
return fmt.Errorf(string(response.Data))
}

info := server.StreamTemplateInfo{}
Expand All @@ -93,7 +93,7 @@ func (t *StreamTemplate) Delete() error {
}

if IsErrorResponse(response) {
return fmt.Errorf("%s", string(response.Data))
return fmt.Errorf(string(response.Data))
}

return nil
Expand Down
23 changes: 23 additions & 0 deletions jsm/jsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,29 @@ func TestCLIConsumerNext(t *testing.T) {
}
}

func TestCLIStreamEdit(t *testing.T) {
srv, _ := setupJStreamTest(t)
defer srv.Shutdown()

mem1, err := jsch.NewStreamFromDefault("mem1", mem1Stream())
checkErr(t, err, "could not create stream: %v", err)
streamShouldExist(t, "mem1")

runJsmCli(t, fmt.Sprintf("--server='%s' str edit mem1 --subjects other", srv.ClientURL()))

err = mem1.Reset()
checkErr(t, err, "could not reset stream: %v", err)

if len(mem1.Subjects()) != 1 {
t.Fatalf("expected [other] got %v", mem1.Subjects())
}

if mem1.Subjects()[0] != "other" {
t.Fatalf("expected [other] got %v", mem1.Subjects())
}

}

func TestCLIStreamCopy(t *testing.T) {
srv, _ := setupJStreamTest(t)
defer srv.Shutdown()
Expand Down
Loading

0 comments on commit 2b441b8

Please sign in to comment.