Skip to content
This repository has been archived by the owner on Jul 28, 2021. It is now read-only.

Commit

Permalink
Merge pull request #88 from nats-io/str_msg_rm
Browse files Browse the repository at this point in the history
support secure message delete on the cli
  • Loading branch information
ripienaar authored Jan 31, 2020
2 parents 74f38ee + bca5f7e commit 668d5ec
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 1 deletion.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,9 @@ Statistics:

After putting some throw away data into the Stream, we can purge all the data out - while keeping the Stream active:

#### Deleting Data
#### Deleting All Data

To delete all data in a stream use `purge`:

```nohighlight
$ jsm str purge ORDERS -f
Expand All @@ -480,6 +482,14 @@ Statistics:
Active Consumers: 0
```

#### Deleting A Message

A single message can be securely removed from the stream:

```nohighlight
$ jsm str rmm ORDERS 1 -f
```

#### Deleting Sets

Finally for demonstration purposes, you can also delete the whole Stream and recreate it so then we're ready for creating the Consumers:
Expand Down
44 changes: 44 additions & 0 deletions jsm/jsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,3 +459,47 @@ func TestCLIBackupRestore(t *testing.T) {
t.Fatalf("mem1 recreate failed")
}
}

func TestCLIMessageRm(t *testing.T) {
srv, nc := setupConsTest(t)
defer srv.Shutdown()

checkErr(t, nc.Publish("js.mem.1", []byte("msg1")), "publish failed")
checkErr(t, nc.Publish("js.mem.1", []byte("msg2")), "publish failed")
checkErr(t, nc.Publish("js.mem.1", []byte("msg3")), "publish failed")

mem1, err := jsch.LoadStream("mem1")
checkErr(t, err, "load failed")

state, err := mem1.State()
checkErr(t, err, "state failed")

if state.Msgs != 3 {
t.Fatalf("no message added to stream")
}

runJsmCli(t, fmt.Sprintf("--server='%s' str rmm mem1 2 -f", srv.ClientURL()))
state, err = mem1.State()
checkErr(t, err, "state failed")

if state.Msgs != 2 {
t.Fatalf("message was not removed")
}

msg, err := mem1.LoadMessage(1)
checkErr(t, err, "load failed")
if cmp.Equal(msg.Data, []byte("msg1")) {
checkErr(t, err, "load failed")
}

msg, err = mem1.LoadMessage(3)
checkErr(t, err, "load failed")
if cmp.Equal(msg.Data, []byte("msg3")) {
checkErr(t, err, "load failed")
}

msg, err = mem1.LoadMessage(2)
if err == nil {
t.Fatalf("loading delete message did not fail")
}
}
36 changes: 36 additions & 0 deletions jsm/str_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ func configureStreamCommand(app *kingpin.Application) {
strPurge.Flag("json", "Produce JSON output").Short('j').BoolVar(&c.json)
strPurge.Flag("force", "Force removal without prompting").Short('f').BoolVar(&c.force)

strRmMsg := str.Command("rmm", "Securely removes an individual message from a Stream").Action(c.rmMsgAction)
strRmMsg.Arg("stream", "Stream name").StringVar(&c.stream)
strRmMsg.Arg("id", "Message ID to remove").Int64Var(&c.msgID)
strRmMsg.Flag("force", "Force removal without prompting").Short('f').BoolVar(&c.force)

strGet := str.Command("get", "Retrieves a specific message from a Stream").Action(c.getAction)
strGet.Arg("stream", "Stream name").StringVar(&c.stream)
strGet.Arg("id", "Message ID to retrieve").Int64Var(&c.msgID)
Expand Down Expand Up @@ -630,6 +635,37 @@ func (c *streamCmd) lsAction(_ *kingpin.ParseContext) (err error) {
return nil
}

func (c *streamCmd) rmMsgAction(_ *kingpin.ParseContext) (err error) {
c.connectAndAskStream()

if c.msgID == -1 {
id := ""
err = survey.AskOne(&survey.Input{
Message: "Message ID to remove",
}, &id, survey.WithValidator(survey.Required))
kingpin.FatalIfError(err, "invalid input")

idint, err := strconv.Atoi(id)
kingpin.FatalIfError(err, "invalid number")

c.msgID = int64(idint)
}

stream, err := jsch.LoadStream(c.stream)
kingpin.FatalIfError(err, "could not load Stream %s", c.stream)

if !c.force {
ok, err := askConfirmation(fmt.Sprintf("Really remove message %d from Stream %s", c.msgID, c.stream), false)
kingpin.FatalIfError(err, "could not obtain confirmation")

if !ok {
return nil
}
}

return stream.DeleteMessage(int(c.msgID))
}

func (c *streamCmd) getAction(_ *kingpin.ParseContext) (err error) {
c.connectAndAskStream()

Expand Down

0 comments on commit 668d5ec

Please sign in to comment.