From e6ac1032ff0549652e666a22394c10e511af2166 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Wed, 11 Sep 2024 09:47:50 -0700 Subject: [PATCH 1/4] Introduce Satellite sampler package --- .../satellitesamplerprocessor/config.go | 31 ++ .../satellitesamplerprocessor/config_test.go | 25 ++ .../satellitesamplerprocessor/factory.go | 46 +++ .../satellitesamplerprocessor/factory_test.go | 15 + .../satellitesamplerprocessor/go.mod | 37 ++ .../satellitesamplerprocessor/go.sum | 137 +++++++ .../internal/otelsampling/README.md | 61 ++++ .../otelsampling/cloudobstracestate.go | 129 +++++++ .../otelsampling/cloudobstracestate_test.go | 126 +++++++ .../internal/otelsampling/common.go | 125 +++++++ .../internal/otelsampling/encoding_test.go | 302 +++++++++++++++ .../internal/otelsampling/oteltracestate.go | 246 +++++++++++++ .../otelsampling/oteltracestate_test.go | 344 ++++++++++++++++++ .../internal/otelsampling/probability.go | 82 +++++ .../internal/otelsampling/probability_test.go | 292 +++++++++++++++ .../internal/otelsampling/randomness.go | 116 ++++++ .../internal/otelsampling/randomness_test.go | 51 +++ .../internal/otelsampling/threshold.go | 156 ++++++++ .../internal/otelsampling/threshold_test.go | 87 +++++ .../internal/otelsampling/w3ctracestate.go | 193 ++++++++++ .../otelsampling/w3ctracestate_test.go | 161 ++++++++ .../internal/sampler/tracesampler.go | 59 +++ .../satellitesamplerprocessor/traces.go | 124 +++++++ 23 files changed, 2945 insertions(+) create mode 100644 lightstep/processor/satellitesamplerprocessor/config.go create mode 100644 lightstep/processor/satellitesamplerprocessor/config_test.go create mode 100644 lightstep/processor/satellitesamplerprocessor/factory.go create mode 100644 lightstep/processor/satellitesamplerprocessor/factory_test.go create mode 100644 lightstep/processor/satellitesamplerprocessor/go.mod create mode 100644 lightstep/processor/satellitesamplerprocessor/go.sum create mode 100644 
lightstep/processor/satellitesamplerprocessor/internal/otelsampling/README.md create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/otelsampling/cloudobstracestate.go create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/otelsampling/cloudobstracestate_test.go create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/otelsampling/common.go create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/otelsampling/encoding_test.go create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/otelsampling/oteltracestate.go create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/otelsampling/oteltracestate_test.go create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/otelsampling/probability.go create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/otelsampling/probability_test.go create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/otelsampling/randomness.go create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/otelsampling/randomness_test.go create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/otelsampling/threshold.go create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/otelsampling/threshold_test.go create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/otelsampling/w3ctracestate.go create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/otelsampling/w3ctracestate_test.go create mode 100644 lightstep/processor/satellitesamplerprocessor/internal/sampler/tracesampler.go create mode 100644 lightstep/processor/satellitesamplerprocessor/traces.go diff --git a/lightstep/processor/satellitesamplerprocessor/config.go b/lightstep/processor/satellitesamplerprocessor/config.go new file mode 100644 index 0000000..1961bab --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/config.go @@ -0,0 +1,31 @@ +// 
var (
	// errInvalidPercent is the sentinel wrapped by (*Config).Validate
	// when Percent falls outside (0, 100]. Match it with errors.Is.
	errInvalidPercent = errors.New("sampling percent must be in (0, 100]")
)

// Config defines configuration for Lightstep's "classic" Satellite sampler.
type Config struct {
	// Percent in the range (0, 100]. Defaults to 100.
	//
	// Note that satellites began supporting percent-based
	// sampling configuration at release 2022-04-28_17-39-22Z.
	// When OneInN is set instead, use the formula `100.0 /
	// float64(OneInN)`.
	Percent float64 `mapstructure:"percent"`
}

