diff --git a/streaming_form_data/targets.py b/streaming_form_data/targets.py
index a399f3a4..b99dfa4f 100644
--- a/streaming_form_data/targets.py
+++ b/streaming_form_data/targets.py
@@ -185,3 +185,51 @@ def on_data_received(self, chunk: bytes):
     def on_finish(self):
         if self._fd:
             self._fd.close()
+
+
+class CsvTarget(BaseTarget):
+    """
+    CsvTarget releases CSV lines as soon as they are completed by an incoming chunk.
+    It lets developers apply their own logic to each line (e.g. save it to a database or send it to another API)
+    and free it from memory in sequence, without waiting for the whole file or saving the file locally.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._lines = []
+        self._previous_partial_line = ""
+
+    def on_data_received(self, chunk: bytes):
+        # join the previous partial line with the new chunk
+        combined = self._previous_partial_line + chunk.decode("utf-8")
+
+        # split the combined string into lines
+        lines = combined.splitlines(keepends=True)
+
+        # process all lines except the last one (which may be partial)
+        for line in lines[:-1]:
+            self._lines.append(line.replace("\n", ""))
+
+        # if the last line ends with a newline, it is complete
+        if lines[-1].endswith("\n"):
+            self._lines.append(lines[-1].replace("\n", ""))
+            self._previous_partial_line = ""
+        else:
+            # otherwise it is partial, so save it for the next chunk
+            self._previous_partial_line = lines[-1]
+
+    def pop_lines(self, include_partial_line=False):
+        # returns and clears the stored lines to keep memory usage low
+        lines = self._lines
+        if include_partial_line and self._previous_partial_line:
+            lines.append(self._previous_partial_line)
+            self._previous_partial_line = ""
+        self._lines = []
+        return lines
+
+    def get_lines(self, include_partial_line=False):
+        # returns a copy and never clears the stored lines
+        lines = self._lines.copy()
+        if include_partial_line and self._previous_partial_line:
+            lines.append(self._previous_partial_line)
+        return lines
diff --git a/tests/test_targets.py b/tests/test_targets.py
index 0cc79ab2..6e47bc9b 100644
--- a/tests/test_targets.py
+++ b/tests/test_targets.py
@@ -12,6 +12,7 @@
     NullTarget,
     ValueTarget,
     S3Target,
+    CsvTarget,
 )
 
 from streaming_form_data.validators import MaxSizeValidator, ValidationError
@@ -295,3 +296,69 @@ def test_s3_upload(mock_client):
     )
 
     assert resp == "my test file"
+
+
+def test_csv_upload():
+    target = CsvTarget()
+
+    target.start()
+
+    # 1 partial line at the end of the chunk
+    target.data_received(b"name,surname,age\nDon,Bob,99\nGabe,Sai")
+
+    assert target.get_lines() == ["name,surname,age", "Don,Bob,99"]
+    assert target.get_lines(include_partial_line=True) == [
+        "name,surname,age",
+        "Don,Bob,99",
+        "Gabe,Sai",
+    ]
+
+    assert target.pop_lines() == ["name,surname,age", "Don,Bob,99"]
+    assert target.pop_lines(include_partial_line=True) == ["Gabe,Sai"]
+    assert not target.pop_lines()
+    assert not target.get_lines()
+
+    # 2 complete lines at the end of the chunk
+    target.data_received(b"name,surname,age\nDon,Bob,99\nGabe,Sai")
+    target.data_received(b"nt,33\nMary,Bel,22\n")
+
+    assert target.get_lines() == [
+        "name,surname,age",
+        "Don,Bob,99",
+        "Gabe,Saint,33",
+        "Mary,Bel,22",
+    ]
+    assert target.get_lines(include_partial_line=True) == [
+        "name,surname,age",
+        "Don,Bob,99",
+        "Gabe,Saint,33",
+        "Mary,Bel,22",
+    ]
+
+    assert target.pop_lines() == [
+        "name,surname,age",
+        "Don,Bob,99",
+        "Gabe,Saint,33",
+        "Mary,Bel,22",
+    ]
+    assert target.pop_lines(include_partial_line=True) == []
+    assert not target.pop_lines()
+    assert not target.get_lines()
+
+    # partial line at the end of the chunk
+    target.data_received(b"Odin,Grand,1029\nRachel,Ced,44")
+
+    assert target.get_lines() == ["Odin,Grand,1029"]
+    assert target.get_lines(include_partial_line=True) == [
+        "Odin,Grand,1029",
+        "Rachel,Ced,44",
+    ]
+
+    assert target.pop_lines(include_partial_line=True) == [
+        "Odin,Grand,1029",
+        "Rachel,Ced,44",
+    ]
+    assert not target.pop_lines()
+    assert not target.get_lines()
+
+    target.finish()
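
Reviewer note: below is a minimal usage sketch (not part of the diff) of how CsvTarget could be driven by StreamingFormDataParser. The form-field name "file", the headers/chunks arguments, and the process_row callback are placeholders for whatever the surrounding web framework provides; only StreamingFormDataParser, register, data_received, and the CsvTarget methods from this diff are actual library API.

from streaming_form_data import StreamingFormDataParser
from streaming_form_data.targets import CsvTarget


def handle_csv_upload(headers, chunks, process_row):
    # headers: the request headers; chunks: an iterable of bytes from the
    # request body; process_row: a per-line callback -- all supplied by the caller.
    parser = StreamingFormDataParser(headers=headers)
    target = CsvTarget()
    parser.register("file", target)

    for chunk in chunks:
        parser.data_received(chunk)
        # drain completed lines right away to keep memory usage low
        for line in target.pop_lines():
            process_row(line)

    # after the last chunk, flush a trailing line that had no final newline
    for line in target.pop_lines(include_partial_line=True):
        process_row(line)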