Skip to content

Commit

Permalink
feat: add generic Iceberg catalog adapter creation to Java / Python (d…
Browse files Browse the repository at this point in the history
…eephaven#5754)

### Java, connecting to a RESTCatalog using MinIO
```
import io.deephaven.iceberg.util.*;

properties = new HashMap<>();
properties.put("type", "rest");
properties.put("uri", "http://rest:8181");

properties.put("client.region", "us-east-1");

properties.put("s3.access-key-id", "admin");
properties.put("s3.secret-access-key", "password");
properties.put("s3.endpoint", "http://minio:9000");

adapter = IcebergTools.createAdapter("generic-adapter", properties);
```

### Python, connecting to a RESTCatalog using MinIO
```
from deephaven.experimental import iceberg

adapter = iceberg.adapter(name="generic-adapter", properties={
    "type" : "rest",
    "uri" : "http://rest:8181",
    "client.region" : "us-east-1",
    "s3.access-key-id" : "admin",
    "s3.secret-access-key" : "password",
    "s3.endpoint" : "http://minio:9000"
});
```

### Java, connecting to AWS Glue
NOTE: credentials set in local environment
```
import io.deephaven.iceberg.util.*;

properties = new HashMap<>();
properties.put("type", "glue");
properties.put("uri", "s3://lab-warehouse/sales");

adapter = IcebergTools.createAdapter("generic-adapter", properties);
```

### Python, connecting to AWS Glue
NOTE: credentials set in local environment
```
from deephaven.experimental import iceberg

adapter = iceberg.adapter(name="generic-adapter", properties={
    "type" : "glue",
    "uri" : "s3://lab-warehouse/sales",
    "warehouse" : "s3://lab-warehouse/sales",
});
```
  • Loading branch information
lbooker42 authored Sep 6, 2024
1 parent dc3c2b9 commit 1bb5f09
Show file tree
Hide file tree
Showing 18 changed files with 543 additions and 157 deletions.
4 changes: 3 additions & 1 deletion extensions/iceberg/s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ dependencies {
runtimeOnly libs.awssdk.sts
runtimeOnly libs.awssdk.glue

testImplementation libs.junit4
compileOnly libs.autoservice
annotationProcessor libs.autoservice.compiler

testImplementation libs.junit4
testImplementation project(':engine-test-utils')

testImplementation libs.testcontainers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@

import com.google.common.base.Strings;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.rest.RESTCatalog;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -68,12 +66,10 @@ public static IcebergCatalogAdapter createS3Rest(
properties.put(S3FileIOProperties.ENDPOINT, endpointOverride);
}

final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);

final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
catalog.initialize(catalogName, properties);

return new IcebergCatalogAdapter(catalog, fileIO);
return new IcebergCatalogAdapter(catalog, properties);
}

/**
Expand Down Expand Up @@ -101,11 +97,9 @@ public static IcebergCatalogAdapter createGlue(
properties.put(CatalogProperties.URI, catalogURI);
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);

final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);

final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
catalog.initialize(catalogName, properties);

return new IcebergCatalogAdapter(catalog, fileIO);
return new IcebergCatalogAdapter(catalog, properties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg.util;

import com.google.auto.service.AutoService;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.internal.DataInstructionsProviderPlugin;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.Map;

/**
* {@link io.deephaven.iceberg.internal.DataInstructionsProviderPlugin} implementation used for reading files from S3.
*/
@AutoService(io.deephaven.iceberg.internal.DataInstructionsProviderPlugin.class)
@SuppressWarnings("unused")
public final class S3InstructionsProviderPlugin implements DataInstructionsProviderPlugin {
@Override
public Object createInstructions(@NotNull final URI uri, @NotNull final Map<String, String> properties) {
// If the URI scheme is "s3","s3a","s3n" or if the properties contain one of these specific keys, we can
// create a useful S3Instructions object.
if (uri.getScheme().equals("s3")
|| uri.getScheme().equals("s3a")
|| uri.getScheme().equals("s3n")
|| properties.containsKey(AwsClientProperties.CLIENT_REGION)
|| properties.containsKey(S3FileIOProperties.ACCESS_KEY_ID)
|| properties.containsKey(S3FileIOProperties.SECRET_ACCESS_KEY)
|| properties.containsKey(S3FileIOProperties.ENDPOINT)) {

final S3Instructions.Builder builder = S3Instructions.builder();
if (properties.containsKey(AwsClientProperties.CLIENT_REGION)) {
builder.regionName(properties.get(AwsClientProperties.CLIENT_REGION));
}
if (properties.containsKey(S3FileIOProperties.ENDPOINT)) {
builder.endpointOverride(properties.get(S3FileIOProperties.ENDPOINT));
}
if (properties.containsKey(S3FileIOProperties.ACCESS_KEY_ID)
&& properties.containsKey(S3FileIOProperties.SECRET_ACCESS_KEY)) {
builder.credentials(
Credentials.basic(properties.get(S3FileIOProperties.ACCESS_KEY_ID),
properties.get(S3FileIOProperties.SECRET_ACCESS_KEY)));
}
return builder.build();
}

// We have no useful properties for creating an S3Instructions object.
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.junit.BeforeClass;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.Map;

public class IcebergLocalStackTest extends IcebergToolsTest {

@BeforeClass
Expand All @@ -25,4 +27,9 @@ public Builder s3Instructions(final Builder builder) {
public S3AsyncClient s3AsyncClient() {
return SingletonContainers.LocalStack.s3AsyncClient();
}

@Override
public Map<String, String> s3Properties() {
return SingletonContainers.LocalStack.s3Properties();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.junit.BeforeClass;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.Map;

public class IcebergMinIOTest extends IcebergToolsTest {

@BeforeClass
Expand All @@ -29,4 +31,10 @@ public Builder s3Instructions(final Builder builder) {
public S3AsyncClient s3AsyncClient() {
return SingletonContainers.MinIO.s3AsyncClient();
}

@Override
public Map<String, String> s3Properties() {
return SingletonContainers.MinIO.s3Properties();
}

}
Loading

0 comments on commit 1bb5f09

Please sign in to comment.