How to find records on HDFS using pyspark

We developed tools that use the pyspark platform to run Map-Reduce jobs; see myspark.

It is based on the idea of using pyspark to look up Avro files on HDFS. Each file can then be loaded into a Spark context object and processed via Map-Reduce functions. We can use the same idea to find records on HDFS. For that we provide several pre-defined classes, e.g. RecordFinder. It defines a MapReduce class whose purpose is to select records based on a provided spec. The latter is just a Python dictionary which can be passed to myspark. The class has a basic spec parser (which so far only converts strings into regex patterns) and a match function. The latter checks whether a given record matches the given spec. If the conditions are met, the mapper method of MapReduce returns the record; otherwise it returns an empty dict. The reducer method collects all records and filters out the empty ones. At the end we'll have a list of found records.
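
To make this concrete, here is a minimal sketch of such a MapReduce class, illustrating the spec parser, match, mapper and reducer steps described above. It follows the description only; the actual RecordFinder implementation in WMArchive may differ in details:

import re

class MapReduce(object):
    "Sketch of a RecordFinder-style MapReduce class (illustration only)"
    def __init__(self, ispec=None):
        spec = ispec.get('spec', {}) if ispec else {}
        self.spec = {}
        # basic spec parser: convert string values into regex patterns
        for key, val in spec.items():
            if key == 'timerange':  # assumed to be consumed by myspark itself
                continue
            self.spec[key] = re.compile(val) if isinstance(val, str) else val

    def match(self, rec):
        "Check whether the given record matches the given spec"
        for key, pat in self.spec.items():
            val = str(rec.get(key, ''))
            if hasattr(pat, 'match'):
                if not pat.match(val):
                    return False
            elif val != str(pat):
                return False
        return True

    def mapper(self, records):
        "Return a matching record or an empty dict for every input record"
        return [rec if self.match(rec) else {} for rec in records]

    def reducer(self, records):
        "Collect all records and filter out the empty ones"
        return [rec for rec in records if rec]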

Environment

To run a myspark job you need a node which provides the Hadoop libraries. For general usage please log in to the CERN analytix node. Here are all the steps you need to set up your environment:

ssh analytix
cd <YOUR_WORK_DIR>
git clone git@github.com:dmwm/WMArchive.git
cd WMArchive
export PYTHONPATH=$PWD/src/python:/afs/cern.ch/user/v/valya/public/spark:/usr/lib/spark/python
export PATH=$PWD/bin:$PATH
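
To check the setup you can verify that the WMArchive package is importable (a quick sanity check, assuming the paths above; this step is not required):

python -c "import WMArchive"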

Usage

To find your records in WMArchive you need to set up the WMArchive environment and use the myspark script.

myspark --help
usage: PROG [-h] [--hdir HDIR] [--schema SCHEMA] [--script SCRIPT]
            [--list-scripts] [--spec SPEC] [--yarn] [--no-log4j]
            [--store STORE] [--wmaid WMAID] [--ckey CKEY] [--cert CERT]
            [--verbose] [--records-output ROUT]

optional arguments:
  -h, --help            show this help message and exit
  --hdir HDIR           Input data location on HDFS, e.g.
                        hdfs:///cms/wmarchive/avro/2016
  --schema SCHEMA       Input schema, default
                        hdfs:///cms/wmarchive/avro/fwjr_prod.avsc
  --script SCRIPT       python script with custom mapper/reducer functions
  --list-scripts        list available pre-defined mapper/reducer scripts
  --spec SPEC           json file with query spec or valid json
  --yarn                run job on analytics cluster via yarn resource manager
  --no-log4j            Disable spark log4j messages
  --store STORE         store results into WMArchive, provide WMArchive url
  --wmaid WMAID         provide wmaid for store submission
  --ckey CKEY           specify private key file name, default
                        $X509_USER_PROXY
  --cert CERT           specify private certificate file name, default
                        $X509_USER_PROXY
  --verbose             verbose output
  --records-output ROUT
                        Output file for records

In most cases users will only need the --spec and --records-output options. The former specifies the file name with your constraints and the latter defines the output file name where the found records will be stored. There is a pre-defined set of MapReduce classes; see --list-scripts.

Examples

Find records for a given task/lfn. Define your spec file as follows (we'll name it cond.spec):

{"spec":{"task":"/amaltaro_StepChain_ReDigi3_HG1612_WMArchive_161130_192654_9283/DIGI","timerange":[20161130,20161202]}, "fields":[]}

or

{"spec":{"lfn":"/path/lfn.root","timerange":[20161130,20161202]}, "fields":[]}

then run the myspark job:

myspark --spec=cond.spec --script=RecordFinder --records-output=records.json
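
When the job completes, the found records end up in records.json. Here is a small sketch for inspecting them, assuming the output file holds the found records as a JSON list (the exact layout may differ):

import json

with open('records.json') as istream:
    records = json.load(istream)
print("found %s records" % len(records))
for rec in records:
    print(rec.get('task'))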

Please note, a spec file supports pattern conditions, e.g.

{"spec":{"task": "/AbcCde_Task_Data_test_(7842|2526|3820|4291)/RECO", "timerange":[20161129,20161201]}, "fields":[]}

Find records for cmsRun attributes, e.g. prep_id. To find records based on cmsRun attributes you'll need to use the RecordFinderCMSRun script. For instance, if you're interested in prep_id, define your spec as follows:

{"spec":{"prep_id":"SUS-RunIISummer16DR80Premix-00169", "timerange":[20161129, 20161201]}, "fields":[]}

then run it as follows:

myspark --spec=cond.spec --script=RecordFinderCMSRun --records-output=records.json