Add CMEK support #117

Merged (1 commit, Jul 9, 2024)
81 changes: 68 additions & 13 deletions README.md
@@ -278,17 +278,18 @@ You can use the following properties in the `TBLPROPERTIES` clause when you crea

You can set the following Hive/Hadoop configuration properties in your environment:

| Property | Default value | Description |
|-------------------------------------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `bq.read.data.format` | `arrow` | Data format used for reads from BigQuery. Possible values: `arrow`, `avro`. |
| `bq.temp.gcs.path` | | GCS location for storing temporary Avro files when using the `indirect` write method |
| `bq.write.method` | `direct` | Indicates how to write data to BigQuery. Possible values: `direct` (to directly write to the BigQuery storage API), `indirect` (to stage temporary Avro files to GCS before loading into BigQuery). |
| `bq.work.dir.parent.path` | `${hadoop.tmp.dir}` | Parent path on HDFS where each job creates its temporary work directory |
| `bq.work.dir.name.prefix` | `hive-bq-` | Prefix used for naming the jobs' temporary directories. |
| `bq.destination.table.kms.key.name` |                     | Cloud KMS encryption key used to protect the job's destination BigQuery table. Read more in the section on [customer-managed encryption keys](#customer-managed-encryption-key-cmek).            |
| `materializationProject` | | Project used to temporarily materialize data when reading views. Defaults to the same project as the read view. |
| `materializationDataset` | | Dataset used to temporarily materialize data when reading views. Defaults to the same dataset as the read view. |
| `maxParallelism` | | Maximum initial number of read streams |
| `viewsEnabled` | `false` | Set it to `true` to enable reading views. |
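
For example, the following sketch (the bucket name is a placeholder) starts a Hive CLI session that switches the connector to the `indirect` write method and points it at a GCS staging location for the temporary Avro files:

```sh
# Hypothetical staging bucket; any of the properties above can be passed the same way.
hive \
  --hiveconf bq.write.method=indirect \
  --hiveconf bq.temp.gcs.path=gs://my-staging-bucket/hive-bq-temp
```

Depending on your setup, the same properties can usually also be set from within a session with Hive's `SET property=value;` command.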

## Data Type Mapping

@@ -713,6 +714,24 @@ There are multiple options to override the default behavior and to provide custo
with the `bq.access.token` configuration property. You can generate an access token by running
`gcloud auth application-default print-access-token`.
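
For example, a session could be started with a freshly generated token (a minimal sketch; tokens are short-lived and must be regenerated when they expire):

```sh
# Generate a short-lived access token and hand it to the connector for this session.
hive --hiveconf bq.access.token=$(gcloud auth application-default print-access-token)
```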

## Customer-managed encryption key (CMEK)

You can provide a Cloud KMS key to be used to encrypt the destination table, for example when you
run a `CREATE TABLE` statement for a managed table, or when you insert data into a table that
doesn't exist yet. To do so, set the `bq.destination.table.kms.key.name` property to the
fully-qualified name of the desired Cloud KMS key, in the form:

```
projects/<KMS_PROJECT_ID>/locations/<LOCATION>/keyRings/<KEY_RING>/cryptoKeys/<KEY>
```

The BigQuery service account associated with your project requires access to this encryption key.

The table is encrypted with the key only if it is created by the connector. A pre-existing
unencrypted table won't be encrypted just by setting this option.

For further information about using customer-managed encryption keys (CMEK) with BigQuery, see the [BigQuery CMEK documentation](https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id).
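
For example, a managed table created while this property is set would be encrypted with the given key. The sketch below uses placeholder key, project, dataset, and table names, and assumes the storage handler class and `bq.table` property used in the connector's `CREATE TABLE` examples:

```sh
# Placeholder names throughout -- adjust the key resource ID and table references.
hive \
  --hiveconf bq.destination.table.kms.key.name=projects/my-kms-project/locations/us/keyRings/my-keyring/cryptoKeys/my-key \
  -e "CREATE TABLE cmek_demo (id BIGINT, name STRING)
      STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler'
      TBLPROPERTIES ('bq.table'='my-project.my_dataset.cmek_demo');"
```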

## Known issues and limitations

* The `UPDATE`, `MERGE`, `DELETE`, and `ALTER TABLE` statements are currently not supported.
@@ -766,15 +785,16 @@ Enable the following APIs:
```sh
gcloud services enable \
bigquerystorage.googleapis.com \
bigqueryconnection.googleapis.com \
cloudkms.googleapis.com
```
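
Optionally, confirm that the relevant APIs are enabled:

```sh
gcloud services list --enabled | grep -E 'bigquery|cloudkms'
```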

#### BigLake setup

Define environment variables:

```sh
export PROJECT=<my-gcp-project>
export BIGLAKE_LOCATION=us
export BIGLAKE_REGION=us-central1
export BIGLAKE_CONNECTION=hive-integration-tests
@@ -807,6 +827,41 @@ export BIGLAKE_SA=$(bq show --connection --format json "${PROJECT}.${BIGLAKE_LOC
gsutil iam ch serviceAccount:${BIGLAKE_SA}:objectViewer gs://${BIGLAKE_BUCKET}
```

#### KMS setup

Create a KMS keyring:

```sh
gcloud kms keyrings create \
integration_tests_keyring \
--location us
```

Then create a key in that keyring:

```sh
gcloud kms keys create integration_tests_key \
--keyring integration_tests_keyring \
--location us \
--purpose "encryption"
```

Obtain the BigQuery service account name:

```sh
BQ_SERVICE_ACCOUNT=$(bq show --encryption_service_account --format json | jq -r ".ServiceAccountID")
```

Assign the Encrypter/Decrypter role to the BigQuery service account:

```sh
gcloud kms keys add-iam-policy-binding \
--project=${PROJECT} \
--member serviceAccount:${BQ_SERVICE_ACCOUNT} \
--role roles/cloudkms.cryptoKeyEncrypterDecrypter \
--location=us \
--keyring=integration_tests_keyring \
integration_tests_key
```
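
Optionally, verify the binding by inspecting the key's IAM policy:

```sh
gcloud kms keys get-iam-policy integration_tests_key \
  --project=${PROJECT} \
  --keyring=integration_tests_keyring \
  --location=us
```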

#### Running the tests

You must use Java version 8, as it's the version that Hive itself uses. Make sure that `JAVA_HOME` points to the Java
3 changes: 3 additions & 0 deletions cloudbuild/cloudbuild.yaml
@@ -57,6 +57,7 @@ steps:
args: ['/workspace/cloudbuild/presubmit.sh', 'integrationtest_hive1']
env:
- 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
- 'BIGQUERY_KMS_KEY_NAME=${_BIGQUERY_KMS_KEY_NAME}'

# 7. Run integration tests for Hive 2
- name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
@@ -66,6 +67,7 @@
args: ['/workspace/cloudbuild/presubmit.sh', 'integrationtest_hive2']
env:
- 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
- 'BIGQUERY_KMS_KEY_NAME=${_BIGQUERY_KMS_KEY_NAME}'

# 8. Run integration tests for Hive 3
- name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
@@ -75,6 +77,7 @@
args: ['/workspace/cloudbuild/presubmit.sh', 'integrationtest_hive3']
env:
- 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
- 'BIGQUERY_KMS_KEY_NAME=${_BIGQUERY_KMS_KEY_NAME}'

# Tests should take under 120 mins
timeout: 7200s
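
The `_BIGQUERY_KMS_KEY_NAME` substitution is expected to be supplied by the Cloud Build trigger; a hypothetical manual invocation (assuming the key created in the KMS setup above) could pass it explicitly:

```sh
gcloud builds submit \
  --config=cloudbuild/cloudbuild.yaml \
  --substitutions=_BIGQUERY_KMS_KEY_NAME=projects/${PROJECT}/locations/us/keyRings/integration_tests_keyring/cryptoKeys/integration_tests_key
```
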
@@ -62,17 +62,27 @@ public void run(HookContext hookContext) throws Exception {
return;
}

// Parse and analyze the semantics of the Hive query.
// We have to do this because unfortunately the WriteEntity objects in hookContext.getOutputs()
// are systematically marked as being of type INSERT_OVERWRITE, regardless of whether it is
// an "INSERT OVERWRITE" query or a regular "INSERT" query. This is apparently caused by the
// fact that Hive 1.x.x treats all "non native" tables (i.e. by Hive 1.x.x's definition all
// tables that have a storage handler defined:
// https://github.com/apache/hive/blob/release-1.2.1/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java#L845)
// as INSERT_OVERWRITE:
// https://github.com/apache/hive/blob/release-1.2.1/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java#L12147
// To get around this issue, we parse the query ourselves and try to determine the proper type
// for our purposes (insert or insert overwrite).
QBParseInfo parseInfo;
try {
Configuration conf = hookContext.getConf();
Context context = new Context(conf);
context.setCmd(hookContext.getQueryPlan().getQueryString());
ParseDriver parseDriver = new ParseDriver();
ASTNode tree = parseDriver.parse(hookContext.getQueryPlan().getQueryString(), context);
HiveConf hiveConf = new HiveConf(conf, this.getClass());
SemanticAnalyzer analyzer = new SemanticAnalyzer(hiveConf);
if (tree.getChildren().isEmpty() || tree.getChild(0).getType() != HiveParser.TOK_QUERY) {
return;
}
analyzer.analyze((ASTNode) tree.getChild(0), context);
@@ -54,8 +54,8 @@ public Map<String, String> getBasicStatistics(Partish partish) {
Guice.createInjector(
new BigQueryClientModule(),
new HiveBigQueryConnectorModule(conf, hmsTable.getParameters()));
return BigQueryUtils.getBasicStatistics(
injector.getInstance(BigQueryClient.class),
injector.getInstance(HiveBigQueryConfig.class).getTableId());
}
}
@@ -134,21 +134,32 @@ public static void assertDoesNotContainColumn(Table hmsTable, String columnName)
}
}

protected void createBigQueryTable(
Injector injector,
TableId tableId,
StandardTableDefinition tableDefinition,
HiveBigQueryConfig opts,
Table hmsTable) {
// TODO: We currently can't use the `BigQueryClient.createTable()` because it doesn't have a way
// to
// pass a TableInfo. This forces us to duplicate some code below from the existing
// `BigQueryClient.createTable()`. One better long-term solution would be to add a
// `createTable(TableInfo)` method to BigQueryClient. See:
// https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/1213
TableInfo.Builder bigQueryTableInfo =
TableInfo.newBuilder(tableId, tableDefinition)
.setDescription(hmsTable.getParameters().get("comment"));
opts.getKmsKeyName()
.ifPresent(
keyName ->
bigQueryTableInfo.setEncryptionConfiguration(
EncryptionConfiguration.newBuilder().setKmsKeyName(keyName).build()));
BigQueryCredentialsSupplier credentialsSupplier =
injector.getInstance(BigQueryCredentialsSupplier.class);
HeaderProvider headerProvider = injector.getInstance(HeaderProvider.class);

BigQuery bigQueryService =
BigQueryUtils.getBigQueryService(opts, headerProvider, credentialsSupplier);
bigQueryService.create(bigQueryTableInfo.build());
}

/**
@@ -247,12 +258,7 @@ public void preCreateTable(Table table) throws MetaException {
tableDefBuilder.setTimePartitioning(tpBuilder.build());
}

createBigQueryTable(injector, tableId, tableDefBuilder.build(), opts, table);

String hmsDbTableName = HiveUtils.getDbTableName(table);
LOG.info("Created BigQuery table {} for {}", tableId, hmsDbTableName);
@@ -366,6 +372,11 @@ public void rollbackInsertTable(Table table, boolean overwrite) throws MetaExcep
}

public void commitDropTable(Table table, boolean deleteData) throws MetaException {
if (conf.getBoolean(HiveBigQueryConfig.CONNECTOR_IN_TEST, false)
&& conf.getBoolean(HiveBigQueryConfig.FORCE_DROP_FAILURE, false)) {
// For integration testing only
throw new RuntimeException(HiveBigQueryConfig.FORCED_DROP_FAILURE_ERROR_MESSAGE);
}
if (!HiveUtils.isExternalTable(table) && deleteData) {
// This is a managed table, so let's delete the table in BigQuery
Injector injector =