Skip to content

Commit

Permalink
Support reading nats events from a stream
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Dec 23, 2024
1 parent ad7a962 commit ebd4e6a
Showing 1 changed file with 75 additions and 40 deletions.
115 changes: 75 additions & 40 deletions cli/events_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"regexp"
"strings"
"sync"
"time"

"github.com/choria-io/fisk"
"github.com/nats-io/jsm.go"
"github.com/nats-io/jsm.go/api"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

type eventsCmd struct {
Expand All @@ -39,6 +41,8 @@ type eventsCmd struct {
showServerAdvisories bool
showAll bool
extraSubjects []string
stream string
since time.Duration

sync.Mutex
}
Expand All @@ -57,34 +61,44 @@ func configureEventsCommand(app commandHost) {
events.Flag("js-advisory", "Shows advisory events (false)").UnNegatableBoolVar(&c.showJsAdvisories)
events.Flag("srv-advisory", "Shows NATS Server advisories (true)").Default("true").BoolVar(&c.showServerAdvisories)
events.Flag("subjects", "Show Advisories and Metrics received on specific subjects").PlaceHolder("SUBJECTS").StringsVar(&c.extraSubjects)
events.Flag("stream", "Reads events from a Stream").StringVar(&c.stream)
events.Flag("since", "When reading a stream reads from a certain duration ago").PlaceHolder("DURATION").DurationVar(&c.since)
}

func init() {
registerCommand("events", 7, configureEventsCommand)
}

func (c *eventsCmd) handleNATSEvent(m *nats.Msg) {
if !c.bodyFRe.MatchString(strings.ToUpper(string(m.Data))) {
func (c *eventsCmd) handleJsEvent(msg jetstream.Msg) {
c.handleNATSEventData(msg.Subject(), msg.Data())
}

func (c *eventsCmd) handleNATSEvent(msg *nats.Msg) {
c.handleNATSEventData(msg.Subject, msg.Data)
}

func (c *eventsCmd) handleNATSEventData(subject string, data []byte) {
if !c.bodyFRe.MatchString(strings.ToUpper(string(data))) {
return
}

if c.json && !c.ce {
fmt.Println(string(m.Data))
fmt.Println(string(data))
return
}

handle := func() error {
kind, event, err := api.ParseMessage(m.Data)
kind, event, err := api.ParseMessage(data)
if err != nil {
return fmt.Errorf("parsing failed: %s", err)
}

if opts().Trace {
log.Printf("Received %s event on subject %s", kind, m.Subject)
log.Printf("Received %s event on subject %s", kind, subject)
}

if kind == "io.nats.unknown_message" {
return fmt.Errorf("unknown event schema on subject %s", m.Subject)
return fmt.Errorf("unknown event schema on subject %s", subject)
}

ne, ok := event.(api.Event)
Expand Down Expand Up @@ -118,7 +132,7 @@ func (c *eventsCmd) handleNATSEvent(m *nats.Msg) {
err := handle()
if err != nil {
fmt.Printf("Event error: %s\n\n", err)
fmt.Println(leftPad(string(m.Data), 10))
fmt.Println(leftPad(string(data), 10))
}
}

Expand All @@ -139,47 +153,68 @@ func (c *eventsCmd) eventsAction(_ *fisk.ParseContext) error {
c.bodyFRe, err = regexp.Compile(strings.ToUpper(c.bodyF))
fisk.FatalIfError(err, "invalid body regular expression")

if !c.showAll && !c.showJsAdvisories && !c.showJsMetrics && !c.showServerAdvisories && len(c.extraSubjects) == 0 {
if !c.showAll && !c.showJsAdvisories && !c.showJsMetrics && !c.showServerAdvisories && len(c.extraSubjects) == 0 && c.stream == "" {
return fmt.Errorf("no events were chosen")
}

if c.showJsAdvisories || c.showAll {
c.Printf("Listening for Advisories on %s.>\n", jsm.EventSubject(api.JSAdvisoryPrefix, opts().Config.JSEventPrefix()))
nc.Subscribe(fmt.Sprintf("%s.>", jsm.EventSubject(api.JSAdvisoryPrefix, opts().Config.JSEventPrefix())), func(m *nats.Msg) {
c.handleNATSEvent(m)
})
}
if c.stream != "" {
c.Printf("Listening for Events in stream %s\n", c.stream)
js, err := jetstream.New(nc)
if err != nil {
return err
}

if c.showJsMetrics || c.showAll {
c.Printf("Listening for Metrics on %s.>\n", jsm.EventSubject(api.JSMetricPrefix, opts().Config.JSEventPrefix()))
nc.Subscribe(fmt.Sprintf("%s.>", jsm.EventSubject(api.JSMetricPrefix, opts().Config.JSEventPrefix())), func(m *nats.Msg) {
c.handleNATSEvent(m)
})
}
cfg := jetstream.OrderedConsumerConfig{}
if c.since > 0 {
start := time.Now().Add(-c.since)
cfg.OptStartTime = &start
}

if c.showServerAdvisories || c.showAll {
c.Printf("Listening for Client Connection events on $SYS.ACCOUNT.*.CONNECT\n")
nc.Subscribe("$SYS.ACCOUNT.*.CONNECT", func(m *nats.Msg) {
c.handleNATSEvent(m)
})

c.Printf("Listening for Client Disconnection events on $SYS.ACCOUNT.*.DISCONNECT\n")
nc.Subscribe("$SYS.ACCOUNT.*.DISCONNECT", func(m *nats.Msg) {
c.handleNATSEvent(m)
})

c.Printf("Listening for Authentication Errors events on $SYS.SERVER.*.CLIENT.AUTH.ERR\n")
nc.Subscribe("$SYS.SERVER.*.CLIENT.AUTH.ERR", func(m *nats.Msg) {
c.handleNATSEvent(m)
})
}
cons, err := js.OrderedConsumer(ctx, c.stream, cfg)
if err != nil {
return err
}

cons.Consume(c.handleJsEvent)
} else {
if c.showJsAdvisories || c.showAll {
c.Printf("Listening for Advisories on %s.>\n", jsm.EventSubject(api.JSAdvisoryPrefix, opts().Config.JSEventPrefix()))
nc.Subscribe(fmt.Sprintf("%s.>", jsm.EventSubject(api.JSAdvisoryPrefix, opts().Config.JSEventPrefix())), func(m *nats.Msg) {
c.handleNATSEvent(m)
})
}

if c.showJsMetrics || c.showAll {
c.Printf("Listening for Metrics on %s.>\n", jsm.EventSubject(api.JSMetricPrefix, opts().Config.JSEventPrefix()))
nc.Subscribe(fmt.Sprintf("%s.>", jsm.EventSubject(api.JSMetricPrefix, opts().Config.JSEventPrefix())), func(m *nats.Msg) {
c.handleNATSEvent(m)
})
}

if c.showServerAdvisories || c.showAll {
c.Printf("Listening for Client Connection events on $SYS.ACCOUNT.*.CONNECT\n")
nc.Subscribe("$SYS.ACCOUNT.*.CONNECT", func(m *nats.Msg) {
c.handleNATSEvent(m)
})

if len(c.extraSubjects) > 0 {
for _, s := range c.extraSubjects {
c.Printf("Listening for advisories on %s\n", s)
nc.Subscribe(s, func(m *nats.Msg) {
c.Printf("Listening for Client Disconnection events on $SYS.ACCOUNT.*.DISCONNECT\n")
nc.Subscribe("$SYS.ACCOUNT.*.DISCONNECT", func(m *nats.Msg) {
c.handleNATSEvent(m)
})

c.Printf("Listening for Authentication Errors events on $SYS.SERVER.*.CLIENT.AUTH.ERR\n")
nc.Subscribe("$SYS.SERVER.*.CLIENT.AUTH.ERR", func(m *nats.Msg) {
c.handleNATSEvent(m)
})
}

if len(c.extraSubjects) > 0 {
for _, s := range c.extraSubjects {
c.Printf("Listening for advisories on %s\n", s)
nc.Subscribe(s, func(m *nats.Msg) {
c.handleNATSEvent(m)
})
}
}
}

Expand Down

0 comments on commit ebd4e6a

Please sign in to comment.