diff --git a/cassandra-tests/src/main/java/cassandra/CassandraContainer.java b/cassandra-tests/src/main/java/cassandra/CassandraContainer.java index 4ca1e9c..03b48a0 100644 --- a/cassandra-tests/src/main/java/cassandra/CassandraContainer.java +++ b/cassandra-tests/src/main/java/cassandra/CassandraContainer.java @@ -25,7 +25,7 @@ public class CassandraContainer extends GenericContainer { public CassandraContainer() { - super(parse("ghcr.io/openzipkin/zipkin-cassandra:2.23.7")); + super(parse("ghcr.io/openzipkin/zipkin-cassandra:2.27.0")); waitStrategy = Wait.forHealthcheck(); addExposedPort(9042); withStartupTimeout(Duration.ofMinutes(2)); diff --git a/cassandra/src/main/java/brave/cassandra/Tracing.java b/cassandra/src/main/java/brave/cassandra/Tracing.java index ec910bd..7fefc30 100644 --- a/cassandra/src/main/java/brave/cassandra/Tracing.java +++ b/cassandra/src/main/java/brave/cassandra/Tracing.java @@ -22,11 +22,12 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.Map; -import java.util.UUID; -import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.Message; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.TraceStateImpl; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.TimeUUID; import zipkin2.reporter.Call; import zipkin2.reporter.CheckResult; import zipkin2.reporter.brave.AsyncZipkinSpanHandler; @@ -49,7 +50,7 @@ *

Alternatively, you can subclass this and fix configuration to your favorite mechanism. */ public class Tracing extends org.apache.cassandra.tracing.Tracing { - final InetAddress coordinator = FBUtilities.getLocalAddress(); + final InetAddressAndPort coordinator = FBUtilities.getLocalAddressAndPort(); final TracingComponent component; public Tracing(brave.Tracing tracing) { // subclassable to pin configuration @@ -77,6 +78,8 @@ public Tracing() { brave.Tracing tracing = brave.Tracing.newBuilder() .localServiceName(System.getProperty("zipkin.service_name", "cassandra")) + .localIp(coordinator.getAddress().getHostAddress()) + .localPort(coordinator.getPort()) .addSpanHandler(zipkinSpanHandler) .build(); component = new TracingComponent.Explicit(tracing); @@ -99,8 +102,11 @@ static void maybeFailFast(Throwable error) { * payload. If that's possible, it re-uses the trace identifiers and starts a server span. * Otherwise, a new trace is created. */ - @Override protected UUID newSession(UUID sessionId, TraceType traceType, - Map customPayload) { + @Override protected TimeUUID newSession( + TimeUUID sessionId, + TraceType traceType, + Map customPayload + ) { // override instead of call from super as otherwise we cannot store a reference to the span assert get() == null; @@ -119,9 +125,10 @@ static void maybeFailFast(Throwable error) { } @Override - protected TraceState newTraceState(InetAddress inetAddress, UUID timeUUID, TraceType traceType) { + protected TraceState newTraceState(InetAddressAndPort coordinator, TimeUUID sessionId, + TraceType traceType) { assert false : "we don't expect this to be ever reached as we override newSession"; - return new TraceStateImpl(coordinator, timeUUID, traceType); + return new TraceStateImpl(coordinator, sessionId, traceType); } /** This extracts the RPC span encoded in the custom payload, or starts a new trace */ @@ -181,7 +188,7 @@ protected void parseRequest( customizer.tag(CassandraTraceKeys.CASSANDRA_SESSION_ID, state.sessionId.toString()); } - @Override public TraceState initializeFromMessage(MessageIn message) { + @Override public TraceState initializeFromMessage(Message.Header header) { // not current tracing inter-node messages return null; } @@ -193,8 +200,8 @@ protected void parseRequest( static final class ZipkinTraceState extends TraceState { final Span incoming; - ZipkinTraceState(InetAddress coordinator, UUID sessionId, - org.apache.cassandra.tracing.Tracing.TraceType traceType, Span incoming) { + ZipkinTraceState(InetAddressAndPort coordinator, TimeUUID sessionId, + TraceType traceType, Span incoming) { super(coordinator, sessionId, traceType); this.incoming = incoming; } diff --git a/pom.xml b/pom.xml index 27c4f3a..9f9d3f7 100644 --- a/pom.xml +++ b/pom.xml @@ -141,8 +141,8 @@ org.apache.cassandra cassandra-all - - 3.11.9 + + 4.1.3 org.slf4j