Skip to content
This repository has been archived by the owner on Jul 28, 2021. It is now read-only.

Commit

Permalink
Merge pull request #47 from nats-io/46
Browse files Browse the repository at this point in the history
rename obs subjects to filter subject
  • Loading branch information
ripienaar authored Jan 8, 2020
2 parents 0e1a7ec + 98513c9 commit 7ba9362
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 37 deletions.
26 changes: 13 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ The rest of this document introduces the `jsm` utility, but for completeness and

```bash
$ jsm ms add ORDERS --subjects "ORDERS.*" --ack --max-msgs=-1 --max-bytes=-1 --max-age=1y --storage file --retention stream --max-msg-size=-1
$ jsm obs add ORDERS NEW --subject ORDERS.received --ack explicit --pull --deliver all --max-deliver=-1 --sample 100
$ jsm obs add ORDERS DISPATCH --subject ORDERS.processed --ack explicit --pull --deliver all --max-deliver=-1 --sample 100
$ jsm obs add ORDERS MONITOR --subject '' --ack none --target monitor.ORDERS --deliver last --replay instant
$ jsm obs add ORDERS NEW --filter ORDERS.received --ack explicit --pull --deliver all --max-deliver=-1 --sample 100
$ jsm obs add ORDERS DISPATCH --filter ORDERS.processed --ack explicit --pull --deliver all --max-deliver=-1 --sample 100
$ jsm obs add ORDERS MONITOR --filter '' --ack none --target monitor.ORDERS --deliver last --replay instant
```

## Getting Started
Expand Down Expand Up @@ -461,7 +461,7 @@ $ jsm obs add --sample 100
? Observable name NEW
? Delivery target
? Start policy (all, last, 1h, msg sequence) all
? Subject to consume (blank for all) ORDERS.received
? Filter Message Set by subject (blank for all) ORDERS.received
? Maximum Allowed Deliveries 20
Information for observable ORDERS > NEW
Expand Down Expand Up @@ -495,7 +495,7 @@ A Maximum Delivery limit of 20 is set, this means if the message is not acknowle
Again this can all be done in a single CLI call, lets make the `DISPATCH` Observable:

```
$ jsm obs add ORDERS DISPATCH --subject ORDERS.processed --ack explicit --pull --deliver all --sample 100 --max-deliver 20
$ jsm obs add ORDERS DISPATCH --filter ORDERS.processed --ack explicit --pull --deliver all --sample 100 --max-deliver 20
```

#### Creating Push-Based Observables
Expand All @@ -510,7 +510,7 @@ $ jsm obs add
? Start policy (all, last, 1h, msg sequence) last
? Acknowledgement policy none
? Replay policy instant
? Subject to consume (blank for all)
? Filter Message Set by subject (blank for all)
? Maximum Allowed Deliveries -1
Information for observable ORDERS > MONITOR
Expand All @@ -534,7 +534,7 @@ State:
Again you can do this with a single non interactive command:

```
$ jsm obs add ORDERS MONITOR --ack none --target monitor.ORDERS --deliver last --replay instant --subject ''
$ jsm obs add ORDERS MONITOR --ack none --target monitor.ORDERS --deliver last --replay instant --filter ''
```

#### Listing
Expand Down Expand Up @@ -822,7 +822,7 @@ Lets look at each of these, first we make a new Message Set `ORDERS` and add 100
Now create a `DeliverAll` pull-based Observable:

```
$ jsm obs add ORDERS ALL --pull --subject ORDERS.processed --ack none --replay instant --deliver all
$ jsm obs add ORDERS ALL --pull --filter ORDERS.processed --ack none --replay instant --deliver all
$ jsm obs next ORDERS ALL
--- received on ORDERS.processed
order 1
Expand All @@ -833,7 +833,7 @@ Acknowledged message
Now create a `DeliverLast` pull-based Observable:

```
$ jsm obs add ORDERS LAST --pull --subject ORDERS.processed --ack none --replay instant --deliver last
$ jsm obs add ORDERS LAST --pull --filter ORDERS.processed --ack none --replay instant --deliver last
$ jsm obs next ORDERS LAST
--- received on ORDERS.processed
order 100
Expand All @@ -844,7 +844,7 @@ Acknowledged message
Now create a `MsgSetSeq` pull-based Observable:

