From a0b92bdcb6e23b16c6bf821fd66b3db98792ca4f Mon Sep 17 00:00:00 2001 From: Allan Nava Date: Tue, 29 Oct 2024 14:12:29 +0100 Subject: [PATCH] added new methods for nomad --- nomad/request.go | 15 ++++++ nomad/service.go | 121 +++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 nomad/request.go diff --git a/nomad/request.go b/nomad/request.go new file mode 100644 index 0000000..f8be638 --- /dev/null +++ b/nomad/request.go @@ -0,0 +1,15 @@ +package nomad + +type ScaleRequest struct { + Count int `json:"Count,omitempty"` + Target ScaleGroupRequest `json:"Target"` +} + +type ScaleGroupRequest struct { + Group string `json:"Group,omitempty"` +} + +type RunJobRequest struct { + Job JobDefinition `json:"Job"` + Format string `json:"Format,omitempty"` +} diff --git a/nomad/service.go b/nomad/service.go index d03eac9..e231e12 100644 --- a/nomad/service.go +++ b/nomad/service.go @@ -13,6 +13,10 @@ type IService interface { GetDefinition(jobid, region string) (*JobDefinition, error) AllocationStats(allocID, region string) (*ResourceUsage, error) GetAllocations(clientID, region string) (*NomadAllocations, error) + DeleteJob(jobid, region string, purge bool) error + ScaleJob(jobid string, count int, region string) error + RunJob(definition JobDefinition, region string) error + RestartJob(jobid, region string) error } type service struct { @@ -20,12 +24,24 @@ type service struct { logger *zap.SugaredLogger } -func NewService(baseUrl string, logger *zap.SugaredLogger) IService { +type Options struct { + BaseUrl string + LogLevel string + Logger *zap.SugaredLogger +} + +func NewService(options Options) IService { client := resty.New() - client.SetBaseURL(baseUrl) + client.SetBaseURL(options.BaseUrl) + if options.LogLevel == "debug" { + client.SetDebug(true) + } + if options.Logger == nil { + options.Logger = zap.NewNop().Sugar() + } return &service{ client: client, - logger: logger, + logger: options.Logger, } } @@ -103,3 +119,102 @@ func (s *service) GetAllocations(clientID, region string) (*NomadAllocations, er } return &obj, nil } + + +func (s *service) RestartJob(jobid, region string) error { + err := s.ScaleJob(jobid, 0, region) + if err != nil { + s.logger.Errorf("Error scaling job while stopping: %v", err) + return err + } + + go func() { + time.Sleep(time.Second * 1) + err = s.ScaleJob(jobid, 1, region) + if err != nil { + s.logger.Errorf("Error restarting job while starting: %v", err) + } + }() + + return nil +} + +func (s *service) DeleteJob(jobid, region string, purge bool) error { + params := map[string]string{ + "region": region, + "purge": strconv.FormatBool(purge), + } + + resp, err := s.client. + R(). + SetQueryParams(params). + Delete("v1/job/" + jobid) + + if err != nil { + s.logger.Errorf("Error stopping job: %v", err) + return err + } + if resp.IsError() { + return fmt.Errorf("StopJob error %s", resp.String()) + } + + return nil +} + +func (s *service) ScaleJob(jobid string, count int, region string) error { + params := map[string]string{ + "region": region, + } + + request := ScaleRequest{ + Count: count, + Target: ScaleGroupRequest{ + Group: "restreamer", + }, + } + + resp, err := s.client. + R(). + SetQueryParams(params). + SetBody(request). + SetHeader("Content-Type", "application/json"). + Post("v1/job/" + jobid + "scale") + + if err != nil { + s.logger.Errorf("Error starting job: %v", err) + return err + } + if resp.IsError() { + return fmt.Errorf("StartJob error %s", resp.String()) + } + + return nil +} + +func (s *service) RunJob(definition JobDefinition, region string) error { + params := map[string]string{ + "region": region, + } + + request := RunJobRequest{ + Job: definition, + Format: "json", + } + + resp, err := s.client. + R(). + SetQueryParams(params). + SetBody(request). + SetHeader("content-type", "application/json"). + Post("v1/jobs") + + if err != nil { + s.logger.Errorf("Error starting job: %v", err) + return err + } + if resp.IsError() { + return fmt.Errorf("StartJob error %s", resp.String()) + } + + return nil +}