Skip to content

Commit

Permalink
Don't use the task type to imply the monitor names
Browse files Browse the repository at this point in the history
  • Loading branch information
dagardner-nv committed Nov 8, 2024
1 parent 9327a99 commit 128c955
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,14 @@ def dfp_inference_pipe(builder: mrc.Builder):
ts_column_name = config.get("timestamp_column_name")
monitor_options = config.get("monitor_options", {})

preproc_monitor_options = monitor_options.copy()
if "name_postfix" not in preproc_monitor_options:
preproc_monitor_options["name_postfix"] = "[inference_pipe]"

preproc_options = {
"batching_options": config.get("batching_options", {}),
"cache_dir": cache_dir,
"monitor_options": monitor_options,
"monitor_options": preproc_monitor_options,
"timestamp_column_name": ts_column_name,
"user_splitting_options": config.get("user_splitting_options", {}),
}
Expand Down
7 changes: 3 additions & 4 deletions python/morpheus_dfp/morpheus_dfp/modules/dfp_preproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ def dfp_preproc(builder: mrc.Builder):
ts_column_name = config.get("timestamp_column_name", None)

monitor_options = config.get("monitor_options", {})
pre_filter_options = config.get("pre_filter_options", {})
task_type = pre_filter_options.get("filter_task_type")
monitor_name_postfix = monitor_options.get("name_postfix", "")

batching_opts = config.get("batching_options", {})
batching_opts["cache_dir"] = cache_dir
Expand All @@ -100,7 +99,7 @@ def dfp_preproc(builder: mrc.Builder):

supported_loaders = config.get("supported_loaders", {})

file_to_df_monitor_default = {"description": f"FileToDF [{task_type}_pipe]"}
file_to_df_monitor_default = {"description": f"FileToDF {monitor_name_postfix}"}
file_to_df_monitor_conf = merge_dictionaries(monitor_options, file_to_df_monitor_default)

# Double check on how 'batcher_config' is used in the file_batcher module.
Expand All @@ -123,7 +122,7 @@ def dfp_preproc(builder: mrc.Builder):
dfp_split_users_default = {"fallback_username": config.get("fallback_username", "generic_user")}
dfp_split_users_conf = merge_dictionaries(splitting_opts, dfp_split_users_default)

dfp_split_users_monitor_default = {"description": f"SplitUsers [{task_type}_pipe]"}
dfp_split_users_monitor_default = {"description": f"SplitUsers {monitor_name_postfix}"}
dfp_split_users_monitor_conf = merge_dictionaries(monitor_options, dfp_split_users_monitor_default)

file_batcher_module = builder.load_module(FILE_BATCHER, "morpheus", "file_batcher", file_batcher_conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,14 @@ def dfp_training_pipe(builder: mrc.Builder):
ts_column_name = config.get("timestamp_column_name")
monitor_options = config.get("monitor_options", {})

preproc_monitor_options = monitor_options.copy()
if "name_postfix" not in preproc_monitor_options:
preproc_monitor_options["name_postfix"] = "[training_pipe]"

preproc_options = {
"batching_options": config.get("batching_options", {}),
"cache_dir": cache_dir,
"monitor_options": monitor_options,
"monitor_options": preproc_monitor_options,
"timestamp_column_name": ts_column_name,
"user_splitting_options": config.get("user_splitting_options", {}),
}
Expand Down

0 comments on commit 128c955

Please sign in to comment.