diff --git a/.gitignore b/.gitignore
index a4512f9f794d..47c44171076d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -66,3 +66,4 @@ presto-native-execution/deps-install
# Compiled executables used for docker build
/docker/presto-cli-*-executable.jar
/docker/presto-server-*.tar.gz
+/docker/presto-remote-function-server-executable.jar
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 684ce522c9f7..4255e382ad64 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -3,12 +3,15 @@ FROM quay.io/centos/centos:stream9
ARG PRESTO_VERSION
ARG PRESTO_PKG=presto-server-$PRESTO_VERSION.tar.gz
ARG PRESTO_CLI_JAR=presto-cli-$PRESTO_VERSION-executable.jar
+ARG PRESTO_REMOTE_SERVER_JAR=presto-remote-function-server-executable.jar
ARG JMX_PROMETHEUS_JAVAAGENT_VERSION=0.20.0
ENV PRESTO_HOME="/opt/presto-server"
COPY $PRESTO_PKG .
COPY $PRESTO_CLI_JAR /opt/presto-cli
+COPY $PRESTO_REMOTE_SERVER_JAR /opt/presto-remote-server
+
RUN dnf install -y java-11-openjdk less procps python3 \
&& ln -s $(which python3) /usr/bin/python \
@@ -19,6 +22,7 @@ RUN dnf install -y java-11-openjdk less procps python3 \
&& rm -rf ./presto-server-$PRESTO_VERSION \
&& chmod +x /opt/presto-cli \
&& ln -s /opt/presto-cli /usr/local/bin/ \
+ && chmod +x /opt/presto-remote-server \
# clean cache jobs
&& mv /etc/yum/protected.d/systemd.conf /etc/yum/protected.d/systemd.conf.bak \
&& dnf clean all \
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index d4df39e601fb..72dc38e6bbfa 100755
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -2,4 +2,6 @@
set -e
+java -Dconfig=/opt/function-server/etc/config.properties -jar /opt/presto-remote-server >> log1.txt 2>&1
+
$PRESTO_HOME/bin/launcher run
diff --git a/presto-native-execution/pom.xml b/presto-native-execution/pom.xml
index 3dcfe4a02a1d..8ec2ae5aa179 100644
--- a/presto-native-execution/pom.xml
+++ b/presto-native-execution/pom.xml
@@ -362,6 +362,12 @@
presto-cli-*-executable.jar
+
+ ${project.parent.basedir}/presto-main/target
+
+ presto-remote-function-server-executable.jar
+
+
${project.parent.basedir}/presto-server/target
@@ -433,7 +439,7 @@
Release
presto-native-dependency:latest
- -DPRESTO_ENABLE_TESTING=OFF
+ -DPRESTO_ENABLE_REMOTE_FUNCTIONS=ON
2
ubuntu:22.04
ubuntu
diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunnerUtils.java
index 6f29f57b3da9..03941e5523ab 100644
--- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunnerUtils.java
+++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunnerUtils.java
@@ -83,6 +83,19 @@ public static void createNativeWorkerConfigProperties(int coordinatorPort, Strin
createPropertiesFile("testcontainers/" + nodeId + "/etc/config.properties", properties);
}
+ public static void createNativeWorkerConfigPropertiesWithFunctionServer(int coordinatorPort, int functionServerPort, String nodeId)
+ throws IOException
+ {
+ Properties properties = new Properties();
+ properties.setProperty("presto.version", "testversion");
+ properties.setProperty("http-server.http.port", "7777");
+ properties.setProperty("discovery.uri", "http://presto-coordinator:" + coordinatorPort);
+ properties.setProperty("system-memory-gb", "2");
+ properties.setProperty("native.sidecar", "false");
+ properties.setProperty("remote-function-server.rest.url", "http://presto-coordinator:" + functionServerPort);
+ createPropertiesFile("testcontainers/" + nodeId + "/etc/config.properties", properties);
+ }
+
public static void createCoordinatorConfigProperties(int port)
throws IOException
{
@@ -93,6 +106,8 @@ public static void createCoordinatorConfigProperties(int port)
properties.setProperty("http-server.http.port", Integer.toString(port));
properties.setProperty("discovery-server.enabled", "true");
properties.setProperty("discovery.uri", "http://presto-coordinator:" + port);
+ properties.setProperty("list-built-in-functions-only", "false");
+ properties.setProperty("native-execution-enabled", "false");
// Get native worker system properties and add them to the coordinator properties
Map nativeWorkerProperties = NativeQueryRunnerUtils.getNativeWorkerSystemProperties();
@@ -103,6 +118,52 @@ public static void createCoordinatorConfigProperties(int port)
createPropertiesFile("testcontainers/coordinator/etc/config.properties", properties);
}
+ public static void createFunctionNamespaceRemoteProperties()
+ throws IOException
+ {
+ Properties properties = new Properties();
+ properties.setProperty("function-namespace-manager.name", "rest");
+ properties.setProperty("supported-function-languages", "Java");
+ properties.setProperty("function-implementation-type", "REST");
+
+ String directoryPath = "testcontainers/function-namespace";
+ File directory = new File(directoryPath);
+ if (!directory.exists()) {
+ directory.mkdirs();
+ }
+
+ createPropertiesFile("testcontainers/coordinator/etc/function-namespace/remote.properties", properties);
+ }
+
+ public static void createFunctionNamespaceRemotePropertiesWithFunctionServer(int functionServerPort)
+ throws IOException
+ {
+ Properties properties = new Properties();
+ properties.setProperty("function-namespace-manager.name", "rest");
+ properties.setProperty("supported-function-languages", "Java");
+ properties.setProperty("function-implementation-type", "REST");
+ properties.setProperty("rest-based-function-manager.rest.url", "http://localhost:" + functionServerPort);
+
+ String directoryPath = "testcontainers/function-namespace";
+ File directory = new File(directoryPath);
+ if (!directory.exists()) {
+ directory.mkdirs();
+ }
+
+ createPropertiesFile("testcontainers/coordinator/etc/function-namespace/remote.properties", properties);
+ }
+
+ public static void createFunctionServerConfigProperties(int functionServerPort)
+ throws IOException
+ {
+ Properties properties = new Properties();
+ properties.setProperty("http-server.http.port", String.valueOf(functionServerPort));
+ properties.setProperty("regex-library", "RE2J");
+ properties.setProperty("parse-decimal-literals-as-double", "true");
+
+ createPropertiesFile("testcontainers/coordinator/etc/function-server/config.properties", properties);
+ }
+
public static void createCoordinatorJvmConfig()
throws IOException
@@ -161,6 +222,11 @@ public static void createCoordinatorEntryPointScript()
{
String scriptContent = "#!/bin/sh\n" +
"set -e\n" +
+ "java " +
+// "-Dplugin.dir=/opt/presto-remote-server/function-server-plugin " +
+// "-Dconfig=/opt/presto-remote-server/function-server-etc/config.properties " +
+// "-jar /opt/presto-remote-server >> log1.txt 2>&1 & \n" +
+ "-Dconfig=/opt/function-server/etc/config.properties -jar /opt/presto-remote-server >> log1.txt 2>&1 & \n" +
"$PRESTO_HOME/bin/launcher run\n";
createScriptFile("testcontainers/coordinator/entrypoint.sh", scriptContent);
}
@@ -204,10 +270,6 @@ public static void createPropertiesFile(String filePath, Properties properties)
parentDir.mkdirs();
}
- if (file.exists()) {
- throw new IOException("File exists: " + filePath);
- }
-
try (OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8)) {
for (String key : properties.stringPropertyNames()) {
writer.write(key + "=" + properties.getProperty(key) + "\n");
@@ -224,10 +286,6 @@ public static void createScriptFile(String filePath, String scriptContent)
parentDir.mkdirs();
}
- if (file.exists()) {
- throw new IOException("File exists: " + filePath);
- }
-
try (OutputStream output = new FileOutputStream(file);
OutputStreamWriter writer = new OutputStreamWriter(output, StandardCharsets.UTF_8)) {
writer.write(scriptContent);
diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunnerWithFunctionServer.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunnerWithFunctionServer.java
new file mode 100644
index 000000000000..5e6c15c15e34
--- /dev/null
+++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunnerWithFunctionServer.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.nativeworker;
+
+import com.facebook.presto.Session;
+import com.facebook.presto.common.QualifiedObjectName;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.cost.StatsCalculator;
+import com.facebook.presto.metadata.Metadata;
+import com.facebook.presto.spi.Plugin;
+import com.facebook.presto.spi.eventlistener.EventListener;
+import com.facebook.presto.split.PageSourceManager;
+import com.facebook.presto.split.SplitManager;
+import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager;
+import com.facebook.presto.sql.planner.NodePartitioningManager;
+import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
+import com.facebook.presto.testing.MaterializedResult;
+import com.facebook.presto.testing.QueryRunner;
+import com.facebook.presto.testing.TestingAccessControlManager;
+import com.facebook.presto.transaction.TransactionManager;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.logging.Logger;
+
+import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
+import static java.sql.DriverManager.getConnection;
+
+public class ContainerQueryRunnerWithFunctionServer
+ extends ContainerQueryRunner
+{
+ private static final Network network = Network.newNetwork();
+ private static final String PRESTO_COORDINATOR_IMAGE = System.getProperty("coordinatorImage", "presto-coordinator:latest");
+ private static final String PRESTO_WORKER_IMAGE = System.getProperty("workerImage", "presto-worker:latest");
+ private static final String CONTAINER_TIMEOUT = System.getProperty("containerTimeout", "120");
+ private static final String CLUSTER_SHUTDOWN_TIMEOUT = System.getProperty("clusterShutDownTimeout", "10");
+ private static final String BASE_DIR = System.getProperty("user.dir");
+ private static final int DEFAULT_COORDINATOR_PORT = 8080;
+ private static final int DEFAULT_FUNCTION_SERVER_PORT = 1122;
+ private static final String TPCH_CATALOG = "tpch";
+ private static final String TINY_SCHEMA = "tiny";
+ private static final int DEFAULT_NUMBER_OF_WORKERS = 4;
+ private static final Logger logger = Logger.getLogger(ContainerQueryRunner.class.getName());
+ private final GenericContainer> coordinator;
+ private final List> workers = new ArrayList<>();
+ private final int coordinatorPort;
+ private final int functionServerPort;
+ private final String catalog;
+ private final String schema;
+ private Connection connection;
+
+ public ContainerQueryRunnerWithFunctionServer()
+ throws InterruptedException, IOException
+ {
+ this(DEFAULT_COORDINATOR_PORT, DEFAULT_FUNCTION_SERVER_PORT, TPCH_CATALOG, TINY_SCHEMA, DEFAULT_NUMBER_OF_WORKERS);
+ }
+
+ public ContainerQueryRunnerWithFunctionServer(int coordinatorPort, int functionServerPort, String catalog, String schema, int numberOfWorkers)
+ throws InterruptedException, IOException
+ {
+ this.coordinatorPort = coordinatorPort;
+ this.functionServerPort = functionServerPort;
+ this.catalog = catalog;
+ this.schema = schema;
+
+ // The container details can be added as properties in VM options for testing in IntelliJ.
+ coordinator = createCoordinator();
+ for (int i = 0; i < numberOfWorkers; i++) {
+ workers.add(createNativeWorker(7777 + i, "native-worker-" + i));
+ }
+
+ coordinator.start();
+ workers.forEach(GenericContainer::start);
+
+ logger.info("Presto UI is accessible at http://localhost:" + coordinator.getMappedPort(coordinatorPort));
+
+ TimeUnit.SECONDS.sleep(5);
+
+ String url = String.format("jdbc:presto://localhost:%s/%s/%s?%s",
+ coordinator.getMappedPort(coordinatorPort),
+ catalog,
+ schema,
+ "timeZoneId=UTC");
+
+ try {
+ connection = getConnection(url, "test", null);
+ Statement statement = connection.createStatement();
+ statement.execute("set session remote_functions_enabled=true");
+ }
+ catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ // Delete the temporary files once the containers are started.
+ ContainerQueryRunnerUtils.deleteDirectory(BASE_DIR + "/testcontainers/coordinator");
+ for (int i = 0; i < numberOfWorkers; i++) {
+ ContainerQueryRunnerUtils.deleteDirectory(BASE_DIR + "/testcontainers/native-worker-" + i);
+ }
+ }
+
+ private GenericContainer> createCoordinator()
+ throws IOException
+ {
+ ContainerQueryRunnerUtils.createCoordinatorTpchProperties();
+ ContainerQueryRunnerUtils.createCoordinatorConfigProperties(coordinatorPort);
+ ContainerQueryRunnerUtils.createCoordinatorJvmConfig();
+ ContainerQueryRunnerUtils.createCoordinatorLogProperties();
+ ContainerQueryRunnerUtils.createCoordinatorNodeProperties();
+ ContainerQueryRunnerUtils.createCoordinatorEntryPointScript();
+ ContainerQueryRunnerUtils.createFunctionNamespaceRemotePropertiesWithFunctionServer(functionServerPort);
+ ContainerQueryRunnerUtils.createFunctionServerConfigProperties(functionServerPort);
+
+ return new GenericContainer<>(PRESTO_COORDINATOR_IMAGE)
+ .withExposedPorts(coordinatorPort)
+ .withNetwork(network).withNetworkAliases("presto-coordinator")
+ .withFileSystemBind(BASE_DIR + "/testcontainers/coordinator/etc", "/opt/presto-server/etc", BindMode.READ_WRITE)
+ .withFileSystemBind(BASE_DIR + "/testcontainers/coordinator/etc/function-server", "/opt/function-server/etc", BindMode.READ_ONLY)
+ .withFileSystemBind(BASE_DIR + "/testcontainers/coordinator/entrypoint.sh", "/opt/entrypoint.sh", BindMode.READ_ONLY)
+ .waitingFor(Wait.forLogMessage(".*======== SERVER STARTED ========.*", 1))
+ .withStartupTimeout(Duration.ofSeconds(Long.parseLong(CONTAINER_TIMEOUT)));
+ }
+
+ private GenericContainer> createNativeWorker(int port, String nodeId)
+ throws IOException
+ {
+ ContainerQueryRunnerUtils.createNativeWorkerConfigPropertiesWithFunctionServer(coordinatorPort, functionServerPort, nodeId);
+ ContainerQueryRunnerUtils.createNativeWorkerTpchProperties(nodeId);
+ ContainerQueryRunnerUtils.createNativeWorkerEntryPointScript(nodeId);
+ ContainerQueryRunnerUtils.createNativeWorkerNodeProperties(nodeId);
+ ContainerQueryRunnerUtils.createNativeWorkerVeloxProperties(nodeId);
+ return new GenericContainer<>(PRESTO_WORKER_IMAGE)
+ .withExposedPorts(port)
+ .withNetwork(network).withNetworkAliases(nodeId)
+ .withFileSystemBind(BASE_DIR + "/testcontainers/" + nodeId + "/etc", "/opt/presto-server/etc", BindMode.READ_ONLY)
+ .withFileSystemBind(BASE_DIR + "/testcontainers/" + nodeId + "/entrypoint.sh", "/opt/entrypoint.sh", BindMode.READ_ONLY)
+ .waitingFor(Wait.forLogMessage(".*Announcement succeeded: HTTP 202.*", 1));
+ }
+
+ @Override
+ public void close()
+ {
+ try {
+ TimeUnit.SECONDS.sleep(Long.parseLong(CLUSTER_SHUTDOWN_TIMEOUT));
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ coordinator.stop();
+ workers.forEach(GenericContainer::stop);
+ }
+
+ @Override
+ public TransactionManager getTransactionManager()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Metadata getMetadata()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SplitManager getSplitManager()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PageSourceManager getPageSourceManager()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public NodePartitioningManager getNodePartitioningManager()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ConnectorPlanOptimizerManager getPlanOptimizerManager()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PlanCheckerProviderManager getPlanCheckerProviderManager()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public StatsCalculator getStatsCalculator()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Optional getEventListener()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TestingAccessControlManager getAccessControl()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public MaterializedResult execute(String sql)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public MaterializedResult execute(Session session, String sql, List extends Type> resultTypes)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List listTables(Session session, String catalog, String schema)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean tableExists(Session session, String table)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void installPlugin(Plugin plugin)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createCatalog(String catalogName, String connectorName, Map properties)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void loadFunctionNamespaceManager(String functionNamespaceManagerName, String catalogName, Map properties)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Lock getExclusiveLock()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getNodeCount()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Session getDefaultSession()
+ {
+ return testSessionBuilder()
+ .setCatalog(catalog)
+ .setSchema(schema)
+ .build();
+ }
+
+ @Override
+ public MaterializedResult execute(Session session, String sql)
+ {
+ try {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ return ContainerQueryRunnerUtils.toMaterializedResult(resultSet);
+ }
+ catch (SQLException e) {
+ throw new RuntimeException("Error executing query: " + sql, e);
+ }
+ }
+}
diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoContainerRemoteFunction.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoContainerRemoteFunction.java
new file mode 100644
index 000000000000..525860b23371
--- /dev/null
+++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoContainerRemoteFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.nativeworker;
+
+import com.facebook.presto.tests.AbstractTestQueryFramework;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class TestPrestoContainerRemoteFunction
+ extends AbstractTestQueryFramework
+{
+ @Override
+ protected ContainerQueryRunnerWithFunctionServer createQueryRunner()
+ throws Exception
+ {
+ return new ContainerQueryRunnerWithFunctionServer();
+ }
+
+ @Test
+ public void testRemoteFunctions()
+ {
+ assertEquals(
+ computeActual("select remote.default.abs(-10)")
+ .getMaterializedRows().get(0).getField(0).toString(),
+ "10");
+ assertEquals(
+ computeActual("select remote.default.abs(-1230)")
+ .getMaterializedRows().get(0).getField(0).toString(),
+ "1230");
+ assertEquals(
+ computeActual("select remote.default.second(CAST('2001-01-02 03:04:05' as timestamp))")
+ .getMaterializedRows().get(0).getField(0).toString(),
+ "5");
+ assertEquals(
+ computeActual("select remote.default.length(CAST('AB' AS VARBINARY))")
+ .getMaterializedRows().get(0).getField(0).toString(),
+ "2");
+ assertEquals(
+ computeActual("select remote.default.floor(100000.99)")
+ .getMaterializedRows().get(0).getField(0).toString(),
+ "100000.0");
+ }
+}