Skip to content

Commit

Permalink
Bug fix (#12)
Browse files Browse the repository at this point in the history
* fix: bytes to base64 in old api

* fix: only set 4xx for the malformed rows
  • Loading branch information
lavkesh authored May 10, 2023
1 parent 62c05ec commit 2ca9499
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 13 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ plugins {
}

group 'com.gotocompany'
version '0.4.4'
version '0.4.5'

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ public void init() {
} catch (IOException e) {
throw new IllegalArgumentException("Exception occurred while creating sink", e);
}


}

public Sink create() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,14 @@ public void setSinkResponseForException(
SinkResponse sinkResponse) {
io.grpc.Status status = io.grpc.Status.fromThrowable(cause);
instrumentation.logError("Error from exception: {} ", status);
if (BigQueryStorageResponseParser.shouldRetry(status)) {
if (cause instanceof Exceptions.AppendSerializationError) {
// first set all messages to retryable
IntStream.range(0, payload.getPayloadIndexes().size())
.forEach(index -> {
sinkResponse.addErrors(payload.getInputIndex(index), new ErrorInfo(new Exception(cause), ErrorType.SINK_5XX_ERROR));
instrumentErrors(status.getCode());
});
} else {
IntStream.range(0, payload.getPayloadIndexes().size())
.forEach(index -> {
sinkResponse.addErrors(payload.getInputIndex(index), new ErrorInfo(new Exception(cause), ErrorType.SINK_4XX_ERROR));
instrumentErrors(status.getCode());
});
}
if (cause instanceof Exceptions.AppendSerializationError) {
// then set non retryable messages
Exceptions.AppendSerializationError ase = (Exceptions.AppendSerializationError) cause;
Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
rowIndexToErrorMessage.forEach((index, err) -> {
Expand All @@ -176,6 +170,20 @@ public void setSinkResponseForException(
sinkResponse.addErrors(inputIndex, errorInfo);
instrumentErrors(BigQueryMetrics.BigQueryStorageAPIError.ROW_APPEND_ERROR);
});
} else {
if (BigQueryStorageResponseParser.shouldRetry(status)) {
IntStream.range(0, payload.getPayloadIndexes().size())
.forEach(index -> {
sinkResponse.addErrors(payload.getInputIndex(index), new ErrorInfo(new Exception(cause), ErrorType.SINK_5XX_ERROR));
instrumentErrors(status.getCode());
});
} else {
IntStream.range(0, payload.getPayloadIndexes().size())
.forEach(index -> {
sinkResponse.addErrors(payload.getInputIndex(index), new ErrorInfo(new Exception(cause), ErrorType.SINK_4XX_ERROR));
instrumentErrors(status.getCode());
});
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public void shouldSetSinkResponseForExceptionWithAppendError() {
responseParser.setSinkResponseForException(cause, payload, messages, response);
Assert.assertEquals(3, response.getErrors().size());
Assert.assertEquals(ErrorType.SINK_4XX_ERROR, response.getErrors().get(0L).getErrorType());
Assert.assertEquals(ErrorType.SINK_4XX_ERROR, response.getErrors().get(3L).getErrorType());
Assert.assertEquals(ErrorType.SINK_5XX_ERROR, response.getErrors().get(3L).getErrorType());
Assert.assertEquals(ErrorType.SINK_4XX_ERROR, response.getErrors().get(4L).getErrorType());
Assert.assertEquals("message1", response.getErrors().get(0L).getException().getMessage());
Assert.assertEquals("com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: UNKNOWN: test error", response.getErrors().get(3L).getException().getMessage());
Expand Down

0 comments on commit 2ca9499

Please sign in to comment.