Skip to content

Commit

Permalink
Utility function improvements, JSON cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
kyleu committed Oct 12, 2024
1 parent 7e3df1d commit a7b1572
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 30 deletions.
5 changes: 5 additions & 0 deletions app/controller/clib/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ import (
"github.com/kyleu/rituals/app/controller/cutil"
"github.com/kyleu/rituals/app/lib/sandbox"
"github.com/kyleu/rituals/app/lib/telemetry"
"github.com/kyleu/rituals/views"
"github.com/kyleu/rituals/views/vsandbox"
)

func SandboxList(w http.ResponseWriter, r *http.Request) {
controller.Act("sandbox.list", w, r, func(as *app.State, ps *cutil.PageState) (string, error) {
if title := r.URL.Query().Get("title"); title != "" {
ps.SetTitleAndData(title, title)
return controller.Render(r, as, &views.Debug{}, ps, title)
}
ps.SetTitleAndData("Sandboxes", sandbox.AllSandboxes)
return controller.Render(r, as, &vsandbox.List{}, ps, "sandbox")
})
Expand Down
3 changes: 1 addition & 2 deletions app/controller/cutil/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ func loadProfile(session util.ValueMap) (*user.Profile, error) {
}
s = util.ToJSON(m)
}
p := &user.Profile{}
err := util.FromJSON([]byte(s), p)
p, err := util.FromJSONObj[*user.Profile]([]byte(s))
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions app/enum/memberstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func (m MemberStatus) MarshalJSON() ([]byte, error) {
}

