From ddffd9045fd26c729b58488e01a7782c3bc78959 Mon Sep 17 00:00:00 2001
From: Gabriel Cedran
Date: Thu, 1 Feb 2024 18:53:41 +0000
Subject: [PATCH] CSVTarget to enable processing of completed lines of csv files in chunks

---
 streaming_form_data/targets.py |  59 +++++++++++++++++
 tests/test_targets.py          | 113 +++++++++++++++++++++++++++++++++
 2 files changed, 172 insertions(+)

diff --git a/streaming_form_data/targets.py b/streaming_form_data/targets.py
index a399f3a4..f7cc7048 100644
--- a/streaming_form_data/targets.py
+++ b/streaming_form_data/targets.py
@@ -185,3 +185,62 @@ def on_data_received(self, chunk: bytes):
     def on_finish(self):
         if self._fd:
             self._fd.close()
+
+
+class CSVTarget(BaseTarget):
+    """
+    CSVTarget enables the processing and release of csv lines as soon as they are
+    completed by a chunk.
+    It enables developers to apply their own logic (e.g. save to a db or send the
+    entry to another api) to each line and free it from memory in sequence,
+    without the need to wait for the whole file and/or save the file locally.
+
+    Chunks are decoded as UTF-8 incrementally, so a multi-byte character that is
+    split across two chunks is decoded correctly. Lines are split on LF only and
+    a trailing CR is dropped, so both LF and CRLF files yield clean lines.
+
+    Note that this is a line splitter, not a CSV parser: a quoted field that
+    contains a line break is returned as separate lines.
+    """
+
+    def __init__(self, *args, **kwargs):
+        # imported here so the module-level imports stay untouched by this patch
+        import codecs
+
+        super().__init__(*args, **kwargs)
+        self._lines = []
+        self._previous_partial_line = ""
+        # buffers the bytes of a character that is cut in half by a chunk boundary
+        self._decoder = codecs.getincrementaldecoder("utf-8")()
+
+    def on_data_received(self, chunk: bytes):
+        # join the previous partial line with the newly decoded text
+        combined = self._previous_partial_line + self._decoder.decode(chunk)
+
+        # everything before the last line feed is complete; the remainder (an
+        # empty string when the chunk ends with a line feed) is kept for later
+        *complete, self._previous_partial_line = combined.split("\n")
+
+        for line in complete:
+            # drop the carriage return of CRLF line endings
+            self._lines.append(line[:-1] if line.endswith("\r") else line)
+
+    def on_finish(self):
+        # raises UnicodeDecodeError if the upload ended in the middle of a character
+        self._previous_partial_line += self._decoder.decode(b"", final=True)
+
+    def pop_lines(self, include_partial_line=False):
+        # this clears the lines to keep memory usage low
+        lines = self._lines
+        self._lines = []
+        if include_partial_line and self._previous_partial_line:
+            lines.append(self._previous_partial_line)
+            self._previous_partial_line = ""
+        return lines
+
+    def get_lines(self, include_partial_line=False):
+        # this never clears the lines
+        lines = self._lines.copy()
+        if include_partial_line and self._previous_partial_line:
+            lines.append(self._previous_partial_line)
+        return lines
diff --git a/tests/test_targets.py b/tests/test_targets.py
index 0cc79ab2..3ee555da 100644
--- a/tests/test_targets.py
+++ b/tests/test_targets.py
@@ -12,6 +12,7 @@
     NullTarget,
     ValueTarget,
     S3Target,
+    CSVTarget,
 )
 from streaming_form_data.validators import MaxSizeValidator, ValidationError
 
@@ -295,3 +296,115 @@ def test_s3_upload(mock_client):
     )
 
     assert resp == "my test file"
+
+
+def test_csv_upload__incomplete_line_gets_completed_next_chunk__pop_between_chunks():
+    target = CSVTarget()
+    target.start()
+
+    target.data_received(b"name,surname,age\nDon,Bob,99\nGabe,Sai")
+    assert target.get_lines() == ["name,surname,age", "Don,Bob,99"]
+    assert target.pop_lines() == ["name,surname,age", "Don,Bob,99"]
+
+    target.data_received(b"nt,33\nMary,Bel,22\n")
+
+    assert target.get_lines() == ["Gabe,Saint,33", "Mary,Bel,22"]
+    assert target.pop_lines() == ["Gabe,Saint,33", "Mary,Bel,22"]
+
+    assert not target.pop_lines(include_partial_line=True)
+    assert not target.get_lines(include_partial_line=True)
+
+    target.finish()
+
+
+def test_csv_upload__complete_line_in_the_end_of_chunk():
+    target = CSVTarget()
+    target.start()
+
+    target.data_received(b"Odin,Grand,1029\nRachel,Ced,44\n")
+
+    assert target.get_lines() == ["Odin,Grand,1029", "Rachel,Ced,44"]
+    assert target.pop_lines() == ["Odin,Grand,1029", "Rachel,Ced,44"]
+
+    assert not target.get_lines(include_partial_line=True)
+    assert not target.pop_lines(include_partial_line=True)
+
+    target.finish()
+
+
+def test_csv_upload__incomplete_line_in_the_end_of_chunk():
+    target = CSVTarget()
+    target.start()
+
+    target.data_received(b"name,surname,age\nDon,Bob,99\nGabe,Sai")
+
+    assert target.get_lines() == ["name,surname,age", "Don,Bob,99"]
+    assert target.pop_lines() == ["name,surname,age", "Don,Bob,99"]
+
+    assert target.get_lines(include_partial_line=True) == ["Gabe,Sai"]
+    assert target.pop_lines(include_partial_line=True) == ["Gabe,Sai"]
+
+    assert not target.get_lines(include_partial_line=True)
+    assert not target.pop_lines(include_partial_line=True)
+
+    target.finish()
+
+
+def test_csv_upload__incomplete_line_in_the_end_of_chunk__include_partial():
+    target = CSVTarget()
+    target.start()
+
+    target.data_received(b"name,surname,age\nDon,Bob,99\nGabe,Sai")
+
+    assert target.get_lines(include_partial_line=True) == [
+        "name,surname,age",
+        "Don,Bob,99",
+        "Gabe,Sai",
+    ]
+    assert target.pop_lines(include_partial_line=True) == [
+        "name,surname,age",
+        "Don,Bob,99",
+        "Gabe,Sai",
+    ]
+
+    assert not target.get_lines(include_partial_line=True)
+    assert not target.pop_lines(include_partial_line=True)
+
+    target.finish()
+
+
+def test_csv_upload__multibyte_character_split_between_chunks():
+    target = CSVTarget()
+    target.start()
+
+    encoded = "Jos\u00e9,Bob,41\n".encode("utf-8")
+    target.data_received(encoded[:4])
+    target.data_received(encoded[4:])
+
+    assert target.pop_lines() == ["Jos\u00e9,Bob,41"]
+
+    target.finish()
+
+
+def test_csv_upload__crlf_line_endings():
+    target = CSVTarget()
+    target.start()
+
+    target.data_received(b"name,age\r\nDon,99\r")
+    target.data_received(b"\nGabe,33")
+
+    assert target.pop_lines() == ["name,age", "Don,99"]
+    assert target.pop_lines(include_partial_line=True) == ["Gabe,33"]
+
+    target.finish()
+
+
+def test_csv_upload__empty_chunk():
+    target = CSVTarget()
+    target.start()
+
+    target.data_received(b"")
+
+    assert not target.get_lines(include_partial_line=True)
+
+    target.finish()