diff --git a/README.md b/README.md
index 046114f8a8..d7aacbf1db 100644
--- a/README.md
+++ b/README.md
@@ -106,18 +106,50 @@ use this BigQuery Storage Client Library.
Samples are in the [`samples/`](https://github.com/googleapis/java-bigquerystorage/tree/main/samples) directory.
-| Sample | Source Code | Try it |
-| --------------------------- | --------------------------------- | ------ |
-| Json Writer Stream Cdc | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java) |
+| Sample | Source Code | Try it |
+|---------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Json Writer Stream Cdc | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java) |
| Parallel Write Committed Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java) |
-| Storage Arrow Sample | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/StorageArrowSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/StorageArrowSample.java) |
-| Storage Sample | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/StorageSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/StorageSample.java) |
-| Write Buffered Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteBufferedStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteBufferedStream.java) |
-| Write Committed Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java) |
-| Write Pending Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java) |
-| Write To Default Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java) |
+| Storage Arrow Sample | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/StorageArrowSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/StorageArrowSample.java) |
+| Storage Sample | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/StorageSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/StorageSample.java) |
+| Write Buffered Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteBufferedStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteBufferedStream.java) |
+| Write Committed Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java) |
+| Write Pending Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java) |
+| Write To Default Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java) |
+| Export OpenTelemetry Metrics | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java) |
+## OpenTelemetry Support
+The client supports emitting metrics to OpenTelemetry. This is disabled by default. It can be enabled by calling
+
+```java
+JsonStreamWriter.Builder.setEnableOpenTelemetry(true)
+```
+The following attributes are supported.
+
+| Key | Value |
+|-----------------|-----------------------------------------------------------|
+| `table_id` | Holds fully qualified name of destination table |
+| `writer_id` | Specifies writer instance id associated with queue lengths. |
+| `trace_field_1` | If a colon-separated traceId is provided, this holds the first portion. Must be non-empty. Currently populated only for Dataflow. |
+| `trace_field_2` | If a colon-separated traceId is provided, this holds the second portion. Must be non-empty. Currently populated only for Dataflow. |
+| `trace_field_3` | If a colon-separated traceId is provided, this holds the third portion. Must be non-empty. Currently populated only for Dataflow. |
+| `error_code` | Specifies error code in the event an append request fails, or a connection ends. |
+| `is_retry`      | Indicates this was a retry operation. This can be set for either acked requests or connection retry attempts. |
+
+
+The following metrics are supported.
+
+| Name | Kind | Description |
+|------------------------------|---------------------|------------------------------------------------------------------------------------------------------------------|
+| `append_requests_acked` | Synchronous counter | Counts number of requests acked by the server |
+| `append_request_bytes_acked` | Synchronous counter | Counts byte size of requests acked by the server |
+| `append_rows_acked` | Synchronous counter | Counts number of rows in requests acked by the server |
+| `active_connection_count` | Asynchronous gauge | Reports number of active connections |
+| `inflight_queue_length` | Asynchronous gauge | Reports length of inflight queue. This queue contains sent append requests waiting for response from the server. |
+| `network_response_latency` | Histogram | Reports time taken in milliseconds for a response to arrive once a message has been sent over the network. |
+| `connection_start_count` | Synchronous counter | Counts number of connection attempts made, regardless of whether these are initial or retry. |
+| `connection_end_count` | Synchronous counter | Counts number of connection end events. This is decorated with the error code. |
## Troubleshooting
diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml
index bf0c8689a6..0008193b82 100644
--- a/samples/install-without-bom/pom.xml
+++ b/samples/install-without-bom/pom.xml
@@ -54,6 +54,16 @@
       <artifactId>arrow-memory-netty</artifactId>
       <version>${arrow.version}</version>
     </dependency>
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-exporter-logging</artifactId>
+      <version>1.41.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.cloud.opentelemetry</groupId>
+      <artifactId>exporter-metrics</artifactId>
+      <version>0.31.0</version>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index ddf0cf1a1e..9f37ed4c68 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -55,6 +55,17 @@
       <version>${arrow.version}</version>
     </dependency>
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-exporter-logging</artifactId>
+      <version>1.41.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.cloud.opentelemetry</groupId>
+      <artifactId>exporter-metrics</artifactId>
+      <version>0.31.0</version>
+    </dependency>
+
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index f12e76702e..f203d33a4a 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -35,6 +35,13 @@
         <type>pom</type>
         <scope>import</scope>
       </dependency>
