Skip to content

Commit

Permalink
Added metrics for batcher
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios committed Sep 2, 2024
1 parent 749203a commit 7f738f5
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 19 deletions.
21 changes: 20 additions & 1 deletion platform/common/core/generic/vault/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,35 @@ SPDX-License-Identifier: Apache-2.0
package vault

import (
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/tracing"
"go.opentelemetry.io/otel/trace"
)

type Metrics struct {
CommitDuration metrics.Histogram
BatchedCommitDuration metrics.Histogram

Vault trace.Tracer
}

func NewMetrics(p trace.TracerProvider) *Metrics {
func NewMetrics(m metrics.Provider, p trace.TracerProvider) *Metrics {
return &Metrics{
CommitDuration: m.NewHistogram(metrics.HistogramOpts{
Namespace: "vault",
Name: "commit",
Help: "Histogram for the duration of commit",
Buckets: utils.ExponentialBucketTimeRange(0, 5*time.Second, 15),
}),
BatchedCommitDuration: m.NewHistogram(metrics.HistogramOpts{
Namespace: "vault",
Name: "batched_commit",
Help: "Histogram for the duration of commit with the batching overhead",
Buckets: utils.ExponentialBucketTimeRange(0, 5*time.Second, 15),
}),
Vault: p.Tracer("vault", tracing.WithMetricsOpts(tracing.MetricsOpts{
Namespace: "coresdk",
LabelNames: []tracing.LabelName{},
Expand Down
12 changes: 10 additions & 2 deletions platform/common/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
dbdriver "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/hash"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -105,6 +106,7 @@ func New[V driver.ValidationCode](
vcProvider driver.ValidationCodeProvider[V],
newInterceptor NewInterceptorFunc[V],
populator Populator,
metricsProvider metrics.Provider,
tracerProvider trace.TracerProvider,
) *Vault[V] {
v := &Vault[V]{
Expand All @@ -115,7 +117,7 @@ func New[V driver.ValidationCode](
vcProvider: vcProvider,
newInterceptor: newInterceptor,
populator: populator,
metrics: NewMetrics(tracerProvider),
metrics: NewMetrics(metricsProvider, tracerProvider),
}
v.commitBatcher = runner.NewBatchRunner[txCommitIndex](v.commitTXs, 100, 500*time.Millisecond)
return v
Expand Down Expand Up @@ -214,17 +216,22 @@ type commitInput struct {
}

func (db *Vault[V]) CommitTX(ctx context.Context, txID driver.TxID, block driver.BlockNum, indexInBloc driver.TxNum) error {
start := time.Now()
newCtx, span := db.metrics.Vault.Start(ctx, "commit")
defer span.End()
return db.commitBatcher.Run(txCommitIndex{
err := db.commitBatcher.Run(txCommitIndex{
ctx: newCtx,
txID: txID,
block: block,
indexInBloc: indexInBloc,
})
db.metrics.BatchedCommitDuration.Observe(time.Since(start).Seconds())
return err
}

func (db *Vault[V]) commitTXs(txs []txCommitIndex) []error {
db.logger.Debugf("Commit %d transactions", len(txs))
start := time.Now()
txIDs := make([]driver.TxID, len(txs))
for i, tx := range txs {
txIDs[i] = tx.txID
Expand All @@ -246,6 +253,7 @@ func (db *Vault[V]) commitTXs(txs []txCommitIndex) []error {
for {
err := db.commitRWs(inputs...)
if err == nil {
db.metrics.CommitDuration.Observe(time.Since(start).Seconds() / float64(len(txs)))
return collections.Repeat[error](nil, len(txs))
}
if !errors.HasCause(err, DeadlockDetected) {
Expand Down
3 changes: 3 additions & 0 deletions platform/common/core/generic/vault/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/cache/secondcache"
db2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics/disabled"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/trace/noop"
"golang.org/x/exp/slices"
Expand All @@ -40,6 +41,7 @@ func (p *testArtifactProvider) NewCachedVault(ddb VersionedPersistence) (*Vault[
&VCProvider{},
newInterceptor,
&populator{},
&disabled.Provider{},
&noop.TracerProvider{},
), nil
}
Expand All @@ -56,6 +58,7 @@ func (p *testArtifactProvider) NewNonCachedVault(ddb VersionedPersistence) (*Vau
&VCProvider{},
newInterceptor,
&populator{},
&disabled.Provider{},
&noop.TracerProvider{},
), nil
}
Expand Down
4 changes: 2 additions & 2 deletions platform/fabric/core/generic/channelprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"go.opentelemetry.io/otel/trace"
)

type VaultConstructor = func(configService driver.ConfigService, channel string, drivers []driver2.NamedDriver, tracerProvider trace.TracerProvider) (*vault.Vault, driver.TXIDStore, error)
type VaultConstructor = func(configService driver.ConfigService, channel string, drivers []driver2.NamedDriver, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider) (*vault.Vault, driver.TXIDStore, error)

type Provider interface {
NewChannel(nw driver.FabricNetworkService, name string, quiet bool) (driver.Channel, error)
Expand Down Expand Up @@ -65,7 +65,7 @@ func (p *provider) NewChannel(nw driver.FabricNetworkService, channelName string
}

// Vault
vault, txIDStore, err := p.newVault(nw.ConfigService(), channelName, p.drivers, p.tracerProvider)
vault, txIDStore, err := p.newVault(nw.ConfigService(), channelName, p.drivers, p.metricsProvider, p.tracerProvider)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions platform/fabric/core/generic/vault/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db"
driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
)

var logger = flogging.MustGetLogger("fabric-sdk.core.vault")

func New(configService driver.ConfigService, channel string, drivers []driver2.NamedDriver, tracerProvider trace.TracerProvider) (*Vault, driver.TXIDStore, error) {
func New(configService driver.ConfigService, channel string, drivers []driver2.NamedDriver, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider) (*Vault, driver.TXIDStore, error) {
var d driver2.Driver
for _, driver := range drivers {
if driver.Name == configService.VaultPersistenceType() {
Expand Down Expand Up @@ -50,10 +51,10 @@ func New(configService driver.ConfigService, channel string, drivers []driver2.N
if txIDStoreCacheSize > 0 {
logger.Debugf("creating txID store second cache with size [%d]", txIDStoreCacheSize)
c := txidstore.NewCache(txidStore, secondcache.NewTyped[*txidstore.Entry](txIDStoreCacheSize), logger)
return NewVault(persistence, c, tracerProvider), c, nil
return NewVault(persistence, c, metricsProvider, tracerProvider), c, nil
} else {
logger.Debugf("txID store without cache selected")
c := txidstore.NewNoCache(txidStore)
return NewVault(persistence, c, tracerProvider), c, nil
return NewVault(persistence, c, metricsProvider, tracerProvider), c, nil
}
}
4 changes: 3 additions & 1 deletion platform/fabric/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
fdriver "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/hyperledger/fabric-protos-go/ledger/rwset"
"github.com/hyperledger/fabric-protos-go/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
Expand All @@ -37,14 +38,15 @@ func NewTXIDStore(persistence txidstore.UnversionedPersistence) (*SimpleTXIDStor
}

// NewVault returns a new instance of Vault
func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, tracerProvider trace.TracerProvider) *Vault {
func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider) *Vault {
return vault.New[fdriver.ValidationCode](
flogging.MustGetLogger("fabric-sdk.generic.vault"),
store,
txIDStore,
&fdriver.ValidationCodeProvider{},
newInterceptor,
&populator{},
metricsProvider,
tracerProvider,
)
}
Expand Down
5 changes: 3 additions & 2 deletions platform/fabric/core/generic/vault/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/cache/secondcache"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db"
dbdriver "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics/disabled"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/trace/noop"
"golang.org/x/exp/slices"
Expand All @@ -31,15 +32,15 @@ func (p *artifactsProvider) NewCachedVault(ddb VersionedPersistence) (*Vault, er
if err != nil {
return nil, err
}
return NewVault(ddb, txidstore.NewCache(txidStore, secondcache.NewTyped[*txidstore.Entry](100), logger), &noop.TracerProvider{}), nil
return NewVault(ddb, txidstore.NewCache(txidStore, secondcache.NewTyped[*txidstore.Entry](100), logger), &disabled.Provider{}, &noop.TracerProvider{}), nil
}

func (p *artifactsProvider) NewNonCachedVault(ddb VersionedPersistence) (*Vault, error) {
txidStore, err := NewTXIDStore(db.Unversioned(ddb))
if err != nil {
return nil, err
}
return NewVault(ddb, txidstore.NewNoCache(txidStore), &noop.TracerProvider{}), nil
return NewVault(ddb, txidstore.NewNoCache(txidStore), &disabled.Provider{}, &noop.TracerProvider{}), nil
}

func (p *artifactsProvider) NewMarshaller() vault.Marshaller {
Expand Down
5 changes: 3 additions & 2 deletions platform/orion/core/generic/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/events"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/kvs"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/hyperledger-labs/orion-server/pkg/types"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -87,7 +88,7 @@ func NewDB(ctx context.Context, kvss *kvs.KVS, config *config2.Config, name stri
return n, nil
}

func NewNetwork(ctx context.Context, kvss *kvs.KVS, eventsPublisher events.Publisher, eventsSubscriber events.Subscriber, tracerProvider trace.TracerProvider, config *config2.Config, name string, drivers []driver2.NamedDriver, networkConfig driver.NetworkConfig, listenerManager driver.ListenerManager) (*network, error) {
func NewNetwork(ctx context.Context, kvss *kvs.KVS, eventsPublisher events.Publisher, eventsSubscriber events.Subscriber, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider, config *config2.Config, name string, drivers []driver2.NamedDriver, networkConfig driver.NetworkConfig, listenerManager driver.ListenerManager) (*network, error) {
// Load configuration
n := &network{
ctx: ctx,
Expand Down Expand Up @@ -135,7 +136,7 @@ func NewNetwork(ctx context.Context, kvss *kvs.KVS, eventsPublisher events.Publi
return nil, errors.Wrapf(err, "failed creating vault")
}

n.vault, err = NewVault(n, persistence, tracerProvider)
n.vault, err = NewVault(n, persistence, metricsProvider, tracerProvider)
if err != nil {
return nil, errors.WithMessage(err, "failed to create vault")
}
Expand Down
5 changes: 3 additions & 2 deletions platform/orion/core/generic/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/orion/core/generic/vault"
"github.com/hyperledger-labs/fabric-smart-client/platform/orion/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -26,14 +27,14 @@ type Vault struct {
network Network
}

func NewVault(network Network, persistence *db.VersionedPersistence, tracerProvider trace.TracerProvider) (*Vault, error) {
func NewVault(network Network, persistence *db.VersionedPersistence, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider) (*Vault, error) {
txIDStore, err := vault.NewSimpleTXIDStore(db.Unversioned(persistence))
if err != nil {
return nil, err
}

return &Vault{
Vault: vault.New(persistence, txidstore.NewNoCache[driver.ValidationCode](txIDStore), tracerProvider),
Vault: vault.New(persistence, txidstore.NewNoCache[driver.ValidationCode](txIDStore), metricsProvider, tracerProvider),
SimpleTXIDStore: txIDStore,
network: network,
}, nil
Expand Down
4 changes: 3 additions & 1 deletion platform/orion/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
odriver "github.com/hyperledger-labs/fabric-smart-client/platform/orion/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/hyperledger-labs/orion-server/pkg/types"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
Expand All @@ -30,14 +31,15 @@ func NewSimpleTXIDStore(persistence txidstore.UnversionedPersistence) (*SimpleTX
}

// New returns a new instance of Vault
func New(store vault.VersionedPersistence, txIDStore TXIDStore, tracerProvider trace.TracerProvider) *Vault {
func New(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider) *Vault {
return vault.New[odriver.ValidationCode](
flogging.MustGetLogger("orion-sdk.generic.vault"),
store,
txIDStore,
&odriver.ValidationCodeProvider{},
newInterceptor,
&populator{},
metricsProvider,
tracerProvider,
)
}
Expand Down
7 changes: 5 additions & 2 deletions platform/orion/core/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/events"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/kvs"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -37,12 +38,13 @@ type ONSProvider struct {
networksMutex sync.Mutex
networks map[string]driver.OrionNetworkService
drivers []driver3.NamedDriver
metricsProvider metrics.Provider
tracerProvider trace.TracerProvider
networkConfigProvider driver.NetworkConfigProvider
listenerManagerProvider driver.ListenerManagerProvider
}

func NewOrionNetworkServiceProvider(configService driver2.ConfigService, config *Config, kvss *kvs.KVS, publisher events.Publisher, subscriber events.Subscriber, tracerProvider trace.TracerProvider, drivers []driver3.NamedDriver, networkConfigProvider driver.NetworkConfigProvider, listenerManagerProvider driver.ListenerManagerProvider) (*ONSProvider, error) {
func NewOrionNetworkServiceProvider(configService driver2.ConfigService, config *Config, kvss *kvs.KVS, publisher events.Publisher, subscriber events.Subscriber, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider, drivers []driver3.NamedDriver, networkConfigProvider driver.NetworkConfigProvider, listenerManagerProvider driver.ListenerManagerProvider) (*ONSProvider, error) {
provider := &ONSProvider{
configService: configService,
config: config,
Expand All @@ -51,6 +53,7 @@ func NewOrionNetworkServiceProvider(configService driver2.ConfigService, config
subscriber: subscriber,
networks: map[string]driver.OrionNetworkService{},
drivers: drivers,
metricsProvider: metricsProvider,
tracerProvider: tracerProvider,
networkConfigProvider: networkConfigProvider,
listenerManagerProvider: listenerManagerProvider,
Expand Down Expand Up @@ -126,5 +129,5 @@ func (p *ONSProvider) newONS(network string) (driver.OrionNetworkService, error)
return nil, err
}

return generic.NewNetwork(p.ctx, p.kvss, p.publisher, p.subscriber, p.tracerProvider, c, network, p.drivers, networkConfig, p.listenerManagerProvider.NewManager())
return generic.NewNetwork(p.ctx, p.kvss, p.publisher, p.subscriber, p.metricsProvider, p.tracerProvider, c, network, p.drivers, networkConfig, p.listenerManagerProvider.NewManager())
}
4 changes: 3 additions & 1 deletion platform/orion/sdk/dig/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/events"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/kvs"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"go.opentelemetry.io/otel/trace"
"go.uber.org/dig"
)
Expand Down Expand Up @@ -158,10 +159,11 @@ func newOrionNetworkServiceProvider(in struct {
Subscriber events.Subscriber
ConfigService driver.ConfigService
Config *core.Config
MetricsProvider metrics.Provider
TracerProvider trace.TracerProvider
Drivers []driver3.NamedDriver `group:"db-drivers"`
NetworkConfigProvider driver2.NetworkConfigProvider
ListenerManagerProvider driver2.ListenerManagerProvider
}) (*core.ONSProvider, error) {
return core.NewOrionNetworkServiceProvider(in.ConfigService, in.Config, in.KVSS, in.Publisher, in.Subscriber, in.TracerProvider, in.Drivers, in.NetworkConfigProvider, in.ListenerManagerProvider)
return core.NewOrionNetworkServiceProvider(in.ConfigService, in.Config, in.KVSS, in.Publisher, in.Subscriber, in.MetricsProvider, in.TracerProvider, in.Drivers, in.NetworkConfigProvider, in.ListenerManagerProvider)
}

0 comments on commit 7f738f5

Please sign in to comment.