Skip to content

Commit

Permalink
CsvTarget to enable processing of completed lines of csv files in chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielcedran committed Feb 1, 2024
1 parent d2484de commit 709dc93
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 0 deletions.
48 changes: 48 additions & 0 deletions streaming_form_data/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,51 @@ def on_data_received(self, chunk: bytes):
def on_finish(self):
if self._fd:
self._fd.close()


class CsvTarget(BaseTarget):
"""
CsvTarget enables the processing and release of csv lines as soon as they are completed by a chunk.
It enables developers to apply their own logic (e.g save to a db or send the entry to another api) to each line
and free it from the memory in sequence, without the need to wait for the whole file and/or save the file locally.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._lines = []
self._previous_partial_line = ""

def on_data_received(self, chunk: bytes):
# join the previous partial line with the new chunk
combined = self._previous_partial_line + chunk.decode("utf-8")

# split the combined string into lines
lines = combined.splitlines(keepends=True)

# process all lines except the last one (which may be partial)
for line in lines[:-1]:
self._lines.append(line.replace("\n", ""))

# if the last line ends with a newline, it is complete
if lines[-1].endswith("\n"):
self._lines.append(lines[-1].replace("\n", ""))
self._previous_partial_line = ""
else:
# otherwise, it is partial, and we save it for later
self._previous_partial_line = lines[-1]

def pop_lines(self, include_partial_line=False):
# this clears the lines to keep memory usage low
lines = self._lines
if include_partial_line and self._previous_partial_line:
lines.append(self._previous_partial_line)
self._previous_partial_line = ""
self._lines = []
return lines

def get_lines(self, include_partial_line=False):
# this never clears the lines
lines = self._lines.copy()
if include_partial_line and self._previous_partial_line:
lines.append(self._previous_partial_line)
return lines
67 changes: 67 additions & 0 deletions tests/test_targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
NullTarget,
ValueTarget,
S3Target,
CsvTarget,
)

from streaming_form_data.validators import MaxSizeValidator, ValidationError
Expand Down Expand Up @@ -295,3 +296,69 @@ def test_s3_upload(mock_client):
)

assert resp == "my test file"


def test_csv_upload():
target = CsvTarget()

target.start()

# 1 partial line in the end of the chunk
target.data_received(b"name,surname,age\nDon,Bob,99\nGabe,Sai")

assert target.get_lines() == ["name,surname,age", "Don,Bob,99"]
assert target.get_lines(include_partial_line=True) == [
"name,surname,age",
"Don,Bob,99",
"Gabe,Sai",
]

assert target.pop_lines() == ["name,surname,age", "Don,Bob,99"]
assert target.pop_lines(include_partial_line=True) == ["Gabe,Sai"]
assert not target.pop_lines()
assert not target.get_lines()

# 2 complete line in the end of the chunk
target.data_received(b"name,surname,age\nDon,Bob,99\nGabe,Sai")
target.data_received(b"nt,33\nMary,Bel,22\n")

assert target.get_lines() == [
"name,surname,age",
"Don,Bob,99",
"Gabe,Saint,33",
"Mary,Bel,22",
]
assert target.get_lines(include_partial_line=True) == [
"name,surname,age",
"Don,Bob,99",
"Gabe,Saint,33",
"Mary,Bel,22",
]

assert target.pop_lines() == [
"name,surname,age",
"Don,Bob,99",
"Gabe,Saint,33",
"Mary,Bel,22",
]
assert target.pop_lines(include_partial_line=True) == []
assert not target.pop_lines()
assert not target.get_lines()

# partial line in the end of the chunk
target.data_received(b"Odin,Grand,1029\nRachel,Ced,44")

assert target.get_lines() == ["Odin,Grand,1029"]
assert target.get_lines(include_partial_line=True) == [
"Odin,Grand,1029",
"Rachel,Ced,44",
]

assert target.pop_lines(include_partial_line=True) == [
"Odin,Grand,1029",
"Rachel,Ced,44",
]
assert not target.pop_lines()
assert not target.get_lines()

target.finish()

0 comments on commit 709dc93

Please sign in to comment.