// Validate reports whether the configuration is usable.
//
// It returns nil when Percent lies in (0, 100]. Otherwise it returns
// an error that wraps errInvalidPercent and includes the offending
// value. NaN and infinities are rejected.
func (c *Config) Validate() error {
	// The in-range condition is asserted and then negated on purpose:
	// every ordered comparison with NaN is false, so the naive form
	// `Percent <= 0 || Percent > 100` would let NaN through.
	if !(c.Percent > 0 && c.Percent <= 100) {
		return fmt.Errorf("%w: %v", errInvalidPercent, c.Percent)
	}
	return nil
}
SPDX-License-Identifier: Apache-2.0 + +package satellitesamplerprocessor + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processorhelper" +) + +const ( + typeStr = "satellitesampler" + stability = component.StabilityLevelStable +) + +func NewFactory() processor.Factory { + return processor.NewFactory( + component.MustNewType(typeStr), + createDefaultConfig, + processor.WithTraces(createTracesProcessorHelper, stability), + ) +} + +func createTracesProcessorHelper(ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) { + tp, err := createTracesProcessor(ctx, set, cfg, nextConsumer) + if err != nil { + return nil, err + } + return processorhelper.NewTracesProcessor( + ctx, + set, + cfg, + nextConsumer, + tp.processTraces, + processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true})) +} + +func createDefaultConfig() component.Config { + return &Config{ + Percent: 100, + } +} diff --git a/lightstep/processor/satellitesamplerprocessor/factory_test.go b/lightstep/processor/satellitesamplerprocessor/factory_test.go new file mode 100644 index 0000000..030f0ca --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/factory_test.go @@ -0,0 +1,15 @@ +// Copyright ServiceNow, Inc +// SPDX-License-Identifier: Apache-2.0 + +package satellitesamplerprocessor + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" +) + +func TestDefaultConfig(t *testing.T) { + require.Equal(t, component.Config(testConfig(100)), createDefaultConfig()) +} diff --git a/lightstep/processor/satellitesamplerprocessor/go.mod b/lightstep/processor/satellitesamplerprocessor/go.mod new file mode 100644 index 0000000..a29de88 --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/go.mod @@ -0,0 +1,37 @@ 
+module github.com/lightstep/otel-collector-charts/lightstep/processor/satellitesamplerprocessor + +go 1.22.6 + +require ( + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/collector/component v0.109.0 + go.opentelemetry.io/collector/consumer v0.109.0 + go.opentelemetry.io/collector/consumer/consumertest v0.109.0 + go.opentelemetry.io/collector/pdata v1.15.0 + go.opentelemetry.io/collector/processor v0.109.0 + go.uber.org/multierr v1.11.0 + go.uber.org/zap v1.27.0 + google.golang.org/protobuf v1.34.2 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/collector v0.109.0 // indirect + go.opentelemetry.io/collector/config/configtelemetry v0.109.0 // indirect + go.opentelemetry.io/collector/consumer/consumerprofiles v0.109.0 // indirect + go.opentelemetry.io/collector/pdata/pprofile v0.109.0 // indirect + go.opentelemetry.io/otel v1.29.0 // indirect + go.opentelemetry.io/otel/metric v1.29.0 // indirect + go.opentelemetry.io/otel/trace v1.29.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect + google.golang.org/grpc v1.66.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/lightstep/processor/satellitesamplerprocessor/go.sum b/lightstep/processor/satellitesamplerprocessor/go.sum new file mode 100644 index 0000000..b237bbc --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/go.sum @@ -0,0 +1,137 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod 
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.3.1 
h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.20.2 h1:5ctymQzZlyOON1666svgwn3s6IKWgfbjsejTMiXIyjg= +github.com/prometheus/client_golang v1.20.2/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.57.0 h1:Ro/rKjwdq9mZn1K5QPctzh+MA4Lp0BuYk5ZZEVhoNcY= +github.com/prometheus/common v0.57.0/go.mod h1:7uRPFSUTbfZWsJ7MHY56sqt7hLQu3bxXHDnNhl8E9qI= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= 
+github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/collector v0.109.0 h1:ULnMWuwcy4ix1oP5RFFRcmpEbaU5YabW6nWcLMQQRo0= +go.opentelemetry.io/collector v0.109.0/go.mod h1:gheyquSOc5E9Y+xsPmpA+PBrpPc+msVsIalY76/ZvnQ= +go.opentelemetry.io/collector/component v0.109.0 h1:AU6eubP1htO8Fvm86uWn66Kw0DMSFhgcRM2cZZTYfII= +go.opentelemetry.io/collector/component v0.109.0/go.mod h1:jRVFY86GY6JZ61SXvUN69n7CZoTjDTqWyNC+wJJvzOw= +go.opentelemetry.io/collector/component/componentstatus v0.109.0 h1:LiyJOvkv1lVUqBECvolifM2lsXFEgVXHcIw0MWRf/1I= +go.opentelemetry.io/collector/component/componentstatus v0.109.0/go.mod h1:TBx2Leggcw1c1tM+Gt/rDYbqN9Unr3fMxHh2TbxLizI= +go.opentelemetry.io/collector/config/configtelemetry v0.109.0 h1:ItbYw3tgFMU+TqGcDVEOqJLKbbOpfQg3AHD8b22ygl8= +go.opentelemetry.io/collector/config/configtelemetry v0.109.0/go.mod h1:R0MBUxjSMVMIhljuDHWIygzzJWQyZHXXWIgQNxcFwhc= +go.opentelemetry.io/collector/consumer v0.109.0 h1:fdXlJi5Rat/poHPiznM2mLiXjcv1gPy3fyqqeirri58= +go.opentelemetry.io/collector/consumer v0.109.0/go.mod h1:E7PZHnVe1DY9hYy37toNxr9/hnsO7+LmnsixW8akLQI= +go.opentelemetry.io/collector/consumer/consumerprofiles v0.109.0 h1:+WZ6MEWQRC6so3IRrW916XK58rI9NnrFHKW/P19jQvc= +go.opentelemetry.io/collector/consumer/consumerprofiles v0.109.0/go.mod 
h1:spZ9Dn1MRMPDHHThdXZA5TrFhdOL1wsl0Dw45EBVoVo= +go.opentelemetry.io/collector/consumer/consumertest v0.109.0 h1:v4w9G2MXGJ/eabCmX1DvQYmxzdysC8UqIxa/BWz7ACo= +go.opentelemetry.io/collector/consumer/consumertest v0.109.0/go.mod h1:lECt0qOrx118wLJbGijtqNz855XfvJv0xx9GSoJ8qSE= +go.opentelemetry.io/collector/pdata v1.15.0 h1:q/T1sFpRKJnjDrUsHdJ6mq4uSqViR/f92yvGwDby/gY= +go.opentelemetry.io/collector/pdata v1.15.0/go.mod h1:2wcsTIiLAJSbqBq/XUUYbi+cP+N87d0jEJzmb9nT19U= +go.opentelemetry.io/collector/pdata/pprofile v0.109.0 h1:5lobQKeHk8p4WC7KYbzL6ZqqX3eSizsdmp5vM8pQFBs= +go.opentelemetry.io/collector/pdata/pprofile v0.109.0/go.mod h1:lXIifCdtR5ewO17JAYTUsclMqRp6h6dCowoXHhGyw8Y= +go.opentelemetry.io/collector/pdata/testdata v0.109.0 h1:gvIqy6juvqFET/6zi+zUOH1KZY/vtEDZW55u7gJ/hEo= +go.opentelemetry.io/collector/pdata/testdata v0.109.0/go.mod h1:zRttU/F5QMQ6ZXBMXCoSVG3EORTZLTK+UUS0VoMoT44= +go.opentelemetry.io/collector/processor v0.109.0 h1:Pgo9hib4ae1FSA47RB7TUUS26nConIlXcltzbxrjFg8= +go.opentelemetry.io/collector/processor v0.109.0/go.mod h1:Td43GwGMRCXin5JM/zAzMtLieobHTVVrD4Y7jSvsMtg= +go.opentelemetry.io/collector/processor/processorprofiles v0.109.0 h1:+w0vqF30eOskfpcIuZLAJb1dCWcayBlGWoQCOUWKzf4= +go.opentelemetry.io/collector/processor/processorprofiles v0.109.0/go.mod h1:k7pJ76mOeU1Fx1hoVEJExMK9mhMre8xdSS3+cOKvdM4= +go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= +go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= +go.opentelemetry.io/otel/exporters/prometheus v0.51.0 h1:G7uexXb/K3T+T9fNLCCKncweEtNEBMTO+46hKX5EdKw= +go.opentelemetry.io/otel/exporters/prometheus v0.51.0/go.mod h1:v0mFe5Kk7woIh938mrZBJBmENYquyA0IICrlYm4Y0t4= +go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= +go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= +go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo= 
+go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok= +go.opentelemetry.io/otel/sdk/metric v1.29.0 h1:K2CfmJohnRgvZ9UAj2/FhIf/okdWcNdBwe1m8xFXiSY= +go.opentelemetry.io/otel/sdk/metric v1.29.0/go.mod h1:6zZLdCl2fkauYoZIOn/soQIDSWFmNSRcICarHfuhNJQ= +go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= +go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod 
h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors 
v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd h1:6TEm2ZxXoQmFWFlt1vNxvVOa1Q0dXFQD1m/rYjXmS0E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= +google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/README.md b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/README.md new file mode 100644 index 0000000..d492716 --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/README.md @@ -0,0 +1,61 @@ +The code in this directory is is a copy of the OpenTelemetry package +in collector-contrib/pkg/sampling. The copy here has has ServiceNow +copyright because it was originally authored here. 
+ +Code organization: + +# Tracestate handling + +- w3ctracestate.go: the outer tracestate structure like `key=value,...` +- oteltracestate.go: the inner tracestate structure like `key:value;...` +- cloudobstracestate.go: the inner tracestate structure like `key:value;...` (internal only) +- common.go: shared parser, serializer for either tracestate + +This includes an implementation of the W3C trace randomness feature, +described here: https://www.w3.org/TR/trace-context-2/#randomness-of-trace-id + +# Overview of tracestate identifiers + +There are two vendor codes: + +- "ot" refers to formal OTel specifications +- "sn" for ServiceNow refers to internal Cloud Observability sampling (as by the Lightstep satellite) + +The OTel trace state keys: + +- "p" refers to the [legacy OTel power-of-two sampling](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/tracestate-probability-sampling.md) +- "r" used in the legacy convention, 1-2 decimal digits +- "th" refers to the [modern OTel 56-bit sampling](https://github.com/open-telemetry/oteps/pull/235) +- "rv" refers to the modern randomness value, 14 hex digits. + +The Cloud Observability trace state keys: + +- "s" refers to the satellite sampling, uses the same encoding as "th" but is modeled as an acceptance threshold. 
+ +Note that to convert from an OTel rejection threshold to a Satellite sampler acceptance threshold, the unsigned value of the threshold should be subtracted from the maximum adjusted count, + +``` +satelliteSamplerThreshold, _ = UnsignedToThreshold(MaxAdjustedCount - otelModernThreshold.Unsigned()) +``` + +# Encoding and decoding + +- probability.go: defines + `ProbabilityToThreshold()` + `(Threshold).Probability()` +- threshold.go: defines + `TValueToThreshold()` + `(Threshold).TValue()` + `(Threshold).ShouldSample()` +- randomness.go: defines + `TraceIDToRandomness()` + `RValueToRandomness()` + `(Randomness).RValue()` + +# Callers of note + +- In common-go/wire/oteltoegresspb/otel_to_egresspb.go: + `TraceStateToAdjustedCount()` + +- In internalcollector/components/satellitesamplerprocessor/traces.go: + `createTracesProcessor()` diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/cloudobstracestate.go b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/cloudobstracestate.go new file mode 100644 index 0000000..31283f9 --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/cloudobstracestate.go @@ -0,0 +1,129 @@ +// Copyright ServiceNow, Inc +// SPDX-License-Identifier: Apache-2.0 + +package otelsampling + +import ( + "io" + "strconv" +) + +// This file borrows everything it can from oteltracestate.go. The +// s-value here uses a different representation from the OTel T-value, +// because it was adopted before the encoding finalized in OTEP 235. +// +// The Satellite's S-value is an acceptance threshold, vs the OTel +// T-value which is a rejection thresold. There is a 1:1 mapping. 
+ +type CloudObsTraceState struct { + commonTraceState + + // s-value and threshold are represented the same as t-value + threshold Threshold // t value parsed, as a threshold + svalue string // 1-14 ASCII hex digits +} + +const ( + // RName is the CloudObs tracestate field for S-value, which + // uses the same encoding as t-value. + SName = "s" +) + +func NewCloudObsTraceState(input string) (cots CloudObsTraceState, _ error) { + if len(input) > hardMaxOTelLength { + return cots, ErrTraceStateSize + } + + if !otelTracestateRe.MatchString(input) { + return CloudObsTraceState{}, strconv.ErrSyntax + } + + err := otelSyntax.scanKeyValues(input, func(key, value string) error { + switch key { + case SName: + // S-value is represented as (MaxAdjustedCount - TValue) + // because it is an acceptance threshold vs a rejection + // threshold. + tv, err := TValueToThreshold(value) + if err != nil { + cots.threshold = AlwaysSampleThreshold + return err + } + // We do not expect tv.unsigned == 0, which is zero + // adjusted count, from a satellite sampler. 
+ if tv.unsigned == 0 { + cots.threshold = NeverSampleThreshold + } else { + cots.svalue = value + cots.threshold, _ = UnsignedToThreshold(MaxAdjustedCount - tv.Unsigned()) + } + default: + cots.kvs = append(cots.kvs, KV{ + Key: key, + Value: value, + }) + } + return nil + }) + + return cots, err +} + +func (cots *CloudObsTraceState) SValue() string { + return cots.svalue +} + +func (cots *CloudObsTraceState) SValueThreshold() (Threshold, bool) { + return cots.threshold, cots.svalue != "" +} + +func (cots *CloudObsTraceState) UpdateSValueWithSampling(sampledThreshold Threshold) error { + if len(cots.SValue()) != 0 && ThresholdGreater(cots.threshold, sampledThreshold) { + return ErrInconsistentSampling + } + cots.threshold = sampledThreshold + + inv, _ := UnsignedToThreshold(MaxAdjustedCount - sampledThreshold.Unsigned()) + cots.svalue = inv.TValue() + return nil +} + +func (cots *CloudObsTraceState) AdjustedCount() float64 { + if len(cots.svalue) == 0 { + return 0 + } + return cots.threshold.AdjustedCount() +} + +func (cots *CloudObsTraceState) ClearSValue() { + cots.svalue = "" + cots.threshold = AlwaysSampleThreshold +} + +func (cots *CloudObsTraceState) HasAnyValue() bool { + return len(cots.SValue()) != 0 || len(cots.ExtraValues()) != 0 +} + +func (cots *CloudObsTraceState) Serialize(w io.StringWriter) error { + ser := serializer{writer: w} + cnt := 0 + sep := func() { + if cnt != 0 { + ser.write(";") + } + cnt++ + } + if len(cots.SValue()) != 0 { + sep() + ser.write(SName) + ser.write(":") + ser.write(cots.SValue()) + } + for _, kv := range cots.ExtraValues() { + sep() + ser.write(kv.Key) + ser.write(":") + ser.write(kv.Value) + } + return ser.err +} diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/cloudobstracestate_test.go b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/cloudobstracestate_test.go new file mode 100644 index 0000000..794648e --- /dev/null +++ 
b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/cloudobstracestate_test.go @@ -0,0 +1,126 @@ +// Copyright ServiceNow, Inc +// SPDX-License-Identifier: Apache-2.0 + +package otelsampling + +import ( + "errors" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseCloudObsTraceState(t *testing.T) { + type testCase struct { + in string + sval string + expectErr error + } + const ns = "" + for _, test := range []testCase{ + // s-value correct cases + {"s:2", "2", nil}, + {"s:123", "123", nil}, + + // syntax errors + {"", ns, strconv.ErrSyntax}, + {"t=1,", ns, strconv.ErrSyntax}, + {"s:-1", ns, strconv.ErrSyntax}, + } { + t.Run(testName(test.in), func(t *testing.T) { + otts, err := NewCloudObsTraceState(test.in) + + if test.expectErr != nil { + require.True(t, errors.Is(err, test.expectErr), "%q: not expecting %v wanted %v", test.in, err, test.expectErr) + return + } + + require.NoError(t, err) + if test.sval != ns { + require.NotEqual(t, "", otts.SValue()) + require.Equal(t, test.sval, otts.SValue()) + } else { + require.Equal(t, "", otts.SValue(), "should have no s-value: %s", otts.SValue()) + } + // on success Serialize() should not modify + // test by re-parsing + var w strings.Builder + otts.Serialize(&w) + cpy, err := NewCloudObsTraceState(w.String()) + require.NoError(t, err) + require.Equal(t, otts, cpy) + }) + } +} + +func TestUpdateSValueWithSampling(t *testing.T) { + type testCase struct { + // The input otel tracestate; no error conditions tested + in string + + // The incoming adjusted count; defined whether + // s-value is present or not. 
+ adjCountIn float64 + + // the update probability; threshold and tvalue are + // derived from this + prob float64 + + // when update error is expected + updateErr error + + // output s-value + out string + + // output adjusted count + adjCountOut float64 + } + for _, test := range []testCase{ + // 8/16 in, 2/16 out + {"s:8", 2, 0x2p-4, nil, "s:2", 8}, + + // 1/16 in, 50% update (error) + {"s:1", 16, 0x8p-4, ErrInconsistentSampling, "s:1", 16}, + + // no sampling in, 1/16 update + {"", 0, 0x1p-4, nil, "s:1", 16}, + + // none in, 100% update + {"", 0, 1, nil, "", 0}, + + // 1/2 in, 100% update (error) + {"s:8", 2, 1, ErrInconsistentSampling, "s:8", 2}, + + // a/16 in, 5/16 out + {"s:a", 16.0 / 10, 0x5 / 16.0, nil, "s:5", 16.0 / 5}, + } { + t.Run(test.in+"/"+test.out, func(t *testing.T) { + cots := CloudObsTraceState{} + if test.in != "" { + var err error + cots, err = NewCloudObsTraceState(test.in) + require.NoError(t, err) + } + + require.Equal(t, test.adjCountIn, cots.AdjustedCount()) + + newTh, err := ProbabilityToThreshold(test.prob) + require.NoError(t, err) + + upErr := cots.UpdateSValueWithSampling(newTh) + + if test.updateErr != nil { + require.Equal(t, test.updateErr, upErr) + } + + var outData strings.Builder + err = cots.Serialize(&outData) + require.NoError(t, err) + require.Equal(t, test.out, outData.String()) + + require.Equal(t, test.adjCountOut, cots.AdjustedCount()) + }) + } +} diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/common.go b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/common.go new file mode 100644 index 0000000..0593adb --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/common.go @@ -0,0 +1,125 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelsampling + +import ( + "errors" + "io" + "strings" + + "go.uber.org/multierr" +) + +// KV represents a key-value parsed from a section of the TraceState. 
+type KV struct { + Key string + Value string +} + +var ( + // ErrTraceStateSize is returned when a TraceState is over its + // size limit, as specified by W3C. + ErrTraceStateSize = errors.New("invalid tracestate size") +) + +// keyValueScanner defines distinct scanner behaviors for lists of +// key-values. +type keyValueScanner struct { + // maxItems is 32 or -1 + maxItems int + // trim is set if OWS (optional whitespace) should be removed + trim bool + // separator is , or ; + separator byte + // equality is = or : + equality byte +} + +// commonTraceState is embedded in both W3C and OTel trace states. +type commonTraceState struct { + kvs []KV +} + +// ExtraValues returns additional values are carried in this +// tracestate object (W3C or OpenTelemetry). +func (cts commonTraceState) ExtraValues() []KV { + return cts.kvs +} + +// trimOws removes optional whitespace on both ends of a string. +// this uses the strict definition for optional whitespace tiven +// in https://www.w3.org/TR/trace-context/#tracestate-header-field-values +func trimOws(input string) string { + return strings.Trim(input, " \t") +} + +// scanKeyValues is common code to scan either W3C or OTel tracestate +// entries, as parameterized in the keyValueScanner struct. +func (s keyValueScanner) scanKeyValues(input string, f func(key, value string) error) error { + var rval error + items := 0 + for input != "" { + items++ + if s.maxItems > 0 && items >= s.maxItems { + // W3C specifies max 32 entries, tested here + // instead of via the regexp. + return ErrTraceStateSize + } + + sep := strings.IndexByte(input, s.separator) + + var member string + if sep < 0 { + member = input + input = "" + } else { + member = input[:sep] + input = input[sep+1:] + } + + if s.trim { + // Trim only required for W3C; OTel does not + // specify whitespace for its value encoding. + member = trimOws(member) + } + + if member == "" { + // W3C allows empty list members. 
+ continue + } + + eq := strings.IndexByte(member, s.equality) + if eq < 0 { + // We expect to find the `s.equality` + // character in this string because we have + // already validated the whole input syntax + // before calling this parser. I.e., this can + // never happen, and if it did, the result + // would be to skip malformed entries. + continue + } + if err := f(member[:eq], member[eq+1:]); err != nil { + rval = multierr.Append(rval, err) + } + } + return rval +} + +// serializer assists with checking and combining errors from +// (io.StringWriter).WriteString(). +type serializer struct { + writer io.StringWriter + err error +} + +// write handles errors from io.StringWriter. +func (ser *serializer) write(str string) { + _, err := ser.writer.WriteString(str) + ser.check(err) +} + +// check handles errors (e.g., from another serializer). +func (ser *serializer) check(err error) { + ser.err = multierr.Append(ser.err, err) +} diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/encoding_test.go b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/encoding_test.go new file mode 100644 index 0000000..6c4b35c --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/encoding_test.go @@ -0,0 +1,302 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelsampling + +import ( + "encoding/binary" + "errors" + "fmt" + "math/rand" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// must panics when the error is non-nil and returns the +// generic argument. this does not call require.NoError() in order to +// use a one-line test calling convention, meaning `must(functionCall())` +// ensures there is no error and returns the first argument. 
// mustNot panics when the error is nil, otherwise it returns the
// error and discards the value.  This does not call require.Error()
// in order to use a one-line test calling convention, meaning
// `mustNot(functionCall())` ensures there is an error without
// requiring a separate variable assignment.
//
// Panicking (rather than returning a placeholder error) matters:
// callers wrap the result in require.Error(), which would otherwise
// succeed even when functionCall() unexpectedly returned no error.
//
// To do the same with Error() means rewriting the expression with
// a two-line statement:
//
//	_, err := functionCall()
//	require.Error(t, err)
func mustNot[T any](_ T, err error) error {
	if err == nil {
		panic(fmt.Errorf("expected an error, got nil"))
	}
	return err
}
ThresholdGreater( + must(TValueToThreshold("4")), + must(TValueToThreshold("234")), + )) +} + +func TestInvalidprobabilityToTValue(t *testing.T) { + // Too small + require.Error(t, mustNot(probabilityToTValue(0x1p-57))) + require.Error(t, mustNot(probabilityToTValue(0x1p-57))) + + // Too big + require.Error(t, mustNot(probabilityToTValue(1.1))) + require.Error(t, mustNot(probabilityToTValue(1.1))) +} + +func TestTValueToProbability(t *testing.T) { + require.Equal(t, 0.5, must(tValueToProbability("8"))) + require.Equal(t, 1-0x444p-12, must(tValueToProbability("444"))) + require.Equal(t, 1.0, must(tValueToProbability("0"))) + + // 0x55555554p-32 is very close to 1/3 + require.InEpsilon(t, 1-1/3., must(tValueToProbability("55555554")), 1e-9) +} + +func TestProbabilityToThreshold(t *testing.T) { + require.Equal(t, + must(TValueToThreshold("8")), + must(ProbabilityToThreshold(0.5))) + require.Equal(t, + must(TValueToThreshold("ffffffffffffff")), + must(ProbabilityToThreshold(0x1p-56))) + require.Equal(t, + must(TValueToThreshold("ffffffffffff00")), + must(ProbabilityToThreshold(0x100p-56))) + require.Equal(t, + must(TValueToThreshold("00000000000010")), + must(ProbabilityToThreshold(1.0-0x1p-52))) + require.Equal(t, + AlwaysSampleThreshold, + must(ProbabilityToThreshold(1.0))) + + zt, err := ProbabilityToThreshold(0) + require.Equal(t, zt, AlwaysSampleThreshold) + require.Error(t, err) + require.Equal(t, err, ErrProbabilityRange) +} + +func TestShouldSample(t *testing.T) { + // Test four boundary conditions for 50% sampling, + thresh := must(ProbabilityToThreshold(0.5)) + // Smallest TraceID that should NOT sample. + require.False(t, thresh.ShouldSample(TraceIDToRandomness(pcommon.TraceID{ + // 9 meaningless bytes + 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, + 0, // randomness starts here + 0, 0, 0, 0, 0, 0, + }))) + // Largest TraceID that should NOT sample. 
+ require.False(t, thresh.ShouldSample(TraceIDToRandomness(pcommon.TraceID{ + // 9 meaningless bytes + 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, + 0x7f, // randomness starts here + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + }))) + // Smallest TraceID that should sample. + require.True(t, thresh.ShouldSample(TraceIDToRandomness(pcommon.TraceID{ + // 9 meaningless bytes + 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, + 0x80, // randomness starts here + 0, 0, 0, 0, 0, 0, + }))) + // Largest TraceID that should sample. + require.True(t, thresh.ShouldSample(TraceIDToRandomness(pcommon.TraceID{ + // 9 meaningless bytes + 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, + 0xff, // randomness starts here + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + }))) +} + +func TestRValueSyntax(t *testing.T) { + type testCase struct { + in string + expectErr error + } + for _, test := range []testCase{ + // correct cases + {"12341234123412", nil}, + + // wrong size + {"123412341234120", ErrRValueSize}, + {"1234123412341", ErrRValueSize}, + {"", ErrRValueSize}, + + // bad syntax + {"abcdefgabcdefg", strconv.ErrSyntax}, + } { + t.Run(testName(test.in), func(t *testing.T) { + rnd, err := RValueToRandomness(test.in) + + if test.expectErr != nil { + require.True(t, errors.Is(err, test.expectErr), + "%q: not expecting %v wanted %v", test.in, err, test.expectErr, + ) + require.Equal(t, must(RValueToRandomness("00000000000000")), rnd) + } else { + require.NoError(t, err, "%q", test.in) + + val, err := strconv.ParseUint(test.in, 16, 64) + require.NoError(t, err) + + require.Equal(t, TraceIDToRandomness( + // This explicitly constructs a TraceID from 9 random + // bytes plus the 7 lowest bytes of the input value. 
+ pcommon.TraceID{ + byte(rand.Intn(256)), // 0 + byte(rand.Intn(256)), // 1 + byte(rand.Intn(256)), // 2 + byte(rand.Intn(256)), // 3 + byte(rand.Intn(256)), // 4 + byte(rand.Intn(256)), // 5 + byte(rand.Intn(256)), // 6 + byte(rand.Intn(256)), // 7 + byte(rand.Intn(256)), // 8 + byte(val >> 48 & 0xff), // 9 + byte(val >> 40 & 0xff), // 10 + byte(val >> 32 & 0xff), // 11 + byte(val >> 24 & 0xff), // 12 + byte(val >> 16 & 0xff), // 13 + byte(val >> 8 & 0xff), // 14 + byte(val >> 0 & 0xff), // 15 + }, + ), rnd) + } + }) + } +} + +func TestTValueSyntax(t *testing.T) { + type testCase struct { + in string + expectErr error + } + for _, test := range []testCase{ + // correct cases + {"1", nil}, + + // syntax error + {"", ErrTValueEmpty}, + {"g", strconv.ErrSyntax}, + } { + t.Run(testName(test.in), func(t *testing.T) { + _, err := TValueToThreshold(test.in) + + if test.expectErr != nil { + require.True(t, errors.Is(err, test.expectErr), + "%q: not expecting %v wanted %v", test.in, err, test.expectErr, + ) + } else { + require.NoError(t, err, "%q", test.in) + } + }) + } +} + +// There were two benchmarks used to choose the implementation for the +// Threshold type in this package. The results indicate that it is +// faster to compare a 56-bit number than to compare as 7 element +// []byte. 
+ +type benchTIDs [1024]pcommon.TraceID + +func (tids *benchTIDs) init() { + for i := range tids { + binary.BigEndian.PutUint64(tids[i][:8], rand.Uint64()) + binary.BigEndian.PutUint64(tids[i][8:], rand.Uint64()) + } +} + +// The current implementation, using unsigned: +// +// BenchmarkThresholdCompareAsUint64-10 1000000000 0.4515 ns/op 0 B/op 0 allocs/op +// +// vs the tested and rejected, using bytes: +// +// BenchmarkThresholdCompareAsBytes-10 528679580 2.288 ns/op 0 B/op 0 allocs/op +func BenchmarkThresholdCompareAsUint64(b *testing.B) { + var tids benchTIDs + var comps [1024]Threshold + tids.init() + for i := range comps { + var err error + comps[i], err = ProbabilityToThreshold(rand.Float64()) + if err != nil { + b.Fatal(err) + } + } + + b.ReportAllocs() + b.ResetTimer() + yes := 0 + no := 0 + for i := 0; i < b.N; i++ { + idx := i % len(tids) + tid := tids[idx] + comp := comps[idx] + + if comp.ShouldSample(TraceIDToRandomness(tid)) { + yes++ + } else { + no++ + } + } +} diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/oteltracestate.go b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/oteltracestate.go new file mode 100644 index 0000000..36bde80 --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/oteltracestate.go @@ -0,0 +1,246 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelsampling + +import ( + "errors" + "io" + "regexp" + "strconv" +) + +// OpenTelemetryTraceState represents the `ot` section of the W3C tracestate +// which is specified generically in https://opentelemetry.io/docs/specs/otel/trace/tracestate-handling/. +// +// OpenTelemetry defines two specific values that convey sampling +// probability, known as T-Value (with "th", for threshold), R-Value +// (with key "rv", for random value), and extra values. 
+type OpenTelemetryTraceState struct { + commonTraceState + + // sampling r and t-values + rnd Randomness // r value parsed, as unsigned + rvalue string // 14 ASCII hex digits + threshold Threshold // t value parsed, as a threshold + tvalue string // 1-14 ASCII hex digits +} + +const ( + // rValueFieldName is the OTel tracestate field for R-value + rValueFieldName = "rv" + // tValueFieldName is the OTel tracestate field for T-value + tValueFieldName = "th" + + // hardMaxOTelLength is the maximum encoded size of an OTel + // tracestate value. + hardMaxOTelLength = 256 + + // chr = ucalpha / lcalpha / DIGIT / "." / "_" / "-" + // ucalpha = %x41-5A ; A-Z + // lcalpha = %x61-7A ; a-z + // key = lcalpha *(lcalpha / DIGIT ) + // value = *(chr) + // list-member = key ":" value + // list = list-member *( ";" list-member ) + otelKeyRegexp = lcAlphaRegexp + lcAlphanumRegexp + `*` + otelValueRegexp = `[a-zA-Z0-9._\-]*` + otelMemberRegexp = `(?:` + otelKeyRegexp + `:` + otelValueRegexp + `)` + otelSemicolonMemberRegexp = `(?:` + `;` + otelMemberRegexp + `)` + otelTracestateRegexp = `^` + otelMemberRegexp + otelSemicolonMemberRegexp + `*$` +) + +var ( + otelTracestateRe = regexp.MustCompile(otelTracestateRegexp) + + otelSyntax = keyValueScanner{ + maxItems: -1, + trim: false, + separator: ';', + equality: ':', + } + + // ErrInconsistentSampling is returned when a sampler update + // is illogical, indicating that the tracestate was not + // modified. Preferably, Samplers will avoid seeing this + // error by using a ThresholdGreater() test, which allows them + // to report a more clear error to the user. For example, if + // data arrives sampled at 1/100 and an equalizing sampler is + // configured for 1/2 sampling, the Sampler may detect the + // illogical condition itself using ThresholdGreater and skip + // the call to UpdateTValueWithSampling, which will have no + // effect and return this error. 
How a sampler decides to + // handle this condition is up to the sampler: for example the + // equalizing sampler can decide to pass through a span + // indicating 1/100 sampling or it can reject the span. + ErrInconsistentSampling = errors.New("cannot raise existing sampling probability") +) + +// NewOpenTelemetryTraceState returns a parsed representation of the +// OpenTelemetry tracestate section. Errors indicate an invalid +// tracestate was received. +func NewOpenTelemetryTraceState(input string) (OpenTelemetryTraceState, error) { + otts := OpenTelemetryTraceState{} + + if len(input) > hardMaxOTelLength { + return otts, ErrTraceStateSize + } + + if !otelTracestateRe.MatchString(input) { + return otts, strconv.ErrSyntax + } + + err := otelSyntax.scanKeyValues(input, func(key, value string) error { + var err error + switch key { + case rValueFieldName: + if otts.rnd, err = RValueToRandomness(value); err == nil { + otts.rvalue = value + } else { + // RValueRandomness() will return false, the error + // accumulates and is returned below. + otts.rvalue = "" + otts.rnd = Randomness{} + } + case tValueFieldName: + if otts.threshold, err = TValueToThreshold(value); err == nil { + otts.tvalue = value + } else { + // TValueThreshold() will return false, the error + // accumulates and is returned below. + otts.tvalue = "" + otts.threshold = AlwaysSampleThreshold + } + default: + otts.kvs = append(otts.kvs, KV{ + Key: key, + Value: value, + }) + } + return err + }) + + return otts, err +} + +// RValue returns the R-value (key: "rv") as a string or empty if +// there is no R-value set. +func (otts *OpenTelemetryTraceState) RValue() string { + return otts.rvalue +} + +// RValueRandomness returns the randomness object corresponding with +// RValue() and a boolean indicating whether the R-value is set. 
// TValueThreshold returns the threshold object corresponding with
// TValue() and a boolean (equal to len(TValue()) != 0) indicating
// whether the T-value is set.
func (otts *OpenTelemetryTraceState) TValueThreshold() (Threshold, bool) {
	return otts.threshold, len(otts.tvalue) != 0
}
+ otts.threshold = sampledThreshold + otts.tvalue = sampledThreshold.TValue() + return nil +} + +// AdjustedCount returns the adjusted count for this item. If the +// TValue string is empty, this returns 0, otherwise returns +// Threshold.AdjustedCount(). +func (otts *OpenTelemetryTraceState) AdjustedCount() float64 { + if len(otts.tvalue) == 0 { + // Note: this case covers the zero state, where + // len(tvalue) == 0 and threshold == AlwaysSampleThreshold. + // We return 0 to indicate that no information is available. + return 0 + } + return otts.threshold.AdjustedCount() +} + +// ClearTValue is used to unset TValue, for use in cases where it is +// inconsistent on arrival. +func (otts *OpenTelemetryTraceState) ClearTValue() { + otts.tvalue = "" + otts.threshold = Threshold{} +} + +// SetRValue establishes explicit randomness for this TraceState. +func (otts *OpenTelemetryTraceState) SetRValue(randomness Randomness) { + otts.rnd = randomness + otts.rvalue = randomness.RValue() +} + +// ClearRValue unsets explicit randomness. +func (otts *OpenTelemetryTraceState) ClearRValue() { + otts.rvalue = "" + otts.rnd = Randomness{} +} + +// HasAnyValue returns true if there are any fields in this +// tracestate, including any extra values. +func (otts *OpenTelemetryTraceState) HasAnyValue() bool { + return len(otts.RValue()) != 0 || len(otts.TValue()) != 0 || len(otts.ExtraValues()) != 0 +} + +// Serialize encodes this TraceState object. 
+func (otts *OpenTelemetryTraceState) Serialize(w io.StringWriter) error { + ser := serializer{writer: w} + cnt := 0 + sep := func() { + if cnt != 0 { + ser.write(";") + } + cnt++ + } + if len(otts.RValue()) != 0 { + sep() + ser.write(rValueFieldName) + ser.write(":") + ser.write(otts.RValue()) + } + if len(otts.TValue()) != 0 { + sep() + ser.write(tValueFieldName) + ser.write(":") + ser.write(otts.TValue()) + } + for _, kv := range otts.ExtraValues() { + sep() + ser.write(kv.Key) + ser.write(":") + ser.write(kv.Value) + } + return ser.err +} diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/oteltracestate_test.go b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/oteltracestate_test.go new file mode 100644 index 0000000..7b34d9e --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/oteltracestate_test.go @@ -0,0 +1,344 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelsampling + +import ( + "errors" + "fmt" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// ExampleOpenTelemetryTraceState_AdjustedCount shows how to access +// the adjusted count for a sampled context when it has non-zero +// probability. +func ExampleOpenTelemetryTraceState_AdjustedCount() { + w3c, err := NewW3CTraceState("ot=th:c") + if err != nil { + panic(err) + } + ot := w3c.OTelValue() + + fmt.Printf("Adjusted count for T-value %q: %f", ot.TValue(), ot.AdjustedCount()) + + // Output: + // Adjusted count for T-value "c": 4.000000 +} + +func testName(in string) string { + if len(in) > 32 { + return in[:32] + "..." 
+ } + return in +} + +func TestEmptyOpenTelemetryTraceState(t *testing.T) { + // Empty value is invalid + _, err := NewOpenTelemetryTraceState("") + require.Error(t, err) +} + +func TestOpenTelemetryTraceStateTValueSerialize(t *testing.T) { + const orig = "rv:10000000000000;th:3;a:b;c:d" + otts, err := NewOpenTelemetryTraceState(orig) + require.NoError(t, err) + require.Equal(t, "3", otts.TValue()) + tv, hasTv := otts.TValueThreshold() + require.True(t, hasTv) + require.Equal(t, 1-0x3p-4, tv.Probability()) + + require.NotEqual(t, "", otts.RValue()) + require.Equal(t, "10000000000000", otts.RValue()) + rv, hasRv := otts.RValueRandomness() + require.True(t, hasRv) + require.Equal(t, "10000000000000", rv.RValue()) + + require.True(t, otts.HasAnyValue()) + var w strings.Builder + require.NoError(t, otts.Serialize(&w)) + require.Equal(t, orig, w.String()) +} + +func TestOpenTelemetryTraceStateZero(t *testing.T) { + const orig = "th:0" + otts, err := NewOpenTelemetryTraceState(orig) + require.NoError(t, err) + require.True(t, otts.HasAnyValue()) + require.Equal(t, "0", otts.TValue()) + tv, hasTv := otts.TValueThreshold() + require.True(t, hasTv) + require.Equal(t, 1.0, tv.Probability()) + + var w strings.Builder + require.NoError(t, otts.Serialize(&w)) + require.Equal(t, orig, w.String()) +} + +func TestOpenTelemetryTraceStateRValuePValue(t *testing.T) { + // Ensures the caller can handle RValueSizeError and search + // for p-value in extra-values. + const orig = "rv:3;p:2" + otts, err := NewOpenTelemetryTraceState(orig) + require.Error(t, err) + require.Equal(t, ErrRValueSize, err) + require.Equal(t, "", otts.RValue()) + + // The error is oblivious to the old r-value, but that's ok. 
+ require.Contains(t, err.Error(), "14 hex digits") + + require.Equal(t, []KV{{"p", "2"}}, otts.ExtraValues()) + + var w strings.Builder + require.NoError(t, otts.Serialize(&w)) + require.Equal(t, "p:2", w.String()) +} + +func TestOpenTelemetryTraceStateTValueUpdate(t *testing.T) { + const orig = "rv:abcdefabcdefab" + otts, err := NewOpenTelemetryTraceState(orig) + require.NoError(t, err) + require.Equal(t, "", otts.TValue()) + require.NotEqual(t, "", otts.RValue()) + + th, _ := TValueToThreshold("3") + require.NoError(t, otts.UpdateTValueWithSampling(th)) + + require.Equal(t, "3", otts.TValue()) + tv, hasTv := otts.TValueThreshold() + require.True(t, hasTv) + require.Equal(t, 1-0x3p-4, tv.Probability()) + + const updated = "rv:abcdefabcdefab;th:3" + var w strings.Builder + require.NoError(t, otts.Serialize(&w)) + require.Equal(t, updated, w.String()) +} + +func TestOpenTelemetryTraceStateRTUpdate(t *testing.T) { + otts, err := NewOpenTelemetryTraceState("a:b") + require.NoError(t, err) + require.Equal(t, "", otts.TValue()) + require.Equal(t, "", otts.RValue()) + require.True(t, otts.HasAnyValue()) + + th, _ := TValueToThreshold("3") + require.NoError(t, otts.UpdateTValueWithSampling(th)) + otts.SetRValue(must(RValueToRandomness("00000000000003"))) + + const updated = "rv:00000000000003;th:3;a:b" + var w strings.Builder + require.NoError(t, otts.Serialize(&w)) + require.Equal(t, updated, w.String()) +} + +func TestOpenTelemetryTraceStateRTClear(t *testing.T) { + otts, err := NewOpenTelemetryTraceState("a:b;rv:12341234123412;th:1234") + require.NoError(t, err) + + otts.ClearTValue() + otts.ClearRValue() + + const updated = "a:b" + var w strings.Builder + require.NoError(t, otts.Serialize(&w)) + require.Equal(t, updated, w.String()) +} + +func TestParseOpenTelemetryTraceState(t *testing.T) { + type testCase struct { + in string + rval string + tval string + extra []string + expectErr error + } + const ns = "" + for _, test := range []testCase{ + // t-value correct 
cases + {"th:2", ns, "2", nil, nil}, + {"th:ab", ns, "ab", nil, nil}, + {"th:abcdefabcdefab", ns, "abcdefabcdefab", nil, nil}, + + // correct with trailing zeros. the parser does not re-format + // to remove trailing zeros. + {"th:1000", ns, "1000", nil, nil}, + + // syntax errors + {"", ns, ns, nil, strconv.ErrSyntax}, + {"th:1;", ns, ns, nil, strconv.ErrSyntax}, + {"th:1=p:2", ns, ns, nil, strconv.ErrSyntax}, + {"th:1;p:2=s:3", ns, ns, nil, strconv.ErrSyntax}, + {":1;p:2=s:3", ns, ns, nil, strconv.ErrSyntax}, + {":;p:2=s:3", ns, ns, nil, strconv.ErrSyntax}, + {":;:", ns, ns, nil, strconv.ErrSyntax}, + {":", ns, ns, nil, strconv.ErrSyntax}, + {"th:;p=1", ns, ns, nil, strconv.ErrSyntax}, + {"th:$", ns, ns, nil, strconv.ErrSyntax}, // not-hexadecimal + {"th:0x1p+3", ns, ns, nil, strconv.ErrSyntax}, // + is invalid + {"th:14.5", ns, ns, nil, strconv.ErrSyntax}, // integer syntax + {"th:-1", ns, ns, nil, strconv.ErrSyntax}, // non-negative + + // too many digits + {"th:ffffffffffffffff", ns, ns, nil, ErrTValueSize}, + {"th:100000000000000", ns, ns, nil, ErrTValueSize}, + + // one field + {"e100:1", ns, ns, []string{"e100:1"}, nil}, + + // two fields + {"e1:1;e2:2", ns, ns, []string{"e1:1", "e2:2"}, nil}, + + // one extra key, two ways + {"th:2;extra:stuff", ns, "2", []string{"extra:stuff"}, nil}, + {"extra:stuff;th:2", ns, "2", []string{"extra:stuff"}, nil}, + + // two extra fields + {"e100:100;th:1;e101:101", ns, "1", []string{"e100:100", "e101:101"}, nil}, + {"th:1;e100:100;e101:101", ns, "1", []string{"e100:100", "e101:101"}, nil}, + {"e100:100;e101:101;th:1", ns, "1", []string{"e100:100", "e101:101"}, nil}, + + // parse error prevents capturing unrecognized keys + {"1:1;u:V", ns, ns, nil, strconv.ErrSyntax}, + {"X:1;u:V", ns, ns, nil, strconv.ErrSyntax}, + {"x:1;u:V", ns, ns, []string{"x:1", "u:V"}, nil}, + + // r-value + {"rv:22222222222222;extra:stuff", "22222222222222", ns, []string{"extra:stuff"}, nil}, + {"extra:stuff;rv:22222222222222", "22222222222222", ns, 
[]string{"extra:stuff"}, nil}, + {"rv:ffffffffffffff", "ffffffffffffff", ns, nil, nil}, + + // r-value range error (15 bytes of hex or more) + {"rv:100000000000000", ns, ns, nil, ErrRValueSize}, + {"rv:fffffffffffffffff", ns, ns, nil, ErrRValueSize}, + + // no trailing ; + {"x:1;", ns, ns, nil, strconv.ErrSyntax}, + + // empty key + {"x:", ns, ns, []string{"x:"}, nil}, + + // charset test + {"x:0X1FFF;y:.-_-.;z:", ns, ns, []string{"x:0X1FFF", "y:.-_-.", "z:"}, nil}, + {"x1y2z3:1-2-3;y1:y_1;xy:-;th:50", ns, "50", []string{"x1y2z3:1-2-3", "y1:y_1", "xy:-"}, nil}, + + // size exceeded + {"x:" + strings.Repeat("_", 255), ns, ns, nil, ErrTraceStateSize}, + {"x:" + strings.Repeat("_", 254), ns, ns, []string{"x:" + strings.Repeat("_", 254)}, nil}, + } { + t.Run(testName(test.in), func(t *testing.T) { + otts, err := NewOpenTelemetryTraceState(test.in) + + if test.expectErr != nil { + require.True(t, errors.Is(err, test.expectErr), "%q: not expecting %v wanted %v", test.in, err, test.expectErr) + } else { + require.NoError(t, err) + } + require.Equal(t, test.rval, otts.RValue()) + require.Equal(t, test.tval, otts.TValue()) + var expect []KV + for _, ex := range test.extra { + k, v, _ := strings.Cut(ex, ":") + expect = append(expect, KV{ + Key: k, + Value: v, + }) + } + require.Equal(t, expect, otts.ExtraValues()) + + if test.expectErr != nil { + return + } + // on success Serialize() should not modify + // test by re-parsing + var w strings.Builder + require.NoError(t, otts.Serialize(&w)) + cpy, err := NewOpenTelemetryTraceState(w.String()) + require.NoError(t, err) + require.Equal(t, otts, cpy) + }) + } +} + +func TestUpdateTValueWithSampling(t *testing.T) { + type testCase struct { + // The input otel tracestate; no error conditions tested + in string + + // The incoming adjusted count; defined whether + // t-value is present or not. 
+ adjCountIn float64 + + // the update probability; threshold and tvalue are + // derived from this + prob float64 + + // when update error is expected + updateErr error + + // output t-value + out string + + // output adjusted count + adjCountOut float64 + } + for _, test := range []testCase{ + // 8/16 in, sampled at 2/16 (smaller prob) => adjCount 8 + {"th:8", 2, 0x2p-4, nil, "th:e", 8}, + + // 8/16 in, sampled at 14/16 (larger prob) => error, adjCount 2 + {"th:8", 2, 0xep-4, ErrInconsistentSampling, "th:8", 2}, + + // 1/16 in, 50% update (larger prob) => (error) + {"th:f", 16, 0x8p-4, ErrInconsistentSampling, "th:f", 16}, + + // 1/1 sampling in, 1/16 update + {"th:0", 1, 0x1p-4, nil, "th:f", 16}, + + // no t-value in, 1/16 update + {"", 0, 0x1p-4, nil, "th:f", 16}, + + // none in, 100% update + {"", 0, 1, nil, "th:0", 1}, + + // 1/2 in, 100% update (error) + {"th:8", 2, 1, ErrInconsistentSampling, "th:8", 2}, + + // 1/1 in, 0x1p-56 update + {"th:0", 1, 0x1p-56, nil, "th:ffffffffffffff", 0x1p56}, + + // 1/1 in, 0x1p-56 update + {"th:0", 1, 0x1p-56, nil, "th:ffffffffffffff", 0x1p56}, + + // 2/3 in, 1/3 update. Note that 0x555 + 0xaab = 0x1000. 
+ {"th:555", 1 / (1 - 0x555p-12), 0x555p-12, nil, "th:aab", 1 / (1 - 0xaabp-12)}, + } { + t.Run(test.in+"/"+test.out, func(t *testing.T) { + otts := OpenTelemetryTraceState{} + if test.in != "" { + var err error + otts, err = NewOpenTelemetryTraceState(test.in) + require.NoError(t, err) + } + + require.Equal(t, test.adjCountIn, otts.AdjustedCount()) + + newTh, err := ProbabilityToThreshold(test.prob) + require.NoError(t, err) + + upErr := otts.UpdateTValueWithSampling(newTh) + + require.Equal(t, test.updateErr, upErr) + + var outData strings.Builder + err = otts.Serialize(&outData) + require.NoError(t, err) + require.Equal(t, test.out, outData.String()) + + require.Equal(t, test.adjCountOut, otts.AdjustedCount()) + }) + } +} diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/probability.go b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/probability.go new file mode 100644 index 0000000..155f7a2 --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/probability.go @@ -0,0 +1,82 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelsampling + +import ( + "errors" + "math" +) + +// ErrProbabilityRange is returned when a value should be in the range [1/MaxAdjustedCount, 1]. +var ErrProbabilityRange = errors.New("sampling probability out of the range [1/MaxAdjustedCount, 1]") + +// MinSamplingProbability is the smallest representable probability +// and is the inverse of MaxAdjustedCount. +const MinSamplingProbability = 1.0 / float64(MaxAdjustedCount) + +// probabilityInRange tests MinSamplingProb <= prob <= 1. +func probabilityInRange(prob float64) bool { + return prob >= MinSamplingProbability && prob <= 1 +} + +// ProbabilityToThreshold converts a probability to a Threshold. It +// returns an error when the probability is out-of-range. 
+func ProbabilityToThreshold(prob float64) (Threshold, error) { + return ProbabilityToThresholdWithPrecision(prob, NumHexDigits) +} + +// ProbabilityToThresholdWithPrecision is like ProbabilityToThreshold +// with support for reduced precision. The `precision` argument determines +// how many significant hex digits will be used to encode the exact +// probability; a precision of 0 selects full precision (NumHexDigits). An out-of-range fraction returns AlwaysSampleThreshold with ErrProbabilityRange. +func ProbabilityToThresholdWithPrecision(fraction float64, precision int) (Threshold, error) { + // A precision of 0 means full precision. + if precision == 0 { + precision = NumHexDigits + } + if !probabilityInRange(fraction) { + return AlwaysSampleThreshold, ErrProbabilityRange + } + // Special case for fraction == 1. + if fraction == 1 { + return AlwaysSampleThreshold, nil + } + + // Calculate the amount of precision needed to encode the + // threshold with reasonable precision. Here, we count the + // number of leading `0` or `f` characters and automatically + // add precision to preserve relative error near the extremes. + // + // Frexp() normalizes both the fraction and one-minus the + // fraction, because more digits of precision are needed if + // either value is near zero. For inputs in (0, 1), Frexp returns an exponent <= 0. + // + // If `exp <= -4`, there will be a leading hex `0` or `f`. + // For every multiple of -4, another leading `0` or `f` + // appears, so this raises precision accordingly. + _, expF := math.Frexp(fraction) + _, expR := math.Frexp(1 - fraction) + precision = min(NumHexDigits, max(precision+expF/-hexBits, precision+expR/-hexBits)) + + // Compute the threshold + scaled := uint64(math.Round(fraction * float64(MaxAdjustedCount))) + threshold := MaxAdjustedCount - scaled + + // Round to the specified precision, if less than the maximum.
+ if shift := hexBits * (NumHexDigits - precision); shift != 0 { + half := uint64(1) << (shift - 1) + threshold += half + threshold >>= shift + threshold <<= shift + } + + return Threshold{ + unsigned: threshold, + }, nil +} + +// Probability is the sampling ratio in the range [MinSamplingProbability, 1], or 0 for NeverSampleThreshold. +func (t Threshold) Probability() float64 { + return float64(MaxAdjustedCount-t.unsigned) / float64(MaxAdjustedCount) +} diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/probability_test.go b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/probability_test.go new file mode 100644 index 0000000..0438e80 --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/probability_test.go @@ -0,0 +1,292 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelsampling + +import ( + "fmt" + "math" + "testing" + + "github.com/stretchr/testify/require" +) + +// ExampleProbabilityToThresholdWithPrecision demonstrates how 1/3, +// 2/3, and 3/3 are encoded with precision 3. When working with +// arbitrary floating point values, it is recommended to use an +// explicit precision parameter so that T-values are both reasonably +// compact and accurate. +func ExampleProbabilityToThresholdWithPrecision() { + const divisor = 3.0 + const precision = 3 + + for dividend := 1.0; dividend <= divisor; dividend++ { + tval, _ := ProbabilityToThresholdWithPrecision(dividend/divisor, precision) + fmt.Println(tval.TValue()) + } + + // Output: + // aab + // 555 + // 0 +} + +// ExampleProbabilityToThreshold_rounding demonstrates that with full +// precision, the resulting t-value appears to round in an unexpected +// way. +func ExampleProbabilityToThreshold_rounding() { + // 1/3 sampling corresponds with a rejection threshold of (1 - 1/3).
+ const exampleProb = 1.0 / 3.0 + + // The rejection threshold 2/3 in decimal is the repeating fraction of 6 (0.666666), while in + // hexadecimal it is the repeating fraction of a (0x0.aaaaaa). + tval, _ := ProbabilityToThreshold(exampleProb) + + // Note the trailing hex "c" below, which does not match + // intuition for a repeating pattern of hex "a" digits. Why + // is the final digit not hex "b"? The reason it is hex "c" + // is that ProbabilityToThreshold computes the number of spans + // selected as a 56-bit integer using a 52-bit significand. + // Because the fraction uses fewer bits than the threshold, + // the last digit rounds down, with 0x55555555555554 spans + // selected out of 0x100000000000000. The subtraction of 0x4 + // from 0x10 leads to a trailing hex "c". + fmt.Println(tval.TValue()) + + // Output: + // aaaaaaaaaaaaac +} + +// ExampleProbabilityToThreshold_limitedprecision demonstrates that the +// gap between Threshold values and probability values is not equal, +// clarifying which conversions are lossy. +func ExampleProbabilityToThreshold_limitedprecision() { + next := func(x float64, n int) float64 { + for ; n < 0; n++ { + x = math.Nextafter(x, 0) + } + return x + } + + // At probability 50% or above, only 52 bits of precision are + // available for floating point representation. + // + // In the range 1/2 to 1: 52 bits of precision are available; 4 trailing zero bits; + // In the range 1/4 to 1/2: 52 bits of precision are available; 3 trailing zero bits; + // In the range 1/8 to 1/4: 52 bits of precision are available; 2 trailing zero bits; + // In the range 1/16 to 1/8: 52 bits of precision are available; 1 trailing zero bit; + // Probabilities less than 1/16: 51 bits of precision are available + // Probabilities less than 1/32: 50 bits of precision are available. + // ... + // Probabilities less than 0x1p-N: 55-N bits of precision are available. + // ... + // Probabilities less than 0x1p-55: 0 bits of precision.
+ const large = 15.0 / 16 + const half = 8.0 / 16 + const quarter = 4.0 / 16 + const eighth = 2.0 / 16 + const small = 1.0 / 16 + for _, prob := range []float64{ + // Values from 1/2 to 15/16: last T-value digit always "8". + next(large, 0), + next(large, -1), + next(large, -2), + next(large, -3), + 0, + // Values from 1/4 to 1/2: last T-value digit always + // "4", "8", or "c". + next(half, 0), + next(half, -1), + next(half, -2), + next(half, -3), + 0, + // Values from 1/8 to 1/4, last T-value digit can be any + // even hex digit. + next(quarter, 0), + next(quarter, -1), + next(quarter, -2), + next(quarter, -3), + 0, + // Values from 1/16 to 1/8: Every adjacent probability + // value maps to an exact Threshold. + next(eighth, 0), + next(eighth, -1), + next(eighth, -2), + next(eighth, -3), + 0, + // Values less than 1/16 demonstrate lossy behavior. + // Here probability values can express more values + // than Thresholds can, so multiple probability values + // map to the same Threshold. Here, 1/16 and the next + // descending floating point value both map to T-value + // "f". + next(small, 0), + next(small, -1), + next(small, -2), + next(small, -3), + } { + if prob == 0 { + fmt.Println("--") + continue + } + tval, _ := ProbabilityToThreshold(prob) + fmt.Println(tval.TValue()) + } + + // Output: + // 1 + // 10000000000008 + // 1000000000001 + // 10000000000018 + // -- + // 8 + // 80000000000004 + // 80000000000008 + // 8000000000000c + // -- + // c + // c0000000000002 + // c0000000000004 + // c0000000000006 + // -- + // e + // e0000000000001 + // e0000000000002 + // e0000000000003 + // -- + // f + // f + // f0000000000001 + // f0000000000001 +} + +// ExampleProbabilityToThreshold_verysmall shows the smallest +// expressible sampling probability values.
+func ExampleProbabilityToThreshold_verysmall() { + for _, prob := range []float64{ + MinSamplingProbability, // Sample 1 out of 2**56 + 0x2p-56, // Sample 2 out of 2**56 + 0x3p-56, // Sample 3 out of 2**56 + 0x4p-56, // Sample 4 out of 2**56 + 0x8p-56, // Sample 8 out of 2**56 + 0x10p-56, // Sample 0x10 out of 2**56 + } { + // Note that precision is automatically raised for + // such small probabilities, because leading 'f' and + // '0' digits are discounted. + tval, _ := ProbabilityToThresholdWithPrecision(prob, 3) + fmt.Println(tval.TValue()) + } + + // Output: + // ffffffffffffff + // fffffffffffffe + // fffffffffffffd + // fffffffffffffc + // fffffffffffff8 + // fffffffffffff +} + +func TestProbabilityToThresholdWithPrecision(t *testing.T) { + type kase struct { + prob float64 + exact string + rounded []string + } + + for _, test := range []kase{ + // Note: remember 8 is half of 16: hex rounds up at 8+, down at 7-. + { + 1 - 0x456789ap-28, + "456789a", + []string{ + "45678a", + "45679", + "4568", + "456", + "45", + "4", + }, + }, + // Add 3 leading zeros + { + 1 - 0x456789ap-40, + "000456789a", + []string{ + "00045678a", + "00045679", + "0004568", + "000456", + "00045", + "0004", + }, + }, + // Rounding up + { + 1 - 0x789abcdefp-40, + "0789abcdef", + []string{ + "0789abcdef", + "0789abcdf", + "0789abce", + "0789abd", + "0789ac", + "0789b", + "078a", + "079", + "08", + }, + }, + // Rounding down + { + 1 - 0x12345678p-32, + "12345678", + []string{ + "1234568", + "123456", + "12345", + "1234", + "123", + "12", + "1", + }, + }, + // Zeros + { + 1 - 0x80801p-28, + "0080801", + []string{ + "00808", + "008", + }, + }, + // 100% sampling + { + 1, + "0", + []string{ + "0", + }, + }, + } { + t.Run(test.exact, func(t *testing.T) { + th, err := ProbabilityToThreshold(test.prob) + require.NoError(t, err) + require.Equal(t, th.TValue(), test.exact) + + for _, round := range test.rounded { + t.Run(round, func(t *testing.T) { + // Requested precision is independent of leading zeros, + // 
so strip them to calculate test precision. + strip := round + for len(strip) > 0 && strip[0] == '0' { + strip = strip[1:] + } + rth, err := ProbabilityToThresholdWithPrecision(test.prob, len(strip)) + require.NoError(t, err) + require.Equal(t, round, rth.TValue()) + }) + } + }) + } +} diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/randomness.go b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/randomness.go new file mode 100644 index 0000000..67be3a5 --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/randomness.go @@ -0,0 +1,116 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelsampling + +import ( + "encoding/binary" + "errors" + "strconv" + + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// numRandomnessValues equals MaxAdjustedCount--this variable has been +// introduced to improve readability. Recall that MaxAdjustedCount is +// 2**56 which is one greater than the maximum RValue +// ("ffffffffffffff"), i.e., "100000000000000". +const numRandomnessValues = MaxAdjustedCount + +// ErrRValueSize is returned by RValueToRandomness in case of +// unexpected size. +var ErrRValueSize = errors.New("r-value must have 14 hex digits") + +// leastHalfTraceIDThresholdMask is the mask to use on the +// least-significant half of the TraceID, i.e., bytes 8-15. +// Because this is a 56 bit mask, the result after masking is +// the unsigned value of bytes 9 through 15. +// +// This helps extract 56 bits of randomness from the second half of +// the TraceID, as specified in https://www.w3.org/TR/trace-context-2/#randomness-of-trace-id +const leastHalfTraceIDThresholdMask = MaxAdjustedCount - 1 + +// AllProbabilitiesRandomness is sampled at all probabilities. +var AllProbabilitiesRandomness = Randomness{unsigned: numRandomnessValues - 1} + +// Randomness may be derived from R-value or TraceID.
+// +// Randomness contains 56 bits of randomness, derived in one of two ways, see: +// https://www.w3.org/TR/trace-context-2/#randomness-of-trace-id +type Randomness struct { + // unsigned is in the range [0, MaxAdjustedCount-1] + unsigned uint64 +} + +// TraceIDToRandomness returns randomness from a TraceID (assumes +// the traceparent random flag was set). +func TraceIDToRandomness(id pcommon.TraceID) Randomness { + // To get the 56 bits we want, take the second half of the trace ID, + leastHalf := binary.BigEndian.Uint64(id[8:]) + return Randomness{ + // Then apply the mask to get the least-significant 56 bits / 7 bytes. + // Equivalently stated: zero the most-significant 8 bits. + unsigned: leastHalf & leastHalfTraceIDThresholdMask, + } +} + +// RValueToRandomness parses NumHexDigits hex digits into a Randomness. It returns ErrRValueSize when the length differs, or the strconv parse error for non-hex input. +func RValueToRandomness(s string) (Randomness, error) { + if len(s) != NumHexDigits { + return Randomness{}, ErrRValueSize + } + + unsigned, err := strconv.ParseUint(s, hexBase, 64) + if err != nil { + return Randomness{}, err + } + + return Randomness{ + unsigned: unsigned, + }, nil +} + +// RValue formats the r-value encoding. +func (rnd Randomness) RValue() string { + // The important part here is to format a full 14-digit hex + // string, including leading zeros. We could accomplish the + // same with custom code or with fmt.Sprintf directives, but + // here we let strconv.FormatUint fill in leading zeros, as + // follows: + // + // Format (numRandomnessValues+Randomness) as a hex string + // Strip the leading hex digit, which is a "1" by design + // + // For example, a randomness that requires two leading zeros + // (all in hexadecimal): + // + // randomness is 7 bytes: aabbccddeeff + // numRandomnessValues is 2^56: 100000000000000 + // randomness+numRandomnessValues: 100aabbccddeeff + // strip the leading "1": 00aabbccddeeff + // + // If the value is out-of-range, the empty string will be + // returned.
+ if rnd.unsigned >= numRandomnessValues { + return "" + } + return strconv.FormatUint(numRandomnessValues+rnd.unsigned, hexBase)[1:] +} + +// Unsigned returns the unsigned representation of the random value. +// Items of data SHOULD be sampled when: +// +// Threshold.Unsigned() <= Randomness.Unsigned(). +func (rnd Randomness) Unsigned() uint64 { + return rnd.unsigned +} + +// UnsignedToRandomness constructs a randomness using 56 random bits +// of unsigned number. If the input is out of range, +// AllProbabilitiesRandomness is returned with ErrRValueSize. +func UnsignedToRandomness(x uint64) (Randomness, error) { + if x >= MaxAdjustedCount { + return AllProbabilitiesRandomness, ErrRValueSize + } + return Randomness{unsigned: x}, nil +} diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/randomness_test.go b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/randomness_test.go new file mode 100644 index 0000000..c41bdac --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/randomness_test.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelsampling + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func TestExplicitRandomness(t *testing.T) { + const val uint64 = 0x80000000000000 + rv, err := UnsignedToRandomness(val) + require.NoError(t, err) + require.Equal(t, val, rv.Unsigned()) + + rv, err = UnsignedToRandomness(MaxAdjustedCount) + require.Equal(t, err, ErrRValueSize) + require.Equal(t, AllProbabilitiesRandomness.Unsigned(), rv.Unsigned()) +} + +func ExampleTraceIDToRandomness() { + // TraceID represented in hex as "abababababababababd29d6a7215ced0" + var exampleTid = pcommon.TraceID{ + // 9 meaningless bytes + 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, + // 7 bytes randomness + 0xd2, 0x9d, 0x6a, 0x72, 0x15, 0xce, 0xd0, + } + rnd := 
TraceIDToRandomness(exampleTid) + + fmt.Printf("TraceIDToRandomness(%q).RValue() = %s", exampleTid, rnd.RValue()) + + // Output: TraceIDToRandomness("abababababababababd29d6a7215ced0").RValue() = d29d6a7215ced0 +} + +func ExampleRValueToRandomness() { + // Any string of 14 hex digits is a valid R-value. + const exampleRvalue = "d29d6a7215ced0" + + // This converts to the internal unsigned integer representation. + rnd, _ := RValueToRandomness(exampleRvalue) + + // The result prints the same as the input. + fmt.Printf("RValueToRandomness(%q).RValue() = %s", exampleRvalue, rnd.RValue()) + + // Output: RValueToRandomness("d29d6a7215ced0").RValue() = d29d6a7215ced0 +} diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/threshold.go b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/threshold.go new file mode 100644 index 0000000..8ce9e5d --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/threshold.go @@ -0,0 +1,156 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelsampling + +import ( + "errors" + "strconv" + "strings" +) + +const ( + // MaxAdjustedCount is 2^56 i.e. 0x100000000000000 i.e., 1<<56. + MaxAdjustedCount uint64 = 1 << 56 + + // NumHexDigits is the number of hex digits equalling 56 bits. + // This is the limit of sampling precision. + NumHexDigits = 56 / hexBits + + hexBits = 4 + hexBase = 16 +) + +// Threshold represents an exact sampling probability using 56 bits of +// precision. A Threshold expresses the number of spans, out of 2**56, +// that are rejected. +// +// These 56 bits are compared against 56 bits of randomness, either +// extracted from an R-value or a TraceID having the W3C-specified +// randomness bit set. +// +// Because Thresholds store 56 bits of information and floating point +// values store 52 bits of significand, some conversions between +// Threshold and probability values are lossy.
The kinds of loss that +// occur depend on where in the probability scale it happens, as the +// step between adjacent floating point values adjusts with the exponent. +type Threshold struct { + // unsigned is in the range [0, MaxAdjustedCount] + // - 0 represents always sampling (0 Randomness values are less-than) + // - 1 represents rejecting 1 of the MaxAdjustedCount Randomness values + // - MaxAdjustedCount represents never sampling (see NeverSampleThreshold) + unsigned uint64 +} + +var ( + // ErrTValueSize is returned for t-values longer than NumHexDigits hex digits. + ErrTValueSize = errors.New("t-value exceeds 14 hex digits") + + // ErrTValueEmpty indicates no t-value was found, i.e., no threshold available. + ErrTValueEmpty = errors.New("t-value is empty") + + // AlwaysSampleThreshold represents 100% sampling. + AlwaysSampleThreshold = Threshold{unsigned: 0} + + // NeverSampleThreshold is a threshold value that will never sample. + // The TValue() corresponding with this threshold is an empty string. + NeverSampleThreshold = Threshold{unsigned: MaxAdjustedCount} +) + +// TValueToThreshold returns a Threshold. Because TValue strings +// have trailing zeros omitted, this function restores them. +func TValueToThreshold(s string) (Threshold, error) { + if len(s) > NumHexDigits { + return AlwaysSampleThreshold, ErrTValueSize + } + if len(s) == 0 { + return AlwaysSampleThreshold, ErrTValueEmpty + } + + // Having checked length above, there are no range errors + // possible. Parse the hex string to an unsigned value. + unsigned, err := strconv.ParseUint(s, hexBase, 64) + if err != nil { + return AlwaysSampleThreshold, err // e.g. parse error + } + + // The unsigned value requires shifting to account for the + // trailing zeros that were omitted by the encoding (see + // TValue for the reverse).
Compute the number to shift by: + extendByHexZeros := NumHexDigits - len(s) + return Threshold{ + unsigned: unsigned << (hexBits * extendByHexZeros), + }, nil +} + +// UnsignedToThreshold constructs a threshold expressed in terms +// defined by number of rejections out of MaxAdjustedCount, which +// equals the number of randomness values. Out-of-range input returns NeverSampleThreshold with ErrTValueSize. +func UnsignedToThreshold(unsigned uint64) (Threshold, error) { + if unsigned >= MaxAdjustedCount { + return NeverSampleThreshold, ErrTValueSize + } + return Threshold{unsigned: unsigned}, nil +} + +// TValue encodes a threshold, which is a variable-length hex string +// up to 14 characters. "0" is returned for 100% sampling and the +// empty string for NeverSampleThreshold. +func (th Threshold) TValue() string { + // Always-sample is a special case because TrimRight() below + // will trim it to the empty string, which represents no t-value. + switch th { + case AlwaysSampleThreshold: + return "0" + case NeverSampleThreshold: + return "" + } + // For thresholds other than the extremes, format a full-width + // (14 digit) unsigned value with leading zeros, then, remove + // the trailing zeros. Use the logic for (Randomness).RValue(). + digits := Randomness(th).RValue() + + // Remove trailing zeros. + return strings.TrimRight(digits, "0") +} + +// ShouldSample returns true when the span passes this sampler's +// consistent sampling decision. The sampling decision can be +// expressed as a T <= R. +func (th Threshold) ShouldSample(rnd Randomness) bool { + return th.unsigned <= rnd.unsigned +} + +// Unsigned expresses the number of Randomness values (out of +// MaxAdjustedCount) that are rejected or not sampled. 0 means 100% +// sampling. +func (th Threshold) Unsigned() uint64 { + return th.unsigned +} + +// AdjustedCount returns the adjusted count for this item, which is +// the representativity of the item due to sampling, equal to the +// inverse of sampling probability.
If the threshold equals +// NeverSampleThreshold, the item should not have been sampled, in +// which case the adjusted count is zero. +// +// This term is defined here: +// https://opentelemetry.io/docs/specs/otel/trace/tracestate-probability-sampling/ +func (th Threshold) AdjustedCount() float64 { + if th == NeverSampleThreshold { + return 0 + } + return 1.0 / th.Probability() +} + +// ThresholdGreater allows direct comparison of Threshold values. +// Greater thresholds equate with smaller sampling probabilities. +func ThresholdGreater(a, b Threshold) bool { + return a.unsigned > b.unsigned +} + +// ThresholdLessThan allows direct comparison of Threshold values. +// Smaller thresholds equate with greater sampling probabilities. +func ThresholdLessThan(a, b Threshold) bool { + return a.unsigned < b.unsigned +} diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/threshold_test.go b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/threshold_test.go new file mode 100644 index 0000000..2a974f1 --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/threshold_test.go @@ -0,0 +1,87 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelsampling + +import ( + "encoding/hex" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func TestExplicitThreshold(t *testing.T) { + const val uint64 = 0x80000000000000 + th, err := UnsignedToThreshold(val) + require.NoError(t, err) + require.Equal(t, val, th.Unsigned()) + + th, err = UnsignedToThreshold(MaxAdjustedCount) + require.Equal(t, err, ErrTValueSize) + require.Equal(t, MaxAdjustedCount, th.Unsigned()) + + // Note: the input is more out-of-range than above, test th == + // NeverSampleThreshold.
+ th, err = UnsignedToThreshold(MaxAdjustedCount + 10) + require.Equal(t, err, ErrTValueSize) + require.Equal(t, NeverSampleThreshold, th) +} + +// ExampleTValueToThreshold demonstrates how to convert a T-value +// string to a Threshold value. +func ExampleTValueToThreshold() { + // "c" corresponds with rejecting 3/4 traces (or 0xc out of + // 0x10), which is 25% sampling. + const exampleTvalue = "c" + + tval, _ := TValueToThreshold(exampleTvalue) + + fmt.Printf("Probability(%q) = %f", exampleTvalue, tval.Probability()) + + // Output: + // Probability("c") = 0.250000 +} + +// ExampleThreshold_ShouldSample demonstrates how to calculate whether an +// item with a given R-value should be sampled at a Threshold +// calculated from a T-value. +func ExampleThreshold_ShouldSample() { + const exampleTvalue = "c" + const exampleRvalue = "d29d6a7215ced0" + + tval, _ := TValueToThreshold(exampleTvalue) + rval, _ := RValueToRandomness(exampleRvalue) + + fmt.Printf("TValueToThreshold(%q).ShouldSample(RValueToRandomness(%q) = %v", + tval.TValue(), rval.RValue(), tval.ShouldSample(rval)) + + // Output: + // TValueToThreshold("c").ShouldSample(RValueToRandomness("d29d6a7215ced0") = true +} + +// ExampleThreshold_ShouldSample_traceid demonstrates how to calculate whether an +// item with a given TraceID should be sampled at a Threshold +// calculated from a T-value. +func ExampleThreshold_ShouldSample_traceid() { + const exampleTvalue = "c" + + // The leading 9 bytes (18 hex digits) of the TraceID string + // are not used, only the trailing 7 bytes (14 hex digits, + // i.e., 56 bits) are used. Here, the don't-care digits are + // set to 0xab and the R-value is "d29d6a7215ced0".
+ const exampleHexTraceID = "abababababababababd29d6a7215ced0" + var tid pcommon.TraceID + idbytes, _ := hex.DecodeString(exampleHexTraceID) + copy(tid[:], idbytes) + + tval, _ := TValueToThreshold(exampleTvalue) + rval := TraceIDToRandomness(tid) + + fmt.Printf("TValueToThreshold(%q).ShouldSample(TraceIDToRandomness(%q) = %v", + tval.TValue(), exampleHexTraceID, tval.ShouldSample(rval)) + + // Output: + // TValueToThreshold("c").ShouldSample(TraceIDToRandomness("abababababababababd29d6a7215ced0") = true +} diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/w3ctracestate.go b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/w3ctracestate.go new file mode 100644 index 0000000..0aad05c --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/w3ctracestate.go @@ -0,0 +1,193 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelsampling + +import ( + "io" + "regexp" + "strconv" + "strings" +) + +// W3CTraceState represents a parsed W3C `tracestate` header. +// +// This type receives and passes through `tracestate` fields defined +// by all vendors, while it parses and validates the +// [OpenTelemetryTraceState] field. After parsing the W3CTraceState, +// access the OpenTelemetry-defined fields using +// [W3CTraceState.OTelValue]. +type W3CTraceState struct { + // commonTraceState holds "extra" values (e.g., + // vendor-specific tracestate fields) which are propagated but + // not used by Sampling logic. + commonTraceState + + // otts stores OpenTelemetry-specified tracestate fields. + otts OpenTelemetryTraceState + + cots CloudObsTraceState +} + +const ( + hardMaxNumPairs = 32 + hardMaxW3CLength = 1024 + hardMaxKeyLength = 256 + hardMaxTenantLength = 241 + hardMaxSystemLength = 14 + + otelVendorCode = "ot" + cloudObsVendorCode = "sn" + + // keyRegexp is not an exact test: it permits all the allowed + // characters, and the length limits are checked separately in code.
+ + // key = simple-key / multi-tenant-key + // simple-key = lcalpha 0*255( lcalpha / DIGIT / "_" / "-"/ "*" / "/" ) + // multi-tenant-key = tenant-id "@" system-id + // tenant-id = ( lcalpha / DIGIT ) 0*240( lcalpha / DIGIT / "_" / "-"/ "*" / "/" ) + // system-id = lcalpha 0*13( lcalpha / DIGIT / "_" / "-"/ "*" / "/" ) + // lcalpha = %x61-7A ; a-z + + lcAlphaRegexp = `[a-z]` + lcAlphanumPunctRegexp = `[a-z0-9\-\*/_]` + lcAlphanumRegexp = `[a-z0-9]` + multiTenantSep = `@` + tenantIDRegexp = lcAlphanumRegexp + lcAlphanumPunctRegexp + `*` + systemIDRegexp = lcAlphaRegexp + lcAlphanumPunctRegexp + `*` + multiTenantKeyRegexp = tenantIDRegexp + multiTenantSep + systemIDRegexp + simpleKeyRegexp = lcAlphaRegexp + lcAlphanumPunctRegexp + `*` + keyRegexp = `(?:(?:` + simpleKeyRegexp + `)|(?:` + multiTenantKeyRegexp + `))` + + // value = 0*255(chr) nblk-chr + // nblk-chr = %x21-2B / %x2D-3C / %x3E-7E + // chr = %x20 / nblk-chr + // + // Note the use of double-quoted strings in two places below. + // This is for \x expansion in these two cases. Also note + // \x2d is a hyphen character, so a quoted \ (i.e., \\\x2d) + // appears below. + valueNonblankCharRegexp = "[\x21-\x2b\\\x2d-\x3c\x3e-\x7e]" + valueCharRegexp = "[\x20-\x2b\\\x2d-\x3c\x3e-\x7e]" + valueRegexp = valueCharRegexp + `{0,255}` + valueNonblankCharRegexp + + // tracestate = list-member 0*31( OWS "," OWS list-member ) + // list-member = (key "=" value) / OWS + + owsCharSet = ` \t` + owsRegexp = `(?:[` + owsCharSet + `]*)` + w3cMemberRegexp = `(?:` + keyRegexp + `=` + valueRegexp + `)?` + + w3cOwsMemberOwsRegexp = `(?:` + owsRegexp + w3cMemberRegexp + owsRegexp + `)` + w3cCommaOwsMemberOwsRegexp = `(?:` + `,` + w3cOwsMemberOwsRegexp + `)` + + w3cTracestateRegexp = `^` + w3cOwsMemberOwsRegexp + w3cCommaOwsMemberOwsRegexp + `*$` + + // Note that fixed limits on tracestate size are captured above + // as '*' regular expressions, which allows the regexp to accept input exceeding the + // fixed limits; those limits are checked in code.
This keeps the size + // of the compiled regexp reasonable. Some of the regexps above + // are too complex to expand e.g., 31 times. In the case of + // w3cTracestateRegexp, 32 elements are allowed, which means we + // want the w3cCommaOwsMemberOwsRegexp element to match at most + // 31 times, but this is checked in code. +) + +var ( + w3cTracestateRe = regexp.MustCompile(w3cTracestateRegexp) + + w3cSyntax = keyValueScanner{ + maxItems: hardMaxNumPairs, + trim: true, + separator: ',', + equality: '=', + } +) + +// NewW3CTraceState parses a W3C trace state, with special attention +// to the embedded OpenTelemetry ("ot") and CloudObs ("sn") trace state fields. +func NewW3CTraceState(input string) (w3c W3CTraceState, _ error) { + if len(input) > hardMaxW3CLength { + return w3c, ErrTraceStateSize + } + + if !w3cTracestateRe.MatchString(input) { + return w3c, strconv.ErrSyntax + } + + err := w3cSyntax.scanKeyValues(input, func(key, value string) error { + if len(key) > hardMaxKeyLength { + return ErrTraceStateSize + } + if tenant, system, found := strings.Cut(key, multiTenantSep); found { + if len(tenant) > hardMaxTenantLength { + return ErrTraceStateSize + } + if len(system) > hardMaxSystemLength { + return ErrTraceStateSize + } + } + switch key { + case otelVendorCode: + var err error + w3c.otts, err = NewOpenTelemetryTraceState(value) + return err + case cloudObsVendorCode: + var err error + w3c.cots, err = NewCloudObsTraceState(value) + return err + default: + w3c.kvs = append(w3c.kvs, KV{ + Key: key, + Value: value, + }) + return nil + } + }) + return w3c, err +} + +// HasAnyValue indicates whether there are any values in this +// tracestate, including extra values. +func (w3c *W3CTraceState) HasAnyValue() bool { + return w3c.OTelValue().HasAnyValue() || w3c.CloudObsValue().HasAnyValue() || len(w3c.ExtraValues()) != 0 +} + +// OTelValue returns the OpenTelemetry tracestate value.
+func (w3c *W3CTraceState) OTelValue() *OpenTelemetryTraceState { + return &w3c.otts +} + +func (w3c *W3CTraceState) CloudObsValue() *CloudObsTraceState { + return &w3c.cots +} + +// Serialize encodes this tracestate object for use as a W3C +// tracestate header value. +func (w3c *W3CTraceState) Serialize(w io.StringWriter) error { + ser := serializer{writer: w} + cnt := 0 + sep := func() { + if cnt != 0 { + ser.write(",") + } + cnt++ + } + if w3c.otts.HasAnyValue() { + sep() + ser.write("ot=") + ser.check(w3c.otts.Serialize(w)) + } + if w3c.cots.HasAnyValue() { + sep() + ser.write("sn=") + ser.check(w3c.cots.Serialize(w)) + } + for _, kv := range w3c.ExtraValues() { + sep() + ser.write(kv.Key) + ser.write("=") + ser.write(kv.Value) + } + return ser.err +} diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/w3ctracestate_test.go b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/w3ctracestate_test.go new file mode 100644 index 0000000..a92f2f6 --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/w3ctracestate_test.go @@ -0,0 +1,161 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelsampling + +import ( + "errors" + "fmt" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// ExampleW3CTraceState shows how to parse and print a W3C +// tracestate. +func ExampleW3CTraceState() { + // This tracestate value encodes two sections, "ot" from + // OpenTelemetry and "zz" from a vendor.
+ w3c, err := NewW3CTraceState("ot=th:c;rv:d29d6a7215ced0;pn:abc,zz=vendorcontent") + if err != nil { + panic(err) + } + ot := w3c.OTelValue() + + fmt.Println("T-Value:", ot.TValue()) + fmt.Println("R-Value:", ot.RValue()) + fmt.Println("OTel Extra:", ot.ExtraValues()) + fmt.Println("Other Extra:", w3c.ExtraValues()) + + // Output: + // T-Value: c + // R-Value: d29d6a7215ced0 + // OTel Extra: [{pn abc}] + // Other Extra: [{zz vendorcontent}] +} + +// ExampleW3CTraceState_Serialize shows how to modify and serialize a +// new W3C tracestate. +func ExampleW3CTraceState_Serialize() { + w3c, err := NewW3CTraceState("") + if err != nil { + panic(err) + } + // Suppose a parent context was unsampled, the child span has + // been sampled at 25%. The child's context should carry the + // T-value of "c", serialize as "ot=th:c". + th, err := ProbabilityToThreshold(0.25) + if err != nil { + panic(err) + } + + // The update is given only the Threshold; the encoded T-value + // string is not passed separately. NOTE(review): an earlier + // wording described passing both, which the call below + // does not do.
+ err = w3c.OTelValue().UpdateTValueWithSampling(th) + if err != nil { + panic(err) + } + + var buf strings.Builder + err = w3c.Serialize(&buf) + if err != nil { + panic(err) + } + + fmt.Println(buf.String()) + + // Output: + // ot=th:c +} + +func TestParseW3CTraceState(t *testing.T) { + type testCase struct { + in string + rval string + tval string + extra map[string]string + expectErr error + } + const ns = "" + for _, test := range []testCase{ + // correct cases, with various whitespace + {"ot=th:1", ns, "1", nil, nil}, + {" ot=th:1 ", ns, "1", nil, nil}, + {" ot=th:1,other=value ", ns, "1", map[string]string{ + "other": "value", + }, nil}, + {",,,", ns, ns, nil, nil}, + {" , ot=th:1, , other=value ", ns, "1", map[string]string{ + "other": "value", + }, nil}, + {"ot=th:100;rv:abcdabcdabcdff", "abcdabcdabcdff", "100", nil, nil}, + {" ot=th:100;rv:abcdabcdabcdff", "abcdabcdabcdff", "100", nil, nil}, + {"ot=th:100;rv:abcdabcdabcdff ", "abcdabcdabcdff", "100", nil, nil}, + {"ot=rv:11111111111111", "11111111111111", ns, nil, nil}, + {"ot=rv:ffffffffffffff,unknown=value,other=something", "ffffffffffffff", ns, map[string]string{ + "other": "something", + "unknown": "value", + }, nil}, + + // syntax errors + {"-1=2", ns, ns, nil, strconv.ErrSyntax}, // invalid key char + {"=", ns, ns, nil, strconv.ErrSyntax}, // invalid empty key + + // size errors + {strings.Repeat("x", hardMaxKeyLength+1) + "=v", ns, ns, nil, ErrTraceStateSize}, // too long simple key + {strings.Repeat("x", hardMaxTenantLength+1) + "@y=v", ns, ns, nil, ErrTraceStateSize}, // too long multitenant-id + {"y@" + strings.Repeat("x", hardMaxSystemLength+1) + "=v", ns, ns, nil, ErrTraceStateSize}, // too long system-id + {"x=" + strings.Repeat("y", hardMaxW3CLength-1), ns, ns, nil, ErrTraceStateSize}, + {strings.Repeat("x=y,", hardMaxNumPairs) + "x=y", ns, ns, nil, ErrTraceStateSize}, + } { + t.Run(testName(test.in), func(t *testing.T) { + w3c, err := NewW3CTraceState(test.in) + + if test.expectErr != nil { 
+ require.True(t, errors.Is(err, test.expectErr), + "%q: not expecting %v wanted %v", test.in, err, test.expectErr, + ) + } else { + require.NoError(t, err, "%q", test.in) + } + if test.rval != ns { + require.True(t, w3c.OTelValue().HasAnyValue()) + require.True(t, w3c.HasAnyValue()) + require.Equal(t, test.rval, w3c.OTelValue().RValue()) + } else { + require.Equal(t, "", w3c.OTelValue().RValue()) + } + if test.tval != ns { + require.True(t, w3c.OTelValue().HasAnyValue()) + require.True(t, w3c.HasAnyValue()) + require.NotEqual(t, "", w3c.OTelValue().TValue()) + require.Equal(t, test.tval, w3c.OTelValue().TValue()) + } else { + require.Equal(t, "", w3c.OTelValue().TValue()) + } + if test.extra != nil { + require.True(t, w3c.HasAnyValue()) + actual := map[string]string{} + for _, kv := range w3c.ExtraValues() { + actual[kv.Key] = kv.Value + } + require.Equal(t, test.extra, actual) + } + + if test.expectErr != nil { + return + } + // on success Serialize() should not modify + // test by re-parsing + var w strings.Builder + require.NoError(t, w3c.Serialize(&w)) + cpy, err := NewW3CTraceState(w.String()) + require.NoError(t, err, "with %v", w.String()) + require.Equal(t, w3c, cpy, "with %v", w.String()) + }) + } +} diff --git a/lightstep/processor/satellitesamplerprocessor/internal/sampler/tracesampler.go b/lightstep/processor/satellitesamplerprocessor/internal/sampler/tracesampler.go new file mode 100644 index 0000000..0bd8a98 --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/internal/sampler/tracesampler.go @@ -0,0 +1,59 @@ +package sampler + +import ( + "hash/crc32" + "math" +) + +var SampleEverything = TraceSampler{ + partition: math.MaxUint32, + rate: 1, +} + +// TraceSampler holds a uint32 partition; all spans whose CRC32 trace ID hash is <= partition will be included in the sample. +// All spans whose CRC32 trace ID hash is > partition will NOT be included in the sample.
+// This is a separate type to avoid missing conversion of an old one-in-n sample rate to use the new sample method. +type TraceSampler struct { + partition uint32 + // Rate is the one-in-n-sample rate. + // This is used in many parts of the microsat for adjusting counts. + // Rather than recomputing it repeatedly, it's more economical to compute it once for the sampler. + rate float64 +} + +// NewTraceSampler simply returns a new sampler, for use outside the +// Satellite code base. Satellites use ComputeTraceSampler. +func NewTraceSampler(samplePercent float64) TraceSampler { + part, rate := func() (uint32, float64) { + switch samplePercent { + case 100: + return math.MaxUint32, 1 + case 0: + return 0, 0 + default: + floatPartition := float64(math.MaxUint32) * samplePercent / 100 + floatRate := 1 / (floatPartition / float64(math.MaxUint32)) + return uint32(floatPartition), floatRate + } + }() + + return TraceSampler{partition: part, rate: rate} +} + +// IsSampledOut returns true when a span should NOT pass through the +// pipeline. 
+func (s TraceSampler) IsSampledOut(traceID string) bool { + if s.partition == math.MaxUint32 { + return false + } + if s.partition == 0 { + return true + } + + hash := crc32.Checksum([]byte(traceID), crc32.IEEETable) + return hash > s.partition +} + +func (s TraceSampler) GetSampleRate() float64 { + return s.rate +} diff --git a/lightstep/processor/satellitesamplerprocessor/traces.go b/lightstep/processor/satellitesamplerprocessor/traces.go new file mode 100644 index 0000000..9fa55df --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/traces.go @@ -0,0 +1,124 @@ +// Copyright ServiceNow, Inc +// SPDX-License-Identifier: Apache-2.0 + +package satellitesamplerprocessor + +import ( + "context" + "fmt" + "strings" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processorhelper" + "go.uber.org/zap" + + "github.com/lightstep/otel-collector-charts/lightstep/processor/satellitesamplerprocessor/internal/otelsampling" + "github.com/lightstep/otel-collector-charts/lightstep/processor/satellitesamplerprocessor/internal/sampler" +) + +type tracesProcessor struct { + logger *zap.Logger + traceSampler sampler.TraceSampler + singleTraceState string + staticThreshold otelsampling.Threshold + next consumer.Traces +} + +func createTracesProcessor(ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Traces) (*tracesProcessor, error) { + oCfg := cfg.(*Config) + + // Synthesize a trace-state for use when no incoming tracestate is present. 
+ th, err := otelsampling.ProbabilityToThreshold(oCfg.Percent / 100.0) + if err != nil { + return nil, err + } + w3c, _ := otelsampling.NewW3CTraceState("") + if err := w3c.CloudObsValue().UpdateSValueWithSampling(th); err != nil { + return nil, err + } + var w strings.Builder + if err := w3c.Serialize(&w); err != nil { + return nil, err + } + + return &tracesProcessor{ + logger: set.Logger, + traceSampler: sampler.NewTraceSampler(oCfg.Percent), + singleTraceState: w.String(), + staticThreshold: th, + next: nextConsumer, + }, nil +} + +func (p *tracesProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { + td.ResourceSpans().RemoveIf(func(rs ptrace.ResourceSpans) bool { + rs.ScopeSpans().RemoveIf(func(ils ptrace.ScopeSpans) bool { + ils.Spans().RemoveIf(func(s ptrace.Span) bool { + // The IsSampledOut() method takes a string. What kind of string? + // The expression is (annotate_grpc.go): + // + // wire.TranslateIDToGUID(span.SpanContext.TraceId) + // + // which is a 16-byte zero-padded hex encoding capturing the least + // significant 64 bits, i.e., the TraceId above calculated using + // + // if len(id) > 8 { + // return binary.BigEndian.Uint64(id[len(id)-8:]) + // } + // return binary.BigEndian.Uint64(id) + // + // where, in this case (OTLP ingest) we use the expression: + // + // wire.TranslateBytesToID(traceID) + // + // However, note that this pipeline will reject incorrect length, + // so length is always 16. We take the second 8 bytes as a hex + // string, zero pad it, then call the sampler. + tid := s.TraceID() + if p.traceSampler.IsSampledOut(fmt.Sprintf("%016x", tid[8:])) { + return true + } + + tsIn := s.TraceState().AsRaw() + if tsIn == "" { + // In case there is no incoming tracestate, the output is the + // static encoding resulting from just one sampling stage. + s.TraceState().FromRaw(p.singleTraceState) + return false + } + // Combination logic is needed. 
In case the SDK has sampled + // using an OTel spec (legacy or modern), the existing fields + // are preserved. This sampler uses a dedicated tracestate + // field to convey the satellite sampling threshold. + incoming, err := otelsampling.NewW3CTraceState(tsIn) + if err != nil { + p.logger.Error("W3C tracecontext: invalid incoming tracestate", zap.Error(err)) + return false + } + if err := incoming.CloudObsValue().UpdateSValueWithSampling(p.staticThreshold); err != nil { + p.logger.Error("W3C tracecontext: arriving trace state", zap.Error(err)) + } + + // Serialize and update. + var out strings.Builder + if err := incoming.Serialize(&out); err != nil { + p.logger.Error("W3C tracecontext: serialize tracestate", zap.Error(err)) + } + s.TraceState().FromRaw(out.String()) + return false + }) + // Filter out empty ScopeSpans + return ils.Spans().Len() == 0 + }) + // Filter out empty ResourceSpans + return rs.ScopeSpans().Len() == 0 + }) + // Maybe skip the data item entirely. + if td.ResourceSpans().Len() == 0 { + return td, processorhelper.ErrSkipProcessingData + } + return td, nil +} From c2cd1baeb215aea0197f627c871c3eaa33562ca4 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 13 Sep 2024 13:59:16 -0700 Subject: [PATCH 2/4] README --- .../satellitesamplerprocessor/README.md | 104 ++++++++++++++++++ .../internal/otelsampling/README.md | 64 +---------- 2 files changed, 108 insertions(+), 60 deletions(-) create mode 100644 lightstep/processor/satellitesamplerprocessor/README.md diff --git a/lightstep/processor/satellitesamplerprocessor/README.md b/lightstep/processor/satellitesamplerprocessor/README.md new file mode 100644 index 0000000..9a88d7a --- /dev/null +++ b/lightstep/processor/satellitesamplerprocessor/README.md @@ -0,0 +1,104 @@ +# Lightstep Satellite Sampler + +This package contains an OpenTelemetry processor for an OpenTelemetry +traces pipeline that makes sampling decisions consistent with the +legacy Lightstep Satellite.
This component enables a slow transition +from Lightstep Satellites to OpenTelemetry Collectors without +simultaneously changing sampling algorithms. + + ## Recommended usage + +This component supports operating a mixture of Lightstep Satellites +and OpenTelemetry Collectors with consistent probability sampling. +Here is a recommended sequence of steps for performing a migration +to OpenTelemetry Collectors for Lightstep Satellite users. + +### Build a custom OpenTelemetry Collector + +This component is provided as a standalone component, meant for +incorporating into a custom build of the OpenTelemetry Collector using +the [OpenTelemetry Collector +builder](https://opentelemetry.io/docs/collector/custom-collector/) +tool. In your Collector's build configuration, add the following +processor component: + +``` + - gomod: github.com/lightstep/otel-collector-charts/lightstep/processor/satellitesamplerprocessor VERSIONTAG +``` + +where `VERSIONTAG` corresponds with the targeted OpenTelemetry +Collector release version. At the time of this writing, the version +tag is `v0.109.0`. + +Users are advised to include the OpenTelemetry Probabilistic Sampler +processor in their build, to complete this transition. For example: + +``` + - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.109.0 +``` + +Follow the usual steps to build your collector (e.g., `builder +--config build.yaml`). + +### Configure the sampler + +You will need to know the sampling probability configured with +Lightstep Satellites, in percentage terms. Say the Lightstep +Satellite is configured with 10% sampling (i.e., 1-in-10). + +Edit OpenTelemetry Collector configuration to include a +`satellitesampler` block.
In the following example, the OTel-Arrow +receiver and exporter are configured with `satellitesampler` with 10% +sampling and [concurrent batch +processor](https://github.com/open-telemetry/otel-arrow/blob/main/collector/processor/concurrentbatchprocessor/README.md). + +``` +exporters: + otelarrow: + ... +receivers: + otelarrow: + ... +processors: + satellitesampler: + percent: 10 + concurrentbatch: +service: + pipelines: + traces: + receivers: [otelarrow] + processors: [satellitesampler, concurrentbatch] + exporters: [otelarrow] +``` + +Collectors with this configuration may be deployed alongside a pool of +Lightstep Satellites sampling and the resulting traces will be +complete. + +### Migrate to the OpenTelemetry Probabilistic Sampler + +After decommissioning Lightstep Satellites and replacing them with +OpenTelemetry Collectors, users are advised to migrate to an +OpenTelemetry Collector processor with equivalent functionality, the +[Probabilistic Sampler Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/probabilisticsamplerprocessor/README.md). + +A change of sampling configuration, either to change algorithm or to +change probability, typically results in broken traces. Users are +advised to plan accordingly and make a quick transition between +samplers, with only a brief, planned period of broken traces. + +Redeploy the pool of Collectors with the Probabilistic Sampler +processor configured instead of the Satellite sampler processor. Make +this transition as quickly, if possible, because traces will be +potentially incomplete as long as both samplers are configured for the +same destination. + +``` +processors: + probabilistic_sampler: + mode: equalizing + sampling_percentage: 10 +``` + +The "equalizing" mode is recommended, see that [component's +documentation](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/probabilisticsamplerprocessor/README.md#equalizing).
diff --git a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/README.md b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/README.md index d492716..d95d35a 100644 --- a/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/README.md +++ b/lightstep/processor/satellitesamplerprocessor/internal/otelsampling/README.md @@ -1,61 +1,5 @@ The code in this directory is is a copy of the OpenTelemetry package -in collector-contrib/pkg/sampling. The copy here has has ServiceNow -copyright because it was originally authored here. - -Code organization: - -# Tracestate handling - -- w3ctracestate.go: the outer tracestate structure like `key=value,...` -- oteltracestate.go: the inner tracestate structure like `key:value;...` -- cloudobstracestate.go: the inner tracestate structure like `key:value;...` (internal only) -- common.go: shared parser, serializer for either tracestate - -This includes an implementation of the W3C trace randomness feature, -described here: https://www.w3.org/TR/trace-context-2/#randomness-of-trace-id - -# Overview of tracestate identifiers - -There are two vendor codes: - -- "ot" refers to formal OTel specifications -- "sn" for ServiceNow refers to internal Cloud Observability sampling (as by the Lightstep satellite) - -The OTel trace state keys: - -- "p" refers to the [legacy OTel power-of-two sampling](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/tracestate-probability-sampling.md) -- "r" used in the legacy convention, 1-2 decimal digits -- "th" refers to the [modern OTel 56-bit sampling](https://github.com/open-telemetry/oteps/pull/235) -- "rv" refers to the modern randomness value, 14 hex digits. - -The Cloud Observability trace state keys: - -- "s" refers to the satellite sampling, uses the same encoding as "th" but is modeled as an acceptance threshold. 
- -Note that to convert from an OTel rejection threshold to a Satellite sampler acceptance threshold, the unsigned value of the threshold should be subtracted from the maximum adjusted count, - -``` -satelliteSamplerThreshold, _ = UnsignedToThreshold(MaxAdjustedCount - otelModernThreshold.Unsigned()) -``` - -# Encoding and decoding - -- probability.go: defines - `ProbabilityToThreshold()` - `(Threshold).Probability()` -- threshold.go: defines - `TValueToThreshold()` - `(Threshold).TValue()` - `(Threshold).ShouldSample()` -- randomness.go: defines - `TraceIDToRandomness()` - `RValueToRandomness()` - `(Randomness).RValue()` - -# Callers of note - -- In common-go/wire/oteltoegresspb/otel_to_egresspb.go: - `TraceStateToAdjustedCount()` - -- In internalcollector/components/satellitesamplerprocessor/traces.go: - `createTracesProcessor()` +in +[collector-contrib/pkg/sampling](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/sampling/README.md). The +copy here has Copyright ServiceNow, Inc because it was originally +authored at ServiceNow before being contributed to OpenTelemetry. From 10a2660c715cbf8f14e6f50097cf1b2c508a85b1 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 13 Sep 2024 14:09:21 -0700 Subject: [PATCH 3/4] typo --- lightstep/processor/satellitesamplerprocessor/README.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lightstep/processor/satellitesamplerprocessor/README.md b/lightstep/processor/satellitesamplerprocessor/README.md index 9a88d7a..ae4c0a8 100644 --- a/lightstep/processor/satellitesamplerprocessor/README.md +++ b/lightstep/processor/satellitesamplerprocessor/README.md @@ -89,9 +89,8 @@ samplers, with only a brief, planned period of broken traces. Redeploy the pool of Collectors with the Probabilistic Sampler processor configured instead of the Satellite sampler processor.
Make -this transition as quickly, if possible, because traces will be -potentially incomplete as long as both samplers are configured for the -same destination. +this transition quickly, if possible, because traces will be +potentially incomplete as long as both samplers are being used. ``` processors: From 04978d127f9969271b6c56f773c10c3ec0d78e58 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Mon, 23 Sep 2024 13:26:52 -0400 Subject: [PATCH 4/4] image --- .github/workflows/publish-image.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/publish-image.yml b/.github/workflows/publish-image.yml index 594e413..65319fc 100644 --- a/.github/workflows/publish-image.yml +++ b/.github/workflows/publish-image.yml @@ -8,11 +8,13 @@ on: - "**" paths: - "arrow/**" + - "lightstep/**" push: branches: - main paths: - "arrow/**" + - "lightstep/**" # Defines two custom environment variables for the workflow. These are used for the Container registry domain, and a name for the Docker image that this workflow builds. env: