feat: SSAPI Exporter Integration Test (BPS-279) #2023

Merged · 6 commits · Dec 10, 2024

Changes from all commits
2 changes: 1 addition & 1 deletion receiver/splunksearchapireceiver/client_test.go
@@ -84,7 +84,7 @@ func TestGetSearchResults(t *testing.T) {

resp, err := testClient.GetSearchResults("123456", 0, 5)
require.NoError(t, err)
- require.Equal(t, 5, len(resp.Results))
+ require.Equal(t, 3, len(resp.Results))
require.Equal(t, "Hello, world!", resp.Results[0].Raw)

// returns an error if the response status isn't 200
119 changes: 109 additions & 10 deletions receiver/splunksearchapireceiver/integration_test.go
@@ -16,15 +16,19 @@ package splunksearchapireceiver

import (
"context"
"errors"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
)

@@ -41,7 +45,7 @@ func TestSplunkResultsPaginationFailure(t *testing.T) {
},
}
var callCount int
- server := newMockSplunkServer(&callCount)
+ server := newMockSplunkServerPagination(&callCount)
defer server.Close()
settings := componenttest.NewNopTelemetrySettings()
ssapir := newSSAPIReceiver(zap.NewNop(), cfg, settings, component.NewID(typeStr))
@@ -54,11 +58,11 @@

ssapir.initCheckpoint(context.Background())
ssapir.runQueries(context.Background())
- require.Equal(t, 5, ssapir.checkpointRecord.Offset)
+ require.Equal(t, 3, ssapir.checkpointRecord.Offset)
require.Equal(t, 1, callCount)
}

func newMockSplunkServer(callCount *int) *httptest.Server {
func newMockSplunkServerPagination(callCount *int) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if req.URL.String() == "/services/search/jobs" {
rw.Header().Set("Content-Type", "application/xml")
@@ -68,8 +72,7 @@ func newMockSplunkServer(callCount *int) *httptest.Server {
<sid>123456</sid>
</response>
`))
- }
- if req.URL.String() == "/services/search/v2/jobs/123456" {
+ } else if req.URL.String() == "/services/search/v2/jobs/123456" {
rw.Header().Set("Content-Type", "application/xml")
rw.WriteHeader(200)
rw.Write([]byte(`<?xml version="1.0" encoding="UTF-8"?>
@@ -81,21 +84,98 @@
</dict>
</content>
</response>`))
- }
- if req.URL.String() == "/services/search/v2/jobs/123456/results?output_mode=json&offset=0&count=5" && req.URL.Query().Get("offset") == "0" {
+ } else if req.URL.String() == "/services/search/v2/jobs/123456/results?output_mode=json&offset=0&count=5" && req.URL.Query().Get("offset") == "0" {
rw.Header().Set("Content-Type", "application/json")
rw.WriteHeader(200)
rw.Write(splunkEventsResultsP1)
*callCount++
- }
- if req.URL.String() == "/services/search/v2/jobs/123456/results?output_mode=json&offset=5&count=5" && req.URL.Query().Get("offset") == "5" {
+ } else if req.URL.String() == "/services/search/v2/jobs/123456/results?output_mode=json&offset=5&count=5" && req.URL.Query().Get("offset") == "5" {
rw.Header().Set("Content-Type", "application/json")
rw.WriteHeader(400)
rw.Write([]byte("error, bad request"))
}
}))
}

// Test the case where the GCP exporter returns an error
func TestExporterFailure(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Searches = []Search{
{
Query: "search index=otel",
EarliestTime: "2024-11-14T00:00:00.000Z",
LatestTime: "2024-11-14T23:59:59.000Z",
EventBatchSize: 3,
},
}
server := newMockSplunkServer()
defer server.Close()
settings := componenttest.NewNopTelemetrySettings()
ssapir := newSSAPIReceiver(zap.NewNop(), cfg, settings, component.NewID(typeStr))
logsConsumer := &mockLogsConsumerExporterErr{}
logsConsumer.On("ConsumeLogs", mock.Anything, mock.Anything).Return(nil)

ssapir.logsConsumer = logsConsumer
ssapir.client, _ = newSplunkSearchAPIClient(context.Background(), settings, *cfg, componenttest.NewNopHost())
ssapir.client.(*defaultSplunkSearchAPIClient).client = server.Client()
ssapir.client.(*defaultSplunkSearchAPIClient).endpoint = server.URL

ssapir.initCheckpoint(context.Background())
err := ssapir.runQueries(context.Background())
require.NoError(t, err)
require.Equal(t, 5, ssapir.checkpointRecord.Offset)
require.Equal(t, "search index=otel", ssapir.checkpointRecord.Search)

// simulate data failing
// the checkpoint should not be updated, and an error should be returned
ssapir.checkpointRecord.Offset = 0
offset = 0
logsConsumerErr := &mockLogsConsumerExporterErr{}
logsConsumerErr.On("ConsumeLogs", mock.Anything, mock.Anything).Return(errors.New("error exporting logs"))

ssapir.logsConsumer = logsConsumerErr
ssapir.initCheckpoint(context.Background())
err = ssapir.runQueries(context.Background())
require.EqualError(t, err, "error consuming logs: error exporting logs")
require.Equal(t, 0, ssapir.checkpointRecord.Offset)
require.Equal(t, "search index=otel", ssapir.checkpointRecord.Search)
}

func newMockSplunkServer() *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if req.URL.String() == "/services/search/jobs" {
rw.Header().Set("Content-Type", "application/xml")
rw.WriteHeader(201)
rw.Write([]byte(`<?xml version="1.0" encoding="UTF-8"?>
<response>
<sid>123456</sid>
</response>
`))
} else if req.URL.String() == "/services/search/v2/jobs/123456" {
rw.Header().Set("Content-Type", "application/xml")
rw.WriteHeader(200)
rw.Write([]byte(`<?xml version="1.0" encoding="UTF-8"?>
<response>
<content>
<type>DISPATCH</type>
<dict>
<key name="dispatchState">DONE</key>
</dict>
</content>
</response>`))
} else if req.URL.String() == "/services/search/v2/jobs/123456/results?output_mode=json&offset=0&count=3" {
rw.Header().Set("Content-Type", "application/json")
rw.WriteHeader(200)
rw.Write(splunkEventsResultsP1)
} else if req.URL.String() == "/services/search/v2/jobs/123456/results?output_mode=json&offset=3&count=3" {
rw.Header().Set("Content-Type", "application/json")
rw.WriteHeader(200)
rw.Write(splunkEventsResultsP2)
}
}))
}

var splunkEventsResultsP1 = []byte(`{
"init_offset": 0,
"results": [
@@ -110,7 +190,13 @@ var splunkEventsResultsP1 = []byte(`{
{
"_raw": "lorem ipsum",
"_time": "2024-11-14T13:02:29.000-05:00"
- },
+ }
]
}`)

var splunkEventsResultsP2 = []byte(`{
"init_offset": 3,
"results": [
{
"_raw": "dolor sit amet",
"_time": "2024-11-14T13:02:28.000-05:00"
@@ -121,3 +207,16 @@
}
]
}`)

type mockLogsConsumerExporterErr struct {
mock.Mock
}

func (m *mockLogsConsumerExporterErr) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
args := m.Called(ctx, logs)
return args.Error(0)
}

func (m *mockLogsConsumerExporterErr) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
4 changes: 4 additions & 0 deletions receiver/splunksearchapireceiver/receiver.go
@@ -111,6 +111,9 @@ func (ssapir *splunksearchapireceiver) Shutdown(ctx context.Context) error {

func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) error {
StefanKurek (Contributor) commented on Dec 9, 2024:

Unless I'm missing something (which I may be; I haven't looked beyond the failing test and this function), I believe your test failures are related to this function.

It looks like the logic here is to make your query, wait for it to complete, and then retrieve and process the search results in a loop until the number of results returned is less than the maximum that can fit in a single page. If that's the case, you quit the infinite loop; otherwise, you try to get another page of results.

Now what happens if it was your last page of results, but the number of results was exactly the max that can fit in a page (as with the failing test)? It seems like you try to get more search results but get an error because there are none.

Hopefully I'm right, or at least half right, and this puts you on the right track to fixing the CI issues.
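
A minimal sketch of the loop shape described in this comment, for anyone following along: GetSearchResults and its (sid, offset, count) arguments match the client method exercised in client_test.go above, but the paginate helper and its body are assumptions for illustration, not the receiver's actual code.

```go
// Assumed shape of the results loop. Its only exit is a short page, so when
// the total result count is an exact multiple of batchSize, the loop issues
// one extra request; a server that rejects the out-of-range offset (like the
// mock above) then fails the whole search.
func paginate(client splunkSearchAPIClient, sid string, batchSize int) error {
	offset := 0
	for {
		resp, err := client.GetSearchResults(sid, offset, batchSize)
		if err != nil {
			return err // hit on the extra request after a full final page
		}
		// ... convert resp.Results to plog.Logs and hand them to the consumer ...
		offset += len(resp.Results)
		if len(resp.Results) < batchSize {
			break // a short page is the only signal that results are exhausted
		}
	}
	return nil
}
```

If that is what's happening, tracking the job's total event count, or treating the out-of-range page as a clean stop, would let an exact-multiple result set terminate without the failing extra request.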

for _, search := range ssapir.config.Searches {
// set current search query
ssapir.checkpointRecord.Search = search.Query

+ // set default event batch size (matches Splunk API default)
+ if search.EventBatchSize == 0 {
+ search.EventBatchSize = 100
@@ -296,6 +299,7 @@ func (ssapir *splunksearchapireceiver) checkpoint(ctx context.Context) error {
if ssapir.checkpointRecord == nil {
return nil
}

marshalBytes, err := json.Marshal(ssapir.checkpointRecord)
if err != nil {
return fmt.Errorf("failed to write checkpoint: %w", err)