+      <dependency>
+        <groupId>io.opentelemetry</groupId>
+        <artifactId>opentelemetry-bom</artifactId>
+        <version>1.41.0</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
@@ -65,6 +72,16 @@
       <artifactId>arrow-memory-netty</artifactId>
       <version>${arrow.version}</version>
     </dependency>
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-exporter-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.cloud.opentelemetry</groupId>
+      <artifactId>exporter-metrics</artifactId>
+      <version>0.31.0</version>
+    </dependency>
+
diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java
new file mode 100644
index 0000000000..08604d4d9d
--- /dev/null
+++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java
@@ -0,0 +1,335 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.bigquerystorage;
+
+// [START bigquerystorage_jsonstreamwriter_export]
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.core.FixedExecutorProvider;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.TableResult;
+import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
+import com.google.cloud.bigquery.storage.v1.Exceptions;
+import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
+import com.google.cloud.bigquery.storage.v1.Exceptions.MaximumRequestCallbackWaitTimeExceededException;
+import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
+import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;
+import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
+import com.google.cloud.bigquery.storage.v1.TableName;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.concurrent.GuardedBy;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.threeten.bp.Duration;
+
+public class ExportOpenTelemetry {
+
+ public static void runExportToOpenTelemetry()
+ throws DescriptorValidationException, InterruptedException, IOException {
+ // TODO(developer): Replace these variables before running the sample.
+ String projectId = "MY_PROJECT_ID";
+ String datasetName = "MY_DATASET_NAME";
+ String tableName = "MY_TABLE_NAME";
+ exportToOpenTelemetry(projectId, datasetName, tableName);
+ }
+
+ private static ByteString buildByteString() {
+ byte[] bytes = new byte[] {1, 2, 3, 4, 5};
+ return ByteString.copyFrom(bytes);
+ }
+
+ // Create a JSON object that is compatible with the table schema.
+ private static JSONObject buildRecord(int i, int j) {
+ JSONObject record = new JSONObject();
+ StringBuilder sbSuffix = new StringBuilder();
+ for (int k = 0; k < j; k++) {
+ sbSuffix.append(k);
+ }
+ record.put("test_string", String.format("record %03d-%03d %s", i, j, sbSuffix.toString()));
+ ByteString byteString = buildByteString();
+ record.put("test_bytes", byteString);
+ record.put(
+ "test_geo",
+ "POLYGON((-124.49 47.35,-124.49 40.73,-116.49 40.73,-116.49 47.35,-124.49 47.35))");
+ return record;
+ }
+
+ public static void exportToOpenTelemetry(String projectId, String datasetName, String tableName)
+ throws DescriptorValidationException, InterruptedException, IOException {
+ TableName parentTable = TableName.of(projectId, datasetName, tableName);
+
+ DataWriter writer = new DataWriter();
+ // One time initialization for the worker.
+ writer.initialize(parentTable);
+
+ // Write two batches of fake data to the stream, each with 10 JSON records. Data may be
+ // batched up to the maximum request size:
+ // https://cloud.google.com/bigquery/quotas#write-api-limits
+ for (int i = 0; i < 2; i++) {
+ JSONArray jsonArr = new JSONArray();
+ for (int j = 0; j < 10; j++) {
+ JSONObject record = buildRecord(i, j);
+ jsonArr.put(record);
+ }
+
+ writer.append(new AppendContext(jsonArr));
+ }
+
+ // Final cleanup for the stream during worker teardown.
+ writer.cleanup();
+ verifyExpectedRowCount(parentTable, 12);
+ System.out.println("Appended records successfully.");
+ }
+
+ private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount)
+ throws InterruptedException {
+ String queryRowCount =
+ "SELECT COUNT(*) FROM `"
+ + parentTable.getProject()
+ + "."
+ + parentTable.getDataset()
+ + "."
+ + parentTable.getTable()
+ + "`";
+ QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build();
+ BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
+ TableResult results = bigquery.query(queryConfig);
+ int countRowsActual =
+ Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue());
+ if (countRowsActual != expectedRowCount) {
+ throw new RuntimeException(
+ "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual);
+ }
+ }
+
+ private static class AppendContext {
+
+ JSONArray data;
+
+ AppendContext(JSONArray data) {
+ this.data = data;
+ }
+ }
+
+ private static class DataWriter {
+
+ private static final int MAX_RECREATE_COUNT = 3;
+
+ private BigQueryWriteClient client;
+
+ // Track the number of in-flight requests to wait for all responses before shutting down.
+ private final Phaser inflightRequestCount = new Phaser(1);
+ private final Object lock = new Object();
+ private JsonStreamWriter streamWriter;
+
+ @GuardedBy("lock")
+ private RuntimeException error = null;
+
+ private AtomicInteger recreateCount = new AtomicInteger(0);
+
+ private JsonStreamWriter createStreamWriter(String tableName)
+ throws DescriptorValidationException, IOException, InterruptedException {
+ // Configure in-stream automatic retry settings.
+ // Error codes that are immediately retried:
+ // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
+ // Error codes that are retried with exponential backoff:
+ // * RESOURCE_EXHAUSTED
+ RetrySettings retrySettings =
+ RetrySettings.newBuilder()
+ .setInitialRetryDelay(Duration.ofMillis(500))
+ .setRetryDelayMultiplier(1.1)
+ .setMaxAttempts(5)
+ .setMaxRetryDelay(Duration.ofMinutes(1))
+ .build();
+
+ // Use the JSON stream writer to send records in JSON format. Specify the table name to write
+ // to the default stream.
+ // For more information about JsonStreamWriter, see:
+ // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
+ return JsonStreamWriter.newBuilder(tableName, client)
+ .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
+ .setChannelProvider(
+ BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
+ .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
+ .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1))
+ .setKeepAliveWithoutCalls(true)
+ .setChannelsPerCpu(2)
+ .build())
+ .setEnableConnectionPool(true)
+ .setEnableOpenTelemetry(true)
+ // If value is missing in json and there is a default value configured on bigquery
+ // column, apply the default value to the missing value field.
+ .setDefaultMissingValueInterpretation(
+ AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE)
+ .setRetrySettings(retrySettings)
+ .build();
+ }
+
+ public void initialize(TableName parentTable)
+ throws DescriptorValidationException, IOException, InterruptedException {
+ // Initialize client without settings, internally within stream writer a new client will be
+ // created with full settings.
+ client = BigQueryWriteClient.create();
+
+ streamWriter = createStreamWriter(parentTable.toString());
+ }
+
+ public void append(AppendContext appendContext)
+ throws DescriptorValidationException, IOException, InterruptedException {
+ synchronized (this.lock) {
+ if (!streamWriter.isUserClosed()
+ && streamWriter.isClosed()
+ && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
+ streamWriter = createStreamWriter(streamWriter.getStreamName());
+ this.error = null;
+ }
+ // If earlier appends have failed, we need to reset before continuing.
+ if (this.error != null) {
+ throw this.error;
+ }
+ }
+ // Append asynchronously for increased throughput.
+ ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
+ ApiFutures.addCallback(
+ future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor());
+
+ // Increase the count of in-flight requests.
+ inflightRequestCount.register();
+ }
+
+ public void cleanup() {
+ // Wait for all in-flight requests to complete.
+ inflightRequestCount.arriveAndAwaitAdvance();
+
+ client.close();
+ // Close the connection to the server.
+ streamWriter.close();
+
+ // Verify that no error occurred in the stream.
+ synchronized (this.lock) {
+ if (this.error != null) {
+ throw this.error;
+ }
+ }
+ }
+
+ static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
+
+ private final DataWriter parent;
+ private final AppendContext appendContext;
+
+ public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
+ this.parent = parent;
+ this.appendContext = appendContext;
+ }
+
+ public void onSuccess(AppendRowsResponse response) {
+ System.out.format("Append success\n");
+ this.parent.recreateCount.set(0);
+ done();
+ }
+
+ public void onFailure(Throwable throwable) {
+ if (throwable instanceof AppendSerializationError) {
+ AppendSerializationError ase = (AppendSerializationError) throwable;
+ Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
+ if (rowIndexToErrorMessage.size() > 0) {
+ // Omit the faulty rows
+ JSONArray dataNew = new JSONArray();
+ for (int i = 0; i < appendContext.data.length(); i++) {
+ if (!rowIndexToErrorMessage.containsKey(i)) {
+ dataNew.put(appendContext.data.get(i));
+ } else {
+ // process faulty rows by placing them on a dead-letter-queue, for instance
+ }
+ }
+
+ // Retry the remaining valid rows. NOTE(review): this re-appends synchronously
+ // from within the callback thread and may block; consider a separate executor.
+ if (dataNew.length() > 0) {
+ try {
+ this.parent.append(new AppendContext(dataNew));
+ } catch (DescriptorValidationException e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ // Mark the existing attempt as done since we got a response for it
+ done();
+ return;
+ }
+ }
+
+ boolean resendRequest = false;
+ if (throwable instanceof MaximumRequestCallbackWaitTimeExceededException) {
+ resendRequest = true;
+ } else if (throwable instanceof StreamWriterClosedException) {
+ if (!parent.streamWriter.isUserClosed()) {
+ resendRequest = true;
+ }
+ }
+ if (resendRequest) {
+ // Retry this request.
+ try {
+ this.parent.append(new AppendContext(appendContext.data));
+ } catch (DescriptorValidationException e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ // Mark the existing attempt as done since we got a response for it
+ done();
+ return;
+ }
+
+ synchronized (this.parent.lock) {
+ if (this.parent.error == null) {
+ StorageException storageException = Exceptions.toStorageException(throwable);
+ this.parent.error =
+ (storageException != null) ? storageException : new RuntimeException(throwable);
+ }
+ }
+ done();
+ }
+
+ private void done() {
+ // Reduce the count of in-flight requests.
+ this.parent.inflightRequestCount.arriveAndDeregister();
+ }
+ }
+ }
+}
+// [END bigquerystorage_jsonstreamwriter_export]
diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/ExportOpenTelemetryIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/ExportOpenTelemetryIT.java
new file mode 100644
index 0000000000..341bdabe37
--- /dev/null
+++ b/samples/snippets/src/test/java/com/example/bigquerystorage/ExportOpenTelemetryIT.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.bigquerystorage;
+
+import static com.google.common.truth.Truth.assertThat;
+import static junit.framework.TestCase.assertNotNull;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.DatasetId;
+import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.opentelemetry.metric.GoogleCloudMetricExporter;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.UUID;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ExportOpenTelemetryIT {
+
+ private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");
+ private static SdkMeterProvider METER_PROVIDER = null;
+
+ private ByteArrayOutputStream bout;
+ private PrintStream out;
+ private BigQuery bigquery;
+ private String datasetName;
+ private String tableName;
+
+ private static void requireEnvVar(String varName) {
+ assertNotNull(
+ "Environment variable " + varName + " is required to perform these tests.",
+ System.getenv(varName));
+ }
+
+ @BeforeClass
+ public static void checkRequirements() {
+ requireEnvVar("GOOGLE_CLOUD_PROJECT");
+ }
+
+ private void setupGoogleCloudMonitoringOT() {
+ MetricExporter metricExporter = GoogleCloudMetricExporter.createWithDefaultConfiguration();
+ METER_PROVIDER =
+ SdkMeterProvider.builder()
+ .registerMetricReader(
+ PeriodicMetricReader.builder(metricExporter)
+ .setInterval(java.time.Duration.ofMillis(30000))
+ .build())
+ .build();
+ OpenTelemetrySdk.builder().setMeterProvider(METER_PROVIDER).buildAndRegisterGlobal();
+ }
+
+ @Before
+ public void setUp() {
+ bout = new ByteArrayOutputStream();
+ out = new PrintStream(bout);
+ System.setOut(out);
+
+ setupGoogleCloudMonitoringOT();
+ bigquery = BigQueryOptions.getDefaultInstance().getService();
+
+ // Create a new dataset and table for each test.
+ datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
+ tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
+ Schema schema =
+ Schema.of(
+ com.google.cloud.bigquery.Field.newBuilder("test_string", StandardSQLTypeName.STRING)
+ .setMaxLength(20L)
+ .build(),
+ com.google.cloud.bigquery.Field.newBuilder("test_bytes", StandardSQLTypeName.BYTES)
+ .build(),
+ com.google.cloud.bigquery.Field.newBuilder("test_geo", StandardSQLTypeName.GEOGRAPHY)
+ .build());
+ bigquery.create(DatasetInfo.newBuilder(datasetName).build());
+ TableInfo tableInfo =
+ TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema))
+ .build();
+ bigquery.create(tableInfo);
+ }
+
+ @After
+ public void tearDown() {
+ bigquery.delete(
+ DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents());
+ System.setOut(null);
+ if (METER_PROVIDER != null) {
+ METER_PROVIDER.shutdown();
+ }
+ }
+
+ @Test
+ public void testExportOpenTelemetry() throws Exception {
+ ExportOpenTelemetry.exportToOpenTelemetry(GOOGLE_CLOUD_PROJECT, datasetName, tableName);
+ assertThat(bout.toString()).contains("Appended records successfully.");
+ }
+}