```
$ jsm obs add ORDERS TEN --pull --subject ORDERS.processed --ack none --replay instant --deliver 10
$ jsm obs add ORDERS TEN --pull --filter ORDERS.processed --ack none --replay instant --deliver 10
$ jsm obs next ORDERS TEN
--- received on ORDERS.processed
order 10
Expand All @@ -866,7 +866,7 @@ done
Then create an Observable that starts 2 minutes ago:

```
$ jsm obs add ORDERS 2MIN --pull --subject ORDERS.processed --ack none --replay instant --deliver 2m
$ jsm obs add ORDERS 2MIN --pull --filter ORDERS.processed --ack none --replay instant --deliver 2m
$ jsm obs next ORDERS 2MIN
--- received on ORDERS.processed
order 2
Expand All @@ -891,7 +891,7 @@ $ nats-sub my.monitor
Terminal 2:

```
$ jsm obs add ORDERS --subject '' --ack none --target 'my.monitor' --deliver last --replay instant --ephemeral
$ jsm obs add ORDERS --filter '' --ack none --target 'my.monitor' --deliver last --replay instant --ephemeral
```

The `--ephemeral` switch tells the system to make an Ephemeral Observable.
Expand All @@ -905,7 +905,7 @@ This is useful in load testing scenarios etc. This is called the `ReplayPolicy`
You can only set `ReplayPolicy` on push-based Observables.

