Skip to content

Commit

Permalink
kafka-tracing: injects init context on forward (#1409)
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Cole <[email protected]>
  • Loading branch information
codefromthecrypt authored Feb 27, 2024
1 parent 328107c commit 6224d3f
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public <K, V> Producer<K, V> producer(Producer<K, V> producer) {
* one couldn't be extracted.
*/
public Span nextSpan(ConsumerRecord<?, ?> record) {
// Eventhough the type is ConsumerRecord, this is not a (remote) consumer span. Only "poll"
// Even though the type is ConsumerRecord, this is not a (remote) consumer span. Only "poll"
// events create consumer spans. Since this is a processor span, we use the normal sampler.
TraceContextOrSamplingFlags extracted =
extractAndClearTraceIdHeaders(processorExtractor, record.headers(), record.headers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ final class KafkaContainer extends GenericContainer<KafkaContainer> {
static final int KAFKA_PORT = 19092;

KafkaContainer() {
super(parse("ghcr.io/openzipkin/zipkin-kafka:3.0.2"));
super(parse("ghcr.io/openzipkin/zipkin-kafka:3.1.0"));
waitStrategy = Wait.forHealthcheck();
// Kafka broker listener port (19092) needs to be exposed for test cases to access it.
addFixedExposedPort(KAFKA_PORT, KAFKA_PORT, InternetProtocol.TCP);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 The OpenZipkin Authors
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -16,12 +16,12 @@
import brave.propagation.Propagation.Getter;
import brave.propagation.Propagation.Setter;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessingContext;

final class KafkaStreamsPropagation {
/**
* Used by {@link KafkaStreamsTracing#nextSpan(ProcessorContext)} to extract a trace context from
* a prior stage.
* Used by {@link KafkaStreamsTracing#nextSpan(ProcessingContext, Headers)} to extract a trace
* context from a prior stage.
*/
static final Getter<Headers, String> GETTER = new Getter<Headers, String>() {
@Override public String get(Headers headers, String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
*/
package brave.kafka.streams;

import brave.propagation.CurrentTraceContext;
import brave.propagation.TraceContext;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

class TracingFixedKeyProcessor<KIn, VIn, VOut> extends
final class TracingFixedKeyProcessor<KIn, VIn, VOut> extends
BaseTracingProcessor<FixedKeyProcessorContext<KIn, VOut>, FixedKeyRecord<KIn, VIn>, FixedKeyProcessor<KIn, VIn, VOut>>
implements FixedKeyProcessor<KIn, VIn, VOut> {

Expand All @@ -31,13 +33,20 @@ class TracingFixedKeyProcessor<KIn, VIn, VOut> extends
return record.headers();
}

@Override
void process(FixedKeyProcessor<KIn, VIn, VOut> delegate, FixedKeyRecord<KIn, VIn> record) {
@Override void process(FixedKeyProcessor<KIn, VIn, VOut> delegate,
FixedKeyRecord<KIn, VIn> record) {
delegate.process(record);
}

@Override public void init(FixedKeyProcessorContext<KIn, VOut> context) {
this.context = context;
CurrentTraceContext current =
kafkaStreamsTracing.kafkaTracing.messagingTracing().tracing().currentTraceContext();
TraceContext traceContext = current.get();
if (traceContext != null) {
context =
new TracingFixedKeyProcessorContext<>(context, kafkaStreamsTracing.injector, traceContext);
}
delegate.init(context);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package brave.kafka.streams;

import brave.propagation.TraceContext;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

/** Injects the initialization tracing context to record headers on forward */
final class TracingFixedKeyProcessorContext<KForward, VForward>
extends TracingProcessingContext<FixedKeyProcessorContext<KForward, VForward>>
implements FixedKeyProcessorContext<KForward, VForward> {

TracingFixedKeyProcessorContext(FixedKeyProcessorContext<KForward, VForward> delegate,
TraceContext.Injector<Headers> injector, TraceContext context) {
super(delegate, injector, context);
}

@Override public <K extends KForward, V extends VForward> void forward(FixedKeyRecord<K, V> r) {
injector.inject(context, r.headers());
delegate.forward(r);
}

@Override
public <K extends KForward, V extends VForward> void forward(FixedKeyRecord<K, V> r, String s) {
injector.inject(context, r.headers());
delegate.forward(r, s);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package brave.kafka.streams;

import brave.propagation.TraceContext;
import brave.propagation.TraceContext.Injector;
import java.io.File;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.ProcessingContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;

abstract class TracingProcessingContext<C extends ProcessingContext> implements ProcessingContext {
final C delegate;
final Injector<Headers> injector;
final TraceContext context;

TracingProcessingContext(C delegate, Injector<Headers> injector,
TraceContext context) {
this.delegate = delegate;
this.injector = injector;
this.context = context;
}

@Override public String applicationId() {
return delegate.applicationId();
}

@Override public TaskId taskId() {
return delegate.taskId();
}

@Override public Optional<RecordMetadata> recordMetadata() {
return delegate.recordMetadata();
}

@Override public Serde<?> keySerde() {
return delegate.keySerde();
}

@Override public Serde<?> valueSerde() {
return delegate.valueSerde();
}

@Override public File stateDir() {
return delegate.stateDir();
}

@Override public StreamsMetrics metrics() {
return delegate.metrics();
}

@Override public <S extends StateStore> S getStateStore(String s) {
return delegate.getStateStore(s);
}

@Override public Cancellable schedule(Duration duration, PunctuationType punctuationType,
Punctuator punctuator) {
return delegate.schedule(duration, punctuationType, punctuator);
}

@Override public void commit() {
delegate.commit();
}

@Override public Map<String, Object> appConfigs() {
return delegate.appConfigs();
}

@Override public Map<String, Object> appConfigsWithPrefix(String s) {
return delegate.appConfigsWithPrefix(s);
}

@Override public long currentSystemTimeMs() {
return delegate.currentSystemTimeMs();
}

@Override public long currentStreamTimeMs() {
return delegate.currentStreamTimeMs();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package brave.kafka.streams;

import brave.propagation.CurrentTraceContext;
import brave.propagation.TraceContext;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
Expand All @@ -37,6 +39,12 @@ final class TracingProcessor<KIn, VIn, KOut, VOut> extends

@Override public void init(ProcessorContext<KOut, VOut> context) {
this.context = context;
CurrentTraceContext current =
kafkaStreamsTracing.kafkaTracing.messagingTracing().tracing().currentTraceContext();
TraceContext traceContext = current.get();
if (traceContext != null) {
context = new TracingProcessorContext<>(context, kafkaStreamsTracing.injector, traceContext);
}
delegate.init(context);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package brave.kafka.streams;

import brave.propagation.TraceContext;
import brave.propagation.TraceContext.Injector;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

/** Injects the initialization tracing context to record headers on forward */
final class TracingProcessorContext<KForward, VForward>
extends TracingProcessingContext<ProcessorContext<KForward, VForward>>
implements ProcessorContext<KForward, VForward> {

TracingProcessorContext(ProcessorContext<KForward, VForward> delegate,
Injector<Headers> injector, TraceContext context) {
super(delegate, injector, context);
}

@Override public <K extends KForward, V extends VForward> void forward(Record<K, V> r) {
injector.inject(context, r.headers());
delegate.forward(r);
}

@Override
public <K extends KForward, V extends VForward> void forward(Record<K, V> r, String s) {
injector.inject(context, r.headers());
delegate.forward(r, s);
}
}
Loading

0 comments on commit 6224d3f

Please sign in to comment.