-
Notifications
You must be signed in to change notification settings - Fork 13
/
nested-pipeline.yaml
177 lines (170 loc) · 6.72 KB
/
nested-pipeline.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# Copyright 2021 The MLX Contributors
#
# SPDX-License-Identifier: Apache-2.0
# PipelineRun that embeds its pipeline inline (pipelineSpec) and chains three
# tasks through Tekton results:
#   download-message -> frequent-word -> save-message
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
  name: download-and-save-most-frequent
  annotations:
    # The JSON-valued annotations below are single-quoted scalars that span
    # several lines; YAML folds each line break into a single space, so the
    # line breaks are part of the layout only. Do not put comments inside.
    tekton.dev/output_artifacts: '{"download-message": [{"key": "artifacts/$PIPELINERUN/download-message/content.tgz",
      "name": "download-message-content", "path": "/tmp/outputs/content/data"}], "frequent-word":
      [{"key": "artifacts/$PIPELINERUN/frequent-word/Output.tgz", "name": "frequent-word-Output",
      "path": "/tmp/outputs/Output/data"}]}'
    tekton.dev/input_artifacts: '{"frequent-word": [{"name": "download-message-content",
      "parent_task": "download-message"}], "save-message": [{"name": "frequent-word-Output",
      "parent_task": "frequent-word"}]}'
    tekton.dev/artifact_bucket: mlpipeline
    tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
    tekton.dev/artifact_endpoint_scheme: http://
    tekton.dev/artifact_items: '{"download-message": [["content", "$(results.content.path)"]],
      "frequent-word": [["Output", "$(results.output.path)"]], "save-message": []}'
    # Quoted on purpose: annotation values must be strings, not booleans.
    sidecar.istio.io/inject: "false"
    pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
    pipelines.kubeflow.org/pipeline_spec: '{"description": "Download and Get Most
      Frequent Word and Save to GCS", "inputs": [{"default": "gs://ml-pipeline-playground/shakespeare1.txt",
      "name": "url", "optional": true, "type": "String"}, {"default": "res.txt", "name":
      "outputpath", "optional": true, "type": "String"}], "name": "Nested Pipeline"}'
