diff --git a/examples/log_parsing/README.md b/examples/log_parsing/README.md
index 55dbffa0bb..b4f882d763 100644
--- a/examples/log_parsing/README.md
+++ b/examples/log_parsing/README.md
@@ -110,7 +110,7 @@ PYTHONPATH="examples/log_parsing" \
 morpheus --log_level INFO \
     --plugin "inference" \
     --plugin "postprocessing" \
-    run --num_threads 1 --use_cpp False --pipeline_batch_size 1024 --model_max_batch_size 32 \
+    run --num_threads 1 --pipeline_batch_size 1024 --model_max_batch_size 32 \
     pipeline-nlp \
     from-file --filename ./models/datasets/validation-data/log-parsing-validation-data-input.csv \
     deserialize \
diff --git a/examples/log_parsing/inference.py b/examples/log_parsing/inference.py
index 21afa4c583..5947fbdcba 100644
--- a/examples/log_parsing/inference.py
+++ b/examples/log_parsing/inference.py
@@ -61,14 +61,14 @@ class TritonInferenceLogParsing(TritonInferenceWorker):
     def build_output_message(self, x: MultiInferenceMessage) -> MultiResponseMessage:
         seq_ids = cp.zeros((x.count, 3), dtype=cp.uint32)
         seq_ids[:, 0] = cp.arange(x.mess_offset, x.mess_offset + x.count, dtype=cp.uint32)
-        seq_ids[:, 2] = x.seq_ids[:, 2]
+        seq_ids[:, 2] = x.get_tensor('seq_ids')[:, 2]
 
         memory = TensorMemory(
             count=x.count,
             tensors={
                 'confidences': cp.zeros((x.count, self._inputs[list(self._inputs.keys())[0]].shape[1])),
                 'labels': cp.zeros((x.count, self._inputs[list(self._inputs.keys())[0]].shape[1])),
-                'input_ids': cp.zeros((x.count, x.input_ids.shape[1])),
+                'input_ids': cp.zeros((x.count, x.get_tensor('input_ids').shape[1])),
                 'seq_ids': seq_ids
             })
 
@@ -145,25 +145,36 @@ def compute_schema(self, schema: StageSchema):
     @staticmethod
     def _convert_one_response(output: MultiResponseMessage, inf: MultiInferenceNLPMessage,
                               res: TensorMemory) -> MultiResponseMessage:
+        memory = output.memory
 
-        output.input_ids[inf.offset:inf.count + inf.offset, :] = inf.input_ids
-        output.seq_ids[inf.offset:inf.count + inf.offset, :] = inf.seq_ids
+        out_seq_ids = memory.get_tensor('seq_ids')
+        input_ids = memory.get_tensor('input_ids')
+        confidences = memory.get_tensor('confidences')
+        labels = memory.get_tensor('labels')
+
+        seq_ids = inf.get_id_tensor()
+
+        seq_offset = seq_ids[0, 0].item() - output.mess_offset
+        seq_count = (seq_ids[-1, 0].item() + 1 - seq_offset) - output.mess_offset
+
+        input_ids[inf.offset:inf.count + inf.offset, :] = inf.get_tensor('input_ids')
+        out_seq_ids[inf.offset:inf.count + inf.offset, :] = seq_ids
+
         # Two scenarios:
         if (inf.mess_count == inf.count):
-            output.confidences[inf.offset:inf.count + inf.offset, :] = res.get_tensor('confidences')
-            output.labels[inf.offset:inf.count + inf.offset, :] = res.get_tensor('labels')
+            assert seq_count == res.count
+            confidences[seq_offset:seq_offset + seq_count, :] = res.get_tensor('confidences')
+            labels[seq_offset:seq_offset + seq_count, :] = res.get_tensor('labels')
         else:
             assert inf.count == res.count
 
-            mess_ids = inf.seq_ids[:, 0].get().tolist()
+            mess_ids = seq_ids[:, 0].get().tolist()
 
-            # Out message has more reponses, so we have to do key based blending of probs
             for i, idx in enumerate(mess_ids):
-                output.confidences[idx, :] = cp.maximum(output.confidences[idx, :], res.get_tensor('confidences')[i, :])
-                output.labels[idx, :] = cp.maximum(output.labels[idx, :], res.get_tensor('labels')[i, :])
+                confidences[idx, :] = cp.maximum(confidences[idx, :], res.get_tensor('confidences')[i, :])
+                labels[idx, :] = cp.maximum(labels[idx, :], res.get_tensor('labels')[i, :])
 
-        return MultiResponseMessage.from_message(inf, memory=output.memory, offset=inf.offset,
-                                                 count=inf.mess_count)
+        return MultiResponseMessage.from_message(inf, memory=memory, offset=inf.offset, count=inf.mess_count)
 
     def _get_inference_worker(self, inf_queue: ProducerConsumerQueue) -> TritonInferenceLogParsing:
         return TritonInferenceLogParsing(inf_queue=inf_queue, c=self._config, **self._kwargs)
diff --git a/examples/log_parsing/postprocessing.py b/examples/log_parsing/postprocessing.py
index 6e034afc1b..a43419f63d 100644
--- a/examples/log_parsing/postprocessing.py
+++ b/examples/log_parsing/postprocessing.py
@@ -22,6 +22,8 @@ import pandas as pd
 
 from mrc.core import operators as ops
 
+import cudf
+
 from morpheus.cli.register_stage import register_stage
 from morpheus.config import Config
 from morpheus.config import PipelineModes
@@ -80,11 +82,11 @@ def compute_schema(self, schema: StageSchema):
 
     def _postprocess(self, x: MultiResponseMessage):
 
-        infer_pdf = pd.DataFrame(x.seq_ids.get()).astype(int)
+        infer_pdf = pd.DataFrame(x.get_tensor('seq_ids').get()).astype(int)
         infer_pdf.columns = ["doc", "start", "stop"]
-        infer_pdf["confidences"] = x.confidences.tolist()
-        infer_pdf["labels"] = x.labels.tolist()
-        infer_pdf["token_ids"] = x.input_ids.tolist()
+        infer_pdf["confidences"] = x.get_tensor('confidences').tolist()
+        infer_pdf["labels"] = x.get_tensor('labels').tolist()
+        infer_pdf["token_ids"] = x.get_tensor('input_ids').tolist()
 
         infer_pdf["confidences"] = infer_pdf.apply(lambda row: row["confidences"][row["start"]:row["stop"]], axis=1)
 
@@ -115,8 +117,7 @@ def _postprocess(self, x: MultiResponseMessage):
 
         # decode cleanup
         parsed_df = self.__decode_cleanup(parsed_df)
-
-        return MessageMeta(df=parsed_df)
+        return MessageMeta(df=cudf.DataFrame.from_pandas(parsed_df))
 
     def __get_label_dicts(self, row):
         token_dict = defaultdict(str)
diff --git a/examples/log_parsing/run.py b/examples/log_parsing/run.py
index 71db12c831..d241e69a4f 100644
--- a/examples/log_parsing/run.py
+++ b/examples/log_parsing/run.py
@@ -92,8 +92,6 @@ def run_pipeline(
     model_config_file,
     server_url,
 ):
-    CppConfig.set_should_use_cpp(False)
-
     config = Config()
     config.mode = PipelineModes.NLP
     config.num_threads = num_threads
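
Note (not part of the patch): in _convert_one_response above, when a message row is split across multiple inference chunks (inf.mess_count != inf.count), the per-chunk results that map to the same output row are merged with an element-wise maximum, keyed on column 0 of seq_ids. A minimal standalone sketch of that blending, with numpy standing in for cupy and made-up toy values:

import numpy as np

rows, classes = 2, 4
confidences = np.zeros((rows, classes))

# Column 0 of seq_ids holds the destination row for each inference result;
# here the first two results both map to message row 0 (a split message).
mess_ids = [0, 0, 1]
res_confidences = np.array([
    [0.1, 0.9, 0.0, 0.2],
    [0.4, 0.3, 0.7, 0.1],
    [0.5, 0.5, 0.5, 0.5],
])

for i, idx in enumerate(mess_ids):
    confidences[idx, :] = np.maximum(confidences[idx, :], res_confidences[i, :])

print(confidences[0])  # [0.4 0.9 0.7 0.2] -- element-wise max of the two chunks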