Skip to content

Commit

Permalink
Align GrpcSender contract with HttpSender (#6658)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg authored Sep 5, 2024
1 parent 00b0e9f commit 1f6de35
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import static io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil.GRPC_STATUS_UNAVAILABLE;
import static io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil.GRPC_STATUS_UNIMPLEMENTED;
import static io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil.GRPC_STATUS_UNKNOWN;

import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterMetrics;
Expand Down Expand Up @@ -62,41 +61,27 @@ public CompletableResultCode export(T exportRequest, int numItems) {

grpcSender.send(
exportRequest,
() -> {
exporterMetrics.addSuccess(numItems);
result.succeed();
},
(response, throwable) -> {
exporterMetrics.addFailed(numItems);

logFailureMessage(response, throwable);

switch (response.grpcStatusValue()) {
case GRPC_STATUS_UNKNOWN:
result.failExceptionally(FailedExportException.grpcFailedExceptionally(throwable));
break;
default:
result.failExceptionally(FailedExportException.grpcFailedWithResponse(response));
break;
}
});
grpcResponse -> onResponse(result, numItems, grpcResponse),
throwable -> onError(result, numItems, throwable));

return result;
}

public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
private void onResponse(CompletableResultCode result, int numItems, GrpcResponse grpcResponse) {
int statusCode = grpcResponse.grpcStatusValue();

if (statusCode == 0) {
exporterMetrics.addSuccess(numItems);
result.succeed();
return;
}
return grpcSender.shutdown();
}

private void logFailureMessage(GrpcResponse response, Throwable throwable) {
switch (response.grpcStatusValue()) {
exporterMetrics.addFailed(numItems);
switch (statusCode) {
case GRPC_STATUS_UNIMPLEMENTED:
if (loggedUnimplemented.compareAndSet(false, true)) {
GrpcExporterUtil.logUnimplemented(internalLogger, type, response.grpcStatusDescription());
GrpcExporterUtil.logUnimplemented(
internalLogger, type, grpcResponse.grpcStatusDescription());
}
break;
case GRPC_STATUS_UNAVAILABLE:
Expand All @@ -107,22 +92,42 @@ private void logFailureMessage(GrpcResponse response, Throwable throwable) {
+ "s. Server is UNAVAILABLE. "
+ "Make sure your collector is running and reachable from this network. "
+ "Full error message:"
+ response.grpcStatusDescription());
+ grpcResponse.grpcStatusDescription());
break;
default:
logger.log(
Level.WARNING,
"Failed to export "
+ type
+ "s. Server responded with gRPC status code "
+ response.grpcStatusValue()
+ statusCode
+ ". Error message: "
+ response.grpcStatusDescription());
+ grpcResponse.grpcStatusDescription());
break;
}
result.failExceptionally(FailedExportException.grpcFailedWithResponse(grpcResponse));
}

private void onError(CompletableResultCode result, int numItems, Throwable e) {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The request could not be executed. Error message: "
+ e.getMessage(),
e);
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "Failed to export " + type + "s. Details follow: " + throwable);
logger.log(Level.FINEST, "Failed to export " + type + "s. Details follow: " + e);
}
result.failExceptionally(FailedExportException.grpcFailedExceptionally(e));
}

public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
return grpcSender.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ public static GrpcResponse create(int grpcStatusValue, @Nullable String grpcStat

@Nullable
public abstract String grpcStatusDescription();

// TODO(jack-berg): add byte[] responseBody() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* An exporter of a messages encoded by {@link Marshaler} using the gRPC wire format.
Expand All @@ -17,7 +17,7 @@
*/
public interface GrpcSender<T extends Marshaler> {

void send(T request, Runnable onSuccess, BiConsumer<GrpcResponse, Throwable> onError);
void send(T request, Consumer<GrpcResponse> onResponse, Consumer<Throwable> onError);

/** Shutdown the sender. */
CompletableResultCode shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,52 +63,57 @@ public CompletableResultCode export(T exportRequest, int numItems) {
httpSender.send(
exportRequest,
exportRequest.getBinarySerializedSize(),
httpResponse -> {
int statusCode = httpResponse.statusCode();

if (statusCode >= 200 && statusCode < 300) {
exporterMetrics.addSuccess(numItems);
result.succeed();
return;
}

exporterMetrics.addFailed(numItems);

byte[] body = null;
try {
body = httpResponse.responseBody();
} catch (IOException ex) {
logger.log(Level.FINE, "Unable to obtain response body", ex);
}

String status = extractErrorStatus(httpResponse.statusMessage(), body);

logger.log(
Level.WARNING,
"Failed to export "
+ type
+ "s. Server responded with HTTP status code "
+ statusCode
+ ". Error message: "
+ status);

result.failExceptionally(FailedExportException.httpFailedWithResponse(httpResponse));
},
e -> {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The request could not be executed. Full error message: "
+ e.getMessage(),
e);
result.failExceptionally(FailedExportException.httpFailedExceptionally(e));
});
httpResponse -> onResponse(result, numItems, httpResponse),
throwable -> onError(result, numItems, throwable));

return result;
}