spec:
  # Values supplied for this run; they are the same as the defaults declared
  # under pipelineSpec.params below.
  params:
  - name: outputpath
    value: res.txt
  - name: url
    value: gs://ml-pipeline-playground/shakespeare1.txt
  pipelineSpec:
    params:
    - name: outputpath
      default: res.txt
    - name: url
      default: gs://ml-pipeline-playground/shakespeare1.txt
    tasks:
    # Task 1: copy the object at `url` into the `content` result file.
    - name: download-message
      params:
      - name: url
        value: $(params.url)
      taskSpec:
        steps:
        - name: main
          # `sh -c SCRIPT A B` binds A to "$0" and B to "$1", so the script
          # runs `gsutil cp <url> <content result path>`.
          command:
          - sh
          - -c
          - |
            set -e
            gsutil cp "$0" "$1"
          - $(inputs.params.url)
          - $(results.content.path)
          # NOTE(review): no image tag is given, so the registry default is
          # used - consider pinning a version for reproducible runs.
          image: google/cloud-sdk
        params:
        - name: url
        results:
        - name: content
          description: /tmp/outputs/content/data
        metadata:
          labels:
            pipelines.kubeflow.org/pipelinename: ''
            pipelines.kubeflow.org/generation: ''
            pipelines.kubeflow.org/cache_enabled: "true"
          annotations:
            # NOTE(review): the digest is presumably derived from the
            # component definition - regenerate rather than hand-edit the
            # step if the component changes (TODO confirm).
            pipelines.kubeflow.org/component_spec_digest: '{"name": "Download Message",
              "outputs": [{"description": "file content.", "name": "content", "type":
              "String"}], "version": "Download Message@sha256=0da65249fbc9c2600dced3912b301411ee099ea62631ba8fa4ebb3753643eb55"}'
            tekton.dev/template: ''
      # 525600 minutes = 365 days.
      timeout: 525600m
    # Task 2: read the `content` result of download-message, find the most
    # common whitespace-separated word, and write it to the `output` result.
    - name: frequent-word
      params:
      - name: download-message-content
        value: $(tasks.download-message.results.content)
      taskSpec:
        steps:
        - name: main
          # `args` are appended after `command`, so they reach the Python
          # program as "$@". The four-dash '----output-paths' flag matches the
          # option name registered with argparse inside the program.
          args:
          - --message
          - $(inputs.params.download-message-content)
          - '----output-paths'
          - $(results.output.path)
          # The first block scalar is the launcher: it writes "$0" (the second
          # block scalar, i.e. the Python source) to a temp file and runs it
          # with python3, forwarding the args.
          command:
          - sh
          - -ec
          - |
            program_path=$(mktemp)
            printf "%s" "$0" > "$program_path"
            python3 -u "$program_path" "$@"
          - |
            def frequent_word(message):
                """get frequent word."""
                from collections import Counter
                words = Counter(message.split())
                result = max(words, key=words.get)
                print(result)
                return result
            def _serialize_str(str_value: str) -> str:
                if not isinstance(str_value, str):
                    raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value))))
                return str_value
            import argparse
            _parser = argparse.ArgumentParser(prog='Frequent word', description='get frequent word.')
            _parser.add_argument("--message", dest="message", type=str, required=True, default=argparse.SUPPRESS)
            _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
            _parsed_args = vars(_parser.parse_args())
            _output_files = _parsed_args.pop("_output_paths", [])
            _outputs = frequent_word(**_parsed_args)
            _outputs = [_outputs]
            _output_serializers = [
                _serialize_str,
            ]
            import os
            for idx, output_file in enumerate(_output_files):
                try:
                    os.makedirs(os.path.dirname(output_file))
                except OSError:
                    pass
                with open(output_file, 'w') as f:
                    f.write(_output_serializers[idx](_outputs[idx]))
          # NOTE(review): Python 3.5 and Debian jessie are both past upstream
          # end-of-life - consider a maintained base image after confirming
          # the program still runs on it.
          image: python:3.5-jessie
        params:
        - name: download-message-content
        results:
        - name: output
          description: /tmp/outputs/Output/data
        metadata:
          labels:
            pipelines.kubeflow.org/pipelinename: ''
            pipelines.kubeflow.org/generation: ''
            pipelines.kubeflow.org/cache_enabled: "true"
          annotations:
            # NOTE(review): the digest is presumably derived from the
            # component definition - regenerate rather than hand-edit the
            # embedded program if the component changes (TODO confirm).
            pipelines.kubeflow.org/component_spec_digest: '{"name": "Frequent word",
              "outputs": [{"name": "Output", "type": "String"}], "version": "Frequent
              word@sha256=e1af016a623c0f31454872f321df7ff90c198bf0986f79a02923eb57ef122ed3"}'
            tekton.dev/template: ''
      # 525600 minutes = 365 days.
      timeout: 525600m
    # Task 3: pipe the `output` result of frequent-word into `gsutil cp`,
    # writing it to the destination given by the `outputpath` parameter.
    - name: save-message
      params:
      - name: frequent-word-Output
        value: $(tasks.frequent-word.results.output)
      - name: outputpath
        value: $(params.outputpath)
      taskSpec:
        steps:
        - name: main
          # "$0" is the word to save and "$1" the destination; `gsutil cp -`
          # reads the data to upload from stdin.
          command:
          - sh
          - -c
          - |
            set -e
            echo "$0"| gsutil cp - "$1"
          - $(inputs.params.frequent-word-Output)
          - $(inputs.params.outputpath)
          # NOTE(review): no image tag is given, so the registry default is
          # used - consider pinning a version for reproducible runs.
          image: google/cloud-sdk
        params:
        - name: frequent-word-Output
        - name: outputpath
        metadata:
          labels:
            pipelines.kubeflow.org/pipelinename: ''
            pipelines.kubeflow.org/generation: ''
            pipelines.kubeflow.org/cache_enabled: "true"
          annotations:
            pipelines.kubeflow.org/component_spec_digest: '{"name": "Save Message",
              "outputs": [], "version": "Save Message@sha256=0c6f77a07b938c7978848376ddd4e7275a483e8e7e1d7587cd1670af9b01b6b7"}'
            tekton.dev/template: ''
      # 525600 minutes = 365 days.
      timeout: 525600m
  # Timeout for the whole PipelineRun: 525600 minutes = 365 days.
  timeout: 525600m