func (m *MemberStatus) UnmarshalJSON(data []byte) error {
var key string
if err := util.FromJSON(data, &key); err != nil {
key, err := util.FromJSONString(data)
if err != nil {
return err
}
*m = AllMemberStatuses.Get(key, nil)
Expand Down
4 changes: 2 additions & 2 deletions app/enum/modelservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (m ModelService) MarshalJSON() ([]byte, error) {
}

func (m *ModelService) UnmarshalJSON(data []byte) error {
var key string
if err := util.FromJSON(data, &key); err != nil {
key, err := util.FromJSONString(data)
if err != nil {
return err
}
*m = AllModelServices.Get(key, nil)
Expand Down
4 changes: 2 additions & 2 deletions app/enum/sessionstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func (s SessionStatus) MarshalJSON() ([]byte, error) {
}

func (s *SessionStatus) UnmarshalJSON(data []byte) error {
var key string
if err := util.FromJSON(data, &key); err != nil {
key, err := util.FromJSONString(data)
if err != nil {
return err
}
*s = AllSessionStatuses.Get(key, nil)
Expand Down
3 changes: 3 additions & 0 deletions app/lib/filesystem/fsignore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
var defaultIgnore = []string{".DS_Store$", "^.git/", "^.idea/", "^build/", "^client/node_modules", ".html.go$", ".sql.go$"}

func buildIgnore(ign []string) []string {
if len(ign) == 1 && ign[0] == "-" {
return nil
}
ret := util.NewStringSlice(append([]string{}, defaultIgnore...))
ret.Push(ign...)
return ret.Slice
Expand Down
3 changes: 1 addition & 2 deletions app/lib/theme/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,11 @@ func (s *Service) loadIfNeeded(logger util.Logger) {
s.cache = Themes{Default}
if s.files.IsDir(s.root) {
lo.ForEach(s.files.ListJSON(s.root, nil, true, logger), func(key string, _ int) {
t := &Theme{}
b, err := s.files.ReadFile(filepath.Join(s.root, key+util.ExtJSON))
if err != nil {
logger.Warnf("can't load theme [%s]: %+v", key, err)
}
err = util.FromJSON(b, t)
t, err := util.FromJSONObj[*Theme](b)
if err != nil {
logger.Warnf("can't load theme [%s]: %+v", key, err)
}
Expand Down
6 changes: 1 addition & 5 deletions app/lib/websocket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ func (s *Service) WriteChannel(message *Message, logger util.Logger, except ...u
return nil
}
json := util.ToJSON(message)
if size := len(conns.ConnIDs) - len(except); size > 0 {
logger.Debugf("sending message [%v::%v] to [%v] connections", message.Channel, message.Cmd, size)
}
lo.ForEach(conns.ConnIDs, func(conn uuid.UUID, _ int) {
if !lo.Contains(except, conn) {
connID := conn
Expand Down Expand Up @@ -138,8 +135,7 @@ func ReadSocketLoop(connID uuid.UUID, sock *websocket.Conn, onMessage func(m *Me
}
return errors.Wrapf(err, "error processing socket read loop for connection [%s]", connID.String())
}
m := &Message{}
err = util.FromJSON(message, m)
m, err := util.FromJSONObj[*Message](message)
if err != nil {
return errors.Wrap(err, "error decoding websocket message")
}
Expand Down
7 changes: 7 additions & 0 deletions app/util/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ func ArrayRemoveNil[T any](x []*T) []*T {
})
}

func ArrayRemoveEmpty[T comparable](x []T) []T {
var check T
return lo.Reject(x, func(el T, _ int) bool {
return el == check
})
}

func ArrayDereference[T any](x []*T) []T {
return lo.Map(x, func(el *T, _ int) T {
return lo.FromPtr(el)
Expand Down
50 changes: 39 additions & 11 deletions app/util/async.go
Original file line number Diff line number Diff line change
@@ -1,87 +1,115 @@
package util

import (
"fmt"
"sync"
"time"

"github.com/pkg/errors"
"github.com/samber/lo"
)

func AsyncCollect[T any, R any](items []T, f func(x T) (R, error)) ([]R, []error) {
func AsyncCollect[T any, R any](items []T, f func(x T) (R, error), loggers ...Logger) ([]R, []error) {
ret := make([]R, 0, len(items))
var errs []error
mu := sync.Mutex{}
size := len(items)
wg := sync.WaitGroup{}
wg.Add(len(items))
wg.Add(size)
var processed int

lo.ForEach(items, func(x T, _ int) {
i := x
go func() {
defer wg.Done()
r, err := f(i)
mu.Lock()
defer mu.Unlock()
processed++
if err == nil {
ret = append(ret, r)
} else {
errs = append(errs, errors.Wrapf(err, "error running async function for item [%v]", i))
}
mu.Unlock()
wg.Done()
for _, logger := range loggers {
logger.Debugf("processed [%d/%d] items", processed, size)
}
}()
})
wg.Wait()
return ret, errs
}

func AsyncCollectMap[T any, K comparable, R any](items []T, k func(x T) K, f func(x T) (R, error)) (map[K]R, map[K]error) {
func AsyncCollectMap[T any, K comparable, R any](items []T, k func(x T) K, f func(x T) (R, error), loggers ...Logger) (map[K]R, map[K]error) {
ret := make(map[K]R, len(items))
errs := map[K]error{}
mu := sync.Mutex{}
size := len(items)
wg := sync.WaitGroup{}
wg.Add(len(items))
wg.Add(size)
var processed int

lo.ForEach(items, func(x T, _ int) {
i := x
go func() {
defer wg.Done()
key := k(i)
r, err := f(i)
mu.Lock()
defer mu.Unlock()
processed++
if err == nil {
ret[key] = r
} else {
errs[key] = errors.Wrapf(err, "error running async function for item [%v]", key)
}
mu.Unlock()
wg.Done()
for _, logger := range loggers {
logger.Debugf("processed [%d/%d] items", processed, size)
}
}()
})
wg.Wait()
return ret, errs
}

func AsyncRateLimit[T any, R any](items []T, f func(x T) (R, error), maxConcurrent int, timeout time.Duration) ([]R, []error) {
func AsyncRateLimit[T any, R any](key string, items []T, f func(x T) (R, error), maxConcurrent int, timeout time.Duration, loggers ...Logger) ([]R, []error) {
ret := make([]R, 0, len(items))
errs := make([]error, 0)
mu := sync.Mutex{}
size := len(items)
wg := sync.WaitGroup{}
wg.Add(size)
var processed int
var started int
prefix := fmt.Sprintf("[%s] ", key)
log := func(msg string, args ...any) {
for _, logger := range loggers {
logger.Debugf(prefix+msg, args...)
}
}

limit := make(chan struct{}, maxConcurrent)
defer close(limit)
log("starting to process [%d] items, [%d] at once)", size, maxConcurrent)

for idx, item := range items {
select {
case limit <- struct{}{}:
wg.Add(1)
go func(item T, idx int) {
defer wg.Done()
defer func() { <-limit }()

started++
log("starting to process item [%d/%d]...", started, size)
r, err := f(item)
mu.Lock()
defer mu.Unlock()
processed++
if err == nil {
ret = append(ret, r)
} else {
errs = append(errs, errors.Wrapf(err, "error running async function for item [%v]", item))
}
log("processed [%d/%d] items", processed, size)
}(item, idx)
case <-time.After(timeout):
errs = append(errs, errors.Errorf("job timed out after [%v]", timeout))
Expand Down
6 changes: 2 additions & 4 deletions app/util/parsearray.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ func ParseArray(r any, path string, allowEmpty bool, coerce bool) ([]any, error)
if strings.TrimSpace(t) == "" {
return nil, nil
}
var ret []any
err := FromJSON([]byte(t), &ret)
ret, err := FromJSONObj[[]any]([]byte(t))
if err != nil {
if coerce {
return lo.ToAnySlice(StringSplitAndTrim(t, ",")), nil
Expand All @@ -27,8 +26,7 @@ func ParseArray(r any, path string, allowEmpty bool, coerce bool) ([]any, error)
if len(t) == 0 {
return nil, nil
}
var ret []any
err := FromJSON(t, &ret)
ret, err := FromJSONObj[[]any](t)
if err != nil {
if coerce {
return lo.ToAnySlice(StringSplitAndTrim(string(t), ",")), nil
Expand Down

0 comments on commit a7b1572

Please sign in to comment.