diff --git a/google-cloud-datastore-utils/pom.xml b/google-cloud-datastore-utils/pom.xml new file mode 100644 index 000000000..2011e27f7 --- /dev/null +++ b/google-cloud-datastore-utils/pom.xml @@ -0,0 +1,110 @@ + + + 4.0.0 + com.google.cloud + google-cloud-datastore-utils + 2.20.0-grpc-experimental-1-SNAPSHOT + jar + Google Cloud Datastore Utilities + https://github.com/googleapis/java-datastore + + Java datastore client utility library. + + + com.google.cloud + google-cloud-datastore-parent + 2.20.0-grpc-experimental-1-SNAPSHOT + + + google-cloud-datastore-utils + + + + com.google.api-client + google-api-client + + + com.google.http-client + google-http-client-protobuf + + + com.google.http-client + google-http-client-gson + + + com.google.api.grpc + proto-google-cloud-datastore-v1 + + + com.google.api + api-common + + + com.google.protobuf + protobuf-java + + + com.google.guava + guava + + + com.google.api.grpc + proto-google-common-protos + + + com.google.http-client + google-http-client + + + com.google.http-client + google-http-client-jackson2 + + + com.google.oauth-client + google-oauth-client + + + com.google.code.findbugs + jsr305 + + + + junit + junit + test + + + com.google.truth + truth + 1.4.2 + test + + + org.checkerframework + checker-qual + + + + + + + + + org.codehaus.mojo + flatten-maven-plugin + + + + + + native + + + com.google.cloud.datastore.it.ITDatastoreConceptsTest + + + + diff --git a/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/Datastore.java b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/Datastore.java new file mode 100644 index 000000000..d66e9ce60 --- /dev/null +++ b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/Datastore.java @@ -0,0 +1,136 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.utils; + +import com.google.datastore.v1.*; +import com.google.rpc.Code; +import java.io.IOException; +import java.io.InputStream; + +/** + * Provides access to Cloud Datastore. + * + *

