From 53cdcaa75c88ac91282d6b3d1b9dc3f92c586a79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 26 Jul 2022 11:08:15 +0200 Subject: [PATCH] Tracing: added OpenTelemetry integration test The test verifies that span tree structure and status code are valid. Speculative executions are run parallel to the main thread, so some of them can finish only after query result has been returned. Thus, in order to collect span data from entire request, we decided to wait until all speculative executions end. The main thread uses conditional variable `allEnded` to wait for them and lock is used for concurrent mutation of activeSpans. --- .../OpenTelemetryTracingInfo.java | 16 +- .../OpenTelemetryTracingInfoFactory.java | 10 +- .../opentelemetry/OpenTelemetryTest.java | 889 ++++++++++++++++++ 3 files changed, 902 insertions(+), 13 deletions(-) create mode 100644 driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java diff --git a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java index ad20d82420d..de1463ce293 100644 --- a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java +++ b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java @@ -20,8 +20,8 @@ import com.datastax.driver.core.policies.LoadBalancingPolicy; import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; -import com.datastax.driver.core.tracing.PrecisionLevel; import com.datastax.driver.core.tracing.TracingInfo; +import com.datastax.driver.core.tracing.VerbosityLevel; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; @@ -32,9 +32,9 @@ public class OpenTelemetryTracingInfo implements TracingInfo { private final Tracer tracer; private final Context context; private boolean tracingStarted; - private final PrecisionLevel precision; + private final VerbosityLevel precision; - protected OpenTelemetryTracingInfo(Tracer tracer, Context context, PrecisionLevel precision) { + protected OpenTelemetryTracingInfo(Tracer tracer, Context context, VerbosityLevel precision) { this.tracer = tracer; this.context = context; this.precision = precision; @@ -53,7 +53,7 @@ private void assertStarted() { assert tracingStarted : "TracingInfo.setStartTime must be called before any other method"; } - public PrecisionLevel getPrecision() { + public VerbosityLevel getVerbosity() { return precision; } @@ -72,14 +72,14 @@ public void setConsistencyLevel(ConsistencyLevel consistency) { public void setStatement(String statement) { assertStarted(); - if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + if (currentVerbosityLevelIsAtLeast(VerbosityLevel.FULL)) { span.setAttribute("db.scylladb.statement", statement); } } public void setHostname(String hostname) { assertStarted(); - if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + if (currentVerbosityLevelIsAtLeast(VerbosityLevel.FULL)) { span.setAttribute("net.peer.name", hostname); } } @@ -168,7 +168,7 @@ public void setRowsCount(int rowsCount) { @Override public void setStatement(String statement, int limit) { assertStarted(); - if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + if (currentVerbosityLevelIsAtLeast(VerbosityLevel.FULL)) { if (statement.length() > limit) statement = statement.substring(0, limit); span.setAttribute("db.scylladb.statement", statement); } @@ -244,7 +244,7 @@ public void tracingFinished() { span.end(); } - private boolean currentPrecisionLevelIsAtLeast(PrecisionLevel requiredLevel) { + private boolean currentVerbosityLevelIsAtLeast(VerbosityLevel requiredLevel) { return requiredLevel.compareTo(precision) <= 0; } } diff --git a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java index 64ff29f82a9..91fab8e9c8a 100644 --- a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java +++ b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java @@ -17,22 +17,22 @@ package com.datastax.driver.opentelemetry; import com.datastax.driver.core.tracing.NoopTracingInfoFactory; -import com.datastax.driver.core.tracing.PrecisionLevel; import com.datastax.driver.core.tracing.TracingInfo; import com.datastax.driver.core.tracing.TracingInfoFactory; +import com.datastax.driver.core.tracing.VerbosityLevel; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; public class OpenTelemetryTracingInfoFactory implements TracingInfoFactory { private final Tracer tracer; - private final PrecisionLevel precision; + private final VerbosityLevel precision; public OpenTelemetryTracingInfoFactory(final Tracer tracer) { - this(tracer, PrecisionLevel.NORMAL); + this(tracer, VerbosityLevel.NORMAL); } - public OpenTelemetryTracingInfoFactory(final Tracer tracer, final PrecisionLevel precision) { + public OpenTelemetryTracingInfoFactory(final Tracer tracer, final VerbosityLevel precision) { this.tracer = tracer; this.precision = precision; } @@ -48,7 +48,7 @@ public TracingInfo buildTracingInfo(TracingInfo parent) { if (parent instanceof OpenTelemetryTracingInfo) { final OpenTelemetryTracingInfo castedParent = (OpenTelemetryTracingInfo) parent; return new OpenTelemetryTracingInfo( - castedParent.getTracer(), castedParent.getContext(), castedParent.getPrecision()); + castedParent.getTracer(), castedParent.getContext(), castedParent.getVerbosity()); } return new NoopTracingInfoFactory().buildTracingInfo(); diff --git a/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java b/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java new file mode 100644 index 00000000000..4026b360078 --- /dev/null +++ b/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java @@ -0,0 +1,889 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.driver.opentelemetry; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.CCMTestsSupport; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.exceptions.SyntaxError; +import com.datastax.driver.core.policies.FallthroughRetryPolicy; +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; +import com.datastax.driver.core.tracing.TracingInfoFactory; +import com.datastax.driver.core.tracing.VerbosityLevel; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import org.testng.annotations.Test; + +/** Tests for OpenTelemetry integration. */ +public class OpenTelemetryTest extends CCMTestsSupport { + /** Collects and saves spans. */ + private static final class BookkeepingSpanProcessor implements SpanProcessor { + final Lock lock = new ReentrantLock(); + final Condition allEnded = lock.newCondition(); + + final Collection startedSpans = new ArrayList<>(); + final Collection spans = new ArrayList<>(); + + int activeSpans = 0; + + @Override + public void onStart(Context parentContext, ReadWriteSpan span) { + lock.lock(); + + startedSpans.add(span); + ++activeSpans; + + lock.unlock(); + } + + @Override + public boolean isStartRequired() { + return true; + } + + @Override + public void onEnd(ReadableSpan span) { + lock.lock(); + + spans.add(span); + --activeSpans; + + if (activeSpans == 0) allEnded.signal(); + + lock.unlock(); + } + + @Override + public boolean isEndRequired() { + return true; + } + + public Collection getSpans() { + lock.lock(); + + try { + while (activeSpans > 0) allEnded.await(); + + for (ReadableSpan span : startedSpans) { + assertTrue(span.hasEnded()); + } + } catch (InterruptedException e) { + assert false; + } finally { + lock.unlock(); + } + + return spans; + } + } + + private Session session; + + /** + * Prepare OpenTelemetry configuration and run test with it. + * + * @param precisionLevel precision level of tracing for the tests. + * @param test test to run. + * @return collected spans. + */ + private Collection collectSpans( + VerbosityLevel precisionLevel, BiConsumer test) { + final Resource serviceNameResource = + Resource.create( + Attributes.of(ResourceAttributes.SERVICE_NAME, "Scylla Java driver - test")); + + final BookkeepingSpanProcessor collector = new BookkeepingSpanProcessor(); + + final SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(collector) + .setResource(Resource.getDefault().merge(serviceNameResource)) + .build(); + final OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build(); + + final Tracer tracer = openTelemetry.getTracerProvider().get("this"); + final OpenTelemetryTracingInfoFactory tracingInfoFactory = + new OpenTelemetryTracingInfoFactory(tracer, precisionLevel); + cluster().setTracingInfoFactory(tracingInfoFactory); + + session = cluster().connect(); + session.execute("USE " + keyspace); + session.execute("DROP TABLE IF EXISTS t"); + session.execute("CREATE TABLE t (k int PRIMARY KEY, v int)"); + + BatchStatement bs = new BatchStatement(); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (12, 3)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (1, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (5, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (6, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (7, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (8, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (9, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (10, 7)")); + bs.setConsistencyLevel(ConsistencyLevel.ALL); + session.execute(bs); + + collector.getSpans().clear(); + + test.accept(tracer, tracingInfoFactory); + session.close(); + + tracerProvider.close(); + cluster().setTracingInfoFactory(new NoopTracingInfoFactory()); + + return collector.getSpans(); + } + + private Collection getChildrenOfSpans( + Collection allSpans, Collection parentSpans) { + return allSpans.stream() + .filter( + span -> + parentSpans.stream() + .filter( + parentSpan -> + parentSpan.getSpanContext().equals(span.getParentSpanContext())) + .findAny() + .isPresent()) + .collect(Collectors.toList()); + } + + /** Basic test for creating spans with INSERT statement. */ + @Test(groups = "short") + public void simpleTracingInsertTest() { + final Collection spans = + collectSpans( + VerbosityLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + session.execute("INSERT INTO t(k, v) VALUES (4, 2)"); + session.execute("INSERT INTO t(k, v) VALUES (2, 1)"); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 2); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 2); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 2); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ONE"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "regular"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.fetch_size"))); // was not set explicitly + // no such information in RegularStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.batch_size"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.db.operation"))); + // no such information with VerbosityLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.statement"))); + // no such information with operation INSERT: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with UPDATE statement. */ + @Test(groups = "short") + public void simpleTracingUpdateTest() { + final Collection spans = + collectSpans( + VerbosityLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + final BatchStatement batch = new BatchStatement(); + batch.addAll( + new ArrayList() { + { + add("UPDATE t SET v=0 WHERE k=1"); + add("UPDATE t SET v=0 WHERE k=2"); + add("UPDATE t SET v=0 WHERE k=3"); + add("UPDATE t SET v=0 WHERE k=4"); + } + }.stream().map(SimpleStatement::new).collect(Collectors.toList())); + + session.execute(batch); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 1); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 1); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 1); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ONE"); + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "batch"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + // tags specific to BatchStatement + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.batch_size")), "4"); + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.fetch_size"))); // was not set explicitly + // no such information in BatchStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.db.operation"))); + // no such information with VerbosityLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.statement"))); + // no such information with operation UPDATE: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with DELETE statement. */ + @Test(groups = "short") + public void simpleTracingDeleteTest() { + final Collection spans = + collectSpans( + VerbosityLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + final PreparedStatement ps = session.prepare("DELETE FROM t WHERE k=?"); + + session.execute(ps.bind(7)); + session.execute(ps.bind(8)); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter( + span -> + span.getParentSpanContext().equals(userSpan.getSpanContext()) + && span.toSpanData() + .getAttributes() + .get(AttributeKey.stringKey("db.scylladb.statement_type")) + != null) // to exclude preparation spans + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 2); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 2); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 2); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ONE"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "prepared"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + // tags specific to PreparedStatement + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.keyspace")), keyspace); + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.table")), "t"); + String partitionKey = tags.get(AttributeKey.stringKey("db.scylladb.partition_key")); + assertTrue(partitionKey.equals("k=7") || partitionKey.equals("k=8")); + String boundValues = tags.get(AttributeKey.stringKey("db.scylladb.bound_values")); + assertTrue(boundValues.equals("k=7") || boundValues.equals("k=8")); + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.db.operation"))); // not supported so far + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.fetch_size"))); // was not set explicitly + // no such information in BatchStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.batch_size"))); + // no such information with VerbosityLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.statement"))); + // no such information with operation DELETE: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with TRUNCATE statement. */ + @Test(groups = "short") + public void simpleTracingTruncateTest() { + final Collection spans = + collectSpans( + VerbosityLevel.FULL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + session.execute("TRUNCATE t"); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 1); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 1); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 1); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ONE"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "regular"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.fetch_size"))); // was not set explicitly + // no such information in RegularStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.batch_size"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.db.operation"))); + // present with VerbosityLevel.FULL: + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.statement")), "TRUNCATE t"); + // no such information with operation TRUNCATE: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with SELECT statement. */ + @Test(groups = "short") + public void simpleTracingSelectTest() { + final Collection spans = + collectSpans( + VerbosityLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + SimpleStatement s = + new SimpleStatement("SELECT k FROM t WHERE v = 7 ALLOW FILTERING"); + s.setFetchSize(2); + s.setIdempotent(true); + s.setRetryPolicy(FallthroughRetryPolicy.INSTANCE); + s.setConsistencyLevel(ConsistencyLevel.ALL); + + assertEquals(session.execute(s).all().size(), 7); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assert requestSpans.size() >= 4; + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assert speculativeExecutionsSpans.size() >= 4; + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assert attemptSpans.size() >= 4; + + boolean wasNoMorePages = false; + int totalRows = 0; + + for (ReadableSpan requestSpan : requestSpans) { + SpanData spanData = requestSpan.toSpanData(); + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ALL"); + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.fetch_size")), "2"); + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "regular"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.retry_policy")), "FallthroughRetryPolicy"); + + // tags specific for SELECT statement + final String hasMorePages = tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages")); + assertNotNull(hasMorePages); + if (hasMorePages.equals("false")) wasNoMorePages = true; + assertTrue(!(wasNoMorePages && hasMorePages.equals("true"))); + final String rowsCount = tags.get(AttributeKey.stringKey("db.scylladb.rows_count")); + assertNotNull(rowsCount); + totalRows += Integer.parseInt(rowsCount); + + // no such information in RegularStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.batch_size"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.db.operation"))); + // no such information with VerbosityLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.statement"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + } + + assertTrue(wasNoMorePages); + assertEquals(totalRows, 7); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with an erroneous statement. */ + @Test(groups = "short") + public void simpleRequestErrorTracingTest() { + final Collection spans = + collectSpans( + VerbosityLevel.FULL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + try { + session.execute("INSERT ONTO t(k, v) VALUES (4, 2)"); + // ^ syntax error here + assert false; // exception should be thrown before this line is executed + } catch (SyntaxError error) { + // pass + } + + try { + session.execute("INSERT INTO t(k, v) VALUES (2, 1, 3, 7)"); + // ^ too many values + assert false; // exception should be thrown before this line is executed + } catch (InvalidQueryException error) { + // pass + } + + scope.close(); + userSpan.end(); + }); + + // Retrieve span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 2); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.ERROR); + final String collectedStatement = + spanData.getAttributes().get(AttributeKey.stringKey("db.scylladb.statement")); + assert collectedStatement.equals("INSERT INTO t(k, v) VALUES (2, 1, 3, 7)") + || collectedStatement.equals("INSERT ONTO t(k, v) VALUES (4, 2)") + : "Bad statement gathered"; + }); + } +}