From 9ab3f6c5905a25be22ca7c4092827e156e837f0b Mon Sep 17 00:00:00 2001 From: Yuchen Zhang <134643420+yczhang-nv@users.noreply.github.com> Date: Wed, 9 Oct 2024 15:27:56 -0700 Subject: [PATCH 1/5] initial commit --- .../morpheus/controllers/write_to_file_controller.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/python/morpheus/morpheus/controllers/write_to_file_controller.py b/python/morpheus/morpheus/controllers/write_to_file_controller.py index 2e2109e96e..5d056da0ba 100644 --- a/python/morpheus/morpheus/controllers/write_to_file_controller.py +++ b/python/morpheus/morpheus/controllers/write_to_file_controller.py @@ -16,6 +16,7 @@ import mrc import mrc.core.operators as ops +import pyarrow.parquet as pq from morpheus.common import FileTypes from morpheus.common import determine_file_type @@ -124,9 +125,12 @@ def node_fn(self, obs: mrc.Observable, sub: mrc.Subscriber): def write_to_file(x: MessageMeta): - lines = self._convert_to_strings(x.df) - - out_file.writelines(lines) + if self._file_type == FileTypes.PARQUET: + table = x.df.to_arrow() + pq.write_table(table, out_file) + else: + lines = self._convert_to_strings(x.df) + out_file.writelines(lines) if self._flush: out_file.flush() From bccb9085222f701d499b0834600f25e98aebefbc Mon Sep 17 00:00:00 2001 From: Yuchen Zhang <134643420+yczhang-nv@users.noreply.github.com> Date: Wed, 6 Nov 2024 11:37:39 -0800 Subject: [PATCH 2/5] add test --- external/utilities | 2 +- output.csv | 21 +++++++ output.json | 20 ++++++ output.parquet | 0 .../controllers/write_to_file_controller.py | 3 +- .../stages/test_write_to_file_stage_pipe.py | 62 +++++++++++++++++++ 6 files changed, 105 insertions(+), 3 deletions(-) create mode 100644 output.csv create mode 100644 output.json create mode 100644 output.parquet create mode 100644 tests/morpheus/stages/test_write_to_file_stage_pipe.py diff --git a/external/utilities b/external/utilities index 7f5904513c..d0bf0272d0 160000 --- a/external/utilities +++ b/external/utilities @@ -1 +1 @@ -Subproject commit 7f5904513ca1281670aea8c351dae140892d3dfc +Subproject commit d0bf0272d0ba8e1ebc182bba3cdbc1d6798db97d diff --git a/output.csv b/output.csv new file mode 100644 index 0000000000..28af4dca97 --- /dev/null +++ b/output.csv @@ -0,0 +1,21 @@ +,v1,v2,v3,v4 +0,0.1,0.7000000000000001,0.7000000000000001,0.7000000000000001 +1,0.0,0.5,0.6000000000000001,0.30000000000000004 +2,1.0,0.9,0.5,0.9 +3,0.5,0.6000000000000001,0.2,0.9 +4,0.5,0.5,0.7000000000000001,0.9 +5,1.0,0.1,0.1,0.30000000000000004 +6,0.6000000000000001,0.0,0.5,0.5 +7,1.0,0.2,0.2,0.9 +8,0.5,0.8,0.6000000000000001,0.0 +9,0.30000000000000004,0.4,0.1,0.4 +10,0.1,0.30000000000000004,1.0,0.6000000000000001 +11,0.9,0.0,0.1,0.5 +12,0.5,0.30000000000000004,0.6000000000000001,0.8 +13,0.0,0.5,0.5,0.6000000000000001 +14,0.2,0.5,0.1,0.30000000000000004 +15,0.0,0.30000000000000004,0.5,0.6000000000000001 +16,0.5,1.0,0.4,0.7000000000000001 +17,0.6000000000000001,0.8,0.8,0.1 +18,0.8,0.8,1.0,0.6000000000000001 +19,0.1,0.9,0.1,0.30000000000000004 diff --git a/output.json b/output.json new file mode 100644 index 0000000000..eb0efb1555 --- /dev/null +++ b/output.json @@ -0,0 +1,20 @@ +{"v1":0.1,"v2":0.7,"v3":0.7,"v4":0.7} +{"v1":0.0,"v2":0.5,"v3":0.6,"v4":0.3} +{"v1":1.0,"v2":0.9,"v3":0.5,"v4":0.9} +{"v1":0.5,"v2":0.6,"v3":0.2,"v4":0.9} +{"v1":0.5,"v2":0.5,"v3":0.7,"v4":0.9} +{"v1":1.0,"v2":0.1,"v3":0.1,"v4":0.3} +{"v1":0.6,"v2":0.0,"v3":0.5,"v4":0.5} +{"v1":1.0,"v2":0.2,"v3":0.2,"v4":0.9} +{"v1":0.5,"v2":0.8,"v3":0.6,"v4":0.0} +{"v1":0.3,"v2":0.4,"v3":0.1,"v4":0.4} +{"v1":0.1,"v2":0.3,"v3":1.0,"v4":0.6} +{"v1":0.9,"v2":0.0,"v3":0.1,"v4":0.5} +{"v1":0.5,"v2":0.3,"v3":0.6,"v4":0.8} +{"v1":0.0,"v2":0.5,"v3":0.5,"v4":0.6} +{"v1":0.2,"v2":0.5,"v3":0.1,"v4":0.3} +{"v1":0.0,"v2":0.3,"v3":0.5,"v4":0.6} +{"v1":0.5,"v2":1.0,"v3":0.4,"v4":0.7} +{"v1":0.6,"v2":0.8,"v3":0.8,"v4":0.1} +{"v1":0.8,"v2":0.8,"v3":1.0,"v4":0.6} +{"v1":0.1,"v2":0.9,"v3":0.1,"v4":0.3} diff --git a/output.parquet b/output.parquet new file mode 100644 index 0000000000..e69de29bb2 diff --git a/python/morpheus/morpheus/controllers/write_to_file_controller.py b/python/morpheus/morpheus/controllers/write_to_file_controller.py index 5d056da0ba..fbf9e07d3b 100644 --- a/python/morpheus/morpheus/controllers/write_to_file_controller.py +++ b/python/morpheus/morpheus/controllers/write_to_file_controller.py @@ -126,8 +126,7 @@ def node_fn(self, obs: mrc.Observable, sub: mrc.Subscriber): def write_to_file(x: MessageMeta): if self._file_type == FileTypes.PARQUET: - table = x.df.to_arrow() - pq.write_table(table, out_file) + x.df.to_parquet(out_file, index=self._include_index_col) else: lines = self._convert_to_strings(x.df) out_file.writelines(lines) diff --git a/tests/morpheus/stages/test_write_to_file_stage_pipe.py b/tests/morpheus/stages/test_write_to_file_stage_pipe.py new file mode 100644 index 0000000000..431a8a0779 --- /dev/null +++ b/tests/morpheus/stages/test_write_to_file_stage_pipe.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import types + +import pytest + +from _utils.dataset_manager import DatasetManager +from morpheus.pipeline.linear_pipeline import LinearPipeline +from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage +from morpheus.stages.output.write_to_file_stage import WriteToFileStage +from morpheus.stages.postprocess.serialize_stage import SerializeStage +from morpheus.stages.preprocess.deserialize_stage import DeserializeStage + +@pytest.mark.parametrize("output_file", [ + "/tmp/output.json", + "/tmp/output.csv", + "/tmp/output.parquet" +]) +@pytest.mark.gpu_and_cpu_mode +def test_write_to_file_stage_pipe(config, + df_pkg: types.ModuleType, + dataset: DatasetManager, + output_file: str + ) -> None: + """ + Test WriteToFileStage with different output formats (JSON, CSV, Parquet) + """ + + filter_probs_df = dataset['filter_probs.csv'] + pipe = LinearPipeline(config) + pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) + pipe.add_stage(DeserializeStage(config)) + pipe.add_stage(SerializeStage(config)) + pipe.add_stage( + WriteToFileStage(config, filename=output_file, overwrite=True)) + pipe.run() + + # Load the output file and compare with the input dataframe + if output_file.endswith(".json"): + output_df = df_pkg.read_json(output_file) + elif output_file.endswith(".csv"): + output_df = df_pkg.read_csv(output_file) + elif output_file.endswith(".parquet"): + output_df = df_pkg.read_parquet(output_file) + else: + raise ValueError(f"Unsupported file format: {output_file}") + + dataset.assert_compare_df(filter_probs_df, output_df) From a7cbbb0835dd0af9ec8c12ddc2c09d20ccf531e2 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang <134643420+yczhang-nv@users.noreply.github.com> Date: Wed, 6 Nov 2024 11:53:35 -0800 Subject: [PATCH 3/5] intermediate --- external/utilities | 2 +- .../stages/test_write_to_file_stage_pipe.py | 25 +++++++++---------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/external/utilities b/external/utilities index d0bf0272d0..7f5904513c 160000 --- a/external/utilities +++ b/external/utilities @@ -1 +1 @@ -Subproject commit d0bf0272d0ba8e1ebc182bba3cdbc1d6798db97d +Subproject commit 7f5904513ca1281670aea8c351dae140892d3dfc diff --git a/tests/morpheus/stages/test_write_to_file_stage_pipe.py b/tests/morpheus/stages/test_write_to_file_stage_pipe.py index 431a8a0779..d454024810 100644 --- a/tests/morpheus/stages/test_write_to_file_stage_pipe.py +++ b/tests/morpheus/stages/test_write_to_file_stage_pipe.py @@ -24,18 +24,17 @@ from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage +import pandas as pd -@pytest.mark.parametrize("output_file", [ - "/tmp/output.json", - "/tmp/output.csv", - "/tmp/output.parquet" -]) + +@pytest.mark.parametrize( + "output_file", + [ + "/tmp/output.json", # "/tmp/output.csv", + # "/tmp/output.parquet" + ]) @pytest.mark.gpu_and_cpu_mode -def test_write_to_file_stage_pipe(config, - df_pkg: types.ModuleType, - dataset: DatasetManager, - output_file: str - ) -> None: +def test_write_to_file_stage_pipe(config, df_pkg: types.ModuleType, dataset: DatasetManager, output_file: str) -> None: """ Test WriteToFileStage with different output formats (JSON, CSV, Parquet) """ @@ -45,13 +44,13 @@ def test_write_to_file_stage_pipe(config, pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) pipe.add_stage(DeserializeStage(config)) pipe.add_stage(SerializeStage(config)) - pipe.add_stage( - WriteToFileStage(config, filename=output_file, overwrite=True)) + pipe.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True)) pipe.run() # Load the output file and compare with the input dataframe if output_file.endswith(".json"): - output_df = df_pkg.read_json(output_file) + with open(output_file, 'r') as f: + output_df = pd.concat([pd.read_json(line) for line in f], ignore_index=True) elif output_file.endswith(".csv"): output_df = df_pkg.read_csv(output_file) elif output_file.endswith(".parquet"): From 43d1aa5e4bddc8099d5d4f4b4505021998b69ec1 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang <134643420+yczhang-nv@users.noreply.github.com> Date: Wed, 6 Nov 2024 13:03:23 -0800 Subject: [PATCH 4/5] fix unit tests --- output.csv | 21 ---------- output.json | 20 ---------- output.parquet | 0 .../controllers/write_to_file_controller.py | 32 +++++++++++----- .../stages/test_write_to_file_stage_pipe.py | 38 ++++++++++--------- 5 files changed, 43 insertions(+), 68 deletions(-) delete mode 100644 output.csv delete mode 100644 output.json delete mode 100644 output.parquet diff --git a/output.csv b/output.csv deleted file mode 100644 index 28af4dca97..0000000000 --- a/output.csv +++ /dev/null @@ -1,21 +0,0 @@ -,v1,v2,v3,v4 -0,0.1,0.7000000000000001,0.7000000000000001,0.7000000000000001 -1,0.0,0.5,0.6000000000000001,0.30000000000000004 -2,1.0,0.9,0.5,0.9 -3,0.5,0.6000000000000001,0.2,0.9 -4,0.5,0.5,0.7000000000000001,0.9 -5,1.0,0.1,0.1,0.30000000000000004 -6,0.6000000000000001,0.0,0.5,0.5 -7,1.0,0.2,0.2,0.9 -8,0.5,0.8,0.6000000000000001,0.0 -9,0.30000000000000004,0.4,0.1,0.4 -10,0.1,0.30000000000000004,1.0,0.6000000000000001 -11,0.9,0.0,0.1,0.5 -12,0.5,0.30000000000000004,0.6000000000000001,0.8 -13,0.0,0.5,0.5,0.6000000000000001 -14,0.2,0.5,0.1,0.30000000000000004 -15,0.0,0.30000000000000004,0.5,0.6000000000000001 -16,0.5,1.0,0.4,0.7000000000000001 -17,0.6000000000000001,0.8,0.8,0.1 -18,0.8,0.8,1.0,0.6000000000000001 -19,0.1,0.9,0.1,0.30000000000000004 diff --git a/output.json b/output.json deleted file mode 100644 index eb0efb1555..0000000000 --- a/output.json +++ /dev/null @@ -1,20 +0,0 @@ -{"v1":0.1,"v2":0.7,"v3":0.7,"v4":0.7} -{"v1":0.0,"v2":0.5,"v3":0.6,"v4":0.3} -{"v1":1.0,"v2":0.9,"v3":0.5,"v4":0.9} -{"v1":0.5,"v2":0.6,"v3":0.2,"v4":0.9} -{"v1":0.5,"v2":0.5,"v3":0.7,"v4":0.9} -{"v1":1.0,"v2":0.1,"v3":0.1,"v4":0.3} -{"v1":0.6,"v2":0.0,"v3":0.5,"v4":0.5} -{"v1":1.0,"v2":0.2,"v3":0.2,"v4":0.9} -{"v1":0.5,"v2":0.8,"v3":0.6,"v4":0.0} -{"v1":0.3,"v2":0.4,"v3":0.1,"v4":0.4} -{"v1":0.1,"v2":0.3,"v3":1.0,"v4":0.6} -{"v1":0.9,"v2":0.0,"v3":0.1,"v4":0.5} -{"v1":0.5,"v2":0.3,"v3":0.6,"v4":0.8} -{"v1":0.0,"v2":0.5,"v3":0.5,"v4":0.6} -{"v1":0.2,"v2":0.5,"v3":0.1,"v4":0.3} -{"v1":0.0,"v2":0.3,"v3":0.5,"v4":0.6} -{"v1":0.5,"v2":1.0,"v3":0.4,"v4":0.7} -{"v1":0.6,"v2":0.8,"v3":0.8,"v4":0.1} -{"v1":0.8,"v2":0.8,"v3":1.0,"v4":0.6} -{"v1":0.1,"v2":0.9,"v3":0.1,"v4":0.3} diff --git a/output.parquet b/output.parquet deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/python/morpheus/morpheus/controllers/write_to_file_controller.py b/python/morpheus/morpheus/controllers/write_to_file_controller.py index fbf9e07d3b..e0cc4a8dbd 100644 --- a/python/morpheus/morpheus/controllers/write_to_file_controller.py +++ b/python/morpheus/morpheus/controllers/write_to_file_controller.py @@ -120,20 +120,32 @@ def _convert_to_strings(self, df: DataFrameType): def node_fn(self, obs: mrc.Observable, sub: mrc.Subscriber): - # Open up the file handle - with open(self._output_file, "a", encoding='UTF-8') as out_file: + # When writing to a parquet file, we need to open the file in binary mode + if self._file_type == FileTypes.PARQUET: + with open(self._output_file, "wb") as out_file: - def write_to_file(x: MessageMeta): + def write_to_file(x: MessageMeta): + + x.df.to_parquet(out_file) + + if self._flush: + out_file.flush() + + return x + + obs.pipe(ops.map(write_to_file)).subscribe(sub) + + else: + with open(self._output_file, "a", encoding='UTF-8') as out_file: + + def write_to_file(x: MessageMeta): - if self._file_type == FileTypes.PARQUET: - x.df.to_parquet(out_file, index=self._include_index_col) - else: lines = self._convert_to_strings(x.df) out_file.writelines(lines) - if self._flush: - out_file.flush() + if self._flush: + out_file.flush() - return x + return x - obs.pipe(ops.map(write_to_file)).subscribe(sub) + obs.pipe(ops.map(write_to_file)).subscribe(sub) diff --git a/tests/morpheus/stages/test_write_to_file_stage_pipe.py b/tests/morpheus/stages/test_write_to_file_stage_pipe.py index d454024810..8050bc5519 100644 --- a/tests/morpheus/stages/test_write_to_file_stage_pipe.py +++ b/tests/morpheus/stages/test_write_to_file_stage_pipe.py @@ -16,25 +16,26 @@ import types +import pandas as pd import pytest from _utils.dataset_manager import DatasetManager +from morpheus.common import FileTypes +from morpheus.io.deserializers import read_file_to_df from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -import pandas as pd -@pytest.mark.parametrize( - "output_file", - [ - "/tmp/output.json", # "/tmp/output.csv", - # "/tmp/output.parquet" - ]) +@pytest.mark.parametrize("output_file", ["/tmp/output.json", "/tmp/output.csv", "/tmp/output.parquet"]) @pytest.mark.gpu_and_cpu_mode -def test_write_to_file_stage_pipe(config, df_pkg: types.ModuleType, dataset: DatasetManager, output_file: str) -> None: +def test_write_to_file_stage_pipe(config, + df_pkg: types.ModuleType, + dataset: DatasetManager, + output_file: str, + execution_mode: str) -> None: """ Test WriteToFileStage with different output formats (JSON, CSV, Parquet) """ @@ -47,15 +48,18 @@ def test_write_to_file_stage_pipe(config, df_pkg: types.ModuleType, dataset: Dat pipe.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True)) pipe.run() - # Load the output file and compare with the input dataframe if output_file.endswith(".json"): - with open(output_file, 'r') as f: - output_df = pd.concat([pd.read_json(line) for line in f], ignore_index=True) + output_df = pd.read_json(output_file, lines=True) + dataset.assert_compare_df(filter_probs_df, output_df) + elif output_file.endswith(".csv"): - output_df = df_pkg.read_csv(output_file) - elif output_file.endswith(".parquet"): - output_df = df_pkg.read_parquet(output_file) - else: - raise ValueError(f"Unsupported file format: {output_file}") + # The output data will contain an additional id column that we will need to slice off + output_df = df_pkg.read_csv(output_file).iloc[:, 1:] + dataset.assert_compare_df(filter_probs_df, output_df) - dataset.assert_compare_df(filter_probs_df, output_df) + elif output_file.endswith(".parquet"): + output_df = read_file_to_df(file_name=output_file, file_type=FileTypes.PARQUET) + # The c++ WriteToFileStage will add an additional index column to the output + if execution_mode == "GPU": + output_df = output_df.iloc[:, 1:] + assert output_df.values.tolist() == filter_probs_df.values.tolist() From 25a5eeda11501cec27d17958b02ad152bf7119c3 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang <134643420+yczhang-nv@users.noreply.github.com> Date: Wed, 6 Nov 2024 13:06:41 -0800 Subject: [PATCH 5/5] fix format --- .../morpheus/morpheus/controllers/write_to_file_controller.py | 1 - tests/morpheus/stages/test_write_to_file_stage_pipe.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/python/morpheus/morpheus/controllers/write_to_file_controller.py b/python/morpheus/morpheus/controllers/write_to_file_controller.py index e0cc4a8dbd..d7cd1d8ef6 100644 --- a/python/morpheus/morpheus/controllers/write_to_file_controller.py +++ b/python/morpheus/morpheus/controllers/write_to_file_controller.py @@ -16,7 +16,6 @@ import mrc import mrc.core.operators as ops -import pyarrow.parquet as pq from morpheus.common import FileTypes from morpheus.common import determine_file_type diff --git a/tests/morpheus/stages/test_write_to_file_stage_pipe.py b/tests/morpheus/stages/test_write_to_file_stage_pipe.py index 8050bc5519..69409321d5 100644 --- a/tests/morpheus/stages/test_write_to_file_stage_pipe.py +++ b/tests/morpheus/stages/test_write_to_file_stage_pipe.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License");