feat: SSAPI Receiver (BPS-289) #1951

Open
wants to merge 56 commits into base: release/v1.67.0
Commits (56)
cfd486a  ssapi mvp (Caleb-Hurshman, Nov 8, 2024)
6079244  lint (Caleb-Hurshman, Nov 8, 2024)
8eafe63  tls (Caleb-Hurshman, Nov 8, 2024)
77596d4  WIP (Caleb-Hurshman, Nov 12, 2024)
5bac523  ticker, other pr feedback (Caleb-Hurshman, Nov 12, 2024)
9f21fe4  pagination functionality (Caleb-Hurshman, Nov 13, 2024)
527cde0  break if results earlier than earliest_time (Caleb-Hurshman, Nov 13, 2024)
af6f860  fix lint (Caleb-Hurshman, Nov 13, 2024)
080023c  check for earliest/latest in query (Caleb-Hurshman, Nov 14, 2024)
294c855  config unit tests (Caleb-Hurshman, Nov 11, 2024)
8ec2d76  package comment (Caleb-Hurshman, Nov 11, 2024)
323d1af  feat(chronicleexporter): Support dynamic namespace and ingestion labe… (justinianvoss22, Nov 12, 2024)
bf90b00  debug logs (Caleb-Hurshman, Nov 13, 2024)
abb4e61  rm unnecessary clauses (Caleb-Hurshman, Nov 13, 2024)
c18122b  fix error wording (Caleb-Hurshman, Nov 13, 2024)
0107133  rm space (Caleb-Hurshman, Nov 13, 2024)
b072ac6  wip (Caleb-Hurshman, Nov 19, 2024)
1d63802  client tests (Caleb-Hurshman, Nov 19, 2024)
1de224f  checkpoint methods (Caleb-Hurshman, Nov 11, 2024)
e2e48c6  WIP (Caleb-Hurshman, Nov 14, 2024)
4aa0396  functional checkpoint (Caleb-Hurshman, Nov 15, 2024)
cb84d76  debug logs, rm print (Caleb-Hurshman, Nov 15, 2024)
c3dd5cd  loadCheckpoint return error (Caleb-Hurshman, Nov 15, 2024)
b37138b  splunk failure test (Caleb-Hurshman, Nov 18, 2024)
61c87a7  WIP (Caleb-Hurshman, Nov 14, 2024)
8b6a212  encode req body (Caleb-Hurshman, Nov 14, 2024)
1cb7a4d  stricter query validation (Caleb-Hurshman, Nov 14, 2024)
90b47e4  storage config test (Caleb-Hurshman, Nov 18, 2024)
c7f2d7f  lint, tidy (Caleb-Hurshman, Nov 18, 2024)
fbc9ee5  return error on export fail (Caleb-Hurshman, Nov 19, 2024)
9b6d1dd  tidy (Caleb-Hurshman, Nov 19, 2024)
5c4d222  receiver tests (Caleb-Hurshman, Nov 20, 2024)
5fc579c  receiver tests (Caleb-Hurshman, Nov 20, 2024)
1145d00  lint (Caleb-Hurshman, Nov 20, 2024)
60f8cd8  fix TestCheckpoint (Caleb-Hurshman, Nov 20, 2024)
9054f80  rename structs (Caleb-Hurshman, Nov 21, 2024)
950ecf3  exporter fail test (Caleb-Hurshman, Dec 3, 2024)
3693b90  fix search checkpointing (Caleb-Hurshman, Dec 6, 2024)
97628ce  auth token (Caleb-Hurshman, Nov 21, 2024)
ccacaa0  lint (Caleb-Hurshman, Nov 21, 2024)
0c12f42  fix struct name (Caleb-Hurshman, Nov 26, 2024)
a904a88  rm prints, fix error messages (Caleb-Hurshman, Dec 2, 2024)
981eeb4  fix tests (Caleb-Hurshman, Dec 2, 2024)
257e5a1  default batch size (Caleb-Hurshman, Dec 4, 2024)
2be2b6d  log end of export (Caleb-Hurshman, Dec 5, 2024)
ad3255e  readme (Caleb-Hurshman, Dec 6, 2024)
bcc1600  how-to (Caleb-Hurshman, Dec 6, 2024)
70dcbde  how-to example config (Caleb-Hurshman, Dec 6, 2024)
cb2e7f7  change how-to conf values (Caleb-Hurshman, Dec 6, 2024)
699510b  change test batch size (Caleb-Hurshman, Dec 9, 2024)
1f81928  fix test case (Caleb-Hurshman, Dec 9, 2024)
dac6a6b  fix client test (Caleb-Hurshman, Dec 9, 2024)
1f91ded  fix rebase errors (Caleb-Hurshman, Dec 10, 2024)
dad623c  tidy (Caleb-Hurshman, Dec 10, 2024)
7dea9f4  initial feedback (Caleb-Hurshman, Dec 11, 2024)
cc0dfd3  Merge branch 'release/v1.67.0' into feat/ssapi-receiver (Caleb-Hurshman, Dec 11, 2024)
2 changes: 2 additions & 0 deletions factories/receivers.go
@@ -23,6 +23,7 @@ import (
	"github.com/observiq/bindplane-agent/receiver/pluginreceiver"
	"github.com/observiq/bindplane-agent/receiver/routereceiver"
	"github.com/observiq/bindplane-agent/receiver/sapnetweaverreceiver"
	"github.com/observiq/bindplane-agent/receiver/splunksearchapireceiver"
	"github.com/observiq/bindplane-agent/receiver/telemetrygeneratorreceiver"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/activedirectorydsreceiver"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/aerospikereceiver"
@@ -157,6 +158,7 @@ var defaultReceivers = []receiver.Factory{
	sapnetweaverreceiver.NewFactory(),
	simpleprometheusreceiver.NewFactory(),
	snmpreceiver.NewFactory(),
	splunksearchapireceiver.NewFactory(),
	splunkhecreceiver.NewFactory(),
	sqlqueryreceiver.NewFactory(),
	sqlserverreceiver.NewFactory(),
3 changes: 3 additions & 0 deletions go.mod
@@ -198,6 +198,7 @@ require (
)

require (
	github.com/observiq/bindplane-agent/receiver/splunksearchapireceiver v1.67.0
	github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/aesprovider v0.114.0
	github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.114.0
	go.opentelemetry.io/collector/extension/extensiontest v0.114.0
@@ -878,6 +879,8 @@ replace github.com/observiq/bindplane-agent/internal/report => ./internal/report

replace github.com/observiq/bindplane-agent/internal/measurements => ./internal/measurements

replace github.com/observiq/bindplane-agent/receiver/splunksearchapireceiver => ./receiver/splunksearchapireceiver

// Does not build with windows and only used in configschema executable
// Relevant issue https://github.com/mattn/go-ieproxy/issues/45
replace github.com/mattn/go-ieproxy => github.com/mattn/go-ieproxy v0.0.1
100 changes: 100 additions & 0 deletions receiver/splunksearchapireceiver/README.md
@@ -0,0 +1,100 @@
# Splunk Search API Receiver
This receiver collects Splunk events using the [Splunk Search API](https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch).

## Supported Pipelines
- Logs

## Prerequisites
- Splunk admin credentials
- Configured storage extension

## Configuration
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| endpoint | string | `required (no default)` | The endpoint of the Splunk instance to collect from. |
| splunk_username | string | `(no default)` | Specifies the username used to authenticate to Splunk using basic auth. |
| splunk_password | string | `(no default)` | Specifies the password used to authenticate to Splunk using basic auth. |
| auth_token | string | `(no default)` | Specifies the token used to authenticate to Splunk using token auth. Mutually exclusive with basic auth using `splunk_username` and `splunk_password`. |
| token_type | string | `(no default)` | Specifies the type of token used to authenticate to Splunk using `auth_token`. Accepted values are "Bearer" or "Splunk". |
| job_poll_interval | duration | `5s` | How long the receiver waits between API calls to check whether a search job has completed. |
| searches.query | string | `required (no default)` | The Splunk search to run to retrieve the desired events. Queries must start with `search` and must not contain additional commands or any time fields (e.g. `earliesttime`). |
| searches.earliest_time | string | `required (no default)` | The earliest timestamp to collect logs from. Only logs that occurred at or after this timestamp will be collected. Must be in ISO 8601 or RFC3339 format. |
| searches.latest_time | string | `required (no default)` | The latest timestamp to collect logs from. Only logs that occurred at or before this timestamp will be collected. Must be in ISO 8601 or RFC3339 format. |
| searches.event_batch_size | int | `100` | The number of events to query from Splunk in a single request. |
| storage | component | `required (no default)` | The component ID of a storage extension to use when polling for `logs`. The storage extension prevents duplication of data after an exporter error by remembering which events were previously exported. |

### Example Configuration
```yaml
receivers:
  splunksearchapi:
    endpoint: "https://splunk-c4-0.example.localnet:8089"
    tls:
      insecure_skip_verify: true
    splunk_username: "user"
    splunk_password: "pass"
    job_poll_interval: 5s
    searches:
      - query: 'search index=my_index'
        earliest_time: "2024-11-01T01:00:00.000-05:00"
        latest_time: "2024-11-30T23:59:59.999-05:00"
        event_batch_size: 500
    storage: file_storage

extensions:
  file_storage:
    directory: "./local/storage"
```
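
As a variation, here is a minimal sketch of the same receiver configured with token authentication and more than one search. The token value, index names, and time ranges below are placeholders, not values taken from this PR:
```yaml
receivers:
  splunksearchapi:
    endpoint: "https://splunk-c4-0.example.localnet:8089"
    # auth_token is mutually exclusive with splunk_username/splunk_password
    auth_token: "my-splunk-token" # placeholder value
    token_type: "Bearer"
    job_poll_interval: 5s
    searches:
      - query: 'search index=index_one'
        earliest_time: "2024-11-01T00:00:00.000-05:00"
        latest_time: "2024-11-30T23:59:59.999-05:00"
      - query: 'search index=index_two'
        earliest_time: "2024-11-01T00:00:00.000-05:00"
        latest_time: "2024-11-30T23:59:59.999-05:00"
    storage: file_storage

extensions:
  file_storage:
    directory: "./local/storage"
```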

## How To

### Migrate historical events to Google Cloud Logging
1. Identify the Splunk index to migrate events from. Create a Splunk search to capture the events from that index. This will be the `searches.query` you pass to the receiver.
   - Example: `search index=my_index`
   - Note: queries must begin with the explicit `search` command, and must not include additional commands or any time fields (e.g. `earliesttime`).
2. Determine the timeframe you want to migrate events from, and set the `searches.earliest_time` and `searches.latest_time` config fields accordingly.
   - To migrate events from December 2024, EST (UTC-5):
     - `earliest_time: "2024-12-01T00:00:00.000-05:00"`
     - `latest_time: "2024-12-31T23:59:59.999-05:00"`
   - Note: By default, GCL will not accept logs with a timestamp older than 30 days. Contact Google to modify this rule.
3. Repeat steps 1 & 2 for each index you wish to collect from.
4. Configure a storage extension to store checkpointing data for the receiver.
5. Configure the rest of the receiver fields according to your Splunk environment.
6. Add a `googlecloud` exporter to your config. Configure the exporter to send to a GCP project where your service account has the Logging Admin role. To check the permissions of service accounts in your project, go to the [IAM page](https://console.cloud.google.com/iam-admin/iam).
7. Disable the `sending_queue` field on the GCP exporter. The sending queue introduces an asynchronous step to the pipeline, which jeopardizes the receiver's ability to checkpoint correctly and recover from errors. For the same reason, avoid using any asynchronous processors (e.g., the batch processor).

After following these steps, your configuration should look something like this:
```yaml
receivers:
  splunksearchapi:
    endpoint: "https://splunk-c4-0.example.localnet:8089"
    tls:
      insecure_skip_verify: true
    splunk_username: "user"
    splunk_password: "pass"
    job_poll_interval: 5s
    searches:
      - query: 'search index=my_index'
        earliest_time: "2024-12-01T00:00:00.000-05:00"
        latest_time: "2024-12-31T23:59:59.999-05:00"
        event_batch_size: 500
    storage: file_storage
exporters:
  googlecloud:
    project: "my-gcp-project"
    log:
      default_log_name: "splunk-events"
    sending_queue:
      enabled: false

extensions:
  file_storage:
    directory: "./local/storage"

service:
  extensions: [file_storage]
  pipelines:
    logs:
      receivers: [splunksearchapi]
      exporters: [googlecloud]
```
You are now ready to migrate events from Splunk to Google Cloud Logging.
175 changes: 175 additions & 0 deletions receiver/splunksearchapireceiver/client.go
@@ -0,0 +1,175 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package splunksearchapireceiver contains the Splunk Search API receiver.
package splunksearchapireceiver

import (
	"bytes"
	"context"
	"encoding/json"
	"encoding/xml"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"

	"go.opentelemetry.io/collector/component"
	"go.uber.org/zap"
)

type splunkSearchAPIClient interface {
	CreateSearchJob(search string) (CreateJobResponse, error)
	GetJobStatus(searchID string) (SearchJobStatusResponse, error)
	GetSearchResults(searchID string, offset int, batchSize int) (SearchResults, error)
}

type defaultSplunkSearchAPIClient struct {
	client    *http.Client
	endpoint  string
	logger    *zap.Logger
	username  string
	password  string
	authToken string
	tokenType string
}

func newDefaultSplunkSearchAPIClient(ctx context.Context, settings component.TelemetrySettings, conf Config, host component.Host) (*defaultSplunkSearchAPIClient, error) {
	client, err := conf.ClientConfig.ToClient(ctx, host, settings)
	if err != nil {
		return nil, err
	}

	return &defaultSplunkSearchAPIClient{
		client:    client,
		endpoint:  conf.Endpoint,
		logger:    settings.Logger,
		username:  conf.Username,
		password:  conf.Password,
		authToken: conf.AuthToken,
		tokenType: conf.TokenType,
	}, nil
}

func (c *defaultSplunkSearchAPIClient) CreateSearchJob(search string) (CreateJobResponse, error) {
	endpoint := fmt.Sprintf("%s/services/search/jobs", c.endpoint)

	// Require explicit time bounds on the query before dispatching it to Splunk.
	lowerSearch := strings.ToLower(search)
	if !strings.Contains(lowerSearch, "starttime=") || !strings.Contains(lowerSearch, "endtime=") || !strings.Contains(lowerSearch, "timeformat=") {
		return CreateJobResponse{}, fmt.Errorf("search query must contain starttime, endtime, and timeformat")
	}

	reqBody := fmt.Sprintf(`search=%s`, url.QueryEscape(search))
	resp, err := c.doSplunkRequest("POST", endpoint, bytes.NewBuffer([]byte(reqBody)))
	if err != nil {
		return CreateJobResponse{}, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		return CreateJobResponse{}, fmt.Errorf("create search job: %d", resp.StatusCode)
	}

	var jobResponse CreateJobResponse
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return CreateJobResponse{}, fmt.Errorf("read search job create response: %w", err)
	}

	err = xml.Unmarshal(body, &jobResponse)
	if err != nil {
		return CreateJobResponse{}, fmt.Errorf("unmarshal search job create response: %w", err)
	}
	return jobResponse, nil
}

func (c *defaultSplunkSearchAPIClient) GetJobStatus(sid string) (SearchJobStatusResponse, error) {
	endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s", c.endpoint, sid)

	resp, err := c.doSplunkRequest("GET", endpoint, nil)
	if err != nil {
		return SearchJobStatusResponse{}, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return SearchJobStatusResponse{}, fmt.Errorf("get search job status: %d", resp.StatusCode)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return SearchJobStatusResponse{}, fmt.Errorf("read search job status response: %w", err)
	}
	var jobStatusResponse SearchJobStatusResponse
	err = xml.Unmarshal(body, &jobStatusResponse)
	if err != nil {
		return SearchJobStatusResponse{}, fmt.Errorf("unmarshal search job status response: %w", err)
	}

	return jobStatusResponse, nil
}

func (c *defaultSplunkSearchAPIClient) GetSearchResults(sid string, offset int, batchSize int) (SearchResults, error) {
	endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s/results?output_mode=json&offset=%d&count=%d", c.endpoint, sid, offset, batchSize)
	resp, err := c.doSplunkRequest("GET", endpoint, nil)
	if err != nil {
		return SearchResults{}, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return SearchResults{}, fmt.Errorf("get search job results: %d", resp.StatusCode)
	}

	var searchResults SearchResults
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return SearchResults{}, fmt.Errorf("read search job results response: %w", err)
	}
	err = json.Unmarshal(body, &searchResults)
	if err != nil {
		return SearchResults{}, fmt.Errorf("unmarshal search job results response: %w", err)
	}

	return searchResults, nil
}

func (c *defaultSplunkSearchAPIClient) doSplunkRequest(method, endpoint string, body io.Reader) (*http.Response, error) {
	req, err := http.NewRequest(method, endpoint, body)
	if err != nil {
		return nil, fmt.Errorf("new http request: %w", err)
	}
	err = c.setSplunkRequestAuth(req)
	if err != nil {
		return nil, fmt.Errorf("set splunk request auth: %w", err)
	}
	resp, err := c.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("client do request: %w", err)
	}
	return resp, nil
}

func (c *defaultSplunkSearchAPIClient) setSplunkRequestAuth(req *http.Request) error {
	if c.authToken != "" {
		if strings.EqualFold(c.tokenType, TokenTypeBearer) {
			req.Header.Set("Authorization", "Bearer "+c.authToken)
		} else if strings.EqualFold(c.tokenType, TokenTypeSplunk) {
			req.Header.Set("Authorization", "Splunk "+c.authToken)
		}
	} else {
		req.SetBasicAuth(c.username, c.password)
	}
	return nil
}
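
For orientation only, here is a minimal sketch of how a caller might drive the `splunkSearchAPIClient` interface above, following the flow the README describes: create a search job, poll it at `job_poll_interval`, then page through results `event_batch_size` at a time. This is not code from the PR; `searchID`, `jobIsDone`, and `eventCount` are hypothetical helpers standing in for accesses to the response types, whose fields are defined in another file of this PR.
```go
// Illustrative sketch, not part of this PR.
func runSearch(client splunkSearchAPIClient, query string, pollInterval time.Duration, batchSize int) error {
	job, err := client.CreateSearchJob(query)
	if err != nil {
		return fmt.Errorf("create search job: %w", err)
	}
	sid := searchID(job) // hypothetical: read the search ID off the create response

	// Poll until Splunk reports the search job as complete.
	for {
		status, err := client.GetJobStatus(sid)
		if err != nil {
			return fmt.Errorf("get job status: %w", err)
		}
		if jobIsDone(status) { // hypothetical completion check
			break
		}
		time.Sleep(pollInterval)
	}

	// Page through the results one batch at a time.
	for offset := 0; ; offset += batchSize {
		results, err := client.GetSearchResults(sid, offset, batchSize)
		if err != nil {
			return fmt.Errorf("get search results: %w", err)
		}
		// ... convert the events to logs and pass them to the next consumer ...
		if eventCount(results) < batchSize { // hypothetical: a short page means no more results
			break
		}
	}
	return nil
}
```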