From 79738ccd240a912e4705dcf59c554086c2a97a6d Mon Sep 17 00:00:00 2001 From: Bharath Kumarasubramanian Date: Wed, 18 May 2022 12:50:23 -0700 Subject: [PATCH] Consolidate SamzaPipelineOptions and SamzaPortablePipelineOptions --- .../beam/runners/samza/SamzaJobInvoker.java | 8 +++-- .../runners/samza/SamzaPipelineOptions.java | 7 ++++ .../samza/SamzaPortablePipelineOptions.java | 32 ------------------- 3 files changed, 12 insertions(+), 35 deletions(-) delete mode 100644 runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvoker.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvoker.java index 2aaeaaac2225..6ef8f9be4179 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvoker.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvoker.java @@ -26,6 +26,7 @@ import org.apache.beam.runners.jobsubmission.JobInvoker; import org.apache.beam.runners.jobsubmission.PortablePipelineJarCreator; import org.apache.beam.runners.jobsubmission.PortablePipelineRunner; +import org.apache.beam.sdk.options.PortablePipelineOptions; import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService; @@ -59,11 +60,12 @@ protected JobInvocation invokeWithExecutor( @Nullable String retrievalToken, ListeningExecutorService executorService) { LOG.trace("Parsing pipeline options"); - final SamzaPortablePipelineOptions samzaOptions = - PipelineOptionsTranslation.fromProto(options).as(SamzaPortablePipelineOptions.class); + final SamzaPipelineOptions samzaOptions = + PipelineOptionsTranslation.fromProto(options).as(SamzaPipelineOptions.class); + final PortablePipelineOptions portableOptions = samzaOptions.as(PortablePipelineOptions.class); final PortablePipelineRunner pipelineRunner; - if (Strings.isNullOrEmpty(samzaOptions.getOutputExecutablePath())) { + if (Strings.isNullOrEmpty(portableOptions.getOutputExecutablePath())) { pipelineRunner = new SamzaPipelineRunner(samzaOptions); } else { /* diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java index c0af1fab0d90..5fc22a990e7a 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java @@ -56,6 +56,13 @@ public interface SamzaPipelineOptions extends PipelineOptions { void setJobInstance(String instance); + @Description( + "The file path for the local file system token. If not set (by default), then the runner would" + + " not use secure server factory.") + String getFsTokenPath(); + + void setFsTokenPath(String path); + @Description( "Samza application execution environment." + "See {@link org.apache.beam.runners.samza.SamzaExecutionEnvironment} for detailed environment descriptions.") diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java deleted file mode 100644 index aa8e7ceb71d7..000000000000 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.samza; - -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PortablePipelineOptions; - -/** Samza pipeline option that contains portability specific logic. For internal usage only. */ -public interface SamzaPortablePipelineOptions - extends SamzaPipelineOptions, PortablePipelineOptions { - @Description( - "The file path for the local file system token. If not set (by default), then the runner would" - + " not use secure server factory.") - String getFsTokenPath(); - - void setFsTokenPath(String path); -}