diff --git a/.github/workflows/samples.yaml b/.github/workflows/samples.yaml index ee0fb26ca..a62d98000 100644 --- a/.github/workflows/samples.yaml +++ b/.github/workflows/samples.yaml @@ -60,6 +60,11 @@ jobs: run: | pip install -r requirements.txt python sample.py + - name: Run connectorx sample tests + working-directory: ./samples/python/connectorx + run: | + pip install -r requirements.txt + python connectorx_sample.py nodejs-samples: runs-on: ubuntu-latest steps: diff --git a/README.md b/README.md index aabc975af..624a8668d 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ PGAdapter can be used with the following drivers and clients: 1. `pgx`: Version 4.15 and higher are supported. See [pgx support](docs/pgx.md) for more details. 1. `psycopg2`: Version 2.9.3 and higher are supported. See [psycopg2](docs/psycopg2.md) for more details. 1. `psycopg3`: Version 3.1.x and higher are supported. See [psycopg3 support](docs/psycopg3.md) for more details. +1. `connectorx`: Version 0.3.3 and higher have experimental support. See [connectorx sample](samples/python/connectorx) for more details. 1. `node-postgres`: Version 8.8.0 and higher are supported. See [node-postgres support](docs/node-postgres.md) for more details. 1. `npgsql`: Version 6.0.x and higher have experimental support. See [npgsql support](docs/npgsql.md) for more details. 1. `PDO_PGSQL`: The PHP PDO driver has experimental support. See [PHP PDO](docs/pdo.md) for more details. diff --git a/samples/python/connectorx/README.md b/samples/python/connectorx/README.md new file mode 100644 index 000000000..5c38f5d8f --- /dev/null +++ b/samples/python/connectorx/README.md @@ -0,0 +1,20 @@ +# connectorx sample + +This sample application shows how to connect to Cloud Spanner through PGAdapter using +[connectorx](https://sfu-db.github.io/connector-x/intro.html). +PGAdapter is automatically started in a Docker container by the sample application. + +Run the sample on the Spanner Emulator with the following command: + +```shell +python connectorx_sample.py +``` + +Run the sample on a real Spanner database with the following command: + +```shell +python connectorx_sample.py \ + -p my-project \ + -i my-instance \ + -i my-database +``` diff --git a/samples/python/connectorx/connectorx_sample.py b/samples/python/connectorx/connectorx_sample.py new file mode 100644 index 000000000..56cda1bfc --- /dev/null +++ b/samples/python/connectorx/connectorx_sample.py @@ -0,0 +1,98 @@ +# Copyright 2024 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Sample for connecting to PGAdapter with connectorx + +This sample application shows how to connect to PGAdapter and Cloud Spanner +with connectorx. +The sample starts PGAdapter in an embedded Docker container and then connects +through this container to the Cloud Spanner emulator or real Cloud Spanner. + +The sample uses the Cloud Spanner emulator by default. + +This sample requires Docker to be installed on the local environment. + +Usage (emulator): + python connectorx_sample.py + +Usage (Cloud Spanner): + python connectorx_sample.py -p my-project -i my-instance -d my-database +""" + +import argparse +import connectorx as cx +from pgadapter import start_pgadapter + +parser = argparse.ArgumentParser( + prog='PGAdapter connectorx sample', + description='Sample application for using connectorx with PGAdapter') +parser.add_argument('-e', '--emulator', required=False, default=None, + help="Set this option to 'True' to force PGAdapter to connect to the emulator.") +parser.add_argument('-p', '--project', default="my-project", + help="The Google Cloud project containing the Cloud Spanner instance that PGAdapter should connect to.") +parser.add_argument('-i', '--instance', default="my-instance", + help="The Cloud Spanner instance that PGAdapter should connect to.") +parser.add_argument('-d', '--database', default="my-database", + help="The Cloud Spanner database that connectorx should connect to.") +parser.add_argument('-c', '--credentials', required=False, + help="The credentials file that PGAdapter should use to connect to Cloud Spanner. If None, then the sample application will try to use the default credentials in the environment.") +args = parser.parse_args() + +if (args.project == "my-project" + and args.instance == "my-instance" + and args.database == "my-database" + and args.emulator is None): + use_emulator = True +else: + if args.emulator is None: + use_emulator = False + else: + use_emulator = args.emulator == "True" + +# Start PGAdapter in an embedded container. +container, port = start_pgadapter(args.project, + args.instance, + use_emulator, + args.credentials) +try: + print("PGAdapter running on port ", port, "\n") + + # Connect to Cloud Spanner using connectorx by connecting to PGAdapter that is + # running in the embedded container. The username/password combination is + # ignored by PGAdapter. + postgres_url = ("postgresql://localhost:{port}/{database}?sslmode=disable" + .format(port=port, database=args.database)) + query = "SELECT 'Hello World!' as greeting" + result = cx.read_sql(postgres_url, query) + print("Greeting from Cloud Spanner PostgreSQL:\n", result, "\n") + + # connectorx by default uses COPY TO STDOUT. PGAdapter by default tries to + # use PartitionQuery for COPY TO STDOUT operations. This is the most efficient + # for large data sets, as the partitions can be read in parallel by PGAdapter. + # You can instruct PGAdapter to use the ExecuteStreamingSql endpoint directly + # for queries that you know will not benefit from PartitionQuery. + # See https://cloud.google.com/spanner/docs/reads#read_data_in_parallel for + # more information on PartitionQuery. + # Add 'options=-c spanner.copy_partition_query=false' to the connection URL + # to use ExecuteStreamingSql directly. + postgres_url = ("postgresql://localhost:{port}/{database}?sslmode=disable" + "&options=-c spanner.copy_partition_query=false" + .format(port=port, database=args.database)) + query = "SELECT 'Hello World - Non Partitioned!' as non_partitioned_greeting" + result = cx.read_sql(postgres_url, query) + print("Greeting from Cloud Spanner PostgreSQL:\n", result, "\n") + +finally: + if container is not None: + container.stop() diff --git a/samples/python/connectorx/pgadapter.py b/samples/python/connectorx/pgadapter.py new file mode 100644 index 000000000..efcb57680 --- /dev/null +++ b/samples/python/connectorx/pgadapter.py @@ -0,0 +1,123 @@ +# Copyright 2023 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Utility for starting and stopping PGAdapter in an embedded container + +Defines functions for starting and stopping PGAdapter in an embedded Docker +container. Requires that Docker is installed on the local system. +""" + +import io +import json +import os +import socket +import time +import google.auth +import google.oauth2.credentials +import google.oauth2.service_account +from testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs + + +def start_pgadapter(project: str, + instance: str, + emulator: bool = False, + credentials: str = None) -> (DockerContainer, str): + """Starts PGAdapter in an embedded Docker container + + Starts PGAdapter in an embedded Docker container and returns the TCP port + number where PGAdapter is listening for incoming connections. You can Use any + standard PostgreSQL driver to connect to this port. + + Parameters + ---------- + project : str + The Google Cloud project that PGAdapter should connect to. + instance : str + The Cloud Spanner instance that PGAdapter should connect to. + emulator: bool + Whether PGAdapter should connect to the Cloud Spanner emulator or real + Cloud Spanner. + credentials : str or None + The credentials file that PGAdapter should use. If None, then this + function will try to load the default credentials from the environment. + + Returns + ------- + container, port : tuple[DockerContainer, str] + The Docker container running PGAdapter and + the port where PGAdapter is listening. Connect to this port on localhost + with a standard PostgreSQL driver to connect to Cloud Spanner. + """ + + if emulator: + # Start PGAdapter with the Cloud Spanner emulator in a Docker container + container =( + DockerContainer("gcr.io/cloud-spanner-pg-adapter/pgadapter-emulator") + .with_exposed_ports(5432) + .with_command("-p " + project + " -i " + instance)) + container.start() + else: + # Start PGAdapter in a Docker container + container = DockerContainer("gcr.io/cloud-spanner-pg-adapter/pgadapter") \ + .with_exposed_ports(5432) \ + .with_command(" -p " + project + + " -i " + instance + + " -x -c /credentials.json") + container.start() + # Determine the credentials that should be used by PGAdapter and write these + # to a file in the container. + credentials_info = _determine_credentials(credentials) + container.exec("sh -c 'cat <> /credentials.json\n" + + json.dumps(credentials_info, indent=0) + + "\nEOT'") + # Wait until PGAdapter has started and is listening on the exposed port. + wait_for_logs(container, "PostgreSQL version:") + port = container.get_exposed_port("5432") + _wait_for_port(port=int(port)) + return container, port + + +def _determine_credentials(credentials: str): + if credentials is None: + explicit_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") + else: + explicit_file = credentials + if explicit_file is None: + credentials, _ = google.auth.default() + if type(credentials).__name__ == \ + google.oauth2.credentials.Credentials.__name__: + info = json.loads(credentials.to_json()) + info["type"] = "authorized_user" + else: + raise ValueError("GOOGLE_APPLICATION_CREDENTIALS has not been set " + "and no explicit credentials were supplied") + else: + with io.open(explicit_file, "r") as file_obj: + info = json.load(file_obj) + return info + + +def _wait_for_port(port: int, poll_interval: float = 0.1, timeout: float = 5.0): + start = time.time() + while True: + try: + with socket.create_connection(("localhost", port), timeout=timeout): + break + except OSError: + duration = time.time() - start + if timeout and duration > timeout: + raise TimeoutError("container did not listen on port {} in {} seconds" + .format(port, timeout)) + time.sleep(poll_interval) diff --git a/samples/python/connectorx/requirements.txt b/samples/python/connectorx/requirements.txt new file mode 100644 index 000000000..fe91aab84 --- /dev/null +++ b/samples/python/connectorx/requirements.txt @@ -0,0 +1,6 @@ +connectorx==0.3.3 +pandas==2.2.2 +testcontainers~=4.8.0 +requests==2.32.3 +google~=3.0.0 +google.auth~=2.34.0 diff --git a/src/main/java/com/google/cloud/spanner/pgadapter/statements/BackendConnection.java b/src/main/java/com/google/cloud/spanner/pgadapter/statements/BackendConnection.java index d9dd99c88..098604574 100644 --- a/src/main/java/com/google/cloud/spanner/pgadapter/statements/BackendConnection.java +++ b/src/main/java/com/google/cloud/spanner/pgadapter/statements/BackendConnection.java @@ -1063,7 +1063,8 @@ public void initSessionSetting(String name, String value) { AbstractStatementParser statementParser = AbstractStatementParser.getInstance(Dialect.POSTGRESQL); if ("options".equalsIgnoreCase(name)) { - String[] commands = value.split("-c\\s+"); + // Some drivers encode spaces as '+'. + String[] commands = value.split("-c[\\s+]+"); for (String command : commands) { // Special case: If the setting is one that is handled by the Connection API, then we need // to execute the statement on the connection instead.