This class is thread-safe. + */ +public class Datastore { + + final RemoteRpc remoteRpc; + + Datastore(RemoteRpc remoteRpc) { + this.remoteRpc = remoteRpc; + } + + /** Reset the RPC count. */ + public void resetRpcCount() { + remoteRpc.resetRpcCount(); + } + + /** + * Returns the number of RPC calls made since the client was created or {@link #resetRpcCount} was + * called. + */ + public int getRpcCount() { + return remoteRpc.getRpcCount(); + } + + private com.google.datastore.utils.DatastoreException invalidResponseException( + String method, IOException exception) { + return RemoteRpc.makeException( + remoteRpc.getUrl(), method, Code.UNAVAILABLE, "Invalid response", exception); + } + + public AllocateIdsResponse allocateIds(AllocateIdsRequest request) + throws com.google.datastore.utils.DatastoreException { + try (InputStream is = + remoteRpc.call("allocateIds", request, request.getProjectId(), request.getDatabaseId())) { + return AllocateIdsResponse.parseFrom(is); + } catch (IOException exception) { + throw invalidResponseException("allocateIds", exception); + } + } + + public BeginTransactionResponse beginTransaction(BeginTransactionRequest request) + throws com.google.datastore.utils.DatastoreException { + try (InputStream is = + remoteRpc.call( + "beginTransaction", request, request.getProjectId(), request.getDatabaseId())) { + return BeginTransactionResponse.parseFrom(is); + } catch (IOException exception) { + throw invalidResponseException("beginTransaction", exception); + } + } + + public CommitResponse commit(CommitRequest request) + throws com.google.datastore.utils.DatastoreException { + try (InputStream is = + remoteRpc.call("commit", request, request.getProjectId(), request.getDatabaseId())) { + return CommitResponse.parseFrom(is); + } catch (IOException exception) { + throw invalidResponseException("commit", exception); + } + } + + public LookupResponse lookup(LookupRequest request) + throws com.google.datastore.utils.DatastoreException { + try (InputStream is = + remoteRpc.call("lookup", request, request.getProjectId(), request.getDatabaseId())) { + return LookupResponse.parseFrom(is); + } catch (IOException exception) { + throw invalidResponseException("lookup", exception); + } + } + + public ReserveIdsResponse reserveIds(ReserveIdsRequest request) + throws com.google.datastore.utils.DatastoreException { + try (InputStream is = + remoteRpc.call("reserveIds", request, request.getProjectId(), request.getDatabaseId())) { + return ReserveIdsResponse.parseFrom(is); + } catch (IOException exception) { + throw invalidResponseException("reserveIds", exception); + } + } + + public RollbackResponse rollback(RollbackRequest request) + throws com.google.datastore.utils.DatastoreException { + try (InputStream is = + remoteRpc.call("rollback", request, request.getProjectId(), request.getDatabaseId())) { + return RollbackResponse.parseFrom(is); + } catch (IOException exception) { + throw invalidResponseException("rollback", exception); + } + } + + public RunQueryResponse runQuery(RunQueryRequest request) + throws com.google.datastore.utils.DatastoreException { + try (InputStream is = + remoteRpc.call("runQuery", request, request.getProjectId(), request.getDatabaseId())) { + return RunQueryResponse.parseFrom(is); + } catch (IOException exception) { + throw invalidResponseException("runQuery", exception); + } + } + + public RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryRequest request) + throws DatastoreException { + try (InputStream is = + remoteRpc.call( + "runAggregationQuery", request, request.getProjectId(), request.getDatabaseId())) { + return RunAggregationQueryResponse.parseFrom(is); + } catch (IOException exception) { + throw invalidResponseException("runAggregationQuery", exception); + } + } +} diff --git a/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/DatastoreException.java b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/DatastoreException.java new file mode 100644 index 000000000..48a5dac30 --- /dev/null +++ b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/DatastoreException.java @@ -0,0 +1,45 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.utils; + +import com.google.rpc.Code; + +/** Indicates an error in a {@link Datastore} call. */ +public class DatastoreException extends Exception { + private final String methodName; + private final Code code; + + public DatastoreException(String methodName, Code code, String message, Throwable cause) { + super(message, cause); + this.methodName = methodName; + this.code = code; + } + + /** @return the canonical error code */ + public Code getCode() { + return code; + } + + /** @return the datastore method name */ + public String getMethodName() { + return methodName; + } + + @Override + public String toString() { + return String.format("%s, code=%s", super.toString(), code); + } +} diff --git a/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/DatastoreFactory.java b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/DatastoreFactory.java new file mode 100644 index 000000000..2befe276e --- /dev/null +++ b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/DatastoreFactory.java @@ -0,0 +1,127 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.utils; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.http.HttpRequestFactory; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.javanet.NetHttpTransport; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.logging.*; + +/** Client factory for {@link com.google.datastore.utils.Datastore}. */ +public class DatastoreFactory { + + // Lazy load this because we might be running inside App Engine and this + // class isn't on the whitelist. + private static ConsoleHandler methodHandler; + + /** API version. */ + public static final String VERSION = "v1"; + + public static final String DEFAULT_HOST = "https://datastore.googleapis.com"; + + /** Singleton factory instance. */ + private static final DatastoreFactory INSTANCE = new DatastoreFactory(); + + public static DatastoreFactory get() { + return INSTANCE; + } + + /** + * Provides access to a datastore using the provided options. Logs into the application using the + * credentials available via these options. + * + * @throws IllegalArgumentException if the server or credentials weren't provided. + */ + public com.google.datastore.utils.Datastore create( + com.google.datastore.utils.DatastoreOptions options) { + return new com.google.datastore.utils.Datastore(newRemoteRpc(options)); + } + + /** Constructs a Google APIs HTTP client with the associated credentials. */ + public HttpRequestFactory makeClient(com.google.datastore.utils.DatastoreOptions options) { + Credential credential = options.getCredential(); + HttpTransport transport = options.getTransport(); + if (transport == null) { + transport = credential == null ? new NetHttpTransport() : credential.getTransport(); + transport = transport == null ? new NetHttpTransport() : transport; + } + return transport.createRequestFactory(credential); + } + + /** Starts logging datastore method calls to the console. (Useful within tests.) */ + public static void logMethodCalls() { + Logger logger = Logger.getLogger(Datastore.class.getName()); + logger.setLevel(Level.FINE); + if (!Arrays.asList(logger.getHandlers()).contains(getStreamHandler())) { + logger.addHandler(getStreamHandler()); + } + } + + /** Build a valid datastore URL. */ + String buildProjectEndpoint(com.google.datastore.utils.DatastoreOptions options) { + if (options.getProjectEndpoint() != null) { + return options.getProjectEndpoint(); + } + // DatastoreOptions ensures either project endpoint or project ID is set. + String projectId = checkNotNull(options.getProjectId()); + if (options.getHost() != null) { + return validateUrl( + String.format("https://%s/%s/projects/%s", options.getHost(), VERSION, projectId)); + } else if (options.getLocalHost() != null) { + return validateUrl( + String.format("http://%s/%s/projects/%s", options.getLocalHost(), VERSION, projectId)); + } + return validateUrl(String.format("%s/%s/projects/%s", DEFAULT_HOST, VERSION, projectId)); + } + + protected com.google.datastore.utils.RemoteRpc newRemoteRpc(DatastoreOptions options) { + checkNotNull(options); + HttpRequestFactory client = makeClient(options); + return new com.google.datastore.utils.RemoteRpc( + client, options.getInitializer(), buildProjectEndpoint(options)); + } + + private static String validateUrl(String url) { + try { + return new URI(url).toString(); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + } + + // TODO: Support something other than console handler for when we're + // running in App Engine + private static synchronized StreamHandler getStreamHandler() { + if (methodHandler == null) { + methodHandler = new ConsoleHandler(); + methodHandler.setFormatter( + new Formatter() { + @Override + public String format(LogRecord record) { + return record.getMessage() + "\n"; + } + }); + methodHandler.setLevel(Level.FINE); + } + return methodHandler; + } +} diff --git a/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/DatastoreHelper.java b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/DatastoreHelper.java new file mode 100644 index 000000000..6480de136 --- /dev/null +++ b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/DatastoreHelper.java @@ -0,0 +1,729 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.utils; + +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; +import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; +import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.gson.GsonFactory; +import com.google.datastore.v1.ArrayValue; +import com.google.datastore.v1.CompositeFilter; +import com.google.datastore.v1.Entity; +import com.google.datastore.v1.Filter; +import com.google.datastore.v1.Key; +import com.google.datastore.v1.Key.PathElement; +import com.google.datastore.v1.Key.PathElement.IdTypeCase; +import com.google.datastore.v1.Mutation; +import com.google.datastore.v1.PartitionId; +import com.google.datastore.v1.PropertyFilter; +import com.google.datastore.v1.PropertyOrder; +import com.google.datastore.v1.PropertyReference; +import com.google.datastore.v1.Value; +import com.google.datastore.v1.Value.ValueTypeCase; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.protobuf.TimestampOrBuilder; +import com.google.type.LatLng; +import java.io.File; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.PrivateKey; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** Helper methods for {@link Datastore}. */ +// TODO: Accept OrBuilders when possible. +public final class DatastoreHelper { + private static final Logger logger = + Logger.getLogger(com.google.datastore.utils.DatastoreHelper.class.getName()); + + private static final int MICROSECONDS_PER_SECOND = 1000 * 1000; + private static final int NANOSECONDS_PER_MICROSECOND = 1000; + + /** The property used in the Datastore to give us a random distribution. * */ + public static final String SCATTER_PROPERTY_NAME = "__scatter__"; + + /** The property used in the Datastore to get the key of the entity. * */ + public static final String KEY_PROPERTY_NAME = "__key__"; + + /** Name of the environment variable used to set the project ID. */ + public static final String PROJECT_ID_ENV_VAR = "DATASTORE_PROJECT_ID"; + + /** Name of the environment variable used to set the local host. */ + public static final String LOCAL_HOST_ENV_VAR = "DATASTORE_EMULATOR_HOST"; + + /** Name of the environment variable used to set the service account. */ + public static final String SERVICE_ACCOUNT_ENV_VAR = "DATASTORE_SERVICE_ACCOUNT"; + + /** Name of the environment variable used to set the private key file. */ + public static final String PRIVATE_KEY_FILE_ENV_VAR = "DATASTORE_PRIVATE_KEY_FILE"; + + private static final String URL_OVERRIDE_ENV_VAR = "__DATASTORE_URL_OVERRIDE"; + + private static final AtomicReference projectIdFromComputeEngine = new AtomicReference<>(); + + /** Comparator for Keys */ + private static final class KeyComparator implements Comparator { + + static final com.google.datastore.utils.DatastoreHelper.KeyComparator INSTANCE = + new com.google.datastore.utils.DatastoreHelper.KeyComparator(); + + private int comparePathElement(PathElement thisElement, PathElement otherElement) { + int result = thisElement.getKind().compareTo(otherElement.getKind()); + if (result != 0) { + return result; + } + if (thisElement.getIdTypeCase() == IdTypeCase.ID) { + if (otherElement.getIdTypeCase() != IdTypeCase.ID) { + return -1; + } + return Long.valueOf(thisElement.getId()).compareTo(otherElement.getId()); + } + if (otherElement.getIdTypeCase() == IdTypeCase.ID) { + return 1; + } + + return thisElement.getName().compareTo(otherElement.getName()); + } + + @Override + public int compare(Key thisKey, Key otherKey) { + if (!thisKey.getPartitionId().equals(otherKey.getPartitionId())) { + throw new IllegalArgumentException("Cannot compare keys with different partition ids."); + } + + Iterator thisPath = thisKey.getPathList().iterator(); + Iterator otherPath = otherKey.getPathList().iterator(); + while (thisPath.hasNext()) { + if (!otherPath.hasNext()) { + return 1; + } + int result = comparePathElement(thisPath.next(), otherPath.next()); + if (result != 0) { + return result; + } + } + + return otherPath.hasNext() ? -1 : 0; + } + } + + private DatastoreHelper() {} + + private static HttpTransport newTransport() throws GeneralSecurityException, IOException { + return GoogleNetHttpTransport.newTrustedTransport(); + } + + static JsonFactory newJsonFactory() { + return new GsonFactory(); + } + + /** + * Constructs credentials for the given account and key. + * + * @param serviceAccountId service account ID (typically an e-mail address). + * @param privateKeyFile the file name from which to get the private key. + * @return valid credentials or {@code null} + */ + public static Credential getServiceAccountCredential( + String serviceAccountId, String privateKeyFile) throws GeneralSecurityException, IOException { + return getServiceAccountCredential(serviceAccountId, privateKeyFile, DatastoreOptions.SCOPES); + } + + /** + * Constructs credentials for the given account and key file. + * + * @param serviceAccountId service account ID (typically an e-mail address). + * @param privateKeyFile the file name from which to get the private key. + * @param serviceAccountScopes Collection of OAuth scopes to use with the the service account flow + * or {@code null} if not. + * @return valid credentials or {@code null} + */ + public static Credential getServiceAccountCredential( + String serviceAccountId, String privateKeyFile, Collection serviceAccountScopes) + throws GeneralSecurityException, IOException { + return getCredentialBuilderWithoutPrivateKey(serviceAccountId, serviceAccountScopes) + .setServiceAccountPrivateKeyFromP12File(new File(privateKeyFile)) + .build(); + } + + /** + * Constructs credentials for the given account and key. + * + * @param serviceAccountId service account ID (typically an e-mail address). + * @param privateKey the private key for the given account. + * @param serviceAccountScopes Collection of OAuth scopes to use with the the service account flow + * or {@code null} if not. + * @return valid credentials or {@code null} + */ + public static Credential getServiceAccountCredential( + String serviceAccountId, PrivateKey privateKey, Collection serviceAccountScopes) + throws GeneralSecurityException, IOException { + return getCredentialBuilderWithoutPrivateKey(serviceAccountId, serviceAccountScopes) + .setServiceAccountPrivateKey(privateKey) + .build(); + } + + private static GoogleCredential.Builder getCredentialBuilderWithoutPrivateKey( + String serviceAccountId, Collection serviceAccountScopes) + throws GeneralSecurityException, IOException { + HttpTransport transport = newTransport(); + JsonFactory jsonFactory = newJsonFactory(); + return new GoogleCredential.Builder() + .setTransport(transport) + .setJsonFactory(jsonFactory) + .setServiceAccountId(serviceAccountId) + .setServiceAccountScopes(serviceAccountScopes); + } + + /** + * Constructs a {@link Datastore} from environment variables and/or the Compute Engine metadata + * server. + * + *

The project ID is determined from, in order of preference: + * + *

+ * + *

Credentials are taken from, in order of preference: + * + *

    + *
  1. No credentials (if the DATASTORE_EMULATOR_HOST environment variable is set) + *
  2. Service Account specified by the DATASTORE_SERVICE_ACCOUNT and DATASTORE_PRIVATE_KEY_FILE + * environment variables + *
  3. Google Application Default as described here. + *
+ */ + public static DatastoreOptions.Builder getOptionsFromEnv() + throws GeneralSecurityException, IOException { + DatastoreOptions.Builder options = new DatastoreOptions.Builder(); + setProjectEndpointFromEnv(options); + options.credential(getCredentialFromEnv()); + return options; + } + + private static Credential getCredentialFromEnv() throws GeneralSecurityException, IOException { + if (System.getenv(LOCAL_HOST_ENV_VAR) != null) { + logger.log( + Level.INFO, + "{0} environment variable was set. Not using credentials.", + new Object[] {LOCAL_HOST_ENV_VAR}); + return null; + } + String serviceAccount = System.getenv(SERVICE_ACCOUNT_ENV_VAR); + String privateKeyFile = System.getenv(PRIVATE_KEY_FILE_ENV_VAR); + if (serviceAccount != null && privateKeyFile != null) { + logger.log( + Level.INFO, + "{0} and {1} environment variables were set. " + "Using service account credential.", + new Object[] {SERVICE_ACCOUNT_ENV_VAR, PRIVATE_KEY_FILE_ENV_VAR}); + return getServiceAccountCredential(serviceAccount, privateKeyFile); + } + return GoogleCredential.getApplicationDefault().createScoped(DatastoreOptions.SCOPES); + } + + /** + * Determines the project id from the environment. Uses the following sources in order of + * preference: + * + *
    + *
  1. Value of the DATASTORE_PROJECT_ID environment variable + *
  2. Compute Engine + *
+ * + * @throws IllegalStateException if the project ID cannot be determined + */ + private static String getProjectIdFromEnv() { + if (System.getenv(PROJECT_ID_ENV_VAR) != null) { + return System.getenv(PROJECT_ID_ENV_VAR); + } + String projectIdFromComputeEngine = getProjectIdFromComputeEngine(); + if (projectIdFromComputeEngine != null) { + return projectIdFromComputeEngine; + } + throw new IllegalStateException( + String.format( + "Could not determine project ID." + + " If you are not running on Compute Engine, set the" + + " %s environment variable.", + PROJECT_ID_ENV_VAR)); + } + + /** + * Gets the project ID from the Compute Engine metadata server. Returns {@code null} if the + * project ID cannot be determined (because, for instance, the code is not running on Compute + * Engine). + */ + @Nullable + public static String getProjectIdFromComputeEngine() { + String cachedProjectId = projectIdFromComputeEngine.get(); + return cachedProjectId != null ? cachedProjectId : queryProjectIdFromComputeEngine(); + } + + @Nullable + private static String queryProjectIdFromComputeEngine() { + HttpTransport transport; + + try { + transport = newTransport(); + } catch (GeneralSecurityException | IOException e) { + logger.log(Level.WARNING, "Failed to create HttpTransport.", e); + return null; + } + + try { + GenericUrl projectIdUrl = + new GenericUrl("http://metadata/computeMetadata/v1/project/project-id"); + HttpRequest request = transport.createRequestFactory().buildGetRequest(projectIdUrl); + request.getHeaders().set("Metadata-Flavor", "Google"); + String result = request.execute().parseAsString(); + projectIdFromComputeEngine.set(result); + return result; + } catch (IOException e) { + logger.log(Level.INFO, "Could not determine project ID from Compute Engine.", e); + return null; + } + } + + private static void setProjectEndpointFromEnv(DatastoreOptions.Builder options) { + // DATASTORE_HOST is deprecated. + if (System.getenv("DATASTORE_HOST") != null) { + logger.warning( + String.format( + "Ignoring value of environment variable DATASTORE_HOST. " + + "To point datastore to a host running locally, use " + + "the environment variable %s.", + LOCAL_HOST_ENV_VAR)); + } + String projectId = getProjectIdFromEnv(); + if (System.getenv(URL_OVERRIDE_ENV_VAR) != null) { + options.projectEndpoint( + String.format("%s/projects/%s", System.getenv(URL_OVERRIDE_ENV_VAR), projectId)); + return; + } + if (System.getenv(LOCAL_HOST_ENV_VAR) != null) { + options.projectId(projectId); + options.localHost(System.getenv(LOCAL_HOST_ENV_VAR)); + return; + } + options.projectId(projectId); + return; + } + + /** @see #getOptionsFromEnv() */ + public static Datastore getDatastoreFromEnv() throws GeneralSecurityException, IOException { + return DatastoreFactory.get().create(getOptionsFromEnv().build()); + } + + /** + * Gets a {@link com.google.datastore.utils.QuerySplitter}. + * + *

The returned {@link com.google.datastore.utils.QuerySplitter#getSplits} cannot accept a + * query that contains inequality filters, a sort filter, or a missing kind. + */ + public static QuerySplitter getQuerySplitter() { + return com.google.datastore.utils.QuerySplitterImpl.INSTANCE; + } + + public static Comparator getKeyComparator() { + return com.google.datastore.utils.DatastoreHelper.KeyComparator.INSTANCE; + } + + /** Make a sort order for use in a query. */ + public static PropertyOrder.Builder makeOrder( + String property, PropertyOrder.Direction direction) { + return PropertyOrder.newBuilder() + .setProperty(makePropertyReference(property)) + .setDirection(direction); + } + + /** Makes an ancestor filter. */ + public static Filter.Builder makeAncestorFilter(Key ancestor) { + return makeFilter( + com.google.datastore.utils.DatastoreHelper.KEY_PROPERTY_NAME, + PropertyFilter.Operator.HAS_ANCESTOR, + makeValue(ancestor)); + } + + /** Make a filter on a property for use in a query. */ + public static Filter.Builder makeFilter( + String property, PropertyFilter.Operator operator, Value value) { + return Filter.newBuilder() + .setPropertyFilter( + PropertyFilter.newBuilder() + .setProperty(makePropertyReference(property)) + .setOp(operator) + .setValue(value)); + } + + /** Make a filter on a property for use in a query. */ + public static Filter.Builder makeFilter( + String property, PropertyFilter.Operator operator, Value.Builder value) { + return makeFilter(property, operator, value.build()); + } + + /** Make a composite filter from the given sub-filters using AND to combine filters. */ + public static Filter.Builder makeAndFilter(Filter... subfilters) { + return makeAndFilter(Arrays.asList(subfilters)); + } + + /** Make a composite filter from the given sub-filters using AND to combine filters. */ + public static Filter.Builder makeAndFilter(Iterable subfilters) { + return Filter.newBuilder() + .setCompositeFilter( + CompositeFilter.newBuilder() + .addAllFilters(subfilters) + .setOp(CompositeFilter.Operator.AND)); + } + + /** Make a property reference for use in a query. */ + public static PropertyReference.Builder makePropertyReference(String propertyName) { + return PropertyReference.newBuilder().setName(propertyName); + } + + /** Make an array value containing the specified values. */ + public static Value.Builder makeValue(Iterable values) { + return Value.newBuilder().setArrayValue(ArrayValue.newBuilder().addAllValues(values)); + } + + /** Make a list value containing the specified values. */ + public static Value.Builder makeValue(Value value1, Value value2, Value... rest) { + ArrayValue.Builder arrayValue = ArrayValue.newBuilder(); + arrayValue.addValues(value1); + arrayValue.addValues(value2); + arrayValue.addAllValues(Arrays.asList(rest)); + return Value.newBuilder().setArrayValue(arrayValue); + } + + /** Make an array value containing the specified values. */ + public static Value.Builder makeValue( + Value.Builder value1, Value.Builder value2, Value.Builder... rest) { + ArrayValue.Builder arrayValue = ArrayValue.newBuilder(); + arrayValue.addValues(value1); + arrayValue.addValues(value2); + for (Value.Builder builder : rest) { + arrayValue.addValues(builder); + } + return Value.newBuilder().setArrayValue(arrayValue); + } + + /** Make a key value. */ + public static Value.Builder makeValue(Key key) { + return Value.newBuilder().setKeyValue(key); + } + + /** Make a key value. */ + public static Value.Builder makeValue(Key.Builder key) { + return makeValue(key.build()); + } + + /** Make an integer value. */ + public static Value.Builder makeValue(long key) { + return Value.newBuilder().setIntegerValue(key); + } + + /** Make a floating point value. */ + public static Value.Builder makeValue(double value) { + return Value.newBuilder().setDoubleValue(value); + } + + /** Make a boolean value. */ + public static Value.Builder makeValue(boolean value) { + return Value.newBuilder().setBooleanValue(value); + } + + /** Make a string value. */ + public static Value.Builder makeValue(String value) { + return Value.newBuilder().setStringValue(value); + } + + /** Make an entity value. */ + public static Value.Builder makeValue(Entity entity) { + return Value.newBuilder().setEntityValue(entity); + } + + /** Make a entity value. */ + public static Value.Builder makeValue(Entity.Builder entity) { + return makeValue(entity.build()); + } + + /** Make a ByteString value. */ + public static Value.Builder makeValue(ByteString blob) { + return Value.newBuilder().setBlobValue(blob); + } + + /** Make a timestamp value given a date. */ + public static Value.Builder makeValue(Date date) { + return Value.newBuilder().setTimestampValue(toTimestamp(date.getTime() * 1000L)); + } + + /** Makes a GeoPoint value. */ + public static Value.Builder makeValue(LatLng value) { + return Value.newBuilder().setGeoPointValue(value); + } + + /** Makes a GeoPoint value. */ + public static Value.Builder makeValue(LatLng.Builder value) { + return makeValue(value.build()); + } + + private static Timestamp.Builder toTimestamp(long microseconds) { + long seconds = microseconds / MICROSECONDS_PER_SECOND; + long microsecondsRemainder = microseconds % MICROSECONDS_PER_SECOND; + if (microsecondsRemainder < 0) { + // Nanos must be positive even if microseconds is negative. + // Java modulus doesn't take care of this for us. + microsecondsRemainder += MICROSECONDS_PER_SECOND; + seconds -= 1; + } + return Timestamp.newBuilder() + .setSeconds(seconds) + .setNanos((int) microsecondsRemainder * NANOSECONDS_PER_MICROSECOND); + } + + /** + * Make a key from the specified path of kind/id-or-name pairs and/or Keys. + * + *

The id-or-name values must be either String, Long, Integer or Short. + * + *

The last id-or-name value may be omitted, in which case an entity without an id is created + * (for use with automatic id allocation). + * + *

The PartitionIds of all Keys in the path must be equal. The returned Key.Builder will use + * this PartitionId. + */ + public static Key.Builder makeKey(Object... elements) { + Key.Builder key = Key.newBuilder(); + PartitionId partitionId = null; + for (int pathIndex = 0; pathIndex < elements.length; pathIndex += 2) { + PathElement.Builder pathElement = PathElement.newBuilder(); + Object element = elements[pathIndex]; + if (element instanceof Key) { + Key subKey = (Key) element; + if (partitionId == null) { + partitionId = subKey.getPartitionId(); + } else if (!partitionId.equals(subKey.getPartitionId())) { + throw new IllegalArgumentException( + "Partition IDs did not match, found: " + + partitionId + + " and " + + subKey.getPartitionId()); + } + key.addAllPath(((Key) element).getPathList()); + // We increment by 2, but since we got a Key argument we're only consuming 1 element in this + // iteration of the loop. Decrement the index so that when we jump by 2 we end up in the + // right spot. + pathIndex--; + } else { + String kind; + try { + kind = (String) element; + } catch (ClassCastException e) { + throw new IllegalArgumentException( + "Expected string or Key, got: " + element.getClass(), e); + } + pathElement.setKind(kind); + if (pathIndex + 1 < elements.length) { + Object value = elements[pathIndex + 1]; + if (value instanceof String) { + pathElement.setName((String) value); + } else if (value instanceof Long) { + pathElement.setId((Long) value); + } else if (value instanceof Integer) { + pathElement.setId((Integer) value); + } else if (value instanceof Short) { + pathElement.setId((Short) value); + } else { + throw new IllegalArgumentException( + "Expected string or integer, got: " + value.getClass()); + } + } + key.addPath(pathElement); + } + } + if (partitionId != null && !partitionId.equals(PartitionId.getDefaultInstance())) { + key.setPartitionId(partitionId); + } + return key; + } + + /** + * @return the double contained in value + * @throws IllegalArgumentException if the value does not contain a double. + */ + public static double getDouble(Value value) { + if (value.getValueTypeCase() != ValueTypeCase.DOUBLE_VALUE) { + throw new IllegalArgumentException("Value does not contain a double."); + } + return value.getDoubleValue(); + } + + /** + * @return the key contained in value + * @throws IllegalArgumentException if the value does not contain a key. + */ + public static Key getKey(Value value) { + if (value.getValueTypeCase() != ValueTypeCase.KEY_VALUE) { + throw new IllegalArgumentException("Value does not contain a key."); + } + return value.getKeyValue(); + } + + /** + * @return the blob contained in value + * @throws IllegalArgumentException if the value does not contain a blob. + */ + public static ByteString getByteString(Value value) { + if (value.getMeaning() == 18 && value.getValueTypeCase() == ValueTypeCase.STRING_VALUE) { + return value.getStringValueBytes(); + } else if (value.getValueTypeCase() == ValueTypeCase.BLOB_VALUE) { + return value.getBlobValue(); + } + throw new IllegalArgumentException("Value does not contain a blob."); + } + + /** + * @return the entity contained in value + * @throws IllegalArgumentException if the value does not contain an entity. + */ + public static Entity getEntity(Value value) { + if (value.getValueTypeCase() != ValueTypeCase.ENTITY_VALUE) { + throw new IllegalArgumentException("Value does not contain an Entity."); + } + return value.getEntityValue(); + } + + /** + * @return the string contained in value + * @throws IllegalArgumentException if the value does not contain a string. + */ + public static String getString(Value value) { + if (value.getValueTypeCase() != ValueTypeCase.STRING_VALUE) { + throw new IllegalArgumentException("Value does not contain a string."); + } + return value.getStringValue(); + } + + /** + * @return the boolean contained in value + * @throws IllegalArgumentException if the value does not contain a boolean. + */ + public static boolean getBoolean(Value value) { + if (value.getValueTypeCase() != ValueTypeCase.BOOLEAN_VALUE) { + throw new IllegalArgumentException("Value does not contain a boolean."); + } + return value.getBooleanValue(); + } + + /** + * @return the long contained in value + * @throws IllegalArgumentException if the value does not contain a long. + */ + public static long getLong(Value value) { + if (value.getValueTypeCase() != ValueTypeCase.INTEGER_VALUE) { + throw new IllegalArgumentException("Value does not contain an integer."); + } + return value.getIntegerValue(); + } + + /** + * @return the timestamp in microseconds contained in value + * @throws IllegalArgumentException if the value does not contain a timestamp. + */ + public static long getTimestamp(Value value) { + if (value.getMeaning() == 18 && value.getValueTypeCase() == ValueTypeCase.INTEGER_VALUE) { + return value.getIntegerValue(); + } else if (value.getValueTypeCase() == ValueTypeCase.TIMESTAMP_VALUE) { + return toMicroseconds(value.getTimestampValue()); + } + throw new IllegalArgumentException("Value does not contain a timestamp."); + } + + private static long toMicroseconds(TimestampOrBuilder timestamp) { + // Nanosecond precision is lost. + return timestamp.getSeconds() * MICROSECONDS_PER_SECOND + + timestamp.getNanos() / NANOSECONDS_PER_MICROSECOND; + } + + /** + * @return the array contained in value as a list. + * @throws IllegalArgumentException if the value does not contain an array. + */ + public static List getList(Value value) { + if (value.getValueTypeCase() != ValueTypeCase.ARRAY_VALUE) { + throw new IllegalArgumentException("Value does not contain an array."); + } + return value.getArrayValue().getValuesList(); + } + + /** + * Convert a timestamp value into a {@link Date} clipping off the microseconds. + * + * @param value a timestamp value to convert + * @return the resulting {@link Date} + * @throws IllegalArgumentException if the value does not contain a timestamp. + */ + public static Date toDate(Value value) { + return new Date(getTimestamp(value) / 1000); + } + + /** + * @param entity the entity to insert + * @return a mutation that will insert an entity + */ + public static Mutation.Builder makeInsert(Entity entity) { + return Mutation.newBuilder().setInsert(entity); + } + + /** + * @param entity the entity to update + * @return a mutation that will update an entity + */ + public static Mutation.Builder makeUpdate(Entity entity) { + return Mutation.newBuilder().setUpdate(entity); + } + + /** + * @param entity the entity to upsert + * @return a mutation that will upsert an entity + */ + public static Mutation.Builder makeUpsert(Entity entity) { + return Mutation.newBuilder().setUpsert(entity); + } + + /** + * @param key the key of the entity to delete + * @return a mutation that will delete an entity + */ + public static Mutation.Builder makeDelete(Key key) { + return Mutation.newBuilder().setDelete(key); + } +} diff --git a/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/DatastoreOptions.java b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/DatastoreOptions.java new file mode 100644 index 000000000..f6e91a41a --- /dev/null +++ b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/DatastoreOptions.java @@ -0,0 +1,204 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.utils; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.HttpTransport; +import java.util.Arrays; +import java.util.List; + +/** + * An immutable object containing settings for the datastore. + * + *

Example for connecting to a datastore: + * + *

+ * DatastoreOptions options = new DatastoreOptions.Builder()
+ *     .projectId("my-project-id")
+ *     .credential(DatastoreHelper.getComputeEngineCredential())
+ *     .build();
+ * DatastoreFactory.get().create(options);
+ * 
+ * + *

The options should be passed to {@link DatastoreFactory#create}. + */ +public class DatastoreOptions { + private final String projectId; + + private final String projectEndpoint; + private final String host; + private final String localHost; + + private final HttpRequestInitializer initializer; + + private final Credential credential; + private final HttpTransport transport; + public static final List SCOPES = + Arrays.asList("https://www.googleapis.com/auth/datastore"); + + DatastoreOptions(Builder b) { + checkArgument( + b.projectId != null || b.projectEndpoint != null, + "Either project ID or project endpoint must be provided."); + this.projectId = b.projectId; + this.projectEndpoint = b.projectEndpoint; + this.host = b.host; + this.localHost = b.localHost; + this.initializer = b.initializer; + this.credential = b.credential; + this.transport = b.transport; + } + + /** Builder for {@link DatastoreOptions}. */ + public static class Builder { + private static final String PROJECT_ENDPOINT_AND_PROJECT_ID_ERROR = + "Cannot set both project endpoint and project ID."; + private static final String PROJECT_ENDPOINT_AND_HOST_ERROR = + "Can set at most one of project endpoint, host, and local host."; + + private String projectId; + + private String projectEndpoint; + private String host; + private String localHost; + private HttpRequestInitializer initializer; + private Credential credential; + private HttpTransport transport; + + public Builder() {} + + public Builder(DatastoreOptions options) { + this.projectId = options.projectId; + this.projectEndpoint = options.projectEndpoint; + this.host = options.host; + this.localHost = options.localHost; + this.initializer = options.initializer; + this.credential = options.credential; + this.transport = options.transport; + } + + public DatastoreOptions build() { + return new DatastoreOptions(this); + } + + /** Sets the project ID used to access Cloud Datastore. */ + public Builder projectId(String projectId) { + checkArgument(projectEndpoint == null, PROJECT_ENDPOINT_AND_PROJECT_ID_ERROR); + this.projectId = projectId; + return this; + } + + /** + * Sets the host used to access Cloud Datastore. To connect to the Cloud Datastore Emulator, use + * {@link #localHost} instead. + */ + public Builder host(String host) { + checkArgument(projectEndpoint == null && localHost == null, PROJECT_ENDPOINT_AND_HOST_ERROR); + if (includesScheme(host)) { + throw new IllegalArgumentException( + String.format("Host \"%s\" must not include scheme.", host)); + } + this.host = host; + return this; + } + + /** + * Configures the client to access Cloud Datastore on a local host (typically a Cloud Datastore + * Emulator instance). Call this method also configures the client not to attach credentials to + * requests. + */ + public Builder localHost(String localHost) { + checkArgument(projectEndpoint == null && host == null, PROJECT_ENDPOINT_AND_HOST_ERROR); + if (includesScheme(localHost)) { + throw new IllegalArgumentException( + String.format("Local host \"%s\" must not include scheme.", localHost)); + } + this.localHost = localHost; + return this; + } + + /** + * Sets the project endpoint used to access Cloud Datastore. Prefer using {@link #projectId} + * and/or {@link #host}/{@link #localHost} when possible. + * + * @deprecated Use {@link #projectId} and/or {@link #host}/{@link #localHost} instead. + */ + @Deprecated + public Builder projectEndpoint(String projectEndpoint) { + checkArgument(projectId == null, PROJECT_ENDPOINT_AND_PROJECT_ID_ERROR); + checkArgument(localHost == null && host == null, PROJECT_ENDPOINT_AND_HOST_ERROR); + if (!includesScheme(projectEndpoint)) { + throw new IllegalArgumentException( + String.format("Project endpoint \"%s\" must include scheme.", projectEndpoint)); + } + this.projectEndpoint = projectEndpoint; + return this; + } + + /** Sets the (optional) initializer to run on HTTP requests to Cloud Datastore. */ + public Builder initializer(HttpRequestInitializer initializer) { + this.initializer = initializer; + return this; + } + + /** Sets the Google APIs {@link Credential} used to access Cloud Datastore. */ + public Builder credential(Credential credential) { + this.credential = credential; + return this; + } + + /** Sets the transport used to access Cloud Datastore. */ + public Builder transport(HttpTransport transport) { + this.transport = transport; + return this; + } + + private static boolean includesScheme(String url) { + return url.startsWith("http://") || url.startsWith("https://"); + } + } + + public String getProjectId() { + return projectId; + } + + public String getProjectEndpoint() { + return projectEndpoint; + } + + public String getHost() { + return host; + } + + public String getLocalHost() { + return localHost; + } + + public HttpRequestInitializer getInitializer() { + return initializer; + } + + public Credential getCredential() { + return credential; + } + + public HttpTransport getTransport() { + return transport; + } +} diff --git a/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/QuerySplitter.java b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/QuerySplitter.java new file mode 100644 index 000000000..31d1fd7d5 --- /dev/null +++ b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/QuerySplitter.java @@ -0,0 +1,56 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.utils; + +import com.google.api.core.BetaApi; +import com.google.datastore.v1.PartitionId; +import com.google.datastore.v1.Query; +import com.google.protobuf.Timestamp; +import java.util.List; + +/** Provides the ability to split a query into multiple shards. */ +public interface QuerySplitter { + + /** + * Returns a list of sharded {@link Query}s for the given query. + * + *

This will create up to the desired number of splits, however it may return less splits if + * the desired number of splits is unavailable. This will happen if the number of split points + * provided by the underlying Datastore is less than the desired number, which will occur if the + * number of results for the query is too small. + * + * @param query the query to split. + * @param partition the partition the query is running in. + * @param numSplits the desired number of splits. + * @param datastore the datastore to run on. + * @throws DatastoreException if there was a datastore error while generating query splits. + * @throws IllegalArgumentException if the given query or numSplits was invalid. + */ + List getSplits(Query query, PartitionId partition, int numSplits, Datastore datastore) + throws DatastoreException; + + /** + * Same as {@link #getSplits(Query, PartitionId, int, Datastore)} but the splits are based on + * {@code readTime}, and the returned sharded {@link Query}s should also be executed with {@code + * readTime}. Reading from a timestamp is currently a private preview feature in Datastore. + */ + @BetaApi + default List getSplits( + Query query, PartitionId partition, int numSplits, Datastore datastore, Timestamp readTime) + throws DatastoreException { + throw new UnsupportedOperationException("Not implemented."); + } +} diff --git a/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/QuerySplitterImpl.java b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/QuerySplitterImpl.java new file mode 100644 index 000000000..ac2a6557e --- /dev/null +++ b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/QuerySplitterImpl.java @@ -0,0 +1,309 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.utils; + +import static com.google.datastore.utils.DatastoreHelper.makeAndFilter; + +import com.google.api.core.BetaApi; +import com.google.datastore.v1.EntityResult; +import com.google.datastore.v1.Filter; +import com.google.datastore.v1.Key; +import com.google.datastore.v1.PartitionId; +import com.google.datastore.v1.Projection; +import com.google.datastore.v1.PropertyFilter; +import com.google.datastore.v1.PropertyFilter.Operator; +import com.google.datastore.v1.PropertyOrder.Direction; +import com.google.datastore.v1.PropertyReference; +import com.google.datastore.v1.Query; +import com.google.datastore.v1.QueryResultBatch; +import com.google.datastore.v1.QueryResultBatch.MoreResultsType; +import com.google.datastore.v1.ReadOptions; +import com.google.datastore.v1.RunQueryRequest; +import com.google.protobuf.Timestamp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import javax.annotation.Nullable; + +/** + * Provides the ability to split a query into multiple shards using Cloud Datastore. + * + *

This implementation of the QuerySplitter uses the __scatter__ property to gather random split + * points for a query. + */ +final class QuerySplitterImpl implements QuerySplitter { + + /** The number of keys to sample for each split. * */ + private static final int KEYS_PER_SPLIT = 32; + + private static final EnumSet UNSUPPORTED_OPERATORS = + EnumSet.of( + Operator.LESS_THAN, + Operator.LESS_THAN_OR_EQUAL, + Operator.GREATER_THAN, + Operator.GREATER_THAN_OR_EQUAL); + + static final QuerySplitter INSTANCE = new QuerySplitterImpl(); + + private QuerySplitterImpl() { + // No initialization required. + } + + @Override + public List getSplits( + Query query, PartitionId partition, int numSplits, Datastore datastore) + throws DatastoreException, IllegalArgumentException { + return getSplitsInternal(query, partition, numSplits, datastore, null); + } + + @BetaApi + @Override + public List getSplits( + Query query, PartitionId partition, int numSplits, Datastore datastore, Timestamp readTime) + throws DatastoreException, IllegalArgumentException { + return getSplitsInternal(query, partition, numSplits, datastore, readTime); + } + + private List getSplitsInternal( + Query query, + PartitionId partition, + int numSplits, + Datastore datastore, + @Nullable Timestamp readTime) + throws DatastoreException, IllegalArgumentException { + List splits = new ArrayList(numSplits); + if (numSplits == 1) { + splits.add(query); + return splits; + } + validateQuery(query); + validateSplitSize(numSplits); + + List scatterKeys = getScatterKeys(numSplits, query, partition, datastore, readTime); + Key lastKey = null; + for (Key nextKey : getSplitKey(scatterKeys, numSplits)) { + splits.add(createSplit(lastKey, nextKey, query)); + lastKey = nextKey; + } + splits.add(createSplit(lastKey, null, query)); + return splits; + } + + /** + * Verify that the given number of splits is not out of bounds. + * + * @param numSplits the number of splits. + * @throws IllegalArgumentException if the split size is invalid. + */ + private void validateSplitSize(int numSplits) throws IllegalArgumentException { + if (numSplits < 1) { + throw new IllegalArgumentException("The number of splits must be greater than 0."); + } + } + + /** + * Validates that we only have allowable filters. + * + *

Note that equality and ancestor filters are allowed, however they may result in inefficient + * sharding. + */ + private void validateFilter(Filter filter) throws IllegalArgumentException { + switch (filter.getFilterTypeCase()) { + case COMPOSITE_FILTER: + for (Filter subFilter : filter.getCompositeFilter().getFiltersList()) { + validateFilter(subFilter); + } + break; + case PROPERTY_FILTER: + if (UNSUPPORTED_OPERATORS.contains(filter.getPropertyFilter().getOp())) { + throw new IllegalArgumentException("Query cannot have any inequality filters."); + } + break; + default: + throw new IllegalArgumentException( + "Unsupported filter type: " + filter.getFilterTypeCase()); + } + } + + /** + * Verifies that the given query can be properly scattered. + * + * @param query the query to verify + * @throws IllegalArgumentException if the query is invalid. + */ + private void validateQuery(Query query) throws IllegalArgumentException { + if (query.getKindCount() != 1) { + throw new IllegalArgumentException("Query must have exactly one kind."); + } + if (query.getOrderCount() != 0) { + throw new IllegalArgumentException("Query cannot have any sort orders."); + } + if (query.hasFilter()) { + validateFilter(query.getFilter()); + } + } + + /** + * Create a new {@link Query} given the query and range. + * + * @param lastKey the previous key. If null then assumed to be the beginning. + * @param nextKey the next key. If null then assumed to be the end. + * @param query the desired query. + */ + private Query createSplit(Key lastKey, Key nextKey, Query query) { + if (lastKey == null && nextKey == null) { + return query; + } + List keyFilters = new ArrayList(); + if (query.hasFilter()) { + keyFilters.add(query.getFilter()); + } + if (lastKey != null) { + Filter lowerBound = + DatastoreHelper.makeFilter( + DatastoreHelper.KEY_PROPERTY_NAME, + PropertyFilter.Operator.GREATER_THAN_OR_EQUAL, + DatastoreHelper.makeValue(lastKey)) + .build(); + keyFilters.add(lowerBound); + } + if (nextKey != null) { + Filter upperBound = + DatastoreHelper.makeFilter( + DatastoreHelper.KEY_PROPERTY_NAME, + PropertyFilter.Operator.LESS_THAN, + DatastoreHelper.makeValue(nextKey)) + .build(); + keyFilters.add(upperBound); + } + return Query.newBuilder(query).setFilter(makeAndFilter(keyFilters)).build(); + } + + /** + * Gets a list of split keys given a desired number of splits. + * + *

This list will contain multiple split keys for each split. Only a single split key will be + * chosen as the split point, however providing multiple keys allows for more uniform sharding. + * + * @param numSplits the number of desired splits. + * @param query the user query. + * @param partition the partition to run the query in. + * @param datastore the datastore containing the data. + * @param readTime read time at which to get the split keys from the datastore. + * @throws com.google.datastore.utils.DatastoreException if there was an error when executing the + * datastore query. + */ + private List getScatterKeys( + int numSplits, + Query query, + PartitionId partition, + Datastore datastore, + @Nullable Timestamp readTime) + throws DatastoreException { + Query.Builder scatterPointQuery = createScatterQuery(query, numSplits); + + List keySplits = new ArrayList(); + + QueryResultBatch batch; + do { + RunQueryRequest.Builder scatterRequest = + RunQueryRequest.newBuilder().setPartitionId(partition).setQuery(scatterPointQuery); + scatterRequest.setProjectId(partition.getProjectId()); + scatterRequest.setDatabaseId(partition.getDatabaseId()); + if (readTime != null) { + scatterRequest.setReadOptions(ReadOptions.newBuilder().setReadTime(readTime).build()); + } + batch = datastore.runQuery(scatterRequest.build()).getBatch(); + for (EntityResult result : batch.getEntityResultsList()) { + keySplits.add(result.getEntity().getKey()); + } + scatterPointQuery.setStartCursor(batch.getEndCursor()); + scatterPointQuery + .getLimitBuilder() + .setValue(scatterPointQuery.getLimit().getValue() - batch.getEntityResultsCount()); + } while (batch.getMoreResults() == MoreResultsType.NOT_FINISHED); + Collections.sort(keySplits, DatastoreHelper.getKeyComparator()); + return keySplits; + } + + /** + * Creates a scatter query from the given user query + * + * @param query the user's query. + * @param numSplits the number of splits to create. + */ + private Query.Builder createScatterQuery(Query query, int numSplits) { + // TODO(pcostello): We can potentially support better splits with equality filters in our query + // if there exists a composite index on property, __scatter__, __key__. Until an API for + // metadata exists, this isn't possible. Note that ancestor and inequality queries fall into + // the same category. + Query.Builder scatterPointQuery = Query.newBuilder(); + scatterPointQuery.addAllKind(query.getKindList()); + scatterPointQuery.addOrder( + DatastoreHelper.makeOrder(DatastoreHelper.SCATTER_PROPERTY_NAME, Direction.ASCENDING)); + // There is a split containing entities before and after each scatter entity: + // ||---*------*------*------*------*------*------*---|| = scatter entity + // If we represent each split as a region before a scatter entity, there is an extra region + // following the last scatter point. Thus, we do not need the scatter entities for the last + // region. + scatterPointQuery.getLimitBuilder().setValue((numSplits - 1) * KEYS_PER_SPLIT); + scatterPointQuery.addProjection( + Projection.newBuilder().setProperty(PropertyReference.newBuilder().setName("__key__"))); + return scatterPointQuery; + } + + /** + * Given a list of keys and a number of splits find the keys to split on. + * + * @param keys the list of keys. + * @param numSplits the number of splits. + */ + private Iterable getSplitKey(List keys, int numSplits) { + // If the number of keys is less than the number of splits, we are limited in the number of + // splits we can make. + if (keys.size() < numSplits - 1) { + return keys; + } + + // Calculate the number of keys per split. This should be KEYS_PER_SPLIT, but may + // be less if there are not KEYS_PER_SPLIT * (numSplits - 1) scatter entities. + // + // Consider the following dataset, where - represents an entity and * represents an entity + // that is returned as a scatter entity: + // ||---*-----*----*-----*-----*------*----*----|| + // If we want 4 splits in this data, the optimal split would look like: + // ||---*-----*----*-----*-----*------*----*----|| + // | | | + // The scatter keys in the last region are not useful to us, so we never request them: + // ||---*-----*----*-----*-----*------*---------|| + // | | | + // With 6 scatter keys we want to set scatter points at indexes: 1, 3, 5. + // + // We keep this as a double so that any "fractional" keys per split get distributed throughout + // the splits and don't make the last split significantly larger than the rest. + double numKeysPerSplit = Math.max(1.0, ((double) keys.size()) / (numSplits - 1)); + + List keysList = new ArrayList(numSplits - 1); + // Grab the last sample for each split, otherwise the first split will be too small. + for (int i = 1; i < numSplits; i++) { + int splitIndex = (int) Math.round(i * numKeysPerSplit) - 1; + keysList.add(keys.get(splitIndex)); + } + + return keysList; + } +} diff --git a/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/RemoteRpc.java b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/RemoteRpc.java new file mode 100644 index 000000000..492936e15 --- /dev/null +++ b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/RemoteRpc.java @@ -0,0 +1,239 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.utils; + +import com.google.api.client.http.*; +import com.google.api.client.http.protobuf.ProtoHttpContent; +import com.google.api.client.util.IOUtils; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.protobuf.MessageLite; +import com.google.rpc.Code; +import com.google.rpc.Status; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +/** + * An RPC transport that sends protocol buffers over HTTP. + * + *

This class is thread-safe. + */ +class RemoteRpc { + private static final Logger logger = Logger.getLogger(RemoteRpc.class.getName()); + + @VisibleForTesting static final String API_FORMAT_VERSION_HEADER = "X-Goog-Api-Format-Version"; + private static final String API_FORMAT_VERSION = "2"; + + @VisibleForTesting static final String X_GOOG_REQUEST_PARAMS_HEADER = "x-goog-request-params"; + + private final HttpRequestFactory client; + private final HttpRequestInitializer initializer; + private final String url; + private final AtomicInteger rpcCount = new AtomicInteger(0); + // Not final - so it can be set/reset in Unittests + private static boolean enableE2EChecksum = + Boolean.parseBoolean(System.getenv("GOOGLE_CLOUD_DATASTORE_HTTP_ENABLE_E2E_CHECKSUM")); + + RemoteRpc(HttpRequestFactory client, HttpRequestInitializer initializer, String url) { + this.client = client; + this.initializer = initializer; + this.url = url; + try { + resolveURL("dummyRpc"); + } catch (Exception e) { + throw new IllegalArgumentException( + "Unable to construct RemoteRpc due to unsupported url: <" + url + ">", e); + } + } + + /** + * Makes an RPC call using the client. Logs how long it took and any exceptions. + * + *

NOTE: The request could be an InputStream too, but the http client will need to find its + * length, which will require buffering it anyways. + * + * @throws com.google.datastore.utils.DatastoreException if the RPC fails. + */ + public InputStream call( + String methodName, MessageLite request, String projectId, String databaseId) + throws com.google.datastore.utils.DatastoreException { + logger.fine("remote datastore call " + methodName); + + long startTime = System.currentTimeMillis(); + try { + HttpResponse httpResponse; + try { + rpcCount.incrementAndGet(); + ProtoHttpContent payload = new ProtoHttpContent(request); + HttpRequest httpRequest = client.buildPostRequest(resolveURL(methodName), payload); + setHeaders(request, httpRequest, projectId, databaseId); + // Don't throw an HTTPResponseException on error. It converts the response to a String and + // throws away the original, whereas we need the raw bytes to parse it as a proto. + httpRequest.setThrowExceptionOnExecuteError(false); + // Datastore requests typically time out after 60s; set the read timeout to slightly longer + // than that by default (can be overridden via the HttpRequestInitializer). + httpRequest.setReadTimeout(65 * 1000); + if (initializer != null) { + initializer.initialize(httpRequest); + } + httpResponse = httpRequest.execute(); + if (!httpResponse.isSuccessStatusCode()) { + try (InputStream content = httpResponse.getContent()) { + throw makeException( + url, + methodName, + content, + httpResponse.getContentType(), + httpResponse.getContentCharset(), + null, + httpResponse.getStatusCode()); + } + } + InputStream inputStream = httpResponse.getContent(); + return inputStream; + } catch (SocketTimeoutException e) { + throw makeException(url, methodName, Code.DEADLINE_EXCEEDED, "Deadline exceeded", e); + } catch (IOException e) { + throw makeException(url, methodName, Code.UNAVAILABLE, "I/O error", e); + } + } finally { + long elapsedTime = System.currentTimeMillis() - startTime; + logger.fine("remote datastore call " + methodName + " took " + elapsedTime + " ms"); + } + } + + @VisibleForTesting + void setHeaders( + MessageLite request, HttpRequest httpRequest, String projectId, String databaseId) { + httpRequest.getHeaders().put(API_FORMAT_VERSION_HEADER, API_FORMAT_VERSION); + StringBuilder builder = new StringBuilder("project_id="); + builder.append(projectId); + if (!Strings.isNullOrEmpty(databaseId)) { + builder.append("&database_id="); + builder.append(databaseId); + } + httpRequest.getHeaders().put(X_GOOG_REQUEST_PARAMS_HEADER, builder.toString()); + } + + @VisibleForTesting + HttpRequestFactory getClient() { + return client; + } + + @VisibleForTesting + static void setSystemEnvE2EChecksum(boolean enableE2EChecksum) { + RemoteRpc.enableE2EChecksum = enableE2EChecksum; + } + + void resetRpcCount() { + rpcCount.set(0); + } + + int getRpcCount() { + return rpcCount.get(); + } + + public String getUrl() { + return url; + } + + GenericUrl resolveURL(String path) { + return new GenericUrl(url + ":" + path); + } + + HttpRequestFactory getHttpRequestFactory() { + return client; + } + + public static com.google.datastore.utils.DatastoreException makeException( + String url, String methodName, Code code, String message, Throwable cause) { + logger.fine("remote datastore call " + methodName + " against " + url + " failed: " + message); + return new com.google.datastore.utils.DatastoreException(methodName, code, message, cause); + } + + static DatastoreException makeException( + String url, + String methodName, + InputStream content, + String contentType, + Charset contentCharset, + Throwable cause, + int httpStatusCode) { + if (!contentType.equals("application/x-protobuf")) { + String responseContent; + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + IOUtils.copy(content, out, false); + responseContent = out.toString(contentCharset.name()); + } catch (IOException e) { + responseContent = ""; + } + return makeException( + url, + methodName, + Code.INTERNAL, + String.format( + "Non-protobuf error: %s. HTTP status code was %d.", responseContent, httpStatusCode), + cause); + } + + Status rpcStatus; + try { + rpcStatus = Status.parseFrom(content); + } catch (IOException e) { + return makeException( + url, + methodName, + Code.INTERNAL, + String.format( + "Unable to parse Status protocol buffer: HTTP status code was %s.", httpStatusCode), + e); + } + + Code code = Code.forNumber(rpcStatus.getCode()); + if (code == null) { + return makeException( + url, + methodName, + Code.INTERNAL, + String.format( + "Invalid error code: %d. Message: %s.", rpcStatus.getCode(), rpcStatus.getMessage()), + cause); + } else if (code == Code.OK) { + // We can end up here because there was no response body (and we successfully parsed an + // empty Status message). This may happen for 401s in particular due to special handling + // in low-level HTTP libraries. + if (httpStatusCode == HttpStatusCodes.STATUS_CODE_UNAUTHORIZED) { + return makeException(url, methodName, Code.UNAUTHENTICATED, "Unauthenticated.", cause); + } + return makeException( + url, + methodName, + Code.INTERNAL, + String.format( + "Unexpected OK error code with HTTP status code of %d. Message: %s.", + httpStatusCode, rpcStatus.getMessage()), + cause); + } + + return makeException(url, methodName, code, rpcStatus.getMessage(), cause); + } +} diff --git a/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/testing/MockCredential.java b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/testing/MockCredential.java new file mode 100644 index 000000000..d5d16bb65 --- /dev/null +++ b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/testing/MockCredential.java @@ -0,0 +1,36 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.utils.testing; + +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.http.HttpRequest; +import java.io.IOException; + +/** Fake credential used for testing purpose. */ +public class MockCredential extends Credential { + public MockCredential() { + super( + new AccessMethod() { + @Override + public void intercept(HttpRequest request, String accessToken) throws IOException {} + + @Override + public String getAccessTokenFromRequest(HttpRequest request) { + return "MockAccessToken"; + } + }); + } +} diff --git a/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/testing/MockDatastoreFactory.java b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/testing/MockDatastoreFactory.java new file mode 100644 index 000000000..d4dd5caef --- /dev/null +++ b/google-cloud-datastore-utils/src/main/java/com/google/datastore/utils/testing/MockDatastoreFactory.java @@ -0,0 +1,132 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.utils.testing; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.http.*; +import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpRequest; +import com.google.api.client.testing.http.MockLowLevelHttpResponse; +import com.google.api.client.testing.util.TestableByteArrayInputStream; +import com.google.common.collect.Iterables; +import com.google.datastore.utils.DatastoreFactory; +import com.google.datastore.utils.DatastoreOptions; +import com.google.protobuf.Message; +import com.google.rpc.Code; +import com.google.rpc.Status; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; + +/** Fake Datastore factory used for testing purposes when a true Datastore service is not needed. */ +public class MockDatastoreFactory extends DatastoreFactory { + private int nextStatus; + private Message nextResponse; + private Status nextError; + private IOException nextException; + + private String lastPath; + private String lastMimeType; + private byte[] lastBody; + private List lastCookies; + private String lastApiFormatHeaderValue; + + public void setNextResponse(Message response) { + nextStatus = HttpStatusCodes.STATUS_CODE_OK; + nextResponse = response; + nextError = null; + nextException = null; + } + + public void setNextError(int status, Code code, String message) { + nextStatus = status; + nextResponse = null; + nextError = makeErrorContent(message, code); + nextException = null; + } + + public void setNextException(IOException exception) { + nextStatus = 0; + nextResponse = null; + nextError = null; + nextException = exception; + } + + @Override + public HttpRequestFactory makeClient(DatastoreOptions options) { + HttpTransport transport = + new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest(url) { + @Override + public LowLevelHttpResponse execute() throws IOException { + lastPath = new GenericUrl(getUrl()).getRawPath(); + lastMimeType = getContentType(); + lastCookies = getHeaderValues("Cookie"); + lastApiFormatHeaderValue = + Iterables.getOnlyElement(getHeaderValues("X-Goog-Api-Format-Version")); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + getStreamingContent().writeTo(out); + lastBody = out.toByteArray(); + if (nextException != null) { + throw nextException; + } + MockLowLevelHttpResponse response = + new MockLowLevelHttpResponse() + .setStatusCode(nextStatus) + .setContentType("application/x-protobuf"); + if (nextError != null) { + checkState(nextResponse == null); + response.setContent(new TestableByteArrayInputStream(nextError.toByteArray())); + } else { + response.setContent(new TestableByteArrayInputStream(nextResponse.toByteArray())); + } + return response; + } + }; + } + }; + Credential credential = options.getCredential(); + return transport.createRequestFactory(credential); + } + + public String getLastPath() { + return lastPath; + } + + public String getLastMimeType() { + return lastMimeType; + } + + public String getLastApiFormatHeaderValue() { + return lastApiFormatHeaderValue; + } + + public byte[] getLastBody() { + return lastBody; + } + + public List getLastCookies() { + return lastCookies; + } + + private static Status makeErrorContent(String message, Code code) { + return Status.newBuilder().setCode(code.getNumber()).setMessage(message).build(); + } +} diff --git a/google-cloud-datastore-utils/src/test/java/com/google/datastore/utils/QuerySplitterTest.java b/google-cloud-datastore-utils/src/test/java/com/google/datastore/utils/QuerySplitterTest.java new file mode 100644 index 000000000..483eca82b --- /dev/null +++ b/google-cloud-datastore-utils/src/test/java/com/google/datastore/utils/QuerySplitterTest.java @@ -0,0 +1,378 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.utils; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.datastore.utils.DatastoreHelper.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertThrows; + +import com.google.datastore.utils.testing.MockCredential; +import com.google.datastore.utils.testing.MockDatastoreFactory; +import com.google.datastore.v1.*; +import com.google.datastore.v1.EntityResult.ResultType; +import com.google.datastore.v1.PropertyFilter.Operator; +import com.google.datastore.v1.PropertyOrder.Direction; +import com.google.datastore.v1.QueryResultBatch.MoreResultsType; +import com.google.protobuf.Int32Value; +import com.google.protobuf.Timestamp; +import java.util.List; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link com.google.datastore.utils.QuerySplitterImpl}. */ +@RunWith(JUnit4.class) +public class QuerySplitterTest { + private static final String PROJECT_ID = "project-id"; + private static final PartitionId PARTITION = + PartitionId.newBuilder().setProjectId(PROJECT_ID).build(); + private static final String KIND = "test-kind"; + + private DatastoreFactory factory = new MockDatastoreFactory(); + private com.google.datastore.utils.DatastoreOptions.Builder options = + new DatastoreOptions.Builder().projectId(PROJECT_ID).credential(new MockCredential()); + + private Filter propertyFilter = makeFilter("foo", Operator.EQUAL, makeValue("value")).build(); + + private Query query = + Query.newBuilder() + .addKind(KindExpression.newBuilder().setName(KIND).build()) + .setFilter(propertyFilter) + .build(); + + private Query splitQuery = + Query.newBuilder() + .addKind(KindExpression.newBuilder().setName(KIND).build()) + .addOrder(makeOrder("__scatter__", Direction.ASCENDING)) + .addProjection(Projection.newBuilder().setProperty(makePropertyReference("__key__"))) + .build(); + + private Key splitKey0 = makeKey(KIND, String.format("%05d", 1)).setPartitionId(PARTITION).build(); + private Key splitKey1 = + makeKey(KIND, String.format("%05d", 101)).setPartitionId(PARTITION).build(); + private Key splitKey2 = + makeKey(KIND, String.format("%05d", 201)).setPartitionId(PARTITION).build(); + private Key splitKey3 = + makeKey(KIND, String.format("%05d", 301)).setPartitionId(PARTITION).build(); + + @Test + public void disallowsSortOrder() { + com.google.datastore.utils.Datastore datastore = factory.create(options.build()); + Query queryWithOrder = + query.toBuilder().addOrder(makeOrder("bar", Direction.ASCENDING)).build(); + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + com.google.datastore.utils.QuerySplitterImpl.INSTANCE.getSplits( + queryWithOrder, PARTITION, 2, datastore)); + assertThat(exception).hasMessageThat().contains("Query cannot have any sort orders."); + } + + @Test + public void disallowsMultipleKinds() { + com.google.datastore.utils.Datastore datastore = factory.create(options.build()); + Query queryWithMultipleKinds = + query + .toBuilder() + .addKind(KindExpression.newBuilder().setName("another-kind").build()) + .build(); + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + com.google.datastore.utils.QuerySplitterImpl.INSTANCE.getSplits( + queryWithMultipleKinds, PARTITION, 2, datastore)); + assertThat(exception).hasMessageThat().contains("Query must have exactly one kind."); + } + + @Test + public void disallowsKindlessQuery() { + com.google.datastore.utils.Datastore datastore = factory.create(options.build()); + Query kindlessQuery = query.toBuilder().clearKind().build(); + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + com.google.datastore.utils.QuerySplitterImpl.INSTANCE.getSplits( + kindlessQuery, PARTITION, 2, datastore)); + assertThat(exception).hasMessageThat().contains("Query must have exactly one kind."); + } + + @Test + public void disallowsInequalityFilter() { + com.google.datastore.utils.Datastore datastore = factory.create(options.build()); + Query queryWithInequality = + query + .toBuilder() + .setFilter(makeFilter("foo", Operator.GREATER_THAN, makeValue("value"))) + .build(); + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + com.google.datastore.utils.QuerySplitterImpl.INSTANCE.getSplits( + queryWithInequality, PARTITION, 2, datastore)); + assertThat(exception).hasMessageThat().contains("Query cannot have any inequality filters."); + } + + @Test + public void splitsMustBePositive() { + com.google.datastore.utils.Datastore datastore = factory.create(options.build()); + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + com.google.datastore.utils.QuerySplitterImpl.INSTANCE.getSplits( + query, PARTITION, 0, datastore)); + assertThat(exception).hasMessageThat().contains("The number of splits must be greater than 0."); + } + + @Test + public void getSplits() throws Exception { + com.google.datastore.utils.Datastore datastore = factory.create(options.build()); + MockDatastoreFactory mockClient = (MockDatastoreFactory) factory; + + RunQueryResponse splitQueryResponse = + RunQueryResponse.newBuilder() + .setQuery(splitQuery) + .setBatch( + QueryResultBatch.newBuilder() + .setEntityResultType(ResultType.KEY_ONLY) + .setMoreResults(MoreResultsType.NO_MORE_RESULTS) + .addEntityResults(makeKeyOnlyEntity(splitKey0)) + .addEntityResults(makeKeyOnlyEntity(splitKey1)) + .addEntityResults(makeKeyOnlyEntity(splitKey2)) + .addEntityResults(makeKeyOnlyEntity(splitKey3)) + .build()) + .build(); + + mockClient.setNextResponse(splitQueryResponse); + + List splittedQueries = + com.google.datastore.utils.QuerySplitterImpl.INSTANCE.getSplits( + query, PARTITION, 3, datastore); + + assertThat(splittedQueries) + .containsExactly( + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, null, splitKey1)) + .build(), + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, splitKey1, splitKey3)) + .build(), + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, splitKey3, null)) + .build()); + + RunQueryRequest expectedSplitQueryRequest = + RunQueryRequest.newBuilder() + .setPartitionId(PARTITION) + .setProjectId(PROJECT_ID) + .setQuery( + splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(2 * 32).build())) + .build(); + + assertArrayEquals(expectedSplitQueryRequest.toByteArray(), mockClient.getLastBody()); + } + + @Test + public void getSplitsWithDatabaseId() throws Exception { + com.google.datastore.utils.Datastore datastore = factory.create(options.build()); + MockDatastoreFactory mockClient = (MockDatastoreFactory) factory; + + PartitionId partition = + PartitionId.newBuilder().setProjectId(PROJECT_ID).setDatabaseId("test-database").build(); + + RunQueryResponse splitQueryResponse = + RunQueryResponse.newBuilder() + .setQuery(splitQuery) + .setBatch( + QueryResultBatch.newBuilder() + .setEntityResultType(ResultType.KEY_ONLY) + .setMoreResults(MoreResultsType.NO_MORE_RESULTS) + .addEntityResults(makeKeyOnlyEntity(splitKey0)) + .addEntityResults(makeKeyOnlyEntity(splitKey1)) + .addEntityResults(makeKeyOnlyEntity(splitKey2)) + .addEntityResults(makeKeyOnlyEntity(splitKey3)) + .build()) + .build(); + + mockClient.setNextResponse(splitQueryResponse); + + List splitQueries = + com.google.datastore.utils.QuerySplitterImpl.INSTANCE.getSplits( + query, partition, 3, datastore); + + assertThat(splitQueries) + .containsExactly( + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, null, splitKey1)) + .build(), + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, splitKey1, splitKey3)) + .build(), + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, splitKey3, null)) + .build()); + + RunQueryRequest expectedSplitQueryRequest = + RunQueryRequest.newBuilder() + .setPartitionId(partition) + .setProjectId(PROJECT_ID) + .setDatabaseId("test-database") + .setQuery( + splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(2 * 32).build())) + .build(); + + assertArrayEquals(expectedSplitQueryRequest.toByteArray(), mockClient.getLastBody()); + } + + @Test + public void notEnoughSplits() throws Exception { + com.google.datastore.utils.Datastore datastore = factory.create(options.build()); + MockDatastoreFactory mockClient = (MockDatastoreFactory) factory; + + RunQueryResponse splitQueryResponse = + RunQueryResponse.newBuilder() + .setQuery(splitQuery) + .setBatch( + QueryResultBatch.newBuilder() + .setEntityResultType(ResultType.KEY_ONLY) + .setMoreResults(MoreResultsType.NO_MORE_RESULTS) + .addEntityResults(makeKeyOnlyEntity(splitKey0)) + .build()) + .build(); + + mockClient.setNextResponse(splitQueryResponse); + + List splittedQueries = + com.google.datastore.utils.QuerySplitterImpl.INSTANCE.getSplits( + query, PARTITION, 100, datastore); + + assertThat(splittedQueries) + .containsExactly( + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, null, splitKey0)) + .build(), + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, splitKey0, null)) + .build()); + + RunQueryRequest expectedSplitQueryRequest = + RunQueryRequest.newBuilder() + .setPartitionId(PARTITION) + .setProjectId(PROJECT_ID) + .setQuery( + splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(99 * 32).build())) + .build(); + + assertArrayEquals(expectedSplitQueryRequest.toByteArray(), mockClient.getLastBody()); + } + + @Test + public void getSplits_withReadTime() throws Exception { + Datastore datastore = factory.create(options.build()); + MockDatastoreFactory mockClient = (MockDatastoreFactory) factory; + + RunQueryResponse splitQueryResponse = + RunQueryResponse.newBuilder() + .setQuery(splitQuery) + .setBatch( + QueryResultBatch.newBuilder() + .setEntityResultType(ResultType.KEY_ONLY) + .setMoreResults(MoreResultsType.NO_MORE_RESULTS) + .addEntityResults(makeKeyOnlyEntity(splitKey0)) + .addEntityResults(makeKeyOnlyEntity(splitKey1)) + .addEntityResults(makeKeyOnlyEntity(splitKey2)) + .addEntityResults(makeKeyOnlyEntity(splitKey3)) + .build()) + .build(); + + mockClient.setNextResponse(splitQueryResponse); + + Timestamp readTime = Timestamp.newBuilder().setSeconds(1654651341L).build(); + + List splittedQueries = + com.google.datastore.utils.QuerySplitterImpl.INSTANCE.getSplits( + query, PARTITION, 3, datastore, readTime); + + assertThat(splittedQueries) + .containsExactly( + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, null, splitKey1)) + .build(), + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, splitKey1, splitKey3)) + .build(), + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, splitKey3, null)) + .build()); + + RunQueryRequest expectedSplitQueryRequest = + RunQueryRequest.newBuilder() + .setPartitionId(PARTITION) + .setProjectId(PROJECT_ID) + .setQuery( + splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(2 * 32).build())) + .setReadOptions(ReadOptions.newBuilder().setReadTime(readTime)) + .build(); + + assertArrayEquals(expectedSplitQueryRequest.toByteArray(), mockClient.getLastBody()); + } + + private static EntityResult makeKeyOnlyEntity(Key key) { + return EntityResult.newBuilder().setEntity(Entity.newBuilder().setKey(key).build()).build(); + } + + private static Filter makeFilterWithKeyRange(Filter originalFilter, Key startKey, Key endKey) { + Filter startKeyFilter = + startKey == null + ? null + : makeFilter("__key__", Operator.GREATER_THAN_OR_EQUAL, makeValue(startKey)).build(); + + Filter endKeyFilter = + endKey == null + ? null + : makeFilter("__key__", Operator.LESS_THAN, makeValue(endKey)).build(); + + if (startKeyFilter == null && endKeyFilter == null) { + throw new IllegalArgumentException(); + } + + if (startKeyFilter != null && endKeyFilter == null) { + return makeAndFilter(originalFilter, startKeyFilter).build(); + } + + if (startKeyFilter == null && endKeyFilter != null) { + return makeAndFilter(originalFilter, endKeyFilter).build(); + } + + return makeAndFilter(originalFilter, startKeyFilter, endKeyFilter).build(); + } +} diff --git a/google-cloud-datastore-utils/src/test/java/com/google/datastore/utils/it/ITDatastoreProtoClientTest.java b/google-cloud-datastore-utils/src/test/java/com/google/datastore/utils/it/ITDatastoreProtoClientTest.java new file mode 100644 index 000000000..d30c1cbdc --- /dev/null +++ b/google-cloud-datastore-utils/src/test/java/com/google/datastore/utils/it/ITDatastoreProtoClientTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.utils.it; + +import static com.google.datastore.utils.DatastoreHelper.makeFilter; +import static com.google.datastore.utils.DatastoreHelper.makeValue; + +import com.google.common.truth.Truth; +import com.google.datastore.utils.Datastore; +import com.google.datastore.utils.DatastoreException; +import com.google.datastore.utils.DatastoreHelper; +import com.google.datastore.v1.*; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.List; +import org.junit.Before; +import org.junit.Test; + +public class ITDatastoreProtoClientTest { + + private static Datastore DATASTORE; + + private static PartitionId PARTITION; + + private static final String KIND = "test-kind"; + private static final String PROJECT_ID = System.getenv(DatastoreHelper.PROJECT_ID_ENV_VAR); + + @Before + public void setUp() throws GeneralSecurityException, IOException { + DATASTORE = DatastoreHelper.getDatastoreFromEnv(); + } + + @Test + public void testQuerySplitterWithDefaultDb() throws DatastoreException { + Filter propertyFilter = + makeFilter("foo", PropertyFilter.Operator.EQUAL, makeValue("value")).build(); + Query query = + Query.newBuilder() + .addKind(KindExpression.newBuilder().setName(KIND).build()) + .setFilter(propertyFilter) + .build(); + + PARTITION = PartitionId.newBuilder().setProjectId(PROJECT_ID).build(); + + List splits = + DatastoreHelper.getQuerySplitter().getSplits(query, PARTITION, 2, DATASTORE); + Truth.assertThat(splits).isNotEmpty(); + splits.forEach( + split -> { + Truth.assertThat(split.getKind(0).getName()).isEqualTo(KIND); + Truth.assertThat(split.getFilter()).isEqualTo(propertyFilter); + }); + } + + @Test + public void testQuerySplitterWithDb() throws DatastoreException { + Filter propertyFilter = + makeFilter("foo", PropertyFilter.Operator.EQUAL, makeValue("value")).build(); + Query query = + Query.newBuilder() + .addKind(KindExpression.newBuilder().setName(KIND).build()) + .setFilter(propertyFilter) + .build(); + + PARTITION = PartitionId.newBuilder().setProjectId(PROJECT_ID).setDatabaseId("test-db").build(); + + List splits = + DatastoreHelper.getQuerySplitter().getSplits(query, PARTITION, 2, DATASTORE); + + Truth.assertThat(splits).isNotEmpty(); + splits.forEach( + split -> { + Truth.assertThat(split.getKind(0).getName()).isEqualTo(KIND); + Truth.assertThat(split.getFilter()).isEqualTo(propertyFilter); + }); + } +} diff --git a/pom.xml b/pom.xml index aec03ebc7..6d29420d6 100644 --- a/pom.xml +++ b/pom.xml @@ -181,6 +181,11 @@ grpc-google-cloud-datastore-v1 2.22.0-grpc-experimental-1-SNAPSHOT + + com.google.cloud + google-cloud-datastore-utils + 2.20.0-grpc-experimental-1-SNAPSHOT + com.google.cloud.datastore datastore-v1-proto-client @@ -277,6 +282,7 @@ grpc-google-cloud-datastore-v1 datastore-v1-proto-client google-cloud-datastore-bom + google-cloud-datastore-utils