deployed updates to new GDELT v3 RSS feeds; other refactoring #9

Open · wants to merge 8 commits into master
3 changes: 3 additions & 0 deletions gdelt/.gitignore
@@ -3,3 +3,6 @@
/*.egg-info
/build
/dist
/data
/configs
*.log
35 changes: 32 additions & 3 deletions gdelt/gdelt/feed.py
@@ -2,13 +2,18 @@
#
import sys
import os

import re
import urllib2

import logging

import xml.etree.ElementTree as ET

from urlparse import urlsplit
from urllib2 import quote,unquote
from ConfigParser import ConfigParser

quoted_url = re.compile('^https?%3A%2F%2F', re.IGNORECASE)

class Deduper(object):
LAST_FILE = 'LAST'
THIS_FILE = 'THIS'
@@ -58,6 +63,8 @@ def dedup(self, source):

self.lastf.close()
self.thisf.close()


def step(self):
lastfile = os.path.join(self.datadir, self.LAST_FILE)
thisfile = os.path.join(self.datadir, self.THIS_FILE)
@@ -66,6 +73,8 @@ def step(self):
else:
self.log.warn('%s does not exist, step is no-op.', thisfile)



class FeedReader(object):
"""
Simple parser for RSS feed.
@@ -76,6 +85,8 @@ def __init__(self, source):
self.parse = ET.iterparse(source, ['start', 'end'])

self._item = None
self.log = logging.getLogger(__name__)

def __iter__(self):
return self
def next(self):
@@ -89,4 +100,22 @@ def next(self):
self._item = None
elif elem.tag == 'link':
if self._item is not None:
return elem.text
if elem.text:
try:
urlsplit(elem.text)
except Exception as ex:
self.log.error(
'urlsplit exception: {}'.format(elem.text),
exc_info=1)
continue

if not re.match(quoted_url, elem.text):
return elem.text
else:
unq = unquote(elem.text)
# quote non-ascii characters.
chars = [quote(c) if ord(c)>127 else c for c in unq]
url = ''.join(chars)
return url
else:
continue
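
A note on the new link handling in FeedReader.next(): links that arrive fully percent-quoted (matching quoted_url, i.e. starting with "http%3A%2F%2F") are unquoted and then re-quoted only for bytes above 127, while ordinary links pass through untouched. The standalone sketch below (Python 2, matching the module's urllib2 imports) mirrors that logic outside the iterator; the sample URL is a made-up placeholder, not taken from any feed.

# Sketch of the link normalization added to FeedReader.next() (Python 2).
# The sample URL below is a hypothetical placeholder.
import re
from urllib2 import quote, unquote

quoted_url = re.compile('^https?%3A%2F%2F', re.IGNORECASE)

def normalize_link(text):
    if not quoted_url.match(text):
        return text                      # ordinary link: pass through as-is
    unq = unquote(text)                  # undo the feed-level percent-quoting
    # re-quote only bytes above 127 so the result stays ASCII-clean
    return ''.join(quote(c) if ord(c) > 127 else c for c in unq)

print normalize_link('http%3A%2F%2Fexample.com%2Fcaf%C3%A9%2Fitem')
# -> http://example.com/caf%C3%A9/item

Note that only bytes above 127 are re-quoted, so reserved ASCII characters (spaces, ampersands, etc.) that were percent-quoted in the feed come back as literal characters; that matches the diff as written.
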
104 changes: 66 additions & 38 deletions gdelt/process-feed.py
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/1/crawling/gdelt0/bin/python
#
import sys
import os
@@ -17,6 +17,8 @@
from gdelt.feed import FeedReader, Deduper
from crawllib.headquarter import HeadquarterSubmitter



CONFIG_FILE = 'config.yaml'

def crawluri(urls):
@@ -37,24 +39,16 @@ def httpdate(dt):
"""format time tuple `dt` in HTTP Date format."""
return time.strftime('%a, %d %b %Y %H:%M:%S %Z', dt)

class FeedScheduler(object):
def __init__(self, feed_url, hqbase, hqjob,
datadir='data', timeout=20,
check_interval=-1):
self.log = logging.getLogger(
'gdelt.{0.__name__}'.format(FeedScheduler))
class Feed(object):
def __init__(self, name, feed_url, datadir, log, timeout, hqclient):
self.name = name
self.feed_url = feed_url
self.hqbase = hqbase
self.hqjob = hqjob
self.datadir = datadir
self.timeout = int(timeout)
self.check_interval = int(check_interval)

assert os.path.isdir(self.datadir)

self.log = log
self.timeout = timeout
self.hqclient = hqclient
self.deduper = Deduper(self.datadir)
self.hqclient = HeadquarterSubmitter(self.hqbase, self.hqjob)


rfiles = [fn for fn in os.listdir(self.datadir)
if re.match(r'feed-\d{14}$', fn)]
if rfiles:
@@ -65,28 +59,8 @@ def __init__(self, feed_url, hqbase, hqjob,
time.strptime(max(rfiles)[-14:], '%Y%m%d%H%M%S')))
else:
self.last_time = None

def process(self):
while True:
t = time.time()
try:
self.process1()
except KeyboardInterrupt as ex:
raise
except Exception as ex:
self.log.error('process1 failed', exc_info=1)
if self.check_interval < 0:
self.log.debug('exiting because check_interval < 0')
break
if test_mode:
self.log.debug('exiting because test_mode=True')
break
dt = t + self.check_interval - time.time()
if dt >= 1.0:
self.log.debug('sleeping %ds until next cycle', int(dt))
time.sleep(dt)

def process1(self):
# file name is in UTC.
rid = time.strftime('%Y%m%d%H%M%S', time.gmtime())
rfile = os.path.join(self.datadir, 'feed-{}'.format(rid))
@@ -121,7 +95,7 @@ def process1(self):
return

self.last_time = time.gmtime()

urlcount = 0
slfile = os.path.join(self.datadir, 'sche-{}'.format(rid))
with open(slfile, 'wb') as sl:
@@ -139,7 +113,60 @@ def process1(self):
urlcount, os.path.basename(slfile))

self.deduper.step()

class FeedScheduler(object):
def __init__(self, feeds, hqbase, hqjob, datadir='data', timeout=20,
check_interval=-1):
self.hqbase = hqbase
self.hqjob = hqjob
self.hqclient = HeadquarterSubmitter(self.hqbase, self.hqjob)
self.check_interval = int(check_interval)
self.log = logging.getLogger(
'gdelt.{0.__name__}'.format(FeedScheduler))
self.datadir = datadir
assert os.path.isdir(self.datadir)

self.feeds = []
for f in feeds:
flog = logging.getLogger('gdelt.Feed.{}'.format(f))
furl = feeds[f]['url']
fdatadir = os.path.join(self.datadir,f)

assert os.path.isdir(fdatadir)

fconfig = {'name':f,
'feed_url': furl,
'datadir':fdatadir,
'log':flog,
'timeout':int(timeout),
'hqclient': self.hqclient,
}
feed = Feed(**fconfig)
self.feeds.append(feed)

def process(self):
while True:
t = time.time()
for feed in self.feeds:
try:
feed.process()
except KeyboardInterrupt as ex:
raise
except Exception as ex:
feed.log.error('process failed',exc_info=1)
if self.check_interval < 0:
self.log.debug('exiting because check_interval < 0')
break
if test_mode:
self.log.debug('exiting because test_mode=True')
break
dt = t + self.check_interval - time.time()
if dt >= 1.0:
self.log.debug('sleeping %ds until next cycle', int(dt))
time.sleep(dt)



parser = ArgumentParser()
parser.add_argument('--test', action='store_true', default=False,
help='disables submission to HQ')
@@ -170,6 +197,7 @@ def process1(self):
print >>sys.stderr, "configuration must have 'gdelt' section"
exit(1)


sch = FeedScheduler(**config['gdelt'])
try:
sch.process()
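
For readers following the refactor: the old single-feed FeedScheduler is split into a Feed object per feed (each with its own data subdirectory, logger and Deduper) and a FeedScheduler that owns the shared HeadquarterSubmitter and the polling loop. The scheduler is still built with FeedScheduler(**config['gdelt']), so the 'gdelt' section of config.yaml now needs a feeds mapping of feed name to a dict with a url key. The config file itself is not part of the PR (a configs directory is newly gitignored), so the sketch below is only an assumed shape, written as the Python dict the YAML loader would hand to FeedScheduler; every feed name, URL and HQ value in it is a placeholder.

# Assumed shape of config.yaml's 'gdelt' section after this refactor,
# expressed as the dict that loading the YAML would produce.
# All names, URLs and endpoints below are illustrative placeholders.
config = {
    'gdelt': {
        'feeds': {
            'feed-a': {'url': 'http://example.org/gdeltv3/feed-a.rss'},
            'feed-b': {'url': 'http://example.org/gdeltv3/feed-b.rss'},
        },
        'hqbase': 'http://hq.example.org/hq',   # HeadquarterSubmitter base URL
        'hqjob': 'gdelt',                       # HQ job name
        'datadir': 'data',      # must exist and contain one subdirectory per feed name
        'timeout': 20,
        'check_interval': 300,  # seconds between cycles; negative means run once and exit
    },
}

# sch = FeedScheduler(**config['gdelt'])
# sch.process()
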