```
$ jsm obs add ORDERS REPLAY --target out.original --subject ORDERS.processed --ack none --deliver all --sample 100 --replay original
$ jsm obs add ORDERS REPLAY --target out.original --filter ORDERS.processed --ack none --deliver all --sample 100 --replay original
...
Replay Policy: original
...
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/dustin/go-humanize v1.0.0
github.com/nats-io/nats-server/v2 v2.1.3-0.20200107213246-63311c51010b
github.com/nats-io/nats-server/v2 v2.1.3-0.20200108135121-19dc3ebadc67
github.com/nats-io/nats.go v1.9.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ github.com/nats-io/nats-server/v2 v2.1.3-0.20200107145616-b0b085a884a1 h1:m3gIuC
github.com/nats-io/nats-server/v2 v2.1.3-0.20200107145616-b0b085a884a1/go.mod h1:FBtPk+0dj8JdsLJ1eiXh7OLXnJiwjO6VpJkzjYo7lro=
github.com/nats-io/nats-server/v2 v2.1.3-0.20200107213246-63311c51010b h1:F7EvXiuvEOq5lcu9mwRwyqmxTRwQbAc1CKsDlkoAKOI=
github.com/nats-io/nats-server/v2 v2.1.3-0.20200107213246-63311c51010b/go.mod h1:FBtPk+0dj8JdsLJ1eiXh7OLXnJiwjO6VpJkzjYo7lro=
github.com/nats-io/nats-server/v2 v2.1.3-0.20200108135121-19dc3ebadc67 h1:ua06PaTRh1xPm9r/AnD+X4I6u/t166YqODilOZ+qYdU=
github.com/nats-io/nats-server/v2 v2.1.3-0.20200108135121-19dc3ebadc67/go.mod h1:FBtPk+0dj8JdsLJ1eiXh7OLXnJiwjO6VpJkzjYo7lro=
github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4=
Expand Down
2 changes: 1 addition & 1 deletion jsm/jetstreammgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (j *JetStreamMgmt) ObservableCreate(set string, cfg *api.ObservableConfig)
Config: *cfg,
}

if cfg.Subject == "" {
if cfg.FilterSubject == "" {
cfg.AckPolicy = api.AckExplicit
}

Expand Down
2 changes: 1 addition & 1 deletion jsm/jsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestCLIObsAdd(t *testing.T) {
srv, _, jsm := setupObsTest(t)
defer srv.Shutdown()

runJsmCli(t, fmt.Sprintf("--server='%s' obs add mem1 push1 --replay instant --deliver all --pull --subject '' --max-deliver 20", srv.ClientURL()))
runJsmCli(t, fmt.Sprintf("--server='%s' obs add mem1 push1 --replay instant --deliver all --pull --filter '' --max-deliver 20", srv.ClientURL()))
push1ShouldExist(t, jsm, "mem1")
}

Expand Down
2 changes: 1 addition & 1 deletion jsm/ms_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (c *msCmd) getAction(_ *kingpin.ParseContext) (err error) {
return nil
}

fmt.Printf("Item: %s#%d received %v on subject %s\n\n", c.set, c.msgID, item.Time, item.Subject)
fmt.Printf("Item: %s#%d received %v on Subject %s\n\n", c.set, c.msgID, item.Time, item.Subject)
fmt.Println(string(item.Data))
fmt.Println()
return nil
Expand Down
40 changes: 20 additions & 20 deletions jsm/obs_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@ type obsCmd struct {
raw bool
destination string

maxDeliver int
pull bool
replayPolicy string
startPolicy string
ackPolicy string
ackWait time.Duration
samplePct int
subject string
delivery string
ephemeral bool
maxDeliver int
pull bool
replayPolicy string
startPolicy string
ackPolicy string
ackWait time.Duration
samplePct int
filterSubject string
delivery string
ephemeral bool
}

func configureObsCommand(app *kingpin.Application) {
Expand All @@ -70,7 +70,7 @@ func configureObsCommand(app *kingpin.Application) {

addCreateFlags := func(f *kingpin.CmdClause) {
f.Flag("target", "Push based delivery target subject").StringVar(&c.delivery)
f.Flag("subject", "Message set topic").Default("_unset_").StringVar(&c.subject)
f.Flag("filter", "Filter Message Set by subjects").Default("_unset_").StringVar(&c.filterSubject)
f.Flag("replay", "Replay Policy (instant, original)").EnumVar(&c.replayPolicy, "instant", "original")
f.Flag("deliver", "Start policy (all, last, 1h, msg sequence)").StringVar(&c.startPolicy)
f.Flag("ack", "Acknowledgement policy (none, all, explicit)").StringVar(&c.ackPolicy)
Expand Down Expand Up @@ -211,8 +211,8 @@ func (c *obsCmd) infoAction(pc *kingpin.ParseContext) error {
} else {
fmt.Printf(" Pull Mode: true\n")
}
if info.Config.Subject != "" {
fmt.Printf(" Subject: %s\n", info.Config.Subject)
if info.Config.FilterSubject != "" {
fmt.Printf(" Filter Subject: %s\n", info.Config.FilterSubject)
}
if info.Config.MsgSetSeq != 0 {
fmt.Printf(" Start Sequence: %d\n", info.Config.MsgSetSeq)
Expand Down Expand Up @@ -350,8 +350,8 @@ func (c *obsCmd) cpAction(pc *kingpin.ParseContext) (err error) {
cfg.AckPolicy = c.ackPolicyFromString(c.ackPolicy)
}

if c.subject != "_unset_" {
cfg.Subject = c.subject
if c.filterSubject != "_unset_" {
cfg.FilterSubject = c.filterSubject
}

if c.replayPolicy != "" {
Expand Down Expand Up @@ -464,15 +464,15 @@ func (c *obsCmd) createAction(pc *kingpin.ParseContext) (err error) {
cfg.ReplayPolicy = c.replayPolicyFromString(c.replayPolicy)
}

if c.subject == "_unset_" {
if c.filterSubject == "_unset_" {
err = survey.AskOne(&survey.Input{
Message: "Subject to consume (blank for all)",
Message: "Filter Message Set by subject (blank for all)",
Default: "",
Help: "Message Sets can consume more than one topic - or a wildcard - this allows you to select out just a single subject from all the ones entering the Set for delivery to the Observable. Settable using --subject",
}, &c.subject)
Help: "Message Sets can consume more than one subject - or a wildcard - this allows you to filter out just a single Subject from all the ones entering the Set for delivery to the Observable. Settable using --filter",
}, &c.filterSubject)
kingpin.FatalIfError(err, "could not ask for filtering subject")
}
cfg.Subject = c.subject
cfg.FilterSubject = c.filterSubject

if c.maxDeliver == 0 && cfg.AckPolicy != api.AckNone {
err = survey.AskOne(&survey.Input{
Expand Down

0 comments on commit 7ba9362

Please sign in to comment.