
Ability to pass code argument as a Pipeline variable in SageMaker Script Processor #4918

Open
ucegbe opened this issue Nov 4, 2024 · 2 comments
Labels: component: pipelines (Relates to the SageMaker Pipeline Platform), type: question

Comments


ucegbe commented Nov 4, 2024

Describe the bug
When creating a processing step in SageMaker Pipelines, if I pass the code argument of the script processor's run method as a Pipeline parameter and then build the pipeline definition, I get the following error:
AttributeError: 'ParameterString' object has no attribute 'decode'

To reproduce

#Create a Pipeline Variable 
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean
)
from sagemaker.workflow.functions import Join
# Infra Parameters
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
processing_volume_size = ParameterInteger(name="ProcessingVolumeSize", default_value=30)

# Artifacts location Parameters
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
model_output_bucket = ParameterString(name="ModelOutput", default_value=f"s3://{bucket}/output/model")
train_output_bucket = ParameterString(name="TrainOutput", default_value=f"s3://{bucket}/output/train")
validation_output_bucket = ParameterString(name="ValidationOutput", default_value=f"s3://{bucket}/output/validation")
test_output_bucket = ParameterString(name="TestOutput", default_value=f"s3://{bucket}/output/test")
s3_input_data_location = ParameterString(name="S3InputDataURI", default_value=f"s3://{bucket}/ml_training/churn.csv")
s3_processing_code_location = ParameterString(name="S3ProcessingCodeLocation", default_value="s3://xxx/LightGBM-ML-Pipeline/code/8bfcc7b244a13a0833cb390dc0d312a6/churn_preprocess.py")
data_split_ratio = ParameterString(name="DataSplitRatio", default_value="0.3")


# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"
sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    volume_size_in_gb=processing_volume_size,
    base_job_name="sklearn-pre-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=s3_input_data_location, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=train_output_bucket),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                         destination=validation_output_bucket),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                         destination=test_output_bucket),
    ],
    code=s3_processing_code_location,
    arguments=["--split-ratio", data_split_ratio],
)
step_process = ProcessingStep(name="LightGBMDataPreProcess", step_args=processor_args)


import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        # Other parameters
        s3_processing_code_location,
        processing_volume_size,
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        training_instance_count,
        model_approval_status,
        model_output_bucket,
        train_output_bucket,
        validation_output_bucket,
        test_output_bucket,
        s3_input_data_location,
        data_split_ratio,
    ],
    steps=[step_process], 
)

definition = json.loads(pipeline.definition())
print(definition)

Expected behavior
Ability to pass the code location for script processors as a Pipeline variable.

Screenshots or logs

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[12], line 56
      2 from sagemaker.workflow.pipeline import Pipeline
      4 pipeline = Pipeline(
      5     name=pipeline_name,
      6     parameters=[
   (...)
     53     steps=[step_process,step_tuning], # we pass only the condition step as we have declared all steps as dependencies to the condition step
     54 )
---> 56 definition = json.loads(pipeline.definition())
     57 print(definition)

File /opt/conda/lib/python3.10/site-packages/sagemaker/workflow/pipeline.py:392, in Pipeline.definition(self)
    385 def definition(self) -> str:
    386     """Converts a request structure to string representation for workflow service calls."""
    387     compiled_steps = StepsCompiler(
    388         pipeline_name=self.name,
    389         sagemaker_session=self.sagemaker_session,
    390         steps=self.steps,
    391         pipeline_definition_config=self.pipeline_definition_config,
--> 392     ).build()
    394     request_dict = {
    395         "Version": self._version,
    396         "Metadata": self._metadata,
   (...)
    401         "Steps": list_to_request(compiled_steps),
    402     }
    404     request_dict["PipelineExperimentConfig"] = interpolate(
    405         request_dict["PipelineExperimentConfig"], {}, {}, pipeline_name=self.name
    406     )

File /opt/conda/lib/python3.10/site-packages/sagemaker/workflow/_steps_compiler.py:406, in StepsCompiler.build(self)
    403 if self._build_count > 1:
    404     raise RuntimeError("Cannot build a pipeline more than once with the same compiler.")
--> 406 return self._initialize_queue_and_build(self._input_steps)

File /opt/conda/lib/python3.10/site-packages/sagemaker/workflow/_steps_compiler.py:390, in StepsCompiler._initialize_queue_and_build(self, steps)
    388         compiled_steps.append(self._build_condition_step(step))
    389     else:
--> 390         compiled_steps.append(self._build_step(step))
    392 self._set_serialize_output_to_json_flag(compiled_steps)
    393 return compiled_steps

File /opt/conda/lib/python3.10/site-packages/sagemaker/workflow/_steps_compiler.py:323, in StepsCompiler._build_step(self, step)
    316 def _build_step(self, step: Step) -> CompiledStep:
    317     """Build a step."""
    319     with step_compilation_context_manager(
    320         pipeline_name=self.pipeline_name,
    321         step_name=step.name,
    322         sagemaker_session=self.sagemaker_session,
--> 323         code_hash=get_code_hash(step),
    324         config_hash=get_config_hash(step),
    325         pipeline_definition_config=self.pipeline_definition_config,
    326         upload_runtime_scripts=self.upload_runtime_scripts,
    327         upload_workspace=self.upload_workspace,
    328         pipeline_build_time=self.pipeline_build_time,
    329         function_step_secret_token=self._function_step_secret_token,
    330     ) as context:
    331         request_dict = step.to_request()
    333         self.upload_runtime_scripts = context.upload_runtime_scripts

File /opt/conda/lib/python3.10/site-packages/sagemaker/workflow/utilities.py:163, in get_code_hash(step)
    152     dependencies = get_processing_dependencies(
    153         [
    154             kwargs.get("dependencies"),
   (...)
    159         ]
    160     )
    161     code = kwargs.get("submit_app") or kwargs.get("code")
--> 163     return get_processing_code_hash(code, source_dir, dependencies)
    165 if isinstance(step, TrainingStep) and step.step_args:
    166     job_obj = step.step_args.func_args[0]

File /opt/conda/lib/python3.10/site-packages/sagemaker/workflow/utilities.py:218, in get_processing_code_hash(code, source_dir, dependencies)
    216 # Other Processors - Spark, Script, Base, etc.
    217 if code:
--> 218     code_url = urlparse(code)
    219     if code_url.scheme == "" or code_url.scheme == "file":
    220         return hash_files_or_dirs([code] + dependencies)

File /opt/conda/lib/python3.10/urllib/parse.py:392, in urlparse(url, scheme, allow_fragments)
    372 def urlparse(url, scheme='', allow_fragments=True):
    373     """Parse a URL into 6 components:
    374     <scheme>://<netloc>/<path>;<params>?<query>#<fragment>
    375 
   (...)
    390     Note that % escapes are not expanded.
    391     """
--> 392     url, scheme, _coerce_result = _coerce_args(url, scheme)
    393     splitresult = urlsplit(url, scheme, allow_fragments)
    394     scheme, netloc, url, query, fragment = splitresult

File /opt/conda/lib/python3.10/urllib/parse.py:128, in _coerce_args(*args)
    126 if str_input:
    127     return args + (_noop,)
--> 128 return _decode_args(args) + (_encode_result,)

File /opt/conda/lib/python3.10/urllib/parse.py:112, in _decode_args(args, encoding, errors)
    110 def _decode_args(args, encoding=_implicit_encoding,
    111                        errors=_implicit_errors):
--> 112     return tuple(x.decode(encoding, errors) if x else '' for x in args)

File /opt/conda/lib/python3.10/urllib/parse.py:112, in <genexpr>(.0)
    110 def _decode_args(args, encoding=_implicit_encoding,
    111                        errors=_implicit_errors):
--> 112     return tuple(x.decode(encoding, errors) if x else '' for x in args)

AttributeError: 'ParameterString' object has no attribute 'decode'
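
The last frames show where the placeholder breaks: get_processing_code_hash hands the code value to urlparse, which only accepts str or bytes and falls back to calling .decode() on anything else. A minimal reproduction of just that call (a sketch outside of any pipeline):

from urllib.parse import urlparse
from sagemaker.workflow.parameters import ParameterString

# The ParameterString placeholder is neither str nor bytes, so urlparse tries
# to .decode() it and fails at pipeline-definition (compile) time.
urlparse(ParameterString(name="S3ProcessingCodeLocation"))
# AttributeError: 'ParameterString' object has no attribute 'decode'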

System information

  • SageMaker Python SDK version: 2.215.0
  • Framework name (eg. PyTorch) or algorithm (eg. KMeans): Sklearn Framework processor
  • Framework version: 1.0-1
  • Python version: >3
  • CPU or GPU: CPU
  • Custom Docker image (Y/N): N


ucegbe added the bug label Nov 4, 2024
qidewenwhen added the component: pipelines (Relates to the SageMaker Pipeline Platform) label Nov 4, 2024
qidewenwhen (Member) commented

Hi @ucegbe, thanks for reaching out!
As per the signature of sklearn_processor.run, the code argument does not support PipelineVariable by design.

    @runnable_by_pipeline
    def run(
        self,
        code: str, <<<<<<<<<<<<<<<<
        inputs: Optional[List["ProcessingInput"]] = None,
        outputs: Optional[List["ProcessingOutput"]] = None,
        arguments: Optional[List[Union[str, PipelineVariable]]] = None,
        wait: bool = True,
        logs: bool = True,
        job_name: Optional[str] = None,
        experiment_config: Optional[Dict[str, str]] = None,
        kms_key: Optional[str] = None,
    ):

This is because the argument is evaluated at SDK compile time. As the log trace indicates, the SDK examines the code file to calculate a hash, which is then used by the step-caching feature. A PipelineVariable, which is only resolved at runtime, therefore does not work for this argument.
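
One possible workaround (not discussed in this thread, sketched here only as an assumption): since code must be a static string, route the parameterized script through a ProcessingInput, whose source does accept a PipelineVariable, and keep code pointing at a small static driver script that executes the staged file.

# Hedged sketch of a workaround, not an official pattern. run_staged_script.py is a
# hypothetical static driver that can be hashed at compile time; the actual
# preprocessing script arrives via a ProcessingInput resolved at runtime.
from sagemaker.processing import ProcessingInput

processor_args = sklearn_processor.run(
    code="run_staged_script.py",  # plain string, so get_code_hash can read the file
    inputs=[
        ProcessingInput(source=s3_input_data_location, destination="/opt/ml/processing/input"),
        # The ParameterString resolves at runtime; the script lands at a fixed container path.
        ProcessingInput(source=s3_processing_code_location, destination="/opt/ml/processing/script"),
    ],
    # outputs and the remaining arguments stay the same as in the original example
    arguments=["--split-ratio", data_split_ratio],
)

# run_staged_script.py (hypothetical driver) would then simply execute the staged file:
#     import runpy
#     runpy.run_path("/opt/ml/processing/script/churn_preprocess.py", run_name="__main__")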

ucegbe (Author) commented Nov 10, 2024

Thank you @qidewenwhen
Are there plans to make it a pipeline variable, as is done for training jobs? This would make it more versatile IMO.
