Skip to content

Commit

Permalink
initial priority groups support
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Nov 7, 2024
1 parent 2b10f04 commit 16eaf7f
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 30 deletions.
167 changes: 143 additions & 24 deletions cli/consumer_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,23 +101,27 @@ type consumerCmd struct {
metadata map[string]string
pauseUntil string

dryRun bool
mgr *jsm.Manager
nc *nats.Conn
nak bool
fPull bool
fPush bool
fBound bool
fWaiting int
fAckPending int
fPending uint64
fIdle time.Duration
fCreated time.Duration
fReplicas uint
fInvert bool
fExpression string
fLeader string
interactive bool
dryRun bool
mgr *jsm.Manager
nc *nats.Conn
nak bool
fPull bool
fPush bool
fBound bool
fWaiting int
fAckPending int
fPending uint64
fIdle time.Duration
fCreated time.Duration
fReplicas uint
fInvert bool
fExpression string
fLeader string
interactive bool
pinnedGroups []string
pinnedTTL time.Duration
overflowGroups []string
groupName string
}

func configureConsumerCommand(app commandHost) {
Expand Down Expand Up @@ -171,6 +175,9 @@ func configureConsumerCommand(app commandHost) {
f.Flag("metadata", "Adds metadata to the consumer").PlaceHolder("META").IsSetByUser(&c.metadataIsSet).StringMapVar(&c.metadata)
if !edit {
f.Flag("pause", fmt.Sprintf("Pause the consumer for a duration after start or until a specific timestamp (eg %s)", time.Now().Format(time.DateTime))).StringVar(&c.pauseUntil)
f.Flag("pinned-groups", "Create a Pinned Client consumer based on these groups").StringsVar(&c.pinnedGroups)
f.Flag("pinned-ttl", "The time to allow for a client to pull before losing the pinned status").DurationVar(&c.pinnedTTL)
f.Flag("overflow-groups", "Create a Overflow consumer based on these groups").StringsVar(&c.overflowGroups)
}
}

Expand Down Expand Up @@ -267,6 +274,12 @@ func configureConsumerCommand(app commandHost) {
conPause.Arg("until", fmt.Sprintf("Pause until a specific time (eg %s)", time.Now().UTC().Format(time.DateTime))).PlaceHolder("TIME").StringVar(&c.pauseUntil)
conPause.Flag("force", "Force pause without prompting").Short('f').UnNegatableBoolVar(&c.force)

conUnpin := cons.Command("unpin", "Unpin the current Pinned Client from a Priority Group").Action(c.unpinAction)
conUnpin.Arg("stream", "Stream name").StringVar(&c.stream)
conUnpin.Arg("consumer", "Consumer name").StringVar(&c.consumer)
conUnpin.Arg("group", "The group to unpin").StringVar(&c.groupName)
conUnpin.Flag("force", "Force unpin without prompting").Short('f').UnNegatableBoolVar(&c.force)

conResume := cons.Command("resume", "Resume a paused consumer").Action(c.resumeAction)
conResume.Arg("stream", "Stream name").StringVar(&c.stream)
conResume.Arg("consumer", "Consumer name").StringVar(&c.consumer)
Expand All @@ -288,6 +301,61 @@ func init() {
registerCommand("consumer", 4, configureConsumerCommand)
}

func (c *consumerCmd) unpinAction(_ *fisk.ParseContext) error {
c.connectAndSetup(true, true)

if !c.selectedConsumer.IsPinnedClientPriority() {
return fmt.Errorf("consumer is not a pinned priority consumer")
}

nfo, err := c.selectedConsumer.State()
if err != nil {
return err
}

matched := map[string]api.PriorityGroupState{}
var groups []string
for _, v := range nfo.PriorityGroups {
if v.PinnedClientID != "" {
matched[v.Group] = v
groups = append(groups, v.Group)
}
}

if len(matched) == 0 {
return fmt.Errorf("no priority groups have pinned clients")
}

if c.groupName == "" {
err = iu.AskOne(&survey.Select{
Message: "Select a Group",
Options: groups,
PageSize: iu.SelectPageSize(len(groups)),
}, &c.groupName, survey.WithValidator(survey.Required))
if err != nil {
return err
}
}

if !c.force {
ok, err := askConfirmation(fmt.Sprintf("Really unpin client from group %s > %s > %s", c.stream, c.consumer, c.groupName), false)
fisk.FatalIfError(err, "could not obtain confirmation")

if !ok {
return nil
}
}

err = c.selectedConsumer.Unpin(c.groupName)
if err != nil {
return err
}

fmt.Printf("Unpinned client %s from Priority Group %s > %s > %s\n", matched[c.groupName].PinnedClientID, c.stream, c.consumer, c.groupName)

return nil
}

func (c *consumerCmd) findAction(_ *fisk.ParseContext) error {
var err error
var stream *jsm.Stream
Expand Down Expand Up @@ -755,6 +823,11 @@ func (c *consumerCmd) editAction(pc *fisk.ParseContext) error {
}
}

err = c.checkConfigLevel(ncfg)
if err != nil {
return err
}

cons, err := c.mgr.NewConsumerFromDefault(c.stream, *ncfg)
if err != nil {
return err
Expand Down Expand Up @@ -996,6 +1069,11 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo
} else {
cols.AddRowIf("Paused Until Deadline", fmt.Sprintf("%s (passed)", f(config.PauseUntil)), !config.PauseUntil.IsZero())
}
if config.PriorityPolicy != api.PriorityNone {
cols.AddRow("Priority Policy", config.PriorityPolicy)
cols.AddRow("Priority Groups", config.PriorityGroups)
cols.AddRowIf("Pinned TTL", config.PinnedTTL, config.PriorityPolicy == api.PriorityPinnedClient)
}

meta := iu.RemoveReservedMetadata(config.Metadata)
if len(meta) > 0 {
Expand Down Expand Up @@ -1068,6 +1146,19 @@ func (c *consumerCmd) showInfo(config api.ConsumerConfig, state api.ConsumerInfo
cols.AddRowf("Paused Until", "%s (%s remaining)", f(state.TimeStamp.Add(state.PauseRemaining)), state.PauseRemaining.Round(time.Second))
}

if len(state.PriorityGroups) > 0 && config.PriorityPolicy == api.PriorityPinnedClient {
groups := map[string]string{}
for _, v := range state.PriorityGroups {
msg := "No client"
if v.PinnedClientID != "" {
msg = fmt.Sprintf("pinned %s at %s", v.PinnedClientID, f(v.PinnedTS))
}

groups[v.Group] = msg
}
cols.AddMapStringsAsValue("Priority Groups", groups)
}

cols.Frender(os.Stdout)
}

Expand Down Expand Up @@ -1651,6 +1742,18 @@ func (c *consumerCmd) prepareConfig() (cfg *api.ConsumerConfig, err error) {
}
}

switch {
case len(c.pinnedGroups) > 0 && len(c.overflowGroups) > 0:
return nil, fmt.Errorf("setting both overflow and pinned groups are not supported")
case len(c.pinnedGroups) > 0:
cfg.PriorityPolicy = api.PriorityPinnedClient
cfg.PriorityGroups = c.pinnedGroups
cfg.PinnedTTL = c.pinnedTTL
case len(c.overflowGroups) > 0:
cfg.PriorityPolicy = api.PriorityOverflow
cfg.PriorityGroups = c.pinnedGroups
}

cfg.Metadata = iu.RemoveReservedMetadata(cfg.Metadata)

return cfg, nil
Expand Down Expand Up @@ -1679,7 +1782,7 @@ func (c *consumerCmd) parsePauseUntil(until string) (time.Time, error) {
func (c *consumerCmd) resumeAction(_ *fisk.ParseContext) error {
c.connectAndSetup(true, true)

err := iu.RequireAPILevel(c.mgr, 1, "resuming consumers requires NATS Server 2.11")
err := iu.RequireAPILevel(c.mgr, 1, "resuming Consumers requires NATS Server 2.11")
if err != nil {
return err
}
Expand Down Expand Up @@ -1713,7 +1816,7 @@ func (c *consumerCmd) resumeAction(_ *fisk.ParseContext) error {
func (c *consumerCmd) pauseAction(_ *fisk.ParseContext) error {
c.connectAndSetup(true, true)

err := iu.RequireAPILevel(c.mgr, 1, "pausing consumers requires NATS Server 2.11")
err := iu.RequireAPILevel(c.mgr, 1, "pausing Consumers requires NATS Server 2.11")
if err != nil {
return err
}
Expand Down Expand Up @@ -1866,11 +1969,9 @@ func (c *consumerCmd) createAction(pc *fisk.ParseContext) (err error) {

c.connectAndSetup(true, false)

if !cfg.PauseUntil.IsZero() {
err := iu.RequireAPILevel(c.mgr, 1, "pausing consumers requires NATS Server 2.11")
if err != nil {
return err
}
err = c.checkConfigLevel(cfg)
if err != nil {
return err
}

created, err := c.mgr.NewConsumerFromDefault(c.stream, *cfg)
Expand All @@ -1883,6 +1984,24 @@ func (c *consumerCmd) createAction(pc *fisk.ParseContext) (err error) {
return nil
}

func (c *consumerCmd) checkConfigLevel(cfg *api.ConsumerConfig) error {
if !cfg.PauseUntil.IsZero() {
err := iu.RequireAPILevel(c.mgr, 1, "pausing consumers requires NATS Server 2.11")
if err != nil {
return err
}
}

if len(cfg.PriorityGroups) > 0 || cfg.PriorityPolicy != api.PriorityNone {
err := iu.RequireAPILevel(c.mgr, 1, "Consumer Groups requires NATS Server 2.11")
if err != nil {
return err
}
}

return nil
}

func (c *consumerCmd) getNextMsgDirect(stream string, consumer string) error {
req := &api.JSApiConsumerGetNextRequest{Batch: 1, Expires: opts().Timeout}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ require (
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/klauspost/compress v1.17.11
github.com/mattn/go-isatty v0.0.20
github.com/nats-io/jsm.go v0.1.1-0.20241031085745-33958a03bf6d
github.com/nats-io/jsm.go v0.1.1-0.20241107105049-59758090235c
github.com/nats-io/jwt/v2 v2.7.2
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241030181516-1ee2b8a11af8
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241107032117-dd0bedda7b6e
github.com/nats-io/nats.go v1.37.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,10 @@ github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zx
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/jsm.go v0.1.1-0.20241031085745-33958a03bf6d h1:enYK3r3gnTVbAOmA18NCPuwiOV4l+XdgAx4wG3lHCF8=
github.com/nats-io/jsm.go v0.1.1-0.20241031085745-33958a03bf6d/go.mod h1:vBf6RcQc+LQQaacRB5DsVm8g+VrUcPHH5ejH6yQ/Uwg=
github.com/nats-io/jwt/v2 v2.7.2 h1:SCRjfDLJ2q8naXp8YlGJJS5/yj3wGSODFYVi4nnwVMw=
github.com/nats-io/jwt/v2 v2.7.2/go.mod h1:kB6QUmqHG6Wdrzj0KP2L+OX4xiTPBeV+NHVstFaATXU=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241030181516-1ee2b8a11af8 h1:XMoB88mOGh1u64NNvtBEfidj+rEewunIfLCdmrhsNmY=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241030181516-1ee2b8a11af8/go.mod h1:VY1OpHND54C9/rK09yrZt7raHmTUioMOKPqJvRD3GDw=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241107032117-dd0bedda7b6e h1:IOoXHQJxuz++vUwiiM4nKMQynU5LrFbjZvS2JXFGoFg=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241107032117-dd0bedda7b6e/go.mod h1:VY1OpHND54C9/rK09yrZt7raHmTUioMOKPqJvRD3GDw=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
Expand Down

0 comments on commit 16eaf7f

Please sign in to comment.