From f5fae212519ca2da8f193eb4f6d25de2ba59b4b6 Mon Sep 17 00:00:00 2001
From: thodson
Date: Mon, 26 Feb 2024 10:45:58 -0600
Subject: [PATCH 1/2] Add FlinkSessionBakery

---
 pangeo_forge_runner/bakery/flink.py | 134 ++++++++++++++++++++++++++++
 1 file changed, 134 insertions(+)

diff --git a/pangeo_forge_runner/bakery/flink.py b/pangeo_forge_runner/bakery/flink.py
index 850bfa15..59a71b30 100644
--- a/pangeo_forge_runner/bakery/flink.py
+++ b/pangeo_forge_runner/bakery/flink.py
@@ -373,3 +373,137 @@ def get_pipeline_options(
             **extra_options,
         )
         return PipelineOptions(**opts)
+
+
+class FlinkSessionBakery(Bakery):
+    """
+    Bake a Pangeo Forge recipe on a Flink cluster managed by the Apache Flink Kubernetes Operator
+    """
+
+    parallelism = Integer(
+        -1,
+        config=True,
+        help="""
+        The degree of parallelism to be used when distributing operations onto workers.
+
+        Defaults to -1, which uses Flink's defaults.
+        """,
+    )
+
+    max_parallelism = Integer(
+        -1,
+        config=True,
+        help="""
+        The pipeline-wide maximum degree of parallelism to be used.
+        The maximum parallelism specifies the upper limit for dynamic scaling
+        and the number of key groups used for partitioned state.
+
+        Defaults to -1, which means no limit.
+        """,
+    )
+
+    def make_flink_job(self, job_name: str, deployment_name: str):
+        """
+        Return a FlinkSessionJob manifest as a dict
+        """
+        flink_job = {
+            "apiVersion": "flink.apache.org/v1beta1",
+            "kind": "FlinkSessionJob",
+            "metadata": {"name": job_name},
+            "spec": {
+                "deploymentName": deployment_name,
+                "job": {
+                    "upgradeMode": "stateless",
+                    "parallelism": 2,  # TODO FIXME: This should be set by the user
+                },
+            },
+        }
+
+        return flink_job
+
+    def get_pipeline_options(
+        self, job_name: str, container_image: str, extra_options: dict
+    ) -> PipelineOptions:
+        """
+        Return PipelineOptions for use with this Bakery
+        """
+
+        # We use `kubectl` to talk to kubernetes, so let's make sure it exists
+        if shutil.which("kubectl") is None:
+            raise ValueError("kubectl is required for FlinkSessionBakery to work")
+
+        # Flink job names have a 45 char limit, may contain `-` as the only
+        # special character, and may not contain uppercase characters
+        job_name_slug = generate_hashed_slug(
+            escapism.escape(job_name, escape_char="-").lower(), 45
+        )
+
+        deployment_name = "pforge-flink-session"
+
+        # Submit the FlinkSessionJob manifest to the session cluster
+        with tempfile.NamedTemporaryFile(mode="w") as f:
+            f.write(json.dumps(self.make_flink_job(job_name_slug, deployment_name)))
+            f.flush()
+            # FIXME: Add a timeout here?
+            # --wait is needed even though we have a kubectl wait below.
+            # Without it, kubectl wait tries to read the CRD before it has *any* status
+            # fields and fails miserably
+            cmd = ["kubectl", "apply", "--wait", "-f", f.name]
+            subprocess.check_call(cmd)
+
+        time.sleep(5)
+        # Wait for the cluster to be ready
+        cmd = [
+            "kubectl",
+            "wait",
+            f"flinkdeployments.flink.apache.org/{deployment_name}",
+            "--for=jsonpath=.status.jobManagerDeploymentStatus=READY",
+            # FIXME: Parameterize this wait time
+            "--timeout=5m",
+        ]
+        subprocess.check_call(cmd)
+
+        # port-forward the thing
+        cmd = [
+            "kubectl",
+            "port-forward",
+            "--pod-running-timeout=2m0s",
+            "--address",
+            "127.0.0.1",  # IPv4 only
+            # f"svc/{cluster_name}-rest",
+            f"svc/{deployment_name}-rest",
+            # 0 will allocate a random local port
+            "0:8081",
+        ]
+        # Let's read out the line of output from kubectl so we can figure out what port
+        # `kubectl` has bound to
+        p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+        line = p.stdout.readline().decode()
+        # I could use regex here, but then I'll have two problems (at least).
+        # The line provided by kubectl looks like `Forwarding from 127.0.0.1:59408 -> 8081`
+        # We just want the randomly generated port there
+        listen_address = line.split(" ")[2]
+        # Use rsplit in case we listen on ipv6 stuff in the future
+        listen_port = listen_address.rsplit(":", 1)[1]
+
+        print(f"You can run '{' '.join(cmd)}' to make the Flink Dashboard available!")
+
+        # Set flags explicitly to empty so Apache Beam doesn't try to parse the commandline
+        # for pipeline options - we have traitlets doing that for us.
+        opts = dict(
+            flags=[],
+            runner="FlinkRunner",
+            # FIXME: This should be the deployed flink master URL
+            flink_master=f"127.0.0.1:{listen_port}",
+            flink_submit_uber_jar=True,
+            environment_type="EXTERNAL",
+            environment_config="0.0.0.0:50000",
+            # https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors
+            save_main_session=True,
+            # this might solve serialization issues; cf. https://beam.apache.org/blog/beam-2.36.0/
+            pickle_library="cloudpickle",
+            parallelism=self.parallelism,
+            max_parallelism=self.max_parallelism,
+            **extra_options,
+        )
+        return PipelineOptions(**opts)

From 214bad837f7c17bef9def41bbcc565d95dc9caa5 Mon Sep 17 00:00:00 2001
From: thodson
Date: Mon, 26 Feb 2024 22:55:00 -0600
Subject: [PATCH 2/2] Add blocking=True

---
 pangeo_forge_runner/bakery/flink.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pangeo_forge_runner/bakery/flink.py b/pangeo_forge_runner/bakery/flink.py
index 59a71b30..b3b8c157 100644
--- a/pangeo_forge_runner/bakery/flink.py
+++ b/pangeo_forge_runner/bakery/flink.py
@@ -380,6 +380,10 @@ class FlinkSessionBakery(Bakery):
     Bake a Pangeo Forge recipe on a Flink cluster managed by the Apache Flink Kubernetes Operator
     """
 
+    # Not actually blocking, but we don't have a job_id to return;
+    # job_id looks like a Dataflow-only concept, so this will need a refactor
+    blocking = True
+
     parallelism = Integer(
         -1,
         config=True,
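
Reviewer note: below is a minimal sketch of how this new bakery might be exercised, assuming the runner's usual traitlets config mechanism (`Bake.bakery_class`) applies to `FlinkSessionBakery` the same way it does to the existing `FlinkOperatorBakery`. The file name and trait values are hypothetical, not part of this patch.

# config.py -- hypothetical pangeo-forge-runner traitlets config (not part of this patch)
c = get_config()  # provided by traitlets when loading a .py config file

# Select the session-cluster bakery added in PATCH 1/2
c.Bake.bakery_class = "pangeo_forge_runner.bakery.flink.FlinkSessionBakery"

# Traits defined in this patch; -1 (the default) defers to Flink's own settings
c.FlinkSessionBakery.parallelism = 4
c.FlinkSessionBakery.max_parallelism = 16

Invoked with something like `pangeo-forge-runner bake --config config.py ...`, this should submit the recipe as a FlinkSessionJob against the `pforge-flink-session` deployment; because `blocking = True` (PATCH 2/2), no job_id is returned.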