diff --git a/cli/events_command.go b/cli/events_command.go index 97a2a790..3f45eae8 100644 --- a/cli/events_command.go +++ b/cli/events_command.go @@ -19,11 +19,13 @@ import ( "regexp" "strings" "sync" + "time" "github.com/choria-io/fisk" "github.com/nats-io/jsm.go" "github.com/nats-io/jsm.go/api" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) type eventsCmd struct { @@ -39,6 +41,8 @@ type eventsCmd struct { showServerAdvisories bool showAll bool extraSubjects []string + stream string + since time.Duration sync.Mutex } @@ -57,34 +61,44 @@ func configureEventsCommand(app commandHost) { events.Flag("js-advisory", "Shows advisory events (false)").UnNegatableBoolVar(&c.showJsAdvisories) events.Flag("srv-advisory", "Shows NATS Server advisories (true)").Default("true").BoolVar(&c.showServerAdvisories) events.Flag("subjects", "Show Advisories and Metrics received on specific subjects").PlaceHolder("SUBJECTS").StringsVar(&c.extraSubjects) + events.Flag("stream", "Reads events from a Stream").StringVar(&c.stream) + events.Flag("since", "When reading a stream reads from a certain duration ago").PlaceHolder("DURATION").DurationVar(&c.since) } func init() { registerCommand("events", 7, configureEventsCommand) } -func (c *eventsCmd) handleNATSEvent(m *nats.Msg) { - if !c.bodyFRe.MatchString(strings.ToUpper(string(m.Data))) { +func (c *eventsCmd) handleJsEvent(msg jetstream.Msg) { + c.handleNATSEventData(msg.Subject(), msg.Data()) +} + +func (c *eventsCmd) handleNATSEvent(msg *nats.Msg) { + c.handleNATSEventData(msg.Subject, msg.Data) +} + +func (c *eventsCmd) handleNATSEventData(subject string, data []byte) { + if !c.bodyFRe.MatchString(strings.ToUpper(string(data))) { return } if c.json && !c.ce { - fmt.Println(string(m.Data)) + fmt.Println(string(data)) return } handle := func() error { - kind, event, err := api.ParseMessage(m.Data) + kind, event, err := api.ParseMessage(data) if err != nil { return fmt.Errorf("parsing failed: %s", err) } if opts().Trace { - log.Printf("Received %s event on subject %s", kind, m.Subject) + log.Printf("Received %s event on subject %s", kind, subject) } if kind == "io.nats.unknown_message" { - return fmt.Errorf("unknown event schema on subject %s", m.Subject) + return fmt.Errorf("unknown event schema on subject %s", subject) } ne, ok := event.(api.Event) @@ -118,7 +132,7 @@ func (c *eventsCmd) handleNATSEvent(m *nats.Msg) { err := handle() if err != nil { fmt.Printf("Event error: %s\n\n", err) - fmt.Println(leftPad(string(m.Data), 10)) + fmt.Println(leftPad(string(data), 10)) } } @@ -139,47 +153,68 @@ func (c *eventsCmd) eventsAction(_ *fisk.ParseContext) error { c.bodyFRe, err = regexp.Compile(strings.ToUpper(c.bodyF)) fisk.FatalIfError(err, "invalid body regular expression") - if !c.showAll && !c.showJsAdvisories && !c.showJsMetrics && !c.showServerAdvisories && len(c.extraSubjects) == 0 { + if !c.showAll && !c.showJsAdvisories && !c.showJsMetrics && !c.showServerAdvisories && len(c.extraSubjects) == 0 && c.stream == "" { return fmt.Errorf("no events were chosen") } - if c.showJsAdvisories || c.showAll { - c.Printf("Listening for Advisories on %s.>\n", jsm.EventSubject(api.JSAdvisoryPrefix, opts().Config.JSEventPrefix())) - nc.Subscribe(fmt.Sprintf("%s.>", jsm.EventSubject(api.JSAdvisoryPrefix, opts().Config.JSEventPrefix())), func(m *nats.Msg) { - c.handleNATSEvent(m) - }) - } + if c.stream != "" { + c.Printf("Listening for Events in stream %s\n", c.stream) + js, err := jetstream.New(nc) + if err != nil { + return err + } - if c.showJsMetrics || c.showAll { - c.Printf("Listening for Metrics on %s.>\n", jsm.EventSubject(api.JSMetricPrefix, opts().Config.JSEventPrefix())) - nc.Subscribe(fmt.Sprintf("%s.>", jsm.EventSubject(api.JSMetricPrefix, opts().Config.JSEventPrefix())), func(m *nats.Msg) { - c.handleNATSEvent(m) - }) - } + cfg := jetstream.OrderedConsumerConfig{} + if c.since > 0 { + start := time.Now().Add(-c.since) + cfg.OptStartTime = &start + } - if c.showServerAdvisories || c.showAll { - c.Printf("Listening for Client Connection events on $SYS.ACCOUNT.*.CONNECT\n") - nc.Subscribe("$SYS.ACCOUNT.*.CONNECT", func(m *nats.Msg) { - c.handleNATSEvent(m) - }) - - c.Printf("Listening for Client Disconnection events on $SYS.ACCOUNT.*.DISCONNECT\n") - nc.Subscribe("$SYS.ACCOUNT.*.DISCONNECT", func(m *nats.Msg) { - c.handleNATSEvent(m) - }) - - c.Printf("Listening for Authentication Errors events on $SYS.SERVER.*.CLIENT.AUTH.ERR\n") - nc.Subscribe("$SYS.SERVER.*.CLIENT.AUTH.ERR", func(m *nats.Msg) { - c.handleNATSEvent(m) - }) - } + cons, err := js.OrderedConsumer(ctx, c.stream, cfg) + if err != nil { + return err + } + + cons.Consume(c.handleJsEvent) + } else { + if c.showJsAdvisories || c.showAll { + c.Printf("Listening for Advisories on %s.>\n", jsm.EventSubject(api.JSAdvisoryPrefix, opts().Config.JSEventPrefix())) + nc.Subscribe(fmt.Sprintf("%s.>", jsm.EventSubject(api.JSAdvisoryPrefix, opts().Config.JSEventPrefix())), func(m *nats.Msg) { + c.handleNATSEvent(m) + }) + } + + if c.showJsMetrics || c.showAll { + c.Printf("Listening for Metrics on %s.>\n", jsm.EventSubject(api.JSMetricPrefix, opts().Config.JSEventPrefix())) + nc.Subscribe(fmt.Sprintf("%s.>", jsm.EventSubject(api.JSMetricPrefix, opts().Config.JSEventPrefix())), func(m *nats.Msg) { + c.handleNATSEvent(m) + }) + } + + if c.showServerAdvisories || c.showAll { + c.Printf("Listening for Client Connection events on $SYS.ACCOUNT.*.CONNECT\n") + nc.Subscribe("$SYS.ACCOUNT.*.CONNECT", func(m *nats.Msg) { + c.handleNATSEvent(m) + }) - if len(c.extraSubjects) > 0 { - for _, s := range c.extraSubjects { - c.Printf("Listening for advisories on %s\n", s) - nc.Subscribe(s, func(m *nats.Msg) { + c.Printf("Listening for Client Disconnection events on $SYS.ACCOUNT.*.DISCONNECT\n") + nc.Subscribe("$SYS.ACCOUNT.*.DISCONNECT", func(m *nats.Msg) { c.handleNATSEvent(m) }) + + c.Printf("Listening for Authentication Errors events on $SYS.SERVER.*.CLIENT.AUTH.ERR\n") + nc.Subscribe("$SYS.SERVER.*.CLIENT.AUTH.ERR", func(m *nats.Msg) { + c.handleNATSEvent(m) + }) + } + + if len(c.extraSubjects) > 0 { + for _, s := range c.extraSubjects { + c.Printf("Listening for advisories on %s\n", s) + nc.Subscribe(s, func(m *nats.Msg) { + c.handleNATSEvent(m) + }) + } } }