Skip to content

Commit

Permalink
Merge pull request #320 from boazmohar/repartition_bug
Browse files Browse the repository at this point in the history
base.repartition() is missing a sortByKey() call on the RDD
  • Loading branch information
freeman-lab committed May 26, 2016
2 parents 149cf88 + 8ec6a66 commit 00c19ce
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 5 deletions.
21 changes: 20 additions & 1 deletion test/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,23 @@ def test_map_with_keys(eng):
data = images.fromlist([array([[1, 1], [1, 1]]), array([[2, 2], [2, 2]])], engine=eng)
mapped = data.map(lambda kv: kv[0] + kv[1], with_keys=True)
assert allclose(mapped.shape, [2, 2, 2])
assert allclose(mapped.toarray(), [[[1, 1], [1, 1]], [[3, 3], [3, 3]]])
assert allclose(mapped.toarray(), [[[1, 1], [1, 1]], [[3, 3], [3, 3]]])


def test_repartition(eng):
    """
    Check that the first record is unchanged by repartitioning.

    The same scenario runs for both ``images`` and ``series``: twelve
    two-element records are loaded into 10 partitions, then repartitioned
    down to 3, and ``first()`` must return ``[1, 1]`` before and after.

    The body is skipped entirely when no engine is supplied.
    """
    if eng is None:
        return

    for module in (images, series):
        # Build fresh records for each constructor: array([1, 1]) .. array([12, 12]).
        records = [array([value, value]) for value in range(1, 13)]
        data = module.fromlist(records, engine=eng, npartitions=10)
        assert allclose(data.first(), array([1, 1]))

        # Going from 10 partitions to 3 must not change which record comes first.
        data = data.repartition(3)
        assert allclose(data.first(), array([1, 1]))
3 changes: 1 addition & 2 deletions thunder/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ def repartition(self, npartitions):
Number of partitions after repartitions.
"""
if self.mode == 'spark':
self.values._rdd = self.values._rdd.repartition(npartitions)
return self
return self._constructor(self.values.repartition(npartitions)).__finalize__(self)
else:
notsupported(self.mode)

Expand Down
2 changes: 1 addition & 1 deletion thunder/images/images.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def first(self):
return self.values[0]

if self.mode == 'spark':
return self.values.tordd().values().first()
return self.values.first()

def toblocks(self, size='150', padding=None):
"""
Expand Down
2 changes: 1 addition & 1 deletion thunder/series/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def first(self):
return self.values[tuple(zeros(len(self.baseaxes))) + (slice(None, None),)]

if self.mode == 'spark':
return self.values.tordd().values().first()
return self.values.first()

def tolocal(self):
"""
Expand Down

0 comments on commit 00c19ce

Please sign in to comment.