Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multiple jq threads #17

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions jq.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
package libjq_go

import (
"sync"

"github.com/flant/libjq-go/pkg/jq"
)

// Jq is handy shortcut to use a default jq invoker with enabled cache for programs
var initOnce sync.Once
var defaultCgoCaller jq.CgoCaller
var defaultCache *jq.JqCache

// Jq is a handy shortcut to create a jq invoker with default settings.
//
// Created invokers will share a cache for programs and a cgo caller.
func Jq() *jq.Jq {
initOnce.Do(func() {
defaultCache = jq.NewJqCache()
defaultCgoCaller = jq.NewCgoCaller()
})
return jq.NewJq().
WithCache(jq.JqDefaultCache())
WithCache(defaultCache).
WithCgoCaller(defaultCgoCaller)
}
124 changes: 124 additions & 0 deletions jq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package libjq_go

import (
"fmt"
"sync"
"testing"

. "github.com/onsi/gomega"

"github.com/flant/libjq-go/pkg/jq"
)

func Test_OneProgram_OneInput(t *testing.T) {
Expand Down Expand Up @@ -66,3 +69,124 @@ func Test_RunError(t *testing.T) {
g.Expect(err).Should(HaveOccurred())
g.Expect(err.Error()).To(ContainSubstring("Cannot iterate over string"))
}

// PoC of multiple jq thread
func Test_two_cgo_callers(t *testing.T) {
g := NewWithT(t)

cgoCaller := jq.NewCgoCaller()
invoker := jq.NewJq().
WithCache(jq.NewJqCache()).
WithCgoCaller(cgoCaller)
// New cache is required for every new cgoCaller!
// Cache saves jq_state from jq_compile in a pinned thread, so another cgoCaller
// cannot access that jq_state in another thread.
//WithCache(jq.JqDefaultCache()):
// Assertion failed: (0 && "invalid instruction"), function jq_nextAssertion failed: (jv_is_valid(v, file src/execute.c, line 401.
// al)), function stack_pop, file src/execute.c, line 177.
// SIGABRT: abort

var wg sync.WaitGroup

wg.Add(2)

// jq with default cgo caller
go func() {
defer wg.Done()

p1, _ := Jq().Program(`.bb//"NO"`).Precompile()

for i := 0; i < 500; i++ {
res, err := Jq().Program(".foo").Run(`{"foo":"bar"}`)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(res).To(Equal(`"bar"`))

res, err = p1.Run(`{"foo":"bar"}`)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(res).To(Equal(`"NO"`))
}

}()

// jq with another, parallel cgo caller
go func() {
defer wg.Done()

p1, _ := invoker.Program(`.bb//"NO"`).Precompile()

for i := 0; i < 500; i++ {
res, err := p1.Run(`{"foo":"bar"}`)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(res).To(Equal(`"NO"`))

res, err = invoker.Program(".foo").Run(`{"foo":"bar"}`)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(res).To(Equal(`"bar"`))
}

}()

wg.Wait()
}

// PoC of multiple jq thread
func Test_multiple_cgo_callers(t *testing.T) {
g := NewWithT(t)

jqPoolLen := 16

// init invokers
jqPool := []*jq.Jq{}
for i := 0; i < jqPoolLen; i++ {
invoker := jq.NewJq().
WithCache(jq.NewJqCache()).
WithCgoCaller(jq.NewCgoCaller())
jqPool = append(jqPool, invoker)
}

consumersCount := jqPoolLen * 2 // twice as invokers

// Start consumers
var wg sync.WaitGroup
wg.Add(consumersCount)

for i := 0; i < consumersCount; i++ {
invokerIndex := i % jqPoolLen
go func(n int) {
defer wg.Done()

invoker := jqPool[n]

p1, _ := invoker.Program(`.bb//"NO"`).Precompile()

foo, _ := invoker.Program(".foo").Precompile()

for i := 0; i < 100; i++ {
res, err := p1.Run(`{"foo":"bar"}`)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(res).To(Equal(`"NO"`))

res, err = foo.Run(`{"foo":"bar"}`)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(res).To(Equal(`"bar"`))
}

p2, _ := invoker.Program(`.bb//"YES"`).Precompile()

p3, _ := invoker.Program(".foobar").Precompile()

for i := 0; i < 100; i++ {
res, err := p2.Run(`{"foo":"bar"}`)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(res).To(Equal(`"YES"`))

res, err = p3.Run(`{"foobar":"bar"}`)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(res).To(Equal(`"bar"`))
}

}(invokerIndex)
}

wg.Wait()
}
22 changes: 7 additions & 15 deletions pkg/jq/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,20 @@ import (
"github.com/flant/libjq-go/pkg/libjq"
)

/*
Simple cache for jq state objects with compiled programs.
*/

// JqCache is a simple cache for JqState objects.
type JqCache struct {
StateCache map[string]*libjq.JqState
m sync.Mutex
}

var jqDefaultCacheInstance *JqCache

var JqDefaultCache = func() *JqCache {
if jqDefaultCacheInstance == nil {
jqDefaultCacheInstance = NewJqCache()
}
return jqDefaultCacheInstance
}

func NewJqCache() *JqCache {
return &JqCache{
StateCache: make(map[string]*libjq.JqState),
m: sync.Mutex{},
}
}

// Get returns cached JqState object or nil of no object is registered for key.
func (jc *JqCache) Get(key string) *libjq.JqState {
jc.m.Lock()
defer jc.m.Unlock()
Expand All @@ -40,20 +29,23 @@ func (jc *JqCache) Get(key string) *libjq.JqState {
return nil
}

// Set register a JqState object for key.
func (jc *JqCache) Set(key string, state *libjq.JqState) {
jc.m.Lock()
jc.StateCache[key] = state
jc.m.Unlock()
}

// Teardown calls Teardown for cached JqState object.
func (jc *JqCache) Teardown(key string) {
jc.m.Lock()
defer jc.m.Unlock()
if v, ok := jc.StateCache[key]; ok {
v.Teardown()
if jqState, ok := jc.StateCache[key]; ok {
jqState.Teardown()
}
}

// TeardownAll calls Teardown for all cached JqState objects.
func (jc *JqCache) TeardownAll() {
jc.m.Lock()
defer jc.m.Unlock()
Expand Down
74 changes: 74 additions & 0 deletions pkg/jq/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package jq

import (
"sync"
"testing"

. "github.com/onsi/gomega"

"github.com/flant/libjq-go/pkg/libjq"
)

func Test_JqCache_Get_Set(t *testing.T) {
g := NewWithT(t)

s := &libjq.JqState{}

c := NewJqCache()
c.Set("state", s)

s2 := c.Get("state")
g.Expect(s2).Should(Equal(s))
}

func Test_JqCache_Get_Set_Parallel(t *testing.T) {
g := NewWithT(t)

s1 := &libjq.JqState{}
s2 := &libjq.JqState{}
s3 := &libjq.JqState{}

c := NewJqCache()
c.Set("state1", s1)
c.Set("state2", s2)
c.Set("state3", s3)

var wg sync.WaitGroup
wg.Add(30)

for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
c.Set("state1", s1)
c.Set("state2", s2)
s := c.Get("state3")
g.Expect(s).Should(Equal(s3))
}
}()
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
c.Set("state1", s1)
c.Set("state3", s3)
s := c.Get("state2")
g.Expect(s).Should(Equal(s2))
}
}()
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
c.Set("state3", s3)
c.Set("state2", s2)
s := c.Get("state1")
g.Expect(s).Should(Equal(s1))
}
}()
}

