Skip to content

Commit

Permalink
fix: secops processor works with https protocol (#2020)
Browse files Browse the repository at this point in the history
works with https
  • Loading branch information
justinianvoss22 authored Dec 3, 2024
1 parent b6b344e commit 3b25dea
Show file tree
Hide file tree
Showing 2 changed files with 279 additions and 9 deletions.
76 changes: 68 additions & 8 deletions exporter/chronicleexporter/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,18 +163,26 @@ func (m *protoMarshaler) processLogRecord(ctx context.Context, logRecord plog.Lo
return rawLog, logType, namespace, ingestionLabels, nil
}

func (m *protoMarshaler) processHTTPLogRecord(ctx context.Context, logRecord plog.LogRecord, scope plog.ScopeLogs, resource plog.ResourceLogs) (string, string, error) {
func (m *protoMarshaler) processHTTPLogRecord(ctx context.Context, logRecord plog.LogRecord, scope plog.ScopeLogs, resource plog.ResourceLogs) (string, string, string, map[string]*api.Log_LogLabel, error) {
rawLog, err := m.getRawLog(ctx, logRecord, scope, resource)
if err != nil {
return "", "", err
return "", "", "", nil, err
}

logType, err := m.getLogType(ctx, logRecord, scope, resource)
if err != nil {
return "", "", err
return "", "", "", nil, err
}
namespace, err := m.getNamespace(ctx, logRecord, scope, resource)
if err != nil {
return "", "", "", nil, err
}
ingestionLabels, err := m.getHTTPIngestionLabels(logRecord)
if err != nil {
return "", "", "", nil, err
}

return rawLog, logType, nil
return rawLog, logType, namespace, ingestionLabels, nil
}

func (m *protoMarshaler) getRawLog(ctx context.Context, logRecord plog.LogRecord, scope plog.ScopeLogs, resource plog.ResourceLogs) (string, error) {
Expand Down Expand Up @@ -239,6 +247,7 @@ func (m *protoMarshaler) getIngestionLabels(logRecord plog.LogRecord) ([]*api.La
if err != nil {
return []*api.Label{}, fmt.Errorf("get chronicle ingestion labels: %w", err)
}

if len(ingestionLabels) != 0 {
return ingestionLabels, nil
}
Expand All @@ -253,6 +262,27 @@ func (m *protoMarshaler) getIngestionLabels(logRecord plog.LogRecord) ([]*api.La
return configLabels, nil
}

func (m *protoMarshaler) getHTTPIngestionLabels(logRecord plog.LogRecord) (map[string]*api.Log_LogLabel, error) {
// Check for labels in attributes["chronicle_ingestion_labels"]
ingestionLabels, err := m.getHTTPRawNestedFields(chronicleIngestionLabelsPrefix, logRecord)
if err != nil {
return nil, fmt.Errorf("get chronicle ingestion labels: %w", err)
}

if len(ingestionLabels) != 0 {
return ingestionLabels, nil
}

// use labels defined in the config if needed
configLabels := make(map[string]*api.Log_LogLabel)
for key, value := range m.cfg.IngestionLabels {
configLabels[key] = &api.Log_LogLabel{
Value: value,
}
}
return configLabels, nil
}

func (m *protoMarshaler) getRawField(ctx context.Context, field string, logRecord plog.LogRecord, scope plog.ScopeLogs, resource plog.ResourceLogs) (string, error) {
switch field {
case "body":
Expand Down Expand Up @@ -344,6 +374,34 @@ func (m *protoMarshaler) getRawNestedFields(field string, logRecord plog.LogReco
return nestedFields, nil
}

func (m *protoMarshaler) getHTTPRawNestedFields(field string, logRecord plog.LogRecord) (map[string]*api.Log_LogLabel, error) {
nestedFields := make(map[string]*api.Log_LogLabel) // Map with key as string and value as Log_LogLabel
logRecord.Attributes().Range(func(key string, value pcommon.Value) bool {
if !strings.HasPrefix(key, field) {
return true
}
// Extract the key name from the nested field
cleanKey := strings.Trim(key[len(field):], `[]"`)
var jsonMap map[string]string

// If needs to be parsed as JSON
if err := json.Unmarshal([]byte(value.AsString()), &jsonMap); err == nil {
for k, v := range jsonMap {
nestedFields[k] = &api.Log_LogLabel{
Value: v,
}
}
} else {
nestedFields[cleanKey] = &api.Log_LogLabel{
Value: value.AsString(),
}
}
return true
})

return nestedFields, nil
}

func (m *protoMarshaler) constructPayloads(rawLogs map[string][]*api.LogEntry, namespaceMap map[string]string, ingestionLabelsMap map[string][]*api.Label) []*api.BatchCreateLogsRequest {
payloads := make([]*api.BatchCreateLogsRequest, 0, len(rawLogs))
for logType, entries := range rawLogs {
Expand Down Expand Up @@ -387,7 +445,7 @@ func (m *protoMarshaler) extractRawHTTPLogs(ctx context.Context, ld plog.Logs) (
scopeLog := resourceLog.ScopeLogs().At(j)
for k := 0; k < scopeLog.LogRecords().Len(); k++ {
logRecord := scopeLog.LogRecords().At(k)
rawLog, logType, err := m.processHTTPLogRecord(ctx, logRecord, scopeLog, resourceLog)
rawLog, logType, namespace, ingestionLabels, err := m.processHTTPLogRecord(ctx, logRecord, scopeLog, resourceLog)
if err != nil {
m.teleSettings.Logger.Error("Error processing log record", zap.Error(err))
continue
Expand All @@ -405,9 +463,11 @@ func (m *protoMarshaler) extractRawHTTPLogs(ctx context.Context, ld plog.Logs) (
}

entry := &api.Log{
LogEntryTime: timestamppb.New(timestamp),
CollectionTime: timestamppb.New(logRecord.ObservedTimestamp().AsTime()),
Data: []byte(rawLog),
LogEntryTime: timestamppb.New(timestamp),
CollectionTime: timestamppb.New(logRecord.ObservedTimestamp().AsTime()),
Data: []byte(rawLog),
EnvironmentNamespace: namespace,
Labels: ingestionLabels,
}
entries[logType] = append(entries[logType], entry)
}
Expand Down
212 changes: 211 additions & 1 deletion exporter/chronicleexporter/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,6 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) {
cfg: Config{
CustomerID: uuid.New().String(),
LogType: "WINEVTLOG",
IngestionLabels: map[string]string{`chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"},
RawLogField: "attributes",
OverrideLogType: false,
},
Expand Down Expand Up @@ -495,6 +494,217 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) {
require.Len(t, requests, 0, "Expected no requests due to no log records")
},
},
{
name: "No log type set in config or attributes",
cfg: Config{
CustomerID: uuid.New().String(),
LogType: "WINEVTLOG",
RawLogField: "attributes",
OverrideLogType: false,
},
labels: []*api.Label{},
logRecords: func() plog.Logs {
return mockLogs(mockLogRecord("", map[string]any{"key1": "value1", "log_type": "WINEVTLOG", "namespace": "test", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"}))
},
expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) {
require.Len(t, requests, 1)
logs := requests["WINEVTLOG"].GetInlineSource().Logs
// Assuming the attributes are marshaled into the Data field as a JSON string
expectedData := `{"key1":"value1", "log_type":"WINEVTLOG", "namespace":"test", "chronicle_ingestion_label[\"key1\"]": "value1", "chronicle_ingestion_label[\"key2\"]": "value2"}`
actualData := string(logs[0].Data)
require.JSONEq(t, expectedData, actualData, "Log attributes should match expected")
},
},
{
name: "Multiple log records with duplicate data, no log type in attributes",
cfg: Config{
CustomerID: uuid.New().String(),
LogType: "WINEVTLOG",
RawLogField: "body",
OverrideLogType: false,
},
logRecords: func() plog.Logs {
logs := plog.NewLogs()
record1 := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
record1.Body().SetStr("First log message")
record1.Attributes().FromRaw(map[string]any{"chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"})
record2 := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
record2.Body().SetStr("Second log message")
record2.Attributes().FromRaw(map[string]any{"chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"})
return logs
},
expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) {
// verify one request for log type in config
require.Len(t, requests, 1, "Expected a single batch request")
logs := requests["WINEVTLOG"].GetInlineSource().Logs
// verify batch source labels
require.Len(t, logs[0].Labels, 2)
require.Len(t, logs, 2, "Expected two log entries in the batch")
// Verifying the first log entry data
require.Equal(t, "First log message", string(logs[0].Data))
// Verifying the second log entry data
require.Equal(t, "Second log message", string(logs[1].Data))
},
},
{
name: "Multiple log records with different data, no log type in attributes",
cfg: Config{
CustomerID: uuid.New().String(),
LogType: "WINEVTLOG",
RawLogField: "body",
OverrideLogType: false,
},
logRecords: func() plog.Logs {
logs := plog.NewLogs()
record1 := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
record1.Body().SetStr("First log message")
record1.Attributes().FromRaw(map[string]any{`chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"})
record2 := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
record2.Body().SetStr("Second log message")
record2.Attributes().FromRaw(map[string]any{`chronicle_ingestion_label["key3"]`: "value3", `chronicle_ingestion_label["key4"]`: "value4"})
return logs
},
expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) {
// verify one request for one log type
require.Len(t, requests, 1, "Expected a single batch request")
logs := requests["WINEVTLOG"].GetInlineSource().Logs
require.Len(t, logs, 2, "Expected two log entries in the batch")
require.Equal(t, "", logs[0].EnvironmentNamespace)
// verify batch source labels
require.Len(t, logs[0].Labels, 2)
require.Len(t, logs[1].Labels, 2)
// Verifying the first log entry data
require.Equal(t, "First log message", string(logs[0].Data))
// Verifying the second log entry data
require.Equal(t, "Second log message", string(logs[1].Data))
},
},
{
name: "Override log type with attribute",
cfg: Config{
CustomerID: uuid.New().String(),
LogType: "DEFAULT", // This should be overridden by the log_type attribute
RawLogField: "body",
OverrideLogType: true,
},
logRecords: func() plog.Logs {
return mockLogs(mockLogRecord("Log with overridden type", map[string]any{"log_type": "windows_event.application", "namespace": "test", `ingestion_label["realkey1"]`: "realvalue1", `ingestion_label["realkey2"]`: "realvalue2"}))
},
expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) {
require.Len(t, requests, 1)
logs := requests["WINEVTLOG"].GetInlineSource().Logs
require.NotEqual(t, len(logs), 0)
},
},
{
name: "Override log type with chronicle attribute",
cfg: Config{
CustomerID: uuid.New().String(),
LogType: "DEFAULT", // This should be overridden by the chronicle_log_type attribute
RawLogField: "body",
OverrideLogType: true,
},
logRecords: func() plog.Logs {
return mockLogs(mockLogRecord("Log with overridden type", map[string]any{"chronicle_log_type": "ASOC_ALERT", "chronicle_namespace": "test", `chronicle_ingestion_label["realkey1"]`: "realvalue1", `chronicle_ingestion_label["realkey2"]`: "realvalue2"}))
},
expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) {
require.Len(t, requests, 1)
logs := requests["ASOC_ALERT"].GetInlineSource().Logs
require.Equal(t, "test", logs[0].EnvironmentNamespace, "Expected namespace to be overridden by attribute")
expectedLabels := map[string]string{
"realkey1": "realvalue1",
"realkey2": "realvalue2",
}
for key, label := range logs[0].Labels {
require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute")
}
},
},
{
name: "Multiple log records with duplicate data, log type in attributes",
cfg: Config{
CustomerID: uuid.New().String(),
LogType: "WINEVTLOG",
RawLogField: "body",
OverrideLogType: false,
},
logRecords: func() plog.Logs {
logs := plog.NewLogs()
record1 := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
record1.Body().SetStr("First log message")
record1.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"})

record2 := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
record2.Body().SetStr("Second log message")
record2.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"})
return logs
},
expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) {
// verify 1 request, 2 batches for same log type
require.Len(t, requests, 1, "Expected a single batch request")
logs := requests["WINEVTLOGS"].GetInlineSource().Logs
require.Len(t, logs, 2, "Expected two log entries in the batch")
// verify variables
require.Equal(t, "test1", logs[0].EnvironmentNamespace)
require.Len(t, logs[0].Labels, 2)
expectedLabels := map[string]string{
"key1": "value1",
"key2": "value2",
}
for key, label := range logs[0].Labels {
require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute")
}
},
},
{
name: "Multiple log records with different data, log type in attributes",
cfg: Config{
CustomerID: uuid.New().String(),
LogType: "WINEVTLOG",
RawLogField: "body",
OverrideLogType: false,
},
logRecords: func() plog.Logs {
logs := plog.NewLogs()
record1 := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
record1.Body().SetStr("First log message")
record1.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS1", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"})

record2 := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
record2.Body().SetStr("Second log message")
record2.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS2", "chronicle_namespace": "test2", `chronicle_ingestion_label["key3"]`: "value3", `chronicle_ingestion_label["key4"]`: "value4"})
return logs
},

expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) {
expectedLabels := map[string]string{
"key1": "value1",
"key2": "value2",
"key3": "value3",
"key4": "value4",
}
// verify 2 requests, with 1 batch for different log types
require.Len(t, requests, 2, "Expected a two batch request")

logs1 := requests["WINEVTLOGS1"].GetInlineSource().Logs
require.Len(t, logs1, 1, "Expected one log entries in the batch")
// verify variables for first log
require.Equal(t, logs1[0].EnvironmentNamespace, "test1")
require.Len(t, logs1[0].Labels, 2)
for key, label := range logs1[0].Labels {
require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute")
}

logs2 := requests["WINEVTLOGS2"].GetInlineSource().Logs
require.Len(t, logs2, 1, "Expected one log entries in the batch")
// verify variables for second log
require.Equal(t, logs2[0].EnvironmentNamespace, "test2")
require.Len(t, logs2[0].Labels, 2)
for key, label := range logs2[0].Labels {
require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute")
}
},
},
}

for _, tt := range tests {
Expand Down

0 comments on commit 3b25dea

Please sign in to comment.