Commit
Merge branch 'feat/ssapi-receiver' into feat/ssapi-pass-timestamps
Caleb-Hurshman committed Nov 15, 2024
2 parents 6625529 + a648d89 commit add7da2
Showing 4 changed files with 84 additions and 60 deletions.
6 changes: 3 additions & 3 deletions receiver/splunksearchapireceiver/client.go
@@ -32,7 +32,7 @@ import (
 type splunkSearchAPIClient interface {
 	CreateSearchJob(search string) (CreateJobResponse, error)
 	GetJobStatus(searchID string) (JobStatusResponse, error)
-	GetSearchResults(searchID string) (SearchResultsResponse, error)
+	GetSearchResults(searchID string, offset int, batchSize int) (SearchResultsResponse, error)
 }

type defaultSplunkSearchAPIClient struct {
@@ -122,8 +122,8 @@ func (c defaultSplunkSearchAPIClient) GetJobStatus(sid string) (JobStatusResponse, error) {
 	return jobStatusResponse, nil
 }
 
-func (c defaultSplunkSearchAPIClient) GetSearchResults(sid string) (SearchResultsResponse, error) {
-	endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s/results?output_mode=json", c.endpoint, sid)
+func (c defaultSplunkSearchAPIClient) GetSearchResults(sid string, offset int, batchSize int) (SearchResultsResponse, error) {
+	endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s/results?output_mode=json&offset=%d&count=%d", c.endpoint, sid, offset, batchSize)
 	req, err := http.NewRequest("GET", endpoint, nil)
 	if err != nil {
 		return SearchResultsResponse{}, err
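For illustration only (not part of this commit), a minimal Go sketch of the paginated results URL the new GetSearchResults signature builds; the Splunk endpoint and search ID below are placeholder values:

// Hypothetical sketch: the URL produced by the format string above for the
// second page of a 100-event batch. baseURL and sid are placeholders.
package main

import "fmt"

func main() {
	baseURL := "https://splunk.example.com:8089" // placeholder management endpoint
	sid := "1699999999.12345"                    // placeholder search ID
	offset, batchSize := 100, 100                // request the second page of 100 events
	url := fmt.Sprintf("%s/services/search/v2/jobs/%s/results?output_mode=json&offset=%d&count=%d",
		baseURL, sid, offset, batchSize)
	fmt.Println(url)
	// https://splunk.example.com:8089/services/search/v2/jobs/1699999999.12345/results?output_mode=json&offset=100&count=100
}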
9 changes: 5 additions & 4 deletions receiver/splunksearchapireceiver/config.go
@@ -37,10 +37,11 @@ type Config struct {

 // Search struct to represent a Splunk search
 type Search struct {
-	Query        string `mapstructure:"query"`
-	EarliestTime string `mapstructure:"earliest_time"`
-	LatestTime   string `mapstructure:"latest_time"`
-	Limit        int    `mapstructure:"limit"`
+	Query          string `mapstructure:"query"`
+	EarliestTime   string `mapstructure:"earliest_time"`
+	LatestTime     string `mapstructure:"latest_time"`
+	Limit          int    `mapstructure:"limit"`
+	EventBatchSize int    `mapstructure:"event_batch_size"`
 }
 
 // Validate validates the Splunk Search API receiver configuration
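For illustration only (not part of this commit), a minimal sketch of how these mapstructure tags map user-supplied config keys onto the Search struct, including the new event_batch_size; the sample values are invented:

// Hypothetical sketch: decoding a raw config map via the struct's
// mapstructure tags. All sample values are invented.
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// Search mirrors the struct in config.go; EventBatchSize is the new field.
type Search struct {
	Query          string `mapstructure:"query"`
	EarliestTime   string `mapstructure:"earliest_time"`
	LatestTime     string `mapstructure:"latest_time"`
	Limit          int    `mapstructure:"limit"`
	EventBatchSize int    `mapstructure:"event_batch_size"`
}

func main() {
	raw := map[string]any{
		"query":            "search index=main",
		"earliest_time":    "2024-11-01T00:00:00Z",
		"latest_time":      "2024-11-14T00:00:00Z",
		"event_batch_size": 500,
	}
	var s Search
	if err := mapstructure.Decode(raw, &s); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", s) // EventBatchSize controls results requested per page
}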
6 changes: 5 additions & 1 deletion receiver/splunksearchapireceiver/model.go
@@ -15,11 +15,13 @@
 package splunksearchapireceiver
 
 // CreateJobResponse struct to represent the XML response from Splunk create job endpoint
+// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fjobs
 type CreateJobResponse struct {
 	SID string `xml:"sid"`
 }
 
 // JobStatusResponse struct to represent the XML response from Splunk job status endpoint
+// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fjobs.2F.7Bsearch_id.7D
 type JobStatusResponse struct {
 	Content struct {
 		Type string `xml:"type,attr"`
@@ -48,8 +50,10 @@ type List struct {
 }
 
 // SearchResultsResponse struct to represent the JSON response from Splunk search results endpoint
+// https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch#search.2Fv2.2Fjobs.2F.7Bsearch_id.7D.2Fresults
 type SearchResultsResponse struct {
-	Results []struct {
+	InitOffset int `json:"init_offset"`
+	Results    []struct {
 		Raw  string `json:"_raw"`
 		Time string `json:"_time"`
 	} `json:"results"`
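For illustration only (not part of this commit), a small sketch of how a results page decodes into SearchResultsResponse with the new init_offset field; the struct tags come from the diff, the sample payload is invented:

// Hypothetical sketch: unmarshalling one results page. The payload is an
// invented example of the JSON the results endpoint returns.
package main

import (
	"encoding/json"
	"fmt"
)

type SearchResultsResponse struct {
	InitOffset int `json:"init_offset"`
	Results    []struct {
		Raw  string `json:"_raw"`
		Time string `json:"_time"`
	} `json:"results"`
}

func main() {
	payload := []byte(`{
		"init_offset": 100,
		"results": [
			{"_raw": "127.0.0.1 - admin GET /services", "_time": "2024-11-15T10:30:00.000+00:00"}
		]
	}`)
	var resp SearchResultsResponse
	if err := json.Unmarshal(payload, &resp); err != nil {
		panic(err)
	}
	fmt.Println(resp.InitOffset, len(resp.Results)) // 100 1
}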
123 changes: 71 additions & 52 deletions receiver/splunksearchapireceiver/receiver.go
@@ -26,6 +26,12 @@ import (
"go.uber.org/zap"
)

var (
offset = 0 // offset for pagination and checkpointing
exportedEvents = 0 // track the number of events returned by the results endpoint that are exported
limitReached = false // flag to stop processing search results when limit is reached
)

type splunksearchapireceiver struct {
host component.Host
logger *zap.Logger
@@ -63,66 +69,79 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error {
ssapir.logger.Error("error polling for search completion", zap.Error(err))
}

// fetch search results
ssapir.logger.Info("fetching search results")
results, err := ssapir.getSplunkSearchResults(searchID)
if err != nil {
ssapir.logger.Error("error fetching search results", zap.Error(err))
}
ssapir.logger.Info("search results fetched", zap.Int("num_results", len(results.Results)))
for {
ssapir.logger.Info("fetching search results")
results, err := ssapir.getSplunkSearchResults(searchID, offset, search.EventBatchSize)
if err != nil {
ssapir.logger.Error("error fetching search results", zap.Error(err))
}
ssapir.logger.Info("search results fetched", zap.Int("num_results", len(results.Results)))

// parse time strings to time.Time
earliestTime, err := time.Parse(time.RFC3339, search.EarliestTime)
if err != nil {
// should be impossible to reach with config validation
ssapir.logger.Error("earliest_time failed to be parsed as RFC3339", zap.Error(err))
}
// parse time strings to time.Time
earliestTime, err := time.Parse(time.RFC3339, search.EarliestTime)
if err != nil {
ssapir.logger.Error("earliest_time failed to be parsed as RFC3339", zap.Error(err))
}

latestTime, err := time.Parse(time.RFC3339, search.LatestTime)
if err != nil {
// should be impossible to reach with config validation
ssapir.logger.Error("latest_time failed to be parsed as RFC3339", zap.Error(err))
}
latestTime, err := time.Parse(time.RFC3339, search.LatestTime)
if err != nil {
ssapir.logger.Error("latest_time failed to be parsed as RFC3339", zap.Error(err))
}

logs := plog.NewLogs()
for idx, splunkLog := range results.Results {
if idx >= search.Limit && search.Limit != 0 {
break
logs := plog.NewLogs()
for idx, splunkLog := range results.Results {
if (idx+exportedEvents) >= search.Limit && search.Limit != 0 {
limitReached = true
break
}
// convert log timestamp to ISO8601 (UTC() makes RFC3339 into ISO8601)
logTimestamp, err := time.Parse(time.RFC3339, splunkLog.Time)
if err != nil {
ssapir.logger.Error("error parsing log timestamp", zap.Error(err))
break
}
if logTimestamp.UTC().After(latestTime.UTC()) {
ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime.UTC()))
// logger will only log up to 10 times for a given code block, known weird behavior
continue
}
if logTimestamp.UTC().Before(earliestTime) {
ssapir.logger.Info("skipping log entry - timestamp before earliestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("earliestTime", earliestTime.UTC()))
break
}
log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

// convert time to timestamp
timestamp := pcommon.NewTimestampFromTime(logTimestamp.UTC())
log.SetTimestamp(timestamp)
log.Body().SetStr(splunkLog.Raw)

if logs.ResourceLogs().Len() == 0 {
ssapir.logger.Info("search returned no logs within the given time range")
return nil
}
}
// convert log timestamp to ISO8601 (UTC() makes RFC3339 into ISO8601)
logTimestamp, err := time.Parse(time.RFC3339, splunkLog.Time)

// pass logs, wait for exporter to confirm successful export to GCP
err = ssapir.logsConsumer.ConsumeLogs(ctx, logs)
if err != nil {
ssapir.logger.Error("error parsing log timestamp", zap.Error(err))
break
// Error from down the pipeline, freak out
ssapir.logger.Error("error consuming logs", zap.Error(err))
}
if logTimestamp.UTC().After(latestTime.UTC()) {
ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime.UTC()))
// logger.Info will only log up to 10 times for a given code block, known weird behavior
continue
if limitReached {
ssapir.logger.Info("limit reached, stopping search result export")
exportedEvents += logs.ResourceLogs().Len()
break
}
if logTimestamp.UTC().Before(earliestTime) {
ssapir.logger.Info("skipping log entry - timestamp before earliestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("earliestTime", earliestTime.UTC()))
// if the number of results is less than the results per request, we have queried all pages for the search
if len(results.Results) < search.EventBatchSize {
exportedEvents += len(results.Results)
break
}
log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

// convert time to timestamp
timestamp := pcommon.NewTimestampFromTime(logTimestamp.UTC())
log.SetTimestamp(timestamp)
log.Body().SetStr(splunkLog.Raw)

}
if logs.ResourceLogs().Len() == 0 {
ssapir.logger.Info("search returned no logs within the given time range")
return nil
}

// pass logs, wait for exporter to confirm successful export to GCP
err = ssapir.logsConsumer.ConsumeLogs(ctx, logs)
if err != nil {
// Error from down the pipeline, freak out
ssapir.logger.Error("error consuming logs", zap.Error(err))
exportedEvents += logs.ResourceLogs().Len()
offset += len(results.Results)
}
ssapir.logger.Info("search results exported", zap.String("query", search.Query), zap.Int("total results", exportedEvents))
}
return nil
}
@@ -177,8 +196,8 @@ func (ssapir *splunksearchapireceiver) isSearchCompleted(sid string) (bool, error) {
 	return false, nil
 }
 
-func (ssapir *splunksearchapireceiver) getSplunkSearchResults(sid string) (SearchResultsResponse, error) {
-	resp, err := ssapir.client.GetSearchResults(sid)
+func (ssapir *splunksearchapireceiver) getSplunkSearchResults(sid string, offset int, batchSize int) (SearchResultsResponse, error) {
+	resp, err := ssapir.client.GetSearchResults(sid, offset, batchSize)
 	if err != nil {
 		return SearchResultsResponse{}, err
 	}
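For illustration only (not part of this commit), a standalone sketch of the paging bookkeeping runQueries now performs: fetch pages of event_batch_size events, advance offset by the page size, and stop on a short page or once the limit is reached. The fetch closure stands in for the results endpoint and all values are invented:

// Hypothetical sketch of the pagination loop's termination logic. A limit of
// 0 means unlimited, matching the search.Limit != 0 guard in receiver.go.
package main

import "fmt"

func main() {
	const totalEvents, batchSize, limit = 1050, 500, 0
	fetch := func(offset int) int { // simulated page size returned at this offset
		remaining := totalEvents - offset
		if remaining > batchSize {
			return batchSize
		}
		return remaining
	}

	offset, exported := 0, 0
	for {
		n := fetch(offset)
		if limit != 0 && exported+n >= limit {
			exported = limit // simplified; the receiver counts per event
			break            // limitReached in receiver.go
		}
		exported += n
		if n < batchSize { // short page: all pages queried
			break
		}
		offset += n
	}
	fmt.Println("exported:", exported) // 1050, over three requests
}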
