diff --git a/go.mod b/go.mod index 2d6c72a2a..9a29020bb 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/stretchr/testify v1.8.4 gitlab.com/hfuss/mux-prometheus v0.0.5 golang.org/x/net v0.20.0 + golang.org/x/sync v0.5.0 golang.org/x/text v0.14.0 gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index cf382953b..5b71904ab 100644 --- a/go.sum +++ b/go.sum @@ -237,6 +237,8 @@ golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/go.work.sum b/go.work.sum index ee356555a..408b4718f 100644 --- a/go.work.sum +++ b/go.work.sum @@ -318,6 +318,7 @@ go.mongodb.org/mongo-driver v1.7.5/go.mod h1:VXEWRZ6URJIkUq2SCAyapmhH0ZLRBP+FT4x go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= golang.org/x/oauth2 v0.15.0/go.mod h1:q48ptWNTY5XWf+JNten23lcvHpLJ0ZSxF5ttTHKVCAM= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= google.golang.org/api v0.153.0/go.mod h1:3qNJX5eOmhiWYc67jRA/3GsDw97UFb5ivv7Y2PrriAY= diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index 2f07077b2..935f0c1ec 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -37,6 +37,7 @@ import ( "github.com/hyperledger/firefly/pkg/blockchain" "github.com/hyperledger/firefly/pkg/core" "github.com/hyperledger/firefly/pkg/tokens" + "golang.org/x/sync/errgroup" ) type ConflictError struct { @@ -716,16 +717,31 @@ func (ft *FFTokens) handleNamespaceStarted(ctx context.Context, data fftypes.JSO // Make sure any pools that are marked as active in our DB are indeed active namespace := data.GetString("namespace") log.L(ctx).Debugf("Token connector '%s' started namespace '%s'. Ensuring all token pools active.", ft.Name(), namespace) + + g, ctx := errgroup.WithContext(ctx) + for _, pool := range ft.poolsToActivate[namespace] { - if _, err := ft.EnsureTokenPoolActive(ctx, pool); err == nil { - log.L(ctx).Debugf("Ensured token pool active '%s'", pool.ID) - } else { - // Log the error and continue trying to activate pools - // At this point we've already started - log.L(ctx).Errorf("Error ensuring token pool active '%s': %s", pool.ID, err.Error()) - } + currentPool := pool + g.Go(func() error { + _, err := ft.EnsureTokenPoolActive(ctx, currentPool) + if err == nil { + log.L(ctx).Debugf("Ensured token pool active '%s'", currentPool.ID) + } else { + log.L(ctx).Errorf("Error ensuring token pool active '%s': %s", currentPool.ID, err.Error()) + } + + return err + }) + } + // g.Wait waits for all goroutines activating token pools to complete + // and returns the first non-nil error returned + // by one of the goroutines. + if err := g.Wait(); err != nil { + // The above handleMessage will retry if errors occur + return err } + return nil } diff --git a/internal/tokens/fftokens/fftokens_test.go b/internal/tokens/fftokens/fftokens_test.go index 886e1d80c..78e234ffd 100644 --- a/internal/tokens/fftokens/fftokens_test.go +++ b/internal/tokens/fftokens/fftokens_test.go @@ -1901,7 +1901,7 @@ func TestHandleNamespaceStartedEnsureActiveError(t *testing.T) { httpmock.NewJsonResponderOrPanic(500, fftypes.JSONObject{})) _, err := h.handleMessage(context.Background(), "ns1", []byte(`{"event":"started","data":{"namespace": "ns1"}}`)) - assert.NoError(t, err) + assert.Error(t, err) } func TestHandlePoolActivated(t *testing.T) {