private void onResponse(
CompletableResultCode result, int numItems, HttpSender.Response httpResponse) {
int statusCode = httpResponse.statusCode();

if (statusCode >= 200 && statusCode < 300) {
exporterMetrics.addSuccess(numItems);
result.succeed();
return;
}

exporterMetrics.addFailed(numItems);

byte[] body = null;
try {
body = httpResponse.responseBody();
} catch (IOException ex) {
logger.log(Level.FINE, "Unable to obtain response body", ex);
}

String status = extractErrorStatus(httpResponse.statusMessage(), body);

logger.log(
Level.WARNING,
"Failed to export "
+ type
+ "s. Server responded with HTTP status code "
+ statusCode
+ ". Error message: "
+ status);

result.failExceptionally(FailedExportException.httpFailedWithResponse(httpResponse));
}

private void onError(CompletableResultCode result, int numItems, Throwable e) {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The request could not be executed. Full error message: "
+ e.getMessage(),
e);
result.failExceptionally(FailedExportException.httpFailedExceptionally(e));
}

public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,19 @@ void connectTimeout() {
long startTimeMillis = System.currentTimeMillis();
CompletableResultCode result =
exporter.export(Collections.singletonList(generateFakeTelemetry()));

assertThat(result.join(10, TimeUnit.SECONDS).isSuccess()).isFalse();

assertThat(result.getFailureThrowable())
.asInstanceOf(
InstanceOfAssertFactories.throwable(FailedExportException.GrpcExportException.class))
.returns(false, Assertions.from(FailedExportException::failedWithResponse))
.satisfies(
ex -> {
assertThat(ex.getResponse()).isNull();
assertThat(ex.getCause()).isNotNull();
});

// Assert that the export request fails well before the default connect timeout of 10s
assertThat(System.currentTimeMillis() - startTimeMillis)
.isLessThan(TimeUnit.SECONDS.toMillis(1));
Expand Down Expand Up @@ -597,12 +608,12 @@ void errorWithUnknownError() {
.getFailureThrowable())
.asInstanceOf(
InstanceOfAssertFactories.throwable(FailedExportException.GrpcExportException.class))
.returns(false, Assertions.from(FailedExportException::failedWithResponse))
.returns(true, Assertions.from(FailedExportException::failedWithResponse))
.satisfies(
ex -> {
assertThat(ex.getResponse()).isNull();
assertThat(ex.getResponse()).isNotNull();

assertThat(ex.getCause()).isNotNull();
assertThat(ex.getCause()).isNull();
});
} finally {
exporter.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.exporter.internal.grpc.GrpcResponse;
import io.opentelemetry.exporter.internal.grpc.GrpcSender;
Expand All @@ -20,9 +22,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.checkerframework.checker.nullness.qual.Nullable;
import javax.annotation.Nullable;

/**
* A {@link GrpcSender} which uses the upstream grpc-java library.
Expand Down Expand Up @@ -50,7 +52,7 @@ public UpstreamGrpcSender(
}

@Override
public void send(T request, Runnable onSuccess, BiConsumer<GrpcResponse, Throwable> onError) {
public void send(T request, Consumer<GrpcResponse> onResponse, Consumer<Throwable> onError) {
MarshalerServiceStub<T, ?, ?> stub = this.stub;
if (timeoutNanos > 0) {
stub = stub.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS);
Expand All @@ -71,19 +73,41 @@ public void send(T request, Runnable onSuccess, BiConsumer<GrpcResponse, Throwab
new FutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object unused) {
onSuccess.run();
onResponse.accept(
GrpcResponse.create(Status.OK.getCode().value(), Status.OK.getDescription()));
}

@Override
public void onFailure(Throwable t) {
Status status = Status.fromThrowable(t);
onError.accept(
GrpcResponse.create(status.getCode().value(), status.getDescription()), t);
Status status = fromThrowable(t);
if (status == null) {
onError.accept(t);
} else {
onResponse.accept(
GrpcResponse.create(status.getCode().value(), status.getDescription()));
}
}
},
MoreExecutors.directExecutor());
}

/**
* Copy of {@link Status#fromThrowable(Throwable)} which returns null instead of {@link
* Status#UNKNOWN} when no status can be found.
*/
@Nullable
private static Status fromThrowable(Throwable cause) {
while (cause != null) {
if (cause instanceof StatusException) {
return ((StatusException) cause).getStatus();
} else if (cause instanceof StatusRuntimeException) {
return ((StatusRuntimeException) cause).getStatus();
}
cause = cause.getCause();
}
return null;
}

@Override
public CompletableResultCode shutdown() {
if (shutdownChannel) {
Expand Down
Loading

0 comments on commit 1f6de35

Please sign in to comment.