Skip to content

Commit

Permalink
feat: add sticky session support in sumologic extension and exporter (#…
Browse files Browse the repository at this point in the history
…1363)

* feat: add sticky session support in sumologic extension and exporter
  • Loading branch information
dmolenda-sumo authored Dec 29, 2023
1 parent 063f566 commit 297bc86
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 32 deletions.
1 change: 1 addition & 0 deletions .changelog/1363.added.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
feat: add support for sticky session in sumologic extension and sumologic exporter
6 changes: 6 additions & 0 deletions pkg/exporter/sumologicexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ exporters:
# num_seconds is the number of seconds to buffer in case of a backend outage,
# requests_per_second is the average number of requests per seconds.
queue_size: <queue_size>

# defines if sticky session support is enable
# more details about sticky sessions for ALB could be found here:
# https://docs.aws.amazon.com/elasticloadbalancing/latest/application/sticky-sessions.html
# default = false
sticky_session_enabled: {true, false}
```
## Metrics
Expand Down
6 changes: 6 additions & 0 deletions pkg/exporter/sumologicexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ type Config struct {
ClearLogsTimestamp bool `mapstructure:"clear_logs_timestamp"`

JSONLogs `mapstructure:"json_logs"`

// StickySessionEnabled defines if sticky session support is enable.
// By default this is false.
StickySessionEnabled bool `mapstructure:"sticky_session_enabled"`
}

type JSONLogs struct {
Expand Down Expand Up @@ -231,4 +235,6 @@ const (
DefaultFlattenBody bool = false
// DefaultDropRoutingAttribute defines default DropRoutingAttribute
DefaultDropRoutingAttribute string = ""
// DefaultStickySessionEnabled defines default StickySessionEnabled value
DefaultStickySessionEnabled bool = false
)
39 changes: 37 additions & 2 deletions pkg/exporter/sumologicexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ type sumologicexporter struct {
dataUrlLogs string
dataUrlTraces string

foundSumologicExtension bool
sumologicExtension *sumologicextension.SumologicExtension

stickySessionCookieLock sync.RWMutex
stickySessionCookie string

id component.ID
}

Expand All @@ -85,8 +91,9 @@ func initExporter(cfg *Config, createSettings exporter.CreateSettings) (*sumolog
},
},
// NOTE: client is now set in start()
prometheusFormatter: pf,
id: createSettings.ID,
prometheusFormatter: pf,
id: createSettings.ID,
foundSumologicExtension: false,
}

se.logger.Info(
Expand Down Expand Up @@ -204,6 +211,8 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) err
metricsUrl,
logsUrl,
tracesUrl,
se.StickySessionCookie,
se.SetStickySessionCookie,
se.id,
)

Expand Down Expand Up @@ -286,6 +295,8 @@ func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pmetric.Met
metricsUrl,
logsUrl,
tracesUrl,
se.StickySessionCookie,
se.SetStickySessionCookie,
se.id,
)

Expand Down Expand Up @@ -343,6 +354,8 @@ func (se *sumologicexporter) pushTracesData(ctx context.Context, td ptrace.Trace
metricsUrl,
logsUrl,
tracesUrl,
se.StickySessionCookie,
se.SetStickySessionCookie,
se.id,
)

Expand Down Expand Up @@ -380,6 +393,8 @@ func (se *sumologicexporter) configure(ctx context.Context) error {
if ok && httpSettings.Auth.AuthenticatorID == v.ComponentID() {
ext = v
foundSumoExt = true
se.foundSumologicExtension = true
se.sumologicExtension = ext
break
}
}
Expand Down Expand Up @@ -476,6 +491,26 @@ func (se *sumologicexporter) shutdown(context.Context) error {
return nil
}

func (se *sumologicexporter) StickySessionCookie() string {
if se.foundSumologicExtension {
return se.sumologicExtension.StickySessionCookie()
} else {
se.stickySessionCookieLock.RLock()
defer se.stickySessionCookieLock.RUnlock()
return se.stickySessionCookie
}
}

func (se *sumologicexporter) SetStickySessionCookie(stickySessionCookie string) {
if se.foundSumologicExtension {
se.sumologicExtension.SetStickySessionCookie(stickySessionCookie)
} else {
se.stickySessionCookieLock.Lock()
se.stickySessionCookie = stickySessionCookie
se.stickySessionCookieLock.Unlock()
}
}

// get the destination url for a given signal type
// this mostly adds signal-specific suffixes if the format is otlp
func getSignalURL(oCfg *Config, endpointUrl string, signal component.DataType) (string, error) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/exporter/sumologicexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ func createDefaultConfig() component.Config {
},
TraceFormat: OTLPTraceFormat,

HTTPClientSettings: CreateDefaultHTTPClientSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
QueueSettings: qs,
HTTPClientSettings: CreateDefaultHTTPClientSettings(),
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
QueueSettings: qs,
StickySessionEnabled: DefaultStickySessionEnabled,
}
}

Expand Down
81 changes: 61 additions & 20 deletions pkg/exporter/sumologicexporter/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,18 @@ func (b *bodyBuilder) toCountingReader() *countingReader {
}

type sender struct {
logger *zap.Logger
config *Config
client *http.Client
compressor *compressor
prometheusFormatter prometheusFormatter
jsonLogsConfig JSONLogs
dataUrlMetrics string
dataUrlLogs string
dataUrlTraces string
id component.ID
logger *zap.Logger
config *Config
client *http.Client
compressor *compressor
prometheusFormatter prometheusFormatter
jsonLogsConfig JSONLogs
dataUrlMetrics string
dataUrlLogs string
dataUrlTraces string
stickySessionCookieFunc func() string
setStickySessionCookieFunc func(string)
id component.ID
}

const (
Expand All @@ -157,6 +159,8 @@ const (

contentEncodingGzip string = "gzip"
contentEncodingDeflate string = "deflate"

stickySessionKey string = "AWSALB"
)

func newSender(
Expand All @@ -168,19 +172,23 @@ func newSender(
metricsUrl string,
logsUrl string,
tracesUrl string,
stickySessionCookieFunc func() string,
setStickySessionCookieFunc func(string),
id component.ID,
) *sender {
return &sender{
logger: logger,
config: cfg,
client: cl,
compressor: c,
prometheusFormatter: pf,
jsonLogsConfig: cfg.JSONLogs,
dataUrlMetrics: metricsUrl,
dataUrlLogs: logsUrl,
dataUrlTraces: tracesUrl,
id: id,
logger: logger,
config: cfg,
client: cl,
compressor: c,
prometheusFormatter: pf,
jsonLogsConfig: cfg.JSONLogs,
dataUrlMetrics: metricsUrl,
dataUrlLogs: logsUrl,
dataUrlTraces: tracesUrl,
stickySessionCookieFunc: stickySessionCookieFunc,
setStickySessionCookieFunc: setStickySessionCookieFunc,
id: id,
}
}

Expand All @@ -202,6 +210,10 @@ func (s *sender) send(ctx context.Context, pipeline PipelineType, reader *counti
return err
}

if s.config.StickySessionEnabled {
s.addStickySessionCookie(req)
}

s.logger.Debug("Sending data",
zap.String("pipeline", string(pipeline)),
zap.Any("headers", req.Header),
Expand All @@ -221,6 +233,10 @@ func (s *sender) send(ctx context.Context, pipeline PipelineType, reader *counti
}

func (s *sender) handleReceiverResponse(resp *http.Response) error {
if s.config.StickySessionEnabled {
s.updateStickySessionCookie(resp)
}

// API responds with a 200 or 204 with ConentLength set to 0 when all data
// has been successfully ingested.
if resp.ContentLength == 0 && (resp.StatusCode == 200 || resp.StatusCode == 204) {
Expand Down Expand Up @@ -821,3 +837,28 @@ func (s *sender) recordMetrics(duration time.Duration, count int64, req *http.Re
s.logger.Debug("error for recording metric for sent request", zap.Error(err))
}
}

func (s *sender) addStickySessionCookie(req *http.Request) {
currectCookieValue := s.stickySessionCookieFunc()
if currectCookieValue != "" {
cookie := &http.Cookie{
Name: stickySessionKey,
Value: currectCookieValue,
}
req.AddCookie(cookie)
}
}

func (s *sender) updateStickySessionCookie(resp *http.Response) {
cookies := resp.Cookies()
if len(cookies) > 0 {
for _, cookie := range cookies {
if cookie.Name == stickySessionKey {
if cookie.Value != s.stickySessionCookieFunc() {
s.setStickySessionCookieFunc(cookie.Value)
}
return
}
}
}
}
2 changes: 2 additions & 0 deletions pkg/exporter/sumologicexporter/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http.
testServer.URL,
testServer.URL,
testServer.URL,
func() string { return "" },
func(string) {},
component.ID{},
),
}
Expand Down
1 change: 1 addition & 0 deletions pkg/extension/sumologicextension/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ and can be used as an authenticator for the
- `initial_interval` - initial interval of backoff (default: `500ms`)
- `max_interval` - maximum interval of backoff (default: `1m`)
- `max_elapsed_time` - time after which registration fails definitely (default: `15m`)
- `sticky_session_enabled` - default value is `false`

[credentials_help]: https://help.sumologic.com/docs/manage/security/installation-tokens
[fields_help]: https://help.sumologic.com/docs/manage/fields
Expand Down
4 changes: 4 additions & 0 deletions pkg/extension/sumologicextension/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ type Config struct {
// Exponential algorithm is being used.
// Please see following link for details: https://github.com/cenkalti/backoff
BackOff backOffConfig `mapstructure:"backoff"`

// StickySessionEnabled defines if sticky session support is enable.
// By default this is false.
StickySessionEnabled bool `mapstructure:"sticky_session_enabled"`
}

type accessCredentials struct {
Expand Down
69 changes: 62 additions & 7 deletions pkg/extension/sumologicextension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type SumologicExtension struct {
registrationInfo api.OpenRegisterResponsePayload
updateMetadata bool

stickySessionCookieLock sync.RWMutex
stickySessionCookie string

closeChan chan struct{}
closeOnce sync.Once
backOff *backoff.ExponentialBackOff
Expand All @@ -81,6 +84,8 @@ const (
collectorIdField = "collector_id"
collectorNameField = "collector_name"
collectorCredentialIdField = "collector_credential_id"

stickySessionKey = "AWSALB"
)

const (
Expand Down Expand Up @@ -895,6 +900,18 @@ func (se *SumologicExtension) SetBaseUrl(baseUrl string) {
se.baseUrlLock.Unlock()
}

func (se *SumologicExtension) StickySessionCookie() string {
se.stickySessionCookieLock.RLock()
defer se.stickySessionCookieLock.RUnlock()
return se.stickySessionCookie
}

func (se *SumologicExtension) SetStickySessionCookie(stickySessionCookie string) {
se.stickySessionCookieLock.Lock()
se.stickySessionCookie = stickySessionCookie
se.stickySessionCookieLock.Unlock()
}

// WatchCredentialKey watches for credential key updates. It makes use of a
// channel close (done by injectCredentials) and string comparison with a
// known/previous credential key (old). This function allows components to be
Expand Down Expand Up @@ -944,25 +961,63 @@ func (se *SumologicExtension) CreateCredentialsHeader() (http.Header, error) {
// [1]: https://github.com/open-telemetry/opentelemetry-collector/blob/2e84285efc665798d76773b9901727e8836e9d8f/config/configauth/clientauth.go#L34-L39
func (se *SumologicExtension) RoundTripper(base http.RoundTripper) (http.RoundTripper, error) {
return roundTripper{
collectorCredentialId: se.registrationInfo.CollectorCredentialId,
collectorCredentialKey: se.registrationInfo.CollectorCredentialKey,
base: base,
collectorCredentialId: se.registrationInfo.CollectorCredentialId,
collectorCredentialKey: se.registrationInfo.CollectorCredentialKey,
addStickySessionCookie: se.addStickySessionCookie,
updateStickySessionCookie: se.updateStickySessionCookie,
base: base,
}, nil
}

func (se *SumologicExtension) PerRPCCredentials() (grpccredentials.PerRPCCredentials, error) {
return nil, errGRPCNotSupported
}

func (se *SumologicExtension) addStickySessionCookie(req *http.Request) {
if !se.conf.StickySessionEnabled {
return
}
currectCookieValue := se.StickySessionCookie()
if currectCookieValue != "" {
cookie := &http.Cookie{
Name: stickySessionKey,
Value: currectCookieValue,
}
req.AddCookie(cookie)
}
}

func (se *SumologicExtension) updateStickySessionCookie(resp *http.Response) {
cookies := resp.Cookies()
if se.conf.StickySessionEnabled && len(cookies) > 0 {
for _, cookie := range cookies {
if cookie.Name == stickySessionKey {
if cookie.Value != se.StickySessionCookie() {
se.SetStickySessionCookie(cookie.Value)
}
return
}
}
}
}

type roundTripper struct {
collectorCredentialId string
collectorCredentialKey string
base http.RoundTripper
collectorCredentialId string
collectorCredentialKey string
addStickySessionCookie func(*http.Request)
updateStickySessionCookie func(*http.Response)
base http.RoundTripper
}

func (rt roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
addCollectorCredentials(req, rt.collectorCredentialId, rt.collectorCredentialKey)
return rt.base.RoundTrip(req)
rt.addStickySessionCookie(req)
resp, err := rt.base.RoundTrip(req)
if err != nil {
return nil, err
}
rt.updateStickySessionCookie(resp)
return resp, err
}

func addCollectorCredentials(req *http.Request, collectorCredentialId string, collectorCredentialKey string) {
Expand Down
Loading

0 comments on commit 297bc86

Please sign in to comment.