From 381a683ebe4742fb26cfd70a7906b125e9a1cffe Mon Sep 17 00:00:00 2001 From: "jian.h" Date: Sat, 19 Oct 2019 22:10:03 -0700 Subject: [PATCH] Support Samza on Kubernetes. Contributed by Weiqing Yang &Jian He --- build.gradle | 21 ++ gradle/dependency-versions.gradle | 1 + gradlew.bat | 84 +++++ .../ContainerProcessManager.java | 32 +- .../samza/coordinator/JobModelManager.scala | 30 +- .../samza/coordinator/server/HttpServer.scala | 10 + .../org/apache/samza/util/HttpUtil.scala | 1 + .../src/docker/dockerfiles/Dockerfile | 34 ++ .../org/apache/samza/config/KubeConfig.java | 57 ++++ .../job/kubernetes/KubeClientFactory.java | 40 +++ .../KubeClusterResourceManager.java | 323 ++++++++++++++++++ .../apache/samza/job/kubernetes/KubeJob.java | 242 +++++++++++++ .../samza/job/kubernetes/KubeJobFactory.java | 32 ++ .../job/kubernetes/KubePodStatusWatcher.java | 172 ++++++++++ .../KubeResourceManagerFactory.java | 40 +++ .../samza/job/kubernetes/KubeUtils.java | 56 +++ settings.gradle | 1 + 17 files changed, 1147 insertions(+), 29 deletions(-) create mode 100644 gradlew.bat create mode 100644 samza-kubernetes/src/docker/dockerfiles/Dockerfile create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java create mode 100644 samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java diff --git 
a/build.gradle b/build.gradle index 845b3160d0..00b40f4065 100644 --- a/build.gradle +++ b/build.gradle @@ -568,6 +568,27 @@ project(":samza-yarn_$scalaSuffix") { jar.dependsOn("lesscss") } +project(":samza-kubernetes_$scalaSuffix") { + apply plugin: 'java' + + dependencies { + compile project(':samza-api') + compile project(":samza-core_$scalaSuffix") + compile "org.codehaus.jackson:jackson-core-asl:1.9.7" + compile group: 'io.fabric8', name: 'kubernetes-client', version: kubernetesJavaClientVersion + testCompile "junit:junit:$junitVersion" + testCompile "org.mockito:mockito-core:$mockitoVersion" + } + + tasks.create(name: "releaseKubeTarGz", dependsOn: configurations.archives.artifacts, type: Tar) { + into "samza-kubernetes-${version}" + compression = Compression.GZIP + from(configurations.runtime) { into("lib/") } + from(configurations.archives.artifacts.files) { into("lib/") } + duplicatesStrategy 'exclude' + } +} + project(":samza-shell") { apply plugin: 'java' diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index f70e879d37..c3b857d980 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -53,4 +53,5 @@ jnaVersion = "4.5.1" couchbaseClientVersion = "2.7.2" couchbaseMockVersion = "1.5.22" + kubernetesJavaClientVersion = "4.1.3" } diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 0000000000..e95643d6a2 --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. 
You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! 
+if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index cb0e5374cd..edf0f04c20 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -93,7 +93,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback * The Allocator matches requests to resources and executes processes. */ private final ContainerAllocator containerAllocator; - private final Thread allocatorThread; + private Thread allocatorThread = null; // The StandbyContainerManager manages standby-aware allocation and failover of containers private final Optional standbyContainerManager; @@ -166,8 +166,10 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri } this.containerAllocator = new ContainerAllocator(this.clusterResourceManager, config, state, hostAffinityEnabled, this.standbyContainerManager); - this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); - LOG.info("Finished container process manager initialization."); + if (shouldStartAllocateThread()) { + this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); + } + LOG.info("finished initialization of samza task manager"); } @VisibleForTesting @@ -184,14 +186,23 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri this.clusterResourceManager = resourceManager; this.standbyContainerManager = Optional.empty(); + this.diagnosticsManager = Option.empty(); this.containerAllocator = allocator.orElseGet( () -> new ContainerAllocator(this.clusterResourceManager, clusterManagerConfig, state, hostAffinityEnabled, 
this.standbyContainerManager)); - this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); + if (shouldStartAllocateThread()) { + this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); + } LOG.info("Finished container process manager initialization"); } + // In Kubernetes, the pod will be started by kubelet automatically once it is allocated, it does not need a + // separate thread to keep polling the allocated resources to start the container. + public boolean shouldStartAllocateThread() { + return !clusterResourceManager.getClass().getSimpleName().equals("KubeClusterResourceManager"); + } + public boolean shouldShutdown() { LOG.debug("ContainerProcessManager state: Completed containers: {}, Configured containers: {}, Are there too many failed containers: {}, Is allocator thread alive: {}", state.completedProcessors.get(), state.processorCount, jobFailureCriteriaMet ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no"); @@ -237,7 +248,9 @@ public void start() { // Start container allocator thread LOG.info("Starting the container allocator thread"); - allocatorThread.start(); + if (allocatorThread != null) { + allocatorThread.start(); + } LOG.info("Starting the container process manager"); } @@ -246,12 +259,15 @@ public void stop() { // Shutdown allocator thread containerAllocator.stop(); - try { + if (allocatorThread != null) { + + try { allocatorThread.join(); LOG.info("Stopped container allocator"); } catch (InterruptedException ie) { - LOG.error("Allocator thread join threw an interrupted exception", ie); - Thread.currentThread().interrupt(); + LOG.error("Allocator thread join threw an interrupted exception", ie); + Thread.currentThread().interrupt(); + } } if (diagnosticsManager.isDefined()) { diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index 2afc36bd86..2f6da12213 
100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -23,34 +23,22 @@ import java.util import java.util.concurrent.atomic.AtomicReference import org.apache.samza.{Partition, SamzaException} -import org.apache.samza.config._ -import org.apache.samza.config.Config -import org.apache.samza.container.grouper.stream.SSPGrouperProxy -import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory +import org.apache.samza.config.{Config, _} +import org.apache.samza.container.grouper.stream.{SSPGrouperProxy, SystemStreamPartitionGrouperFactory} import org.apache.samza.container.grouper.task._ -import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore -import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping -import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping -import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping -import org.apache.samza.container.LocalityManager -import org.apache.samza.container.TaskName -import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore -import org.apache.samza.coordinator.server.HttpServer -import org.apache.samza.coordinator.server.JobServlet -import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping -import org.apache.samza.job.model.ContainerModel -import org.apache.samza.job.model.JobModel -import org.apache.samza.job.model.TaskMode -import org.apache.samza.job.model.TaskModel -import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.metrics.MetricsRegistryMap +import org.apache.samza.container.{LocalityManager, TaskName} +import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore} +import org.apache.samza.coordinator.server.{HttpServer, JobServlet} +import 
org.apache.samza.coordinator.stream.messages.{SetContainerHostMapping, SetTaskContainerMapping, SetTaskModeMapping, SetTaskPartitionMapping} +import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode, TaskModel} +import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap} import org.apache.samza.runtime.LocationId import org.apache.samza.system._ import org.apache.samza.util.ScalaJavaUtil.JavaOptionals import org.apache.samza.util.{Logging, ReflectionUtil, Util} -import scala.collection.JavaConverters import scala.collection.JavaConversions._ +import scala.collection.JavaConverters import scala.collection.JavaConverters._ /** diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala index 384972262e..055fae0928 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala @@ -136,4 +136,14 @@ class HttpServer( throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.") } } + + def getIpUrl = { + if (running) { + val runningPort = server.getConnectors()(0).asInstanceOf[NetworkConnector].getLocalPort() + + new URL("http://" + Util.getLocalHost.getHostAddress + ":" + runningPort + rootPath) + } else { + throw new SamzaException("HttpServer is not currently running, so URLs are not available for it.") + } + } } diff --git a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala index ea5eb5a8cc..752322f9e6 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala @@ -55,6 +55,7 @@ object HttpUtil { (exception, loop) => { exception match { case ioe: IOException => { + error(ioe) warn("Error getting response from Job coordinator 
server. received IOException: %s. Retrying..." format ioe.getClass) httpConn = getHttpConnection(url, timeout) } diff --git a/samza-kubernetes/src/docker/dockerfiles/Dockerfile b/samza-kubernetes/src/docker/dockerfiles/Dockerfile new file mode 100644 index 0000000000..98b1229a51 --- /dev/null +++ b/samza-kubernetes/src/docker/dockerfiles/Dockerfile @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# samzaJarsFolder includes all the Samza jars (you needs to make sure all the samza jars are there.) +# You can build Samza image by: +# docker build -t dockerHubAccount/samza:versionNumber . +# Then Samza user can use the Samza image as base image to build their application image. 
+#
+
+FROM ubuntu:latest
+
+RUN apt-get update -y && apt-get upgrade -y && apt-get install -y openjdk-8-jdk
+
+ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64
+ENV PATH $PATH:$JAVA_HOME/bin
+
+RUN mkdir -p /opt/samza
+WORKDIR /opt/samza/
+COPY samzaJarsFolder/ /opt/samza/
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java b/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java
new file mode 100644
index 0000000000..c339577021
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/config/KubeConfig.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+/** Config keys, default values and naming constants used when running a Samza job on Kubernetes. */
+public class KubeConfig {
+  // Config key for the container image of the job's pods, and the image used when the key is unset.
+  public static final String APP_IMAGE = "kube.app.image";
+  public static final String DEFAULT_IMAGE = "weiqingyang/samza:v0";
+
+  // Config key for the directory inside the pod where the remote volume is mounted.
+  public static final String SAMZA_MOUNT_DIR = "kube.app.pod.mnt.path";
+  public static final String K8S_API_NAMESPACE = "kube.app.namespace";
+  public static final String STREAM_PROCESSOR_CONTAINER_NAME_PREFIX = "sp";
+  public static final String JC_CONTAINER_NAME_PREFIX = "jc";
+  public static final String POD_RESTART_POLICY = "OnFailure";
+  public static final String JC_POD_NAME_FORMAT = "%s-%s-%s"; // jc-appName-appId
+  public static final String TASK_POD_NAME_FORMAT = "%s-%s-%s-%s"; // sp-appName-appId-containerId
+
+  // COORDINATOR_POD_NAME is the name of an environment variable; the AZURE_* entries are config keys.
+  public static final String COORDINATOR_POD_NAME = "COORDINATOR_POD_NAME";
+  public static final String AZURE_REMOTE_VOLUME_ENABLED = "kube.app.volume.azure.file-share.enabled";
+  public static final String AZURE_SECRET = "kube.app.volume.azure-secret";
+  public static final String AZURE_FILESHARE = "kube.app.volume.azure.file-share";
+
+  private final Config config;
+  public KubeConfig(Config config) {
+    this.config = config;
+  }
+
+  public static KubeConfig validate(Config config) throws ConfigException {
+    KubeConfig kc = new KubeConfig(config);
+    kc.validate();
+    return kc;
+  }
+
+  private void validate() throws ConfigException {
+    // TODO: no validation is implemented yet; every config is currently accepted.
+  }
+}
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java
new file mode 100644
index 0000000000..4405a0969d
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClientFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.kubernetes;
+
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KubeClientFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(KubeClientFactory.class);
+
+  /** Returns a new client built from the configuration of a default-constructed {@link ConfigBuilder}. */
+  public static KubernetesClient create() {
+    Config defaultConfig = new ConfigBuilder().build();
+    KubernetesClient kubernetesClient = new DefaultKubernetesClient(defaultConfig);
+    LOG.info("Kubernetes client created. ");
+    return kubernetesClient;
+  }
+}
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java
new file mode 100644
index 0000000000..d341eceda0
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeClusterResourceManager.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.kubernetes;
+
+import io.fabric8.kubernetes.api.model.*;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import java.net.URL;
+import java.util.*;
+import org.apache.samza.SamzaException;
+import org.apache.samza.clustermanager.*;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.CommandBuilder;
+import org.apache.samza.job.ShellCommandBuilder;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.config.ApplicationConfig.*;
+import static org.apache.samza.config.KubeConfig.*;
+
+/**
+ * A {@link ClusterResourceManager} that runs every Samza container as a Kubernetes pod owned by the
+ * job coordinator pod, and requests a replacement whenever the pod watcher reports a pod as deleted.
+ */
+public class KubeClusterResourceManager extends ClusterResourceManager {
+  private static final Logger LOG = LoggerFactory.getLogger(KubeClusterResourceManager.class);
+  private static final String JC_POD_NAME_LABEL = "jc-pod-name";
+
+  private final Map<String, String> podLabels = new HashMap<>();
+  private final Config config;
+  private final KubernetesClient client;
+  private final JobModelManager jobModelManager;
+  private final String image;
+  private final String namespace;
+  private final String appId;
+  private final String appName;
+  private final boolean hostAffinityEnabled;
+  private OwnerReference ownerReference;
+  private String jcPodName;
+  // Handle of the running pod watch; closed in stop(). Null until startPodWatcher() has run.
+  private Watch podWatch;
+
+  KubeClusterResourceManager(Config config, JobModelManager jobModelManager, ClusterResourceManager.Callback callback) {
+    super(callback);
+    this.config = config;
+    this.client = KubeClientFactory.create();
+    this.jobModelManager = jobModelManager;
+    this.image = config.get(APP_IMAGE, DEFAULT_IMAGE);
+    this.namespace = config.get(K8S_API_NAMESPACE, "default");
+    this.appId = config.get(APP_ID, "001");
+    this.appName = config.get(APP_NAME, "samza");
+    this.hostAffinityEnabled = new ClusterManagerConfig(config).getHostAffinityEnabled();
+    createOwnerReferences();
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Kubernetes Cluster ResourceManager started, starting watcher");
+    startPodWatcher();
+    jobModelManager.start();
+  }
+
+  // Looks up the job coordinator pod and records it as the owner (and the label) of every pod created here.
+  private void createOwnerReferences() {
+    this.jcPodName = System.getenv(COORDINATOR_POD_NAME);
+    LOG.info("job coordinator pod name is: {}, namespace is: {}", jcPodName, namespace);
+    if (jcPodName == null) {
+      throw new SamzaException("Environment variable " + COORDINATOR_POD_NAME + " is not set");
+    }
+    Pod pod = client.pods().inNamespace(namespace).withName(jcPodName).get();
+    if (pod == null) {
+      throw new SamzaException("Job coordinator pod " + jcPodName + " not found in namespace " + namespace);
+    }
+    ownerReference = new OwnerReferenceBuilder()
+        .withName(pod.getMetadata().getName())
+        .withApiVersion(pod.getApiVersion())
+        .withUid(pod.getMetadata().getUid())
+        .withKind(pod.getKind())
+        .withController(true).build();
+    podLabels.put(JC_POD_NAME_LABEL, jcPodName);
+  }
+
+  public void startPodWatcher() {
+    Watcher<Pod> watcher = new Watcher<Pod>() {
+      @Override
+      public void eventReceived(Action action, Pod pod) {
+        Map<String, String> labels = pod.getMetadata().getLabels();
+        String ownerJc = labels == null ? null : labels.get(JC_POD_NAME_LABEL);
+        if (!jcPodName.equals(ownerJc)) {
+          LOG.warn("This JC pod name is " + jcPodName + ", received pods for a different JC " + ownerJc);
+          return;
+        }
+        LOG.info("Pod watcher received action " + action + " for pod " + pod.getMetadata().getName());
+        switch (action) {
+          case ADDED:
+            LOG.info("Pod " + pod.getMetadata().getName() + " is added.");
+            break;
+          case MODIFIED:
+            LOG.info("Pod " + pod.getMetadata().getName() + " is modified.");
+            if (isPodFailed(pod)) {
+              deletePod(pod);
+            }
+            break;
+          case ERROR:
+            LOG.info("Pod " + pod.getMetadata().getName() + " received error.");
+            if (isPodFailed(pod)) {
+              deletePod(pod);
+            }
+            break;
+          case DELETED:
+            LOG.info("Pod " + pod.getMetadata().getName() + " is deleted.");
+            createNewStreamProcessor(pod);
+            break;
+        }
+      }
+      @Override
+      public void onClose(KubernetesClientException e) {
+        LOG.error("Pod watcher closed", e);
+      }
+    };
+
+    // podLabels holds the jc-pod-name label that requestResources() stamps on every pod it creates.
+    podWatch = client.pods().inNamespace(namespace).withLabels(podLabels).watch(watcher);
+  }
+
+  private boolean isPodFailed(Pod pod) {
+    return pod.getStatus() != null && "Failed".equals(pod.getStatus().getPhase());
+  }
+
+  private void deletePod(Pod pod) {
+    String name = pod.getMetadata().getName();
+    if (Boolean.TRUE.equals(client.pods().inNamespace(namespace).withName(name).delete())) {
+      LOG.info("Deleted pod " + name);
+    } else {
+      LOG.warn("Failed to delete pod " + name);
+    }
+  }
+
+  private void createNewStreamProcessor(Pod pod) {
+    Map<String, Quantity> requests = pod.getSpec().getContainers().get(0).getResources().getRequests();
+    int memory = Integer.parseInt(requests.get("memory").getAmount());
+    int cpu = Integer.parseInt(requests.get("cpu").getAmount());
+
+    String containerId = KubeUtils.getSamzaContainerNameFromPodName(pod.getMetadata().getName());
+
+    // Find out previously running container location
+    String lastSeenOn = jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
+    if (!hostAffinityEnabled || lastSeenOn == null) {
+      lastSeenOn = ResourceRequestState.ANY_HOST;
+    }
+    requestResources(new SamzaResourceRequest(cpu, memory, lastSeenOn, containerId));
+  }
+
+  @Override
+  public void requestResources(SamzaResourceRequest resourceRequest) {
+    String samzaContainerId = resourceRequest.getContainerID();
+    String preferredHost = resourceRequest.getPreferredHost();
+    LOG.info("Requesting resources on " + preferredHost + " for container " + samzaContainerId);
+    CommandBuilder builder = getCommandBuilder(samzaContainerId);
+    String command = buildCmd(builder);
+    LOG.info("Container ID {} using command {}", samzaContainerId, command);
+    Container container = KubeUtils.createContainer(STREAM_PROCESSOR_CONTAINER_NAME_PREFIX, image, resourceRequest, command);
+    container.setEnv(getEnvs(builder));
+    String podName = String.format(TASK_POD_NAME_FORMAT, STREAM_PROCESSOR_CONTAINER_NAME_PREFIX, appName, appId, samzaContainerId);
+
+    // An absent key means "no remote volume"; the single-argument getBoolean would throw instead.
+    boolean azureVolumeEnabled = config.getBoolean(AZURE_REMOTE_VOLUME_ENABLED, false);
+    if (azureVolumeEnabled) {
+      String mountPath = config.get(SAMZA_MOUNT_DIR, "/tmp/mnt");
+      VolumeMount volumeMount = new VolumeMount();
+      volumeMount.setMountPath(mountPath);
+      volumeMount.setName("azure");
+      volumeMount.setSubPath(podName);
+      LOG.info("Set subpath to " + podName + ", mountpath to " + mountPath);
+      container.setVolumeMounts(Collections.singletonList(volumeMount));
+    }
+
+    PodBuilder podBuilder = new PodBuilder().editOrNewMetadata()
+        .withName(podName)
+        .withOwnerReferences(ownerReference)
+        .addToLabels(podLabels).endMetadata()
+        .editOrNewSpec()
+        .withRestartPolicy(POD_RESTART_POLICY)
+        .addToContainers(container).endSpec();
+    if (azureVolumeEnabled) {
+      AzureFileVolumeSource azureFileVolumeSource =
+          new AzureFileVolumeSource(false, config.get(AZURE_SECRET, "azure-secret"), config.get(AZURE_FILESHARE, "aksshare"));
+      Volume volume = new Volume();
+      volume.setAzureFile(azureFileVolumeSource);
+      volume.setName("azure");
+      podBuilder = podBuilder.editOrNewSpec().withVolumes(volume).endSpec();
+    }
+
+    Pod pod;
+    if (preferredHost == null || ResourceRequestState.ANY_HOST.equals(preferredHost)) {
+      // No placement preference: the scheduler may put the single-container pod on any node.
+      pod = podBuilder.build();
+    } else {
+      LOG.info("Making a preferred host request on " + preferredHost);
+      pod = podBuilder.editOrNewSpec().editOrNewAffinity().editOrNewNodeAffinity()
+          // A preferred scheduling term must carry a weight in the range 1-100.
+          .addNewPreferredDuringSchedulingIgnoredDuringExecution().withWeight(100).withNewPreference()
+          .addNewMatchExpression()
+          .withKey("kubernetes.io/hostname")
+          // Node selector operators are In/NotIn/Exists/DoesNotExist/Gt/Lt; "Equal" is not one of them.
+          .withOperator("In")
+          .withValues(preferredHost).endMatchExpression()
+          .endPreference().endPreferredDuringSchedulingIgnoredDuringExecution().endNodeAffinity().endAffinity().endSpec().build();
+    }
+    client.pods().inNamespace(namespace).create(pod);
+    LOG.info("Created a pod " + pod.getMetadata().getName() + " on " + preferredHost);
+  }
+
+  @Override
+  public void cancelResourceRequest(SamzaResourceRequest request) {
+    // no need to implement
+  }
+
+  @Override
+  public void releaseResources(SamzaResource resource) {
+    // no need to implement
+  }
+
+  @Override
+  public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) {
+    // no need to implement
+  }
+
+  @Override
+  public void stopStreamProcessor(SamzaResource resource) {
+    client.pods().inNamespace(namespace).withName(resource.getResourceID()).delete();
+  }
+
+  @Override
+  public void stop(SamzaApplicationState.SamzaAppStatus status) {
+    LOG.info("Kubernetes Cluster ResourceManager stopped");
+    // Close the watch first so pods removed during shutdown are not recreated by the watcher.
+    if (podWatch != null) {
+      podWatch.close();
+    }
+    jobModelManager.stop();
+    // TODO: need to check
+  }
+
+  private String buildCmd(CommandBuilder cmdBuilder) {
+    // TODO: check if we have framework path specified. If yes - use it, if not use default /opt/hello-samza/
+    String jobLib = ""; // in case of separate framework, this directory will point at the job's libraries
+    String cmdPath = "/opt/samza/"; // TODO
+
+    String fwkPath = JobConfig.getFwkPath(config);
+    if (fwkPath != null && !fwkPath.isEmpty()) {
+      cmdPath = fwkPath;
+      jobLib = "export JOB_LIB_DIR=/opt/samza/lib";
+    }
+    LOG.info("In runContainer in util: fwkPath= " + fwkPath + ";cmdPath=" + cmdPath + ";jobLib=" + jobLib);
+
+    cmdBuilder.setCommandPath(cmdPath);
+
+    return cmdBuilder.buildCommand();
+  }
+
+  // TODO: Need to check it again later!! Check AbstractContainerAllocator.getCommandBuilder(samzaContainerId)
+  private CommandBuilder getCommandBuilder(String containerId) {
+    if (jobModelManager.server() == null) {
+      throw new SamzaException("HttpServer is null, cannot build the coordinator URL for container " + containerId);
+    }
+    TaskConfig taskConfig = new TaskConfig(config);
+    String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName());
+    CommandBuilder cmdBuilder = Util.getObj(cmdBuilderClassName, CommandBuilder.class);
+    URL url = jobModelManager.server().getIpUrl();
+    LOG.info("HttpServer URL: " + url);
+    cmdBuilder.setConfig(config).setId(containerId).setUrl(url);
+
+    return cmdBuilder;
+  }
+
+  // Construct the envs for the task container pod
+  private List<EnvVar> getEnvs(CommandBuilder cmdBuilder) {
+    StringBuilder sb = new StringBuilder(); // collects the builder-provided variables for the log line below
+
+    List<EnvVar> envList = new ArrayList<>();
+    for (Map.Entry<String, String> entry : cmdBuilder.buildEnvironment().entrySet()) {
+      envList.add(new EnvVar(entry.getKey(), entry.getValue(), null));
+      sb.append(String.format("\n%s=%s", entry.getKey(), entry.getValue())); //logging
+    }
+
+    // TODO: export ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID once a K8s container id is available (none seen so far).
+
+    String mountDir = config.get(SAMZA_MOUNT_DIR);
+    envList.add(new EnvVar("LOGGED_STORE_BASE_DIR", mountDir, null));
+    envList.add(new EnvVar("EXECUTION_PLAN_DIR", mountDir, null));
+    envList.add(new EnvVar("SAMZA_LOG_DIR", mountDir, null));
+
+    LOG.info("Using environment variables: {}", sb);
+
+    return envList;
+  }
+}
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java
new file mode 100644
index 0000000000..9ede574248
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJob.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.kubernetes;
+
+import io.fabric8.kubernetes.api.model.*;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.samza.SamzaException;
+import org.apache.samza.clustermanager.ResourceRequestState;
+import org.apache.samza.clustermanager.SamzaResourceRequest;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.job.StreamJob;
+import org.apache.samza.util.CoordinatorStreamUtil;
+import org.apache.samza.util.Util;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.mutable.StringBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.samza.config.ApplicationConfig.APP_ID;
+import static org.apache.samza.config.ApplicationConfig.APP_NAME;
+import static org.apache.samza.config.KubeConfig.*;
+import static org.apache.samza.job.ApplicationStatus.*;
+import static org.apache.samza.serializers.model.SamzaObjectMapper.getObjectMapper;
+
+/**
+ * The client to start a Kubernetes job coordinator: it creates, watches, queries and deletes the coordinator pod
+ */
+public class KubeJob implements StreamJob {
+  private static final Logger LOG = LoggerFactory.getLogger(KubeJob.class);
+  private final Config config;
+  private final KubernetesClient kubernetesClient;
+  private final String podName;
+  private ApplicationStatus currentStatus;
+  private final String nameSpace;
+  private final KubePodStatusWatcher watcher;
+  private final String image;
+
+  public KubeJob(Config config) {
+    this.kubernetesClient = KubeClientFactory.create();
+    this.config = config;
+    this.podName = String.format(JC_POD_NAME_FORMAT, JC_CONTAINER_NAME_PREFIX,
+        config.get(APP_NAME, "samza"), config.get(APP_ID, "1"));
+    this.currentStatus = ApplicationStatus.New;
+    this.watcher = new KubePodStatusWatcher(podName);
+    this.nameSpace = config.get(K8S_API_NAMESPACE, "default");
+    this.image = config.get(APP_IMAGE, DEFAULT_IMAGE);
+  }
+
+  /**
+   * Submit the job: create the job coordinator pod in the configured namespace and start watching it
+   */
+  public KubeJob submit() {
+    // create SamzaResourceRequest
+    int memoryMB = config.getInt("cluster-manager.container.memory.mb", 1024); // TODO
+    int numCores = config.getInt("cluster-manager.container.cpu.cores", 1); // TODO
+    String preferredHost = ResourceRequestState.ANY_HOST;
+    SamzaResourceRequest request = new SamzaResourceRequest(numCores, memoryMB, preferredHost, podName);
+
+    // create Container
+    String fwkPath = config.get("samza.fwk.path", "");
+    String fwkVersion = config.get("samza.fwk.version");
+    String cmd = buildJobCoordinatorCmd(fwkPath, fwkVersion);
+    LOG.info(String.format("samza.fwk.path: %s. samza.fwk.version: %s. Command: %s", fwkPath, fwkVersion, cmd));
+    Container container = KubeUtils.createContainer(JC_CONTAINER_NAME_PREFIX, image, request, cmd);
+    container.setEnv(getEnvs());
+
+    PodBuilder podBuilder;
+    if (config.getBoolean(AZURE_REMOTE_VOLUME_ENABLED, false)) {
+      AzureFileVolumeSource azureFileVolumeSource = new AzureFileVolumeSource(false,
+          config.get(AZURE_SECRET, "azure-secret"), config.get(AZURE_FILESHARE, "aksshare"));
+      Volume volume = new Volume();
+      volume.setAzureFile(azureFileVolumeSource);
+      volume.setName("azure");
+      VolumeMount volumeMount = new VolumeMount();
+      volumeMount.setMountPath(config.get(SAMZA_MOUNT_DIR, "/tmp/mnt"));
+      volumeMount.setName("azure");
+      volumeMount.setSubPath(podName);
+      container.setVolumeMounts(Collections.singletonList(volumeMount));
+      podBuilder = new PodBuilder()
+          .editOrNewMetadata()
+          .withNamespace(nameSpace)
+          .withName(podName)
+          .endMetadata()
+          .editOrNewSpec()
+          .withRestartPolicy(POD_RESTART_POLICY)
+          .withContainers(container)
+          .withVolumes(volume)
+          .endSpec();
+    } else {
+
+      // create Pod
+      podBuilder = new PodBuilder()
+          .editOrNewMetadata()
+          .withNamespace(nameSpace)
+          .withName(podName)
+          .endMetadata()
+          .editOrNewSpec()
+          .withRestartPolicy(POD_RESTART_POLICY)
+          .withContainers(container)
+          .endSpec();
+    }
+
+    Pod pod = podBuilder.build();
+    kubernetesClient.pods().inNamespace(nameSpace).create(pod);
+    // TODO: adding watcher here makes Client waiting .. Need to fix.
+    kubernetesClient.pods().inNamespace(nameSpace).withName(podName).watch(watcher);
+    return this;
+  }
+
+  /**
+   * Kill the job coordinator pod
+   */
+  public KubeJob kill() {
+    LOG.info("Killing application: {}, operator pod: {}, namespace: {}", config.get(APP_NAME), podName, nameSpace);
+    System.out.println("Killing application: " + config.get(APP_NAME) +
+        "; Operator pod: " + podName + "; namespace: " + nameSpace);
+    kubernetesClient.pods().inNamespace(nameSpace).withName(podName).delete();
+    return this;
+  }
+
+  /**
+   * Wait for the job coordinator pod to complete, for at most timeoutMs milliseconds, then return its status
+   */
+  public ApplicationStatus waitForFinish(long timeoutMs) {
+    watcher.waitForCompleted(timeoutMs, TimeUnit.MILLISECONDS);
+    return getStatus();
+  }
+
+  /**
+   * Wait up to timeoutMs for the pod to reach the given status; returns the status observed after the wait
+   */
+  public ApplicationStatus waitForStatus(ApplicationStatus status, long timeoutMs) {
+    switch (status.getStatusCode()) {
+      case New:
+        watcher.waitForPending(timeoutMs, TimeUnit.MILLISECONDS);
+        return getStatus();
+      case Running:
+        watcher.waitForRunning(timeoutMs, TimeUnit.MILLISECONDS);
+        return getStatus();
+      case SuccessfulFinish:
+        watcher.waitForSucceeded(timeoutMs, TimeUnit.MILLISECONDS);
+        return getStatus();
+      case UnsuccessfulFinish:
+        watcher.waitForFailed(timeoutMs, TimeUnit.MILLISECONDS);
+        return getStatus();
+      default:
+        throw new SamzaException("Unsupported application status type: " + status);
+    }
+  }
+
+  /**
+   * Get the status of the job coordinator pod, mapped from the pod phase
+   */
+  public ApplicationStatus getStatus() {
+    Pod operatorPod = kubernetesClient.pods().inNamespace(nameSpace).withName(podName).get();
+    PodStatus podStatus = operatorPod.getStatus();
+    // TODO NOTE(review): no guard here for a missing pod (get() presumably returns null) or a null phase - confirm
+    switch (podStatus.getPhase()) {
+      case "Pending":
+        currentStatus = ApplicationStatus.New;
+        break;
+      case "Running":
+        currentStatus = Running;
+        break;
+      case "Completed":
+      case "Succeeded":
+        currentStatus = ApplicationStatus.SuccessfulFinish;
+        break;
+      case "Failed":
+        String err = new StringBuilder().append("Reason: ").append(podStatus.getReason())
+            .append("; Conditions: ").append(podStatus.getConditions()).toString();
+        currentStatus = ApplicationStatus.unsuccessfulFinish(new SamzaException(err));
+        break;
+      case "CrashLoopBackOff":
+      case "Unknown":
+      default:
+        currentStatus = ApplicationStatus.New;
+    }
+    return currentStatus;
+  }
+
+  // Build the job coordinator command
+  private String buildJobCoordinatorCmd(String fwkPath, String fwkVersion) {
+    // figure out if we have framework is deployed into a separate location
+    if (fwkVersion == null || fwkVersion.isEmpty()) {
+      fwkVersion = "STABLE";
+    }
+    LOG.info(String.format("KubeJob: fwk_path is %s, ver is %s use it directly ", fwkPath, fwkVersion));
+
+    // default location
+    String cmdExec = "/opt/hello-samza/bin/run-jc.sh";
+    if (!fwkPath.isEmpty()) {
+      // if we have framework installed as a separate package - use it
+      cmdExec = fwkPath + "/" + fwkVersion + "/bin/run-jc.sh";
+    }
+    return cmdExec;
+  }
+
+  // Construct the envs for the job coordinator pod
+  private List<EnvVar> getEnvs() {
+    MapConfig coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config);
+    List<EnvVar> envList = new ArrayList<>();
+    ObjectMapper objectMapper = getObjectMapper();
+    String coordinatorSysConfig;
+    try {
+      coordinatorSysConfig = objectMapper.writeValueAsString(coordinatorSystemConfig);
+    } catch (IOException ex) {
+      LOG.warn("No coordinator system configs!", ex);
+      coordinatorSysConfig = "";
+    }
+    envList.add(new EnvVar("SAMZA_COORDINATOR_SYSTEM_CONFIG", Util.envVarEscape(coordinatorSysConfig), null));
+    envList.add(new EnvVar("SAMZA_LOG_DIR", config.get(SAMZA_MOUNT_DIR), null));
+    envList.add(new EnvVar(COORDINATOR_POD_NAME, podName, null));
+    envList.add(new EnvVar("JAVA_OPTS", "", null));
+    return envList;
+  }
+}
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java
new file mode 100644
index 0000000000..4ec39cf887
--- 
/dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeJobFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.kubernetes;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.job.StreamJobFactory;
+
+
+/**
+ * Creates a {@link KubeJob} for the given job config.
+ */
+public class KubeJobFactory implements StreamJobFactory {
+
+  @Override
+  public KubeJob getJob(Config config) {
+    return new KubeJob(config);
+  }
+}
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java
new file mode 100644
index 0000000000..0b73664803
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubePodStatusWatcher.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.kubernetes;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Watches a pod and releases one latch per observed pod phase, so that callers can block (with a timeout) until
+ * the pod is pending, running, succeeded, failed, or completed (succeeded or failed, or the watch ended).
+ */
+public class KubePodStatusWatcher implements Watcher<Pod> {
+  private static final Logger LOG = LoggerFactory.getLogger(KubePodStatusWatcher.class);
+  private Optional<Pod> pod = Optional.empty();
+  private String phase = "unknown";
+  private String appId;
+  private CountDownLatch podRunningLatch = new CountDownLatch(1);
+  private CountDownLatch podPendingLatch = new CountDownLatch(1);
+  private CountDownLatch podSucceededLatch = new CountDownLatch(1);
+  private CountDownLatch podFailedLatch = new CountDownLatch(1);
+  private CountDownLatch podCompletedLatch = new CountDownLatch(1);
+
+  public KubePodStatusWatcher(String appId) {
+    this.appId = appId;
+  }
+
+  @Override
+  public void eventReceived(Action action, Pod pod) {
+    this.pod = Optional.ofNullable(pod);
+    switch (action) {
+      case DELETED:
+      case ERROR :
+        closeAllWatch();
+        break;
+      default:
+        if (isFailed()) {
+          closeWatchWhenFailed();
+        } else if (isSucceeded()) {
+          closeWatchWhenSucceed();
+        } else if (isRunning()) {
+          closeWatchWhenRunning();
+        } else if (isPending()) {
+          closeWatchWhenPending();
+        }
+    }
+  }
+
+  @Override
+  public void onClose(KubernetesClientException e) {
+    LOG.info("Stopping watching application {} with last-observed phase {}", appId, phase);
+    closeAllWatch();
+  }
+
+  public void waitForCompleted(long timeout, TimeUnit unit) {
+    try {
+      podCompletedLatch.await(timeout, unit);
+    } catch (InterruptedException e) {
+      LOG.error("waitForCompleted() was interrupted by exception: ", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public void waitForSucceeded(long timeout, TimeUnit unit) {
+    try {
+      podSucceededLatch.await(timeout, unit);
+    } catch (InterruptedException e) {
+      LOG.error("waitForSucceeded() was interrupted by exception: ", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public void waitForFailed(long timeout, TimeUnit unit) {
+    try {
+      podFailedLatch.await(timeout, unit);
+    } catch (InterruptedException e) {
+      LOG.error("waitForFailed() was interrupted by exception: ", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public void waitForRunning(long timeout, TimeUnit unit) {
+    try {
+      podRunningLatch.await(timeout, unit);
+    } catch (InterruptedException e) {
+      LOG.error("waitForRunning() was interrupted by exception: ", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public void waitForPending(long timeout, TimeUnit unit) {
+    try {
+      podPendingLatch.await(timeout, unit);
+    } catch (InterruptedException e) {
+      LOG.error("waitForPending() was interrupted by exception: ", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private boolean isSucceeded() {
+    if (pod.isPresent()) {
+      phase = pod.get().getStatus().getPhase();
+    }
+    return "Succeeded".equals(phase);
+  }
+
+  private boolean isFailed() {
+    if (pod.isPresent()) {
+      phase = pod.get().getStatus().getPhase();
+    }
+    return "Failed".equals(phase);
+  }
+
+  private boolean isRunning() {
+    if (pod.isPresent()) {
+      phase = pod.get().getStatus().getPhase();
+    }
+    return "Running".equals(phase);
+  }
+
+  private boolean isPending() {
+    if (pod.isPresent()) {
+      phase = pod.get().getStatus().getPhase();
+    }
+    return "Pending".equals(phase);
+  }
+
+  private void closeWatchWhenRunning() {
+    podRunningLatch.countDown();
+    // TODO: may add a logging thread
+  }
+
+  private void closeWatchWhenPending() {
+    podPendingLatch.countDown();
+    // TODO: may add a logging thread
+  }
+
+
+  private void closeWatchWhenFailed() {
+    podFailedLatch.countDown();
+    podCompletedLatch.countDown();
+    // TODO: may add a logging thread
+  }
+
+  private void closeWatchWhenSucceed() {
+    podSucceededLatch.countDown();
+    podCompletedLatch.countDown();
+    // TODO: may add a logging thread
+  }
+
+  private void closeAllWatch() {
+    closeWatchWhenFailed();
+    closeWatchWhenSucceed();
+    closeWatchWhenPending();
+    closeWatchWhenRunning();
+    // TODO: may add a logging thread
+  }
+}
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java
new file mode 100644
index 0000000000..68f64bf807
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeResourceManagerFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License. 
+ */
+package org.apache.samza.job.kubernetes;
+
+import org.apache.samza.clustermanager.ClusterResourceManager;
+import org.apache.samza.clustermanager.ResourceManagerFactory;
+import org.apache.samza.clustermanager.SamzaApplicationState;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.JobModelManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubeResourceManagerFactory implements ResourceManagerFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(KubeResourceManagerFactory.class);
+
+  @Override
+  public ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback, SamzaApplicationState state) {
+    JobModelManager jobModelManager = state.jobModelManager;
+    Config config = jobModelManager.jobModel().getConfig();
+    KubeClusterResourceManager manager = new KubeClusterResourceManager(config, jobModelManager, callback);
+    LOG.info("KubeClusterResourceManager created");
+    return manager;
+  }
+}
diff --git a/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java
new file mode 100644
index 0000000000..5af5258246
--- /dev/null
+++ b/samza-kubernetes/src/main/java/org/apache/samza/job/kubernetes/KubeUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.kubernetes;
+
+import io.fabric8.kubernetes.api.model.*;
+import org.apache.samza.clustermanager.SamzaResourceRequest;
+
+/**
+ * Static helpers for building Kubernetes pods and containers and for parsing pod names.
+ */
+public class KubeUtils {
+
+  // Returns the text after the last '-' in the pod name (the whole name when it contains no '-')
+  public static String getSamzaContainerNameFromPodName(String podName) {
+    // stream-processor-appName-appId-containerId
+    String[] splits = podName.split("-");
+    return splits[splits.length - 1];
+  }
+
+  // Builds a single-container pod with the given name, owner reference and restart policy
+  public static Pod createPod(String name, OwnerReference ownerReference, String restartPolicy, Container container) {
+    return new PodBuilder().editOrNewMetadata().withName(name).withOwnerReferences(ownerReference).endMetadata()
+        .editOrNewSpec().withRestartPolicy(restartPolicy).addToContainers(container).endSpec().build();
+  }
+
+  // Builds a single-container pod with the given name and restart policy in the given namespace
+  public static Pod createPod(String name, String restartPolicy, Container container, String namespace) {
+    return new PodBuilder().editOrNewMetadata().withNamespace(namespace).withName(name).endMetadata()
+        .editOrNewSpec().withRestartPolicy(restartPolicy).addToContainers(container).endSpec().build();
+  }
+
+  // for Samza operator: builds a container that requests the memory (Mi) and cpu of the given resource request
+  public static Container createContainer(String containerId, String image, SamzaResourceRequest resourceRequest,
+      String cmd) {
+    Quantity memQuantity = new QuantityBuilder(false)
+        .withAmount(String.valueOf(resourceRequest.getMemoryMB())).withFormat("Mi").build();
+    Quantity cpuQuantity = new QuantityBuilder(false)
+        .withAmount(String.valueOf(resourceRequest.getNumCores())).build();
+    return new ContainerBuilder().withName(containerId).withImage(image).withImagePullPolicy("Always").withCommand(cmd).editOrNewResources()
+        .addToRequests("memory", memQuantity).addToRequests("cpu", cpuQuantity).endResources().build();
+  }
+
+  // TODO: will add util methods describing details about Pod status and container status. Refer to Spark'KubernetesUtils.
+  // Then we can use them in logs and exception messages.
+}
diff --git a/settings.gradle b/settings.gradle
index c636706d71..eb0f72f07e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -30,6 +30,7 @@ def scalaModules = [
   'samza-elasticsearch',
   'samza-hdfs',
   'samza-kafka',
+  'samza-kubernetes',
   'samza-kv',
   'samza-kv-inmemory',
   'samza-kv-rocksdb',