A native Python implementation of Spark's RDD interface. Instead of being resilient and distributed, it is just transient and local, but fast (lower latency than PySpark). It is a drop-in replacement for PySpark's `SparkContext` and `RDD`.

Use case: you have a pipeline that processes 100k input documents and converts them to normalized features, which are used to train a local scikit-learn classifier. The preprocessing is perfect for a full Spark task. Now you want to use this trained classifier in an API endpoint, and you need the same preprocessing pipeline for a single document per API call. This does not have to be done in parallel, but there should be only a small initialization overhead and preferably no dependency on the JVM. This is what `pysparkling` is for.
pip install pysparkling
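A minimal sketch of that use case; the `normalize()` preprocessing function below is a placeholder, not part of pysparkling:

```python
from pysparkling import Context

# placeholder preprocessing step, shared between bulk training and the API endpoint
def normalize(line):
    return line.strip().lower()

# bulk: preprocess many documents, no JVM and no Spark cluster required
features = Context().textFile('tests/*.py').map(normalize).collect()

# per API call: reuse the exact same function on a single document
single_feature = normalize('One Document From An API Call')
```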
- Supports multiple URI schemes like `s3n://`, `http://` and `file://`. Specify multiple files separated by comma. Resolves `*` and `?` wildcards.
- Handles `.gz` and `.bz2` compressed files.
- Parallelization via `multiprocessing.Pool`, `concurrent.futures.ThreadPoolExecutor` or any other pool-like object that has a `map(func, iterable)` method (see the sketch after this list).
- Only dependencies: `boto` for AWS S3 and `requests` for http.
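For example, a pool and custom serializers can be passed to `Context`. This is a sketch; whether the `dill` serializers are needed depends on the pool and the functions you pass to it:

```python
import concurrent.futures
import dill
from pysparkling import Context

# any pool-like object with a map(func, iterable) method works;
# dill can serialize lambdas, which the standard pickle module cannot
context = Context(
    pool=concurrent.futures.ThreadPoolExecutor(4),
    serializer=dill.dumps,
    deserializer=dill.loads,
)

# comma separated paths, * and ? wildcards and .gz/.bz2 files are all handled
rdd = context.textFile('tests/*.py')
print(rdd.count())
```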
The example source code is included in `tests/readme_example*.py`.

Line counts: count all the lines in the `*.py` files in the `tests` directory, and also count only those lines that start with `import`:
from pysparkling import Context
my_rdd = Context().textFile('tests/*.py')
print('In tests/*.py: all lines={0}, with import={1}'.format(
    my_rdd.count(),
    my_rdd.filter(lambda l: l.startswith('import ')).count()
))
which prints `In tests/*.py: all lines=518, with import=11`.
Common Crawl: More info on the dataset is in this blog post.
from pysparkling import Context
# read all the paths of warc and wat files of the latest Common Crawl
paths_rdd = Context().textFile(
    's3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2015-11/warc.paths.*,'
    's3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2015-11/wat.paths.gz'
)
print(paths_rdd.collect())
which prints a long list of paths extracted from two gzip compressed files.
Human Microbiome Project: Get a random line without loading the entire dataset.
from pysparkling import Context
by_subject_rdd = Context().textFile(
    's3n://human-microbiome-project/DEMO/HM16STR/46333/by_subject/*'
)
print(by_subject_rdd.takeSample(1))
which prints out a line like `[u'CAACGCCGCGTGAGGGATGACGGCCTTCGGGTTGTAAACCTCTTTCAGTATCGACGAAGC']`.
RDD API:

- `aggregate(zeroValue, seqOp, combOp)`: aggregate value in partition with seqOp and combine with combOp
- `aggregateByKey(zeroValue, seqFunc, combFunc)`: aggregate by key
- `cache()`: synonym for `persist()`
- `cartesian(other)`: cartesian product
- `coalesce()`: do nothing
- `collect()`: return the underlying list
- `count()`: get length of internal list
- `countApprox()`: same as `count()`
- `countByKey()`: input is list of pairs, returns a dictionary
- `countByValue()`: input is a list, returns a dictionary
- `context()`: return the context
- `distinct()`: returns a new RDD containing the distinct elements
- `filter(func)`: return new RDD filtered with func
- `first()`: return first element
- `flatMap(func)`: return a new RDD of a flattened map
- `flatMapValues(func)`: return new RDD
- `fold(zeroValue, op)`: aggregate elements
- `foldByKey(zeroValue, op)`: aggregate elements by key
- `foreach(func)`: apply func to every element
- `foreachPartition(func)`: apply func to every partition
- `getNumPartitions()`: number of partitions
- `getPartitions()`: returns an iterator over the partitions
- `groupBy(func)`: group by the output of func
- `groupByKey()`: group by key where the RDD is of type [(key, value), ...]
- `histogram(buckets)`: buckets can be a list or an int
- `id()`: currently just returns None
- `intersection(other)`: return a new RDD with the intersection
- `isCheckpointed()`: returns False
- `join(other)`: join
- `keyBy(func)`: creates tuple in new RDD
- `keys()`: returns the keys of tuples in new RDD
- `leftOuterJoin(other)`: left outer join
- `lookup(key)`: return list of values for this key
- `map(func)`: apply func to every element and return a new RDD
- `mapPartitions(func)`: apply func to entire partitions
- `mapValues(func)`: apply func to value in (key, value) pairs and return a new RDD
- `max()`: get the maximum element
- `mean()`: mean
- `min()`: get the minimum element
- `name()`: RDD's name
- `persist()`: caches outputs of previous operations (previous steps are still executed lazily)
- `pipe(command)`: pipe the elements through an external command line tool
- `reduce()`: reduce
- `reduceByKey()`: reduce by key and return the new RDD
- `rightOuterJoin(other)`: right outer join
- `sample(withReplacement, fraction, seed=None)`: sample from the RDD
- `saveAsTextFile(path)`: save RDD as text file
- `subtract(other)`: return a new RDD without the elements in other
- `sum()`: sum
- `take(n)`: get the first n elements
- `takeSample(n)`: get n random samples
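A small sketch exercising a few of these methods on the test files used earlier:

```python
from pysparkling import Context

lines = Context().textFile('tests/*.py')

# pair every line with whether it is an import line, then count per key
pairs = lines.map(lambda l: (l.startswith('import '), 1))
print(pairs.reduceByKey(lambda a, b: a + b).collect())

print(lines.take(3))             # the first three lines
print(lines.getNumPartitions())  # how many partitions the files were split into
```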
Context API:

- `__init__(pool=None, serializer=None, deserializer=None, data_serializer=None, data_deserializer=None)`: takes a pool object (an object that has a `map()` method, e.g. a `multiprocessing.Pool`) to parallelize methods. To support functions and lambda functions, specify custom serializers and deserializers, e.g. `serializer=dill.dumps, deserializer=dill.loads`.
- `broadcast(var)`: returns an instance of `Broadcast()`; its value is accessed with `value`.
- `newRddId()`: incrementing number
- `textFile(filename)`: load every line of a text file into an RDD. `filename` can contain a comma separated list of many files, `?` and `*` wildcards, file paths on S3 (`s3n://bucket_name/filename.txt`) and local file paths (`relative/path/my_text.txt`, `/absolute/path/my_text.txt` or `file:///absolute/file/path.txt`). If the filename points to a folder containing `part*` files, those are resolved.
- `version`: the version of pysparkling

Broadcast API:

- `value`: access the value it stores
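A short sketch of `broadcast()` and `textFile()` together; the broadcast dictionary here is just an illustration:

```python
from pysparkling import Context

context = Context()

# a broadcast variable wraps a value; it is read through .value
weights = context.broadcast({'import': 2.0, 'from': 1.0})

lines = context.textFile('tests/*.py')
score = lines.map(lambda l: weights.value.get(l.split(' ')[0], 0.0)).sum()
print(score)
```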
The functionality provided by the `fileio` submodule is used in `Context.textFile()` for reading and in `RDD.saveAsTextFile()` for writing. Normally, you should not have to use this submodule directly.
Use the environment variables `AWS_SECRET_ACCESS_KEY` and `AWS_ACCESS_KEY_ID` for authentication, and use file paths of the form `s3n://bucket_name/filename.txt`.
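For example (the credential values below are placeholders):

```python
import os

# boto picks up these environment variables for s3n:// paths
os.environ['AWS_ACCESS_KEY_ID'] = 'AKIA...'          # placeholder
os.environ['AWS_SECRET_ACCESS_KEY'] = 'your-secret'  # placeholder

from pysparkling import Context
rdd = Context().textFile('s3n://bucket_name/filename.txt')
```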
Infers `.gz` and `.bz2` compression from the file name.
- `File(file_name)`: file_name is either local, http, on S3 or ...
- `[static] exists(path)`: check for existence of path
- `[static] resolve_filenames(expr)`: given a glob-like expression with `*` and `?`, get a list of all matching filenames (either locally or on S3)
- `load()`: return the contents as BytesIO
- `dump(stream)`: write the stream to the file
- `make_public(recursive=False)`: only for files on S3
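A sketch of using the `File` class directly; the `pysparkling.fileio` import path is an assumption based on the submodule name:

```python
from pysparkling.fileio import File

# resolve a glob-like expression to concrete file names (locally or on S3)
filenames = File.resolve_filenames('tests/*.py')
print(filenames)

# load() returns the file contents as a BytesIO object
if filenames and File.exists(filenames[0]):
    print(File(filenames[0]).load().read()[:80])
```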
Changelog:

- master
- v0.2.13 (2015-05-28)
  - make `cache()` and `persist()` do something useful
  - logo
  - fix `foreach()`
- v0.2.10 (2015-05-27)
  - fix `fileio.codec` import
  - support `http://`
- v0.2.8 (2015-05-26)
  - parallelized text file reading (and made it lazy)
  - parallelized `take()` and `takeSample()` so that only the required data partitions are computed
  - add example: access Human Microbiome Project
- v0.2.6 (2015-05-21)
  - factor out `fileio.fs` and `fileio.codec` modules
  - merge `WholeFile` into `File`
  - improved handling of compressed files (backwards incompatible): the `fileio` interface changed to `dump()` and `load()` methods; added `make_public()` for S3
  - factor file related operations into the `fileio` submodule
- v0.2.2 (2015-05-18)
  - compressions: `.gz`, `.bz2`
- v0.2.0 (2015-05-17)
  - proper handling of partitions
  - custom serializers and deserializers (for functions and data separately)
  - more tests for parallelization options
  - execution of distributed jobs is such that a chain of `map()` operations gets executed on workers without sending intermediate results back to the master
  - a few more methods for RDDs implemented
- v0.1.1 (2015-05-12)
  - implemented a few more RDD methods
  - changed handling of context in RDD
- v0.1.0 (2015-05-09)