WIP: Flink Session Mode #177

Draft
wants to merge 2 commits into base: main
139 changes: 139 additions & 0 deletions pangeo_forge_runner/bakery/flink.py
@@ -301,6 +301,7 @@ def get_pipeline_options(

# Flink cluster names have a 45 char limit, and can only contain - special char
# And no uppercase characters are allowed
# XXX
cluster_name = generate_hashed_slug(
escapism.escape(job_name, escape_char="-").lower(), 45
)
@@ -373,3 +374,141 @@ def get_pipeline_options(
**extra_options,
)
return PipelineOptions(**opts)


class FlinkSessionBakery(Bakery):
"""
Bake a Pangeo Forge recipe on a Flink session cluster managed by the Apache Flink k8s Operator
"""

# Not actually blocking, but we don't have a job_id to return.
# job_id looks like a Dataflow-only concept, so this will need a refactor
blocking = True

parallelism = Integer(
-1,
config=True,
help="""
The degree of parallelism to be used when distributing operations onto workers.

Defaults to -1, which uses Flink's defaults.
""",
)

max_parallelism = Integer(
-1,
config=True,
help="""
The pipeline-wide maximum degree of parallelism to be used.
The maximum parallelism specifies the upper limit for dynamic scaling
and the number of key groups used for partitioned state.

Defaults to -1, which means no limit.
""",
)
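# Illustrative only (not part of this diff): since both traits above are declared
# with config=True, they could presumably be set from a pangeo-forge-runner
# traitlets config file, e.g.
#   c.FlinkSessionBakery.parallelism = 4
#   c.FlinkSessionBakery.max_parallelism = 16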

def make_flink_job(self, job_name: str, deployment_name: str):
"""
Return a FlinkSessionJob manifest (as a dict) for the given job and deployment
"""
flink_job = {
"apiVersion": "flink.apache.org/v1beta1",
"kind": "FlinkSessionJob",
"metadata": {"name": job_name},
"spec": {
"deploymentName": deployment_name,
"job": {
"upgradeMode": "stateless",
"parallelism": 2, # TODO FIXME: This should be set by the user
},
},
}

return flink_job
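# For reference, the dict built above corresponds to a FlinkSessionJob manifest
# along these lines when rendered as YAML:
#
#   apiVersion: flink.apache.org/v1beta1
#   kind: FlinkSessionJob
#   metadata:
#     name: <job_name>
#   spec:
#     deploymentName: <deployment_name>
#     job:
#       upgradeMode: stateless
#       parallelism: 2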

def get_pipeline_options(
self, job_name: str, container_image: str, extra_options: dict
) -> PipelineOptions:
"""
Return PipelineOptions for use with this Bakery
"""

# We use `kubectl` to talk to kubernetes, so let's make sure it exists
if shutil.which("kubectl") is None:
raise ValueError("kubectl is required for FlinkSessionBakery to work")

# Flink cluster names have a 45 char limit, can only contain `-` as a special character,
# and no uppercase characters are allowed
job_name_slug = generate_hashed_slug(
escapism.escape(job_name, escape_char="-").lower(), 45
)

deployment_name = "pforge-flink-session"

# Submit the FlinkSessionJob manifest to the cluster
with tempfile.NamedTemporaryFile(mode="w") as f:
f.write(json.dumps(self.make_flink_job(job_name_slug, deployment_name)))
f.flush()
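# Note: the manifest is serialized as JSON here; `kubectl apply` accepts JSON
# manifests just as it does YAML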
# FIXME: Add a timeout here?
# --wait is needed even though we have a kubectl wait below.
# Without it, kubectl wait tries to read the CRD before it has *any* status
# fields and fails miserably
cmd = ["kubectl", "apply", "--wait", "-f", f.name]
subprocess.check_call(cmd)

time.sleep(5)
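# (assumed intent of the sleep above: give the operator a moment to reconcile the
# applied FlinkSessionJob before we start polling status below)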
# Wait for the cluster to be ready
cmd = [
"kubectl",
"wait",
f"flinkdeployments.flink.apache.org/{deployment_name}",
"--for=jsonpath=.status.jobManagerDeploymentStatus=READY",
# FIXME: Parameterize this wait time
"--timeout=5m",
]
subprocess.check_call(cmd)

# port-forward the Flink REST service so we can reach it locally
cmd = [
"kubectl",
"port-forward",
"--pod-running-timeout=2m0s",
"--address",
"127.0.0.1", # Just ipv4 tx
# f"svc/{cluster_name}-rest",
f"svc/{deployment_name}-rest",
# 0 will allocate a random local port
"0:8081",
]
# Let's read out the line of output from kubectl so we can figure out what port
# `kubectl` has bound to
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
line = p.stdout.readline().decode()
# I could use regex here, but then I'll have two problems (at least)
# The line provided by kubectl looks like `Forwarding from 127.0.0.1:59408 -> 8081`
# We just want the randomly generated port there
listen_address = line.split(" ")[2]
# Use rsplit in case we listen on ipv6 stuff in the future
listen_port = listen_address.rsplit(":", 1)[1]
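# e.g. line = "Forwarding from 127.0.0.1:59408 -> 8081"
#      -> listen_address = "127.0.0.1:59408", listen_port = "59408"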

print(f"You can run '{' '.join(cmd)}' to make the Flink Dashboard available!")

# Set flags explicitly to empty so Apache Beam doesn't try to parse the commandline
# for pipeline options - we have traitlets doing that for us.
opts = dict(
flags=[],
runner="FlinkRunner",
# FIXME: This should be the deployed flink master URL
flink_master=f"127.0.0.1:{listen_port}",
flink_submit_uber_jar=True,
environment_type="EXTERNAL",
environment_config="0.0.0.0:50000",
# https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors
save_main_session=True,
# this might solve serialization issues; cf. https://beam.apache.org/blog/beam-2.36.0/
pickle_library="cloudpickle",
parallelism=self.parallelism,
max_parallelism=self.max_parallelism,
**extra_options,
)
return PipelineOptions(**opts)
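

# Illustrative usage (assumption, mirroring how other bakeries are selected): a runner
# config could point the bake step at this class, e.g.
#   c.Bake.bakery_class = "pangeo_forge_runner.bakery.flink.FlinkSessionBakery"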