Skip to content

Commit

Permalink
check api levels
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Nov 5, 2024
1 parent 032ee9b commit 5ff7cca
Showing 1 changed file with 28 additions and 7 deletions.
35 changes: 28 additions & 7 deletions cli/consumer_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,11 @@ func (c *consumerCmd) editAction(pc *fisk.ParseContext) error {
}
}

err = c.checkConfigLevel(ncfg)
if err != nil {
return err
}

cons, err := c.mgr.NewConsumerFromDefault(c.stream, *ncfg)
if err != nil {
return err
Expand Down Expand Up @@ -1771,7 +1776,7 @@ func (c *consumerCmd) parsePauseUntil(until string) (time.Time, error) {
func (c *consumerCmd) resumeAction(_ *fisk.ParseContext) error {
c.connectAndSetup(true, true)

err := iu.RequireAPILevel(c.mgr, 1, "resuming consumers requires NATS Server 2.11")
err := iu.RequireAPILevel(c.mgr, 1, "resuming Consumers requires NATS Server 2.11")
if err != nil {
return err
}
Expand Down Expand Up @@ -1805,7 +1810,7 @@ func (c *consumerCmd) resumeAction(_ *fisk.ParseContext) error {
func (c *consumerCmd) pauseAction(_ *fisk.ParseContext) error {
c.connectAndSetup(true, true)

err := iu.RequireAPILevel(c.mgr, 1, "pausing consumers requires NATS Server 2.11")
err := iu.RequireAPILevel(c.mgr, 1, "pausing Consumers requires NATS Server 2.11")
if err != nil {
return err
}
Expand Down Expand Up @@ -1958,11 +1963,9 @@ func (c *consumerCmd) createAction(pc *fisk.ParseContext) (err error) {

c.connectAndSetup(true, false)

if !cfg.PauseUntil.IsZero() {
err := iu.RequireAPILevel(c.mgr, 1, "pausing consumers requires NATS Server 2.11")
if err != nil {
return err
}
err = c.checkConfigLevel(cfg)
if err != nil {
return err
}

created, err := c.mgr.NewConsumerFromDefault(c.stream, *cfg)
Expand All @@ -1975,6 +1978,24 @@ func (c *consumerCmd) createAction(pc *fisk.ParseContext) (err error) {
return nil
}

func (c *consumerCmd) checkConfigLevel(cfg *api.ConsumerConfig) error {
if !cfg.PauseUntil.IsZero() {
err := iu.RequireAPILevel(c.mgr, 1, "pausing consumers requires NATS Server 2.11")
if err != nil {
return err
}
}

if len(cfg.PriorityGroups) > 0 || cfg.PriorityPolicy != api.PriorityNone {
err := iu.RequireAPILevel(c.mgr, 1, "Consumer Groups requires NATS Server 2.11")
if err != nil {
return err
}
}

return nil
}

func (c *consumerCmd) getNextMsgDirect(stream string, consumer string) error {
req := &api.JSApiConsumerGetNextRequest{Batch: 1, Expires: opts().Timeout}

Expand Down

0 comments on commit 5ff7cca

Please sign in to comment.