Skip to content

Commit

Permalink
Merge pull request #440 from go-faster/feat/store-label-scope
Browse files Browse the repository at this point in the history
feat(chstorage): lookup attribute scope before querying data
  • Loading branch information
tdakkota authored Jun 27, 2024
2 parents 40f2fe4 + 40896e2 commit d90e1f3
Show file tree
Hide file tree
Showing 15 changed files with 258 additions and 105 deletions.
21 changes: 21 additions & 0 deletions integration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Integration tests

This directory contains Prometheus, Tempo and Loki API integration tests.

Docker is required to run.

### Getting traces from tests for debugging

Run a local [`Jaeger`](https://www.jaegertracing.io/) instance:

```console
docker run --rm -d --name jaeger -p 16686:16686 -p 4317:4317 jaegertracing/all-in-one:latest
```

Run wanted test with `E2E_TRACES_EXPORTER`:

```console
E2E_TRACES_EXPORTER='localhost:4317' E2E='1' go test -v -run "TestCH/QueryRange" ./integration/prome2e
```

Open Jaeger UI [http://localhost:16686/](http://localhost:16686/).
5 changes: 4 additions & 1 deletion integration/lokie2e/ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func randomPrefix() string {
func TestCH(t *testing.T) {
integration.Skip(t)
ctx := context.Background()
provider := integration.IntegrationProvider(t)

req := testcontainers.ContainerRequest{
Name: "oteldb-lokie2e-clickhouse",
Expand All @@ -48,6 +49,9 @@ func TestCH(t *testing.T) {
opts := ch.Options{
Address: endpoint,
Database: "default",

OpenTelemetryInstrumentation: true,
TracerProvider: provider,
}

connectBackoff := backoff.NewExponentialBackOff()
Expand Down Expand Up @@ -77,7 +81,6 @@ func TestCH(t *testing.T) {
t.Logf("Test tables prefix: %s", prefix)
require.NoError(t, tables.Create(ctx, c))

provider := integration.NewProvider()
inserter, err := chstorage.NewInserter(c, chstorage.InserterOptions{
Tables: tables,
TracerProvider: provider,
Expand Down
26 changes: 18 additions & 8 deletions integration/lokie2e/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
"sigs.k8s.io/yaml"

"github.com/go-faster/oteldb/integration"
"github.com/go-faster/oteldb/integration/lokie2e"
"github.com/go-faster/oteldb/integration/requirex"
"github.com/go-faster/oteldb/internal/chstorage"
Expand All @@ -37,7 +37,15 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

func setupDB(ctx context.Context, t *testing.T, provider *integration.Provider, set *lokie2e.BatchSet, inserter logstorage.Inserter, querier logstorage.Querier, engineQuerier logqlengine.Querier) *lokiapi.Client {
func setupDB(
ctx context.Context,
t *testing.T,
provider trace.TracerProvider,
set *lokie2e.BatchSet,
inserter logstorage.Inserter,
querier logstorage.Querier,
engineQuerier logqlengine.Querier,
) *lokiapi.Client {
consumer := logstorage.NewConsumer(inserter)

logEncoder := plog.JSONMarshaler{}
Expand Down Expand Up @@ -82,7 +90,14 @@ func setupDB(ctx context.Context, t *testing.T, provider *integration.Provider,
return c
}

func runTest(ctx context.Context, t *testing.T, provider *integration.Provider, inserter logstorage.Inserter, querier logstorage.Querier, engineQuerier logqlengine.Querier) {
func runTest(
ctx context.Context,
t *testing.T,
provider trace.TracerProvider,
inserter logstorage.Inserter,
querier logstorage.Querier,
engineQuerier logqlengine.Querier,
) {
now := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
set, err := generateLogs(now)
require.NoError(t, err)
Expand All @@ -93,20 +108,15 @@ func runTest(ctx context.Context, t *testing.T, provider *integration.Provider,
require.NotZero(t, set.End)
require.GreaterOrEqual(t, set.End, set.Start)

tracer := provider.TracerProvider.Tracer("test")
c := setupDB(ctx, t, provider, set, inserter, querier, engineQuerier)

t.Run("Labels", func(t *testing.T) {
ctx, span := tracer.Start(ctx, "Labels")
a := require.New(t)
r, err := c.Labels(ctx, lokiapi.LabelsParams{
// Always sending time range because default is current time.
Start: lokiapi.NewOptLokiTime(asLokiTime(set.Start)),
End: lokiapi.NewOptLokiTime(asLokiTime(set.End)),
})
span.End()
provider.Flush()

a.NoError(err)

a.Len(r.Data, len(set.Labels))
Expand Down
16 changes: 13 additions & 3 deletions integration/prome2e/ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestCH(t *testing.T) {
t.Parallel()
integration.Skip(t)
ctx := context.Background()
provider := integration.IntegrationProvider(t)

req := testcontainers.ContainerRequest{
Name: "oteldb-prome2e-clickhouse",
Expand All @@ -43,6 +44,9 @@ func TestCH(t *testing.T) {
opts := ch.Options{
Address: endpoint,
Database: "default",

OpenTelemetryInstrumentation: true,
TracerProvider: provider,
}

connectBackoff := backoff.NewExponentialBackOff()
Expand Down Expand Up @@ -71,12 +75,18 @@ func TestCH(t *testing.T) {
t.Logf("Test tables prefix: %s", prefix)
require.NoError(t, tables.Create(ctx, c))

inserter, err := chstorage.NewInserter(c, chstorage.InserterOptions{Tables: tables})
inserter, err := chstorage.NewInserter(c, chstorage.InserterOptions{
Tables: tables,
TracerProvider: provider,
})
require.NoError(t, err)

querier, err := chstorage.NewQuerier(c, chstorage.QuerierOptions{Tables: tables})
querier, err := chstorage.NewQuerier(c, chstorage.QuerierOptions{
Tables: tables,
TracerProvider: provider,
})
require.NoError(t, err)

ctx = zctx.Base(ctx, integration.Logger(t))
runTest(ctx, t, inserter, querier, querier)
runTest(ctx, t, provider, inserter, querier, querier)
}
14 changes: 11 additions & 3 deletions integration/prome2e/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"

"github.com/go-faster/oteldb/integration/prome2e"
Expand Down Expand Up @@ -40,6 +41,7 @@ func readBatchSet(p string) (s prome2e.BatchSet, _ error) {
func setupDB(
ctx context.Context,
t *testing.T,
provider trace.TracerProvider,
set prome2e.BatchSet,
consumer MetricsConsumer,
querier storage.Queryable,
Expand All @@ -57,20 +59,26 @@ func setupDB(
EnableNegativeOffset: true,
})
api := promhandler.NewPromAPI(engine, querier, exemplarQuerier, promhandler.PromAPIOptions{})
promh, err := promapi.NewServer(api)
promh, err := promapi.NewServer(api,
promapi.WithTracerProvider(provider),
)
require.NoError(t, err)

s := httptest.NewServer(promh)
t.Cleanup(s.Close)

c, err := promapi.NewClient(s.URL, promapi.WithClient(s.Client()))
c, err := promapi.NewClient(s.URL,
promapi.WithClient(s.Client()),
promapi.WithTracerProvider(provider),
)
require.NoError(t, err)
return c
}

func runTest(
ctx context.Context,
t *testing.T,
provider trace.TracerProvider,
consumer MetricsConsumer,
querier storage.Queryable,
exemplarQuerier storage.ExemplarQueryable,
Expand All @@ -79,7 +87,7 @@ func runTest(
require.NoError(t, err)
require.NotEmpty(t, set.Batches)
require.NotEmpty(t, set.Labels)
c := setupDB(ctx, t, set, consumer, querier, exemplarQuerier)
c := setupDB(ctx, t, provider, set, consumer, querier, exemplarQuerier)

t.Run("Labels", func(t *testing.T) {
t.Run("All", func(t *testing.T) {
Expand Down
16 changes: 13 additions & 3 deletions integration/tempoe2e/ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestCH(t *testing.T) {
t.Parallel()
integration.Skip(t)
ctx := context.Background()
provider := integration.IntegrationProvider(t)

req := testcontainers.ContainerRequest{
Name: "oteldb-tempoe2e-clickhouse",
Expand All @@ -43,6 +44,9 @@ func TestCH(t *testing.T) {
opts := ch.Options{
Address: endpoint,
Database: "default",

OpenTelemetryInstrumentation: true,
TracerProvider: provider,
}

connectBackoff := backoff.NewExponentialBackOff()
Expand Down Expand Up @@ -71,12 +75,18 @@ func TestCH(t *testing.T) {
t.Logf("Test tables prefix: %s", prefix)
require.NoError(t, tables.Create(ctx, c))

inserter, err := chstorage.NewInserter(c, chstorage.InserterOptions{Tables: tables})
inserter, err := chstorage.NewInserter(c, chstorage.InserterOptions{
Tables: tables,
TracerProvider: provider,
})
require.NoError(t, err)

querier, err := chstorage.NewQuerier(c, chstorage.QuerierOptions{Tables: tables})
querier, err := chstorage.NewQuerier(c, chstorage.QuerierOptions{
Tables: tables,
TracerProvider: provider,
})
require.NoError(t, err)

ctx = zctx.Base(ctx, integration.Logger(t))
runTest(ctx, t, inserter, querier, querier)
runTest(ctx, t, provider, inserter, querier, querier)
}
18 changes: 14 additions & 4 deletions integration/tempoe2e/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"

"github.com/go-faster/oteldb/integration/requirex"
Expand All @@ -42,6 +43,7 @@ func readBatchSet(p string) (s tempoe2e.BatchSet, _ error) {
func setupDB(
ctx context.Context,
t *testing.T,
provider trace.TracerProvider,
set tempoe2e.BatchSet,
inserter tracestorage.Inserter,
querier tracestorage.Querier,
Expand All @@ -56,25 +58,33 @@ func setupDB(

var engine *traceqlengine.Engine
if engineQuerier != nil {
engine = traceqlengine.NewEngine(engineQuerier, traceqlengine.Options{})
engine = traceqlengine.NewEngine(engineQuerier, traceqlengine.Options{
TracerProvider: provider,
})
}
api := tempohandler.NewTempoAPI(querier, engine, tempohandler.TempoAPIOptions{
EnableAutocompleteQuery: true,
})
tempoh, err := tempoapi.NewServer(api)
tempoh, err := tempoapi.NewServer(api,
tempoapi.WithTracerProvider(provider),
)
require.NoError(t, err)

s := httptest.NewServer(tempoh)
t.Cleanup(s.Close)

c, err := tempoapi.NewClient(s.URL, tempoapi.WithClient(s.Client()))
c, err := tempoapi.NewClient(s.URL,
tempoapi.WithClient(s.Client()),
tempoapi.WithTracerProvider(provider),
)
require.NoError(t, err)
return c
}

func runTest(
ctx context.Context,
t *testing.T,
provider trace.TracerProvider,
inserter tracestorage.Inserter,
querier tracestorage.Querier,
engineQuerier traceqlengine.Querier,
Expand Down Expand Up @@ -102,7 +112,7 @@ func runTest(
}
}

c := setupDB(ctx, t, set, inserter, querier, engineQuerier)
c := setupDB(ctx, t, provider, set, inserter, querier, engineQuerier)
var (
start = tempoapi.NewOptUnixSeconds(set.Start.AsTime().Add(-time.Second))
end = tempoapi.NewOptUnixSeconds(set.End.AsTime())
Expand Down
49 changes: 43 additions & 6 deletions integration/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ package integration
import (
"context"
"math/rand"
"os"
"sync"
"testing"
"time"

tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
)

type randomIDGenerator struct {
Expand Down Expand Up @@ -36,7 +41,7 @@ func (gen *randomIDGenerator) NewIDs(context.Context) (tid trace.TraceID, sid tr
// Provider is a helper for tests providing a TracerProvider and an
// InMemoryExporter.
type Provider struct {
*tracesdk.TracerProvider
*sdktrace.TracerProvider
Exporter *tracetest.InMemoryExporter
}

Expand All @@ -52,18 +57,50 @@ func (p *Provider) Flush() {
}
}

// IntegrationProvider returns trace exporter for debugging.
//
// E2E_TRACES_EXPORTER enironment variable sets gRPC OTLP receiver address
// to export traces.
func IntegrationProvider(t *testing.T) trace.TracerProvider {
if addr := os.Getenv("E2E_TRACES_EXPORTER"); addr != "" {
ctx := context.Background()

exp, err := otlptracegrpc.New(ctx,
otlptracegrpc.WithEndpoint(addr),
otlptracegrpc.WithInsecure(),
)
if err != nil {
t.Fatalf("create trace exporter: %+v", err)
}
tracer := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp))
t.Cleanup(func() {
flushCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
if err := tracer.ForceFlush(flushCtx); err != nil {
t.Logf("Flush traces error: %+v", err)
}
if err := tracer.Shutdown(ctx); err != nil {
t.Logf("Shutdown tracer error: %+v", err)
}
})

return tracer
}
return noop.NewTracerProvider()
}

// NewProvider initializes and returns a new Provider along with an exporter.
func NewProvider() *Provider {
exporter := tracetest.NewInMemoryExporter()
randSource := rand.NewSource(10)
tp := tracesdk.NewTracerProvider(
tp := sdktrace.NewTracerProvider(
// Using deterministic random ids.
tracesdk.WithIDGenerator(&randomIDGenerator{
sdktrace.WithIDGenerator(&randomIDGenerator{
// #nosec G404
rand: rand.New(randSource),
}),
tracesdk.WithBatcher(exporter,
tracesdk.WithBatchTimeout(0), // instant
sdktrace.WithBatcher(exporter,
sdktrace.WithBatchTimeout(0), // instant
),
)
return &Provider{
Expand Down
Loading

0 comments on commit d90e1f3

Please sign in to comment.