From 567c7f789606fc7fb1d82e8f694fc6ae6b851e39 Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Mon, 11 Nov 2024 14:12:03 -0500 Subject: [PATCH 01/10] checkpoint methods --- receiver/splunksearchapireceiver/config.go | 8 ++- receiver/splunksearchapireceiver/receiver.go | 69 +++++++++++++++++--- 2 files changed, 65 insertions(+), 12 deletions(-) diff --git a/receiver/splunksearchapireceiver/config.go b/receiver/splunksearchapireceiver/config.go index 629ecc7dc..40cfc4638 100644 --- a/receiver/splunksearchapireceiver/config.go +++ b/receiver/splunksearchapireceiver/config.go @@ -19,15 +19,17 @@ import ( "strings" "time" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" ) // Config struct to represent the configuration for the Splunk Search API receiver type Config struct { confighttp.ClientConfig `mapstructure:",squash"` - Username string `mapstructure:"splunk_username"` - Password string `mapstructure:"splunk_password"` - Searches []Search `mapstructure:"searches"` + Username string `mapstructure:"splunk_username"` + Password string `mapstructure:"splunk_password"` + Searches []Search `mapstructure:"searches"` + StorageID *component.ID `mapstructure:"storage"` } // Search struct to represent a Splunk search diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go index 5080a522e..80e7d391d 100644 --- a/receiver/splunksearchapireceiver/receiver.go +++ b/receiver/splunksearchapireceiver/receiver.go @@ -16,9 +16,13 @@ package splunksearchapireceiver import ( "context" + "encoding/json" + "fmt" "net/http" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" + "go.etcd.io/etcd/proxy/grpcproxy/adapter" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" @@ -26,13 +30,23 @@ import ( "go.uber.org/zap" ) +const ( + eventStorageKey = "last_event_offset" +) + type splunksearchapireceiver struct { - host component.Host - logger *zap.Logger - logsConsumer consumer.Logs - config *Config - settings component.TelemetrySettings - client *http.Client + host component.Host + logger *zap.Logger + logsConsumer consumer.Logs + config *Config + settings component.TelemetrySettings + client *http.Client + storageClient adapter.StorageClient + record *eventRecord +} + +type eventRecord struct { + Offset string `json:"offset"` } func (ssapir *splunksearchapireceiver) Start(ctx context.Context, host component.Host) error { @@ -42,6 +56,17 @@ func (ssapir *splunksearchapireceiver) Start(ctx context.Context, host component return err } ssapir.client = client + + // create storage client + storageClient, err := adapter.GetStorageClient(ssapir.config.StorageID) + if err != nil { + return fmt.Errorf("failed to get storage client: %w", err) + } + ssapir.storageClient = storageClient + + // check if a checkpoint already exists + ssapir.loadCheckpoint(ctx) + go ssapir.runQueries(ctx) return nil } @@ -57,7 +82,6 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { if err != nil { ssapir.logger.Error("error creating search", zap.Error(err)) } - // fmt.Println("Search created successfully with ID: ", searchID) // wait for search to complete for { @@ -70,14 +94,12 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { } time.Sleep(2 * time.Second) } - // fmt.Println("Search completed successfully") // fetch search results results, err := ssapir.getSplunkSearchResults(ssapir.config, searchID) if err != nil { ssapir.logger.Error("error fetching search results", zap.Error(err)) } - // fmt.Println("Search results: ", results) // parse time strings to time.Time earliestTime, err := time.Parse(time.RFC3339, search.EarliestTime) @@ -131,6 +153,11 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { // Error from down the pipeline, freak out ssapir.logger.Error("error consuming logs", zap.Error(err)) } + ssapir.record.Offset = results.Results[len(results.Results)-1].Offset + err = ssapir.checkpoint(ctx) + if err != nil { + ssapir.logger.Error("error writing checkpoint", zap.Error(err)) + } } return nil } @@ -167,3 +194,27 @@ func (ssapir *splunksearchapireceiver) getSplunkSearchResults(config *Config, si } return resp, nil } + +func (ssapir *splunksearchapireceiver) checkpoint(ctx context.Context) error { + marshalBytes, err := json.Marshal(ssapir.record) + if err != nil { + return fmt.Errorf("failed to write checkpoint: %w", err) + } + return ssapir.storageClient.Set(ctx, eventStorageKey, marshalBytes) +} + +func (ssapir *splunksearchapireceiver) loadCheckpoint(ctx context.Context) { + marshalBytes, err := ssapir.storageClient.Get(ctx, eventStorageKey) + if err != nil { + ssapir.logger.Error("failed to read checkpoint", zap.Error(err)) + return + } + if marshalBytes == nil { + ssapir.logger.Info("no checkpoint found") + return + } + err = json.Unmarshal(marshalBytes, ssapir.record) + if err != nil { + ssapir.logger.Error("failed to unmarshal checkpoint", zap.Error(err)) + } +} From 33d9b0f6ee4c2a1bd1b6acfc613093f1d30057ae Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Thu, 14 Nov 2024 15:59:05 -0500 Subject: [PATCH 02/10] WIP --- go.mod | 2 + go.sum | 4 + receiver/splunksearchapireceiver/factory.go | 1 + receiver/splunksearchapireceiver/go.mod | 35 +++++++- receiver/splunksearchapireceiver/go.sum | 95 +++++++++++++++++--- receiver/splunksearchapireceiver/receiver.go | 7 +- 6 files changed, 129 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index e4ba0dbfb..c5c0657c8 100644 --- a/go.mod +++ b/go.mod @@ -308,6 +308,7 @@ require ( github.com/briandowns/spinner v1.23.0 // indirect github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect github.com/containerd/cgroups/v3 v3.0.3 // indirect + github.com/coreos/etcd v3.3.27+incompatible // indirect github.com/coreos/go-oidc/v3 v3.11.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/distribution/reference v0.6.0 // indirect @@ -398,6 +399,7 @@ require ( go.elastic.co/apm/module/apmzap/v2 v2.6.0 // indirect go.elastic.co/apm/v2 v2.6.0 // indirect go.elastic.co/fastjson v1.3.0 // indirect + go.etcd.io/etcd v3.3.27+incompatible // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector v0.113.0 // indirect go.opentelemetry.io/collector/client v1.19.0 // indirect diff --git a/go.sum b/go.sum index 0a8eff1f1..4a5a80d69 100644 --- a/go.sum +++ b/go.sum @@ -1127,6 +1127,8 @@ github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkE github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/etcd v3.3.27+incompatible h1:QIudLb9KeBsE5zyYxd1mjzRSkzLg9Wf9QlRwFgd6oTA= +github.com/coreos/etcd v3.3.27+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-oidc/v3 v3.11.0 h1:Ia3MxdwpSw702YW0xgfmP1GVCMA9aEFWu12XUZ3/OtI= github.com/coreos/go-oidc/v3 v3.11.0/go.mod h1:gE3LgjOgFoHi9a4ce4/tJczr0Ai2/BoDhf0r5lltWI0= @@ -2612,6 +2614,8 @@ go.elastic.co/fastjson v1.3.0/go.mod h1:K9vDh7O0ODsVKV2B5e2XYLY277QZaCbB3tS1SnAR go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0= go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I= +go.etcd.io/etcd v3.3.27+incompatible h1:5hMrpf6REqTHV2LW2OclNpRtxI0k9ZplMemJsMSWju0= +go.etcd.io/etcd v3.3.27+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= diff --git a/receiver/splunksearchapireceiver/factory.go b/receiver/splunksearchapireceiver/factory.go index dc61db414..7d571fd4e 100644 --- a/receiver/splunksearchapireceiver/factory.go +++ b/receiver/splunksearchapireceiver/factory.go @@ -45,6 +45,7 @@ func createLogsReceiver(_ context.Context, logger: params.Logger, logsConsumer: consumer, config: ssapirConfig, + id: params.ID, settings: params.TelemetrySettings, } return ssapir, nil diff --git a/receiver/splunksearchapireceiver/go.mod b/receiver/splunksearchapireceiver/go.mod index c419d11af..df4d1ea72 100644 --- a/receiver/splunksearchapireceiver/go.mod +++ b/receiver/splunksearchapireceiver/go.mod @@ -3,34 +3,65 @@ module github.com/open-telemetry/opentelemtry-collector-contrib/receiver/splunks go 1.22.5 require ( + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.113.0 github.com/stretchr/testify v1.9.0 + go.etcd.io/etcd v3.3.27+incompatible go.opentelemetry.io/collector/component v0.113.0 go.opentelemetry.io/collector/consumer v0.113.0 go.opentelemetry.io/collector/pdata v1.19.0 - go.opentelemetry.io/collector/receiver v0.112.0 + go.opentelemetry.io/collector/receiver v0.113.0 go.uber.org/zap v1.27.0 ) require ( + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/coreos/etcd v3.3.27+incompatible // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/elastic/lunes v0.1.0 // indirect + github.com/expr-lang/expr v1.16.9 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + github.com/goccy/go-json v0.10.3 // indirect + github.com/golang/protobuf v1.5.0 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect github.com/klauspost/compress v1.17.11 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/knadh/koanf/providers/confmap v0.1.0 // indirect + github.com/knadh/koanf/v2 v2.1.1 // indirect + github.com/leodido/go-syslog/v4 v4.2.0 // indirect + github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b // indirect + github.com/magefile/mage v1.15.0 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.113.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/cors v1.11.1 // indirect + github.com/valyala/fastjson v1.6.4 // indirect go.opentelemetry.io/collector/client v1.19.0 // indirect go.opentelemetry.io/collector/config/configauth v0.113.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.19.0 // indirect go.opentelemetry.io/collector/config/configopaque v1.19.0 // indirect go.opentelemetry.io/collector/config/configtls v1.19.0 // indirect go.opentelemetry.io/collector/config/internal v0.113.0 // indirect + go.opentelemetry.io/collector/confmap v1.19.0 // indirect + go.opentelemetry.io/collector/consumer/consumererror v0.113.0 // indirect + go.opentelemetry.io/collector/consumer/consumerprofiles v0.113.0 // indirect + go.opentelemetry.io/collector/consumer/consumertest v0.113.0 // indirect go.opentelemetry.io/collector/extension v0.113.0 // indirect go.opentelemetry.io/collector/extension/auth v0.113.0 // indirect + go.opentelemetry.io/collector/extension/experimental/storage v0.113.0 // indirect + go.opentelemetry.io/collector/featuregate v1.19.0 // indirect + go.opentelemetry.io/collector/pdata/pprofile v0.113.0 // indirect + go.opentelemetry.io/collector/semconv v0.113.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect + gonum.org/v1/gonum v0.15.1 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) @@ -41,7 +72,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect go.opentelemetry.io/collector/config/confighttp v0.113.0 go.opentelemetry.io/collector/config/configtelemetry v0.113.0 // indirect - go.opentelemetry.io/collector/pipeline v0.112.0 // indirect + go.opentelemetry.io/collector/pipeline v0.113.0 // indirect go.opentelemetry.io/otel v1.31.0 // indirect go.opentelemetry.io/otel/metric v1.31.0 // indirect go.opentelemetry.io/otel/trace v1.31.0 // indirect diff --git a/receiver/splunksearchapireceiver/go.sum b/receiver/splunksearchapireceiver/go.sum index a061e9529..482572095 100644 --- a/receiver/splunksearchapireceiver/go.sum +++ b/receiver/splunksearchapireceiver/go.sum @@ -1,6 +1,16 @@ +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/coreos/etcd v3.3.27+incompatible h1:QIudLb9KeBsE5zyYxd1mjzRSkzLg9Wf9QlRwFgd6oTA= +github.com/coreos/etcd v3.3.27+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/lunes v0.1.0 h1:amRtLPjwkWtzDF/RKzcEPMvSsSseLDLW+bnhfNSLRe4= +github.com/elastic/lunes v0.1.0/go.mod h1:xGphYIt3XdZRtyWosHQTErsQTd4OP1p9wsbVoHelrd4= +github.com/expr-lang/expr v1.16.9 h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI= +github.com/expr-lang/expr v1.16.9/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= @@ -10,30 +20,64 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= +github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= +github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU= +github.com/knadh/koanf/providers/confmap v0.1.0/go.mod h1:2uLhxQzJnyHKfxG927awZC7+fyHFdQkd697K4MdLnIU= +github.com/knadh/koanf/v2 v2.1.1 h1:/R8eXqasSTsmDCsAyYj+81Wteg8AqrV9CP6gvsTsOmM= +github.com/knadh/koanf/v2 v2.1.1/go.mod h1:4mnTRbZCK+ALuBXHZMjDfG9y714L7TykVnZkXbMU3Es= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-syslog/v4 v4.2.0 h1:A7vpbYxsO4e2E8udaurkLlxP5LDpDbmPMsGnuhb7jVk= +github.com/leodido/go-syslog/v4 v4.2.0/go.mod h1:eJ8rUfDN5OS6dOkCOBYlg2a+hbAg6pJa99QXXgMrd98= +github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b h1:11UHH39z1RhZ5dc4y4r/4koJo6IYFgTRMe/LlwRTEw0= +github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b/go.mod h1:WZxr2/6a/Ar9bMDc2rN/LJrE/hF6bXE4LPyDSIxwAfg= +github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= +github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.113.0 h1:ERdOiTmsDruI/s5oEgN45NsZW2roWXmO0u2aceR4GuM= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.113.0/go.mod h1:RkClsQhl8hdAg874Ot4kaG92s+6dW0Dvlt5HRxhsavc= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.113.0 h1:7A8MgFPYRQWq1RkFBktq01CW+eTYhiGML0IxQNv2uaM= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.113.0/go.mod h1:E1pc7mDXH+5s7RyXw291h8lz2dhzPzaDrAHqP1Lawvw= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.113.0 h1:EZ/ZNsovNcQq+wwAbTAWNY+6BHnv24NxvVoC6eYmtg8= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.113.0/go.mod h1:u21dEQ9yQ0JyLMSrKLWWzHG/lHSlteNfa/EQ7Vqcle4= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.113.0 h1:G8w+wg4nnqBqe297fBWnjJ5Tg2OYDVEMsdWA9/3ozxQ= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.113.0/go.mod h1:m3hDVsXPQzQfeji3+hn7NYJPHDRlHhQRNd5T7N5wZqc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -43,11 +87,17 @@ github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99 github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ= +github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.etcd.io/etcd v3.3.27+incompatible h1:5hMrpf6REqTHV2LW2OclNpRtxI0k9ZplMemJsMSWju0= +go.etcd.io/etcd v3.3.27+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= go.opentelemetry.io/collector/client v1.19.0 h1:TUal8WV1agTrZStgE7BJ8ZC0IHLGtrfgO9ogU9t1mv8= go.opentelemetry.io/collector/client v1.19.0/go.mod h1:jgiXMEM6l8L2QEyf2I/M47Zd8+G7e4z+6H8q5SkHOlQ= go.opentelemetry.io/collector/component v0.113.0 h1:/nx+RvZgxUEXP+YcTj69rEtuSEGkfaCyp/ad5zQGLjU= @@ -66,24 +116,40 @@ go.opentelemetry.io/collector/config/configtls v1.19.0 h1:GQ/cF1hgNqHVBq2oSSrOFX go.opentelemetry.io/collector/config/configtls v1.19.0/go.mod h1:1hyqnYB3JqEUlk1ME/s9HYz4oCRcxQCRxsJitFFT/cA= go.opentelemetry.io/collector/config/internal v0.113.0 h1:9RAzH8v7ItFT1npHpvP0SvUzBHcZDliCGRo9Spp6v7c= go.opentelemetry.io/collector/config/internal v0.113.0/go.mod h1:yC7E4h1Uj0SubxcFImh6OvBHFTjMh99+A5PuyIgDWqc= +go.opentelemetry.io/collector/confmap v1.19.0 h1:TQ0lZpAKqgsE0EKk+u4JA+uBbPYeFRmWP3GH43w40CY= +go.opentelemetry.io/collector/confmap v1.19.0/go.mod h1:GgNu1ElPGmLn9govqIfjaopvdspw4PJ9KeDtWC4E2Q4= go.opentelemetry.io/collector/consumer v0.113.0 h1:KJSiK5vSIY9dgPxwKfQ3gOgKtQsqc+7IB7mGhUAL5c8= go.opentelemetry.io/collector/consumer v0.113.0/go.mod h1:zHMlXYFaJlZoLCBR6UwWoyXZ/adcO1u2ydqUal3VmYU= -go.opentelemetry.io/collector/consumer/consumerprofiles v0.112.0 h1:ym+QxemlbWwfMSUto1hRTfcZeYbj2q8FpMzjk8O+X60= -go.opentelemetry.io/collector/consumer/consumerprofiles v0.112.0/go.mod h1:4PjDUpURFh85R6NLEHrEf/uZjpk4LAYmmOrqu+iZsyE= -go.opentelemetry.io/collector/consumer/consumertest v0.112.0 h1:pGvNH+H4rMygUOql6ynVQim6UFdimTiJ0HRfQL6v0GE= -go.opentelemetry.io/collector/consumer/consumertest v0.112.0/go.mod h1:rfVo0tYt/BaLWw3IaQKVQafjUlMsA5qTkvsSOfFrr9c= +go.opentelemetry.io/collector/consumer/consumererror v0.113.0 h1:Hd2N7n9RKbnKRaVrdw6fPBoQko5zZIgCxwVxkL6SAIE= +go.opentelemetry.io/collector/consumer/consumererror v0.113.0/go.mod h1:o0MAGFdzcr7LFTUQ6iivPPhbVmn2ZVIYm3FPXk2+JUo= +go.opentelemetry.io/collector/consumer/consumerprofiles v0.113.0 h1:RftAcQUY5UOfbEK4s16jnORqTx16y9+PxA1lQwt98cQ= +go.opentelemetry.io/collector/consumer/consumerprofiles v0.113.0/go.mod h1:ZuHrQ4pWguh6dw0DgTfcUtdY/T+cnOJJNP6LMbm5Y5A= +go.opentelemetry.io/collector/consumer/consumertest v0.113.0 h1:ua2AjNx3DUA8qElXNkggB4w3VDL/rBKBvryOQkhumH8= +go.opentelemetry.io/collector/consumer/consumertest v0.113.0/go.mod h1:vK8o4ZTZSiG3rVyqxZcCNmT/cvEfx34ig7V65L9+6Rg= go.opentelemetry.io/collector/extension v0.113.0 h1:Vp/YSL8ZCkJQrP1lf2Bm5yaTvcp6ROO3AnfuSL3GEXM= go.opentelemetry.io/collector/extension v0.113.0/go.mod h1:Pwp0TNqdHeER4V1I6H6oCvrto/riiOAqs3737BWCnjw= go.opentelemetry.io/collector/extension/auth v0.113.0 h1:4ggRy1vepOabUiCWfU+6M9P/ftXojMUNAvBpeLihYj8= go.opentelemetry.io/collector/extension/auth v0.113.0/go.mod h1:VbvAm2YZAqePkWgwn0m0vBaq3aC49CxPVwHmrJ24aeQ= +go.opentelemetry.io/collector/extension/experimental/storage v0.113.0 h1:Qq4IaB6bMUrf/bWoPZ5ESWywCt+vDi8I/ChYejIEPcc= +go.opentelemetry.io/collector/extension/experimental/storage v0.113.0/go.mod h1:BRmo+A7f06u/rhyLauU/Vogk+QRN0y1j2VVVgMGWrfQ= +go.opentelemetry.io/collector/featuregate v1.19.0 h1:ASea2sU+tdpKI3RxIJC/pufDAfwAmrvcQ4EmTHVu0B0= +go.opentelemetry.io/collector/featuregate v1.19.0/go.mod h1:47xrISO71vJ83LSMm8+yIDsUbKktUp48Ovt7RR6VbRs= go.opentelemetry.io/collector/pdata v1.19.0 h1:jmnU5R8TOCbwRr4B8sjdRxM7L5WnEKlQWX1dtLYxIbE= go.opentelemetry.io/collector/pdata v1.19.0/go.mod h1:Ox1YVLe87cZDB/TL30i4SUz1cA5s6AM6SpFMfY61ICs= -go.opentelemetry.io/collector/pdata/pprofile v0.112.0 h1:t+LYorcMqZ3sDz5/jp3xU2l5lIhIXuIOOGO4Ef9CG2c= -go.opentelemetry.io/collector/pdata/pprofile v0.112.0/go.mod h1:F2aTCoDzIaxEUK1g92LZvMwradySFMo3ZsAnBIpOdUg= -go.opentelemetry.io/collector/pipeline v0.112.0 h1:jqKDdb8k53OLPibvxzX6fmMec0ZHAtqe4p2+cuHclEI= -go.opentelemetry.io/collector/pipeline v0.112.0/go.mod h1:4vOvjVsoYTHVGTbfFwqfnQOSV2K3RKUHofh3jNRc2Mg= -go.opentelemetry.io/collector/receiver v0.112.0 h1:gdTBDOPGKMZlZghtN5A7ZLNlNwCHWYcoJQeIiXvyGEQ= -go.opentelemetry.io/collector/receiver v0.112.0/go.mod h1:3QmfSUiyFzRTnHUqF8fyEvQpU5q/xuwS43jGt8JXEEA= +go.opentelemetry.io/collector/pdata/pprofile v0.113.0 h1:VRf4p0VhfuaR+Epy/nMIlu/9t39WU9CUgHVUvpuGxfU= +go.opentelemetry.io/collector/pdata/pprofile v0.113.0/go.mod h1:5aDejksdXh5PdJN/OhpzATGT3kbNL0RMmw2Q0Q6E/o0= +go.opentelemetry.io/collector/pdata/testdata v0.113.0 h1:vRfn85jicO2F4eOTgsWtzmU/K3E/uZUtM1HEefvvJD8= +go.opentelemetry.io/collector/pdata/testdata v0.113.0/go.mod h1:sR+6eR+YEJhYZu9StbqzeWcCmHpfBAgX/qjP82HY9Gw= +go.opentelemetry.io/collector/pipeline v0.113.0 h1:vSRzRe3717jV0btCNPhVkhg2lu0uFxcm2VO+vhad/eE= +go.opentelemetry.io/collector/pipeline v0.113.0/go.mod h1:4vOvjVsoYTHVGTbfFwqfnQOSV2K3RKUHofh3jNRc2Mg= +go.opentelemetry.io/collector/receiver v0.113.0 h1:vraAbkPy8Pz9x5X39gV+j9t6x23PNsY2aJ6gQMugRbQ= +go.opentelemetry.io/collector/receiver v0.113.0/go.mod h1:IUa8/lNw8Qh4L5Q3jOeRWKW0ebQPoNcfhytxN5Puq2A= +go.opentelemetry.io/collector/receiver/receiverprofiles v0.113.0 h1:uVxuzjGe2t1sbwahSBowVHYnGzpzn8brmfn8z1UHvQg= +go.opentelemetry.io/collector/receiver/receiverprofiles v0.113.0/go.mod h1:khKDkzYJR2x2OPUqGSmoSncdINT9lUE5IThiHPDbqZk= +go.opentelemetry.io/collector/receiver/receivertest v0.113.0 h1:0vOvz3S4Q/KwcNCS9C7zPo0uxD6RSWktG88yGdxfV6g= +go.opentelemetry.io/collector/receiver/receivertest v0.113.0/go.mod h1:sRq5ctm5UE/0Ar562wnCVQ1zbAie/D127D1WbtbEuEc= +go.opentelemetry.io/collector/semconv v0.113.0 h1:twenSI7M7MJMJKW8D6a/GXxPZTPbama/weywBtV2iFw= +go.opentelemetry.io/collector/semconv v0.113.0/go.mod h1:zCJ5njhWpejR+A40kiEoeFm1xq1uzyZwMnRNX6/D82A= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= @@ -105,6 +171,8 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -133,14 +201,21 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0= +gonum.org/v1/gonum v0.15.1/go.mod h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o= +google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 h1:wKguEg1hsxI2/L3hUYrpo1RVi48K+uTyzKqprwLXsb8= +google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142/go.mod h1:d6be+8HhtEtucleCbxpPW9PA9XwISACu8nvpPqF0BVo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd h1:6TEm2ZxXoQmFWFlt1vNxvVOa1Q0dXFQD1m/rYjXmS0E= google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go index 1432c2de1..d6028e986 100644 --- a/receiver/splunksearchapireceiver/receiver.go +++ b/receiver/splunksearchapireceiver/receiver.go @@ -21,9 +21,9 @@ import ( "time" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" - "go.etcd.io/etcd/proxy/grpcproxy/adapter" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" @@ -45,8 +45,9 @@ type splunksearchapireceiver struct { logsConsumer consumer.Logs config *Config settings component.TelemetrySettings + id component.ID client splunkSearchAPIClient - storageClient adapter.StorageClient + storageClient storage.Client record *eventRecord } @@ -63,7 +64,7 @@ func (ssapir *splunksearchapireceiver) Start(ctx context.Context, host component } // create storage client - storageClient, err := adapter.GetStorageClient(ssapir.config.StorageID) + storageClient, err := adapter.GetStorageClient(ctx, host, ssapir.config.StorageID, ssapir.id) if err != nil { return fmt.Errorf("failed to get storage client: %w", err) } From 61b14525d6a395104b5e8a328412d4fb15a17fb6 Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Fri, 15 Nov 2024 11:40:08 -0500 Subject: [PATCH 03/10] functional checkpoint --- receiver/splunksearchapireceiver/config.go | 4 ++ receiver/splunksearchapireceiver/factory.go | 11 ++-- receiver/splunksearchapireceiver/model.go | 5 ++ receiver/splunksearchapireceiver/receiver.go | 58 +++++++++++--------- 4 files changed, 48 insertions(+), 30 deletions(-) diff --git a/receiver/splunksearchapireceiver/config.go b/receiver/splunksearchapireceiver/config.go index f11631a55..f7c809ab7 100644 --- a/receiver/splunksearchapireceiver/config.go +++ b/receiver/splunksearchapireceiver/config.go @@ -62,6 +62,10 @@ func (cfg *Config) Validate() error { return errors.New("at least one search must be provided") } + if cfg.StorageID == nil { + return errors.New("storage configuration must be provided") + } + for _, search := range cfg.Searches { if search.Query == "" { return errors.New("missing query in search") diff --git a/receiver/splunksearchapireceiver/factory.go b/receiver/splunksearchapireceiver/factory.go index 7d571fd4e..0c04ff377 100644 --- a/receiver/splunksearchapireceiver/factory.go +++ b/receiver/splunksearchapireceiver/factory.go @@ -42,11 +42,12 @@ func createLogsReceiver(_ context.Context, ) (receiver.Logs, error) { ssapirConfig := cfg.(*Config) ssapir := &splunksearchapireceiver{ - logger: params.Logger, - logsConsumer: consumer, - config: ssapirConfig, - id: params.ID, - settings: params.TelemetrySettings, + logger: params.Logger, + logsConsumer: consumer, + config: ssapirConfig, + id: params.ID, + settings: params.TelemetrySettings, + checkpointRecord: &EventRecord{}, } return ssapir, nil } diff --git a/receiver/splunksearchapireceiver/model.go b/receiver/splunksearchapireceiver/model.go index 61b6e7691..0320d8fdb 100644 --- a/receiver/splunksearchapireceiver/model.go +++ b/receiver/splunksearchapireceiver/model.go @@ -58,3 +58,8 @@ type SearchResultsResponse struct { Time string `json:"_time"` } `json:"results"` } + +// EventRecord struct stores the offset of the last event exported successfully +type EventRecord struct { + Offset int `json:"offset"` +} diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go index d6028e986..51d8e9e86 100644 --- a/receiver/splunksearchapireceiver/receiver.go +++ b/receiver/splunksearchapireceiver/receiver.go @@ -40,19 +40,15 @@ var ( ) type splunksearchapireceiver struct { - host component.Host - logger *zap.Logger - logsConsumer consumer.Logs - config *Config - settings component.TelemetrySettings - id component.ID - client splunkSearchAPIClient - storageClient storage.Client - record *eventRecord -} - -type eventRecord struct { - Offset int `json:"offset"` + host component.Host + logger *zap.Logger + logsConsumer consumer.Logs + config *Config + settings component.TelemetrySettings + id component.ID + client splunkSearchAPIClient + storageClient storage.Client + checkpointRecord *EventRecord } func (ssapir *splunksearchapireceiver) Start(ctx context.Context, host component.Host) error { @@ -70,15 +66,26 @@ func (ssapir *splunksearchapireceiver) Start(ctx context.Context, host component } ssapir.storageClient = storageClient - // check if a checkpoint already exists + // if a checkpoint already exists, use the offset from the checkpoint ssapir.loadCheckpoint(ctx) + if ssapir.checkpointRecord.Offset != 0 { + ssapir.logger.Info("found offset checkpoint in storage extension", zap.Int("offset", ssapir.checkpointRecord.Offset)) + offset = ssapir.checkpointRecord.Offset + } go ssapir.runQueries(ctx) return nil } -func (ssapir *splunksearchapireceiver) Shutdown(_ context.Context) error { - return nil +func (ssapir *splunksearchapireceiver) Shutdown(ctx context.Context) error { + ssapir.logger.Info("shutting down logs receiver") + + err := ssapir.checkpoint(ctx) + if err != nil { + ssapir.logger.Error("failed checkpoint", zap.Error(err)) + } + + return ssapir.storageClient.Close(ctx) } func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { @@ -154,24 +161,26 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { // error from down the pipeline, freak out ssapir.logger.Error("error consuming logs", zap.Error(err)) } - // last batch of logs has been successfully exported, update checkpoint - ssapir.record.Offset = offset + // last batch of logs has been successfully exported + exportedEvents += logs.ResourceLogs().Len() + offset += len(results.Results) + + // update checkpoint + ssapir.checkpointRecord.Offset = offset + fmt.Println("offset", offset) err = ssapir.checkpoint(ctx) if err != nil { ssapir.logger.Error("error writing checkpoint", zap.Error(err)) } if limitReached { ssapir.logger.Info("limit reached, stopping search result export") - exportedEvents += logs.ResourceLogs().Len() break } // if the number of results is less than the results per request, we have queried all pages for the search if len(results.Results) < search.EventBatchSize { - exportedEvents += len(results.Results) break } - exportedEvents += logs.ResourceLogs().Len() - offset += len(results.Results) + } ssapir.logger.Info("all search results exported", zap.String("query", search.Query), zap.Int("total results", exportedEvents)) } @@ -234,7 +243,7 @@ func (ssapir *splunksearchapireceiver) getSplunkSearchResults(sid string, offset } func (ssapir *splunksearchapireceiver) checkpoint(ctx context.Context) error { - marshalBytes, err := json.Marshal(ssapir.record) + marshalBytes, err := json.Marshal(ssapir.checkpointRecord) if err != nil { return fmt.Errorf("failed to write checkpoint: %w", err) } @@ -251,8 +260,7 @@ func (ssapir *splunksearchapireceiver) loadCheckpoint(ctx context.Context) { ssapir.logger.Info("no checkpoint found") return } - err = json.Unmarshal(marshalBytes, ssapir.record) - if err != nil { + if err = json.Unmarshal(marshalBytes, ssapir.checkpointRecord); err != nil { ssapir.logger.Error("failed to unmarshal checkpoint", zap.Error(err)) } } From 1a3407af185abe0888d0163011cbd867e800d5eb Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Fri, 15 Nov 2024 14:26:50 -0500 Subject: [PATCH 04/10] debug logs, rm print --- receiver/splunksearchapireceiver/receiver.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go index 51d8e9e86..d7b8017d3 100644 --- a/receiver/splunksearchapireceiver/receiver.go +++ b/receiver/splunksearchapireceiver/receiver.go @@ -78,7 +78,7 @@ func (ssapir *splunksearchapireceiver) Start(ctx context.Context, host component } func (ssapir *splunksearchapireceiver) Shutdown(ctx context.Context) error { - ssapir.logger.Info("shutting down logs receiver") + ssapir.logger.Debug("shutting down logs receiver") err := ssapir.checkpoint(ctx) if err != nil { @@ -167,7 +167,6 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { // update checkpoint ssapir.checkpointRecord.Offset = offset - fmt.Println("offset", offset) err = ssapir.checkpoint(ctx) if err != nil { ssapir.logger.Error("error writing checkpoint", zap.Error(err)) @@ -182,7 +181,7 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { } } - ssapir.logger.Info("all search results exported", zap.String("query", search.Query), zap.Int("total results", exportedEvents)) + ssapir.logger.Debug("all search results exported", zap.String("query", search.Query), zap.Int("total results", exportedEvents)) } return nil } @@ -261,6 +260,6 @@ func (ssapir *splunksearchapireceiver) loadCheckpoint(ctx context.Context) { return } if err = json.Unmarshal(marshalBytes, ssapir.checkpointRecord); err != nil { - ssapir.logger.Error("failed to unmarshal checkpoint", zap.Error(err)) + ssapir.logger.Fatal("failed to unmarshal checkpoint", zap.Error(err)) } } From f753e78561cc6f0c6a2d160bd9e46d897bcbf5b1 Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Fri, 15 Nov 2024 16:18:05 -0500 Subject: [PATCH 05/10] loadCheckpoint return error --- receiver/splunksearchapireceiver/receiver.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go index d7b8017d3..b3efbb564 100644 --- a/receiver/splunksearchapireceiver/receiver.go +++ b/receiver/splunksearchapireceiver/receiver.go @@ -67,7 +67,9 @@ func (ssapir *splunksearchapireceiver) Start(ctx context.Context, host component ssapir.storageClient = storageClient // if a checkpoint already exists, use the offset from the checkpoint - ssapir.loadCheckpoint(ctx) + if err = ssapir.loadCheckpoint(ctx); err != nil { + return fmt.Errorf("failed to load checkpoint: %w", err) + } if ssapir.checkpointRecord.Offset != 0 { ssapir.logger.Info("found offset checkpoint in storage extension", zap.Int("offset", ssapir.checkpointRecord.Offset)) offset = ssapir.checkpointRecord.Offset @@ -249,17 +251,14 @@ func (ssapir *splunksearchapireceiver) checkpoint(ctx context.Context) error { return ssapir.storageClient.Set(ctx, eventStorageKey, marshalBytes) } -func (ssapir *splunksearchapireceiver) loadCheckpoint(ctx context.Context) { +func (ssapir *splunksearchapireceiver) loadCheckpoint(ctx context.Context) error { marshalBytes, err := ssapir.storageClient.Get(ctx, eventStorageKey) if err != nil { - ssapir.logger.Error("failed to read checkpoint", zap.Error(err)) - return + return err } if marshalBytes == nil { ssapir.logger.Info("no checkpoint found") - return - } - if err = json.Unmarshal(marshalBytes, ssapir.checkpointRecord); err != nil { - ssapir.logger.Fatal("failed to unmarshal checkpoint", zap.Error(err)) + return nil } + return json.Unmarshal(marshalBytes, ssapir.checkpointRecord) } From e775560077b856afb2b926298523a82812caea3f Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Mon, 18 Nov 2024 15:27:21 -0500 Subject: [PATCH 06/10] splunk failure test --- receiver/splunksearchapireceiver/config.go | 2 +- receiver/splunksearchapireceiver/go.mod | 9 +- receiver/splunksearchapireceiver/go.sum | 10 -- .../integration_test.go | 123 ++++++++++++++++++ receiver/splunksearchapireceiver/model.go | 3 +- receiver/splunksearchapireceiver/receiver.go | 44 +++++-- 6 files changed, 165 insertions(+), 26 deletions(-) create mode 100644 receiver/splunksearchapireceiver/integration_test.go diff --git a/receiver/splunksearchapireceiver/config.go b/receiver/splunksearchapireceiver/config.go index f7c809ab7..04e2cd039 100644 --- a/receiver/splunksearchapireceiver/config.go +++ b/receiver/splunksearchapireceiver/config.go @@ -63,7 +63,7 @@ func (cfg *Config) Validate() error { } if cfg.StorageID == nil { - return errors.New("storage configuration must be provided") + return errors.New("storage configuration is required for this receiver") } for _, search := range cfg.Searches { diff --git a/receiver/splunksearchapireceiver/go.mod b/receiver/splunksearchapireceiver/go.mod index df4d1ea72..033797a95 100644 --- a/receiver/splunksearchapireceiver/go.mod +++ b/receiver/splunksearchapireceiver/go.mod @@ -5,9 +5,9 @@ go 1.22.5 require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.113.0 github.com/stretchr/testify v1.9.0 - go.etcd.io/etcd v3.3.27+incompatible go.opentelemetry.io/collector/component v0.113.0 go.opentelemetry.io/collector/consumer v0.113.0 + go.opentelemetry.io/collector/extension/experimental/storage v0.113.0 go.opentelemetry.io/collector/pdata v1.19.0 go.opentelemetry.io/collector/receiver v0.113.0 go.uber.org/zap v1.27.0 @@ -16,7 +16,6 @@ require ( require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/coreos/etcd v3.3.27+incompatible // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/elastic/lunes v0.1.0 // indirect github.com/expr-lang/expr v1.16.9 // indirect @@ -26,8 +25,8 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/goccy/go-json v0.10.3 // indirect - github.com/golang/protobuf v1.5.0 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect @@ -55,13 +54,13 @@ require ( go.opentelemetry.io/collector/consumer/consumertest v0.113.0 // indirect go.opentelemetry.io/collector/extension v0.113.0 // indirect go.opentelemetry.io/collector/extension/auth v0.113.0 // indirect - go.opentelemetry.io/collector/extension/experimental/storage v0.113.0 // indirect go.opentelemetry.io/collector/featuregate v1.19.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.113.0 // indirect go.opentelemetry.io/collector/semconv v0.113.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect + go.opentelemetry.io/otel/sdk v1.31.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect gonum.org/v1/gonum v0.15.1 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/receiver/splunksearchapireceiver/go.sum b/receiver/splunksearchapireceiver/go.sum index 482572095..52a035194 100644 --- a/receiver/splunksearchapireceiver/go.sum +++ b/receiver/splunksearchapireceiver/go.sum @@ -2,8 +2,6 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/coreos/etcd v3.3.27+incompatible h1:QIudLb9KeBsE5zyYxd1mjzRSkzLg9Wf9QlRwFgd6oTA= -github.com/coreos/etcd v3.3.27+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -26,12 +24,9 @@ github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -96,8 +91,6 @@ github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXV github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.etcd.io/etcd v3.3.27+incompatible h1:5hMrpf6REqTHV2LW2OclNpRtxI0k9ZplMemJsMSWju0= -go.etcd.io/etcd v3.3.27+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= go.opentelemetry.io/collector/client v1.19.0 h1:TUal8WV1agTrZStgE7BJ8ZC0IHLGtrfgO9ogU9t1mv8= go.opentelemetry.io/collector/client v1.19.0/go.mod h1:jgiXMEM6l8L2QEyf2I/M47Zd8+G7e4z+6H8q5SkHOlQ= go.opentelemetry.io/collector/component v0.113.0 h1:/nx+RvZgxUEXP+YcTj69rEtuSEGkfaCyp/ad5zQGLjU= @@ -203,13 +196,10 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0= gonum.org/v1/gonum v0.15.1/go.mod h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o= -google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 h1:wKguEg1hsxI2/L3hUYrpo1RVi48K+uTyzKqprwLXsb8= -google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142/go.mod h1:d6be+8HhtEtucleCbxpPW9PA9XwISACu8nvpPqF0BVo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd h1:6TEm2ZxXoQmFWFlt1vNxvVOa1Q0dXFQD1m/rYjXmS0E= google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/receiver/splunksearchapireceiver/integration_test.go b/receiver/splunksearchapireceiver/integration_test.go new file mode 100644 index 000000000..b765f34d2 --- /dev/null +++ b/receiver/splunksearchapireceiver/integration_test.go @@ -0,0 +1,123 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package splunksearchapireceiver + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/extension/experimental/storage" + "go.uber.org/zap" +) + +// Test the case where some data is exported, but a subsequent call for paginated data fails +func TestSplunkResultsPaginationFailure(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.Searches = []Search{ + { + Query: "search index=otel", + EarliestTime: "2024-11-14T00:00:00.000Z", + LatestTime: "2024-11-14T23:59:59.000Z", + EventBatchSize: 5, + }, + } + var callCount int = 0 + server := newMockSplunkServer(&callCount) + defer server.Close() + settings := componenttest.NewNopTelemetrySettings() + ssapir := newSSAPIReceiver(zap.NewNop(), cfg, settings, component.NewID(typeStr)) + ssapir.client, _ = newSplunkSearchAPIClient(context.Background(), settings, *cfg, componenttest.NewNopHost()) + ssapir.client.(*defaultSplunkSearchAPIClient).client = server.Client() + ssapir.client.(*defaultSplunkSearchAPIClient).endpoint = server.URL + ssapir.logsConsumer = &consumertest.LogsSink{} + + ssapir.storageClient = storage.NewNopClient() + + ssapir.initCheckpoint(context.Background()) + ssapir.runQueries(context.Background()) + require.Equal(t, 5, ssapir.checkpointRecord.Offset) + require.Equal(t, 1, callCount) +} + +func newMockSplunkServer(callCount *int) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + if req.URL.String() == "/services/search/jobs" { + rw.Header().Set("Content-Type", "application/xml") + rw.WriteHeader(201) + rw.Write([]byte(` + + 123456 + + `)) + } + if req.URL.String() == "/services/search/v2/jobs/123456" { + rw.Header().Set("Content-Type", "application/xml") + rw.WriteHeader(200) + rw.Write([]byte(` + + + DISPATCH + + DONE + + + `)) + } + if req.URL.String() == "/services/search/v2/jobs/123456/results?output_mode=json&offset=0&count=5" && req.URL.Query().Get("offset") == "0" { + rw.Header().Set("Content-Type", "application/json") + rw.WriteHeader(200) + rw.Write(splunkEventsResultsP1) + *callCount++ + } + if req.URL.String() == "/services/search/v2/jobs/123456/results?output_mode=json&offset=5&count=5" && req.URL.Query().Get("offset") == "5" { + rw.Header().Set("Content-Type", "application/json") + rw.WriteHeader(400) + rw.Write([]byte("error, bad request")) + } + })) +} + +var splunkEventsResultsP1 = []byte(`{ + "init_offset": 0, + "results": [ + { + "_raw": "Hello, world!", + "_time": "2024-11-14T13:02:31.000-05:00" + }, + { + "_raw": "Goodbye, world!", + "_time": "2024-11-14T13:02:30.000-05:00" + }, + { + "_raw": "lorem ipsum", + "_time": "2024-11-14T13:02:29.000-05:00" + }, + { + "_raw": "dolor sit amet", + "_time": "2024-11-14T13:02:28.000-05:00" + }, + { + "_raw": "consectetur adipiscing elit", + "_time": "2024-11-14T13:02:27.000-05:00" + } + ] +}`) diff --git a/receiver/splunksearchapireceiver/model.go b/receiver/splunksearchapireceiver/model.go index 0320d8fdb..f5aa91697 100644 --- a/receiver/splunksearchapireceiver/model.go +++ b/receiver/splunksearchapireceiver/model.go @@ -61,5 +61,6 @@ type SearchResultsResponse struct { // EventRecord struct stores the offset of the last event exported successfully type EventRecord struct { - Offset int `json:"offset"` + Offset int `json:"offset"` + Search string `json:"search"` } diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go index b3efbb564..7be73823f 100644 --- a/receiver/splunksearchapireceiver/receiver.go +++ b/receiver/splunksearchapireceiver/receiver.go @@ -51,6 +51,21 @@ type splunksearchapireceiver struct { checkpointRecord *EventRecord } +func newSSAPIReceiver( + logger *zap.Logger, + config *Config, + settings component.TelemetrySettings, + id component.ID, +) *splunksearchapireceiver { + return &splunksearchapireceiver{ + logger: logger, + config: config, + settings: settings, + id: id, + checkpointRecord: &EventRecord{}, + } +} + func (ssapir *splunksearchapireceiver) Start(ctx context.Context, host component.Host) error { ssapir.host = host var err error @@ -66,15 +81,7 @@ func (ssapir *splunksearchapireceiver) Start(ctx context.Context, host component } ssapir.storageClient = storageClient - // if a checkpoint already exists, use the offset from the checkpoint - if err = ssapir.loadCheckpoint(ctx); err != nil { - return fmt.Errorf("failed to load checkpoint: %w", err) - } - if ssapir.checkpointRecord.Offset != 0 { - ssapir.logger.Info("found offset checkpoint in storage extension", zap.Int("offset", ssapir.checkpointRecord.Offset)) - offset = ssapir.checkpointRecord.Offset - } - + ssapir.initCheckpoint(ctx) go ssapir.runQueries(ctx) return nil } @@ -243,6 +250,25 @@ func (ssapir *splunksearchapireceiver) getSplunkSearchResults(sid string, offset return resp, nil } +func (ssapir *splunksearchapireceiver) initCheckpoint(ctx context.Context) error { + // if a checkpoint already exists, use the offset from the checkpoint + if err := ssapir.loadCheckpoint(ctx); err != nil { + return fmt.Errorf("failed to load checkpoint: %w", err) + } + if ssapir.checkpointRecord.Offset != 0 { + // check if the search query in the checkpoint record matches any of the search queries in the config + for idx, search := range ssapir.config.Searches { + if search.Query == ssapir.checkpointRecord.Search { + ssapir.logger.Info("found offset checkpoint in storage extension", zap.Int("offset", ssapir.checkpointRecord.Offset), zap.String("search", ssapir.checkpointRecord.Search)) + // skip searches that have already been processed, use the offset from the checkpoint + ssapir.config.Searches = ssapir.config.Searches[idx:] + offset = ssapir.checkpointRecord.Offset + } + } + } + return nil +} + func (ssapir *splunksearchapireceiver) checkpoint(ctx context.Context) error { marshalBytes, err := json.Marshal(ssapir.checkpointRecord) if err != nil { From 28927d91033e3dbdc4e35aa9a7ac40524068e21d Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Mon, 18 Nov 2024 16:20:40 -0500 Subject: [PATCH 07/10] storage config test --- .../splunksearchapireceiver/config_test.go | 36 ++++++++++++++++++- receiver/splunksearchapireceiver/go.mod | 7 ++-- receiver/splunksearchapireceiver/go.sum | 10 +++--- 3 files changed, 45 insertions(+), 8 deletions(-) diff --git a/receiver/splunksearchapireceiver/config_test.go b/receiver/splunksearchapireceiver/config_test.go index 905b52e4d..90440234c 100644 --- a/receiver/splunksearchapireceiver/config_test.go +++ b/receiver/splunksearchapireceiver/config_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" ) func TestValidate(t *testing.T) { @@ -26,6 +27,7 @@ func TestValidate(t *testing.T) { endpoint string username string password string + storage string searches []Search errExpected bool errText string @@ -34,6 +36,7 @@ func TestValidate(t *testing.T) { desc: "Missing endpoint", username: "user", password: "password", + storage: "file_storage", searches: []Search{ { Query: "search index=_internal", @@ -48,6 +51,7 @@ func TestValidate(t *testing.T) { desc: "Missing username", endpoint: "http://localhost:8089", password: "password", + storage: "file_storage", searches: []Search{ { Query: "search index=_internal", @@ -62,6 +66,7 @@ func TestValidate(t *testing.T) { desc: "Missing password", endpoint: "http://localhost:8089", username: "user", + storage: "file_storage", searches: []Search{ { Query: "search index=_internal", @@ -72,11 +77,27 @@ func TestValidate(t *testing.T) { errExpected: true, errText: "missing Splunk password", }, + { + desc: "Missing storage", + endpoint: "http://localhost:8089", + username: "user", + password: "password", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "storage configuration is required for this receiver", + }, { desc: "Missing searches", endpoint: "http://localhost:8089", username: "user", password: "password", + storage: "file_storage", errExpected: true, errText: "at least one search must be provided", }, @@ -85,6 +106,7 @@ func TestValidate(t *testing.T) { endpoint: "http://localhost:8089", username: "user", password: "password", + storage: "file_storage", searches: []Search{ { EarliestTime: "2024-10-30T04:00:00.000Z", @@ -99,6 +121,7 @@ func TestValidate(t *testing.T) { endpoint: "http://localhost:8089", username: "user", password: "password", + storage: "file_storage", searches: []Search{ { Query: "search index=_internal", @@ -113,6 +136,7 @@ func TestValidate(t *testing.T) { endpoint: "http://localhost:8089", username: "user", password: "password", + storage: "file_storage", searches: []Search{ { Query: "search index=_internal", @@ -128,6 +152,7 @@ func TestValidate(t *testing.T) { endpoint: "http://localhost:8089", username: "user", password: "password", + storage: "file_storage", searches: []Search{ { Query: "search index=_internal", @@ -142,6 +167,7 @@ func TestValidate(t *testing.T) { endpoint: "http://localhost:8089", username: "user", password: "password", + storage: "file_storage", searches: []Search{ { Query: "search index=_internal", @@ -157,6 +183,7 @@ func TestValidate(t *testing.T) { endpoint: "http://localhost:8089", username: "user", password: "password", + storage: "file_storage", searches: []Search{ { Query: "search index=_internal | stats count by sourcetype", @@ -172,6 +199,7 @@ func TestValidate(t *testing.T) { endpoint: "http://localhost:8089", username: "user", password: "password", + storage: "file_storage", searches: []Search{ { Query: "search index=_internal", @@ -186,6 +214,7 @@ func TestValidate(t *testing.T) { endpoint: "http://localhost:8089", username: "user", password: "password", + storage: "file_storage", searches: []Search{ { Query: "search index=_internal", @@ -205,6 +234,7 @@ func TestValidate(t *testing.T) { endpoint: "http://localhost:8089", username: "user", password: "password", + storage: "file_storage", searches: []Search{ { Query: "search index=_internal", @@ -216,10 +246,11 @@ func TestValidate(t *testing.T) { errExpected: false, }, { - desc: "Query with ealiest and latest time", + desc: "Query with earliest and latest time", endpoint: "http://localhost:8089", username: "user", password: "password", + storage: "file_storage", searches: []Search{ { Query: "search index=_internal earliest=2024-10-30T04:00:00.000Z latest=2024-10-30T14:00:00.000Z", @@ -238,6 +269,9 @@ func TestValidate(t *testing.T) { cfg.Username = tc.username cfg.Password = tc.password cfg.Searches = tc.searches + if tc.storage != "" { + cfg.StorageID = &component.ID{} + } err := cfg.Validate() if tc.errExpected { require.EqualError(t, err, tc.errText) diff --git a/receiver/splunksearchapireceiver/go.mod b/receiver/splunksearchapireceiver/go.mod index 033797a95..7d5c2c71f 100644 --- a/receiver/splunksearchapireceiver/go.mod +++ b/receiver/splunksearchapireceiver/go.mod @@ -6,8 +6,11 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.113.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.113.0 + go.opentelemetry.io/collector/confmap v1.20.0 go.opentelemetry.io/collector/consumer v0.113.0 + go.opentelemetry.io/collector/consumer/consumertest v0.113.0 go.opentelemetry.io/collector/extension/experimental/storage v0.113.0 + go.opentelemetry.io/collector/filter v0.114.0 go.opentelemetry.io/collector/pdata v1.19.0 go.opentelemetry.io/collector/receiver v0.113.0 go.uber.org/zap v1.27.0 @@ -31,7 +34,7 @@ require ( github.com/klauspost/compress v1.17.11 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect - github.com/knadh/koanf/v2 v2.1.1 // indirect + github.com/knadh/koanf/v2 v2.1.2 // indirect github.com/leodido/go-syslog/v4 v4.2.0 // indirect github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b // indirect github.com/magefile/mage v1.15.0 // indirect @@ -48,10 +51,8 @@ require ( go.opentelemetry.io/collector/config/configopaque v1.19.0 // indirect go.opentelemetry.io/collector/config/configtls v1.19.0 // indirect go.opentelemetry.io/collector/config/internal v0.113.0 // indirect - go.opentelemetry.io/collector/confmap v1.19.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.113.0 // indirect go.opentelemetry.io/collector/consumer/consumerprofiles v0.113.0 // indirect - go.opentelemetry.io/collector/consumer/consumertest v0.113.0 // indirect go.opentelemetry.io/collector/extension v0.113.0 // indirect go.opentelemetry.io/collector/extension/auth v0.113.0 // indirect go.opentelemetry.io/collector/featuregate v1.19.0 // indirect diff --git a/receiver/splunksearchapireceiver/go.sum b/receiver/splunksearchapireceiver/go.sum index 52a035194..c53898099 100644 --- a/receiver/splunksearchapireceiver/go.sum +++ b/receiver/splunksearchapireceiver/go.sum @@ -44,8 +44,8 @@ github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NI github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU= github.com/knadh/koanf/providers/confmap v0.1.0/go.mod h1:2uLhxQzJnyHKfxG927awZC7+fyHFdQkd697K4MdLnIU= -github.com/knadh/koanf/v2 v2.1.1 h1:/R8eXqasSTsmDCsAyYj+81Wteg8AqrV9CP6gvsTsOmM= -github.com/knadh/koanf/v2 v2.1.1/go.mod h1:4mnTRbZCK+ALuBXHZMjDfG9y714L7TykVnZkXbMU3Es= +github.com/knadh/koanf/v2 v2.1.2 h1:I2rtLRqXRy1p01m/utEtpZSSA6dcJbgGVuE27kW2PzQ= +github.com/knadh/koanf/v2 v2.1.2/go.mod h1:Gphfaen0q1Fc1HTgJgSTC4oRX9R2R5ErYMZJy8fLJBo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -109,8 +109,8 @@ go.opentelemetry.io/collector/config/configtls v1.19.0 h1:GQ/cF1hgNqHVBq2oSSrOFX go.opentelemetry.io/collector/config/configtls v1.19.0/go.mod h1:1hyqnYB3JqEUlk1ME/s9HYz4oCRcxQCRxsJitFFT/cA= go.opentelemetry.io/collector/config/internal v0.113.0 h1:9RAzH8v7ItFT1npHpvP0SvUzBHcZDliCGRo9Spp6v7c= go.opentelemetry.io/collector/config/internal v0.113.0/go.mod h1:yC7E4h1Uj0SubxcFImh6OvBHFTjMh99+A5PuyIgDWqc= -go.opentelemetry.io/collector/confmap v1.19.0 h1:TQ0lZpAKqgsE0EKk+u4JA+uBbPYeFRmWP3GH43w40CY= -go.opentelemetry.io/collector/confmap v1.19.0/go.mod h1:GgNu1ElPGmLn9govqIfjaopvdspw4PJ9KeDtWC4E2Q4= +go.opentelemetry.io/collector/confmap v1.20.0 h1:ARfOwmkKxFOud1njl03yAHQ30+uenlzqCO6LBYamDTE= +go.opentelemetry.io/collector/confmap v1.20.0/go.mod h1:DMpd9Ay/ffls3JoQBQ73vWeRsz1rNuLbwjo6WtjSQus= go.opentelemetry.io/collector/consumer v0.113.0 h1:KJSiK5vSIY9dgPxwKfQ3gOgKtQsqc+7IB7mGhUAL5c8= go.opentelemetry.io/collector/consumer v0.113.0/go.mod h1:zHMlXYFaJlZoLCBR6UwWoyXZ/adcO1u2ydqUal3VmYU= go.opentelemetry.io/collector/consumer/consumererror v0.113.0 h1:Hd2N7n9RKbnKRaVrdw6fPBoQko5zZIgCxwVxkL6SAIE= @@ -127,6 +127,8 @@ go.opentelemetry.io/collector/extension/experimental/storage v0.113.0 h1:Qq4IaB6 go.opentelemetry.io/collector/extension/experimental/storage v0.113.0/go.mod h1:BRmo+A7f06u/rhyLauU/Vogk+QRN0y1j2VVVgMGWrfQ= go.opentelemetry.io/collector/featuregate v1.19.0 h1:ASea2sU+tdpKI3RxIJC/pufDAfwAmrvcQ4EmTHVu0B0= go.opentelemetry.io/collector/featuregate v1.19.0/go.mod h1:47xrISO71vJ83LSMm8+yIDsUbKktUp48Ovt7RR6VbRs= +go.opentelemetry.io/collector/filter v0.114.0 h1:5I97yblUxc6rXCYRn542aSrsNQLo/dE+87XROW2b5oU= +go.opentelemetry.io/collector/filter v0.114.0/go.mod h1:Nxwc+RD9AH4y/qYtkTP+Ac19CxgW5GAB+sJU4ACLr6g= go.opentelemetry.io/collector/pdata v1.19.0 h1:jmnU5R8TOCbwRr4B8sjdRxM7L5WnEKlQWX1dtLYxIbE= go.opentelemetry.io/collector/pdata v1.19.0/go.mod h1:Ox1YVLe87cZDB/TL30i4SUz1cA5s6AM6SpFMfY61ICs= go.opentelemetry.io/collector/pdata/pprofile v0.113.0 h1:VRf4p0VhfuaR+Epy/nMIlu/9t39WU9CUgHVUvpuGxfU= From d4df0dbeaefc6ad0f37632722cab29f8accbbfe2 Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Mon, 18 Nov 2024 16:30:40 -0500 Subject: [PATCH 08/10] lint, tidy --- go.mod | 8 +++----- go.sum | 16 ++++++---------- .../splunksearchapireceiver/integration_test.go | 2 +- receiver/splunksearchapireceiver/receiver.go | 5 ++++- 4 files changed, 14 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index c5c0657c8..51ec0f89b 100644 --- a/go.mod +++ b/go.mod @@ -166,7 +166,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.113.0 - go.opentelemetry.io/collector/confmap v1.19.0 + go.opentelemetry.io/collector/confmap v1.20.0 go.opentelemetry.io/collector/confmap/provider/envprovider v1.19.0 go.opentelemetry.io/collector/confmap/provider/fileprovider v1.19.0 go.opentelemetry.io/collector/confmap/provider/httpsprovider v1.19.0 @@ -308,7 +308,6 @@ require ( github.com/briandowns/spinner v1.23.0 // indirect github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect github.com/containerd/cgroups/v3 v3.0.3 // indirect - github.com/coreos/etcd v3.3.27+incompatible // indirect github.com/coreos/go-oidc/v3 v3.11.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/distribution/reference v0.6.0 // indirect @@ -348,7 +347,7 @@ require ( github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect github.com/kelseyhightower/envconfig v1.4.0 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect - github.com/knadh/koanf/v2 v2.1.1 // indirect + github.com/knadh/koanf/v2 v2.1.2 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/leodido/go-syslog/v4 v4.2.0 // indirect github.com/lestrrat-go/strftime v1.1.0 // indirect @@ -399,7 +398,6 @@ require ( go.elastic.co/apm/module/apmzap/v2 v2.6.0 // indirect go.elastic.co/apm/v2 v2.6.0 // indirect go.elastic.co/fastjson v1.3.0 // indirect - go.etcd.io/etcd v3.3.27+incompatible // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector v0.113.0 // indirect go.opentelemetry.io/collector/client v1.19.0 // indirect @@ -426,7 +424,7 @@ require ( go.opentelemetry.io/collector/extension/auth v0.113.0 // indirect go.opentelemetry.io/collector/extension/experimental/storage v0.113.0 // indirect go.opentelemetry.io/collector/extension/extensioncapabilities v0.113.0 // indirect - go.opentelemetry.io/collector/filter v0.113.0 // indirect + go.opentelemetry.io/collector/filter v0.114.0 // indirect go.opentelemetry.io/collector/internal/fanoutconsumer v0.113.0 // indirect go.opentelemetry.io/collector/internal/memorylimiter v0.113.0 // indirect go.opentelemetry.io/collector/internal/sharedcomponent v0.113.0 // indirect diff --git a/go.sum b/go.sum index 4a5a80d69..a2443ce56 100644 --- a/go.sum +++ b/go.sum @@ -1127,8 +1127,6 @@ github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkE github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= -github.com/coreos/etcd v3.3.27+incompatible h1:QIudLb9KeBsE5zyYxd1mjzRSkzLg9Wf9QlRwFgd6oTA= -github.com/coreos/etcd v3.3.27+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-oidc/v3 v3.11.0 h1:Ia3MxdwpSw702YW0xgfmP1GVCMA9aEFWu12XUZ3/OtI= github.com/coreos/go-oidc/v3 v3.11.0/go.mod h1:gE3LgjOgFoHi9a4ce4/tJczr0Ai2/BoDhf0r5lltWI0= @@ -1731,8 +1729,8 @@ github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/q github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/knadh/koanf v1.5.0 h1:q2TSd/3Pyc/5yP9ldIrSdIz26MCcyNQzW0pEAugLPNs= github.com/knadh/koanf v1.5.0/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs= -github.com/knadh/koanf/v2 v2.1.1 h1:/R8eXqasSTsmDCsAyYj+81Wteg8AqrV9CP6gvsTsOmM= -github.com/knadh/koanf/v2 v2.1.1/go.mod h1:4mnTRbZCK+ALuBXHZMjDfG9y714L7TykVnZkXbMU3Es= +github.com/knadh/koanf/v2 v2.1.2 h1:I2rtLRqXRy1p01m/utEtpZSSA6dcJbgGVuE27kW2PzQ= +github.com/knadh/koanf/v2 v2.1.2/go.mod h1:Gphfaen0q1Fc1HTgJgSTC4oRX9R2R5ErYMZJy8fLJBo= github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00= github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -2614,8 +2612,6 @@ go.elastic.co/fastjson v1.3.0/go.mod h1:K9vDh7O0ODsVKV2B5e2XYLY277QZaCbB3tS1SnAR go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0= go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I= -go.etcd.io/etcd v3.3.27+incompatible h1:5hMrpf6REqTHV2LW2OclNpRtxI0k9ZplMemJsMSWju0= -go.etcd.io/etcd v3.3.27+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= @@ -2661,8 +2657,8 @@ go.opentelemetry.io/collector/config/configtls v1.19.0 h1:GQ/cF1hgNqHVBq2oSSrOFX go.opentelemetry.io/collector/config/configtls v1.19.0/go.mod h1:1hyqnYB3JqEUlk1ME/s9HYz4oCRcxQCRxsJitFFT/cA= go.opentelemetry.io/collector/config/internal v0.113.0 h1:9RAzH8v7ItFT1npHpvP0SvUzBHcZDliCGRo9Spp6v7c= go.opentelemetry.io/collector/config/internal v0.113.0/go.mod h1:yC7E4h1Uj0SubxcFImh6OvBHFTjMh99+A5PuyIgDWqc= -go.opentelemetry.io/collector/confmap v1.19.0 h1:TQ0lZpAKqgsE0EKk+u4JA+uBbPYeFRmWP3GH43w40CY= -go.opentelemetry.io/collector/confmap v1.19.0/go.mod h1:GgNu1ElPGmLn9govqIfjaopvdspw4PJ9KeDtWC4E2Q4= +go.opentelemetry.io/collector/confmap v1.20.0 h1:ARfOwmkKxFOud1njl03yAHQ30+uenlzqCO6LBYamDTE= +go.opentelemetry.io/collector/confmap v1.20.0/go.mod h1:DMpd9Ay/ffls3JoQBQ73vWeRsz1rNuLbwjo6WtjSQus= go.opentelemetry.io/collector/confmap/provider/envprovider v1.19.0 h1:f8O/I5pVRN86Gx5mHekNx92S6fGdOS4VcooRJKWe6Bs= go.opentelemetry.io/collector/confmap/provider/envprovider v1.19.0/go.mod h1:AiaW5YW1LD0/WlZuc8eZuZPBH6PA9QqsiAYRX1iC6T0= go.opentelemetry.io/collector/confmap/provider/fileprovider v1.19.0 h1:TYwyk4ea3U+5MYcEjrzZAaonBcLlabQu8CZeB7ekAYY= @@ -2719,8 +2715,8 @@ go.opentelemetry.io/collector/extension/zpagesextension v0.113.0 h1:b/Clxso9uVwL go.opentelemetry.io/collector/extension/zpagesextension v0.113.0/go.mod h1:5csGYy9Ydfy6Hpw3Tod864P6HUEZpA6UiuPJPG3TjSU= go.opentelemetry.io/collector/featuregate v1.19.0 h1:ASea2sU+tdpKI3RxIJC/pufDAfwAmrvcQ4EmTHVu0B0= go.opentelemetry.io/collector/featuregate v1.19.0/go.mod h1:47xrISO71vJ83LSMm8+yIDsUbKktUp48Ovt7RR6VbRs= -go.opentelemetry.io/collector/filter v0.113.0 h1:5ODwM8QEOzZq08H8DJilBa4PHieXpBreJVKZ0D2YshA= -go.opentelemetry.io/collector/filter v0.113.0/go.mod h1:Mh3N6cpVijdamUJj1tAgSU1RG/Ek4FuY2ODKYxKZDtk= +go.opentelemetry.io/collector/filter v0.114.0 h1:5I97yblUxc6rXCYRn542aSrsNQLo/dE+87XROW2b5oU= +go.opentelemetry.io/collector/filter v0.114.0/go.mod h1:Nxwc+RD9AH4y/qYtkTP+Ac19CxgW5GAB+sJU4ACLr6g= go.opentelemetry.io/collector/internal/fanoutconsumer v0.113.0 h1:Beu2zAN6/EDXQ6hMFU6FT1BsnU5FXmWNOlfTAhrgbGc= go.opentelemetry.io/collector/internal/fanoutconsumer v0.113.0/go.mod h1:WUXbc4L6KJ3SpmsxBgId0OYzRDuS7n274kNpqrgnSmY= go.opentelemetry.io/collector/internal/memorylimiter v0.113.0 h1:qe3xZYB4BgSuPDgFMQbcJ5gDy8t+S1vt6pL+OKrdx9E= diff --git a/receiver/splunksearchapireceiver/integration_test.go b/receiver/splunksearchapireceiver/integration_test.go index b765f34d2..3cf444257 100644 --- a/receiver/splunksearchapireceiver/integration_test.go +++ b/receiver/splunksearchapireceiver/integration_test.go @@ -40,7 +40,7 @@ func TestSplunkResultsPaginationFailure(t *testing.T) { EventBatchSize: 5, }, } - var callCount int = 0 + var callCount int server := newMockSplunkServer(&callCount) defer server.Close() settings := componenttest.NewNopTelemetrySettings() diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go index 85d9d9a76..6c581f00d 100644 --- a/receiver/splunksearchapireceiver/receiver.go +++ b/receiver/splunksearchapireceiver/receiver.go @@ -81,7 +81,10 @@ func (ssapir *splunksearchapireceiver) Start(ctx context.Context, host component } ssapir.storageClient = storageClient - ssapir.initCheckpoint(ctx) + err = ssapir.initCheckpoint(ctx) + if err != nil { + return fmt.Errorf("failed to initialize checkpoint: %w", err) + } go ssapir.runQueries(ctx) return nil } From 2153d9e03670ba0fd8de6f69e60524a90ff352da Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Tue, 19 Nov 2024 10:52:23 -0500 Subject: [PATCH 09/10] return error on export fail --- receiver/splunksearchapireceiver/receiver.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go index 6c581f00d..d03d3c70a 100644 --- a/receiver/splunksearchapireceiver/receiver.go +++ b/receiver/splunksearchapireceiver/receiver.go @@ -170,7 +170,8 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { err = ssapir.logsConsumer.ConsumeLogs(ctx, logs) if err != nil { // error from down the pipeline, freak out - ssapir.logger.Error("error consuming logs", zap.Error(err)) + return fmt.Errorf("error consuming logs: %w", err) + } // last batch of logs has been successfully exported exportedEvents += logs.ResourceLogs().Len() From 34907e1128f8f6534912e3d86184bff37b78508d Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Tue, 19 Nov 2024 11:23:07 -0500 Subject: [PATCH 10/10] tidy --- receiver/splunksearchapireceiver/go.mod | 3 +-- receiver/splunksearchapireceiver/go.sum | 2 -- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/receiver/splunksearchapireceiver/go.mod b/receiver/splunksearchapireceiver/go.mod index 7d5c2c71f..22d7df545 100644 --- a/receiver/splunksearchapireceiver/go.mod +++ b/receiver/splunksearchapireceiver/go.mod @@ -6,11 +6,9 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.113.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.113.0 - go.opentelemetry.io/collector/confmap v1.20.0 go.opentelemetry.io/collector/consumer v0.113.0 go.opentelemetry.io/collector/consumer/consumertest v0.113.0 go.opentelemetry.io/collector/extension/experimental/storage v0.113.0 - go.opentelemetry.io/collector/filter v0.114.0 go.opentelemetry.io/collector/pdata v1.19.0 go.opentelemetry.io/collector/receiver v0.113.0 go.uber.org/zap v1.27.0 @@ -51,6 +49,7 @@ require ( go.opentelemetry.io/collector/config/configopaque v1.19.0 // indirect go.opentelemetry.io/collector/config/configtls v1.19.0 // indirect go.opentelemetry.io/collector/config/internal v0.113.0 // indirect + go.opentelemetry.io/collector/confmap v1.20.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.113.0 // indirect go.opentelemetry.io/collector/consumer/consumerprofiles v0.113.0 // indirect go.opentelemetry.io/collector/extension v0.113.0 // indirect diff --git a/receiver/splunksearchapireceiver/go.sum b/receiver/splunksearchapireceiver/go.sum index c53898099..6f7285030 100644 --- a/receiver/splunksearchapireceiver/go.sum +++ b/receiver/splunksearchapireceiver/go.sum @@ -127,8 +127,6 @@ go.opentelemetry.io/collector/extension/experimental/storage v0.113.0 h1:Qq4IaB6 go.opentelemetry.io/collector/extension/experimental/storage v0.113.0/go.mod h1:BRmo+A7f06u/rhyLauU/Vogk+QRN0y1j2VVVgMGWrfQ= go.opentelemetry.io/collector/featuregate v1.19.0 h1:ASea2sU+tdpKI3RxIJC/pufDAfwAmrvcQ4EmTHVu0B0= go.opentelemetry.io/collector/featuregate v1.19.0/go.mod h1:47xrISO71vJ83LSMm8+yIDsUbKktUp48Ovt7RR6VbRs= -go.opentelemetry.io/collector/filter v0.114.0 h1:5I97yblUxc6rXCYRn542aSrsNQLo/dE+87XROW2b5oU= -go.opentelemetry.io/collector/filter v0.114.0/go.mod h1:Nxwc+RD9AH4y/qYtkTP+Ac19CxgW5GAB+sJU4ACLr6g= go.opentelemetry.io/collector/pdata v1.19.0 h1:jmnU5R8TOCbwRr4B8sjdRxM7L5WnEKlQWX1dtLYxIbE= go.opentelemetry.io/collector/pdata v1.19.0/go.mod h1:Ox1YVLe87cZDB/TL30i4SUz1cA5s6AM6SpFMfY61ICs= go.opentelemetry.io/collector/pdata/pprofile v0.113.0 h1:VRf4p0VhfuaR+Epy/nMIlu/9t39WU9CUgHVUvpuGxfU=