
feat: SSAPI Checkpointing (BPS-277) #1969

Merged
12 commits merged into feat/ssapi-receiver on Nov 19, 2024

Conversation


@Caleb-Hurshman commented Nov 15, 2024

Proposed Change

Add checkpointing capabilities to the SSAPI receiver.
The receiver now requires a storage extension to be configured in order to run.
The offset of the last successfully exported log is stored as the checkpoint value.
On startup, the receiver loads any existing checkpoint from the storage extension, in case a previous run failed.
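For reference, the load path looks roughly like the sketch below. This is a hedged reconstruction, assuming the storage.Client interface from go.opentelemetry.io/collector/extension/experimental/storage; the key name, struct names, and method names are illustrative, not the exact receiver code.

package splunksearchapireceiver

import (
	"context"
	"encoding/json"
	"fmt"

	"go.opentelemetry.io/collector/extension/experimental/storage"
	"go.uber.org/zap"
)

// eventStorageKey is an assumed key name, for illustration only.
const eventStorageKey = "last_event_offset"

// EventRecord stores the offset of the last successfully exported event.
type EventRecord struct {
	Offset int `json:"offset"`
}

// checkpointLoader is a hypothetical stand-in for the receiver struct.
type checkpointLoader struct {
	storageClient    storage.Client
	checkpointRecord *EventRecord
	logger           *zap.Logger
}

// loadCheckpoint reads a previously stored offset, if any, so a restarted
// receiver can resume instead of re-exporting everything.
func (l *checkpointLoader) loadCheckpoint(ctx context.Context) error {
	marshalBytes, err := l.storageClient.Get(ctx, eventStorageKey)
	if err != nil {
		return fmt.Errorf("failed to read checkpoint: %w", err)
	}
	if marshalBytes == nil {
		// No previous run, or the storage extension was cleared.
		l.logger.Info("no checkpoint found")
		return nil
	}
	return json.Unmarshal(marshalBytes, l.checkpointRecord)
}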

First run (empty storage extension):

{"level":"info","ts":"2024-11-15T11:33:03.831-0500","caller":"splunksearchapireceiver/receiver.go:260","msg":"no checkpoint found","kind":"receiver","name":"splunksearchapi","data_type":"logs"}
{"level":"info","ts":"2024-11-15T11:33:09.094-0500","caller":"splunksearchapireceiver/receiver.go:111","msg":"search results fetched","kind":"receiver","name":"splunksearchapi","data_type":"logs","num_results":8}
offset 8
{"level":"info","ts":"2024-11-15T11:33:09.096-0500","caller":"splunksearchapireceiver/receiver.go:106","msg":"fetching search results","kind":"receiver","name":"splunksearchapi","data_type":"logs"}
{"level":"info","ts":"2024-11-15T11:33:09.136-0500","caller":"splunksearchapireceiver/receiver.go:111","msg":"search results fetched","kind":"receiver","name":"splunksearchapi","data_type":"logs","num_results":8}
offset 16
{"level":"info","ts":"2024-11-15T11:33:09.136-0500","caller":"splunksearchapireceiver/receiver.go:106","msg":"fetching search results","kind":"receiver","name":"splunksearchapi","data_type":"logs"}
{"level":"info","ts":"2024-11-15T11:33:09.173-0500","caller":"splunksearchapireceiver/receiver.go:111","msg":"search results fetched","kind":"receiver","name":"splunksearchapi","data_type":"logs","num_results":4}
offset 20
{"level":"info","ts":"2024-11-15T11:33:09.174-0500","caller":"splunksearchapireceiver/receiver.go:185","msg":"all search results exported","kind":"receiver","name":"splunksearchapi","data_type":"logs","query":"search index=otel earliest=-1d latest=now","total results":20}

Second run (storage extension not cleared out, to simulate an existing offset checkpoint):

{"level":"info","ts":"2024-11-15T11:33:35.808-0500","caller":"splunksearchapireceiver/receiver.go:72","msg":"found offset checkpoint in storage extension","kind":"receiver","name":"splunksearchapi","data_type":"logs","offset":20}
{"level":"info","ts":"2024-11-15T11:33:41.161-0500","caller":"splunksearchapireceiver/receiver.go:111","msg":"search results fetched","kind":"receiver","name":"splunksearchapi","data_type":"logs","num_results":0}
offset 20
{"level":"info","ts":"2024-11-15T11:33:41.162-0500","caller":"splunksearchapireceiver/receiver.go:185","msg":"all search results exported","kind":"receiver","name":"splunksearchapi","data_type":"logs","query":"search index=otel earliest=-1d latest=now","total results":0}
Checklist
  • Changes are tested
  • CI has passed

@Caleb-Hurshman requested review from dpaasman00 and a team as code owners November 15, 2024 16:47
@Caleb-Hurshman requested review from schmikei and removed the request for a team and dpaasman00 November 15, 2024 16:49
@Caleb-Hurshman changed the title from feat: SSAPI Checkpointing to feat: SSAPI Checkpointing (BPS-277) Nov 15, 2024
ssapir.logger.Info("no checkpoint found")
return
}
if err = json.Unmarshal(marshalBytes, ssapir.checkpointRecord); err != nil {
Contributor

shouldn't we bomb out if we fail to unmarshal a checkpoint?

Contributor

We shouldn't Fatal because that would stop the collector process entirely. I'd prefer seeing the error propagated back up.

We do this so that other components can have clean Shutdown calls.
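For illustration, the propagated version of the quoted snippet might look like this (a sketch, reusing the variable names from the diff above):

if err = json.Unmarshal(marshalBytes, ssapir.checkpointRecord); err != nil {
	// Propagate instead of Fatal: the component's Start fails, but the
	// collector process stays up and can shut other components down cleanly.
	return fmt.Errorf("failed to unmarshal checkpoint: %w", err)
}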

(Three additional review threads on receiver/splunksearchapireceiver/receiver.go were marked outdated and resolved.)

// EventRecord struct stores the offset of the last event exported successfully
type EventRecord struct {
Offset int `json:"offset"`
Contributor

Curious: the last time I used a storage extension, we used a timestamp to track the last processed event time... Do you think that'd be more bulletproof than using the paginated offset?

Author

Using the offset instead of the timestamp was a piece of feedback from Dan on the spec. Looking back at his comment, he thought it would be easier to use the offset. Whether it's as bulletproof is a good question, but I assumed he wouldn't sacrifice the reliability of the receiver for ease. Curious to hear more of your perspective.

@schmikei (Contributor)

Yeah, I'm thinking there are a couple of scenarios we need to think about:

  1. Test a fail condition (shut off the network connection mid-processing), then change the configuration to run a different search; does it still use an offset that makes sense? Seems like we need to start tracking this by search (one possible shape is sketched after this list).
  2. Test a network failure from Splunk on retrieving paginated results (does our checkpoint pretty much work as we expect)

At least those are the two main things I'm curious if this solution fully solves at the moment.
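One possible shape for per-search tracking (scenario 1), reusing the hypothetical checkpointLoader and EventRecord types sketched earlier; the key derivation and method names here are illustrative assumptions, not the actual receiver code:

package splunksearchapireceiver

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
)

// checkpointKey derives a storage key from the search query, so changing the
// configured search starts from a fresh offset instead of reusing a stale one.
func checkpointKey(searchQuery string) string {
	sum := sha256.Sum256([]byte(searchQuery))
	return "offset_" + hex.EncodeToString(sum[:8])
}

// saveCheckpoint persists the offset under the per-search key.
func (l *checkpointLoader) saveCheckpoint(ctx context.Context, query string, offset int) error {
	data, err := json.Marshal(EventRecord{Offset: offset})
	if err != nil {
		return err
	}
	return l.storageClient.Set(ctx, checkpointKey(query), data)
}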

@@ -60,6 +62,10 @@ func (cfg *Config) Validate() error {
return errors.New("at least one search must be provided")
}

if cfg.StorageID == nil {
return errors.New("storage configuration must be provided")
Contributor

Suggested change:
- return errors.New("storage configuration must be provided")
+ return errors.New("storage configuration is required for this receiver")

@Caleb-Hurshman (Author)

  1. Test a network failure from Splunk on retrieving paginated results (does our checkpoint pretty much work as we expect)

How would you test this?

@schmikei (Contributor)

  1. Test a network failure from Splunk on retrieving paginated results (does our checkpoint pretty much work as we expect)

How would you test this?

My recommendation would be to use https://pkg.go.dev/net/http/httptest and simulate one in code as a test; otherwise you can use a non-routable IP address: https://stackoverflow.com/questions/100841/artificially-create-a-connection-timeout-error
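A minimal sketch of the httptest approach, assuming the receiver's Splunk client can be pointed at an arbitrary base URL (the endpoint wiring, test name, and response shape are illustrative assumptions):

package splunksearchapireceiver

import (
	"net/http"
	"net/http/httptest"
	"sync/atomic"
	"testing"
)

func TestCheckpointOnPaginationFailure(t *testing.T) {
	var calls int32
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if atomic.AddInt32(&calls, 1) == 1 {
			// First page of paginated results succeeds.
			_, _ = w.Write([]byte(`{"results":[{"_raw":"event1"}]}`))
			return
		}
		// Abruptly drop the connection on the second page to simulate a
		// mid-pagination network failure.
		hj, ok := w.(http.Hijacker)
		if !ok {
			t.Error("server does not support hijacking")
			return
		}
		conn, _, err := hj.Hijack()
		if err != nil {
			t.Error(err)
			return
		}
		_ = conn.Close()
	}))
	defer server.Close()

	// Point the receiver's Splunk endpoint at server.URL, run a search, and
	// assert the stored checkpoint reflects only the first, successful page.
	_ = server.URL
}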

@schmikei left a comment (Contributor)

There's one clarification I want to understand about when the ConsumeLogs call breaks:

ssapir.logger.Error("error consuming logs", zap.Error(err))
}
// last batch of logs has been successfully exported
exportedEvents += logs.ResourceLogs().Len()
Contributor

If there was an error in ConsumeLogs, I believe exportedEvents would be an untrue value, right?

At least since we're not stopping when there's an error from the consumer. I think we should consider stopping whenever a ConsumeLogs error occurs.

Contributor

And a checkpoint should still be written in that case.

Author

I agree that the process should break if ConsumeLogs errors, but I don't think the checkpoint should be updated.
In my head, the flow is like this:

  1. Previous batch consumed successfully (new offset written, which is the offset of the last log exported +1)
  2. Next batch obtained from Splunk using the offset
  3. Batch fails to export, ConsumeLogs errors and the receiver process exits (no partial writes to GCP by default, no checkpoint written)
  4. User addresses some issue with their GCP instance
  5. User restarts the receiver, which grabs the last written offset, essentially restarting from step 2

Feel free to add on if I'm missing something. A rough sketch of that loop is below.
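A rough sketch of steps 2 and 3 in that flow, with illustrative method names (GetSearchResults and writeCheckpoint are assumptions, not the actual receiver API):

// Fetch batches from Splunk starting at the checkpointed offset. On a
// ConsumeLogs error, return without writing a new checkpoint so a restart
// resumes from the last successfully exported batch.
for {
	logs, err := ssapir.client.GetSearchResults(ctx, search, ssapir.checkpointRecord.Offset)
	if err != nil {
		return fmt.Errorf("failed to fetch search results: %w", err)
	}
	if logs.ResourceLogs().Len() == 0 {
		break // all results exported
	}
	if err := ssapir.logsConsumer.ConsumeLogs(ctx, logs); err != nil {
		// No checkpoint update here: the stored offset still points at the
		// start of this failed batch.
		return fmt.Errorf("error consuming logs: %w", err)
	}
	// Batch exported successfully; advance and persist the checkpoint.
	ssapir.checkpointRecord.Offset += logs.ResourceLogs().Len()
	if err := ssapir.writeCheckpoint(ctx); err != nil {
		ssapir.logger.Error("failed to write checkpoint", zap.Error(err))
	}
}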

Contributor

Yeah, I think my brain was on the thread of what happens if some of the batch gets processed but other records get dropped; I think that gets hairy for checkpointing, for what is likely a pretty big edge case...

Author

Makes sense. It was my impression that the GCP exporter wouldn't partially write a batch, but I might be off on that. I don't know how to handle that case right now, but that's something I wanted to revisit when we get a chance to test. It depends on how much information the error returns. If there's no info on which log(s) failed, then we're kinda hosed.

go.opentelemetry.io/collector/consumer v0.113.0
go.opentelemetry.io/collector/consumer/consumertest v0.113.0
go.opentelemetry.io/collector/extension/experimental/storage v0.113.0
go.opentelemetry.io/collector/filter v0.114.0
Contributor

Is this supposed to be v0.114.0? I'd imagine a merge from main might smooth out some of these dependency conflicts?

Author

I'm not seeing any conflicts personally, and tidying doesn't update these deps. Should I rebase the feature branch onto the latest release?

Contributor

Perhaps @dpaasman00 would know better, but I'd imagine we probably want go.opentelemetry.io/collector/filter v0.113.0 rather than v0.114.0. Since this isn't being merged into main for a little while, though, it's probably not worth worrying about too much until we want to start entertaining a main merge.

@schmikei left a comment (Contributor)

Think it's good for the feature branch; may need some further testing.

Nice work 🚀

@Caleb-Hurshman merged commit c24eb17 into feat/ssapi-receiver Nov 19, 2024
14 checks passed
@Caleb-Hurshman deleted the feat/ssapi-checkpoints branch November 19, 2024 18:28