Fix acceptance tests
jphalip committed Apr 25, 2024
1 parent 7971a8a commit d9cea3e
Showing 4 changed files with 35 additions and 75 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -859,13 +859,13 @@ modules, you can break it down into several steps, and only rerun the necessary
```diff
 # Install hive-bigquery-parent/pom.xml to Maven local repo
-mvn install:install-file -Dpackaging=pom -Dfile=hive-bigquery-parent/pom.xml -DpomFile=hive-bigquery-parent/pom.xml
+./mvnw install:install-file -Dpackaging=pom -Dfile=hive-bigquery-parent/pom.xml -DpomFile=hive-bigquery-parent/pom.xml
 # Build and install shaded-dependencies and shaded-test-dependencies jars to Maven local repo
-mvn clean install -pl shaded-dependencies,shaded-test-dependencies -Pdataproc21 -DskipTests
+./mvnw clean install -pl shaded-deps-dataproc21,shaded-acceptance-tests-dependencies -Pdataproc21 -DskipTests
-# Build and test connector
-mvn clean verify -pl connector -Pdataproc21,acceptance
+# Build and test the connector
+./mvnw clean verify -pl hive-bigquery-connector-common,hive-3-bigquery-connector -Pdataproc21,acceptance
```
##### Running the tests for different Hadoop versions
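As the changes below show, the acceptance tests now resolve the project ID through `TestUtils.getProject()` instead of a hard-coded `GOOGLE_CLOUD_PROJECT` precondition, and the GCS bucket falls back to `<project>-acceptance-tests` when `ACCEPTANCE_TEST_BUCKET` is unset. A minimal sketch of a test run under those assumptions (which variables `TestUtils.getProject()` actually reads is not confirmed by this diff):

```sh
# Hypothetical setup. GOOGLE_CLOUD_PROJECT is assumed to be what
# TestUtils.getProject() resolves; this diff does not confirm that.
export GOOGLE_CLOUD_PROJECT=my-gcp-project

# Optional override: when unset, getAcceptanceTestBucket() falls back
# to "<project>-acceptance-tests".
export ACCEPTANCE_TEST_BUCKET=my-test-bucket

# Build and test the connector, as in the README step above.
./mvnw clean verify -pl hive-bigquery-connector-common,hive-3-bigquery-connector -Pdataproc21,acceptance
```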
AcceptanceTestConstants.java
@@ -15,17 +15,12 @@
*/
package com.google.cloud.hive.bigquery.connector.acceptance;

-import com.google.common.base.Preconditions;
import org.apache.parquet.Strings;

public class AcceptanceTestConstants {

public static final String REGION = "us-west1";
public static final String DATAPROC_ENDPOINT = REGION + "-dataproc.googleapis.com:443";
-public static final String PROJECT_ID =
-Preconditions.checkNotNull(
-System.getenv("GOOGLE_CLOUD_PROJECT"),
-"Please set the 'GOOGLE_CLOUD_PROJECT' environment variable");

public static final boolean CLEAN_UP_CLUSTER =
Strings.isNullOrEmpty(System.getenv("CLEAN_UP_CLUSTER"))
@@ -41,7 +36,7 @@ public class AcceptanceTestConstants {
: Boolean.parseBoolean(System.getenv("CLEAN_UP_GCS"));

public static final String CONNECTOR_JAR_DIRECTORY = "target";
-public static final String CONNECTOR_JAR_PREFIX = "hive-bigquery-connector";
+public static final String CONNECTOR_JAR_PREFIX = "hive-3-bigquery-connector";
public static final String CONNECTOR_INIT_ACTION_PATH = "/acceptance/connectors.sh";

public static final String MIN_BIG_NUMERIC =
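The `CLEAN_UP_CLUSTER` and `CLEAN_UP_GCS` flags above are read from the environment and parsed with `Boolean.parseBoolean`, so test resources can be kept for debugging by exporting them before a run. A sketch, assuming false means "keep" (their defaults when unset are truncated out of this diff):

```sh
# Hypothetical: keep the Dataproc cluster and GCS artifacts after a run
# for debugging. Both flags are parsed with Boolean.parseBoolean when set.
export CLEAN_UP_CLUSTER=false
export CLEAN_UP_GCS=false
```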
AcceptanceTestUtils.java
@@ -22,6 +22,7 @@
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.hive.bigquery.connector.TestUtils;
import com.google.cloud.storage.*;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
@@ -39,9 +40,7 @@
import java.text.SimpleDateFormat;
import java.util.Comparator;
import java.util.Date;
-import java.util.List;
import java.util.function.Predicate;
-import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class AcceptanceTestUtils {
@@ -75,7 +74,7 @@ public String getMarker() {
}

// must be set in order to run the acceptance test
-static final String BUCKET = System.getenv("ACCEPTANCE_TEST_BUCKET");
+static final String ACCEPTANCE_TEST_BUCKET_VAR = "ACCEPTANCE_TEST_BUCKET";
private static final BigQuery bq = BigQueryOptions.getDefaultInstance().getService();

static Storage storage =
@@ -163,8 +162,13 @@ public static BlobId uploadToGcs(ByteBuffer content, String destinationUri, Stri
return blobId;
}

+public static String getAcceptanceTestBucket() {
+return System.getenv()
+.getOrDefault(ACCEPTANCE_TEST_BUCKET_VAR, TestUtils.getProject() + "-acceptance-tests");
+}
+
public static String createTestBaseGcsDir(String testId) {
-return String.format("gs://%s/hivebq-tests/%s", BUCKET, testId);
+return String.format("gs://%s/hivebq-tests/%s", getAcceptanceTestBucket(), testId);
}

public static Blob getBlob(String gcsDirUri, String fileSuffix) throws URISyntaxException {
@@ -242,23 +246,10 @@ static void uploadConnectorInitAction(String resourcePath, String gcsUri) throws
DataprocAcceptanceTestBase.class.getResourceAsStream(resourcePath), gcsUri, "text/x-bash");
}

-public static String generateTestId(
-String dataprocImageVersion, List<ClusterProperty> clusterProperties) {
-String clusterPropertiesMarkers =
-clusterProperties.isEmpty()
-? ""
-: clusterProperties.stream()
-.map(ClusterProperty::getMarker)
-.collect(Collectors.joining("-", "-", ""));
+public static String generateTestId(String dataprocImageVersion) {
String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
-String testId =
-String.format(
-"%s-%s%s%s",
-timestamp,
-dataprocImageVersion.charAt(0),
-dataprocImageVersion.charAt(2),
-clusterPropertiesMarkers);
-return testId;
+return String.format(
+"%s-%s%s", timestamp, dataprocImageVersion.charAt(0), dataprocImageVersion.charAt(2));
}

public static String generateClusterName(String testId) {
DataprocAcceptanceTestBase.java
@@ -23,7 +23,6 @@
import static com.google.cloud.hive.bigquery.connector.acceptance.AcceptanceTestConstants.CONNECTOR_JAR_DIRECTORY;
import static com.google.cloud.hive.bigquery.connector.acceptance.AcceptanceTestConstants.CONNECTOR_JAR_PREFIX;
import static com.google.cloud.hive.bigquery.connector.acceptance.AcceptanceTestConstants.DATAPROC_ENDPOINT;
-import static com.google.cloud.hive.bigquery.connector.acceptance.AcceptanceTestConstants.PROJECT_ID;
import static com.google.cloud.hive.bigquery.connector.acceptance.AcceptanceTestConstants.REGION;
import static com.google.cloud.hive.bigquery.connector.acceptance.AcceptanceTestUtils.createBqDataset;
import static com.google.cloud.hive.bigquery.connector.acceptance.AcceptanceTestUtils.deleteBqDatasetAndTables;
@@ -34,16 +33,11 @@
import static com.google.cloud.hive.bigquery.connector.acceptance.AcceptanceTestUtils.uploadConnectorJar;
import static com.google.common.truth.Truth.assertThat;

-import com.google.cloud.hive.bigquery.connector.acceptance.AcceptanceTestUtils.ClusterProperty;
-import com.google.common.collect.ImmutableList;
+import com.google.cloud.hive.bigquery.connector.TestUtils;
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.junit.Test;
import test.hivebqcon.com.google.cloud.dataproc.v1.Cluster;
import test.hivebqcon.com.google.cloud.dataproc.v1.ClusterConfig;
@@ -61,12 +55,7 @@
import test.hivebqcon.com.google.cloud.dataproc.v1.NodeInitializationAction;
import test.hivebqcon.com.google.cloud.dataproc.v1.SoftwareConfig;

-public class DataprocAcceptanceTestBase {
-
-protected static final ClusterProperty DISABLE_CONSCRYPT =
-ClusterProperty.of("dataproc:dataproc.conscrypt.provider.enable", "false", "nc");
-protected static final ImmutableList<ClusterProperty> DISABLE_CONSCRYPT_LIST =
-ImmutableList.<ClusterProperty>builder().add(DISABLE_CONSCRYPT).build();
+public abstract class DataprocAcceptanceTestBase {

private AcceptanceTestContext context;

@@ -75,20 +64,12 @@ protected DataprocAcceptanceTestBase(AcceptanceTestContext context) {
}

protected static AcceptanceTestContext setup(String dataprocImageVersion) throws Exception {
-return setup(dataprocImageVersion, Collections.emptyList());
-}
-
-protected static AcceptanceTestContext setup(
-String dataprocImageVersion, List<ClusterProperty> clusterProperties) throws Exception {
-String testId = generateTestId(dataprocImageVersion, clusterProperties);
+String testId = generateTestId(dataprocImageVersion);
String clusterName = generateClusterName(testId);
String testBaseGcsDir = AcceptanceTestUtils.createTestBaseGcsDir(testId);
String connectorJarUri = testBaseGcsDir + "/connector.jar";
String connectorInitActionUri = testBaseGcsDir + "/connectors.sh";
-Map<String, String> properties =
-clusterProperties.stream()
-.collect(Collectors.toMap(ClusterProperty::getKey, ClusterProperty::getValue));
-String bqProject = PROJECT_ID;
+String bqProject = TestUtils.getProject();
String bqDataset = "hivebq_test_dataset_" + testId.replace("-", "_");
String bqTable = "hivebq_test_table_" + testId.replace("-", "_");

@@ -99,12 +80,7 @@ protected static AcceptanceTestContext setup(
createBqDataset(bqProject, bqDataset);

createClusterIfNeeded(
-clusterName,
-dataprocImageVersion,
-testId,
-properties,
-connectorJarUri,
-connectorInitActionUri);
+clusterName, dataprocImageVersion, connectorJarUri, connectorInitActionUri);

AcceptanceTestContext testContext =
new AcceptanceTestContext(
@@ -155,22 +131,20 @@ private interface ThrowingConsumer<T> {
protected static void createClusterIfNeeded(
String clusterName,
String dataprocImageVersion,
-String testId,
-Map<String, String> properties,
String connectorJarUri,
String connectorInitActionUri)
throws Exception {
Cluster clusterSpec =
createClusterSpec(
-clusterName, dataprocImageVersion, properties, connectorJarUri, connectorInitActionUri);
+clusterName, dataprocImageVersion, connectorJarUri, connectorInitActionUri);
System.out.println("Cluster spec:\n" + clusterSpec);
System.out.println("Creating cluster " + clusterName + " ...");
-cluster(client -> client.createClusterAsync(PROJECT_ID, REGION, clusterSpec).get());
+cluster(client -> client.createClusterAsync(TestUtils.getProject(), REGION, clusterSpec).get());
}

protected static void deleteCluster(String clusterName) throws Exception {
System.out.println("Deleting cluster " + clusterName + " ...");
-cluster(client -> client.deleteClusterAsync(PROJECT_ID, REGION, clusterName).get());
+cluster(client -> client.deleteClusterAsync(TestUtils.getProject(), REGION, clusterName).get());
}

private static void cluster(ThrowingConsumer<ClusterControllerClient> command) throws Exception {
@@ -184,12 +158,11 @@ private static void cluster(ThrowingConsumer<ClusterControllerClient> command) t
private static Cluster createClusterSpec(
String clusterName,
String dataprocImageVersion,
-Map<String, String> properties,
String connectorJarUri,
String connectorInitActionUri) {
return Cluster.newBuilder()
.setClusterName(clusterName)
-.setProjectId(PROJECT_ID)
+.setProjectId(TestUtils.getProject())
.setConfig(
ClusterConfig.newBuilder()
.addInitializationActions(
@@ -219,9 +192,7 @@ private static Cluster createClusterSpec(
.setBootDiskSizeGb(300)
.setNumLocalSsds(0)))
.setSoftwareConfig(
-SoftwareConfig.newBuilder()
-.setImageVersion(dataprocImageVersion)
-.putAllProperties(properties)))
+SoftwareConfig.newBuilder().setImageVersion(dataprocImageVersion)))
.build();
}

@@ -257,12 +228,13 @@ private Job runAndWait(String testName, Job job, Duration timeout) throws Except
try (JobControllerClient jobControllerClient =
JobControllerClient.create(
JobControllerSettings.newBuilder().setEndpoint(DATAPROC_ENDPOINT).build())) {
-Job request = jobControllerClient.submitJob(PROJECT_ID, REGION, job);
+Job request = jobControllerClient.submitJob(TestUtils.getProject(), REGION, job);
String jobId = request.getReference().getJobId();
System.err.println(String.format("%s job ID: %s", testName, jobId));
CompletableFuture<Job> finishedJobFuture =
CompletableFuture.supplyAsync(
-() -> waitForJobCompletion(jobControllerClient, PROJECT_ID, REGION, jobId));
+() ->
+waitForJobCompletion(jobControllerClient, TestUtils.getProject(), REGION, jobId));
Job jobInfo = finishedJobFuture.get(timeout.getSeconds(), TimeUnit.SECONDS);
return jobInfo;
}
@@ -289,12 +261,14 @@ Job waitForJobCompletion(
}
}

-void verifyJobSuceeded(Job job) throws Exception {
+void verifyJobSucceeded(Job job) throws Exception {
String driverOutput =
AcceptanceTestUtils.readGcsFile(job.getDriverControlFilesUri() + "driveroutput.000000000");
System.out.println("Driver output: " + driverOutput);
System.out.println("Job status: " + job.getStatus().getState());
-assertThat(job.getStatus().getState()).isEqualTo(JobStatus.State.DONE);
+if (job.getStatus().getState() != JobStatus.State.DONE) {
+throw new AssertionError(job.getStatus().getDetails());
+}
}

void verifyJobOutput(String outputDirUri, String expectedOutput) throws Exception {
@@ -314,7 +288,7 @@ public void testHiveBq_managedTable_createWriteReadDrop_success() throws Excepti
outputDirUri,
Duration.ofSeconds(ACCEPTANCE_TEST_TIMEOUT_IN_SECONDS));

-verifyJobSuceeded(result);
+verifyJobSucceeded(result);
verifyJobOutput(outputDirUri, "345,world");
}

Expand All @@ -330,7 +304,7 @@ public void testHiveBq_externalTable_createReadDrop_success() throws Exception {
outputDirUri,
Duration.ofSeconds(ACCEPTANCE_TEST_TIMEOUT_IN_SECONDS));

-verifyJobSuceeded(result);
+verifyJobSucceeded(result);
verifyJobOutput(outputDirUri, "king,1191");
}
}