wg.Wait()

g.Expect(c.Get("state1")).Should(Equal(s1))
g.Expect(c.Get("state2")).Should(Equal(s2))
g.Expect(c.Get("state3")).Should(Equal(s3))
}
53 changes: 30 additions & 23 deletions pkg/jq/cgo_caller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,40 @@ import (
)

/*
libjq methods should run in one thread, so this trick with LockOsThread come up.
jq_state is thread safe, but not compatible with migration of go routines between thread.
That is why libjq methods should be called from the same thread where jq_state was created.
To achieve this, a dedicated go routine and a chan func() are used.
*/

var cgoCallsCh chan func()
var mu = sync.Mutex{}
type CgoCaller func(func())

// CgoCall is used to run C code of a jq in a dedicated go-routine locked to OS thread.
func CgoCall(f func()) {
mu.Lock()
if cgoCallsCh == nil {
cgoCallsCh = make(chan func())
go func() {
runtime.LockOSThread()
for {
select {
case f := <-cgoCallsCh:
f()
// NewCgoCaller is a factory of CgoCallers. CgoCaller is a way to run C code of a jq in a dedicated go-routine locked to OS thread.
// CgoCaller on first invoke creates a channel and starts a go-routine locked to os thread. This go-routine receives tasks to run via a channel.

func NewCgoCaller() CgoCaller {
var cgoCallTasksCh chan func()
var initOnce sync.Once

return func(f func()) {
initOnce.Do(func() {
cgoCallTasksCh = make(chan func())
go func() {
runtime.LockOSThread()
for {
select {
case f := <-cgoCallTasksCh:
f()
}
}
}
}()
}
mu.Unlock()
}()
})

done := make(chan struct{}, 1)
cgoCallsCh <- func() {
f()
done <- struct{}{}
var wg sync.WaitGroup
wg.Add(1)
cgoCallTasksCh <- func() {
f()
wg.Done()
}
wg.Wait()
}
<-done
}
Loading