Skip to content

Commit

Permalink
feat(chstorage): rework traces
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Dec 4, 2023
1 parent 0fd3dd8 commit e1f8a56
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 116 deletions.
1 change: 0 additions & 1 deletion integration/chotele2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func TestIntegrationTrace(t *testing.T) {
require.True(t, tt.ParentSpanID != [8]byte{})
require.False(t, tt.StartTime.IsZero())
require.False(t, tt.FinishTime.IsZero())
require.Less(t, time.Since(tt.FinishTime), time.Hour)
require.NotEmpty(t, tt.OperationName)
if tt.OperationName == "query" {
require.Equal(t, tt.Attributes["db.statement"], "SELECT 1")
Expand Down
2 changes: 1 addition & 1 deletion integration/tempoe2e/tempo_e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func ParseBatchSet(r io.Reader) (s BatchSet, _ error) {

func (s *BatchSet) addBatch(raw ptrace.Traces) {
s.Batches = append(s.Batches, raw)
batchID := uuid.NewString()
batchID := uuid.New()

resSpans := raw.ResourceSpans()
for i := 0; i < resSpans.Len(); i++ {
Expand Down
10 changes: 6 additions & 4 deletions internal/chstorage/columns_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ func newTypesColumn() *proto.ColMap[string, uint8] {
)
}

func decodeAttributesRow(s []attrKV, st []typKV) (otelstorage.Attrs, error) {
func decodeAttributesRow(values *proto.ColMap[string, string], types *proto.ColMap[string, uint8], i int) (otelstorage.Attrs, error) {
m := pcommon.NewMap()
s := values.RowKV(i)
st := types.RowKV(i)
if len(s) != len(st) {
return otelstorage.Attrs{}, errors.New("length mismatch")
}
Expand Down Expand Up @@ -156,7 +158,7 @@ func (c *logColumns) ForEach(f func(r logstorage.Record)) error {
ScopeName: c.scopeName.Row(i),
}
{
m, err := decodeAttributesRow(c.resource.RowKV(i), c.resourceTypes.RowKV(i))
m, err := decodeAttributesRow(c.resource, c.resourceTypes, i)
if err != nil {
return errors.Wrap(err, "decode resource")
}
Expand All @@ -173,14 +175,14 @@ func (c *logColumns) ForEach(f func(r logstorage.Record)) error {
r.ResourceAttrs = otelstorage.Attrs(v)
}
{
m, err := decodeAttributesRow(c.attributes.RowKV(i), c.attributesTypes.RowKV(i))
m, err := decodeAttributesRow(c.attributes, c.attributesTypes, i)
if err != nil {
return errors.Wrap(err, "decode attributes")
}
r.Attrs = otelstorage.Attrs(m.AsMap())
}
{
m, err := decodeAttributesRow(c.scopeAttributes.RowKV(i), c.scopeAttributesTypes.RowKV(i))
m, err := decodeAttributesRow(c.scopeAttributes, c.scopeAttributesTypes, i)
if err != nil {
return errors.Wrap(err, "decode scope attributes")
}
Expand Down
172 changes: 95 additions & 77 deletions internal/chstorage/columns_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,38 @@ import (

"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"github.com/google/uuid"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"

"github.com/go-faster/oteldb/internal/otelstorage"
"github.com/go-faster/oteldb/internal/tracestorage"
)

type spanColumns struct {
traceID proto.ColUUID
spanID proto.ColUInt64
serviceInstanceID *proto.ColLowCardinality[string]
serviceName *proto.ColLowCardinality[string]
serviceNamespace *proto.ColLowCardinality[string]

traceID proto.ColRawOf[otelstorage.TraceID]
spanID proto.ColRawOf[otelstorage.SpanID]
traceState proto.ColStr
parentSpanID proto.ColUInt64
parentSpanID proto.ColRawOf[otelstorage.SpanID]
name *proto.ColLowCardinality[string]
kind proto.ColEnum8
start *proto.ColDateTime64
end *proto.ColDateTime64
statusCode proto.ColInt32
statusCode proto.ColUInt8
statusMessage proto.ColStr
batchID proto.ColUUID

batchID proto.ColUUID
attributes proto.ColStr
resource proto.ColStr
attributes *proto.ColMap[string, string]
attributesTypes *proto.ColMap[string, uint8]
resource *proto.ColMap[string, string]
resourceTypes *proto.ColMap[string, uint8]

scopeName proto.ColStr
scopeVersion proto.ColStr
scopeAttributes proto.ColStr
scopeName *proto.ColLowCardinality[string]
scopeVersion *proto.ColLowCardinality[string]
scopeAttributes *proto.ColMap[string, string]
scopeAttributesTypes *proto.ColMap[string, uint8]

events eventsColumns
links linksColumns
Expand All @@ -42,29 +49,48 @@ func newSpanColumns() *spanColumns {
end: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano),
events: newEventsColumns(),
links: newLinksColumns(),

serviceInstanceID: new(proto.ColStr).LowCardinality(),
serviceName: new(proto.ColStr).LowCardinality(),
serviceNamespace: new(proto.ColStr).LowCardinality(),
attributes: newAttributesColumn(),
attributesTypes: newTypesColumn(),
resource: newAttributesColumn(),
resourceTypes: newTypesColumn(),
scopeName: new(proto.ColStr).LowCardinality(),
scopeVersion: new(proto.ColStr).LowCardinality(),
scopeAttributes: newAttributesColumn(),
scopeAttributesTypes: newTypesColumn(),
}
}

func (c *spanColumns) Input() proto.Input {
return proto.Input{
{Name: "trace_id", Data: c.traceID},
{Name: "span_id", Data: c.spanID},
{Name: "trace_state", Data: c.traceState},
{Name: "parent_span_id", Data: c.parentSpanID},
func (c *spanColumns) columns() tableColumns {
return []tableColumn{
{Name: "service_instance_id", Data: c.serviceInstanceID},
{Name: "service_name", Data: c.serviceName},
{Name: "service_namespace", Data: c.serviceNamespace},

{Name: "trace_id", Data: &c.traceID},
{Name: "span_id", Data: &c.spanID},
{Name: "trace_state", Data: &c.traceState},
{Name: "parent_span_id", Data: &c.parentSpanID},
{Name: "name", Data: c.name},
{Name: "kind", Data: proto.Wrap(&c.kind, kindDDL)},
{Name: "start", Data: c.start},
{Name: "end", Data: c.end},
{Name: "status_code", Data: c.statusCode},
{Name: "status_message", Data: c.statusMessage},
{Name: "status_code", Data: &c.statusCode},
{Name: "status_message", Data: &c.statusMessage},
{Name: "batch_id", Data: &c.batchID},

{Name: "batch_id", Data: c.batchID},
{Name: "attributes", Data: c.attributes},
{Name: "attributes_types", Data: c.attributesTypes},
{Name: "resource", Data: c.resource},
{Name: "resource_types", Data: c.resourceTypes},

{Name: "scope_name", Data: c.scopeName},
{Name: "scope_version", Data: c.scopeVersion},
{Name: "scope_attributes", Data: c.scopeAttributes},
{Name: "scope_attributes_types", Data: c.scopeAttributesTypes},

{Name: "events_timestamps", Data: c.events.timestamps},
{Name: "events_names", Data: c.events.names},
Expand All @@ -77,74 +103,66 @@ func (c *spanColumns) Input() proto.Input {
}
}

func (c *spanColumns) Result() proto.Results {
return proto.Results{
{Name: "trace_id", Data: &c.traceID},
{Name: "span_id", Data: &c.spanID},
{Name: "trace_state", Data: &c.traceState},
{Name: "parent_span_id", Data: &c.parentSpanID},
{Name: "name", Data: c.name},
{Name: "kind", Data: &c.kind},
{Name: "start", Data: c.start},
{Name: "end", Data: c.end},
{Name: "status_code", Data: &c.statusCode},
{Name: "status_message", Data: &c.statusMessage},

{Name: "batch_id", Data: &c.batchID},
{Name: "attributes", Data: &c.attributes},
{Name: "resource", Data: &c.resource},

{Name: "scope_name", Data: &c.scopeName},
{Name: "scope_version", Data: &c.scopeVersion},
{Name: "scope_attributes", Data: &c.scopeAttributes},

{Name: "events_timestamps", Data: c.events.timestamps},
{Name: "events_names", Data: c.events.names},
{Name: "events_attributes", Data: c.events.attributes},
func (c *spanColumns) Input() proto.Input {
return c.columns().Input()
}

{Name: "links_trace_ids", Data: c.links.traceIDs},
{Name: "links_span_ids", Data: c.links.spanIDs},
{Name: "links_tracestates", Data: c.links.tracestates},
{Name: "links_attributes", Data: c.links.attributes},
}
func (c *spanColumns) Result() proto.Results {
return c.columns().Result()
}

func (c *spanColumns) AddRow(s tracestorage.Span) {
c.traceID.Append(uuid.UUID(s.TraceID))
c.spanID.Append(s.SpanID.AsUint64())
c.traceID.Append(s.TraceID)
c.spanID.Append(s.SpanID)
c.traceState.Append(s.TraceState)
c.parentSpanID.Append(s.ParentSpanID.AsUint64())
c.parentSpanID.Append(s.ParentSpanID)
c.name.Append(s.Name)
c.kind.Append(proto.Enum8(s.Kind))
c.start.Append(time.Unix(0, int64(s.Start)))
c.end.Append(time.Unix(0, int64(s.End)))
c.statusCode.Append(s.StatusCode)
c.statusCode.Append(uint8(s.StatusCode))
c.statusMessage.Append(s.StatusMessage)

// FIXME(tdakkota): use UUID in Span.
c.batchID.Append(uuid.MustParse(s.BatchID))
c.attributes.Append(encodeAttributes(s.Attrs.AsMap()))
c.resource.Append(encodeAttributes(s.ResourceAttrs.AsMap()))

c.batchID.Append(s.BatchID)
appendAttributes(c.attributes, c.attributesTypes, s.Attrs)
appendAttributes(c.resource, c.resourceTypes, s.ResourceAttrs)
{
m := s.ResourceAttrs.AsMap()
setStrOrEmpty(c.serviceInstanceID, m, string(semconv.ServiceInstanceIDKey))
setStrOrEmpty(c.serviceName, m, string(semconv.ServiceNameKey))
setStrOrEmpty(c.serviceNamespace, m, string(semconv.ServiceNamespaceKey))
}
c.scopeName.Append(s.ScopeName)
c.scopeVersion.Append(s.ScopeVersion)
c.scopeAttributes.Append(encodeAttributes(s.ScopeAttrs.AsMap()))
appendAttributes(c.scopeAttributes, c.scopeAttributesTypes, s.ScopeAttrs)

c.events.AddRow(s.Events)
c.links.AddRow(s.Links)
}

func (c *spanColumns) ReadRowsTo(spans []tracestorage.Span) ([]tracestorage.Span, error) {
for i := 0; i < c.traceID.Rows(); i++ {
attrs, err := decodeAttributes(c.attributes.Row(i))
attrs, err := decodeAttributesRow(c.attributes, c.attributesTypes, i)
if err != nil {
return nil, errors.Wrap(err, "decode attributes")
}
resource, err := decodeAttributes(c.resource.Row(i))
resource, err := decodeAttributesRow(c.resource, c.resourceTypes, i)
if err != nil {
return nil, errors.Wrap(err, "decode resource")
}
scopeAttrs, err := decodeAttributes(c.scopeAttributes.Row(i))
{
v := resource.AsMap()
if s := c.serviceInstanceID.Row(i); s != "" {
v.PutStr(string(semconv.ServiceInstanceIDKey), s)
}
if s := c.serviceName.Row(i); s != "" {
v.PutStr(string(semconv.ServiceNameKey), s)
}
if s := c.serviceNamespace.Row(i); s != "" {
v.PutStr(string(semconv.ServiceNamespaceKey), s)
}
}
scopeAttrs, err := decodeAttributesRow(c.scopeAttributes, c.scopeAttributesTypes, i)
if err != nil {
return nil, errors.Wrap(err, "decode scope attributes")
}
Expand All @@ -158,18 +176,18 @@ func (c *spanColumns) ReadRowsTo(spans []tracestorage.Span) ([]tracestorage.Span
}

spans = append(spans, tracestorage.Span{
TraceID: otelstorage.TraceID(c.traceID.Row(i)),
SpanID: otelstorage.SpanIDFromUint64(c.spanID.Row(i)),
TraceID: c.traceID.Row(i),
SpanID: c.spanID.Row(i),
TraceState: c.traceState.Row(i),
ParentSpanID: otelstorage.SpanIDFromUint64(c.parentSpanID.Row(i)),
ParentSpanID: c.parentSpanID.Row(i),
Name: c.name.Row(i),
Kind: int32(c.kind.Row(i)),
Start: otelstorage.NewTimestampFromTime(c.start.Row(i)),
End: otelstorage.NewTimestampFromTime(c.end.Row(i)),
Attrs: attrs,
StatusCode: c.statusCode.Row(i),
StatusCode: int32(c.statusCode.Row(i)),
StatusMessage: c.statusMessage.Row(i),
BatchID: c.batchID.Row(i).String(),
BatchID: c.batchID.Row(i),
ResourceAttrs: resource,
ScopeName: c.scopeName.Row(i),
ScopeVersion: c.scopeVersion.Row(i),
Expand Down Expand Up @@ -241,31 +259,31 @@ func (c *eventsColumns) Row(row int) (events []tracestorage.Event, _ error) {
}

type linksColumns struct {
traceIDs *proto.ColArr[uuid.UUID]
spanIDs *proto.ColArr[uint64]
traceIDs *proto.ColArr[otelstorage.TraceID]
spanIDs *proto.ColArr[otelstorage.SpanID]
tracestates *proto.ColArr[string]
attributes *proto.ColArr[string]
}

func newLinksColumns() linksColumns {
return linksColumns{
traceIDs: new(proto.ColUUID).Array(),
spanIDs: new(proto.ColUInt64).Array(),
traceIDs: proto.NewArray[otelstorage.TraceID](&proto.ColRawOf[otelstorage.TraceID]{}),
spanIDs: proto.NewArray[otelstorage.SpanID](&proto.ColRawOf[otelstorage.SpanID]{}),
tracestates: new(proto.ColStr).Array(),
attributes: new(proto.ColStr).Array(),
}
}

func (c *linksColumns) AddRow(links []tracestorage.Link) {
var (
traceIDs []uuid.UUID
spanIDs []uint64
traceIDs []otelstorage.TraceID
spanIDs []otelstorage.SpanID
tracestates []string
attributes []string
)
for _, l := range links {
traceIDs = append(traceIDs, uuid.UUID(l.TraceID))
spanIDs = append(spanIDs, l.SpanID.AsUint64())
traceIDs = append(traceIDs, l.TraceID)
spanIDs = append(spanIDs, l.SpanID)
tracestates = append(tracestates, l.TraceState)
attributes = append(attributes, encodeAttributes(l.Attrs.AsMap()))
}
Expand Down Expand Up @@ -297,8 +315,8 @@ func (c *linksColumns) Row(row int) (links []tracestorage.Link, _ error) {
}

links = append(links, tracestorage.Link{
TraceID: otelstorage.TraceID(traceIDs[i]),
SpanID: otelstorage.SpanIDFromUint64(spanIDs[i]),
TraceID: traceIDs[i],
SpanID: spanIDs[i],
TraceState: tracestates[i],
Attrs: attrs,
})
Expand Down
1 change: 0 additions & 1 deletion internal/chstorage/querier_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ func (q *Querier) LabelValues(ctx context.Context, labelName string, opts logsto
for i := 0; i < values.Rows(); i++ {
for _, v := range values.Row(i) {
if v == "" {
// HACK: JSONExtractRaw returns empty string if key is not found.
continue
}
out = append(out, v)
Expand Down
Loading

0 comments on commit e1f8a56

Please sign in to comment.