forked from trec-kba/kba-corpus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
subcorpus_counter.py
124 lines (98 loc) · 4.67 KB
/
subcorpus_counter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
'''
Reads in paths to individual chunk files in the TREC KBA Stream Corpus
2012, loads them from S3, and counts things about the data.

Uses mrjob to run itself in Hadoop on AWS EMR.

You can generate paths using this command, in which "2012-04-23-08"
is the date_hour string:

(for a in `s3cmd ls s3://aws-publicdatasets/trec/kba/kba-stream-corpus-2012/2012-04-23-08/ | grep xz.gpg | cut -c 32-`; do echo http$a; done;) >& public_urls-2012-04-23-08.txt

You can get a list of all the date_hour strings here:

s3cmd get s3://aws-publicdatasets/trec/kba/kba-stream-corpus-2012/dir-names.txt
'''
import re
import os
import sys
import json
import time
import urllib
import syslog
import traceback
## you can configure boto within EMR by modifying your copy of the
## shell scripts in s3://trec-kba-emr/emr-setup to create ~/.boto file
#import boto
## load mrjob components
import mrjob.emr
from mrjob.job import MRJob
import mrjob.protocol
import kba_corpus
class SubcorpusCounter(MRJob):
    '''
    mrjob job that counts NER tokens and sentences per subcorpus.

    Each input line is a public URL of one chunk file.  The mapper
    emits one (num_ner_tokens, num_ner_sentences) pair per chunk,
    keyed on the subcorpus name, and the reducer sums those pairs for
    each subcorpus.
    '''
    ## input lines are read as raw values (one public URL per line)
    INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol

    ## this is the default, so redundant
    INTERNAL_PROTOCOL = mrjob.protocol.JSONProtocol

    def mapper(self, empty, public_url):
        '''
        Takes as input a public URL to a TREC KBA 2012 chunk file,
        which it then loads, decrypts, uncompresses, and deserializes,
        so that it can count the number of NER tokens.

        This emits keys equal to the subcorpus name ('news',
        'linking', or 'social') and value is a two tuple of integers.
        First integer in the two-tuple is equal to the number of NER
        tokens, and second the number of sentences as tokenized by
        Stanford NER.

        :param empty: key supplied by the input protocol; not used.
        :param public_url: URL of one chunk file; surrounding
            whitespace is stripped before fetching.

        Nothing is emitted when processing the chunk raises; the
        failure is logged and recorded in a 'FAILED-...' counter
        instead, so one bad chunk does not kill the job.
        '''
        subcorpus_name = None
        num_ner_tokens = 0
        num_ner_sentences = 0
        try:
            ## fetch the whole file into memory, always releasing the
            ## connection even if the read fails part way through
            kba_corpus.log('fetching %r' % public_url)
            response = urllib.urlopen(public_url.strip())
            try:
                data = response.read()
            finally:
                response.close()

            ## shell out to gpg and xz to get the thrift
            thrift_data = kba_corpus.decrypt_and_uncompress(
                data, 'kba_corpus.tar.gz/trec-kba-rsa.secret-key')

            ## iterate over all the docs in this chunk
            for stream_item in kba_corpus.stream_items(thrift_data):
                ## this should be the same every time, could assert
                subcorpus_name = stream_item.source

                ## for fun, keep counters on how many docs have NER or not
                has_ner = (stream_item.body.ner or
                           stream_item.anchor.ner or
                           stream_item.title.ner)
                if has_ner:
                    self.increment_counter('SubcorpusCounter', 'hasNER', 1)
                else:
                    self.increment_counter('SubcorpusCounter', 'no-NER', 1)

                ## tell hadoop we are still alive
                self.increment_counter(
                    'SubcorpusCounter', 'StreamItemsProcessed', 1)

                ## iterate over sentences to generate the two counts
                for content in ['body', 'anchor', 'title']:
                    for sentence in kba_corpus.sentences(
                            stream_item, content=content):
                        num_ner_tokens += len(sentence)
                        num_ner_sentences += 1

        except Exception as exc:
            ## oops, log verbosely, including with counters (maybe too
            ## clever).  format_exc() reads the exception currently
            ## being handled; its only argument is a frame limit, so
            ## the exception object must not be passed to it.
            kba_corpus.log(traceback.format_exc())

            ## NOTE(review): the exception message becomes part of the
            ## counter name, so every distinct message creates a
            ## distinct counter -- confirm this stays manageable.
            key = 'FAILED-%s' % re.sub(r'\s+', '-', str(exc))

            ## could emit this, but that would pollute the output
            # yield key, public_url
            self.increment_counter('SubcorpusCounter', key, 1)

        else:
            ## it must have all worked, so emit data
            ## NOTE(review): a chunk containing no stream items leaves
            ## subcorpus_name as None, which is emitted as the key --
            ## confirm whether that can happen in practice.
            self.increment_counter('SubcorpusCounter', 'Success', 1)
            yield subcorpus_name, (num_ner_tokens, num_ner_sentences)

        finally:
            ## help hadoop keep track
            self.increment_counter(
                'SkippingTaskCounters', 'MapProcessedRecords', 1)

    def reducer(self, source, counts):
        '''
        Sums up all the counts for a given source

        :param source: subcorpus name emitted as the key by the mapper.
        :param counts: iterable of (num_ner_tokens, num_ner_sentences)
            pairs emitted by the mapper for that source.

        Emits a single (source, (total_tokens, total_sentences)) pair.
        '''
        num_ner_tokens = 0
        num_ner_sentences = 0
        kba_corpus.log('reading counts for %r' % source)
        self.increment_counter('SubcorpusCounter', 'ReducerLaunched', 1)

        for count_pair in counts:
            num_ner_tokens += count_pair[0]
            num_ner_sentences += count_pair[1]
            self.increment_counter('SubcorpusCounter', 'CountPairRead', 1)

        yield source, (num_ner_tokens, num_ner_sentences)

        ## help hadoop keep track; runs once the emitted total has
        ## been consumed and the generator is resumed
        self.increment_counter(
            'SkippingTaskCounters', 'ReduceProcessedRecords', 1)
if __name__ == "__main__":
    ## executed as a script: let mrjob parse the command line and
    ## launch the job
    SubcorpusCounter.run()