Implemented deduplicate_headers flag for load (#104)
* Implemented `deduplicate_headers` flag

* Fixed tests
roll authored and akariv committed Jul 16, 2019
1 parent b1e795c commit 00328b3
Showing 5 changed files with 55 additions and 2 deletions.
2 changes: 2 additions & 0 deletions PROCESSORS.md
@@ -42,6 +42,7 @@ Loads data from various source types (local files, remote URLs, Google Spreadsheets, …)
def load(source, name=None, resources=None, strip=True, limit_rows=None,
infer_strategy=None, cast_strategy=None,
override_schema=None, override_fields=None,
deduplicate_headers=False,
on_error=raise_exception,
         **options):
pass
@@ -76,6 +77,7 @@ Relevant only when _not_ loading data from a datapackage:
- `load.CAST_WITH_SCHEMA` - Data will be parsed and cast using the schema, and an error will be raised in case of faulty data
- `override_schema` - The provided dictionary will be merged into the inferred schema. If the `fields` key is set, its contents will fully replace the inferred fields array. The same behavior applies to all other nested structures.
- `override_fields` - The provided mapping will patch the inferred `schema.fields` array. Keys in the mapping must be field names, and values must be dictionaries to be merged into the corresponding field's metadata.
- `deduplicate_headers` - (default `False`) If set to `True`, duplicate headers will be renamed using a `header (1), header (2), etc.` scheme; if left as `False`, duplicate headers will raise an error (see the usage sketch after this list).
- `on_error` - Dictates how `load` will behave in case of a validation error.
Options are identical to `on_error` in `set_type` and `validate`
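
A minimal usage sketch of the new flag (mirroring the test added in this commit; `data/duplicate_headers.csv` is the two-row fixture shown in the next file):

```python
from dataflows import Flow, load

# With deduplicate_headers=True, repeated header names are renamed;
# with the default False, the same load raises a ValueError.
data, package, stats = Flow(
    load('data/duplicate_headers.csv', deduplicate_headers=True),
).results()

# The inferred schema now contains: header1, header2 (1), header2 (2)
print([f['name'] for f in package.descriptor['resources'][0]['schema']['fields']])
```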

2 changes: 2 additions & 0 deletions data/duplicate_headers.csv
@@ -0,0 +1,2 @@
header1,header2,header2
value1,value2,value3
23 changes: 22 additions & 1 deletion dataflows/processors/load.py
@@ -106,8 +106,10 @@ class load(DataStreamProcessor):
ERRORS_RAISE = raise_exception

def __init__(self, load_source, name=None, resources=None, strip=True, limit_rows=None,
infer_strategy=None, cast_strategy=None, on_error=raise_exception,
infer_strategy=None, cast_strategy=None,
override_schema=None, override_fields=None,
deduplicate_headers=False,
on_error=raise_exception,
**options):
super(load, self).__init__()
self.load_source = load_source
@@ -119,6 +121,7 @@ def __init__(self, load_source, name=None, resources=None, strip=True, limit_rows,
self.resources = resources
self.override_schema = override_schema
self.override_fields = override_fields
self.deduplicate_headers = deduplicate_headers

self.load_dp = None
self.resource_descriptors = []
@@ -194,6 +197,11 @@ def process_datapackage(self, dp: Package):
self.options.setdefault('ignore_blank_headers', True)
self.options.setdefault('headers', 1)
stream: Stream = Stream(self.load_source, **self.options).open()
if len(stream.headers) != len(set(stream.headers)):
if not self.deduplicate_headers:
raise ValueError(
'Found duplicate headers. Use the `deduplicate_headers` flag')
stream.headers = self.rename_duplicate_headers(stream.headers)
schema = Schema().infer(
stream.sample, headers=stream.headers,
confidence=1, guesser_cls=self.guesser)
@@ -241,3 +249,16 @@ def process_resources(self, resources):
if self.limit_rows:
it = self.limiter(it)
yield it

def rename_duplicate_headers(self, duplicate_headers):
counter = {}
headers = []
for header in duplicate_headers:
counter.setdefault(header, 0)
counter[header] += 1
if counter[header] > 1:
if counter[header] == 2:
headers[headers.index(header)] = '%s (%s)' % (header, 1)
header = '%s (%s)' % (header, counter[header])
headers.append(header)
return headers
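
For reference, the renaming scheme keeps one counter per header name: when a name is seen a second time, its first occurrence is retroactively renamed to `name (1)`, and later occurrences continue the sequence. A standalone trace of the same logic as the method above:

```python
def rename_duplicate_headers(duplicate_headers):
    counter = {}   # occurrences seen so far, per header name
    headers = []
    for header in duplicate_headers:
        counter.setdefault(header, 0)
        counter[header] += 1
        if counter[header] > 1:
            if counter[header] == 2:
                # first duplicate found: rename the original occurrence too
                headers[headers.index(header)] = '%s (%s)' % (header, 1)
            header = '%s (%s)' % (header, counter[header])
        headers.append(header)
    return headers

print(rename_duplicate_headers(['header1', 'header2', 'header2']))
# -> ['header1', 'header2 (1)', 'header2 (2)']
print(rename_duplicate_headers(['a', 'b', 'b', 'b']))
# -> ['a', 'b (1)', 'b (2)', 'b (3)']
```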
1 change: 1 addition & 0 deletions setup.py
@@ -21,6 +21,7 @@ def read(*paths):
PACKAGE = 'dataflows'
NAME = PACKAGE.replace('_', '-')
INSTALL_REQUIRES = [
'tabulator>=1.23.0',
'datapackage>=1.5.0',
'tableschema>=1.5',
'kvfile>=0.0.6',
29 changes: 28 additions & 1 deletion tests/test_lib.py
@@ -1,3 +1,4 @@
import pytest
from dataflows import Flow

data = [
@@ -1236,7 +1237,7 @@ def test_load_override_schema_and_fields():
{'name': 'george', 'age': '17'},
{'name': None, 'age': '22'},
]]

def test_delete_fields_regex():
from dataflows import load, delete_fields
flow = Flow(
@@ -1271,3 +1272,29 @@ def test_join_full_outer():
{'id': 3, 'city': 'rome', 'population': None},
{'id': 4, 'city': None, 'population': 3},
]]


def test_load_duplicate_headers():
from dataflows import load
flow = Flow(
load('data/duplicate_headers.csv'),
)
with pytest.raises(ValueError) as excinfo:
flow.results()
assert 'duplicate headers' in str(excinfo.value)


def test_load_duplicate_headers_with_deduplicate_headers_flag():
from dataflows import load
flow = Flow(
load('data/duplicate_headers.csv', deduplicate_headers=True),
)
data, package, stats = flow.results()
assert package.descriptor['resources'][0]['schema']['fields'] == [
{'name': 'header1', 'type': 'string', 'format': 'default'},
{'name': 'header2 (1)', 'type': 'string', 'format': 'default'},
{'name': 'header2 (2)', 'type': 'string', 'format': 'default'},
]
assert data == [[
{'header1': 'value1', 'header2 (1)': 'value2', 'header2 (2)': 'value3'},
]]
