Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Prometheus exporter #22

Merged
merged 4 commits into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ WEBSERVER_ADDRESS=
WEBSERVER_PORT=
WEBSERVER_USERNAME=admin
WEBSERVER_PASSWORD=f2f9899c-567c-455f-8a82-77a2c66e736e
WEBSERVER_METRICS=true

TZ=Asia/Tehran
8 changes: 8 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ func setupEnv() {
"Cannot bind webserver_password env variable: %s",
)

warnOnErr(
viper.BindEnv(
"webserver_metrics",
"prometheus_metrics",
),
"Cannot bind webserver_metrics env variable: %s",
)

warnOnErr(
viper.BindEnv(
"webserver_username",
Expand Down
2 changes: 2 additions & 0 deletions config.local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ jobs:
- on-init: true
# - interval: 1s
- web-event: test
- web-event: test
- web-event: test
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Config struct {
WebServerPort uint `mapstructure:"webserver_port" json:"webserver_port,omitempty"`
WebserverUsername string `mapstructure:"webserver_username" json:"webserver_username,omitempty"`
WebServerPassword string `mapstructure:"webserver_password" json:"webserver_password,omitempty"`
WebServerMetrics bool `mapstructure:"webserver_metrics" json:"webserver_metrics,omitempty"`

Jobs []*JobConfig `mapstructure:"jobs" json:"jobs"`
}
Expand Down
50 changes: 17 additions & 33 deletions core/concurrency/concurrent_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package concurrency

import (
"errors"
"fmt"
"sync"
)

// ConcurrentPool implements a simple semaphore-like structure to limit
// the number of concurrent goroutines working together.
type ConcurrentPool struct {
accessLock sync.Locker
lockerLock sync.Locker
available uint // total capacity of the pool
used uint // number of slots currently in use
changeChan chan interface{} // channel for signaling changes in the pool's state
available uint // total capacity of the pool
used *LockedValue[uint] // number of slots currently in use
changeChan chan interface{} // channel for signaling changes in the pool's state
}

// NewConcurrentPool creates a new ConcurrentPool with the specified capacity.
Expand All @@ -24,9 +24,8 @@ func NewConcurrentPool(capacity uint) (*ConcurrentPool, error) {
}
return &ConcurrentPool{
lockerLock: new(sync.Mutex),
accessLock: new(sync.Mutex),
available: capacity,
used: 0,
used: NewLockedValue[uint](0),
changeChan: make(chan interface{}),
}, nil
}
Expand All @@ -36,6 +35,7 @@ func NewConcurrentPool(capacity uint) (*ConcurrentPool, error) {
func (p *ConcurrentPool) Lock() {
p.lockerLock.Lock()
defer p.lockerLock.Unlock()
fmt.Print(p.get())
if p.available > p.get() {
p.increase()
return
Expand All @@ -59,37 +59,21 @@ func (p *ConcurrentPool) Unlock() {
}

func (p *ConcurrentPool) get() uint {
return p.access(get)
return p.used.Get()
}

func (p *ConcurrentPool) increase() {
p.access(increase)
p.used.Operate(
func(current uint) uint {
return (current + 1)
},
)
}

func (p *ConcurrentPool) decrease() {
p.access(decrease)
}

// access is the only way to access the internal state of the pool's `used` count.
// in order to maintain the integrity of the pool, it is protected by the internalSync mutex.
// every operation (get,increase,decrease) is encapsulated in a function that takes the pool as argument
func (p *ConcurrentPool) access(action func(p *ConcurrentPool) uint) uint {
p.accessLock.Lock()
defer p.accessLock.Unlock()
ans := action(p)
return ans
}

func get(p *ConcurrentPool) uint {
return p.used
}

func decrease(p *ConcurrentPool) uint {
p.used--
return p.used
}

func increase(p *ConcurrentPool) uint {
p.used++
return p.used
p.used.Operate(
func(current uint) uint {
return (current - 1)
},
)
}
12 changes: 6 additions & 6 deletions core/concurrency/concurrent_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@ func TestConcurrentPool_LockUnlock(t *testing.T) {
pool, err := NewConcurrentPool(1)
assert.NoError(t, err)
pool.Lock()
assert.Equal(t, 1, pool.access(get))
assert.Equal(t, 1, pool.get())
pool.Unlock()
assert.Equal(t, 0, pool.access(get))
assert.Equal(t, 0, pool.get())
})

t.Run("Lock and Unlock with capacity 2", func(t *testing.T) {
pool, err := NewConcurrentPool(2)
assert.NoError(t, err)
pool.Lock()
assert.Equal(t, 1, pool.access(get))
assert.Equal(t, 1, pool.get())
pool.Lock()
assert.Equal(t, 2, pool.access(get))
assert.Equal(t, 2, pool.get())
pool.Unlock()
assert.Equal(t, 1, pool.access(get))
assert.Equal(t, 1, pool.get())
pool.Unlock()
assert.Equal(t, 0, pool.access(get))
assert.Equal(t, 0, pool.get())
})

t.Run("Unlock on a totally free pool", func(t *testing.T) {
Expand Down
26 changes: 26 additions & 0 deletions core/concurrency/lock_value_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package concurrency_test

import (
"testing"

"github.com/alecthomas/assert/v2"

"github.com/FMotalleb/crontab-go/core/concurrency"
)

func TestLockedValue(t *testing.T) {
// Create a new LockedValue with an initial value of 10
lv := concurrency.NewLockedValue[int](10)

// Test the Get method
assert.Equal(t, 10, lv.Get())

// Test the Set method
lv.Set(20)
assert.Equal(t, 20, lv.Get())

// Test the Operate method
operator := func(x int) int { return x * 2 }
lv.Operate(operator)
assert.Equal(t, 40, lv.Get())
}
38 changes: 38 additions & 0 deletions core/concurrency/locked_value.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package concurrency

import (
"sync"
)

type (
Operator[T any] func(T) T
LockedValue[T any] struct {
value T
lock sync.Locker
}
)

func NewLockedValue[T any](initial T) *LockedValue[T] {
return &LockedValue[T]{
value: initial,
lock: &sync.Mutex{},
}
}

func (lv *LockedValue[T]) Get() T {
lv.lock.Lock()
defer lv.lock.Unlock()
return lv.value
}

func (lv *LockedValue[T]) Set(newValue T) {
lv.lock.Lock()
defer lv.lock.Unlock()
lv.value = newValue
}

func (lv *LockedValue[T]) Operate(operator Operator[T]) {
lv.lock.Lock()
defer lv.lock.Unlock()
lv.value = operator(lv.value)
}
2 changes: 1 addition & 1 deletion core/event/web_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func NewEventListener(event string) WebEventListener {

// BuildTickChannel implements abstraction.Scheduler.
func (w *WebEventListener) BuildTickChannel() <-chan any {
global.CTX.AddEventListener(
global.CTX().AddEventListener(
w.event, func() {
w.c <- false
},
Expand Down
19 changes: 15 additions & 4 deletions core/global/global_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,38 @@
"context"
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/FMotalleb/crontab-go/core/concurrency"
"github.com/FMotalleb/crontab-go/ctxutils"
)

var CTX = newGlobalContext()
func CTX() *GlobalContext {
return ctx
}

var ctx = newGlobalContext()

type (
EventListenerMap = map[string][]func()
GlobalContext struct {
context.Context
lock sync.Locker
lock sync.Locker
countersValue map[string]*concurrency.LockedValue[float64]
counters map[string]prometheus.CounterFunc
}
)

func newGlobalContext() *GlobalContext {
ctx := &GlobalContext{
context.WithValue(
Context: context.WithValue(
context.Background(),
ctxutils.EventListeners,
EventListenerMap{},
),
&sync.Mutex{},
lock: &sync.Mutex{},
countersValue: make(map[string]*concurrency.LockedValue[float64]),
counters: make(map[string]prometheus.CounterFunc),
}
return ctx
}
Expand All @@ -35,7 +46,7 @@
return listeners.(EventListenerMap)
}

func (ctx *GlobalContext) AddEventListener(event string, listener func()) {

Check failure on line 49 in core/global/global_context.go

View workflow job for this annotation

GitHub Actions / ci (ubuntu-latest)

ST1016: methods on the same type should have the same receiver name (seen 2x "c", 2x "ctx") (stylecheck)

Check failure on line 49 in core/global/global_context.go

View workflow job for this annotation

GitHub Actions / analyze (go)

ST1016: methods on the same type should have the same receiver name (seen 2x "c", 2x "ctx") (stylecheck)

Check failure on line 49 in core/global/global_context.go

View workflow job for this annotation

GitHub Actions / analyze (go)

ST1016: methods on the same type should have the same receiver name (seen 2x "c", 2x "ctx") (stylecheck)

Check failure on line 49 in core/global/global_context.go

View workflow job for this annotation

GitHub Actions / ci (macos-latest)

ST1016: methods on the same type should have the same receiver name (seen 2x "c", 2x "ctx") (stylecheck)

Check failure on line 49 in core/global/global_context.go

View workflow job for this annotation

GitHub Actions / ci (windows-latest)

ST1016: methods on the same type should have the same receiver name (seen 2x "c", 2x "ctx") (stylecheck)
ctx.lock.Lock()
defer ctx.lock.Unlock()
listeners := ctx.EventListeners()
Expand Down
62 changes: 62 additions & 0 deletions core/global/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package global

import (
"context"
"fmt"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/FMotalleb/crontab-go/core/concurrency"
"github.com/FMotalleb/crontab-go/ctxutils"
)

func (c *GlobalContext) MetricCounter(
ctx context.Context,
name string,
help string,
labels prometheus.Labels,
) *concurrency.LockedValue[float64] {
tag := name
for _, label := range []ctxutils.ContextKey{ctxutils.JobKey} {
if value, ok := ctx.Value(label).(string); ok {
labels[string(label)] = value
tag = fmt.Sprintf("%s,%s=%s", tag, label, value)
}
}
if c, ok := c.countersValue[tag]; ok {
return c
}
c.countersValue[tag] = concurrency.NewLockedValue[float64](0)
c.counters[tag] = promauto.NewCounterFunc(
prometheus.CounterOpts{
Name: name,
ConstLabels: labels,
Help: help,
Namespace: "crontab_go",
},
func() float64 {
item, ok := c.countersValue[tag]
if !ok {
return 0.0
}
ans := item.Get()
item.Set(0)
return ans
},
)
return c.MetricCounter(ctx, name, help, labels)
}

func (c *GlobalContext) CountSignals(ctx context.Context, name string, signal <-chan any, help string, labels prometheus.Labels) <-chan any {
counter := c.MetricCounter(ctx, name, help, labels)
out := make(chan any)
go func() {
for c := range signal {
fmt.Print("1")
counter.Set(counter.Get() + 1)
out <- c
}
}()
return out
}
6 changes: 4 additions & 2 deletions core/jobs/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package jobs
import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"

"github.com/FMotalleb/crontab-go/cmd"
"github.com/FMotalleb/crontab-go/config"
"github.com/FMotalleb/crontab-go/core/concurrency"
"github.com/FMotalleb/crontab-go/core/global"
"github.com/FMotalleb/crontab-go/ctxutils"
)

Expand All @@ -23,7 +25,7 @@ func InitializeJobs(log *logrus.Entry, cronInstance *cron.Cron) {
job.Concurrency = 1
}
c := context.Background()
c = context.WithValue(c, ctxutils.JobKey, job)
c = context.WithValue(c, ctxutils.JobKey, job.Name)

lock, err := concurrency.NewConcurrentPool(job.Concurrency)
if err != nil {
Expand All @@ -36,7 +38,7 @@ func InitializeJobs(log *logrus.Entry, cronInstance *cron.Cron) {
}

signal := buildSignal(*job, cronInstance, logger)

signal = global.CTX().CountSignals(c, "events", signal, "amount of events dispatched for this job", prometheus.Labels{})
tasks, doneHooks, failHooks := initTasks(*job, logger)
logger.Trace("Tasks initialized")

Expand Down
Loading
Loading