From ae59aea8c0ebac64eac796b8b8b08e00f92ab855 Mon Sep 17 00:00:00 2001
From: Caleb Hurshman
Date: Tue, 12 Nov 2024 09:51:13 -0500
Subject: [PATCH 1/5] WIP

---
 receiver/splunksearchapireceiver/api.go      |  11 +-
 receiver/splunksearchapireceiver/config.go   |   1 +
 receiver/splunksearchapireceiver/factory.go  |   3 +-
 receiver/splunksearchapireceiver/model.go    |   3 +-
 receiver/splunksearchapireceiver/receiver.go | 133 ++++++++++---------
 5 files changed, 86 insertions(+), 65 deletions(-)

diff --git a/receiver/splunksearchapireceiver/api.go b/receiver/splunksearchapireceiver/api.go
index 64a0285ae..c9958547e 100644
--- a/receiver/splunksearchapireceiver/api.go
+++ b/receiver/splunksearchapireceiver/api.go
@@ -22,6 +22,8 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+
+	"go.uber.org/zap"
 )
 
 func (ssapir *splunksearchapireceiver) createSearchJob(config *Config, search string) (CreateJobResponse, error) {
@@ -91,14 +93,16 @@ func (ssapir *splunksearchapireceiver) getJobStatus(config *Config, sid string)
 	return jobStatusResponse, nil
 }
 
-func (ssapir *splunksearchapireceiver) getSearchResults(config *Config, sid string) (SearchResults, error) {
-	endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s/results?output_mode=json", config.Endpoint, sid)
+func (ssapir *splunksearchapireceiver) getSearchResults(config *Config, sid string, offset int) (SearchResults, error) {
+	endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s/results?output_mode=json&offset=%d&count=%d", config.Endpoint, sid, offset, ssapir.eventBatchSize)
 	req, err := http.NewRequest("GET", endpoint, nil)
 	if err != nil {
 		return SearchResults{}, err
 	}
 	req.SetBasicAuth(config.Username, config.Password)
 
+	ssapir.logger.Info("Getting search results", zap.Int("offset", offset), zap.Int("count", ssapir.eventBatchSize))
+
 	resp, err := ssapir.client.Do(req)
 	if err != nil {
 		return SearchResults{}, err
 	}
@@ -114,11 +118,12 @@ func (ssapir *splunksearchapireceiver) getSearchResults(config *Config, sid stri
 	if err != nil {
 		return SearchResults{}, fmt.Errorf("failed to read search job results response: %v", err)
 	}
-	// fmt.Println("Body: ", string(body))
+
 	err = json.Unmarshal(body, &searchResults)
 	if err != nil {
 		return SearchResults{}, fmt.Errorf("failed to unmarshal search job results: %v", err)
 	}
+	fmt.Println("Init offset: ", searchResults.InitOffset)
 	return searchResults, nil
 }
diff --git a/receiver/splunksearchapireceiver/config.go b/receiver/splunksearchapireceiver/config.go
index 629ecc7dc..f5a19b264 100644
--- a/receiver/splunksearchapireceiver/config.go
+++ b/receiver/splunksearchapireceiver/config.go
@@ -28,6 +28,7 @@ type Config struct {
 	Username string   `mapstructure:"splunk_username"`
 	Password string   `mapstructure:"splunk_password"`
 	Searches []Search `mapstructure:"searches"`
+	EventBatchSize int `mapstructure:"event_batch_size"`
 }
 
 // Search struct to represent a Splunk search
diff --git a/receiver/splunksearchapireceiver/factory.go b/receiver/splunksearchapireceiver/factory.go
index 64ce850fc..0018d9033 100644
--- a/receiver/splunksearchapireceiver/factory.go
+++ b/receiver/splunksearchapireceiver/factory.go
@@ -29,7 +29,8 @@ var (
 
 func createDefaultConfig() component.Config {
 	return &Config{
-		ClientConfig: confighttp.NewDefaultClientConfig(),
+		ClientConfig:   confighttp.NewDefaultClientConfig(),
+		EventBatchSize: 100,
 	}
 }
diff --git a/receiver/splunksearchapireceiver/model.go b/receiver/splunksearchapireceiver/model.go
index 94984f670..a53e0a882 100644
--- a/receiver/splunksearchapireceiver/model.go
+++ b/receiver/splunksearchapireceiver/model.go
@@ -49,7 +49,8 @@ type List struct {
 
 // SearchResults struct to represent the JSON response from Splunk search results endpoint
 type SearchResults struct {
-	Results []struct {
+	InitOffset int `json:"init_offset"`
+	Results    []struct {
 		Raw  string `json:"_raw"`
 		Time string `json:"_time"`
 	} `json:"results"`
diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go
index 5080a522e..013f0d594 100644
--- a/receiver/splunksearchapireceiver/receiver.go
+++ b/receiver/splunksearchapireceiver/receiver.go
@@ -27,12 +27,13 @@ import (
 )
 
 type splunksearchapireceiver struct {
-	host         component.Host
-	logger       *zap.Logger
-	logsConsumer consumer.Logs
-	config       *Config
-	settings     component.TelemetrySettings
-	client       *http.Client
+	host           component.Host
+	logger         *zap.Logger
+	logsConsumer   consumer.Logs
+	config         *Config
+	settings       component.TelemetrySettings
+	client         *http.Client
+	eventBatchSize int
 }
 
 func (ssapir *splunksearchapireceiver) Start(ctx context.Context, host component.Host) error {
@@ -70,67 +71,79 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error {
 			}
 			time.Sleep(2 * time.Second)
 		}
-		// fmt.Println("Search completed successfully")
 
-		// fetch search results
-		results, err := ssapir.getSplunkSearchResults(ssapir.config, searchID)
-		if err != nil {
-			ssapir.logger.Error("error fetching search results", zap.Error(err))
-		}
-		// fmt.Println("Search results: ", results)
-
-		// parse time strings to time.Time
-		earliestTime, err := time.Parse(time.RFC3339, search.EarliestTime)
-		if err != nil {
-			// should be impossible to reach with config validation
-			ssapir.logger.Error("earliest_time failed to be parsed as RFC3339", zap.Error(err))
-		}
-
-		latestTime, err := time.Parse(time.RFC3339, search.LatestTime)
-		if err != nil {
-			// should be impossible to reach with config validation
-			ssapir.logger.Error("latest_time failed to be parsed as RFC3339", zap.Error(err))
-		}
-
-		logs := plog.NewLogs()
-		for idx, splunkLog := range results.Results {
-			if idx >= search.Limit && search.Limit != 0 {
-				break
-			}
-			// convert log timestamp to ISO8601 (UTC() makes RFC3339 into ISO8601)
-			logTimestamp, err := time.Parse(time.RFC3339, splunkLog.Time)
+		var resultCountTracker = 0 // track number of results exported
+		var offset = 0             // offset for pagination
+		for {
+			// fetch search results
+			results, err := ssapir.getSplunkSearchResults(ssapir.config, searchID, offset)
 			if err != nil {
-				ssapir.logger.Error("error parsing log timestamp", zap.Error(err))
-				break
+				ssapir.logger.Error("error fetching search results", zap.Error(err))
 			}
-			if logTimestamp.UTC().After(latestTime.UTC()) {
-				ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime.UTC()))
-				// logger.Info will only log up to 10 times for a given code block, known weird behavior
-				continue
+
+			// parse time strings to time.Time
+			earliestTime, err := time.Parse(time.RFC3339, search.EarliestTime)
+			if err != nil {
+				// should be impossible to reach with config validation
+				ssapir.logger.Error("earliest_time failed to be parsed as RFC3339", zap.Error(err))
 			}
-			if logTimestamp.UTC().Before(earliestTime) {
-				ssapir.logger.Info("skipping log entry - timestamp before earliestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("earliestTime", earliestTime.UTC()))
-				continue
+
+			latestTime, err := time.Parse(time.RFC3339, search.LatestTime)
+			if err != nil {
+				// should be impossible to reach with config validation
+				ssapir.logger.Error("latest_time failed to be parsed as RFC3339", zap.Error(err))
 			}
-			log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
-			// convert time to timestamp
-			timestamp := pcommon.NewTimestampFromTime(logTimestamp.UTC())
-			log.SetTimestamp(timestamp)
-			log.Body().SetStr(splunkLog.Raw)
+			logs := plog.NewLogs()
+			for idx, splunkLog := range results.Results {
+				if idx >= search.Limit && search.Limit != 0 {
+					break
+				}
+				// convert log timestamp to ISO8601 (UTC() makes RFC3339 into ISO8601)
+				logTimestamp, err := time.Parse(time.RFC3339, splunkLog.Time)
+				if err != nil {
+					ssapir.logger.Error("error parsing log timestamp", zap.Error(err))
+					break
+				}
+				if logTimestamp.UTC().After(latestTime.UTC()) {
+					ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime.UTC()))
+					// logger will only log up to 10 times for a given code block, known weird behavior
+					continue
+				}
+				if logTimestamp.UTC().Before(earliestTime) {
+					ssapir.logger.Info("skipping log entry - timestamp before earliestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("earliestTime", earliestTime.UTC()))
+					continue
+				}
+				log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
+
+				// convert time to timestamp
+				timestamp := pcommon.NewTimestampFromTime(logTimestamp.UTC())
+				log.SetTimestamp(timestamp)
+				log.Body().SetStr(splunkLog.Raw)
 
-		}
-		if logs.ResourceLogs().Len() == 0 {
-			ssapir.logger.Info("search returned no logs within the given time range")
-			return nil
-		}
+			}
+			if logs.ResourceLogs().Len() == 0 {
+				ssapir.logger.Info("search returned no logs within the given time range")
+				return nil
+			}
 
-		// pass logs, wait for exporter to confirm successful export to GCP
-		err = ssapir.logsConsumer.ConsumeLogs(ctx, logs)
-		if err != nil {
-			// Error from down the pipeline, freak out
-			ssapir.logger.Error("error consuming logs", zap.Error(err))
+			// pass logs, wait for exporter to confirm successful export to GCP
+			err = ssapir.logsConsumer.ConsumeLogs(ctx, logs)
+			if err != nil {
+				// Error from down the pipeline, freak out
+				ssapir.logger.Error("error consuming logs", zap.Error(err))
+			}
+			// if the number of results is less than the results per request, we have queried all pages for the search
+			if len(results.Results) < ssapir.eventBatchSize {
+				resultCountTracker += len(results.Results)
+				break
+			}
+			resultCountTracker += logs.ResourceLogs().Len()
+			offset += ssapir.eventBatchSize
+			time.Sleep(5 * time.Second)
 		}
+		ssapir.logger.Info("search results exported", zap.String("query", search.Query), zap.Int("results", resultCountTracker))
+		// search results completely exported, reset checkpoint storage
 	}
 	return nil
 }
@@ -160,8 +173,8 @@ func (ssapir *splunksearchapireceiver) isSearchCompleted(config *Config, sid str
 	return false, nil
 }
 
-func (ssapir *splunksearchapireceiver) getSplunkSearchResults(config *Config, sid string) (SearchResults, error) {
-	resp, err := ssapir.getSearchResults(config, sid)
+func (ssapir *splunksearchapireceiver) getSplunkSearchResults(config *Config, sid string, offset int) (SearchResults, error) {
+	resp, err := ssapir.getSearchResults(config, sid, offset)
 	if err != nil {
 		return SearchResults{}, err
 	}
From 76fea5fe684f2b07d75f71553b91ba6ea98cf3d1 Mon Sep 17 00:00:00 2001
From: Caleb Hurshman
Date: Wed, 13 Nov 2024 10:49:44 -0500
Subject: [PATCH 2/5] pagination functionality

---
 receiver/splunksearchapireceiver/api.go      | 23 +++++-----
 receiver/splunksearchapireceiver/receiver.go | 44 ++++++++++----------
 2 files changed, 33 insertions(+), 34 deletions(-)

diff --git a/receiver/splunksearchapireceiver/api.go b/receiver/splunksearchapireceiver/api.go
index 3c96d4702..f2708f42d 100644
--- a/receiver/splunksearchapireceiver/api.go
+++ b/receiver/splunksearchapireceiver/api.go
@@ -22,20 +22,17 @@ import (
 	"fmt"
 	"io"
 	"net/http"
-
-	"go.uber.org/zap"
 )
 
-func (ssapir *splunksearchapireceiver) createSearchJob(config *Config, search string) (CreateJobResponse, error) {
-	// fmt.Println("Creating search job for search: ", search)
-	endpoint := fmt.Sprintf("%s/services/search/jobs", config.Endpoint)
+func (ssapir *splunksearchapireceiver) createSearchJob(search string) (CreateJobResponse, error) {
+	endpoint := fmt.Sprintf("%s/services/search/jobs", ssapir.config.Endpoint)
 
 	reqBody := fmt.Sprintf(`search=%s`, search)
 	req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer([]byte(reqBody)))
 	if err != nil {
 		return CreateJobResponse{}, err
 	}
-	req.SetBasicAuth(config.Username, config.Password)
+	req.SetBasicAuth(ssapir.config.Username, ssapir.config.Password)
 
 	resp, err := ssapir.client.Do(req)
 	if err != nil {
@@ -60,15 +57,15 @@ func (ssapir *splunksearchapireceiver) createSearchJob(config *Config, search st
 	return jobResponse, nil
 }
 
-func (ssapir *splunksearchapireceiver) getJobStatus(config *Config, sid string) (JobStatusResponse, error) {
+func (ssapir *splunksearchapireceiver) getJobStatus(sid string) (JobStatusResponse, error) {
 	// fmt.Println("Getting job status")
-	endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s", config.Endpoint, sid)
+	endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s", ssapir.config.Endpoint, sid)
 	req, err := http.NewRequest("GET", endpoint, nil)
 	if err != nil {
 		return JobStatusResponse{}, err
 	}
-	req.SetBasicAuth(config.Username, config.Password)
+	req.SetBasicAuth(ssapir.config.Username, ssapir.config.Password)
 
 	resp, err := ssapir.client.Do(req)
 	if err != nil {
@@ -93,15 +90,15 @@ func (ssapir *splunksearchapireceiver) getJobStatus(config *Config, sid string)
 	return jobStatusResponse, nil
 }
 
-func (ssapir *splunksearchapireceiver) getSearchResults(config *Config, sid string, offset int) (SearchResults, error) {
-	endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s/results?output_mode=json&offset=%d&count=%d", config.Endpoint, sid, offset, ssapir.eventBatchSize)
+func (ssapir *splunksearchapireceiver) getSearchResults(sid string, offset int) (SearchResults, error) {
+	endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s/results?output_mode=json&offset=%d&count=%d", ssapir.config.Endpoint, sid, offset, ssapir.config.EventBatchSize)
 	req, err := http.NewRequest("GET", endpoint, nil)
 	if err != nil {
 		return SearchResults{}, err
 	}
-	req.SetBasicAuth(config.Username, config.Password)
+	req.SetBasicAuth(ssapir.config.Username, ssapir.config.Password)
 
-	ssapir.logger.Info("Getting search results", zap.Int("offset", offset), zap.Int("count", ssapir.eventBatchSize))
+	// ssapir.logger.Info("Getting search results", zap.Int("offset", offset), zap.Int("count", ssapir.config.EventBatchSize))
 
 	resp, err := ssapir.client.Do(req)
 	if err != nil {
diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go
index 83bd1d59a..0662991a2 100644
--- a/receiver/splunksearchapireceiver/receiver.go
+++ b/receiver/splunksearchapireceiver/receiver.go
@@ -29,14 +29,13 @@ import (
 )
 
 type splunksearchapireceiver struct {
-	host           component.Host
-	logger         *zap.Logger
-	logsConsumer   consumer.Logs
-	config         *Config
-	settings       component.TelemetrySettings
-	client         *http.Client
-	eventBatchSize int
-	wg             *sync.WaitGroup
+	host         component.Host
+	logger       *zap.Logger
+	logsConsumer consumer.Logs
+	config       *Config
+	settings     component.TelemetrySettings
+	client       *http.Client
+	wg           *sync.WaitGroup
 }
 
 func (ssapir *splunksearchapireceiver) Start(ctx context.Context, host component.Host) error {
@@ -70,8 +69,8 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error {
 
 		var resultCountTracker = 0 // track number of results exported
 		var offset = 0             // offset for pagination
+		var limitReached = false
 		for {
-			// fetch search results
 			ssapir.logger.Info("fetching search results")
 			results, err := ssapir.getSplunkSearchResults(searchID, offset)
 			if err != nil {
@@ -82,19 +81,18 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error {
 			// parse time strings to time.Time
 			earliestTime, err := time.Parse(time.RFC3339, search.EarliestTime)
 			if err != nil {
-				// should be impossible to reach with config validation
 				ssapir.logger.Error("earliest_time failed to be parsed as RFC3339", zap.Error(err))
 			}
 
 			latestTime, err := time.Parse(time.RFC3339, search.LatestTime)
 			if err != nil {
-				// should be impossible to reach with config validation
 				ssapir.logger.Error("latest_time failed to be parsed as RFC3339", zap.Error(err))
 			}
 
 			logs := plog.NewLogs()
 			for idx, splunkLog := range results.Results {
-				if idx >= search.Limit && search.Limit != 0 {
+				if (idx+resultCountTracker) >= search.Limit && search.Limit != 0 {
+					limitReached = true
 					break
 				}
 				// convert log timestamp to ISO8601 (UTC() makes RFC3339 into ISO8601)
@@ -106,6 +104,7 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error {
 				if logTimestamp.UTC().After(latestTime.UTC()) {
 					ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime.UTC()))
 					// logger will only log up to 10 times for a given code block, known weird behavior
+					// TODO: Consider breaking, assuming all logs are in order
 					continue
 				}
 				if logTimestamp.UTC().Before(earliestTime) {
@@ -119,10 +118,10 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error {
 				log.SetTimestamp(timestamp)
 				log.Body().SetStr(splunkLog.Raw)
 
-			}
-			if logs.ResourceLogs().Len() == 0 {
-				ssapir.logger.Info("search returned no logs within the given time range")
-				return nil
+				if logs.ResourceLogs().Len() == 0 {
+					ssapir.logger.Info("search returned no logs within the given time range")
+					return nil
+				}
 			}
 
 			// pass logs, wait for exporter to confirm successful export to GCP
@@ -131,17 +130,20 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error {
 				// Error from down the pipeline, freak out
 				ssapir.logger.Error("error consuming logs", zap.Error(err))
 			}
+			if limitReached {
+				ssapir.logger.Info("limit reached, stopping search result export")
+				resultCountTracker += logs.ResourceLogs().Len()
+				break
+			}
 			// if the number of results is less than the results per request, we have queried all pages for the search
-			if len(results.Results) < ssapir.eventBatchSize {
+			if len(results.Results) < ssapir.config.EventBatchSize {
 				resultCountTracker += len(results.Results)
 				break
 			}
 			resultCountTracker += logs.ResourceLogs().Len()
-			offset += ssapir.eventBatchSize
-			time.Sleep(5 * time.Second)
+			offset += len(results.Results)
 		}
-		ssapir.logger.Info("search results exported", zap.String("query", search.Query), zap.Int("results", resultCountTracker))
-		// search results completely exported, reset checkpoint storage
+		ssapir.logger.Info("search results exported", zap.String("query", search.Query), zap.Int("total results", resultCountTracker))
 	}
 	return nil
 }
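PATCH 2's loop stops paging when a page comes back smaller than the batch size (or when the configured limit is hit) and advances the offset by the number of results actually returned rather than by a fixed batch size. A minimal, self-contained sketch of that termination logic follows; fetchPage is a hypothetical stand-in for getSplunkSearchResults and the total of 250 events is invented for the example.

package main

import "fmt"

// fetchPage simulates a paged results endpoint serving a fixed
// result set of 250 events in offset/batchSize windows.
func fetchPage(offset, batchSize int) []string {
	const total = 250
	if offset >= total {
		return nil
	}
	end := offset + batchSize
	if end > total {
		end = total
	}
	page := make([]string, 0, end-offset)
	for i := offset; i < end; i++ {
		page = append(page, fmt.Sprintf("event-%d", i))
	}
	return page
}

func main() {
	batchSize := 100
	offset, exported := 0, 0
	for {
		page := fetchPage(offset, batchSize)
		exported += len(page)
		// A short (or empty) page means every page has been consumed.
		if len(page) < batchSize {
			break
		}
		// Advance by the count actually returned, as the patch does.
		offset += len(page)
	}
	fmt.Println("exported:", exported) // 250
}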
`mapstructure:"limit"` + EventBatchSize int `mapstructure:"event_batch_size"` } // Validate validates the Splunk Search API receiver configuration diff --git a/receiver/splunksearchapireceiver/factory.go b/receiver/splunksearchapireceiver/factory.go index ba2937c54..dc61db414 100644 --- a/receiver/splunksearchapireceiver/factory.go +++ b/receiver/splunksearchapireceiver/factory.go @@ -16,7 +16,6 @@ package splunksearchapireceiver import ( "context" - "sync" "time" "go.opentelemetry.io/collector/component" @@ -32,7 +31,6 @@ var ( func createDefaultConfig() component.Config { return &Config{ ClientConfig: confighttp.NewDefaultClientConfig(), - EventBatchSize: 100, JobPollInterval: 10 * time.Second, } } @@ -48,7 +46,6 @@ func createLogsReceiver(_ context.Context, logsConsumer: consumer, config: ssapirConfig, settings: params.TelemetrySettings, - wg: &sync.WaitGroup{}, } return ssapir, nil } diff --git a/receiver/splunksearchapireceiver/model.go b/receiver/splunksearchapireceiver/model.go index a53e0a882..d711fbf78 100644 --- a/receiver/splunksearchapireceiver/model.go +++ b/receiver/splunksearchapireceiver/model.go @@ -14,11 +14,13 @@ package splunksearchapireceiver +// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fjobs // CreateJobResponse struct to represent the XML response from Splunk create job endpoint type CreateJobResponse struct { SID string `xml:"sid"` } +// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fjobs.2F.7Bsearch_id.7D // JobStatusResponse struct to represent the XML response from Splunk job status endpoint type JobStatusResponse struct { Content struct { @@ -47,6 +49,7 @@ type List struct { } `xml:"item"` } +// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fv2.2Fjobs.2F.7Bsearch_id.7D.2Fresults // SearchResults struct to represent the JSON response from Splunk search results endpoint type SearchResults struct { InitOffset int `json:"init_offset"` diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go index 0662991a2..05d1da126 100644 --- a/receiver/splunksearchapireceiver/receiver.go +++ b/receiver/splunksearchapireceiver/receiver.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "net/http" - "sync" "time" "go.opentelemetry.io/collector/component" @@ -28,6 +27,12 @@ import ( "go.uber.org/zap" ) +var ( + offset = 0 // offset for pagination and checkpointing + exportedEvents = 0 // track the number of events returned by the results endpoint that are exported + limitReached = false // flag to stop processing search results when limit is reached +) + type splunksearchapireceiver struct { host component.Host logger *zap.Logger @@ -35,7 +40,6 @@ type splunksearchapireceiver struct { config *Config settings component.TelemetrySettings client *http.Client - wg *sync.WaitGroup } func (ssapir *splunksearchapireceiver) Start(ctx context.Context, host component.Host) error { @@ -67,12 +71,9 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { ssapir.logger.Error("error polling for search completion", zap.Error(err)) } - var resultCountTracker = 0 // track number of results exported - var offset = 0 // offset for pagination - var limitReached = false for { ssapir.logger.Info("fetching search results") - results, err := ssapir.getSplunkSearchResults(searchID, offset) + results, err := ssapir.getSplunkSearchResults(searchID, offset, search.EventBatchSize) if err != nil { ssapir.logger.Error("error fetching search 
results", zap.Error(err)) } @@ -91,7 +92,7 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { logs := plog.NewLogs() for idx, splunkLog := range results.Results { - if (idx+resultCountTracker) >= search.Limit && search.Limit != 0 { + if (idx+exportedEvents) >= search.Limit && search.Limit != 0 { limitReached = true break } @@ -104,12 +105,11 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { if logTimestamp.UTC().After(latestTime.UTC()) { ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime.UTC())) // logger will only log up to 10 times for a given code block, known weird behavior - // TODO: Consider breaking, assuming all logs are in order continue } if logTimestamp.UTC().Before(earliestTime) { ssapir.logger.Info("skipping log entry - timestamp before earliestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("earliestTime", earliestTime.UTC())) - continue + break } log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() @@ -132,18 +132,18 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { } if limitReached { ssapir.logger.Info("limit reached, stopping search result export") - resultCountTracker += logs.ResourceLogs().Len() + exportedEvents += logs.ResourceLogs().Len() break } // if the number of results is less than the results per request, we have queried all pages for the search - if len(results.Results) < ssapir.config.EventBatchSize { - resultCountTracker += len(results.Results) + if len(results.Results) < search.EventBatchSize { + exportedEvents += len(results.Results) break } - resultCountTracker += logs.ResourceLogs().Len() + exportedEvents += logs.ResourceLogs().Len() offset += len(results.Results) } - ssapir.logger.Info("search results exported", zap.String("query", search.Query), zap.Int("total results", resultCountTracker)) + ssapir.logger.Info("search results exported", zap.String("query", search.Query), zap.Int("total results", exportedEvents)) } return nil } @@ -195,8 +195,8 @@ func (ssapir *splunksearchapireceiver) isSearchCompleted(sid string) (bool, erro return false, nil } -func (ssapir *splunksearchapireceiver) getSplunkSearchResults(sid string, offset int) (SearchResults, error) { - resp, err := ssapir.getSearchResults(sid, offset) +func (ssapir *splunksearchapireceiver) getSplunkSearchResults(sid string, offset int, batchSize int) (SearchResults, error) { + resp, err := ssapir.getSearchResults(sid, offset, batchSize) if err != nil { return SearchResults{}, err } From 2f7a9e924a70d3ae7372087b26a3c90d03c036f4 Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Wed, 13 Nov 2024 16:05:09 -0500 Subject: [PATCH 4/5] fix lint --- receiver/splunksearchapireceiver/model.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/receiver/splunksearchapireceiver/model.go b/receiver/splunksearchapireceiver/model.go index d711fbf78..2d5a6dfec 100644 --- a/receiver/splunksearchapireceiver/model.go +++ b/receiver/splunksearchapireceiver/model.go @@ -14,14 +14,14 @@ package splunksearchapireceiver -// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fjobs // CreateJobResponse struct to represent the XML response from Splunk create job endpoint +// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fjobs type CreateJobResponse struct { SID string `xml:"sid"` } -// 
https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fjobs.2F.7Bsearch_id.7D // JobStatusResponse struct to represent the XML response from Splunk job status endpoint +// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fjobs.2F.7Bsearch_id.7D type JobStatusResponse struct { Content struct { Type string `xml:"type,attr"` @@ -49,8 +49,8 @@ type List struct { } `xml:"item"` } -// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fv2.2Fjobs.2F.7Bsearch_id.7D.2Fresults // SearchResults struct to represent the JSON response from Splunk search results endpoint +// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fv2.2Fjobs.2F.7Bsearch_id.7D.2Fresults type SearchResults struct { InitOffset int `json:"init_offset"` Results []struct { From c911fecd77ca01c4cbff2f9b413c064264a1a177 Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Thu, 14 Nov 2024 10:49:18 -0500 Subject: [PATCH 5/5] check for earliest/latest in query --- receiver/splunksearchapireceiver/config.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/receiver/splunksearchapireceiver/config.go b/receiver/splunksearchapireceiver/config.go index 3be2add42..5c2862195 100644 --- a/receiver/splunksearchapireceiver/config.go +++ b/receiver/splunksearchapireceiver/config.go @@ -16,6 +16,7 @@ package splunksearchapireceiver import ( "errors" + "fmt" "strings" "time" @@ -73,6 +74,10 @@ func (cfg *Config) Validate() error { return errNonStandaloneSearchQuery } + if strings.Contains(search.Query, "earliest=") || strings.Contains(search.Query, "latest=") { + return fmt.Errorf("time query parameters must be configured using only the \"earliest_time\" and \"latest_time\" configuration parameters") + } + if search.EarliestTime == "" { return errors.New("missing earliest_time in search") }
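PATCH 5's Validate addition rejects searches that embed their own time window, since earliest= or latest= inside the query would conflict with the receiver's earliest_time/latest_time handling. A standalone sketch of the check follows; the sample queries are invented for illustration.

package main

import (
	"fmt"
	"strings"
)

// validateQueryTimeParams mirrors the substring check added to Validate:
// queries may not carry their own earliest=/latest= time bounds.
func validateQueryTimeParams(query string) error {
	if strings.Contains(query, "earliest=") || strings.Contains(query, "latest=") {
		return fmt.Errorf("time query parameters must be configured using only the \"earliest_time\" and \"latest_time\" configuration parameters")
	}
	return nil
}

func main() {
	fmt.Println(validateQueryTimeParams(`search index=main earliest=-24h`)) // returns an error
	fmt.Println(validateQueryTimeParams(`search index=main`))               // <nil>
}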