diff --git a/receiver/splunksearchapireceiver/client.go b/receiver/splunksearchapireceiver/client.go
index b60986f97..6c8866808 100644
--- a/receiver/splunksearchapireceiver/client.go
+++ b/receiver/splunksearchapireceiver/client.go
@@ -32,7 +32,7 @@ import (
 type splunkSearchAPIClient interface {
 	CreateSearchJob(search string) (CreateJobResponse, error)
 	GetJobStatus(searchID string) (JobStatusResponse, error)
-	GetSearchResults(searchID string) (SearchResultsResponse, error)
+	GetSearchResults(searchID string, offset int, batchSize int) (SearchResultsResponse, error)
 }
 
 type defaultSplunkSearchAPIClient struct {
@@ -122,8 +122,8 @@ func (c defaultSplunkSearchAPIClient) GetJobStatus(sid string) (JobStatusRespons
 	return jobStatusResponse, nil
 }
 
-func (c defaultSplunkSearchAPIClient) GetSearchResults(sid string) (SearchResultsResponse, error) {
-	endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s/results?output_mode=json", c.endpoint, sid)
+func (c defaultSplunkSearchAPIClient) GetSearchResults(sid string, offset int, batchSize int) (SearchResultsResponse, error) {
+	endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s/results?output_mode=json&offset=%d&count=%d", c.endpoint, sid, offset, batchSize)
 	req, err := http.NewRequest("GET", endpoint, nil)
 	if err != nil {
 		return SearchResultsResponse{}, err
diff --git a/receiver/splunksearchapireceiver/config.go b/receiver/splunksearchapireceiver/config.go
index cce941c94..b2ecb0919 100644
--- a/receiver/splunksearchapireceiver/config.go
+++ b/receiver/splunksearchapireceiver/config.go
@@ -37,10 +37,11 @@ type Config struct {
 
 // Search struct to represent a Splunk search
 type Search struct {
-	Query        string `mapstructure:"query"`
-	EarliestTime string `mapstructure:"earliest_time"`
-	LatestTime   string `mapstructure:"latest_time"`
-	Limit        int    `mapstructure:"limit"`
+	Query          string `mapstructure:"query"`
+	EarliestTime   string `mapstructure:"earliest_time"`
+	LatestTime     string `mapstructure:"latest_time"`
+	Limit          int    `mapstructure:"limit"`
+	EventBatchSize int    `mapstructure:"event_batch_size"`
 }
 
 // Validate validates the Splunk Search API receiver configuration
diff --git a/receiver/splunksearchapireceiver/model.go b/receiver/splunksearchapireceiver/model.go
index e0df25368..61b6e7691 100644
--- a/receiver/splunksearchapireceiver/model.go
+++ b/receiver/splunksearchapireceiver/model.go
@@ -15,11 +15,13 @@ package splunksearchapireceiver
 
 // CreateJobResponse struct to represent the XML response from Splunk create job endpoint
+// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fjobs
 type CreateJobResponse struct {
 	SID string `xml:"sid"`
 }
 
 // JobStatusResponse struct to represent the XML response from Splunk job status endpoint
+// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fjobs.2F.7Bsearch_id.7D
 type JobStatusResponse struct {
 	Content struct {
 		Type string `xml:"type,attr"`
@@ -48,8 +50,10 @@ type List struct {
 }
 
 // SearchResultsResponse struct to represent the JSON response from Splunk search results endpoint
+// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fv2.2Fjobs.2F.7Bsearch_id.7D.2Fresults
 type SearchResultsResponse struct {
-	Results []struct {
+	InitOffset int `json:"init_offset"`
+	Results    []struct {
 		Raw  string `json:"_raw"`
 		Time string `json:"_time"`
 	} `json:"results"`
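Reviewer note: with the signature change above, callers drive pagination by pairing an `offset` with a `count` (the `batchSize` argument) on the results endpoint. Below is a minimal sketch of that contract against the `splunkSearchAPIClient` interface; `drainResults` is a hypothetical helper for illustration, not code in this PR.

```go
// drainResults pages through a finished search job until Splunk returns a
// short page, mirroring the pagination loop this PR adds to runQueries.
func drainResults(client splunkSearchAPIClient, sid string, batchSize int) ([]string, error) {
	var raws []string
	offset := 0
	for {
		resp, err := client.GetSearchResults(sid, offset, batchSize)
		if err != nil {
			return raws, err
		}
		for _, result := range resp.Results {
			raws = append(raws, result.Raw)
		}
		// a page shorter than batchSize means there are no more results
		if len(resp.Results) < batchSize {
			return raws, nil
		}
		offset += len(resp.Results)
	}
}
```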
--- a/receiver/splunksearchapireceiver/receiver.go
+++ b/receiver/splunksearchapireceiver/receiver.go
@@ -26,6 +26,12 @@ import (
 	"go.uber.org/zap"
 )
 
+var (
+	offset         = 0     // offset for pagination and checkpointing
+	exportedEvents = 0     // tracks the number of events returned by the results endpoint that have been exported
+	limitReached   = false // flag to stop processing search results once the limit is reached
+)
+
 type splunksearchapireceiver struct {
 	host   component.Host
 	logger *zap.Logger
@@ -63,66 +69,79 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error {
 			ssapir.logger.Error("error polling for search completion", zap.Error(err))
 		}
 
-		// fetch search results
-		ssapir.logger.Info("fetching search results")
-		results, err := ssapir.getSplunkSearchResults(searchID)
-		if err != nil {
-			ssapir.logger.Error("error fetching search results", zap.Error(err))
-		}
-		ssapir.logger.Info("search results fetched", zap.Int("num_results", len(results.Results)))
+		for {
+			ssapir.logger.Info("fetching search results")
+			results, err := ssapir.getSplunkSearchResults(searchID, offset, search.EventBatchSize)
+			if err != nil {
+				ssapir.logger.Error("error fetching search results", zap.Error(err))
+			}
+			ssapir.logger.Info("search results fetched", zap.Int("num_results", len(results.Results)))
 
-		// parse time strings to time.Time
-		earliestTime, err := time.Parse(time.RFC3339, search.EarliestTime)
-		if err != nil {
-			// should be impossible to reach with config validation
-			ssapir.logger.Error("earliest_time failed to be parsed as RFC3339", zap.Error(err))
-		}
+			// parse time strings to time.Time
+			earliestTime, err := time.Parse(time.RFC3339, search.EarliestTime)
+			if err != nil {
+				ssapir.logger.Error("earliest_time failed to be parsed as RFC3339", zap.Error(err))
+			}
 
-		latestTime, err := time.Parse(time.RFC3339, search.LatestTime)
-		if err != nil {
-			// should be impossible to reach with config validation
-			ssapir.logger.Error("latest_time failed to be parsed as RFC3339", zap.Error(err))
-		}
+			latestTime, err := time.Parse(time.RFC3339, search.LatestTime)
+			if err != nil {
+				ssapir.logger.Error("latest_time failed to be parsed as RFC3339", zap.Error(err))
+			}
 
-		logs := plog.NewLogs()
-		for idx, splunkLog := range results.Results {
-			if idx >= search.Limit && search.Limit != 0 {
-				break
+			logs := plog.NewLogs()
+			for idx, splunkLog := range results.Results {
+				if (idx+exportedEvents) >= search.Limit && search.Limit != 0 {
+					limitReached = true
+					break
+				}
+				// convert log timestamp to ISO8601 (UTC() makes RFC3339 into ISO8601)
+				logTimestamp, err := time.Parse(time.RFC3339, splunkLog.Time)
+				if err != nil {
+					ssapir.logger.Error("error parsing log timestamp", zap.Error(err))
+					break
+				}
+				if logTimestamp.UTC().After(latestTime.UTC()) {
+					ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime.UTC()))
+					// logger will only log up to 10 times for a given code block, known weird behavior
+					continue
+				}
+				if logTimestamp.UTC().Before(earliestTime) {
+					ssapir.logger.Info("skipping log entry - timestamp before earliestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("earliestTime", earliestTime.UTC()))
+					break
+				}
+				log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
+
+				// convert time to timestamp
+				timestamp := pcommon.NewTimestampFromTime(logTimestamp.UTC())
+				log.SetTimestamp(timestamp)
+				log.Body().SetStr(splunkLog.Raw)
+			}
+
+			if logs.ResourceLogs().Len() == 0 {
+				ssapir.logger.Info("search returned no logs within the given time range")
+				return nil
 			}
-			// convert log timestamp to ISO8601 (UTC() makes RFC3339 into ISO8601)
-			logTimestamp, err := time.Parse(time.RFC3339, splunkLog.Time)
+
+			// pass logs, wait for exporter to confirm successful export to GCP
+			err = ssapir.logsConsumer.ConsumeLogs(ctx, logs)
 			if err != nil {
-				ssapir.logger.Error("error parsing log timestamp", zap.Error(err))
-				break
+				// Error from down the pipeline, freak out
+				ssapir.logger.Error("error consuming logs", zap.Error(err))
 			}
-			if logTimestamp.UTC().After(latestTime.UTC()) {
-				ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime.UTC()))
-				// logger.Info will only log up to 10 times for a given code block, known weird behavior
-				continue
+			if limitReached {
+				ssapir.logger.Info("limit reached, stopping search result export")
+				exportedEvents += logs.ResourceLogs().Len()
+				break
 			}
-			if logTimestamp.UTC().Before(earliestTime) {
-				ssapir.logger.Info("skipping log entry - timestamp before earliestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("earliestTime", earliestTime.UTC()))
+			// if the number of results is fewer than the results per request, we have queried all pages for the search
+			if len(results.Results) < search.EventBatchSize {
+				exportedEvents += len(results.Results)
 				break
 			}
-			log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
-
-			// convert time to timestamp
-			timestamp := pcommon.NewTimestampFromTime(logTimestamp.UTC())
-			log.SetTimestamp(timestamp)
-			log.Body().SetStr(splunkLog.Raw)
-
-		}
-		if logs.ResourceLogs().Len() == 0 {
-			ssapir.logger.Info("search returned no logs within the given time range")
-			return nil
-		}
-
-		// pass logs, wait for exporter to confirm successful export to GCP
-		err = ssapir.logsConsumer.ConsumeLogs(ctx, logs)
-		if err != nil {
-			// Error from down the pipeline, freak out
-			ssapir.logger.Error("error consuming logs", zap.Error(err))
+			exportedEvents += logs.ResourceLogs().Len()
+			offset += len(results.Results)
 		}
+		ssapir.logger.Info("search results exported", zap.String("query", search.Query), zap.Int("total results", exportedEvents))
 	}
 	return nil
 }
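The cutoff `(idx+exportedEvents) >= search.Limit` in the hunk above caps the total exported across all pages, not per page. The sketch below walks through that accounting with a hypothetical `eventsPerPage` helper (not part of this PR): with `limit: 250` and `event_batch_size: 100`, the receiver should export pages of 100, 100, and 50 events, then stop.

```go
// eventsPerPage simulates the export cutoff added above: pages of up to
// batchSize events are exported until the configured limit is hit.
func eventsPerPage(limit, batchSize, totalAvailable int) []int {
	var pages []int
	exported := 0
	for exported < totalAvailable {
		page := batchSize
		if remaining := totalAvailable - exported; remaining < page {
			page = remaining
		}
		// same cutoff as (idx+exportedEvents) >= search.Limit in the loop above
		if limit != 0 && exported+page > limit {
			page = limit - exported
		}
		pages = append(pages, page)
		exported += page
		if (limit != 0 && exported == limit) || page < batchSize {
			break
		}
	}
	return pages // eventsPerPage(250, 100, 1000) returns [100 100 50]
}
```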
@@ -177,8 +196,8 @@ func (ssapir *splunksearchapireceiver) isSearchCompleted(sid string) (bool, erro
 	return false, nil
 }
 
-func (ssapir *splunksearchapireceiver) getSplunkSearchResults(sid string) (SearchResultsResponse, error) {
-	resp, err := ssapir.client.GetSearchResults(sid)
+func (ssapir *splunksearchapireceiver) getSplunkSearchResults(sid string, offset int, batchSize int) (SearchResultsResponse, error) {
+	resp, err := ssapir.client.GetSearchResults(sid, offset, batchSize)
 	if err != nil {
 		return SearchResultsResponse{}, err
 	}
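One loose end worth flagging: `init_offset` is now decoded into `SearchResultsResponse.InitOffset`, but nothing in receiver.go reads it yet. If a drift check is wanted, a sketch along these lines would surface it; `checkResultsOffset` is hypothetical and not part of this PR.

```go
// checkResultsOffset compares the offset the receiver requested with the
// init_offset Splunk echoes back in the results payload, so pagination
// drift surfaces as a warning instead of silently skipped or duplicated events.
func checkResultsOffset(logger *zap.Logger, requested int, resp SearchResultsResponse) {
	if resp.InitOffset != requested {
		logger.Warn("search results offset does not match requested offset",
			zap.Int("requested", requested),
			zap.Int("init_offset", resp.InitOffset))
	}
}
```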