diff --git a/README.md b/README.md index b2b96ce..a7e9328 100644 --- a/README.md +++ b/README.md @@ -466,7 +466,9 @@ Statistics: After putting some throw away data into the Stream, we can purge all the data out - while keeping the Stream active: -#### Deleting Data +#### Deleting All Data + +To delete all data in a stream use `purge`: ```nohighlight $ jsm str purge ORDERS -f @@ -480,6 +482,14 @@ Statistics: Active Consumers: 0 ``` +#### Deleting A Message + +A single message can be securely removed from the stream: + +```nohighlight +$ jsm str rmm ORDERS 1 -f +``` + #### Deleting Sets Finally for demonstration purposes, you can also delete the whole Stream and recreate it so then we're ready for creating the Consumers: diff --git a/jsm/jsm_test.go b/jsm/jsm_test.go index ddbe002..95ad38a 100644 --- a/jsm/jsm_test.go +++ b/jsm/jsm_test.go @@ -459,3 +459,47 @@ func TestCLIBackupRestore(t *testing.T) { t.Fatalf("mem1 recreate failed") } } + +func TestCLIMessageRm(t *testing.T) { + srv, nc := setupConsTest(t) + defer srv.Shutdown() + + checkErr(t, nc.Publish("js.mem.1", []byte("msg1")), "publish failed") + checkErr(t, nc.Publish("js.mem.1", []byte("msg2")), "publish failed") + checkErr(t, nc.Publish("js.mem.1", []byte("msg3")), "publish failed") + + mem1, err := jsch.LoadStream("mem1") + checkErr(t, err, "load failed") + + state, err := mem1.State() + checkErr(t, err, "state failed") + + if state.Msgs != 3 { + t.Fatalf("no message added to stream") + } + + runJsmCli(t, fmt.Sprintf("--server='%s' str rmm mem1 2 -f", srv.ClientURL())) + state, err = mem1.State() + checkErr(t, err, "state failed") + + if state.Msgs != 2 { + t.Fatalf("message was not removed") + } + + msg, err := mem1.LoadMessage(1) + checkErr(t, err, "load failed") + if cmp.Equal(msg.Data, []byte("msg1")) { + checkErr(t, err, "load failed") + } + + msg, err = mem1.LoadMessage(3) + checkErr(t, err, "load failed") + if cmp.Equal(msg.Data, []byte("msg3")) { + checkErr(t, err, "load failed") + } + + msg, err = mem1.LoadMessage(2) + if err == nil { + t.Fatalf("loading delete message did not fail") + } +} diff --git a/jsm/str_command.go b/jsm/str_command.go index d03ecfc..092caf6 100644 --- a/jsm/str_command.go +++ b/jsm/str_command.go @@ -96,6 +96,11 @@ func configureStreamCommand(app *kingpin.Application) { strPurge.Flag("json", "Produce JSON output").Short('j').BoolVar(&c.json) strPurge.Flag("force", "Force removal without prompting").Short('f').BoolVar(&c.force) + strRmMsg := str.Command("rmm", "Securely removes an individual message from a Stream").Action(c.rmMsgAction) + strRmMsg.Arg("stream", "Stream name").StringVar(&c.stream) + strRmMsg.Arg("id", "Message ID to remove").Int64Var(&c.msgID) + strRmMsg.Flag("force", "Force removal without prompting").Short('f').BoolVar(&c.force) + strGet := str.Command("get", "Retrieves a specific message from a Stream").Action(c.getAction) strGet.Arg("stream", "Stream name").StringVar(&c.stream) strGet.Arg("id", "Message ID to retrieve").Int64Var(&c.msgID) @@ -630,6 +635,37 @@ func (c *streamCmd) lsAction(_ *kingpin.ParseContext) (err error) { return nil } +func (c *streamCmd) rmMsgAction(_ *kingpin.ParseContext) (err error) { + c.connectAndAskStream() + + if c.msgID == -1 { + id := "" + err = survey.AskOne(&survey.Input{ + Message: "Message ID to remove", + }, &id, survey.WithValidator(survey.Required)) + kingpin.FatalIfError(err, "invalid input") + + idint, err := strconv.Atoi(id) + kingpin.FatalIfError(err, "invalid number") + + c.msgID = int64(idint) + } + + stream, err := jsch.LoadStream(c.stream) + kingpin.FatalIfError(err, "could not load Stream %s", c.stream) + + if !c.force { + ok, err := askConfirmation(fmt.Sprintf("Really remove message %d from Stream %s", c.msgID, c.stream), false) + kingpin.FatalIfError(err, "could not obtain confirmation") + + if !ok { + return nil + } + } + + return stream.DeleteMessage(int(c.msgID)) +} + func (c *streamCmd) getAction(_ *kingpin.ParseContext) (err error) { c.connectAndAskStream()