From 1f98a7b6b47e30f304f3b30d46fb4db07463bf2a Mon Sep 17 00:00:00 2001 From: Natalia Date: Fri, 27 Jan 2017 18:52:42 +0300 Subject: [PATCH 1/8] Initial commit. Updated the code to handle several feeds instead of one. --- gdelt/gdelt/feed.py | 208 ++++++++++++++++++----------- gdelt/process-feed.py | 295 ++++++++++++++++++++++++------------------ 2 files changed, 298 insertions(+), 205 deletions(-) diff --git a/gdelt/gdelt/feed.py b/gdelt/gdelt/feed.py index 9a2b556..aa9a3fe 100644 --- a/gdelt/gdelt/feed.py +++ b/gdelt/gdelt/feed.py @@ -2,91 +2,147 @@ # import sys import os - +import re import urllib2 - import logging + import xml.etree.ElementTree as ET +from xml.etree.ElementTree import ParseError + +from urlparse import urlsplit +from urllib2 import quote,unquote from ConfigParser import ConfigParser +quoted_url = re.compile('^https?%3A%2F%2F', re.IGNORECASE) + + class Deduper(object): - LAST_FILE = 'LAST' - THIS_FILE = 'THIS' - def __init__(self, datadir): - self.datadir = datadir - self.log = logging.getLogger(__name__) + LAST_FILE = 'LAST' + THIS_FILE = 'THIS' + def __init__(self, datadir): + self.datadir = datadir + self.log = logging.getLogger( + '{0}.{1.__name__}'.format(__name__,Deduper)) + + def dedup(self, source): + lastfile = os.path.join(self.datadir, self.LAST_FILE) + if not os.path.exists(lastfile): + with open(lastfile, 'w') as f: + pass - def dedup(self, source): - lastfile = os.path.join(self.datadir, self.LAST_FILE) - if not os.path.exists(lastfile): - with open(lastfile, 'w') as f: - pass + self.lastf = open(lastfile, 'r') + self.thisf = open(os.path.join(self.datadir, self.THIS_FILE), 'w+') + + for l in source: + self.thisf.write(l + '\n') + self.thisf.seek(0, 0) - self.lastf = open(lastfile, 'r') - self.thisf = open(os.path.join(self.datadir, self.THIS_FILE), 'w+') - - for l in source: - self.thisf.write(l + '\n') - self.thisf.seek(0, 0) + # assume new lines are appended at the bottom + lt = self.thisf.readline() + while True: + ll = self.lastf.readline() + if ll == '' or ll == lt: + break + if ll != '': + # assumption was right + while True: + lt = self.thisf.readline() + ll = self.lastf.readline() + if ll == '' or lt != ll: + break + while lt != '': + yield lt.rstrip() + lt = self.thisf.readline() + else: + # assumption was wrong - new lines are inserted at the top + # (also the case where LAST and THIS has no overwrap at all) + self.lastf.seek(0, 0) + ll = self.lastf.readline() + while True: + yield lt.rstrip() + lt = self.thisf.readline() + if lt == '' or lt == ll: + break - # assume new lines are appended at the bottom - lt = self.thisf.readline() - while True: - ll = self.lastf.readline() - if ll == '' or ll == lt: - break - if ll != '': - # assumption was right - while True: - lt = self.thisf.readline() - ll = self.lastf.readline() - if ll == '' or lt != ll: - break - while lt != '': - yield lt.rstrip() - lt = self.thisf.readline() - else: - # assumption was wrong - new lines are inserted at the top - # (also the case where LAST and THIS has no overwrap at all) - self.lastf.seek(0, 0) - ll = self.lastf.readline() - while True: - yield lt.rstrip() - lt = self.thisf.readline() - if lt == '' or lt == ll: - break + self.lastf.close() + self.thisf.close() + + def step(self): + lastfile = os.path.join(self.datadir, self.LAST_FILE) + thisfile = os.path.join(self.datadir, self.THIS_FILE) + if os.path.exists(thisfile): + os.rename(thisfile, lastfile) + else: + self.log.warn('%s does not exist, step is no-op.', thisfile) - self.lastf.close() - self.thisf.close() - def 
step(self): - lastfile = os.path.join(self.datadir, self.LAST_FILE) - thisfile = os.path.join(self.datadir, self.THIS_FILE) - if os.path.exists(thisfile): - os.rename(thisfile, lastfile) - else: - self.log.warn('%s does not exist, step is no-op.', thisfile) -class FeedReader(object): - """ - Simple parser for RSS feed. - Uses etree :func:`iterparse` to reduce memory footprint for large - feeds. - """ - def __init__(self, source): - self.parse = ET.iterparse(source, ['start', 'end']) +class URLFilter(object): + """Filter URLs from `FeedReader`.""" + def __init__(self, source): + self.log = logging.getLogger( + '{0}.{1.__name__}'.format(__name__,URLFilter)) + self.source = source + + def __iter__(self): + return self + + def next(self): + while True: + url = next(self.source) + if re.match(quoted_url, url): + url = self.unquote_url(url) + valid = self.validate_url(url) + if not valid: + continue + else: + return url + + def unquote_url(self,url): + unq = unquote(url) + # quote back anon-ascii characters. + chars = [quote(c) if ord(c)>127 else c for c in unq] + url = ''.join(chars) + return url + + # TODO: think through url validation rules and add them to the code. + def validate_url(self,url): + try: + urlsplit(url) + return True + except Exception as ex: + self.log.error('Invalid url: {}'.format(url), exc_info=1) + return False - self._item = None - def __iter__(self): - return self - def next(self): - while True: - event, elem = next(self.parse) - if event == 'start': - if elem.tag == 'item': - self._item = elem - elif event == 'end': - if elem.tag == 'item': - self._item = None - elif elem.tag == 'link': - if self._item is not None: - return elem.text + +class FeedReader(object): + """ + Simple parser for RSS feed. + Uses etree :func:`iterparse` to reduce memory footprint for large + feeds. 
+ """ + def __init__(self, source): + self.log = logging.getLogger( + '{0}.{1.__name__}'.format(__name__,FeedReader)) + self.parse = ET.iterparse(source, ['start', 'end']) + self._item = None + + def __iter__(self): + return self + + def next(self): + while True: + event, elem = next(self.parse) + if event == 'start': + if elem.tag == 'item': + self._item = elem + elif event == 'end': + if elem.tag == 'item': + self._item = None + elif elem.tag == 'link': + if self._item is not None: + if elem.text: + return elem.text + + + + diff --git a/gdelt/process-feed.py b/gdelt/process-feed.py index 40c5b49..5a4c161 100755 --- a/gdelt/process-feed.py +++ b/gdelt/process-feed.py @@ -14,137 +14,173 @@ import yaml from argparse import ArgumentParser -from gdelt.feed import FeedReader, Deduper +from gdelt.feed import FeedReader, Deduper, URLFilter from crawllib.headquarter import HeadquarterSubmitter +import json + CONFIG_FILE = 'config.yaml' def crawluri(urls): - for url in urls: - yield dict(u=url) + for url in urls: + yield dict(u=url) def batchup(iter, n): - b = [] - for e in iter: - b.append(e) - if len(b) >= n: - yield b - b = [] - if b: - yield b + b = [] + for e in iter: + b.append(e) + if len(b) >= n: + yield b + b = [] + if b: + yield b def httpdate(dt): - """format time tuple `dt` in HTTP Date format.""" - return time.strftime('%a, %d %b %Y %H:%M:%S %Z', dt) + """format time tuple `dt` in HTTP Date format.""" + return time.strftime('%a, %d %b %Y %H:%M:%S %Z', dt) + + +class Feed(object): + def __init__(self, name, feed_url, datadir, log, timeout, hqclient): + self.name = name + self.feed_url = feed_url + self.datadir = datadir + self.log = log + self.timeout = timeout + self.hqclient = hqclient + self.deduper = Deduper(self.datadir) + + rfiles = [fn for fn in os.listdir(self.datadir) + if re.match(r'feed-\d{14}$', fn)] + if rfiles: + self.log.debug('last=%s', max(rfiles)) + # time.strptime() returns time tuple without timezone. make it + # UTC with timegm() and gmtime() + self.last_time = time.gmtime(timegm( + time.strptime(max(rfiles)[-14:], '%Y%m%d%H%M%S'))) + else: + self.last_time = None + + def process(self): + # file name is in UTC. 
+ rid = time.strftime('%Y%m%d%H%M%S', time.gmtime()) + rfile = os.path.join(self.datadir, 'feed-{}'.format(rid)) + try: + req = urllib2.Request(self.feed_url) + if self.last_time: + self.log.debug('last_time=%s', httpdate(self.last_time)) + req.add_header('If-Modified-Since', httpdate(self.last_time)) + f = urllib2.urlopen(req, timeout=self.timeout) + try: + with open(rfile, 'wb') as w: + while True: + d = f.read(16*1024) + if not d: break + w.write(d) + self.log.info('downloaded %d bytes in %s', w.tell(), + rfile) + except KeyboardInterrupt as ex: + if os.path.exists(rfile): + os.remove(rfile) + raise + except urllib2.HTTPError as ex: + if ex.code == 304: + # Not Modified + self.log.debug('feed %s not modified since %s', self.feed_url, + httpdate(self.last_time)) + return + self.log.warn('%s %s %s', self.feed_url, ex.code, ex.reason) + return + except (urllib2.URLError, socket.error) as ex: + self.log.warn('%s %s', self.feed_url, ex) + return + + self.last_time = time.gmtime() + + urlcount = 0 + slfile = os.path.join(self.datadir, 'sche-{}'.format(rid)) + with open(slfile, 'wb') as sl: + with open(rfile, 'rb') as f: + reader = FeedReader(f) + urlfilter = URLFilter(reader) + for urls in batchup(crawluri(self.deduper.dedup(urlfilter)), 500): + self.log.debug('submitting %s URLs...', len(urls)) + try: + jsdata = json.dumps(urls) + except: + for js in jsdata.split(','): + print js + + if not test_mode: + self.hqclient.put(urls) + for curl in urls: + sl.write(curl['u']) + sl.write('\n') + urlcount += len(urls) + self.log.info('submitted total %s URLs (see %s)', + urlcount, os.path.basename(slfile)) + + self.deduper.step() -class FeedScheduler(object): - def __init__(self, feed_url, hqbase, hqjob, - datadir='data', timeout=20, - check_interval=-1): - self.log = logging.getLogger( - 'gdelt.{0.__name__}'.format(FeedScheduler)) - self.feed_url = feed_url - self.hqbase = hqbase - self.hqjob = hqjob - self.datadir = datadir - self.timeout = int(timeout) - self.check_interval = int(check_interval) - - assert os.path.isdir(self.datadir) - - self.deduper = Deduper(self.datadir) - self.hqclient = HeadquarterSubmitter(self.hqbase, self.hqjob) - - rfiles = [fn for fn in os.listdir(self.datadir) - if re.match(r'feed-\d{14}$', fn)] - if rfiles: - self.log.debug('last=%s', max(rfiles)) - # time.strptime() returns time tuple without timezone. make it - # UTC with timegm() and gmtime() - self.last_time = time.gmtime(timegm( - time.strptime(max(rfiles)[-14:], '%Y%m%d%H%M%S'))) - else: - self.last_time = None - - def process(self): - while True: - t = time.time() - try: - self.process1() - except KeyboardInterrupt as ex: - raise - except Exception as ex: - self.log.error('process1 failed', exc_info=1) - if self.check_interval < 0: - self.log.debug('exiting because check_interval < 0') - break - if test_mode: - self.log.debug('exiting because test_mode=True') - break - dt = t + self.check_interval - time.time() - if dt >= 1.0: - self.log.debug('sleeping %ds until next cycle', int(dt)) - time.sleep(dt) - - def process1(self): - # file name is in UTC. 
- rid = time.strftime('%Y%m%d%H%M%S', time.gmtime()) - rfile = os.path.join(self.datadir, 'feed-{}'.format(rid)) - try: - req = urllib2.Request(self.feed_url) - if self.last_time: - self.log.debug('last_time=%s', httpdate(self.last_time)) - req.add_header('If-Modified-Since', httpdate(self.last_time)) - f = urllib2.urlopen(req, timeout=self.timeout) - try: - with open(rfile, 'wb') as w: - while True: - d = f.read(16*1024) - if not d: break - w.write(d) - self.log.info('downloaded %d bytes in %s', w.tell(), - rfile) - except KeyboardInterrupt as ex: - if os.path.exists(rfile): - os.remove(rfile) - raise - except urllib2.HTTPError as ex: - if ex.code == 304: - # Not Modified - self.log.debug('feed %s not modified since %s', self.feed_url, - httpdate(self.last_time)) - return - self.log.warn('%s %s %s', self.feed_url, ex.code, ex.reason) - return - except (urllib2.URLError, socket.error) as ex: - self.log.warn('%s %s', self.feed_url, ex) - return - - self.last_time = time.gmtime() - - urlcount = 0 - slfile = os.path.join(self.datadir, 'sche-{}'.format(rid)) - with open(slfile, 'wb') as sl: - with open(rfile, 'rb') as f: - reader = FeedReader(f) - for urls in batchup(crawluri(self.deduper.dedup(reader)), 500): - self.log.debug('submitting %s URLs...', len(urls)) - if not test_mode: - self.hqclient.put(urls) - for curl in urls: - sl.write(curl['u']) - sl.write('\n') - urlcount += len(urls) - self.log.info('submitted total %s URLs (see %s)', - urlcount, os.path.basename(slfile)) - - self.deduper.step() +class FeedScheduler(object): + def __init__(self, feeds, hqbase, hqjob, datadir='data', timeout=20, + check_interval=-1): + self.hqbase = hqbase + self.hqjob = hqjob + self.hqclient = HeadquarterSubmitter(self.hqbase, self.hqjob) + self.check_interval = int(check_interval) + self.log = logging.getLogger( + 'gdelt.{0.__name__}'.format(FeedScheduler)) + self.datadir = datadir + assert os.path.isdir(self.datadir) + + self.feeds = [] + for f in feeds: + flog = logging.getLogger('gdelt.Feed.{}'.format(f)) + furl = feeds[f]['url'] + fdatadir = os.path.join(self.datadir,f) + + assert os.path.isdir(fdatadir) + + fconfig = {'name':f, + 'feed_url': furl, + 'datadir':fdatadir, + 'log':flog, + 'timeout':int(timeout), + 'hqclient': self.hqclient, + } + feed = Feed(**fconfig) + self.feeds.append(feed) + + def process(self): + while True: + t = time.time() + for feed in self.feeds: + try: + feed.process() + except KeyboardInterrupt as ex: + raise + except Exception as ex: + feed.log.error('process failed',exc_info=1) + if self.check_interval < 0: + self.log.debug('exiting because check_interval < 0') + break + if test_mode: + self.log.debug('exiting because test_mode=True') + break + dt = t + self.check_interval - time.time() + if dt >= 1.0: + self.log.debug('sleeping %ds until next cycle', int(dt)) + time.sleep(dt) + + + parser = ArgumentParser() parser.add_argument('--test', action='store_true', default=False, - help='disables submission to HQ') + help='disables submission to HQ') parser.add_argument('--config', default=CONFIG_FILE, - help='configuration file filename (default %(default)s)') + help='configuration file filename (default %(default)s)') args = parser.parse_args() @@ -152,27 +188,28 @@ def process1(self): config_file = args.config if os.path.isfile(config_file): - config = yaml.load(open(config_file)) + config = yaml.load(open(config_file)) else: - print >>sys.stderr, "config file {} is not found".format(config_file) - exit(1) + print >>sys.stderr, "config file {} is not 
found".format(config_file) + exit(1) logconf = config.get('logging') if logconf: - import logging.config - logging.config.dictConfig(logconf) + import logging.config + logging.config.dictConfig(logconf) else: - logging.basicConfig( - level=logging.INFO, filename='process.log', - format='%(asctime)s %(levelname)s %(name)s %(message)s') + logging.basicConfig( + level=logging.INFO, filename='process.log', + format='%(asctime)s %(levelname)s %(name)s %(message)s') if 'gdelt' not in config: - print >>sys.stderr, "configuration must have 'gdelt' section" - exit(1) + print >>sys.stderr, "configuration must have 'gdelt' section" + exit(1) + sch = FeedScheduler(**config['gdelt']) try: - sch.process() + sch.process() except KeyboardInterrupt: - pass + pass From 6b7a9938cabcaeed39f916fee9d782cad249a5b0 Mon Sep 17 00:00:00 2001 From: Natalia Date: Sat, 28 Jan 2017 04:51:30 +0300 Subject: [PATCH 2/8] Renamed URLFilter to URLHandler, added `data`,`configs`,`.log` to .gitignore. --- gdelt/.gitignore | 3 +++ gdelt/gdelt/feed.py | 33 +++++++++++++++++---------------- gdelt/process-feed.py | 6 +++--- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/gdelt/.gitignore b/gdelt/.gitignore index 45b1167..94be941 100644 --- a/gdelt/.gitignore +++ b/gdelt/.gitignore @@ -3,3 +3,6 @@ /*.egg-info /build /dist +/data +/configs +*.log diff --git a/gdelt/gdelt/feed.py b/gdelt/gdelt/feed.py index aa9a3fe..d942f8d 100644 --- a/gdelt/gdelt/feed.py +++ b/gdelt/gdelt/feed.py @@ -76,27 +76,16 @@ def step(self): self.log.warn('%s does not exist, step is no-op.', thisfile) -class URLFilter(object): - """Filter URLs from `FeedReader`.""" +class URLHandler(object): + """Normalizes and validates URL.""" def __init__(self, source): self.log = logging.getLogger( - '{0}.{1.__name__}'.format(__name__,URLFilter)) + '{0}.{1.__name__}'.format(__name__,URLHandler)) self.source = source def __iter__(self): return self - def next(self): - while True: - url = next(self.source) - if re.match(quoted_url, url): - url = self.unquote_url(url) - valid = self.validate_url(url) - if not valid: - continue - else: - return url - def unquote_url(self,url): unq = unquote(url) # quote back anon-ascii characters. @@ -104,14 +93,26 @@ def unquote_url(self,url): url = ''.join(chars) return url - # TODO: think through url validation rules and add them to the code. + # TODO: url validation rules instead of `urlsplit` test. def validate_url(self,url): try: urlsplit(url) return True except Exception as ex: self.log.error('Invalid url: {}'.format(url), exc_info=1) - return False + return False + + def next(self): + while True: + url = next(self.source) + # normalize escaped URL. 
+ if re.match(quoted_url, url): + url = self.unquote_url(url) + valid = self.validate_url(url) + if not valid: + continue + else: + return url class FeedReader(object): diff --git a/gdelt/process-feed.py b/gdelt/process-feed.py index 5a4c161..6697f22 100755 --- a/gdelt/process-feed.py +++ b/gdelt/process-feed.py @@ -14,7 +14,7 @@ import yaml from argparse import ArgumentParser -from gdelt.feed import FeedReader, Deduper, URLFilter +from gdelt.feed import FeedReader, Deduper, URLHandler from crawllib.headquarter import HeadquarterSubmitter import json @@ -102,8 +102,8 @@ def process(self): with open(slfile, 'wb') as sl: with open(rfile, 'rb') as f: reader = FeedReader(f) - urlfilter = URLFilter(reader) - for urls in batchup(crawluri(self.deduper.dedup(urlfilter)), 500): + handler = URLHandler(reader) + for urls in batchup(crawluri(self.deduper.dedup(handler)), 500): self.log.debug('submitting %s URLs...', len(urls)) try: jsdata = json.dumps(urls) From b16d87a28815c38655b25b3bf27bc2d98cd9e951 Mon Sep 17 00:00:00 2001 From: Scrip7 Date: Sat, 28 Jan 2017 06:55:32 +0300 Subject: [PATCH 3/8] Update feed.py --- gdelt/gdelt/feed.py | 1 - 1 file changed, 1 deletion(-) diff --git a/gdelt/gdelt/feed.py b/gdelt/gdelt/feed.py index d942f8d..efe6f55 100644 --- a/gdelt/gdelt/feed.py +++ b/gdelt/gdelt/feed.py @@ -77,7 +77,6 @@ def step(self): class URLHandler(object): - """Normalizes and validates URL.""" def __init__(self, source): self.log = logging.getLogger( '{0}.{1.__name__}'.format(__name__,URLHandler)) From 3f23b45ab44b2950cd1de4ec3fe74f8ffb111bde Mon Sep 17 00:00:00 2001 From: Scrip7 Date: Sat, 28 Jan 2017 08:29:03 +0300 Subject: [PATCH 4/8] Update process-feed.py --- gdelt/process-feed.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/gdelt/process-feed.py b/gdelt/process-feed.py index 6697f22..89bfb30 100755 --- a/gdelt/process-feed.py +++ b/gdelt/process-feed.py @@ -105,12 +105,6 @@ def process(self): handler = URLHandler(reader) for urls in batchup(crawluri(self.deduper.dedup(handler)), 500): self.log.debug('submitting %s URLs...', len(urls)) - try: - jsdata = json.dumps(urls) - except: - for js in jsdata.split(','): - print js - if not test_mode: self.hqclient.put(urls) for curl in urls: From 9b8154dad6db022da868bcdaf5d678b0d43d1b85 Mon Sep 17 00:00:00 2001 From: nzaitseva Date: Fri, 25 May 2018 01:15:33 +0300 Subject: [PATCH 5/8] Update feed.py --- gdelt/gdelt/feed.py | 233 ++++++++++++++++++++------------------------ 1 file changed, 103 insertions(+), 130 deletions(-) diff --git a/gdelt/gdelt/feed.py b/gdelt/gdelt/feed.py index efe6f55..e33c15d 100644 --- a/gdelt/gdelt/feed.py +++ b/gdelt/gdelt/feed.py @@ -7,7 +7,6 @@ import logging import xml.etree.ElementTree as ET -from xml.etree.ElementTree import ParseError from urlparse import urlsplit from urllib2 import quote,unquote @@ -15,134 +14,108 @@ quoted_url = re.compile('^https?%3A%2F%2F', re.IGNORECASE) - class Deduper(object): - LAST_FILE = 'LAST' - THIS_FILE = 'THIS' - def __init__(self, datadir): - self.datadir = datadir - self.log = logging.getLogger( - '{0}.{1.__name__}'.format(__name__,Deduper)) - - def dedup(self, source): - lastfile = os.path.join(self.datadir, self.LAST_FILE) - if not os.path.exists(lastfile): - with open(lastfile, 'w') as f: - pass - - self.lastf = open(lastfile, 'r') - self.thisf = open(os.path.join(self.datadir, self.THIS_FILE), 'w+') - - for l in source: - self.thisf.write(l + '\n') - self.thisf.seek(0, 0) - - # assume new lines are appended at the bottom - lt = self.thisf.readline() - while True: - ll 
= self.lastf.readline() - if ll == '' or ll == lt: - break - if ll != '': - # assumption was right - while True: - lt = self.thisf.readline() - ll = self.lastf.readline() - if ll == '' or lt != ll: - break - while lt != '': - yield lt.rstrip() - lt = self.thisf.readline() - else: - # assumption was wrong - new lines are inserted at the top - # (also the case where LAST and THIS has no overwrap at all) - self.lastf.seek(0, 0) - ll = self.lastf.readline() - while True: - yield lt.rstrip() - lt = self.thisf.readline() - if lt == '' or lt == ll: - break - - self.lastf.close() - self.thisf.close() - - def step(self): - lastfile = os.path.join(self.datadir, self.LAST_FILE) - thisfile = os.path.join(self.datadir, self.THIS_FILE) - if os.path.exists(thisfile): - os.rename(thisfile, lastfile) - else: - self.log.warn('%s does not exist, step is no-op.', thisfile) - - -class URLHandler(object): - def __init__(self, source): - self.log = logging.getLogger( - '{0}.{1.__name__}'.format(__name__,URLHandler)) - self.source = source - - def __iter__(self): - return self - - def unquote_url(self,url): - unq = unquote(url) - # quote back anon-ascii characters. - chars = [quote(c) if ord(c)>127 else c for c in unq] - url = ''.join(chars) - return url - - # TODO: url validation rules instead of `urlsplit` test. - def validate_url(self,url): - try: - urlsplit(url) - return True - except Exception as ex: - self.log.error('Invalid url: {}'.format(url), exc_info=1) - return False - - def next(self): - while True: - url = next(self.source) - # normalize escaped URL. - if re.match(quoted_url, url): - url = self.unquote_url(url) - valid = self.validate_url(url) - if not valid: - continue - else: - return url - - + LAST_FILE = 'LAST' + THIS_FILE = 'THIS' + def __init__(self, datadir): + self.datadir = datadir + self.log = logging.getLogger(__name__) + + def dedup(self, source): + lastfile = os.path.join(self.datadir, self.LAST_FILE) + if not os.path.exists(lastfile): + with open(lastfile, 'w') as f: + pass + + self.lastf = open(lastfile, 'r') + self.thisf = open(os.path.join(self.datadir, self.THIS_FILE), 'w+') + + for l in source: + self.thisf.write(l + '\n') + self.thisf.seek(0, 0) + + # assume new lines are appended at the bottom + lt = self.thisf.readline() + while True: + ll = self.lastf.readline() + if ll == '' or ll == lt: + break + if ll != '': + # assumption was right + while True: + lt = self.thisf.readline() + ll = self.lastf.readline() + if ll == '' or lt != ll: + break + while lt != '': + yield lt.rstrip() + lt = self.thisf.readline() + else: + # assumption was wrong - new lines are inserted at the top + # (also the case where LAST and THIS has no overwrap at all) + self.lastf.seek(0, 0) + ll = self.lastf.readline() + while True: + yield lt.rstrip() + lt = self.thisf.readline() + if lt == '' or lt == ll: + break + + self.lastf.close() + self.thisf.close() + + + def step(self): + lastfile = os.path.join(self.datadir, self.LAST_FILE) + thisfile = os.path.join(self.datadir, self.THIS_FILE) + if os.path.exists(thisfile): + os.rename(thisfile, lastfile) + else: + self.log.warn('%s does not exist, step is no-op.', thisfile) + + + class FeedReader(object): - """ - Simple parser for RSS feed. - Uses etree :func:`iterparse` to reduce memory footprint for large - feeds. 
- """ - def __init__(self, source): - self.log = logging.getLogger( - '{0}.{1.__name__}'.format(__name__,FeedReader)) - self.parse = ET.iterparse(source, ['start', 'end']) - self._item = None - - def __iter__(self): - return self - - def next(self): - while True: - event, elem = next(self.parse) - if event == 'start': - if elem.tag == 'item': - self._item = elem - elif event == 'end': - if elem.tag == 'item': - self._item = None - elif elem.tag == 'link': - if self._item is not None: - if elem.text: - return elem.text - - - - + """ + Simple parser for RSS feed. + Uses etree :func:`iterparse` to reduce memory footprint for large + feeds. + """ + def __init__(self, source): + self.parse = ET.iterparse(source, ['start', 'end']) + + self._item = None + self.log = logging.getLogger(__name__) + + def __iter__(self): + return self + def next(self): + while True: + event, elem = next(self.parse) + if event == 'start': + if elem.tag == 'item': + self._item = elem + elif event == 'end': + if elem.tag == 'item': + self._item = None + elif elem.tag == 'link': + if self._item is not None: + if elem.text: + try: + urlsplit(elem.text) + except Exception as ex: + self.log.error( + 'urlsplit exception: {}'.format(elem.text), + exc_info=1) + continue + + if not re.match(quoted_url, elem.text): + return elem.text + else: + unq = unquote(elem.text) + # quote anon-ascii characters. + chars = [quote(c) if ord(c)>127 else c for c in unq] + url = ''.join(chars) + return url + else: + continue From 912ae809c346c67e8fd9c7a447bbf69ead385322 Mon Sep 17 00:00:00 2001 From: nzaitseva Date: Fri, 25 May 2018 01:21:33 +0300 Subject: [PATCH 6/8] Update process-feed.py --- gdelt/process-feed.py | 312 +++++++++++++++++++++--------------------- 1 file changed, 154 insertions(+), 158 deletions(-) diff --git a/gdelt/process-feed.py b/gdelt/process-feed.py index 89bfb30..f1f8b4e 100755 --- a/gdelt/process-feed.py +++ b/gdelt/process-feed.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/1/crawling/gdelt1/bin/python # import sys import os @@ -14,167 +14,164 @@ import yaml from argparse import ArgumentParser -from gdelt.feed import FeedReader, Deduper, URLHandler +from gdelt.feed import FeedReader, Deduper from crawllib.headquarter import HeadquarterSubmitter -import json + CONFIG_FILE = 'config.yaml' def crawluri(urls): - for url in urls: - yield dict(u=url) + for url in urls: + yield dict(u=url) def batchup(iter, n): - b = [] - for e in iter: - b.append(e) - if len(b) >= n: - yield b - b = [] - if b: - yield b + b = [] + for e in iter: + b.append(e) + if len(b) >= n: + yield b + b = [] + if b: + yield b def httpdate(dt): - """format time tuple `dt` in HTTP Date format.""" - return time.strftime('%a, %d %b %Y %H:%M:%S %Z', dt) - + """format time tuple `dt` in HTTP Date format.""" + return time.strftime('%a, %d %b %Y %H:%M:%S %Z', dt) class Feed(object): - def __init__(self, name, feed_url, datadir, log, timeout, hqclient): - self.name = name - self.feed_url = feed_url - self.datadir = datadir - self.log = log - self.timeout = timeout - self.hqclient = hqclient - self.deduper = Deduper(self.datadir) - - rfiles = [fn for fn in os.listdir(self.datadir) - if re.match(r'feed-\d{14}$', fn)] - if rfiles: - self.log.debug('last=%s', max(rfiles)) - # time.strptime() returns time tuple without timezone. make it - # UTC with timegm() and gmtime() - self.last_time = time.gmtime(timegm( - time.strptime(max(rfiles)[-14:], '%Y%m%d%H%M%S'))) - else: - self.last_time = None - - def process(self): - # file name is in UTC. 
- rid = time.strftime('%Y%m%d%H%M%S', time.gmtime()) - rfile = os.path.join(self.datadir, 'feed-{}'.format(rid)) - try: - req = urllib2.Request(self.feed_url) - if self.last_time: - self.log.debug('last_time=%s', httpdate(self.last_time)) - req.add_header('If-Modified-Since', httpdate(self.last_time)) - f = urllib2.urlopen(req, timeout=self.timeout) - try: - with open(rfile, 'wb') as w: - while True: - d = f.read(16*1024) - if not d: break - w.write(d) - self.log.info('downloaded %d bytes in %s', w.tell(), - rfile) - except KeyboardInterrupt as ex: - if os.path.exists(rfile): - os.remove(rfile) - raise - except urllib2.HTTPError as ex: - if ex.code == 304: - # Not Modified - self.log.debug('feed %s not modified since %s', self.feed_url, - httpdate(self.last_time)) - return - self.log.warn('%s %s %s', self.feed_url, ex.code, ex.reason) - return - except (urllib2.URLError, socket.error) as ex: - self.log.warn('%s %s', self.feed_url, ex) - return - - self.last_time = time.gmtime() - - urlcount = 0 - slfile = os.path.join(self.datadir, 'sche-{}'.format(rid)) - with open(slfile, 'wb') as sl: - with open(rfile, 'rb') as f: - reader = FeedReader(f) - handler = URLHandler(reader) - for urls in batchup(crawluri(self.deduper.dedup(handler)), 500): - self.log.debug('submitting %s URLs...', len(urls)) - if not test_mode: - self.hqclient.put(urls) - for curl in urls: - sl.write(curl['u']) - sl.write('\n') - urlcount += len(urls) - self.log.info('submitted total %s URLs (see %s)', - urlcount, os.path.basename(slfile)) - - self.deduper.step() - + def __init__(self, name, feed_url, datadir, log, timeout, hqclient): + self.name = name + self.feed_url = feed_url + self.datadir = datadir + self.log = log + self.timeout = timeout + self.hqclient = hqclient + self.deduper = Deduper(self.datadir) + + rfiles = [fn for fn in os.listdir(self.datadir) + if re.match(r'feed-\d{14}$', fn)] + if rfiles: + self.log.debug('last=%s', max(rfiles)) + # time.strptime() returns time tuple without timezone. make it + # UTC with timegm() and gmtime() + self.last_time = time.gmtime(timegm( + time.strptime(max(rfiles)[-14:], '%Y%m%d%H%M%S'))) + else: + self.last_time = None + + def process(self): + # file name is in UTC. 
+ rid = time.strftime('%Y%m%d%H%M%S', time.gmtime()) + rfile = os.path.join(self.datadir, 'feed-{}'.format(rid)) + try: + req = urllib2.Request(self.feed_url) + if self.last_time: + self.log.debug('last_time=%s', httpdate(self.last_time)) + req.add_header('If-Modified-Since', httpdate(self.last_time)) + f = urllib2.urlopen(req, timeout=self.timeout) + try: + with open(rfile, 'wb') as w: + while True: + d = f.read(16*1024) + if not d: break + w.write(d) + self.log.info('downloaded %d bytes in %s', w.tell(), + rfile) + except KeyboardInterrupt as ex: + if os.path.exists(rfile): + os.remove(rfile) + raise + except urllib2.HTTPError as ex: + if ex.code == 304: + # Not Modified + self.log.debug('feed %s not modified since %s', self.feed_url, + httpdate(self.last_time)) + return + self.log.warn('%s %s %s', self.feed_url, ex.code, ex.reason) + return + except (urllib2.URLError, socket.error) as ex: + self.log.warn('%s %s', self.feed_url, ex) + return + + self.last_time = time.gmtime() + + urlcount = 0 + slfile = os.path.join(self.datadir, 'sche-{}'.format(rid)) + with open(slfile, 'wb') as sl: + with open(rfile, 'rb') as f: + reader = FeedReader(f) + for urls in batchup(crawluri(self.deduper.dedup(reader)), 500): + self.log.debug('submitting %s URLs...', len(urls)) + if not test_mode: + self.hqclient.put(urls) + for curl in urls: + sl.write(curl['u']) + sl.write('\n') + urlcount += len(urls) + self.log.info('submitted total %s URLs (see %s)', + urlcount, os.path.basename(slfile)) + + self.deduper.step() class FeedScheduler(object): - def __init__(self, feeds, hqbase, hqjob, datadir='data', timeout=20, - check_interval=-1): - self.hqbase = hqbase - self.hqjob = hqjob - self.hqclient = HeadquarterSubmitter(self.hqbase, self.hqjob) - self.check_interval = int(check_interval) - self.log = logging.getLogger( - 'gdelt.{0.__name__}'.format(FeedScheduler)) - self.datadir = datadir - assert os.path.isdir(self.datadir) - - self.feeds = [] - for f in feeds: - flog = logging.getLogger('gdelt.Feed.{}'.format(f)) - furl = feeds[f]['url'] - fdatadir = os.path.join(self.datadir,f) - - assert os.path.isdir(fdatadir) - - fconfig = {'name':f, - 'feed_url': furl, - 'datadir':fdatadir, - 'log':flog, - 'timeout':int(timeout), - 'hqclient': self.hqclient, - } - feed = Feed(**fconfig) - self.feeds.append(feed) - - def process(self): - while True: - t = time.time() - for feed in self.feeds: - try: - feed.process() - except KeyboardInterrupt as ex: - raise - except Exception as ex: - feed.log.error('process failed',exc_info=1) - if self.check_interval < 0: - self.log.debug('exiting because check_interval < 0') - break - if test_mode: - self.log.debug('exiting because test_mode=True') - break - dt = t + self.check_interval - time.time() - if dt >= 1.0: - self.log.debug('sleeping %ds until next cycle', int(dt)) - time.sleep(dt) - - - + def __init__(self, feeds, hqbase, hqjob, datadir='data', timeout=20, + check_interval=-1): + self.hqbase = hqbase + self.hqjob = hqjob + self.hqclient = HeadquarterSubmitter(self.hqbase, self.hqjob) + self.check_interval = int(check_interval) + self.log = logging.getLogger( + 'gdelt.{0.__name__}'.format(FeedScheduler)) + self.datadir = datadir + assert os.path.isdir(self.datadir) + + self.feeds = [] + for f in feeds: + flog = logging.getLogger('gdelt.Feed.{}'.format(f)) + furl = feeds[f]['url'] + fdatadir = os.path.join(self.datadir,f) + + assert os.path.isdir(fdatadir) + + fconfig = {'name':f, + 'feed_url': furl, + 'datadir':fdatadir, + 'log':flog, + 'timeout':int(timeout), + 'hqclient': 
self.hqclient, + } + feed = Feed(**fconfig) + self.feeds.append(feed) + + def process(self): + while True: + t = time.time() + for feed in self.feeds: + try: + feed.process() + except KeyboardInterrupt as ex: + raise + except Exception as ex: + feed.log.error('process failed',exc_info=1) + if self.check_interval < 0: + self.log.debug('exiting because check_interval < 0') + break + if test_mode: + self.log.debug('exiting because test_mode=True') + break + dt = t + self.check_interval - time.time() + if dt >= 1.0: + self.log.debug('sleeping %ds until next cycle', int(dt)) + time.sleep(dt) + + + parser = ArgumentParser() parser.add_argument('--test', action='store_true', default=False, - help='disables submission to HQ') + help='disables submission to HQ') parser.add_argument('--config', default=CONFIG_FILE, - help='configuration file filename (default %(default)s)') + help='configuration file filename (default %(default)s)') args = parser.parse_args() @@ -182,28 +179,27 @@ def process(self): config_file = args.config if os.path.isfile(config_file): - config = yaml.load(open(config_file)) + config = yaml.load(open(config_file)) else: - print >>sys.stderr, "config file {} is not found".format(config_file) - exit(1) + print >>sys.stderr, "config file {} is not found".format(config_file) + exit(1) logconf = config.get('logging') if logconf: - import logging.config - logging.config.dictConfig(logconf) + import logging.config + logging.config.dictConfig(logconf) else: - logging.basicConfig( - level=logging.INFO, filename='process.log', - format='%(asctime)s %(levelname)s %(name)s %(message)s') + logging.basicConfig( + level=logging.INFO, filename='process.log', + format='%(asctime)s %(levelname)s %(name)s %(message)s') if 'gdelt' not in config: - print >>sys.stderr, "configuration must have 'gdelt' section" - exit(1) + print >>sys.stderr, "configuration must have 'gdelt' section" + exit(1) sch = FeedScheduler(**config['gdelt']) try: - sch.process() + sch.process() except KeyboardInterrupt: - pass - + pass From b232cbbd79f3ad6161094721ef31de9e252b2792 Mon Sep 17 00:00:00 2001 From: nzaitseva Date: Fri, 25 May 2018 02:10:35 +0300 Subject: [PATCH 7/8] Update process-feed.py --- gdelt/process-feed.py | 307 +++++++++++++++++++++--------------------- 1 file changed, 154 insertions(+), 153 deletions(-) diff --git a/gdelt/process-feed.py b/gdelt/process-feed.py index f1f8b4e..14d0b80 100755 --- a/gdelt/process-feed.py +++ b/gdelt/process-feed.py @@ -1,4 +1,4 @@ -#!/1/crawling/gdelt1/bin/python +#!/1/crawling/gdelt0/bin/python # import sys import os @@ -22,156 +22,156 @@ CONFIG_FILE = 'config.yaml' def crawluri(urls): - for url in urls: - yield dict(u=url) + for url in urls: + yield dict(u=url) def batchup(iter, n): - b = [] - for e in iter: - b.append(e) - if len(b) >= n: - yield b - b = [] - if b: - yield b + b = [] + for e in iter: + b.append(e) + if len(b) >= n: + yield b + b = [] + if b: + yield b def httpdate(dt): - """format time tuple `dt` in HTTP Date format.""" - return time.strftime('%a, %d %b %Y %H:%M:%S %Z', dt) + """format time tuple `dt` in HTTP Date format.""" + return time.strftime('%a, %d %b %Y %H:%M:%S %Z', dt) class Feed(object): - def __init__(self, name, feed_url, datadir, log, timeout, hqclient): - self.name = name - self.feed_url = feed_url - self.datadir = datadir - self.log = log - self.timeout = timeout - self.hqclient = hqclient - self.deduper = Deduper(self.datadir) - - rfiles = [fn for fn in os.listdir(self.datadir) - if re.match(r'feed-\d{14}$', fn)] - if rfiles: - 
self.log.debug('last=%s', max(rfiles)) - # time.strptime() returns time tuple without timezone. make it - # UTC with timegm() and gmtime() - self.last_time = time.gmtime(timegm( - time.strptime(max(rfiles)[-14:], '%Y%m%d%H%M%S'))) - else: - self.last_time = None - - def process(self): - # file name is in UTC. - rid = time.strftime('%Y%m%d%H%M%S', time.gmtime()) - rfile = os.path.join(self.datadir, 'feed-{}'.format(rid)) - try: - req = urllib2.Request(self.feed_url) - if self.last_time: - self.log.debug('last_time=%s', httpdate(self.last_time)) - req.add_header('If-Modified-Since', httpdate(self.last_time)) - f = urllib2.urlopen(req, timeout=self.timeout) - try: - with open(rfile, 'wb') as w: - while True: - d = f.read(16*1024) - if not d: break - w.write(d) - self.log.info('downloaded %d bytes in %s', w.tell(), - rfile) - except KeyboardInterrupt as ex: - if os.path.exists(rfile): - os.remove(rfile) - raise - except urllib2.HTTPError as ex: - if ex.code == 304: - # Not Modified - self.log.debug('feed %s not modified since %s', self.feed_url, - httpdate(self.last_time)) - return - self.log.warn('%s %s %s', self.feed_url, ex.code, ex.reason) - return - except (urllib2.URLError, socket.error) as ex: - self.log.warn('%s %s', self.feed_url, ex) - return - - self.last_time = time.gmtime() - - urlcount = 0 - slfile = os.path.join(self.datadir, 'sche-{}'.format(rid)) - with open(slfile, 'wb') as sl: - with open(rfile, 'rb') as f: - reader = FeedReader(f) - for urls in batchup(crawluri(self.deduper.dedup(reader)), 500): - self.log.debug('submitting %s URLs...', len(urls)) - if not test_mode: - self.hqclient.put(urls) - for curl in urls: - sl.write(curl['u']) - sl.write('\n') - urlcount += len(urls) - self.log.info('submitted total %s URLs (see %s)', - urlcount, os.path.basename(slfile)) - - self.deduper.step() - + def __init__(self, name, feed_url, datadir, log, timeout, hqclient): + self.name = name + self.feed_url = feed_url + self.datadir = datadir + self.log = log + self.timeout = timeout + self.hqclient = hqclient + self.deduper = Deduper(self.datadir) + + rfiles = [fn for fn in os.listdir(self.datadir) + if re.match(r'feed-\d{14}$', fn)] + if rfiles: + self.log.debug('last=%s', max(rfiles)) + # time.strptime() returns time tuple without timezone. make it + # UTC with timegm() and gmtime() + self.last_time = time.gmtime(timegm( + time.strptime(max(rfiles)[-14:], '%Y%m%d%H%M%S'))) + else: + self.last_time = None + + def process(self): + # file name is in UTC. 
+ rid = time.strftime('%Y%m%d%H%M%S', time.gmtime()) + rfile = os.path.join(self.datadir, 'feed-{}'.format(rid)) + try: + req = urllib2.Request(self.feed_url) + if self.last_time: + self.log.debug('last_time=%s', httpdate(self.last_time)) + req.add_header('If-Modified-Since', httpdate(self.last_time)) + f = urllib2.urlopen(req, timeout=self.timeout) + try: + with open(rfile, 'wb') as w: + while True: + d = f.read(16*1024) + if not d: break + w.write(d) + self.log.info('downloaded %d bytes in %s', w.tell(), + rfile) + except KeyboardInterrupt as ex: + if os.path.exists(rfile): + os.remove(rfile) + raise + except urllib2.HTTPError as ex: + if ex.code == 304: + # Not Modified + self.log.debug('feed %s not modified since %s', self.feed_url, + httpdate(self.last_time)) + return + self.log.warn('%s %s %s', self.feed_url, ex.code, ex.reason) + return + except (urllib2.URLError, socket.error) as ex: + self.log.warn('%s %s', self.feed_url, ex) + return + + self.last_time = time.gmtime() + + urlcount = 0 + slfile = os.path.join(self.datadir, 'sche-{}'.format(rid)) + with open(slfile, 'wb') as sl: + with open(rfile, 'rb') as f: + reader = FeedReader(f) + for urls in batchup(crawluri(self.deduper.dedup(reader)), 500): + self.log.debug('submitting %s URLs...', len(urls)) + if not test_mode: + self.hqclient.put(urls) + for curl in urls: + sl.write(curl['u']) + sl.write('\n') + urlcount += len(urls) + self.log.info('submitted total %s URLs (see %s)', + urlcount, os.path.basename(slfile)) + + self.deduper.step() + class FeedScheduler(object): - def __init__(self, feeds, hqbase, hqjob, datadir='data', timeout=20, - check_interval=-1): - self.hqbase = hqbase - self.hqjob = hqjob - self.hqclient = HeadquarterSubmitter(self.hqbase, self.hqjob) - self.check_interval = int(check_interval) - self.log = logging.getLogger( - 'gdelt.{0.__name__}'.format(FeedScheduler)) - self.datadir = datadir - assert os.path.isdir(self.datadir) - - self.feeds = [] - for f in feeds: - flog = logging.getLogger('gdelt.Feed.{}'.format(f)) - furl = feeds[f]['url'] - fdatadir = os.path.join(self.datadir,f) - - assert os.path.isdir(fdatadir) - - fconfig = {'name':f, - 'feed_url': furl, - 'datadir':fdatadir, - 'log':flog, - 'timeout':int(timeout), - 'hqclient': self.hqclient, - } - feed = Feed(**fconfig) - self.feeds.append(feed) - - def process(self): - while True: - t = time.time() - for feed in self.feeds: - try: - feed.process() - except KeyboardInterrupt as ex: - raise - except Exception as ex: - feed.log.error('process failed',exc_info=1) - if self.check_interval < 0: - self.log.debug('exiting because check_interval < 0') - break - if test_mode: - self.log.debug('exiting because test_mode=True') - break - dt = t + self.check_interval - time.time() - if dt >= 1.0: - self.log.debug('sleeping %ds until next cycle', int(dt)) - time.sleep(dt) - - - + def __init__(self, feeds, hqbase, hqjob, datadir='data', timeout=20, + check_interval=-1): + self.hqbase = hqbase + self.hqjob = hqjob + self.hqclient = HeadquarterSubmitter(self.hqbase, self.hqjob) + self.check_interval = int(check_interval) + self.log = logging.getLogger( + 'gdelt.{0.__name__}'.format(FeedScheduler)) + self.datadir = datadir + assert os.path.isdir(self.datadir) + + self.feeds = [] + for f in feeds: + flog = logging.getLogger('gdelt.Feed.{}'.format(f)) + furl = feeds[f]['url'] + fdatadir = os.path.join(self.datadir,f) + + assert os.path.isdir(fdatadir) + + fconfig = {'name':f, + 'feed_url': furl, + 'datadir':fdatadir, + 'log':flog, + 'timeout':int(timeout), + 'hqclient': 
self.hqclient, + } + feed = Feed(**fconfig) + self.feeds.append(feed) + + def process(self): + while True: + t = time.time() + for feed in self.feeds: + try: + feed.process() + except KeyboardInterrupt as ex: + raise + except Exception as ex: + feed.log.error('process failed',exc_info=1) + if self.check_interval < 0: + self.log.debug('exiting because check_interval < 0') + break + if test_mode: + self.log.debug('exiting because test_mode=True') + break + dt = t + self.check_interval - time.time() + if dt >= 1.0: + self.log.debug('sleeping %ds until next cycle', int(dt)) + time.sleep(dt) + + + parser = ArgumentParser() parser.add_argument('--test', action='store_true', default=False, - help='disables submission to HQ') + help='disables submission to HQ') parser.add_argument('--config', default=CONFIG_FILE, - help='configuration file filename (default %(default)s)') + help='configuration file filename (default %(default)s)') args = parser.parse_args() @@ -179,27 +179,28 @@ def process(self): config_file = args.config if os.path.isfile(config_file): - config = yaml.load(open(config_file)) + config = yaml.load(open(config_file)) else: - print >>sys.stderr, "config file {} is not found".format(config_file) - exit(1) + print >>sys.stderr, "config file {} is not found".format(config_file) + exit(1) logconf = config.get('logging') if logconf: - import logging.config - logging.config.dictConfig(logconf) + import logging.config + logging.config.dictConfig(logconf) else: - logging.basicConfig( - level=logging.INFO, filename='process.log', - format='%(asctime)s %(levelname)s %(name)s %(message)s') + logging.basicConfig( + level=logging.INFO, filename='process.log', + format='%(asctime)s %(levelname)s %(name)s %(message)s') if 'gdelt' not in config: - print >>sys.stderr, "configuration must have 'gdelt' section" - exit(1) + print >>sys.stderr, "configuration must have 'gdelt' section" + exit(1) sch = FeedScheduler(**config['gdelt']) try: - sch.process() + sch.process() except KeyboardInterrupt: - pass + pass + From 96bc40456668cac28be6ad9945e40b76b0634bf0 Mon Sep 17 00:00:00 2001 From: nzaitseva Date: Fri, 25 May 2018 02:10:58 +0300 Subject: [PATCH 8/8] Update feed.py --- gdelt/gdelt/feed.py | 204 ++++++++++++++++++++++---------------------- 1 file changed, 102 insertions(+), 102 deletions(-) diff --git a/gdelt/gdelt/feed.py b/gdelt/gdelt/feed.py index e33c15d..81cca25 100644 --- a/gdelt/gdelt/feed.py +++ b/gdelt/gdelt/feed.py @@ -15,107 +15,107 @@ quoted_url = re.compile('^https?%3A%2F%2F', re.IGNORECASE) class Deduper(object): - LAST_FILE = 'LAST' - THIS_FILE = 'THIS' - def __init__(self, datadir): - self.datadir = datadir - self.log = logging.getLogger(__name__) - - def dedup(self, source): - lastfile = os.path.join(self.datadir, self.LAST_FILE) - if not os.path.exists(lastfile): - with open(lastfile, 'w') as f: - pass - - self.lastf = open(lastfile, 'r') - self.thisf = open(os.path.join(self.datadir, self.THIS_FILE), 'w+') - - for l in source: - self.thisf.write(l + '\n') - self.thisf.seek(0, 0) - - # assume new lines are appended at the bottom + LAST_FILE = 'LAST' + THIS_FILE = 'THIS' + def __init__(self, datadir): + self.datadir = datadir + self.log = logging.getLogger(__name__) + + def dedup(self, source): + lastfile = os.path.join(self.datadir, self.LAST_FILE) + if not os.path.exists(lastfile): + with open(lastfile, 'w') as f: + pass + + self.lastf = open(lastfile, 'r') + self.thisf = open(os.path.join(self.datadir, self.THIS_FILE), 'w+') + + for l in source: + self.thisf.write(l + '\n') + 
self.thisf.seek(0, 0) + + # assume new lines are appended at the bottom + lt = self.thisf.readline() + while True: + ll = self.lastf.readline() + if ll == '' or ll == lt: + break + if ll != '': + # assumption was right + while True: lt = self.thisf.readline() - while True: - ll = self.lastf.readline() - if ll == '' or ll == lt: - break - if ll != '': - # assumption was right - while True: - lt = self.thisf.readline() - ll = self.lastf.readline() - if ll == '' or lt != ll: - break - while lt != '': - yield lt.rstrip() - lt = self.thisf.readline() - else: - # assumption was wrong - new lines are inserted at the top - # (also the case where LAST and THIS has no overwrap at all) - self.lastf.seek(0, 0) - ll = self.lastf.readline() - while True: - yield lt.rstrip() - lt = self.thisf.readline() - if lt == '' or lt == ll: - break - - self.lastf.close() - self.thisf.close() - - - def step(self): - lastfile = os.path.join(self.datadir, self.LAST_FILE) - thisfile = os.path.join(self.datadir, self.THIS_FILE) - if os.path.exists(thisfile): - os.rename(thisfile, lastfile) - else: - self.log.warn('%s does not exist, step is no-op.', thisfile) - - - + ll = self.lastf.readline() + if ll == '' or lt != ll: + break + while lt != '': + yield lt.rstrip() + lt = self.thisf.readline() + else: + # assumption was wrong - new lines are inserted at the top + # (also the case where LAST and THIS has no overwrap at all) + self.lastf.seek(0, 0) + ll = self.lastf.readline() + while True: + yield lt.rstrip() + lt = self.thisf.readline() + if lt == '' or lt == ll: + break + + self.lastf.close() + self.thisf.close() + + + def step(self): + lastfile = os.path.join(self.datadir, self.LAST_FILE) + thisfile = os.path.join(self.datadir, self.THIS_FILE) + if os.path.exists(thisfile): + os.rename(thisfile, lastfile) + else: + self.log.warn('%s does not exist, step is no-op.', thisfile) + + + class FeedReader(object): - """ - Simple parser for RSS feed. - Uses etree :func:`iterparse` to reduce memory footprint for large - feeds. - """ - def __init__(self, source): - self.parse = ET.iterparse(source, ['start', 'end']) - - self._item = None - self.log = logging.getLogger(__name__) - - def __iter__(self): - return self - def next(self): - while True: - event, elem = next(self.parse) - if event == 'start': - if elem.tag == 'item': - self._item = elem - elif event == 'end': - if elem.tag == 'item': - self._item = None - elif elem.tag == 'link': - if self._item is not None: - if elem.text: - try: - urlsplit(elem.text) - except Exception as ex: - self.log.error( - 'urlsplit exception: {}'.format(elem.text), - exc_info=1) - continue - - if not re.match(quoted_url, elem.text): - return elem.text - else: - unq = unquote(elem.text) - # quote anon-ascii characters. - chars = [quote(c) if ord(c)>127 else c for c in unq] - url = ''.join(chars) - return url - else: - continue + """ + Simple parser for RSS feed. + Uses etree :func:`iterparse` to reduce memory footprint for large + feeds. 
+ """ + def __init__(self, source): + self.parse = ET.iterparse(source, ['start', 'end']) + + self._item = None + self.log = logging.getLogger(__name__) + + def __iter__(self): + return self + def next(self): + while True: + event, elem = next(self.parse) + if event == 'start': + if elem.tag == 'item': + self._item = elem + elif event == 'end': + if elem.tag == 'item': + self._item = None + elif elem.tag == 'link': + if self._item is not None: + if elem.text: + try: + urlsplit(elem.text) + except Exception as ex: + self.log.error( + 'urlsplit exception: {}'.format(elem.text), + exc_info=1) + continue + + if not re.match(quoted_url, elem.text): + return elem.text + else: + unq = unquote(elem.text) + # quote anon-ascii characters. + chars = [quote(c) if ord(c)>127 else c for c in unq] + url = ''.join(chars) + return url + else: + continue
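
Two short sketches of how the pieces introduced in this series fit together. Neither block is taken from the patches themselves; the code is Python 2, like the rest of the codebase, and every name flagged as a placeholder below is hypothetical.

First, the configuration shape that FeedScheduler(**config['gdelt']) expects once the series lands: a 'feeds' mapping of feed name to {'url': ...} next to the Headquarters settings. It is written here as a plain dict rather than the config.yaml section it is normally loaded from; the feed names, feed URLs and HQ endpoint are placeholders, while the keys mirror FeedScheduler.__init__ and the feeds[f]['url'] lookup.

    import os

    cfg = {
        'feeds': {
            # placeholder feed names; each one maps to data/<name>/
            'events': {'url': 'http://example.com/events.rss'},
            'gkg':    {'url': 'http://example.com/gkg.rss'},
        },
        'hqbase': 'http://localhost:8080/hq',  # placeholder Headquarters base URL
        'hqjob': 'gdelt',                      # placeholder HQ job name
        'datadir': 'data',
        'timeout': 20,
        'check_interval': 900,  # seconds between passes; the default -1 makes process() run once
    }

    # FeedScheduler asserts that data/<feed name> already exists for every
    # feed, so the per-feed directories have to be created up front.
    for name in cfg['feeds']:
        path = os.path.join(cfg['datadir'], name)
        if not os.path.isdir(path):
            os.makedirs(path)

    # FeedScheduler itself lives in the process-feed.py script and is normally
    # driven from there; with the dict above, constructing it would be
    # equivalent to FeedScheduler(**config['gdelt']) followed by sch.process().

Second, a minimal sketch of the Deduper contract that feed.py implements in every revision above: each call to dedup() writes the current snapshot to the THIS file, compares it against LAST, and yields only the lines that were not in the previous snapshot; step() then promotes THIS to LAST. The data directory and URLs are made up.

    import tempfile
    from gdelt.feed import Deduper

    d = Deduper(tempfile.mkdtemp())

    # First snapshot: LAST is empty, so every line is new.
    print list(d.dedup(['http://example.com/1', 'http://example.com/2']))
    d.step()

    # Second snapshot: only the URL appended since the previous run is yielded.
    print list(d.dedup(['http://example.com/1', 'http://example.com/2',
                        'http://example.com/3']))
    d.step()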