From cf2ecaeb8d4b7f036008199312873cd7a6e52108 Mon Sep 17 00:00:00 2001 From: Aleksandr Razumov Date: Mon, 4 Dec 2023 13:30:35 +0300 Subject: [PATCH] feat(logparser): add generic json parser Updates: #243 --- .../logparser/_golden/genericjson_01.json | 21 ++ .../logparser/_testdata/genericjson/zap.jsonl | 1 + internal/logparser/deduct.go | 26 ++ internal/logparser/deduct_test.go | 33 ++ internal/logparser/gold_test.go | 15 + internal/logparser/json.go | 343 ++++++++++++++++++ internal/logparser/json_test.go | 59 +++ internal/logparser/line.go | 73 ++++ internal/logparser/logparser.go | 2 + 9 files changed, 573 insertions(+) create mode 100644 internal/logparser/_golden/genericjson_01.json create mode 100644 internal/logparser/_testdata/genericjson/zap.jsonl create mode 100644 internal/logparser/deduct.go create mode 100644 internal/logparser/deduct_test.go create mode 100644 internal/logparser/gold_test.go create mode 100644 internal/logparser/json.go create mode 100644 internal/logparser/json_test.go create mode 100644 internal/logparser/line.go create mode 100644 internal/logparser/logparser.go diff --git a/internal/logparser/_golden/genericjson_01.json b/internal/logparser/_golden/genericjson_01.json new file mode 100644 index 00000000..f24e2adf --- /dev/null +++ b/internal/logparser/_golden/genericjson_01.json @@ -0,0 +1,21 @@ +{ + "severity_number_str": "Info", + "severity_text": "info", + "body": "Events", + "timestamp": "2023-12-04T09:11:59.1613184Z", + "logger": "poll", + "caller": "gh-archived/main.go:197", + "duration": 0.437301455, + "integers": [ + 1, + 2, + 3 + ], + "new_count": 100, + "remaining": 1040, + "used": 3960, + "reset": 660.838704106, + "reset_human": "in 12 minutes", + "sleep": 0.198120375, + "target_rate": 0.63542183 +} \ No newline at end of file diff --git a/internal/logparser/_testdata/genericjson/zap.jsonl b/internal/logparser/_testdata/genericjson/zap.jsonl new file mode 100644 index 00000000..caccf76c --- /dev/null +++ 
b/internal/logparser/_testdata/genericjson/zap.jsonl
@@ -0,0 +1 @@
+{"level":"info","ts":1701681119.1613183,"logger":"poll","caller":"gh-archived/main.go:197","msg":"Events","duration":0.437301455,"integers":[1, 2, 3],"new_count":100,"remaining":1040,"used":3960,"reset":660.838704106,"reset_human":"in 12 minutes","sleep":0.198120375,"target_rate":0.63542183}
\ No newline at end of file
diff --git a/internal/logparser/deduct.go b/internal/logparser/deduct.go
new file mode 100644
index 00000000..d5dc2943
--- /dev/null
+++ b/internal/logparser/deduct.go
@@ -0,0 +1,32 @@
+package logparser
+
+import "time"
+
+// ISO8601Millis is the ISO 8601 time layout with millisecond precision and
+// numeric zone offset without colon, e.g. 2006-01-02T15:04:05.000+0300.
+const ISO8601Millis = "2006-01-02T15:04:05.000Z0700"
+
+// deductStart is the earliest instant that deductNanos recognizes:
+// we are not expecting logs from past.
+var deductStart = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
+
+// deductNanos converts unix time n of unknown resolution to unix nanoseconds.
+//
+// Resolution is guessed from magnitude of n: it is compared against
+// deductStart expressed in nanoseconds, microseconds, milliseconds and
+// seconds, in that order, and the first unit in which n is after deductStart
+// wins. It returns false if n is not after deductStart even as seconds.
+func deductNanos(n int64) (int64, bool) {
+	switch start := deductStart; {
+	case n > start.UnixNano():
+		return n, true
+	case n > start.UnixMicro():
+		return n * int64(time.Microsecond), true
+	case n > start.UnixMilli():
+		return n * int64(time.Millisecond), true
+	case n > start.Unix():
+		return n * int64(time.Second), true
+	default:
+		return 0, false
+	}
+}
diff --git a/internal/logparser/deduct_test.go b/internal/logparser/deduct_test.go
new file mode 100644
index 00000000..40d71899
--- /dev/null
+++ b/internal/logparser/deduct_test.go
@@ -0,0 +1,39 @@
+package logparser
+
+import (
+	"math"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+// TestDeductNanos checks that deductNanos restores unix nanoseconds from
+// unix time in seconds, milliseconds, microseconds and nanoseconds for
+// instants between years 2020 and 2200.
+func TestDeductNanos(t *testing.T) {
+	// Odd step, so sub-second part differs between iterations.
+	const step = 6*time.Hour + 44*time.Second + 1337123*time.Nanosecond
+	var (
+		from = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
+		to   = time.Date(2200, 1, 1, 0, 0, 0, 0, time.UTC)
+	)
+	// roundDown zeroes given number of the lowest decimal digits of nanos.
+	roundDown := func(nanos int64, digits int) int64 {
+		unit := int64(math.Pow10(digits))
+		return nanos - nanos%unit
+	}
+	check := func(input, want int64, msgAndArgs ...interface{}) {
+		t.Helper()
+		got, ok := deductNanos(input)
+		require.True(t, ok, msgAndArgs...)
+		require.Equal(t, want, got, msgAndArgs...)
+	}
+	for now := from; now.Before(to); now = now.Add(step) {
+		nanos := now.UnixNano()
+		check(now.Unix(), roundDown(nanos, 9), "v=%v", now)
+		check(now.UnixMilli(), roundDown(nanos, 6), "v=%v", now)
+		check(now.UnixMicro(), roundDown(nanos, 3), "v=%v", now)
+		check(nanos, nanos, "v=%v", now)
+	}
+}
diff --git a/internal/logparser/gold_test.go b/internal/logparser/gold_test.go
new file mode 100644
index 00000000..75d1d161
--- /dev/null
+++ b/internal/logparser/gold_test.go
@@ -0,0 +1,17 @@
+package logparser
+
+import (
+	"os"
+	"testing"
+
+	"github.com/go-faster/sdk/gold"
+)
+
+// TestMain sets up golden files support before running tests of the package.
+func TestMain(m *testing.M) {
+	// Explicitly registering flags for golden files.
+	gold.Init()
+
+	code := m.Run()
+	os.Exit(code)
+}
diff --git a/internal/logparser/json.go b/internal/logparser/json.go
new file mode 100644
index 00000000..01fea525
--- /dev/null
+++ b/internal/logparser/json.go
@@ -0,0 +1,343 @@
+package logparser
+
+import (
+	"encoding/hex"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/go-faster/errors"
+	"github.com/go-faster/jx"
+	"github.com/google/uuid"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/plog"
+
+	"github.com/go-faster/oteldb/internal/otelstorage"
+)
+
+// GenericJSONParser can parse generic json into [Line].
+type GenericJSONParser struct{}
+
+// _severityMap maps the lower-cased first letter of a textual level
+// ("info", "warn", ...) to the matching OpenTelemetry severity number.
+var _severityMap = map[rune]plog.SeverityNumber{
+	'i': plog.SeverityNumberInfo,
+	't': plog.SeverityNumberTrace,
+	'd': plog.SeverityNumberDebug,
+	'w': plog.SeverityNumberWarn,
+	'e': plog.SeverityNumberError,
+	'f': plog.SeverityNumberFatal,
+}
+
+// _timeLayouts lists layouts that are tried, in order, for textual
+// timestamps.
+var _timeLayouts = []string{
+	time.RFC3339Nano,
+	time.RFC3339,
+	ISO8601Millis,
+}
+
+// severityFromText deduces severity number from the first letter of level
+// text, ignoring ASCII case, so "INFO", "Info" and "info" are all mapped to
+// [plog.SeverityNumberInfo]. Unknown levels yield unspecified severity.
+func severityFromText(text string) plog.SeverityNumber {
+	if text == "" {
+		return plog.SeverityNumberUnspecified
+	}
+	c := text[0]
+	if 'A' <= c && c <= 'Z' {
+		c += 'a' - 'A'
+	}
+	return _severityMap[rune(c)]
+}
+
+// encodeValue writes v to e as JSON.
+//
+// Maps and slices are encoded recursively, bytes are encoded as base64
+// string and empty value is encoded as null. It panics on a value type
+// that has no case below.
+func encodeValue(v pcommon.Value, e *jx.Encoder) {
+	switch v.Type() {
+	case pcommon.ValueTypeStr:
+		e.Str(v.Str())
+	case pcommon.ValueTypeInt:
+		e.Int64(v.Int())
+	case pcommon.ValueTypeDouble:
+		e.Float64(v.Double())
+	case pcommon.ValueTypeBool:
+		e.Bool(v.Bool())
+	case pcommon.ValueTypeBytes:
+		e.Base64(v.Bytes().AsRaw())
+	case pcommon.ValueTypeEmpty:
+		e.Null()
+	case pcommon.ValueTypeMap:
+		e.Obj(func(e *jx.Encoder) {
+			v.Map().Range(func(k string, v pcommon.Value) bool {
+				return !e.Field(k, func(e *jx.Encoder) {
+					encodeValue(v, e)
+				})
+			})
+		})
+	case pcommon.ValueTypeSlice:
+		e.Arr(func(e *jx.Encoder) {
+			s := v.Slice()
+			for i := 0; i < s.Len(); i++ {
+				encodeValue(s.At(i), e)
+			}
+		})
+	default:
+		panic(fmt.Sprintf("unknown value type %v", v.Type()))
+	}
+}
+
+// setJSONValue decodes the next JSON value from d and stores it into v.
+//
+// Integer numbers are stored as int, other numbers as double; null leaves
+// v untouched; arrays and objects are decoded recursively.
+//
+// It returns an error instead of panicking if d does not point to a JSON
+// value, because parsed log lines are untrusted input.
+func setJSONValue(v pcommon.Value, d *jx.Decoder) error {
+	switch tt := d.Next(); tt {
+	case jx.String:
+		s, err := d.Str()
+		if err != nil {
+			return errors.Wrap(err, "string")
+		}
+		v.SetStr(s)
+	case jx.Number:
+		n, err := d.Num()
+		if err != nil {
+			return errors.Wrap(err, "number")
+		}
+		if n.IsInt() {
+			i, err := n.Int64()
+			if err != nil {
+				return errors.Wrap(err, "int")
+			}
+			v.SetInt(i)
+		} else {
+			f, err := n.Float64()
+			if err != nil {
+				return errors.Wrap(err, "float")
+			}
+			v.SetDouble(f)
+		}
+	case jx.Bool:
+		b, err := d.Bool()
+		if err != nil {
+			return errors.Wrap(err, "bool")
+		}
+		v.SetBool(b)
+	case jx.Null:
+		// Next only peeks the type, so the literal has to be consumed
+		// explicitly: otherwise the enclosing array or object decoder
+		// stumbles over it and the whole line is rejected.
+		if err := d.Null(); err != nil {
+			return errors.Wrap(err, "null")
+		}
+	case jx.Array:
+		slice := v.SetEmptySlice()
+		return d.Arr(func(d *jx.Decoder) error {
+			return setJSONValue(slice.AppendEmpty(), d)
+		})
+	case jx.Object:
+		m := v.SetEmptyMap()
+		return d.Obj(func(d *jx.Decoder, key string) error {
+			return addJSONMapKey(m, key, d)
+		})
+	default:
+		return errors.Errorf("unexpected json token %v", tt)
+	}
+	return nil
+}
+
+// addJSONMapKey decodes the next JSON value from d and puts it into m
+// under key.
+//
+// It delegates to setJSONValue, so map entries and slice elements are
+// always decoded by the same rules.
+func addJSONMapKey(m pcommon.Map, key string, d *jx.Decoder) error {
+	return setJSONValue(m.PutEmpty(key), d)
+}
+
+// parseJSONTimestamp decodes the next JSON value from d as timestamp of
+// line.
+//
+// Supported forms are:
+//   - integer or quoted integer unix time in seconds, milliseconds,
+//     microseconds or nanoseconds, see deductNanos;
+//   - fractional number of unix seconds;
+//   - string in one of _timeLayouts.
+//
+// Value that cannot be interpreted as time is kept in attrs under key.
+func parseJSONTimestamp(line *Line, attrs pcommon.Map, key string, d *jx.Decoder) error {
+	switch d.Next() {
+	case jx.String:
+		v, err := d.Str()
+		if err != nil {
+			return errors.Wrap(err, "ts")
+		}
+		if num := jx.Num(v); num.IsInt() {
+			// Quoted integer. Conversion failure is not fatal: text that
+			// merely looks like an integer is tried against layouts below
+			// and is kept as attribute as a last resort.
+			if ts, err := num.Int64(); err == nil {
+				if n, ok := deductNanos(ts); ok {
+					line.Timestamp = otelstorage.Timestamp(n)
+					return nil
+				}
+			}
+		}
+		for _, layout := range _timeLayouts {
+			ts, err := time.Parse(layout, v)
+			if err != nil {
+				continue
+			}
+			line.Timestamp = otelstorage.Timestamp(ts.UnixNano())
+			return nil
+		}
+		attrs.PutStr(key, v)
+		return nil
+	case jx.Number:
+		v, err := d.Num()
+		if err != nil {
+			return errors.Wrap(err, "ts")
+		}
+		if v.IsInt() {
+			// Parsing time as integer.
+			ts, err := v.Int64()
+			if err != nil {
+				return errors.Wrap(err, "ts")
+			}
+			if n, ok := deductNanos(ts); ok {
+				line.Timestamp = otelstorage.Timestamp(n)
+				return nil
+			}
+
+			// Fallback.
+			attrs.PutInt(key, ts)
+			return nil
+		}
+
+		// Parsing 1318229038.000654 as time.
+		// Default is "epoch time, i.e. unix seconds float64".
+		// See zapcore.EpochTimeEncoder.
+		f, err := v.Float64()
+		if err != nil {
+			return errors.Wrap(err, "ts parse")
+		}
+		// TODO: Also deduct f.
+		line.Timestamp = otelstorage.Timestamp(f * float64(time.Second))
+		return nil
+	default:
+		// Fallback to generic value.
+		return addJSONMapKey(attrs, key, d)
+	}
+}
+
+// Parse generic json into [Line].
+//
+// Well-known keys are mapped to dedicated fields of [Line]: trace id, span
+// id, severity, message and timestamp. Any other key, as well as a
+// well-known key whose value has unexpected type or format, is kept in
+// attributes.
+//
+// Body is taken from "msg" key, or from "message" key if "msg" is absent.
+// It returns an error if data cannot be decoded as a JSON object.
+func (GenericJSONParser) Parse(data []byte) (*Line, error) {
+	const (
+		fieldMessage = "message"
+		fieldMsg     = "msg"
+	)
+	dec := jx.DecodeBytes(data)
+
+	// First pass: find out which of message keys are present.
+	// Plain booleans are used instead of a map to avoid allocation per line.
+	var hasMessage, hasMsg bool
+	if err := dec.ObjBytes(func(d *jx.Decoder, key []byte) error {
+		switch string(key) {
+		case fieldMessage:
+			hasMessage = true
+		case fieldMsg:
+			hasMsg = true
+		}
+		return d.Skip()
+	}); err != nil {
+		return nil, errors.Wrap(err, "read object")
+	}
+
+	// Default to "msg".
+	// This is the field that will be used for body.
+	msgField := fieldMsg
+	if hasMessage && !hasMsg {
+		// Falling back to "message" if "msg" is not present.
+		msgField = fieldMessage
+	}
+
+	// Second pass: fill the line.
+	dec.ResetBytes(data)
+	line := &Line{}
+	attrs := pcommon.NewMap()
+	if err := dec.ObjBytes(func(d *jx.Decoder, k []byte) error {
+		switch string(k) {
+		case "trace_id", "traceid", "traceID", "traceId":
+			if d.Next() != jx.String {
+				return addJSONMapKey(attrs, string(k), d)
+			}
+			v, err := d.Str()
+			if err != nil {
+				return errors.Wrap(err, "trace id")
+			}
+			traceID, err := otelstorage.ParseTraceID(strings.ToLower(v))
+			if err != nil {
+				// Trying to parse as UUID.
+				id, err := uuid.Parse(v)
+				if err != nil {
+					attrs.PutStr(string(k), v)
+					return nil
+				}
+				traceID = otelstorage.TraceID(id)
+			}
+			line.TraceID = traceID
+		case "span_id", "spanid", "spanID", "spanId":
+			if d.Next() != jx.String {
+				// TODO: handle integers
+				return addJSONMapKey(attrs, string(k), d)
+			}
+			v, err := d.Str()
+			if err != nil {
+				return errors.Wrap(err, "span id")
+			}
+			// DecodeString returns bytes decoded before the error, so
+			// length check alone would accept 16 hex digits followed by
+			// garbage.
+			raw, err := hex.DecodeString(v)
+			if err != nil || len(raw) != 8 {
+				attrs.PutStr(string(k), v)
+				return nil
+			}
+			var spanID otelstorage.SpanID
+			copy(spanID[:], raw)
+			line.SpanID = spanID
+		case "level", "lvl", "levelStr", "severity_text", "severity":
+			if d.Next() != jx.String {
+				return addJSONMapKey(attrs, string(k), d)
+			}
+			v, err := d.Str()
+			if err != nil {
+				return errors.Wrap(err, "level")
+			}
+			if v == "" {
+				attrs.PutStr(string(k), v)
+				return nil
+			}
+			line.SeverityText = v
+			line.SeverityNumber = severityFromText(v)
+		case msgField:
+			if d.Next() != jx.String {
+				return addJSONMapKey(attrs, string(k), d)
+			}
+			v, err := d.Str()
+			if err != nil {
+				return errors.Wrap(err, "msg")
+			}
+			line.Body = v
+		case "ts", "time", "@timestamp", "timestamp":
+			return parseJSONTimestamp(line, attrs, string(k), d)
+		default:
+			return addJSONMapKey(attrs, string(k), d)
+		}
+		return nil
+	}); err != nil {
+		return nil, errors.Wrap(err, "read object")
+	}
+	if attrs.Len() > 0 {
+		line.Attrs = otelstorage.Attrs(attrs)
+	}
+	return line, nil
+}
+
+// Detect if line is parsable by this parser.
+func (GenericJSONParser) Detect(line string) bool {
+	return jx.DecodeStr(line).Next() == jx.Object
+}
diff --git a/internal/logparser/json_test.go b/internal/logparser/json_test.go
new file mode 100644
index 00000000..b87f2038
--- /dev/null
+++ b/internal/logparser/json_test.go
@@ -0,0 +1,71 @@
+package logparser
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/go-faster/sdk/gold"
+	"github.com/stretchr/testify/require"
+)
+
+// TestGenericJSONParser_Parse parses every line of zap.jsonl and compares
+// string form of the result with golden file genericjson_NN.json, where NN
+// is 1-based line number.
+func TestGenericJSONParser_Parse(t *testing.T) {
+	data, err := os.ReadFile(filepath.Join("_testdata", "genericjson", "zap.jsonl"))
+	require.NoError(t, err, "read testdata")
+
+	var parser GenericJSONParser
+	scanner := bufio.NewScanner(bytes.NewReader(data))
+
+	var i int
+	for scanner.Scan() {
+		i++
+		t.Run(fmt.Sprintf("Line%02d", i), func(t *testing.T) {
+			line, err := parser.Parse(scanner.Bytes())
+			require.NoError(t, err, "parse")
+
+			name := fmt.Sprintf("genericjson_%02d.json", i)
+			gold.Str(t, line.String(), name)
+		})
+	}
+	// Scan stops silently on error, e.g. on too long line: without these
+	// checks broken or empty testdata would make the test pass vacuously.
+	require.NoError(t, scanner.Err(), "scan testdata")
+	require.NotZero(t, i, "no lines in testdata")
+}
+
+// BenchmarkGenericJSONParser_Parse measures parsing of every line of
+// zap.jsonl as a separate sub-benchmark.
+func BenchmarkGenericJSONParser_Parse(b *testing.B) {
+	b.ReportAllocs()
+	data, err := os.ReadFile(filepath.Join("_testdata", "genericjson", "zap.jsonl"))
+	require.NoError(b, err, "read testdata")
+
+	var parser GenericJSONParser
+	scanner := bufio.NewScanner(bytes.NewReader(data))
+
+	var i int
+	b.ResetTimer()
+	for scanner.Scan() {
+		i++
+		// Scanner is not advanced until sub-benchmark returns, so the line
+		// can be used without copying.
+		line := scanner.Bytes()
+		b.Run(fmt.Sprintf("Line%02d", i), func(b *testing.B) {
+			b.ReportAllocs()
+			b.SetBytes(int64(len(line)))
+
+			for j := 0; j < b.N; j++ {
+				if _, err := parser.Parse(line); err != nil {
+					b.Fatal(err)
+				}
+			}
+		})
+	}
+	require.NoError(b, scanner.Err(), "scan testdata")
+}
diff --git a/internal/logparser/line.go b/internal/logparser/line.go
new file mode 100644
index 00000000..5dc261ed
--- /dev/null
+++ b/internal/logparser/line.go
@@ -0,0 +1,78 @@
+package logparser
+
+import (
+	"encoding/hex"
+	"time"
+
+	"github.com/go-faster/jx"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/plog"
+
+	"github.com/go-faster/oteldb/internal/otelstorage"
+)
+
+// Line represents single parsed line that can be converted to [logstorage.Record].
+type Line struct {
+	Timestamp      otelstorage.Timestamp `json:"timestamp"`
+	TraceID        otelstorage.TraceID   `json:"trace_id"`
+	SpanID         otelstorage.SpanID    `json:"span_id"`
+	Attrs          otelstorage.Attrs     `json:"attrs"`
+	SeverityNumber plog.SeverityNumber   `json:"severity_number"`
+	SeverityText   string                `json:"severity_text"`
+	Body           string                `json:"body"`
+}
+
+// String returns line encoded as indented JSON, see [Line.Encode].
+func (l Line) String() string {
+	e := &jx.Encoder{}
+	e.SetIdent(2)
+	l.Encode(e)
+	return e.String()
+}
+
+// Encode line as json.
+//
+// Zero-valued fields are omitted. Attributes are not nested: they are
+// written as fields of the same object, after the timestamp. Severity
+// number is written by name, as "severity_number_str".
+func (l Line) Encode(e *jx.Encoder) {
+	e.Obj(func(e *jx.Encoder) {
+		if l.SeverityNumber != 0 {
+			e.Field("severity_number_str", func(e *jx.Encoder) {
+				e.Str(l.SeverityNumber.String())
+			})
+		}
+		if l.SeverityText != "" {
+			e.Field("severity_text", func(e *jx.Encoder) {
+				e.Str(l.SeverityText)
+			})
+		}
+		if l.Body != "" {
+			e.Field("body", func(e *jx.Encoder) {
+				e.Str(l.Body)
+			})
+		}
+		if !l.Timestamp.AsTime().IsZero() {
+			e.Field("timestamp", func(e *jx.Encoder) {
+				e.Str(l.Timestamp.AsTime().Format(time.RFC3339Nano))
+			})
+		}
+		if !l.Attrs.IsZero() {
+			l.Attrs.AsMap().Range(func(k string, v pcommon.Value) bool {
+				return !e.Field(k, func(e *jx.Encoder) {
+					encodeValue(v, e)
+				})
+			})
+		}
+		if !l.TraceID.IsEmpty() {
+			e.Field("trace_id", func(e *jx.Encoder) {
+				e.Str(hex.EncodeToString(l.TraceID[:]))
+			})
+		}
+		if !l.SpanID.IsEmpty() {
+			e.Field("span_id", func(e *jx.Encoder) {
+				e.Str(hex.EncodeToString(l.SpanID[:]))
+			})
+		}
+	})
+}
diff --git a/internal/logparser/logparser.go b/internal/logparser/logparser.go
new file mode 100644
index 00000000..70b2aa05
--- /dev/null
+++ b/internal/logparser/logparser.go
@@ -0,0 +1,2 @@
+// Package logparser parses logs.
+package logparser