From 5852ea1b52e395b0ea73c441f3fb33ca98224aef Mon Sep 17 00:00:00 2001 From: Siddharth Agrawal Date: Fri, 30 Aug 2024 08:04:11 -0700 Subject: [PATCH] chore: demonstrate exporting OpenTelemetry logs to Google Cloud (#2573) Monitoring --- README.md | 50 ++- samples/install-without-bom/pom.xml | 10 + samples/snapshot/pom.xml | 11 + samples/snippets/pom.xml | 17 + .../bigquerystorage/ExportOpenTelemetry.java | 335 ++++++++++++++++++ .../ExportOpenTelemetryIT.java | 125 +++++++ 6 files changed, 539 insertions(+), 9 deletions(-) create mode 100644 samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java create mode 100644 samples/snippets/src/test/java/com/example/bigquerystorage/ExportOpenTelemetryIT.java diff --git a/README.md b/README.md index 046114f8a8..d7aacbf1db 100644 --- a/README.md +++ b/README.md @@ -106,18 +106,50 @@ use this BigQuery Storage Client Library. Samples are in the [`samples/`](https://github.com/googleapis/java-bigquerystorage/tree/main/samples) directory. -| Sample | Source Code | Try it | -| --------------------------- | --------------------------------- | ------ | -| Json Writer Stream Cdc | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java) | +| Sample | Source Code | Try it | +|---------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Json Writer Stream Cdc | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java) | | Parallel Write Committed Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java) | -| Storage Arrow Sample | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/StorageArrowSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/StorageArrowSample.java) | -| Storage Sample | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/StorageSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/StorageSample.java) | -| Write Buffered Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteBufferedStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteBufferedStream.java) | -| Write Committed Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java) | -| Write Pending Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java) | -| Write To Default Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java) | +| Storage Arrow Sample | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/StorageArrowSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/StorageArrowSample.java) | +| Storage Sample | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/StorageSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/StorageSample.java) | +| Write Buffered Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteBufferedStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteBufferedStream.java) | +| Write Committed Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java) | +| Write Pending Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java) | +| Write To Default Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java) | +| Export OpenTelemetry Metrics | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java) | +## OpenTelemetry Support +The client supports emitting metrics to OpenTelemetry. This is disabled by default. It can be enabled by calling + +```aidl +JsonStreamWriter.Builder.setEnableOpenTelemetry(true) +``` +The following attributes are supported. + +| Key | Value | +|-----------------|-----------------------------------------------------------| +| `table_id` | Holds fully qualified name of destination table | +| `writer_id` | Specifies writer instance id associated with queue lengths. | +| `trace_field_1` | If a colon-separated traceId is provided, this holds the first portion. Must be non-empty. Currently populated only for Dataflow. | +| `trace_field_2` | If a colon-separated traceId is provided, this holds the second portion. Must be non-empty. Currently populated only for Dataflow. | +| `trace_field_3` | If a colon-separated traceId is provided, this holds the third portion. Must be non-empty. Currently populated only for Dataflow. | +| `error_code` | Specifies error code in the event an append request fails, or a connection ends. | +| `is_retry` | Indicates this was a retry operation. This can be set for either ack’ed requests or connection retry attempts. | + + +The following metrics are supported. + +| Name | Kind | Description | +|------------------------------|---------------------|------------------------------------------------------------------------------------------------------------------| +| `append_requests_acked` | Synchronous counter | Counts number of requests acked by the server | +| `append_request_bytes_acked` | Synchronous counter | Counts byte size of requests acked by the server | +| `append_rows_acked` | Synchronous counter | Counts number of rows in requests acked by the server | +| `active_connection_count` | Asynchronous gauge | Reports number of active connections | +| `inflight_queue_length` | Asynchronous gauge | Reports length of inflight queue. This queue contains sent append requests waiting for response from the server. | +| `network_response_latency` | Histogram | Reports time taken in milliseconds for a response to arrive once a message has been sent over the network. | +| `connection_start_count` | Synchronous counter | Counts number of connection attempts made, regardless of whether these are initial or retry. | +| `connection_end_count` | Synchronous counter | Counts number of connection end events. This is decorated with the error code. | ## Troubleshooting diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml index bf0c8689a6..0008193b82 100644 --- a/samples/install-without-bom/pom.xml +++ b/samples/install-without-bom/pom.xml @@ -54,6 +54,16 @@ arrow-memory-netty ${arrow.version} + + io.opentelemetry + opentelemetry-exporter-logging + 1.41.0 + + + com.google.cloud.opentelemetry + exporter-metrics + 0.31.0 + junit diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml index ddf0cf1a1e..9f37ed4c68 100644 --- a/samples/snapshot/pom.xml +++ b/samples/snapshot/pom.xml @@ -55,6 +55,17 @@ ${arrow.version} + + io.opentelemetry + opentelemetry-exporter-logging + 1.41.0 + + + com.google.cloud.opentelemetry + exporter-metrics + 0.31.0 + + junit junit diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index f12e76702e..f203d33a4a 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -35,6 +35,13 @@ pom import + + io.opentelemetry + opentelemetry-bom + 1.41.0 + pom + import + @@ -65,6 +72,16 @@ arrow-memory-netty ${arrow.version} + + io.opentelemetry + opentelemetry-exporter-logging + + + com.google.cloud.opentelemetry + exporter-metrics + 0.31.0 + + diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java new file mode 100644 index 0000000000..08604d4d9d --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java @@ -0,0 +1,335 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +// [START bigquerystorage_jsonstreamwriter_export] +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.TableResult; +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; +import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; +import com.google.cloud.bigquery.storage.v1.Exceptions.MaximumRequestCallbackWaitTimeExceededException; +import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException; +import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.concurrent.GuardedBy; +import org.json.JSONArray; +import org.json.JSONObject; +import org.threeten.bp.Duration; + +public class ExportOpenTelemetry { + + public static void runExportToOpenTelemetry() + throws DescriptorValidationException, InterruptedException, IOException { + // TODO(developer): Replace these variables before running the sample. + String projectId = "MY_PROJECT_ID"; + String datasetName = "MY_DATASET_NAME"; + String tableName = "MY_TABLE_NAME"; + exportToOpenTelemetry(projectId, datasetName, tableName); + } + + private static ByteString buildByteString() { + byte[] bytes = new byte[] {1, 2, 3, 4, 5}; + return ByteString.copyFrom(bytes); + } + + // Create a JSON object that is compatible with the table schema. + private static JSONObject buildRecord(int i, int j) { + JSONObject record = new JSONObject(); + StringBuilder sbSuffix = new StringBuilder(); + for (int k = 0; k < j; k++) { + sbSuffix.append(k); + } + record.put("test_string", String.format("record %03d-%03d %s", i, j, sbSuffix.toString())); + ByteString byteString = buildByteString(); + record.put("test_bytes", byteString); + record.put( + "test_geo", + "POLYGON((-124.49 47.35,-124.49 40.73,-116.49 40.73,-116.49 47.35,-124.49 47.35))"); + return record; + } + + public static void exportToOpenTelemetry(String projectId, String datasetName, String tableName) + throws DescriptorValidationException, InterruptedException, IOException { + TableName parentTable = TableName.of(projectId, datasetName, tableName); + + DataWriter writer = new DataWriter(); + // One time initialization for the worker. + writer.initialize(parentTable); + + // Write two batches of fake data to the stream, each with 10 JSON records. Data may be + // batched up to the maximum request size: + // https://cloud.google.com/bigquery/quotas#write-api-limits + for (int i = 0; i < 2; i++) { + JSONArray jsonArr = new JSONArray(); + for (int j = 0; j < 10; j++) { + JSONObject record = buildRecord(i, j); + jsonArr.put(record); + } + + writer.append(new AppendContext(jsonArr)); + } + + // Final cleanup for the stream during worker teardown. + writer.cleanup(); + verifyExpectedRowCount(parentTable, 12); + System.out.println("Appended records successfully."); + } + + private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount) + throws InterruptedException { + String queryRowCount = + "SELECT COUNT(*) FROM `" + + parentTable.getProject() + + "." + + parentTable.getDataset() + + "." + + parentTable.getTable() + + "`"; + QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build(); + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + TableResult results = bigquery.query(queryConfig); + int countRowsActual = + Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue()); + if (countRowsActual != expectedRowCount) { + throw new RuntimeException( + "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual); + } + } + + private static class AppendContext { + + JSONArray data; + + AppendContext(JSONArray data) { + this.data = data; + } + } + + private static class DataWriter { + + private static final int MAX_RECREATE_COUNT = 3; + + private BigQueryWriteClient client; + + // Track the number of in-flight requests to wait for all responses before shutting down. + private final Phaser inflightRequestCount = new Phaser(1); + private final Object lock = new Object(); + private JsonStreamWriter streamWriter; + + @GuardedBy("lock") + private RuntimeException error = null; + + private AtomicInteger recreateCount = new AtomicInteger(0); + + private JsonStreamWriter createStreamWriter(String tableName) + throws DescriptorValidationException, IOException, InterruptedException { + // Configure in-stream automatic retry settings. + // Error codes that are immediately retried: + // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED + // Error codes that are retried with exponential backoff: + // * RESOURCE_EXHAUSTED + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + + // Use the JSON stream writer to send records in JSON format. Specify the table name to write + // to the default stream. + // For more information about JsonStreamWriter, see: + // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html + return JsonStreamWriter.newBuilder(tableName, client) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))) + .setChannelProvider( + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() + .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveWithoutCalls(true) + .setChannelsPerCpu(2) + .build()) + .setEnableConnectionPool(true) + .setEnableOpenTelemetry(true) + // If value is missing in json and there is a default value configured on bigquery + // column, apply the default value to the missing value field. + .setDefaultMissingValueInterpretation( + AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) + .setRetrySettings(retrySettings) + .build(); + } + + public void initialize(TableName parentTable) + throws DescriptorValidationException, IOException, InterruptedException { + // Initialize client without settings, internally within stream writer a new client will be + // created with full settings. + client = BigQueryWriteClient.create(); + + streamWriter = createStreamWriter(parentTable.toString()); + } + + public void append(AppendContext appendContext) + throws DescriptorValidationException, IOException, InterruptedException { + synchronized (this.lock) { + if (!streamWriter.isUserClosed() + && streamWriter.isClosed() + && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) { + streamWriter = createStreamWriter(streamWriter.getStreamName()); + this.error = null; + } + // If earlier appends have failed, we need to reset before continuing. + if (this.error != null) { + throw this.error; + } + } + // Append asynchronously for increased throughput. + ApiFuture future = streamWriter.append(appendContext.data); + ApiFutures.addCallback( + future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor()); + + // Increase the count of in-flight requests. + inflightRequestCount.register(); + } + + public void cleanup() { + // Wait for all in-flight requests to complete. + inflightRequestCount.arriveAndAwaitAdvance(); + + client.close(); + // Close the connection to the server. + streamWriter.close(); + + // Verify that no error occurred in the stream. + synchronized (this.lock) { + if (this.error != null) { + throw this.error; + } + } + } + + static class AppendCompleteCallback implements ApiFutureCallback { + + private final DataWriter parent; + private final AppendContext appendContext; + + public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) { + this.parent = parent; + this.appendContext = appendContext; + } + + public void onSuccess(AppendRowsResponse response) { + System.out.format("Append success\n"); + this.parent.recreateCount.set(0); + done(); + } + + public void onFailure(Throwable throwable) { + if (throwable instanceof AppendSerializationError) { + AppendSerializationError ase = (AppendSerializationError) throwable; + Map rowIndexToErrorMessage = ase.getRowIndexToErrorMessage(); + if (rowIndexToErrorMessage.size() > 0) { + // Omit the faulty rows + JSONArray dataNew = new JSONArray(); + for (int i = 0; i < appendContext.data.length(); i++) { + if (!rowIndexToErrorMessage.containsKey(i)) { + dataNew.put(appendContext.data.get(i)); + } else { + // process faulty rows by placing them on a dead-letter-queue, for instance + } + } + + // Retry the remaining valid rows, but using a separate thread to + // avoid potentially blocking while we are in a callback. + if (dataNew.length() > 0) { + try { + this.parent.append(new AppendContext(dataNew)); + } catch (DescriptorValidationException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + // Mark the existing attempt as done since we got a response for it + done(); + return; + } + } + + boolean resendRequest = false; + if (throwable instanceof MaximumRequestCallbackWaitTimeExceededException) { + resendRequest = true; + } else if (throwable instanceof StreamWriterClosedException) { + if (!parent.streamWriter.isUserClosed()) { + resendRequest = true; + } + } + if (resendRequest) { + // Retry this request. + try { + this.parent.append(new AppendContext(appendContext.data)); + } catch (DescriptorValidationException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // Mark the existing attempt as done since we got a response for it + done(); + return; + } + + synchronized (this.parent.lock) { + if (this.parent.error == null) { + StorageException storageException = Exceptions.toStorageException(throwable); + this.parent.error = + (storageException != null) ? storageException : new RuntimeException(throwable); + } + } + done(); + } + + private void done() { + // Reduce the count of in-flight requests. + this.parent.inflightRequestCount.arriveAndDeregister(); + } + } + } +} +// [END bigquerystorage_jsonstreamwriter_export] diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/ExportOpenTelemetryIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/ExportOpenTelemetryIT.java new file mode 100644 index 0000000000..341bdabe37 --- /dev/null +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/ExportOpenTelemetryIT.java @@ -0,0 +1,125 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +import static com.google.common.truth.Truth.assertThat; +import static junit.framework.TestCase.assertNotNull; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.DatasetId; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.opentelemetry.metric.GoogleCloudMetricExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.UUID; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ExportOpenTelemetryIT { + + private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); + private static SdkMeterProvider METER_PROVIDER = null; + + private ByteArrayOutputStream bout; + private PrintStream out; + private BigQuery bigquery; + private String datasetName; + private String tableName; + + private static void requireEnvVar(String varName) { + assertNotNull( + "Environment variable " + varName + " is required to perform these tests.", + System.getenv(varName)); + } + + @BeforeClass + public static void checkRequirements() { + requireEnvVar("GOOGLE_CLOUD_PROJECT"); + } + + private void setupGoogleCloudMonitoringOT() { + MetricExporter metricExporter = GoogleCloudMetricExporter.createWithDefaultConfiguration(); + METER_PROVIDER = + SdkMeterProvider.builder() + .registerMetricReader( + PeriodicMetricReader.builder(metricExporter) + .setInterval(java.time.Duration.ofMillis(30000)) + .build()) + .build(); + OpenTelemetrySdk.builder().setMeterProvider(METER_PROVIDER).buildAndRegisterGlobal(); + } + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + out = new PrintStream(bout); + System.setOut(out); + + setupGoogleCloudMonitoringOT(); + bigquery = BigQueryOptions.getDefaultInstance().getService(); + + // Create a new dataset and table for each test. + datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + Schema schema = + Schema.of( + com.google.cloud.bigquery.Field.newBuilder("test_string", StandardSQLTypeName.STRING) + .setMaxLength(20L) + .build(), + com.google.cloud.bigquery.Field.newBuilder("test_bytes", StandardSQLTypeName.BYTES) + .build(), + com.google.cloud.bigquery.Field.newBuilder("test_geo", StandardSQLTypeName.GEOGRAPHY) + .build()); + bigquery.create(DatasetInfo.newBuilder(datasetName).build()); + TableInfo tableInfo = + TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema)) + .build(); + bigquery.create(tableInfo); + } + + @After + public void tearDown() { + bigquery.delete( + DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents()); + System.setOut(null); + if (METER_PROVIDER != null) { + METER_PROVIDER.shutdown(); + } + } + + @Test + public void testExportOpenTelemetry() throws Exception { + ExportOpenTelemetry.exportToOpenTelemetry(GOOGLE_CLOUD_PROJECT, datasetName, tableName); + assertThat(bout.toString()).contains("Appended records successfully."); + } +}