diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index ba75c3090f..82cb8b43ec 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -20,6 +20,7 @@ on: branches: - 'pull-request/**' - 'branch-*' + - 'main' # This allows a subsequently queued workflow run to interrupt previous runs concurrency: diff --git a/CHANGELOG.md b/CHANGELOG.md index 016ac721ad..4160330f51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,19 @@ See the License for the specific language governing permissions and limitations under the License. --> +# Morpheus 24.10.01 (22 Nov 2024) + +## 🐛 Bug Fixes + +- Pin mlflow version to avoid breaking changes in v2.18 ([#2067](https://github.com/nv-morpheus/Morpheus/pull/2067)) [@dagardner-nv](https://github.com/dagardner-nv) +- Execute CI on the main branch ([#2064](https://github.com/nv-morpheus/Morpheus/pull/2064)) [@dagardner-nv](https://github.com/dagardner-nv) + +## 📖 Documentation + +- Remove references to pipeline-ae in docs ([#2063](https://github.com/nv-morpheus/Morpheus/pull/2063)) [@dagardner-nv](https://github.com/dagardner-nv) +- Document location of third party source repository ([#2059](https://github.com/nv-morpheus/Morpheus/pull/2059)) [@dagardner-nv](https://github.com/dagardner-nv) +- Update DFP class and file paths ([#2052](https://github.com/nv-morpheus/Morpheus/pull/2052)) [@dagardner-nv](https://github.com/dagardner-nv) + # Morpheus 24.10.00 (01 Nov 2024) ## 🚨 Breaking Changes diff --git a/ci/conda/recipes/morpheus-libs/meta.yaml b/ci/conda/recipes/morpheus-libs/meta.yaml index 747f734371..f6d88716bf 100644 --- a/ci/conda/recipes/morpheus-libs/meta.yaml +++ b/ci/conda/recipes/morpheus-libs/meta.yaml @@ -91,7 +91,7 @@ outputs: - feedparser =6.0.* - grpcio =1.62.* - lxml - - mlflow>=2.10.0,<3 + - mlflow>=2.10.0,<2.18 - mrc - networkx=2.8.8 - numpydoc =1.5.* @@ -239,7 +239,7 @@ outputs: - scripts/fetch_data.py - tests/* script: morpheus_llm_test.sh - + about: home: https://github.com/nv-morpheus/Morpheus license: Apache-2.0 diff --git a/ci/conda/recipes/morpheus/meta.yaml b/ci/conda/recipes/morpheus/meta.yaml index 23327bcbc7..fd60e49243 100644 --- a/ci/conda/recipes/morpheus/meta.yaml +++ b/ci/conda/recipes/morpheus/meta.yaml @@ -97,7 +97,7 @@ outputs: - feedparser =6.0.* - grpcio =1.62.* - libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863 - - mlflow>=2.10.0,<3 + - mlflow>=2.10.0,<2.18 - mrc - networkx=2.8.8 - numpydoc =1.5.* diff --git a/cmake/dependencies_core.cmake b/cmake/dependencies_core.cmake index b8c5457a69..ba3eac42bc 100644 --- a/cmake/dependencies_core.cmake +++ b/cmake/dependencies_core.cmake @@ -19,6 +19,10 @@ list(APPEND CMAKE_MESSAGE_CONTEXT "dep_core") # ========= morpheus_utils_configure_cccl() +# indicators +# ========== +morpheus_utils_configure_indicators() + # matx # ==== morpheus_utils_configure_matx() diff --git a/conda/environments/all_cuda-125_arch-x86_64.yaml b/conda/environments/all_cuda-125_arch-x86_64.yaml index 32d7f8f505..ebbb3dffe0 100644 --- a/conda/environments/all_cuda-125_arch-x86_64.yaml +++ b/conda/environments/all_cuda-125_arch-x86_64.yaml @@ -53,6 +53,7 @@ dependencies: - gxx=12.1 - huggingface_hub=0.20.2 - include-what-you-use=0.20 +- indicators=2.3 - ipython - isort - jsonpatch>=1.33 diff --git a/conda/environments/dev_cuda-125_arch-x86_64.yaml b/conda/environments/dev_cuda-125_arch-x86_64.yaml index a2eae2afab..15bf6a0c16 100644 --- a/conda/environments/dev_cuda-125_arch-x86_64.yaml +++ b/conda/environments/dev_cuda-125_arch-x86_64.yaml @@ -45,6 +45,7 @@ dependencies: - gtest=1.14 - gxx=12.1 - include-what-you-use=0.20 +- indicators=2.3 - ipython - isort - libcublas-dev diff --git a/dependencies.yaml b/dependencies.yaml index e625b1971f..1f9941ad3b 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -277,6 +277,7 @@ dependencies: - cmake=3.27 - cuda-cudart-dev=12.5 - cuda-version=12.5 + - indicators=2.3 # C++ library for displaying progress bars - libtool # Needed for DOCA build - ninja=1.11 - pkg-config=0.29 # for mrc cmake diff --git a/docs/source/developer_guide/guides/2_real_world_phishing.md b/docs/source/developer_guide/guides/2_real_world_phishing.md index c460af3e02..61ba218fd5 100644 --- a/docs/source/developer_guide/guides/2_real_world_phishing.md +++ b/docs/source/developer_guide/guides/2_real_world_phishing.md @@ -482,7 +482,6 @@ To explicitly set the output format we could specify the `file_type` argument to ```python import logging import os -import tempfile import click @@ -542,7 +541,7 @@ MORPHEUS_ROOT = os.environ['MORPHEUS_ROOT'] @click.option("--server_url", default='localhost:8000', help="Tritonserver url.") @click.option( "--output_file", - default=os.path.join(tempfile.gettempdir(), "detections.jsonlines"), + default=".tmp/output/phishing_detections.jsonlines", help="The path to the file where the inference output will be saved.", ) def run_pipeline(use_stage_function: bool, @@ -633,7 +632,7 @@ morpheus --log_level=debug --plugin examples/developer_guide/2_1_real_world_phis monitor --description="Inference Rate" --smoothing=0.001 --unit=inf \ add-scores --label=is_phishing \ serialize \ - to-file --filename=/tmp/detections.jsonlines --overwrite + to-file --filename=.tmp/output/phishing_detections_cli.jsonlines --overwrite ``` ## Stage Constructors diff --git a/docs/source/getting_started.md b/docs/source/getting_started.md index 7e76b38bdc..dcd1469fb8 100644 --- a/docs/source/getting_started.md +++ b/docs/source/getting_started.md @@ -376,35 +376,6 @@ Commands: validate Validate pipeline output for testing. ``` -And for the AE pipeline: - -``` -$ morpheus run pipeline-ae --help -Usage: morpheus run pipeline-ae [OPTIONS] COMMAND1 [ARGS]... [COMMAND2 [ARGS]...]... - - - -Commands: - add-class Add detected classifications to each message. - add-scores Add probability scores to each message. - buffer (Deprecated) Buffer results. - delay (Deprecated) Delay results for a certain duration. - filter Filter message by a classification threshold. - from-azure Source stage is used to load Azure Active Directory messages. - from-cloudtrail Load messages from a CloudTrail directory. - from-duo Source stage is used to load Duo Authentication messages. - inf-pytorch Perform inference with PyTorch. - inf-triton Perform inference with Triton Inference Server. - monitor Display throughput numbers at a specific point in the pipeline. - preprocess Prepare Autoencoder input DataFrames for inference. - serialize Includes & excludes columns from messages. - timeseries Perform time series anomaly detection and add prediction. - to-file Write all messages to a file. - to-kafka Write all messages to a Kafka cluster. - train-ae Train an Autoencoder model on incoming data. - trigger Buffer data until the previous stage has completed. - validate Validate pipeline output for testing. -``` > **Note**: The available commands for different types of pipelines are not the same. This means that the same stage may have different options when used in different pipelines. Check the CLI help for the most up-to-date information during development. ## Next Steps diff --git a/examples/abp_nvsmi_detection/README.md b/examples/abp_nvsmi_detection/README.md index 244d729420..f7fed3e260 100644 --- a/examples/abp_nvsmi_detection/README.md +++ b/examples/abp_nvsmi_detection/README.md @@ -140,7 +140,7 @@ morpheus --log_level=DEBUG \ `# 7th Stage: Convert from objects back into strings. Ignore verbose input data` \ serialize --include 'mining' \ `# 8th Stage: Write out the JSON lines to the detections.jsonlines file` \ - to-file --filename=detections.jsonlines --overwrite + to-file --filename=.tmp/output/abp_nvsmi_detections.jsonlines --overwrite ``` If successful, the following should be displayed: @@ -217,7 +217,7 @@ Added stage: morpheus.ControlMessage Added stage: └─ morpheus.ControlMessage -> morpheus.MessageMeta -Added stage: +Added stage: └─ morpheus.MessageMeta -> morpheus.MessageMeta ====Building Pipeline Complete!==== Starting! Time: 1656353254.9919598 @@ -225,7 +225,7 @@ Inference Rate[Complete]: 1242inf [00:00, 1863.04inf/s] ====Pipeline Complete==== ``` -The output file `detections.jsonlines` will contain a single boolean value for each input line. At some point the values will switch from `0` to `1`: +The output file `.tmp/output/detections.jsonlines` will contain a single boolean value for each input line. At some point the values will switch from `0` to `1`: ``` ... diff --git a/examples/abp_pcap_detection/README.md b/examples/abp_pcap_detection/README.md index 6dc63212af..9c04f5e68f 100644 --- a/examples/abp_pcap_detection/README.md +++ b/examples/abp_pcap_detection/README.md @@ -97,7 +97,7 @@ python examples/abp_pcap_detection/run.py ``` Note: Both Morpheus and Triton Inference Server containers must have access to the same GPUs in order for this example to work. -The pipeline will process the input `abp_pcap_dump.jsonlines` sample data and write it to `pcap_out.jsonlines`. +The pipeline will process the input `abp_pcap_dump.jsonlines` sample data and write it to `.tmp/output/pcap_out.jsonlines`. ### CLI Example The above example is illustrative of using the Python API to build a custom Morpheus Pipeline. @@ -118,6 +118,6 @@ morpheus --log_level INFO --plugin "examples/abp_pcap_detection/abp_pcap_preproc monitor --description "Add classification rate" --unit "add-class" \ serialize \ monitor --description "Serialize rate" --unit ser \ - to-file --filename "pcap_out.jsonlines" --overwrite \ + to-file --filename ".tmp/output/pcap_out_cli.jsonlines" --overwrite \ monitor --description "Write to file rate" --unit "to-file" ``` diff --git a/examples/abp_pcap_detection/run.py b/examples/abp_pcap_detection/run.py index b1a654bbd9..24405bad3c 100644 --- a/examples/abp_pcap_detection/run.py +++ b/examples/abp_pcap_detection/run.py @@ -65,7 +65,7 @@ ) @click.option( "--output_file", - default="./pcap_out.jsonlines", + default="./.tmp/output/pcap_out.jsonlines", help="The path to the file where the inference output will be saved.", ) @click.option( diff --git a/examples/cpu_only/README.md b/examples/cpu_only/README.md index feac382a3f..3e8abd3233 100644 --- a/examples/cpu_only/README.md +++ b/examples/cpu_only/README.md @@ -53,7 +53,7 @@ Options: To launch the configured Morpheus pipeline with the sample data that is provided in `examples/data`, run the following: ```bash -python examples/cpu_only/run.py --use_cpu_only --in_file=examples/data/email.jsonlines --out_file=.tmp/out.jsonlines +python examples/cpu_only/run.py --use_cpu_only --in_file=examples/data/email.jsonlines --out_file=.tmp/output/cpu_only_out.jsonlines ``` ### CLI Example @@ -68,5 +68,5 @@ morpheus --log_level INFO \ deserialize \ monitor --description "deserialize" \ serialize \ - to-file --filename=.tmp/out.jsonlines --overwrite + to-file --filename=.tmp/output/cpu_only_cli_out.jsonlines --overwrite ``` diff --git a/examples/cpu_only/run.py b/examples/cpu_only/run.py index f0a50a47e0..7cbc96a440 100644 --- a/examples/cpu_only/run.py +++ b/examples/cpu_only/run.py @@ -61,7 +61,7 @@ "--out_file", help="Output file", type=click.Path(dir_okay=False), - default="output.csv", + default=".tmp/output/cpu_only_out.csv", required=True, ) def run_pipeline(log_level: int, use_cpu_only: bool, in_file: pathlib.Path, out_file: pathlib.Path): diff --git a/examples/developer_guide/2_1_real_world_phishing/run.py b/examples/developer_guide/2_1_real_world_phishing/run.py index 64ae7d77dc..8f83fade5c 100755 --- a/examples/developer_guide/2_1_real_world_phishing/run.py +++ b/examples/developer_guide/2_1_real_world_phishing/run.py @@ -17,7 +17,6 @@ import logging import os -import tempfile import click from recipient_features_stage import RecipientFeaturesStage @@ -77,7 +76,7 @@ @click.option("--server_url", default='localhost:8000', help="Tritonserver url.") @click.option( "--output_file", - default=os.path.join(tempfile.gettempdir(), "detections.jsonlines"), + default=".tmp/output/phishing_detections.jsonlines", help="The path to the file where the inference output will be saved.", ) def run_pipeline(use_stage_function: bool, diff --git a/examples/gnn_fraud_detection_pipeline/README.md b/examples/gnn_fraud_detection_pipeline/README.md index c7206787a6..9110cc27ff 100644 --- a/examples/gnn_fraud_detection_pipeline/README.md +++ b/examples/gnn_fraud_detection_pipeline/README.md @@ -97,7 +97,7 @@ Added stage: morpheus.MessageMeta Added stage: └─ morpheus.MessageMeta -> morpheus.MessageMeta -Added stage: +Added stage: └─ morpheus.MessageMeta -> morpheus.MessageMeta ====Building Segment Complete!==== Graph construction rate[Complete]: 265 messages [00:00, 1016.18 messages/s] @@ -128,5 +128,5 @@ morpheus --log_level INFO \ gnn-fraud-classification --model_xgb_file examples/gnn_fraud_detection_pipeline/model/xgb.pt \ monitor --description "Add classification rate" \ serialize \ - to-file --filename "output.csv" --overwrite + to-file --filename "gnn_fraud_detection_cli_output.csv" --overwrite ``` diff --git a/examples/gnn_fraud_detection_pipeline/run.py b/examples/gnn_fraud_detection_pipeline/run.py index a5de019ed7..73c3301d47 100644 --- a/examples/gnn_fraud_detection_pipeline/run.py +++ b/examples/gnn_fraud_detection_pipeline/run.py @@ -84,7 +84,7 @@ @click.option( "--output_file", type=click.Path(dir_okay=False), - default="output.csv", + default=".tmp/output/gnn_fraud_detection_output.csv", help="The path to the file where the inference output will be saved.", ) def run_pipeline(num_threads, diff --git a/examples/log_parsing/README.md b/examples/log_parsing/README.md index 5d2485a3bc..19cda49a66 100644 --- a/examples/log_parsing/README.md +++ b/examples/log_parsing/README.md @@ -119,6 +119,6 @@ morpheus --log_level INFO \ monitor --description "Inference rate" --unit inf \ log-postprocess --vocab_path ./models/training-tuning-scripts/sid-models/resources/bert-base-cased-vocab.txt \ --model_config_path=./models/log-parsing-models/log-parsing-config-20220418.json \ - to-file --filename ./log-parsing-output.jsonlines --overwrite \ + to-file --filename .tmp/output/log-parsing-cli-output.jsonlines --overwrite \ monitor --description "Postprocessing rate" ``` diff --git a/examples/log_parsing/run.py b/examples/log_parsing/run.py index a85379f166..20e836c4c8 100644 --- a/examples/log_parsing/run.py +++ b/examples/log_parsing/run.py @@ -60,7 +60,7 @@ ) @click.option( "--output_file", - default="log-parsing-output.jsonlines", + default=".tmp/output/log-parsing-output.jsonlines", help="The path to the file where the inference output will be saved.", ) @click.option('--model_vocab_hash_file', diff --git a/examples/nlp_si_detection/README.md b/examples/nlp_si_detection/README.md index 37d4abfa1f..2ed3c8dcb0 100644 --- a/examples/nlp_si_detection/README.md +++ b/examples/nlp_si_detection/README.md @@ -131,8 +131,8 @@ morpheus --log_level=DEBUG \ filter --filter_source=TENSOR \ `# 8th Stage: Convert from objects back into strings` \ serialize --exclude '^_ts_' \ - `# 9th Stage: Write out the JSON lines to the detections.jsonlines file` \ - to-file --filename=detections.jsonlines --overwrite + `# 9th Stage: Write out the JSON lines to the nlp_si_detections.jsonlines file` \ + to-file --filename=.tmp/output/nlp_si_detections.jsonlines --overwrite ``` If successful, the following should be displayed: @@ -187,7 +187,7 @@ Added stage: morpheus.ControlMessage Added stage: └─ morpheus.ControlMessage -> morpheus.MessageMeta -Added stage: +Added stage: └─ morpheus.MessageMeta -> morpheus.MessageMeta ====Building Pipeline Complete!==== Starting! Time: 1656352480.541071 @@ -196,7 +196,7 @@ Inference Rate[Complete]: 93085inf [00:07, 12673.63inf/s] ``` -The output file `detections.jsonlines` will contain the original PCAP messages with the following additional fields added: +The output file `.tmp/output/nlp_si_detections.jsonlines` will contain the original PCAP messages with the following additional fields added: * `address` * `bank_acct` * `credit_card` diff --git a/examples/nlp_si_detection/run.sh b/examples/nlp_si_detection/run.sh index 390418e545..d88f6a8ffb 100755 --- a/examples/nlp_si_detection/run.sh +++ b/examples/nlp_si_detection/run.sh @@ -29,4 +29,4 @@ morpheus --log_level=DEBUG \ add-class \ filter --filter_source=TENSOR \ serialize --exclude '^_ts_' \ - to-file --filename=detections.jsonlines --overwrite + to-file --filename=.tmp/output/nlp_si_detections.jsonlines --overwrite diff --git a/examples/ransomware_detection/README.md b/examples/ransomware_detection/README.md index e1f7197e1e..226dba098d 100644 --- a/examples/ransomware_detection/README.md +++ b/examples/ransomware_detection/README.md @@ -72,7 +72,7 @@ python examples/ransomware_detection/run.py --server_url=localhost:8000 \ --sliding_window=3 \ --model_name=ransomw-model-short-rf \ --input_glob=./examples/data/appshield/*/snapshot-*/*.json \ - --output_file=./ransomware_detection_output.jsonlines + --output_file=.tmp/output/ransomware_detection_output.jsonlines ``` Input features for a short model can be taken from every three snapshots sequence, such as (1, 2, 3), or (2, 3, 4). The sliding window represents the number of subsequent snapshots that need to be taken into consideration when generating the input for a model. Sliding window for the medium model is `5` and for the long model it is `10`. diff --git a/examples/ransomware_detection/run.py b/examples/ransomware_detection/run.py index 4887e7ff1b..7bc8dbf487 100644 --- a/examples/ransomware_detection/run.py +++ b/examples/ransomware_detection/run.py @@ -21,9 +21,12 @@ from stages.create_features import CreateFeaturesRWStage from stages.preprocessing import PreprocessingRWStage +from morpheus.common import TypeId from morpheus.config import Config from morpheus.config import PipelineModes +from morpheus.messages import MessageMeta from morpheus.pipeline.linear_pipeline import LinearPipeline +from morpheus.pipeline.stage_decorator import stage from morpheus.stages.general.monitor_stage import MonitorStage from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage from morpheus.stages.input.appshield_source_stage import AppShieldSourceStage @@ -61,6 +64,12 @@ type=click.IntRange(min=1), help="Max batch size to use for the model.", ) +@click.option( + "--pipeline_batch_size", + default=1024, + type=click.IntRange(min=1), + help=("Internal batch size for the pipeline. Can be much larger than the model batch size."), +) @click.option( "--conf_file", type=click.STRING, @@ -95,21 +104,22 @@ @click.option( "--output_file", type=click.STRING, - default="./ransomware_detection_output.jsonlines", + default=".tmp/output/ransomware_detection_output.jsonlines", help="The path to the file where the inference output will be saved.", ) -def run_pipeline(debug, - num_threads, - n_dask_workers, - threads_per_dask_worker, - model_max_batch_size, - conf_file, - model_name, - server_url, - sliding_window, - input_glob, - watch_directory, - output_file): +def run_pipeline(debug: bool, + num_threads: int, + n_dask_workers: int, + threads_per_dask_worker: int, + model_max_batch_size: int, + pipeline_batch_size: int, + conf_file: str, + model_name: str, + server_url: str, + sliding_window: int, + input_glob: str, + watch_directory: bool, + output_file: str): if debug: configure_logging(log_level=logging.DEBUG) @@ -125,6 +135,7 @@ def run_pipeline(debug, # Below properties are specified by the command line. config.num_threads = num_threads config.model_max_batch_size = model_max_batch_size + config.pipeline_batch_size = pipeline_batch_size config.feature_length = snapshot_fea_length * sliding_window config.class_labels = ["pred", "score"] @@ -222,6 +233,18 @@ def run_pipeline(debug, # This stage logs the metrics (msg/sec) from the above stage. pipeline.add_stage(MonitorStage(config, description="Serialize rate")) + @stage(needed_columns={'timestamp_process': TypeId.STRING}) + def concat_columns(msg: MessageMeta) -> MessageMeta: + """ + This stage concatinates the timestamp and pid_process columns to create a unique field. + """ + with msg.mutable_dataframe() as df: + df['timestamp_process'] = df['timestamp'] + df['pid_process'] + + return msg + + pipeline.add_stage(concat_columns(config)) + # Add a write file stage. # This stage writes all messages to a file. pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True)) diff --git a/examples/root_cause_analysis/README.md b/examples/root_cause_analysis/README.md index 45d36b8f0f..0a5c178de8 100644 --- a/examples/root_cause_analysis/README.md +++ b/examples/root_cause_analysis/README.md @@ -124,7 +124,7 @@ add-scores --label=is_root_cause \ `# 7th Stage: Convert from objects back into strings` \ serialize --exclude '^ts_' \ `# 8th Stage: Write results out to CSV file` \ -to-file --filename=./root-cause-binary-output.jsonlines --overwrite +to-file --filename=.tmp/output/root-cause-binary-output.jsonlines --overwrite ``` If successful, the following should be displayed: @@ -177,10 +177,10 @@ Added stage: └─ morpheus.ControlMessagee -> morpheus.ControlMessage Added stage: └─ morpheus.ControlMessage -> morpheus.MessageMeta -Added stage: +Added stage: └─ morpheus.MessageMeta -> morpheus.MessageMeta Inference rate[Complete]: 473 inf [00:01, 340.43 inf/s] ====Pipeline Complete==== ``` -The output file `root-cause-binary-output.jsonlines` will contain the original kernel log messages with an additional field `is_root_cause`. The value of the new field will be the root cause probability. +The output file `.tmp/output/root-cause-binary-output.jsonlines` will contain the original kernel log messages with an additional field `is_root_cause`. The value of the new field will be the root cause probability. diff --git a/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake b/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake index 2aa8a46c68..e5c560fc2e 100644 --- a/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake +++ b/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake @@ -79,6 +79,7 @@ add_library(${PROJECT_NAME}::morpheus ALIAS morpheus) target_link_libraries(morpheus PRIVATE + indicators::indicators matx::matx $<$:ZLIB::ZLIB> PUBLIC diff --git a/python/morpheus/morpheus/_lib/common/__init__.pyi b/python/morpheus/morpheus/_lib/common/__init__.pyi index 8ba9ecf837..d64f663d26 100644 --- a/python/morpheus/morpheus/_lib/common/__init__.pyi +++ b/python/morpheus/morpheus/_lib/common/__init__.pyi @@ -15,6 +15,8 @@ __all__ = [ "FilterSource", "HttpEndpoint", "HttpServer", + "IndicatorsFontStyle", + "IndicatorsTextColor", "Tensor", "TypeId", "determine_file_type", @@ -120,6 +122,107 @@ class HttpServer(): def start(self) -> None: ... def stop(self) -> None: ... pass +class IndicatorsFontStyle(): + """ + Members: + + bold + + dark + + italic + + underline + + blink + + reverse + + concealed + + crossed + """ + def __eq__(self, other: object) -> bool: ... + def __getstate__(self) -> int: ... + def __hash__(self) -> int: ... + def __index__(self) -> int: ... + def __init__(self, value: int) -> None: ... + def __int__(self) -> int: ... + def __ne__(self, other: object) -> bool: ... + def __repr__(self) -> str: ... + def __setstate__(self, state: int) -> None: ... + @property + def name(self) -> str: + """ + :type: str + """ + @property + def value(self) -> int: + """ + :type: int + """ + __members__: dict # value = {'bold': , 'dark': , 'italic': , 'underline': , 'blink': , 'reverse': , 'concealed': , 'crossed': } + blink: morpheus._lib.common.IndicatorsFontStyle # value = + bold: morpheus._lib.common.IndicatorsFontStyle # value = + concealed: morpheus._lib.common.IndicatorsFontStyle # value = + crossed: morpheus._lib.common.IndicatorsFontStyle # value = + dark: morpheus._lib.common.IndicatorsFontStyle # value = + italic: morpheus._lib.common.IndicatorsFontStyle # value = + reverse: morpheus._lib.common.IndicatorsFontStyle # value = + underline: morpheus._lib.common.IndicatorsFontStyle # value = + pass +class IndicatorsTextColor(): + """ + Members: + + grey + + red + + green + + yellow + + blue + + magenta + + cyan + + white + + unspecified + """ + def __eq__(self, other: object) -> bool: ... + def __getstate__(self) -> int: ... + def __hash__(self) -> int: ... + def __index__(self) -> int: ... + def __init__(self, value: int) -> None: ... + def __int__(self) -> int: ... + def __ne__(self, other: object) -> bool: ... + def __repr__(self) -> str: ... + def __setstate__(self, state: int) -> None: ... + @property + def name(self) -> str: + """ + :type: str + """ + @property + def value(self) -> int: + """ + :type: int + """ + __members__: dict # value = {'grey': , 'red': , 'green': , 'yellow': , 'blue': , 'magenta': , 'cyan': , 'white': , 'unspecified': } + blue: morpheus._lib.common.IndicatorsTextColor # value = + cyan: morpheus._lib.common.IndicatorsTextColor # value = + green: morpheus._lib.common.IndicatorsTextColor # value = + grey: morpheus._lib.common.IndicatorsTextColor # value = + magenta: morpheus._lib.common.IndicatorsTextColor # value = + red: morpheus._lib.common.IndicatorsTextColor # value = + unspecified: morpheus._lib.common.IndicatorsTextColor # value = + white: morpheus._lib.common.IndicatorsTextColor # value = + yellow: morpheus._lib.common.IndicatorsTextColor # value = + pass class Tensor(): @staticmethod def from_cupy(arg0: object) -> Tensor: ... diff --git a/python/morpheus/morpheus/_lib/common/module.cpp b/python/morpheus/morpheus/_lib/common/module.cpp index 8692f0a5c4..d12d531205 100644 --- a/python/morpheus/morpheus/_lib/common/module.cpp +++ b/python/morpheus/morpheus/_lib/common/module.cpp @@ -37,6 +37,8 @@ #include #include // for return_value_policy::reference // for pathlib.Path -> std::filesystem::path conversions +#include +#include #include // IWYU pragma: keep #include // IWYU pragma: keep @@ -169,6 +171,27 @@ PYBIND11_MODULE(common, _module) .def("__enter__", &HttpServerInterfaceProxy::enter, py::return_value_policy::reference) .def("__exit__", &HttpServerInterfaceProxy::exit); + py::enum_(_module, "IndicatorsTextColor") + .value("grey", indicators::Color::grey) + .value("red", indicators::Color::red) + .value("green", indicators::Color::green) + .value("yellow", indicators::Color::yellow) + .value("blue", indicators::Color::blue) + .value("magenta", indicators::Color::magenta) + .value("cyan", indicators::Color::cyan) + .value("white", indicators::Color::white) + .value("unspecified", indicators::Color::unspecified); + + py::enum_(_module, "IndicatorsFontStyle") + .value("bold", indicators::FontStyle::bold) + .value("dark", indicators::FontStyle::dark) + .value("italic", indicators::FontStyle::italic) + .value("underline", indicators::FontStyle::underline) + .value("blink", indicators::FontStyle::blink) + .value("reverse", indicators::FontStyle::reverse) + .value("concealed", indicators::FontStyle::concealed) + .value("crossed", indicators::FontStyle::crossed); + _module.attr("__version__") = MRC_CONCAT_STR(morpheus_VERSION_MAJOR << "." << morpheus_VERSION_MINOR << "." << morpheus_VERSION_PATCH); } diff --git a/python/morpheus/morpheus/_lib/include/morpheus/controllers/monitor_controller.hpp b/python/morpheus/morpheus/_lib/include/morpheus/controllers/monitor_controller.hpp new file mode 100644 index 0000000000..321724b1a2 --- /dev/null +++ b/python/morpheus/morpheus/_lib/include/morpheus/controllers/monitor_controller.hpp @@ -0,0 +1,314 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "indicators/setting.hpp" + +#include "morpheus/messages/control.hpp" +#include "morpheus/messages/meta.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include // for Builder +#include // for PythonNode +#include // for trace_activity, decay_t, from + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace morpheus { +/******************* MonitorController**********************/ + +/** + * @addtogroup controllers + * @{ + * @file + */ + +// Workaround to display progress bars and other logs in the same terminal: +// https://github.com/p-ranav/indicators/issues/107 +// Adding "\n" (new line) "\033[A" (move cursor up) and "\033[1L" (insert line) before each log output line. +// This keeps the progress bars always display as the last line of console output +struct LineInsertingFilter : boost::iostreams::line_filter +{ + std::string do_filter(const std::string& line) override + { + return "\n\033[A\033[1L" + line; + } +}; + +// A singleton manager class that manages the lifetime of progress bars related to any MonitorController instances +// Meyer's singleton is guaranteed thread-safe +class ProgressBarContextManager +{ + public: + static ProgressBarContextManager& get_instance() + { + static ProgressBarContextManager instance; + return instance; + } + ProgressBarContextManager(ProgressBarContextManager const&) = delete; + ProgressBarContextManager& operator=(ProgressBarContextManager const&) = delete; + + size_t add_progress_bar(const std::string& description, + indicators::Color text_color = indicators::Color::cyan, + indicators::FontStyle font_style = indicators::FontStyle::bold) + { + std::lock_guard lock(m_mutex); + m_progress_bars.push_back(std::move(initialize_progress_bar(description, text_color, font_style))); + + // DynamicProgress should take ownership over progressbars: https://github.com/p-ranav/indicators/issues/134 + // The fix to this issue is not yet released, so we need to: + // - Maintain the lifetime of the progress bar in m_progress_bars + // - Push the underlying progress bar object to the DynamicProgress container, since it accepts + // Indicator &bar rather than std::unique_ptr bar before the fix + return m_dynamic_progress_bars.push_back(*m_progress_bars.back()); + } + + std::vector>& progress_bars() + { + return m_progress_bars; + } + + void display_all() + { + std::lock_guard lock(m_mutex); + + // A bit of hack here to make the font settings work. Indicators enables the font options only if the bars are + // output to standard streams (see is_colorized() in ), but since we are still using + // the ostream (m_stdout_os) that is connected to the console terminal, the font options should be enabled. + // The internal function here is used to manually enable the font display. + m_stdout_os.iword(termcolor::_internal::colorize_index()) = 1; + + for (auto& pbar : m_progress_bars) + { + pbar->print_progress(true); + m_stdout_os << termcolor::reset; // The font option only works for the current bar + m_stdout_os << "\n"; + } + + // After each round of display, move cursor up ("\033[A") to the beginning of the first bar + m_stdout_os << "\033[" << m_progress_bars.size() << "A"; + } + + private: + ProgressBarContextManager() : m_stdout_streambuf(std::cout.rdbuf()), m_stdout_os(m_stdout_streambuf) + { + init_log_streambuf(); + } + + ~ProgressBarContextManager() + { + // Reset std::cout to use the normal streambuf when exit + std::cout.rdbuf(m_stdout_streambuf); + } + + void init_log_streambuf() + { + // Configure all std::cout output to use LineInsertingFilter + m_log_streambuf.push(LineInsertingFilter()); + m_log_streambuf.push(*m_stdout_streambuf); + std::cout.rdbuf(&m_log_streambuf); + } + + std::unique_ptr initialize_progress_bar(const std::string& description, + indicators::Color text_color, + indicators::FontStyle font_style) + { + auto progress_bar = std::make_unique( + indicators::option::BarWidth{10}, + indicators::option::Start{"["}, + indicators::option::Fill{" "}, + indicators::option::Lead{"#"}, + indicators::option::End("]"), + indicators::option::PrefixText{description}, + indicators::option::ForegroundColor{text_color}, + indicators::option::FontStyles{std::vector{font_style}}, + indicators::option::Stream{m_stdout_os}); + + return std::move(progress_bar); + } + + indicators::DynamicProgress m_dynamic_progress_bars; + std::vector> m_progress_bars; + std::mutex m_mutex; + + // To ensure progress bars are displayed alongside other log outputs, we use two distinct stream buffers: + // - Progress bars are redirected to m_stdout_os, which points to the original standard output stream. + // - All std::cout output is redirected to m_log_streambuf, which incorporates a LineInsertingFilter to continually + // shift the progress bar display downward. + std::streambuf* m_stdout_streambuf; // Stores the original std::cout.rdbuf() + std::ostream m_stdout_os; + boost::iostreams::filtering_ostreambuf m_log_streambuf; +}; + +/** + * @brief A controller class that manages the display of progress bars that used by MonitorStage. + */ +template +class MonitorController +{ + public: + /** + * @brief Construct a new Monitor Controller object + * + * @param description : A text label displayed on the left side of the progress bars + * @param unit : the unit of message count + * @param determine_count_fn : A function that computes the count for each incoming message + */ + MonitorController(const std::string& description, + std::string unit = "messages", + indicators::Color text_color = indicators::Color::cyan, + indicators::FontStyle font_style = indicators::FontStyle::bold, + std::optional> determine_count_fn = std::nullopt); + + auto auto_count_fn() -> std::optional>; + + MessageT progress_sink(MessageT msg); + void sink_on_completed(); + + private: + static std::string format_duration(std::chrono::seconds duration); + static std::string format_throughput(std::chrono::seconds duration, size_t count, const std::string& unit); + + int m_bar_id; + const std::string m_unit; + std::optional> m_determine_count_fn; + size_t m_count{0}; + time_point_t m_start_time; + bool m_is_started{false}; // Set to true after the first call to progress_sink() +}; + +template +MonitorController::MonitorController(const std::string& description, + std::string unit, + indicators::Color text_color, + indicators::FontStyle font_style, + std::optional> determine_count_fn) : + m_unit(std::move(unit)), + m_determine_count_fn(determine_count_fn) +{ + if (!m_determine_count_fn) + { + m_determine_count_fn = auto_count_fn(); + if (!m_determine_count_fn) + { + throw std::runtime_error("No count function provided and no default count function available"); + } + } + + m_bar_id = ProgressBarContextManager::get_instance().add_progress_bar(description, text_color, font_style); +} + +template +MessageT MonitorController::progress_sink(MessageT msg) +{ + if (!m_is_started) + { + m_start_time = std::chrono::system_clock::now(); + m_is_started = true; + } + m_count += (*m_determine_count_fn)(msg); + auto duration = std::chrono::duration_cast(std::chrono::system_clock::now() - m_start_time); + + auto& manager = ProgressBarContextManager::get_instance(); + auto& pbar = manager.progress_bars()[m_bar_id]; + + // Update the progress bar + pbar->set_option(indicators::option::PostfixText{format_throughput(duration, m_count, m_unit)}); + pbar->tick(); + + manager.display_all(); + + return msg; +} + +template +void MonitorController::sink_on_completed() +{ + auto& manager = ProgressBarContextManager::get_instance(); + auto& pbar = manager.progress_bars()[m_bar_id]; + + pbar->mark_as_completed(); +} + +template +std::string MonitorController::format_duration(std::chrono::seconds duration) +{ + auto minutes = std::chrono::duration_cast(duration); + auto seconds = duration - minutes; + + std::ostringstream oss; + oss << std::setw(2) << std::setfill('0') << minutes.count() << "m:" << std::setw(2) << std::setfill('0') + << seconds.count() << "s"; + return oss.str(); +} + +template +std::string MonitorController::format_throughput(std::chrono::seconds duration, + size_t count, + const std::string& unit) +{ + double throughput = static_cast(count) / duration.count(); + std::ostringstream oss; + oss << count << " " << unit << " in " << format_duration(duration) << ", " + << "Throughput: " << std::fixed << std::setprecision(2) << throughput << " " << unit << "/s"; + return oss.str(); +} + +template +auto MonitorController::auto_count_fn() -> std::optional> +{ + if constexpr (std::is_same_v>) + { + return [](std::shared_ptr msg) { + return msg->count(); + }; + } + + if constexpr (std::is_same_v>) + { + return [](std::shared_ptr msg) { + if (!msg->payload()) + { + return 0; + } + return msg->payload()->count(); + }; + } + + return std::nullopt; +} + +/** @} */ // end of group +} // namespace morpheus diff --git a/python/morpheus/morpheus/_lib/include/morpheus/stages/monitor.hpp b/python/morpheus/morpheus/_lib/include/morpheus/stages/monitor.hpp new file mode 100644 index 0000000000..7f2e602a07 --- /dev/null +++ b/python/morpheus/morpheus/_lib/include/morpheus/stages/monitor.hpp @@ -0,0 +1,147 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "morpheus/controllers/monitor_controller.hpp" // for MonitorController +#include "morpheus/export.h" // for MORPHEUS_EXPORT + +#include +#include // for Builder +#include // for PythonNode +#include // for trace_activity, decay_t, from + +#include +#include + +namespace morpheus { +/*************** Component public implementations ***************/ +/******************** MonitorStage ********************/ + +/** + * @addtogroup controllers + * @{ + * @file + */ + +/** + * @brief Displays descriptive progress bars including throughput metrics for the messages passing through the pipeline. + */ +template +class MORPHEUS_EXPORT MonitorStage : public mrc::pymrc::PythonNode, std::shared_ptr> +{ + public: + using base_t = mrc::pymrc::PythonNode, std::shared_ptr>; + using typename base_t::sink_type_t; + using typename base_t::source_type_t; + using typename base_t::subscribe_fn_t; + + /** + * @brief Construct a new Monitor Stage object + * + * @param description : A text label displayed on the left side of the progress bars + * @param unit : the unit of message count + * @param determine_count_fn : A function that computes the count for each incoming message + */ + MonitorStage(const std::string& description, + const std::string& unit = "messages", + indicators::Color = indicators::Color::cyan, + indicators::FontStyle font_style = indicators::FontStyle::bold, + std::optional> determine_count_fn = std::nullopt); + + private: + subscribe_fn_t build_operator(); + + MonitorController m_monitor_controller; +}; + +template +MonitorStage::MonitorStage(const std::string& description, + const std::string& unit, + indicators::Color text_color, + indicators::FontStyle font_style, + std::optional> determine_count_fn) : + base_t(base_t::op_factory_from_sub_fn(build_operator())), + m_monitor_controller(MonitorController(description, unit, text_color, font_style, determine_count_fn)) +{} + +template +MonitorStage::subscribe_fn_t MonitorStage::build_operator() +{ + return [this](rxcpp::observable input, rxcpp::subscriber output) { + return input.subscribe(rxcpp::make_observer( + [this, &output](sink_type_t msg) { + m_monitor_controller.progress_sink(msg); + output.on_next(std::move(msg)); + }, + [&](std::exception_ptr error_ptr) { + output.on_error(error_ptr); + }, + [&]() { + m_monitor_controller.sink_on_completed(); + output.on_completed(); + })); + }; +} + +/****** MonitorStageInterfaceProxy******************/ +/** + * @brief Interface proxy, used to insulate python bindings. + */ +template +struct MORPHEUS_EXPORT MonitorStageInterfaceProxy +{ + /** + * @brief Create and initialize a MonitorStage, and return the result + * + * @param builder : Pipeline context object reference + * @param name : Name of a stage reference + * @param description : A text label displayed on the left side of the progress bars + * @param unit : the unit of message count + * @param text_color : the text color of progress bars + * @param font_style : the font style of progress bars + * @param determine_count_fn : A function that computes the count for each incoming message + * @return std::shared_ptr>> + */ + static std::shared_ptr>> init( + mrc::segment::Builder& builder, + const std::string& name, + const std::string& description, + const std::string& unit, + indicators::Color color = indicators::Color::cyan, + indicators::FontStyle font_style = indicators::FontStyle::bold, + std::optional::sink_type_t)>> determine_count_fn = + std::nullopt); +}; + +template +std::shared_ptr>> MonitorStageInterfaceProxy::init( + mrc::segment::Builder& builder, + const std::string& name, + const std::string& description, + const std::string& unit, + indicators::Color text_color, + indicators::FontStyle font_style, + std::optional::sink_type_t)>> determine_count_fn) +{ + auto stage = builder.construct_object>( + name, description, unit, text_color, font_style, determine_count_fn); + + return stage; +} +/** @} */ // end of group +} // namespace morpheus diff --git a/python/morpheus/morpheus/_lib/stages/__init__.pyi b/python/morpheus/morpheus/_lib/stages/__init__.pyi index 8d2fe7f911..d99f70983c 100644 --- a/python/morpheus/morpheus/_lib/stages/__init__.pyi +++ b/python/morpheus/morpheus/_lib/stages/__init__.pyi @@ -10,6 +10,7 @@ import morpheus._lib.stages import typing from morpheus._lib.common import FilterSource import morpheus._lib.common +import morpheus._lib.messages import mrc.core.coro import mrc.core.segment import os @@ -25,6 +26,8 @@ __all__ = [ "HttpServerMessageMetaSourceStage", "InferenceClientStage", "KafkaSourceStage", + "MonitorControlMessageStage", + "MonitorMessageMetaStage", "PreallocateControlMessageStage", "PreallocateMessageMetaStage", "PreprocessFILStage", @@ -67,6 +70,12 @@ class KafkaSourceStage(mrc.core.segment.SegmentObject): @typing.overload def __init__(self, builder: mrc.core.segment.Builder, name: str, max_batch_size: int, topics: typing.List[str], batch_timeout_ms: int, config: typing.Dict[str, str], disable_commits: bool = False, disable_pre_filtering: bool = False, stop_after: int = 0, async_commits: bool = True, oauth_callback: typing.Optional[function] = None) -> None: ... pass +class MonitorControlMessageStage(mrc.core.segment.SegmentObject): + def __init__(self, builder: mrc.core.segment.Builder, name: str, description: str, unit: str = 'messages', text_color: morpheus._lib.common.IndicatorsTextColor = IndicatorsTextColor.cyan, font_style: morpheus._lib.common.IndicatorsFontStyle = IndicatorsFontStyle.bold, determine_count_fn: typing.Optional[typing.Callable[[morpheus._lib.messages.ControlMessage], int]] = None) -> None: ... + pass +class MonitorMessageMetaStage(mrc.core.segment.SegmentObject): + def __init__(self, builder: mrc.core.segment.Builder, name: str, description: str, unit: str = 'messages', text_color: morpheus._lib.common.IndicatorsTextColor = IndicatorsTextColor.cyan, font_style: morpheus._lib.common.IndicatorsFontStyle = IndicatorsFontStyle.bold, determine_count_fn: typing.Optional[typing.Callable[[morpheus._lib.messages.MessageMeta], int]] = None) -> None: ... + pass class PreallocateControlMessageStage(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, needed_columns: typing.List[typing.Tuple[str, morpheus._lib.common.TypeId]]) -> None: ... pass diff --git a/python/morpheus/morpheus/_lib/stages/module.cpp b/python/morpheus/morpheus/_lib/stages/module.cpp index 6fb855e0c5..fdda9d34bc 100644 --- a/python/morpheus/morpheus/_lib/stages/module.cpp +++ b/python/morpheus/morpheus/_lib/stages/module.cpp @@ -26,6 +26,7 @@ #include "morpheus/stages/http_server_source_stage.hpp" // for HttpServerSourceStage, HttpServerSourceStageInterfac... #include "morpheus/stages/inference_client_stage.hpp" // for InferenceClientStage, InferenceClientStageInterfaceP... #include "morpheus/stages/kafka_source.hpp" // for KafkaSourceStage, KafkaSourceStageInterfaceProxy +#include "morpheus/stages/monitor.hpp" // for MonitorStage, MonitorStageInterfaceProxy #include "morpheus/stages/preallocate.hpp" // for PreallocateStage, PreallocateStageInterfaceProxy #include "morpheus/stages/preprocess_fil.hpp" // for PreprocessFILStage, PreprocessFILStageInterfaceProxy #include "morpheus/stages/preprocess_nlp.hpp" // for PreprocessNLPStage, PreprocessNLPStageInterfaceProxy @@ -34,6 +35,8 @@ #include "morpheus/utilities/http_server.hpp" // for DefaultMaxPayloadSize #include "morpheus/version.hpp" // for morpheus_VERSION_MAJOR, morpheus_VERSION_MINOR, morp... +#include // for Color +#include // for FontStyle #include // for Builder #include // for Object, ObjectProperties #include // for MRC_CONCAT_STR @@ -43,7 +46,7 @@ #include // for none, dict, str_attr #include // IWYU pragma: keep #include // IWYU pragma: keep -#include // for from_import, import +#include // for import, from_import #include // for trace_activity, decay_t #include // for path @@ -51,7 +54,6 @@ #include // for operator<<, basic_ostringstream #include // for string #include // for vector - namespace morpheus { namespace py = pybind11; @@ -190,6 +192,32 @@ PYBIND11_MODULE(stages, _module) py::arg("async_commits") = true, py::arg("oauth_callback") = py::none()); + py::class_>, + mrc::segment::ObjectProperties, + std::shared_ptr>>>( + _module, "MonitorMessageMetaStage", py::multiple_inheritance()) + .def(py::init<>(&MonitorStageInterfaceProxy::init), + py::arg("builder"), + py::arg("name"), + py::arg("description"), + py::arg("unit") = "messages", + py::arg("text_color") = indicators::Color::cyan, + py::arg("font_style") = indicators::FontStyle::bold, + py::arg("determine_count_fn") = py::none()); + + py::class_>, + mrc::segment::ObjectProperties, + std::shared_ptr>>>( + _module, "MonitorControlMessageStage", py::multiple_inheritance()) + .def(py::init<>(&MonitorStageInterfaceProxy::init), + py::arg("builder"), + py::arg("name"), + py::arg("description"), + py::arg("unit") = "messages", + py::arg("text_color") = indicators::Color::cyan, + py::arg("font_style") = indicators::FontStyle::bold, + py::arg("determine_count_fn") = py::none()); + py::class_>, mrc::segment::ObjectProperties, std::shared_ptr>>>( diff --git a/python/morpheus/morpheus/_lib/tests/CMakeLists.txt b/python/morpheus/morpheus/_lib/tests/CMakeLists.txt index 215b595576..3c0d2a9861 100644 --- a/python/morpheus/morpheus/_lib/tests/CMakeLists.txt +++ b/python/morpheus/morpheus/_lib/tests/CMakeLists.txt @@ -151,6 +151,12 @@ add_morpheus_test( test_multi_slices.cpp ) +add_morpheus_test( + NAME controllers + FILES + controllers/test_monitor_controller.cpp +) + add_morpheus_test( NAME stages FILES diff --git a/python/morpheus/morpheus/_lib/tests/controllers/test_monitor_controller.cpp b/python/morpheus/morpheus/_lib/tests/controllers/test_monitor_controller.cpp new file mode 100644 index 0000000000..70fad9bc99 --- /dev/null +++ b/python/morpheus/morpheus/_lib/tests/controllers/test_monitor_controller.cpp @@ -0,0 +1,88 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../test_utils/common.hpp" // for TEST_CLASS_WITH_PYTHON, morpheus + +#include "morpheus/controllers/monitor_controller.hpp" // for MonitorController +#include "morpheus/messages/control.hpp" // for ControlMessage +#include "morpheus/messages/meta.hpp" // for MessageMeta + +#include // for cudaMemcpy, cudaMemcpyKind +#include // for column +#include // for make_numeric_column +#include // for mutable_column_view +#include // for column_name_info, table_with_metadata, table_metadata +#include // for table +#include // for type_id, data_type +#include // for Message, TestPartResult, EXPECT_EQ, TestInfo, EXPECT_... + +#include // for int32_t +#include // for function +#include // for unique_ptr, shared_ptr, allocator, make_shared, make_... +#include // for iota +#include // for optional +#include // for runtime_error +#include // for unordered_map +#include // for move +#include // for vector + +using namespace morpheus; + +TEST_CLASS_WITH_PYTHON(MonitorController); + +cudf::io::table_with_metadata create_cudf_table_with_metadata(int rows, int cols) +{ + std::vector> columns; + + for (int i = 0; i < cols; ++i) + { + auto col = cudf::make_numeric_column(cudf::data_type{cudf::type_id::INT32}, rows); + auto col_view = col->mutable_view(); + + std::vector data(rows); + std::iota(data.begin(), data.end(), 0); + cudaMemcpy(col_view.data(), data.data(), data.size() * sizeof(int32_t), cudaMemcpyHostToDevice); + + columns.push_back(std::move(col)); + } + + auto table = std::make_unique(std::move(columns)); + + auto index_info = cudf::io::column_name_info{""}; + auto column_names = std::vector(cols, index_info); + auto metadata = cudf::io::table_metadata{std::move(column_names), {}, {}}; + + return cudf::io::table_with_metadata{std::move(table), metadata}; +} + +TEST_F(TestMonitorController, TestAutoCountFn) +{ + auto message_meta_mc = MonitorController>("test_message_meta"); + auto message_meta_auto_count_fn = message_meta_mc.auto_count_fn(); + auto meta = MessageMeta::create_from_cpp(std::move(create_cudf_table_with_metadata(10, 2))); + EXPECT_EQ((*message_meta_auto_count_fn)(meta), 10); + + auto control_message_mc = MonitorController>("test_control_message"); + auto control_message_auto_count_fn = control_message_mc.auto_count_fn(); + auto control_message = std::make_shared(); + auto cm_meta = MessageMeta::create_from_cpp(std::move(create_cudf_table_with_metadata(20, 3))); + control_message->payload(cm_meta); + EXPECT_EQ((*control_message_auto_count_fn)(control_message), 20); + + // Test invalid message type + EXPECT_THROW(MonitorController("invalid message type"), std::runtime_error); +} diff --git a/python/morpheus/morpheus/common/__init__.py b/python/morpheus/morpheus/common/__init__.py index 5935348143..8c1b279ae3 100644 --- a/python/morpheus/morpheus/common/__init__.py +++ b/python/morpheus/morpheus/common/__init__.py @@ -20,6 +20,8 @@ from morpheus._lib.common import FilterSource from morpheus._lib.common import HttpEndpoint from morpheus._lib.common import HttpServer +from morpheus._lib.common import IndicatorsFontStyle +from morpheus._lib.common import IndicatorsTextColor from morpheus._lib.common import Tensor from morpheus._lib.common import TypeId from morpheus._lib.common import determine_file_type @@ -35,6 +37,8 @@ "FilterSource", "HttpEndpoint", "HttpServer", + "IndicatorsFontStyle", + "IndicatorsTextColor", "read_file_to_df", "Tensor", "typeid_is_fully_supported", diff --git a/python/morpheus/morpheus/config.py b/python/morpheus/morpheus/config.py index 2bc589a186..16ba6ed86d 100644 --- a/python/morpheus/morpheus/config.py +++ b/python/morpheus/morpheus/config.py @@ -235,6 +235,7 @@ def freeze(self): """ self._check_cpp_mode(fix_mis_match=not self.frozen) if not self.frozen: + self._validate_config() self.frozen = True def _check_cpp_mode(self, fix_mis_match: bool = False): @@ -267,7 +268,6 @@ def pipeline_batch_size(self): @pipeline_batch_size.setter def pipeline_batch_size(self, value: int): self._pipeline_batch_size = value - self._validate_config() @property def model_max_batch_size(self): @@ -276,7 +276,6 @@ def model_max_batch_size(self): @model_max_batch_size.setter def model_max_batch_size(self, value: int): self._model_max_batch_size = value - self._validate_config() def _validate_config(self): if self._pipeline_batch_size < self._model_max_batch_size: diff --git a/python/morpheus/morpheus/controllers/monitor_controller.py b/python/morpheus/morpheus/controllers/monitor_controller.py index 940d079097..51c0ee8a72 100644 --- a/python/morpheus/morpheus/controllers/monitor_controller.py +++ b/python/morpheus/morpheus/controllers/monitor_controller.py @@ -19,6 +19,8 @@ import fsspec from tqdm import tqdm +from morpheus.common import IndicatorsFontStyle +from morpheus.common import IndicatorsTextColor from morpheus.messages import ControlMessage from morpheus.messages import MessageMeta from morpheus.utils.logger import LogLevels @@ -60,15 +62,19 @@ class MonitorController: SupportedTypes = typing.Union[DataFrameType, MessageMeta, ControlMessage, list] controller_count: int = 0 - def __init__(self, - position: int, - description: str, - smoothing: float, - unit: str, - delayed_start: bool, - determine_count_fn: typing.Callable[[typing.Any], int], - log_level: LogLevels, - tqdm_class: tqdm = None): + def __init__( + self, + position: int, + description: str, + smoothing: float, + unit: str, + delayed_start: bool, + determine_count_fn: typing.Callable[[typing.Any], int], + log_level: LogLevels, + tqdm_class: tqdm = None, + text_color: IndicatorsTextColor = IndicatorsTextColor.cyan, + font_style: IndicatorsFontStyle = IndicatorsFontStyle.bold, + ): self._progress: tqdm = None self._position = position @@ -77,6 +83,8 @@ def __init__(self, self._unit = unit self._delayed_start = delayed_start self._determine_count_fn = determine_count_fn + self._text_color = text_color + self._font_style = font_style self._tqdm_class = tqdm_class if tqdm_class else MorpheusTqdm if isinstance(log_level, LogLevels): # pylint: disable=isinstance-second-argument-not-valid-type diff --git a/python/morpheus/morpheus/controllers/write_to_file_controller.py b/python/morpheus/morpheus/controllers/write_to_file_controller.py index 2e2109e96e..d7cd1d8ef6 100644 --- a/python/morpheus/morpheus/controllers/write_to_file_controller.py +++ b/python/morpheus/morpheus/controllers/write_to_file_controller.py @@ -119,18 +119,32 @@ def _convert_to_strings(self, df: DataFrameType): def node_fn(self, obs: mrc.Observable, sub: mrc.Subscriber): - # Open up the file handle - with open(self._output_file, "a", encoding='UTF-8') as out_file: + # When writing to a parquet file, we need to open the file in binary mode + if self._file_type == FileTypes.PARQUET: + with open(self._output_file, "wb") as out_file: - def write_to_file(x: MessageMeta): + def write_to_file(x: MessageMeta): - lines = self._convert_to_strings(x.df) + x.df.to_parquet(out_file) - out_file.writelines(lines) + if self._flush: + out_file.flush() - if self._flush: - out_file.flush() - - return x + return x obs.pipe(ops.map(write_to_file)).subscribe(sub) + + else: + with open(self._output_file, "a", encoding='UTF-8') as out_file: + + def write_to_file(x: MessageMeta): + + lines = self._convert_to_strings(x.df) + out_file.writelines(lines) + + if self._flush: + out_file.flush() + + return x + + obs.pipe(ops.map(write_to_file)).subscribe(sub) diff --git a/python/morpheus/morpheus/stages/general/monitor_stage.py b/python/morpheus/morpheus/stages/general/monitor_stage.py index 821fe729bd..0be06a34d1 100644 --- a/python/morpheus/morpheus/stages/general/monitor_stage.py +++ b/python/morpheus/morpheus/stages/general/monitor_stage.py @@ -19,9 +19,13 @@ from mrc.core import operators as ops from tqdm import tqdm +import morpheus._lib.stages as _stages from morpheus.cli.register_stage import register_stage +from morpheus.common import IndicatorsFontStyle +from morpheus.common import IndicatorsTextColor from morpheus.config import Config from morpheus.controllers.monitor_controller import MonitorController +from morpheus.messages import ControlMessage from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage @@ -66,8 +70,10 @@ def __init__(self, description: str = "Progress", smoothing: float = 0.05, unit: str = "messages", - delayed_start: bool = False, + delayed_start: bool = True, determine_count_fn: typing.Callable[[typing.Any], int] = None, + text_color: IndicatorsTextColor = IndicatorsTextColor.cyan, + font_style: IndicatorsFontStyle = IndicatorsFontStyle.bold, log_level: LogLevels = LogLevels.INFO): super().__init__(c) @@ -78,6 +84,8 @@ def __init__(self, unit=unit, delayed_start=delayed_start, determine_count_fn=determine_count_fn, + text_color=text_color, + font_style=font_style, log_level=log_level) MonitorController.controller_count += 1 @@ -98,7 +106,7 @@ def accepted_types(self) -> typing.Tuple: return (typing.Any, ) def supports_cpp_node(self): - return False + return True async def start_async(self): """ @@ -120,14 +128,35 @@ async def join(self): self._mc.progress.close() def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: - if not self._mc.is_enabled(): - return input_node - - # Use a component so we track progress using the upstream progress engine. This will provide more accurate - # results - node = builder.make_node_component(self.unique_name, - ops.map(self._mc.progress_sink), - ops.on_completed(self._mc.sink_on_completed)) + if self._build_cpp_node(): + if self._schema.input_type == ControlMessage: + node = _stages.MonitorControlMessageStage(builder, + self.unique_name, + self._mc._description, + self._mc._unit, + self._mc._text_color, + self._mc._font_style, + self._mc._determine_count_fn) + node.launch_options.pe_count = self._config.num_threads + else: + node = _stages.MonitorMessageMetaStage(builder, + self.unique_name, + self._mc._description, + self._mc._unit, + self._mc._text_color, + self._mc._font_style, + self._mc._determine_count_fn) + node.launch_options.pe_count = self._config.num_threads + + else: + if not self._mc.is_enabled(): + return input_node + + # Use a component so we track progress using the upstream progress engine. This will provide more accurate + # results + node = builder.make_node_component(self.unique_name, + ops.map(self._mc.progress_sink), + ops.on_completed(self._mc.sink_on_completed)) builder.make_edge(input_node, node) diff --git a/tests/morpheus/pipeline/test_error_pipe.py b/tests/morpheus/pipeline/test_error_pipe.py index cb264f2231..2fd050245f 100755 --- a/tests/morpheus/pipeline/test_error_pipe.py +++ b/tests/morpheus/pipeline/test_error_pipe.py @@ -14,15 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - import pytest from _utils.stages.error_raiser import ErrorRaiserStage -from _utils.stages.in_memory_source_x_stage import InMemSourceXStage from morpheus.config import Config from morpheus.pipeline import LinearPipeline -from morpheus.stages.general.monitor_stage import MonitorStage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage from morpheus.utils.type_aliases import DataFrameType @@ -42,23 +38,3 @@ def test_stage_raises_exception(config: Config, filter_probs_df: DataFrameType, # Ensure that the raised exception was from our stage and not from something else assert error_raiser_stage.error_raised assert len(sink_stage.get_messages()) == 0 - - -@pytest.mark.gpu_and_cpu_mode -@pytest.mark.parametrize("delayed_start", [False, True]) -def test_monitor_not_impl(config: Config, delayed_start: bool): - - class UnsupportedType: - pass - - pipe = LinearPipeline(config) - pipe.set_source(InMemSourceXStage(config, [UnsupportedType()])) - monitor_stage = pipe.add_stage(MonitorStage(config, log_level=logging.WARNING, delayed_start=delayed_start)) - sink_stage = pipe.add_stage(InMemorySinkStage(config)) - - assert monitor_stage._mc.is_enabled() - - with pytest.raises(NotImplementedError): - pipe.run() - - assert len(sink_stage.get_messages()) == 0 diff --git a/tests/morpheus/stages/test_monitor_stage.py b/tests/morpheus/stages/test_monitor_stage.py index e50153e7e5..d810f64aa7 100755 --- a/tests/morpheus/stages/test_monitor_stage.py +++ b/tests/morpheus/stages/test_monitor_stage.py @@ -59,11 +59,12 @@ def two_x(x): assert stage._mc._determine_count_fn is two_x +@pytest.mark.cpu_mode @mock.patch('morpheus.controllers.monitor_controller.MorpheusTqdm') def test_start_async(mock_morph_tqdm: mock.MagicMock, config: Config): mock_morph_tqdm.return_value = mock_morph_tqdm - stage = MonitorStage(config, log_level=logging.WARNING) + stage = MonitorStage(config, delayed_start=False, log_level=logging.WARNING) assert stage._mc._progress is None asyncio.run(stage.start_async()) @@ -72,11 +73,12 @@ def test_start_async(mock_morph_tqdm: mock.MagicMock, config: Config): assert stage._mc._progress is mock_morph_tqdm +@pytest.mark.cpu_mode @mock.patch('morpheus.controllers.monitor_controller.MorpheusTqdm') async def test_join(mock_morph_tqdm: mock.MagicMock, config: Config): mock_morph_tqdm.return_value = mock_morph_tqdm - stage = MonitorStage(config, log_level=logging.WARNING) + stage = MonitorStage(config, delayed_start=False, log_level=logging.WARNING) assert stage._mc._progress is None # Calling join is a noop if we are stopped @@ -88,11 +90,12 @@ async def test_join(mock_morph_tqdm: mock.MagicMock, config: Config): mock_morph_tqdm.close.assert_called_once() +@pytest.mark.cpu_mode @mock.patch('morpheus.controllers.monitor_controller.MorpheusTqdm') def test_refresh(mock_morph_tqdm: mock.MagicMock, config: Config): mock_morph_tqdm.return_value = mock_morph_tqdm - stage = MonitorStage(config, log_level=logging.WARNING) + stage = MonitorStage(config, delayed_start=False, log_level=logging.WARNING) assert stage._mc._progress is None asyncio.run(stage.start_async()) @@ -151,6 +154,7 @@ def test_progress_sink(mock_morph_tqdm: mock.MagicMock, config: Config): mock_morph_tqdm.update.assert_called_once_with(n=12) +@pytest.mark.cpu_mode @pytest.mark.usefixtures("reset_loglevel") @pytest.mark.parametrize('log_level', [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG]) @mock.patch('morpheus.stages.general.monitor_stage.MonitorController.sink_on_completed', autospec=True) @@ -179,7 +183,7 @@ def test_log_level(mock_progress_sink: mock.MagicMock, assert mock_sink_on_completed.call_count == expected_call_count -@pytest.mark.gpu_and_cpu_mode +@pytest.mark.cpu_mode def test_thread(config: Config, morpheus_log_level: int): """ Test ensures the monitor stage executes on the same thread as the parent stage diff --git a/tests/morpheus/stages/test_multi_processing_stage.py b/tests/morpheus/stages/test_multi_processing_stage.py index f88ec3d7d8..da9230dacd 100644 --- a/tests/morpheus/stages/test_multi_processing_stage.py +++ b/tests/morpheus/stages/test_multi_processing_stage.py @@ -172,7 +172,7 @@ def test_created_stage_pipe(config: Config, dataset_pandas: DatasetManager): expected_df = input_df.copy() expected_df["new_column"] = "Hello" - df_count = 100 + df_count = 10 df_generator = partial(pandas_dataframe_generator, dataset_pandas, df_count) partial_fn = partial(_process_df, column="new_column", value="Hello") @@ -225,7 +225,7 @@ def test_multiple_stages_pipe(config: Config, dataset_pandas: DatasetManager): expected_df["new_column_1"] = "new_value" expected_df["new_column_2"] = "Hello" - df_count = 100 + df_count = 10 df_generator = partial(pandas_dataframe_generator, dataset_pandas, df_count) partial_fn = partial(_process_df, column="new_column_1", value="new_value") diff --git a/tests/morpheus/stages/test_write_to_file_stage_pipe.py b/tests/morpheus/stages/test_write_to_file_stage_pipe.py new file mode 100644 index 0000000000..69409321d5 --- /dev/null +++ b/tests/morpheus/stages/test_write_to_file_stage_pipe.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import types + +import pandas as pd +import pytest + +from _utils.dataset_manager import DatasetManager +from morpheus.common import FileTypes +from morpheus.io.deserializers import read_file_to_df +from morpheus.pipeline.linear_pipeline import LinearPipeline +from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage +from morpheus.stages.output.write_to_file_stage import WriteToFileStage +from morpheus.stages.postprocess.serialize_stage import SerializeStage +from morpheus.stages.preprocess.deserialize_stage import DeserializeStage + + +@pytest.mark.parametrize("output_file", ["/tmp/output.json", "/tmp/output.csv", "/tmp/output.parquet"]) +@pytest.mark.gpu_and_cpu_mode +def test_write_to_file_stage_pipe(config, + df_pkg: types.ModuleType, + dataset: DatasetManager, + output_file: str, + execution_mode: str) -> None: + """ + Test WriteToFileStage with different output formats (JSON, CSV, Parquet) + """ + + filter_probs_df = dataset['filter_probs.csv'] + pipe = LinearPipeline(config) + pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) + pipe.add_stage(DeserializeStage(config)) + pipe.add_stage(SerializeStage(config)) + pipe.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True)) + pipe.run() + + if output_file.endswith(".json"): + output_df = pd.read_json(output_file, lines=True) + dataset.assert_compare_df(filter_probs_df, output_df) + + elif output_file.endswith(".csv"): + # The output data will contain an additional id column that we will need to slice off + output_df = df_pkg.read_csv(output_file).iloc[:, 1:] + dataset.assert_compare_df(filter_probs_df, output_df) + + elif output_file.endswith(".parquet"): + output_df = read_file_to_df(file_name=output_file, file_type=FileTypes.PARQUET) + # The c++ WriteToFileStage will add an additional index column to the output + if execution_mode == "GPU": + output_df = output_df.iloc[:, 1:] + assert output_df.values.tolist() == filter_probs_df.values.tolist() diff --git a/tests/morpheus/test_config.py b/tests/morpheus/test_config.py index 746acf3771..9ebce02c8b 100755 --- a/tests/morpheus/test_config.py +++ b/tests/morpheus/test_config.py @@ -159,6 +159,7 @@ def test_warning_model_batch_size_less_than_pipeline_batch_size(caplog: pytest.L config.pipeline_batch_size = 256 with caplog.at_level(logging.WARNING): config.model_max_batch_size = 257 + config.freeze() assert len(caplog.records) == 1 import re assert re.match(".*pipeline_batch_size < model_max_batch_size.*", caplog.records[0].message) is not None @@ -169,6 +170,7 @@ def test_warning_pipeline_batch_size_less_than_model_batch_size(caplog: pytest.L config.model_max_batch_size = 8 with caplog.at_level(logging.WARNING): config.pipeline_batch_size = 7 + config.freeze() assert len(caplog.records) == 1 import re assert re.match(".*pipeline_batch_size < model_max_batch_size.*", caplog.records[0].message) is not None diff --git a/tests/morpheus/utils/test_shared_process_pool.py b/tests/morpheus/utils/test_shared_process_pool.py index 0bca371bb0..7baa4ce1ae 100644 --- a/tests/morpheus/utils/test_shared_process_pool.py +++ b/tests/morpheus/utils/test_shared_process_pool.py @@ -149,43 +149,28 @@ def test_submit_single_task(shared_process_pool, a, b, expected): @pytest.mark.slow -def test_submit_task_with_invalid_stage(shared_process_pool): +def test_submit_invalid_tasks(shared_process_pool): pool = shared_process_pool + # submit_task() should raise ValueError if the stage does not exist with pytest.raises(ValueError): pool.submit_task("stage_does_not_exist", _add_task, 10, 20) - -@pytest.mark.slow -def test_submit_task_raises_exception(shared_process_pool): - - pool = shared_process_pool pool.set_usage("test_stage", 0.5) + # if the function raises exception, the task can be submitted and the exception will be raised when calling result() task = pool.submit_task("test_stage", _function_raises_exception) with pytest.raises(RuntimeError): task.result() - -@pytest.mark.slow -def test_submit_task_with_unserializable_result(shared_process_pool): - - pool = shared_process_pool - pool.set_usage("test_stage", 0.5) - + # if the function returns unserializable result, the task can be submitted and the exception will be raised + # when calling result() task = pool.submit_task("test_stage", _function_returns_unserializable_result) with pytest.raises(TypeError): task.result() - -@pytest.mark.slow -def test_submit_task_with_unserializable_arg(shared_process_pool): - - pool = shared_process_pool - pool.set_usage("test_stage", 0.5) - - # Unserializable arguments cannot be submitted to the pool + # Function with unserializable arguments cannot be submitted to the pool with pytest.raises(TypeError): pool.submit_task("test_stage", _arbitrary_function, threading.Lock()) @@ -207,7 +192,7 @@ def test_submit_multiple_tasks(shared_process_pool, a, b, expected): pool = shared_process_pool pool.set_usage("test_stage", 0.5) - num_tasks = 100 + num_tasks = 10 tasks = [] for _ in range(num_tasks): tasks.append(pool.submit_task("test_stage", _add_task, a, b)) diff --git a/tests/test_monitor_stage_pipe.py b/tests/test_monitor_stage_pipe.py new file mode 100755 index 0000000000..c35243864e --- /dev/null +++ b/tests/test_monitor_stage_pipe.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from functools import partial +from typing import Generator + +import numpy as np +import pytest + +import cudf + +from _utils import assert_results +from morpheus.common import IndicatorsFontStyle +from morpheus.common import IndicatorsTextColor +from morpheus.messages import ControlMessage +from morpheus.messages import MessageMeta +from morpheus.pipeline import LinearPipeline +from morpheus.pipeline.stage_decorator import stage +from morpheus.stages.general.monitor_stage import MonitorStage +from morpheus.stages.input.in_memory_data_generation_stage import InMemoryDataGenStage +from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage +from morpheus.stages.postprocess.serialize_stage import SerializeStage +from morpheus.stages.preprocess.deserialize_stage import DeserializeStage + + +def sample_message_meta_generator(df_rows: int, df_cols: int, count: int) -> Generator[MessageMeta, None, None]: + data = {f'col_{i}': range(df_rows) for i in range(df_cols)} + df = cudf.DataFrame(data) + meta = MessageMeta(df) + for _ in range(count): + yield meta + + +@pytest.mark.gpu_mode +def test_cpp_monitor_stage_pipe(config): + config.num_threads = 1 + + df_rows = 10 + df_cols = 3 + expected_df = next(sample_message_meta_generator(df_rows, df_cols, 1)).copy_dataframe() + + count = 20 + + cudf_generator = partial(sample_message_meta_generator, df_rows, df_cols, count) + + @stage + def dummy_control_message_process_stage(msg: ControlMessage) -> ControlMessage: + matrix_a = np.random.rand(3000, 3000) + matrix_b = np.random.rand(3000, 3000) + matrix_c = np.dot(matrix_a, matrix_b) + msg.set_metadata("result", matrix_c[0][0]) + + return msg + + # The default determine_count_fn for MessageMeta and ControlMessage returns the number of rows in the DataFrame + # This customized_determine_count_fn returns 1 for each MessageMeta + def customized_determine_count_fn(msg: MessageMeta) -> int: # pylint: disable=unused-argument + return 1 + + pipe = LinearPipeline(config) + pipe.set_source(InMemoryDataGenStage(config, cudf_generator, output_data_type=MessageMeta)) + pipe.add_stage(DeserializeStage(config, ensure_sliceable_index=True)) + pipe.add_stage( + MonitorStage(config, + description="preprocess", + unit="records", + text_color=IndicatorsTextColor.green, + font_style=IndicatorsFontStyle.underline)) + pipe.add_stage(dummy_control_message_process_stage(config)) + pipe.add_stage( + MonitorStage(config, + description="postprocess", + unit="records", + text_color=IndicatorsTextColor.blue, + font_style=IndicatorsFontStyle.italic)) + pipe.add_stage(SerializeStage(config)) + pipe.add_stage( + MonitorStage(config, description="sink", unit="MessageMeta", determine_count_fn=customized_determine_count_fn)) + comp_stage = pipe.add_stage(CompareDataFrameStage(config, expected_df)) + pipe.run() + + assert_results(comp_stage.get_results())