From bbed794cffd2b37d5e87c7f4d9cf2bb59bb5af8e Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Thu, 21 Nov 2024 16:12:53 -0500 Subject: [PATCH 1/5] auth token --- receiver/splunksearchapireceiver/client.go | 59 ++++-- .../splunksearchapireceiver/client_test.go | 176 ++++++++++++++++++ receiver/splunksearchapireceiver/config.go | 28 ++- .../splunksearchapireceiver/config_test.go | 72 ++++++- receiver/splunksearchapireceiver/receiver.go | 13 +- 5 files changed, 328 insertions(+), 20 deletions(-) create mode 100644 receiver/splunksearchapireceiver/client_test.go diff --git a/receiver/splunksearchapireceiver/client.go b/receiver/splunksearchapireceiver/client.go index 6c8866808..bf0a52697 100644 --- a/receiver/splunksearchapireceiver/client.go +++ b/receiver/splunksearchapireceiver/client.go @@ -24,6 +24,7 @@ import ( "io" "net/http" "net/url" + "strings" "go.opentelemetry.io/collector/component" "go.uber.org/zap" @@ -36,11 +37,13 @@ type splunkSearchAPIClient interface { } type defaultSplunkSearchAPIClient struct { - client *http.Client - endpoint string - logger *zap.Logger - username string - password string + client *http.Client + endpoint string + logger *zap.Logger + username string + password string + authToken string + tokenType string } func newSplunkSearchAPIClient(ctx context.Context, settings component.TelemetrySettings, conf Config, host component.Host) (*defaultSplunkSearchAPIClient, error) { @@ -48,12 +51,15 @@ func newSplunkSearchAPIClient(ctx context.Context, settings component.TelemetryS if err != nil { return nil, err } + return &defaultSplunkSearchAPIClient{ - client: client, - endpoint: conf.Endpoint, - logger: settings.Logger, - username: conf.Username, - password: conf.Password, + client: client, + endpoint: conf.Endpoint, + logger: settings.Logger, + username: conf.Username, + password: conf.Password, + authToken: conf.AuthToken, + tokenType: conf.TokenType, }, nil } @@ -65,7 +71,11 @@ func (c defaultSplunkSearchAPIClient) CreateSearchJob(search string) (CreateJobR if err != nil { return CreateJobResponse{}, err } - req.SetBasicAuth(c.username, c.password) + + err = c.SetSplunkRequestAuth(req) + if err != nil { + return CreateJobResponse{}, err + } resp, err := c.client.Do(req) if err != nil { @@ -97,7 +107,11 @@ func (c defaultSplunkSearchAPIClient) GetJobStatus(sid string) (JobStatusRespons if err != nil { return JobStatusResponse{}, err } - req.SetBasicAuth(c.username, c.password) + + err = c.SetSplunkRequestAuth(req) + if err != nil { + return JobStatusResponse{}, err + } resp, err := c.client.Do(req) if err != nil { @@ -128,7 +142,11 @@ func (c defaultSplunkSearchAPIClient) GetSearchResults(sid string, offset int, b if err != nil { return SearchResultsResponse{}, err } - req.SetBasicAuth(c.username, c.password) + + err = c.SetSplunkRequestAuth(req) + if err != nil { + return SearchResultsResponse{}, err + } resp, err := c.client.Do(req) if err != nil { @@ -152,3 +170,18 @@ func (c defaultSplunkSearchAPIClient) GetSearchResults(sid string, offset int, b return searchResults, nil } + +func (c defaultSplunkSearchAPIClient) SetSplunkRequestAuth(req *http.Request) error { + if c.authToken != "" { + if strings.EqualFold(c.tokenType, TokenTypeBearer) { + req.Header.Set("Authorization", "Bearer "+string(c.authToken)) + } else if strings.EqualFold(c.tokenType, TokenTypeSplunk) { + req.Header.Set("Authorization", "Splunk "+string(c.authToken)) + } else { + return fmt.Errorf("auth_token provided without a correct token type, valid token types are %v", []string{TokenTypeBearer, TokenTypeSplunk}) + } + } else { + req.SetBasicAuth(c.username, c.password) + } + return nil +} diff --git a/receiver/splunksearchapireceiver/client_test.go b/receiver/splunksearchapireceiver/client_test.go new file mode 100644 index 000000000..bcf64087a --- /dev/null +++ b/receiver/splunksearchapireceiver/client_test.go @@ -0,0 +1,176 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package splunksearchapireceiver + +import ( + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCreateSearchJob(t *testing.T) { + server := newMockServer() + testClient := defaultSplunkSearchAPIClient{ + client: server.Client(), + endpoint: server.URL, + } + + resp, err := testClient.CreateSearchJob("index=otel starttime=\"\" endtime=\"\" timeformat=\"\"") + require.NoError(t, err) + require.Equal(t, "123456", resp.SID) + + // returns an error if the search doesn't have times + resp, err = testClient.CreateSearchJob("index=otel") + require.EqualError(t, err, "search query must contain starttime, endtime, and timeformat") + require.Empty(t, resp) + + // returns an error if the response status isn't 201 + resp, err = testClient.CreateSearchJob("index=fail_to_create_job starttime=\"\" endtime=\"\" timeformat=\"\"") + require.ErrorContains(t, err, "failed to create search job") + require.Empty(t, resp) + + // returns an error if the response body can't be unmarshalled + resp, err = testClient.CreateSearchJob("index=fail_to_unmarshal starttime=\"\" endtime=\"\" timeformat=\"\"") + require.ErrorContains(t, err, "failed to unmarshal search job create response") + require.Empty(t, resp) + +} + +func TestGetJobStatus(t *testing.T) { + server := newMockServer() + testClient := defaultSplunkSearchAPIClient{ + client: server.Client(), + endpoint: server.URL, + } + + resp, err := testClient.GetJobStatus("123456") + require.NoError(t, err) + require.Equal(t, "DONE", resp.Content.Dict.Keys[0].Value) + require.Equal(t, "text/xml", resp.Content.Type) + + // returns an error if the response status isn't 200 + resp, err = testClient.GetJobStatus("654321") + require.ErrorContains(t, err, "failed to get search job status") + require.Empty(t, resp) + + // returns an error if the response body can't be unmarshalled + resp, err = testClient.GetJobStatus("098765") + require.ErrorContains(t, err, "failed to unmarshal search job status response") + require.Empty(t, resp) +} + +func TestGetSearchResults(t *testing.T) { + server := newMockServer() + testClient := defaultSplunkSearchAPIClient{ + client: server.Client(), + endpoint: server.URL, + } + + resp, err := testClient.GetSearchResults("123456", 0, 5) + require.NoError(t, err) + require.Equal(t, 5, len(resp.Results)) + require.Equal(t, "Hello, world!", resp.Results[0].Raw) + + // returns an error if the response status isn't 200 + resp, err = testClient.GetSearchResults("654321", 0, 5) + require.ErrorContains(t, err, "failed to get search job results") + require.Empty(t, resp) + + // returns an error if the response body can't be unmarshalled + resp, err = testClient.GetSearchResults("098765", 0, 5) + require.ErrorContains(t, err, "failed to unmarshal search job results response") + require.Empty(t, resp) +} + +func TestSetSplunkRequestAuth(t *testing.T) { + client := defaultSplunkSearchAPIClient{ + username: "user", + password: "password", + } + req := httptest.NewRequest("GET", "http://localhost:8089", nil) + client.SetSplunkRequestAuth(req) + require.Equal(t, req.Header.Get("Authorization"), "Basic dXNlcjpwYXNzd29yZA==") + + client = defaultSplunkSearchAPIClient{ + authToken: "token", + tokenType: TokenTypeBearer, + } + client.SetSplunkRequestAuth(req) + require.Equal(t, req.Header.Get("Authorization"), "Bearer token") + + client = defaultSplunkSearchAPIClient{ + authToken: "token", + tokenType: TokenTypeSplunk, + } + client.SetSplunkRequestAuth(req) + require.Equal(t, req.Header.Get("Authorization"), "Splunk token") +} + +// mock Splunk servers +func newMockServer() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + switch req.URL.String() { + case "/services/search/jobs": + body, _ := io.ReadAll(req.Body) + if strings.Contains(string(body), "search=index%3Dotel") { + rw.Header().Set("Content-Type", "application/xml") + rw.WriteHeader(http.StatusCreated) + rw.Write([]byte(` + + 123456 + + `)) + } + if strings.Contains(string(body), "index%3Dfail_to_create_job") { + rw.WriteHeader(http.StatusNotFound) + } + if strings.Contains(string(body), "index%3Dfail_to_unmarshal") { + rw.WriteHeader(http.StatusCreated) + rw.Write([]byte(`invalid xml`)) + } + case "/services/search/v2/jobs/123456": + rw.Header().Set("Content-Type", "application/xml") + rw.WriteHeader(http.StatusOK) + rw.Write([]byte(` + + + + DONE + + + `)) + case "/services/search/v2/jobs/654321": + rw.WriteHeader(http.StatusNotFound) + case "/services/search/v2/jobs/098765": + rw.WriteHeader(http.StatusOK) + rw.Write([]byte(`invalid xml`)) + case "/services/search/v2/jobs/123456/results?output_mode=json&offset=0&count=5": + rw.Header().Set("Content-Type", "application/json") + rw.WriteHeader(http.StatusOK) + rw.Write(splunkEventsResultsP1) + case "/services/search/v2/jobs/654321/results?output_mode=json&offset=0&count=5": + rw.WriteHeader(http.StatusNotFound) + case "/services/search/v2/jobs/098765/results?output_mode=json&offset=0&count=5": + rw.WriteHeader(http.StatusOK) + rw.Write([]byte(`invalid json`)) + default: + rw.WriteHeader(http.StatusNotFound) + } + })) +} diff --git a/receiver/splunksearchapireceiver/config.go b/receiver/splunksearchapireceiver/config.go index 345cd6ab9..51c1a2a4d 100644 --- a/receiver/splunksearchapireceiver/config.go +++ b/receiver/splunksearchapireceiver/config.go @@ -16,6 +16,7 @@ package splunksearchapireceiver import ( "errors" + "fmt" "strings" "time" @@ -25,13 +26,17 @@ import ( var ( errNonStandaloneSearchQuery = errors.New("only standalone search commands can be used for scraping data") + TokenTypeBearer = "Bearer" + TokenTypeSplunk = "Splunk" ) // Config struct to represent the configuration for the Splunk Search API receiver type Config struct { confighttp.ClientConfig `mapstructure:",squash"` - Username string `mapstructure:"splunk_username"` - Password string `mapstructure:"splunk_password"` + Username string `mapstructure:"splunk_username,omitempty"` + Password string `mapstructure:"splunk_password,omitempty"` + AuthToken string `mapstructure:"auth_token,omitempty"` + TokenType string `mapstructure:"token_type,omitempty"` Searches []Search `mapstructure:"searches"` JobPollInterval time.Duration `mapstructure:"job_poll_interval"` StorageID *component.ID `mapstructure:"storage"` @@ -51,12 +56,27 @@ func (cfg *Config) Validate() error { if cfg.Endpoint == "" { return errors.New("missing Splunk server endpoint") } - if cfg.Username == "" { + + if cfg.Username == "" && cfg.AuthToken == "" { return errors.New("missing Splunk username") } - if cfg.Password == "" { + + if cfg.Password == "" && cfg.AuthToken == "" { return errors.New("missing Splunk password") } + + if cfg.AuthToken != "" { + if cfg.TokenType == "" { + return errors.New("auth_token provided without a token type") + } + if !strings.EqualFold(cfg.TokenType, TokenTypeBearer) && !strings.EqualFold(cfg.TokenType, TokenTypeSplunk) { + return fmt.Errorf("auth_token provided without a correct token type, valid token types are %v", []string{TokenTypeBearer, TokenTypeSplunk}) + } + if cfg.Username != "" || cfg.Password != "" { + return errors.New("auth_token and username/password were both provided, only one can be provided to authenticate with Splunk") + } + } + if len(cfg.Searches) == 0 { return errors.New("at least one search must be provided") } diff --git a/receiver/splunksearchapireceiver/config_test.go b/receiver/splunksearchapireceiver/config_test.go index 90440234c..5816d952d 100644 --- a/receiver/splunksearchapireceiver/config_test.go +++ b/receiver/splunksearchapireceiver/config_test.go @@ -27,6 +27,8 @@ func TestValidate(t *testing.T) { endpoint string username string password string + authToken string + tokenType string storage string searches []Search errExpected bool @@ -48,7 +50,7 @@ func TestValidate(t *testing.T) { errText: "missing Splunk server endpoint", }, { - desc: "Missing username", + desc: "Missing username, no auth token", endpoint: "http://localhost:8089", password: "password", storage: "file_storage", @@ -63,7 +65,7 @@ func TestValidate(t *testing.T) { errText: "missing Splunk username", }, { - desc: "Missing password", + desc: "Missing password, no auth token", endpoint: "http://localhost:8089", username: "user", storage: "file_storage", @@ -77,6 +79,55 @@ func TestValidate(t *testing.T) { errExpected: true, errText: "missing Splunk password", }, + { + desc: "Auth token without token type", + endpoint: "http://localhost:8089", + authToken: "token", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "auth_token provided without a token type", + }, + { + desc: "Auth token with invalid token type", + endpoint: "http://localhost:8089", + authToken: "token", + tokenType: "invalid", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "auth_token provided without a correct token type, valid token types are [Bearer Splunk]", + }, + { + desc: "Auth token and username/password provided", + endpoint: "http://localhost:8089", + username: "user", + password: "password", + authToken: "token", + tokenType: "Bearer", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: true, + errText: "auth_token and username/password were both provided, only one can be provided to authenticate with Splunk", + }, { desc: "Missing storage", endpoint: "http://localhost:8089", @@ -209,6 +260,21 @@ func TestValidate(t *testing.T) { }, errExpected: false, }, + { + desc: "Valid config with auth token", + endpoint: "http://localhost:8089", + authToken: "token", + tokenType: "Bearer", + storage: "file_storage", + searches: []Search{ + { + Query: "search index=_internal", + EarliestTime: "2024-10-30T04:00:00.000Z", + LatestTime: "2024-10-30T14:00:00.000Z", + }, + }, + errExpected: false, + }, { desc: "Valid config with multiple searches", endpoint: "http://localhost:8089", @@ -268,6 +334,8 @@ func TestValidate(t *testing.T) { cfg.Endpoint = tc.endpoint cfg.Username = tc.username cfg.Password = tc.password + cfg.AuthToken = tc.authToken + cfg.TokenType = tc.tokenType cfg.Searches = tc.searches if tc.storage != "" { cfg.StorageID = &component.ID{} diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go index d03d3c70a..a5184f4f5 100644 --- a/receiver/splunksearchapireceiver/receiver.go +++ b/receiver/splunksearchapireceiver/receiver.go @@ -102,15 +102,24 @@ func (ssapir *splunksearchapireceiver) Shutdown(ctx context.Context) error { func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { for _, search := range ssapir.config.Searches { + // set default event batch size + fmt.Println(search.EventBatchSize) + if search.EventBatchSize == 0 { + search.EventBatchSize = 100 + fmt.Println(search.EventBatchSize) + } + // create search in Splunk searchID, err := ssapir.createSplunkSearch(search) if err != nil { ssapir.logger.Error("error creating search", zap.Error(err)) + return err } // wait for search to complete if err = ssapir.pollSearchCompletion(ctx, searchID); err != nil { ssapir.logger.Error("error polling for search completion", zap.Error(err)) + return err } for { @@ -118,6 +127,7 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { results, err := ssapir.getSplunkSearchResults(searchID, offset, search.EventBatchSize) if err != nil { ssapir.logger.Error("error fetching search results", zap.Error(err)) + return err } ssapir.logger.Info("search results fetched", zap.Int("num_results", len(results.Results))) @@ -189,10 +199,11 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { } // if the number of results is less than the results per request, we have queried all pages for the search if len(results.Results) < search.EventBatchSize { + ssapir.logger.Debug("results less than batch size, stopping search result export") break } - } + ssapir.logger.Debug("all search results exported", zap.String("query", search.Query), zap.Int("total results", exportedEvents)) } return nil From a35ad24afe1c493d08e79c77c78310c400240f18 Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Thu, 21 Nov 2024 16:15:46 -0500 Subject: [PATCH 2/5] lint --- receiver/splunksearchapireceiver/config.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/receiver/splunksearchapireceiver/config.go b/receiver/splunksearchapireceiver/config.go index 51c1a2a4d..071af30f8 100644 --- a/receiver/splunksearchapireceiver/config.go +++ b/receiver/splunksearchapireceiver/config.go @@ -26,8 +26,10 @@ import ( var ( errNonStandaloneSearchQuery = errors.New("only standalone search commands can be used for scraping data") - TokenTypeBearer = "Bearer" - TokenTypeSplunk = "Splunk" + // TokenTypeBearer is the token type for Bearer tokens + TokenTypeBearer = "Bearer" + // TokenTypeSplunk is the token type for Splunk tokens + TokenTypeSplunk = "Splunk" ) // Config struct to represent the configuration for the Splunk Search API receiver From 9468cfbe5b7e185daef18dd17cf4b700c8784ddc Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Tue, 26 Nov 2024 10:18:39 -0500 Subject: [PATCH 3/5] fix struct name --- receiver/splunksearchapireceiver/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/splunksearchapireceiver/client.go b/receiver/splunksearchapireceiver/client.go index cc0c1217c..d2d93172a 100644 --- a/receiver/splunksearchapireceiver/client.go +++ b/receiver/splunksearchapireceiver/client.go @@ -114,7 +114,7 @@ func (c defaultSplunkSearchAPIClient) GetJobStatus(sid string) (SearchJobStatusR err = c.SetSplunkRequestAuth(req) if err != nil { - return JobStatusResponse{}, err + return SearchJobStatusResponse{}, err } resp, err := c.client.Do(req) From aa3134cc94ee123e1f53fc111fcab10e62f0a44d Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Mon, 2 Dec 2024 11:19:32 -0500 Subject: [PATCH 4/5] rm prints, fix error messages --- receiver/splunksearchapireceiver/config.go | 4 ++-- receiver/splunksearchapireceiver/receiver.go | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/receiver/splunksearchapireceiver/config.go b/receiver/splunksearchapireceiver/config.go index 071af30f8..9c15c31ca 100644 --- a/receiver/splunksearchapireceiver/config.go +++ b/receiver/splunksearchapireceiver/config.go @@ -60,11 +60,11 @@ func (cfg *Config) Validate() error { } if cfg.Username == "" && cfg.AuthToken == "" { - return errors.New("missing Splunk username") + return errors.New("missing Splunk username or auth token") } if cfg.Password == "" && cfg.AuthToken == "" { - return errors.New("missing Splunk password") + return errors.New("missing Splunk password or auth token") } if cfg.AuthToken != "" { diff --git a/receiver/splunksearchapireceiver/receiver.go b/receiver/splunksearchapireceiver/receiver.go index 3b6c9af12..5f32bafc0 100644 --- a/receiver/splunksearchapireceiver/receiver.go +++ b/receiver/splunksearchapireceiver/receiver.go @@ -112,10 +112,8 @@ func (ssapir *splunksearchapireceiver) Shutdown(ctx context.Context) error { func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error { for _, search := range ssapir.config.Searches { // set default event batch size - fmt.Println(search.EventBatchSize) if search.EventBatchSize == 0 { search.EventBatchSize = 100 - fmt.Println(search.EventBatchSize) } // create search in Splunk From de5f00c013d7dad5505ab18fc14228b79a18d6b7 Mon Sep 17 00:00:00 2001 From: Caleb Hurshman Date: Mon, 2 Dec 2024 11:40:02 -0500 Subject: [PATCH 5/5] fix tests --- receiver/splunksearchapireceiver/config_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/splunksearchapireceiver/config_test.go b/receiver/splunksearchapireceiver/config_test.go index 5816d952d..789825451 100644 --- a/receiver/splunksearchapireceiver/config_test.go +++ b/receiver/splunksearchapireceiver/config_test.go @@ -62,7 +62,7 @@ func TestValidate(t *testing.T) { }, }, errExpected: true, - errText: "missing Splunk username", + errText: "missing Splunk username or auth token", }, { desc: "Missing password, no auth token", @@ -77,7 +77,7 @@ func TestValidate(t *testing.T) { }, }, errExpected: true, - errText: "missing Splunk password", + errText: "missing Splunk password or auth token", }, { desc: "Auth token without token type",