spiders: OAI-PMH: continue where left off
Signed-off-by: Szymon Łopaciuk <[email protected]>
szymonlopaciuk committed Dec 12, 2017
1 parent e6ed6df commit 3c0a238
Showing 7 changed files with 209 additions and 18 deletions.
1 change: 1 addition & 0 deletions docker-compose.test.yml
@@ -18,6 +18,7 @@ services:
- APP_CRAWLER_HOST_URL=http://scrapyd:6800
- APP_API_PIPELINE_TASK_ENDPOINT_DEFAULT=hepcrawl.testlib.tasks.submit_results
- APP_FILES_STORE=/tmp/file_urls
- APP_LAST_RUNS_PATH=/code/.scrapy/last_runs
- APP_CRAWL_ONCE_PATH=/code/.scrapy
- COVERAGE_PROCESS_START=/code/.coveragerc
- BASE_USER_UID=${BASE_USER_UID:-1000}
6 changes: 6 additions & 0 deletions hepcrawl/settings.py
@@ -42,6 +42,12 @@
'http://localhost/schemas/records/'
)

# Location of last run information
LAST_RUNS_PATH = os.environ.get(
'APP_LAST_RUNS_PATH',
'/var/lib/scrapy/last_runs/'
)

# Configure maximum concurrent requests performed by Scrapy (default: 16)
# CONCURRENT_REQUESTS=32

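
A minimal sketch (not part of the commit) of how the new setting is consumed: the OAI-PMH spider below keeps one JSON file per harvest configuration under LAST_RUNS_PATH/<spider name>/<sha1 of the harvest alias>.json, and the Docker test environment redirects the base path via APP_LAST_RUNS_PATH as shown in docker-compose.test.yml above. File name is an illustrative value.

import os

last_runs_path = os.environ.get('APP_LAST_RUNS_PATH', '/var/lib/scrapy/last_runs/')
spider_name = 'OAI-PMH'
file_name = '2cea86bbc1d329b4273a29dc603fb8c0bb91439c.json'  # sha1 of the alias, illustrative

print(os.path.join(last_runs_path, spider_name, file_name))
# default:                 /var/lib/scrapy/last_runs/OAI-PMH/2cea86...json
# with the compose override: /code/.scrapy/last_runs/OAI-PMH/2cea86...json
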
2 changes: 1 addition & 1 deletion hepcrawl/spiders/__init__.py
@@ -15,4 +15,4 @@
class StatefulSpider(Spider):
def __init__(self, *args, **kwargs):
self.state = {}
return super(Spider, self).__init__(*args, **kwargs)
super(StatefulSpider, self).__init__(*args, **kwargs)
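
A minimal sketch (not part of the commit) of the super() idiom touched above: in Python 2, super() is given the class it is called from, and Python then continues with the next class in the MRO; passing the parent class instead resolves past the parent and silently skips its __init__.

class Base(object):
    def __init__(self, *args, **kwargs):
        print('Base.__init__ runs')


class Child(Base):
    def __init__(self, *args, **kwargs):
        # Correct: pass the *current* class, so Base.__init__ is next in the MRO.
        super(Child, self).__init__(*args, **kwargs)
        # super(Base, self).__init__(...) would skip Base's initialisation,
        # which is the behaviour the changed line above avoids.


Child()  # prints "Base.__init__ runs"
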
2 changes: 1 addition & 1 deletion hepcrawl/spiders/cds_spider.py
@@ -55,7 +55,7 @@ def parse_record(self, selector):
try:
cds_bibrec, ok, errs = create_bibrec(selector.xpath('.//record').extract()[0])
if not ok:
raise RuntimeError("Cannot parse record %s: %s", record, errs)
raise RuntimeError("Cannot parse record %s: %s", selector, errs)
self.logger.info("Here's the record: %s" % cds_bibrec)
inspire_bibrec = CDS2Inspire(cds_bibrec).get_record()
marcxml_record = record_xml_output(inspire_bibrec)
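
A brief aside (not part of the commit) on the %s placeholders around the changed line: logging methods interpolate the extra arguments lazily, while an exception constructor keeps them as-is, so an exception message has to be formatted explicitly. Illustrative values only:

import logging

logger = logging.getLogger(__name__)
record_id, errs = 'oai:cds.cern.ch:123', ['missing field']  # hypothetical values

# logging interpolates the extra arguments into the format string:
logger.error("Cannot parse record %s: %s", record_id, errs)

# an exception constructor does not, so the message is formatted up front:
error = RuntimeError("Cannot parse record %s: %s" % (record_id, errs))
print(error)
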
100 changes: 86 additions & 14 deletions hepcrawl/spiders/oaipmh_spider.py
@@ -11,15 +11,20 @@

import logging
from enum import Enum
from errno import EEXIST
from datetime import datetime
from dateutil import parser as dateparser
import hashlib
import json
from os import path, makedirs

from sickle import Sickle
from sickle.models import Record
from sickle.oaiexceptions import NoRecordsMatch

from scrapy.http import Request, XmlResponse
from scrapy.selector import Selector
from scrapy.spiders import Spider
from . import StatefulSpider

logger = logging.getLogger(__name__)

@@ -33,22 +38,21 @@ def format(self, datetime_object):
return datetime_object.strftime('%Y-%m-%d')
if self == self.SECOND:
return datetime_object.strftime('%Y-%m-%dT%H:%M:%SZ')
raise ValueError("Invalid granularity: %s" % self.granularity)


class OAIPMHSpider(Spider):
class OAIPMHSpider(StatefulSpider):
"""
Implements a spider for the OAI-PMH protocol by using the Python sickle library.
In case of successful harvest (OAI-PMH crawling) the spider will remember the initial starting
date and will use it as `from_date` argument on the next harvest.
In case of successful harvest (OAI-PMH crawling) the spider will remember
the initial starting date and will use it as `from_date` argument on the
next harvest.
"""
name = 'OAI-PMH'
state = {}
granularity = _Granularity.DATE

def __init__(self, url, metadata_prefix='marcxml', oai_set=None, alias=None,
from_date=None, until_date=None, granularity='',
from_date=None, until_date=None, granularity=_Granularity.DATE,
record_class=Record, *args, **kwargs):
super(OAIPMHSpider, self).__init__(*args, **kwargs)
self.url = url
@@ -57,13 +61,13 @@ def __init__(self, url, metadata_prefix='marcxml', oai_set=None, alias=None,
self.granularity = granularity
self.alias = alias or self._make_alias()
self.from_date = from_date
logger.info("Current state:{}".format(self.state))
self.until_date = until_date
self.record_class = record_class

def start_requests(self):
self.from_date = self.from_date or self.state.get(self.alias)
logger.info("Current state 2:{}".format(self.state))
self.from_date = self.from_date or self._resume_from
started_at = datetime.utcnow()

logger.info("Starting harvesting of {url} with set={set} and "
"metadataPrefix={metadata_prefix}, from={from_date}, "
"until={until_date}".format(
@@ -73,11 +77,15 @@ def start_requests(self):
from_date=self.from_date,
until_date=self.until_date
))
now = datetime.utcnow()

request = Request('oaipmh+{}'.format(self.url), self.parse)
yield request
self.state[self.alias] = self.granularity.format(now)
logger.info("Harvesting completed. Next harvesting will resume from {}".format(self.state[self.alias]))

now = datetime.utcnow()
self._save_run(started_at)

logger.info("Harvesting completed. Next harvesting will resume from {}"
.format(self.until_date or self.granularity.format(now)))

def parse_record(self, record):
"""
@@ -109,8 +117,72 @@ def parse(self, response):
yield self.parse_record(selector)

def _make_alias(self):
return '{url}-{metadata_prefix}-{set}'.format(
return '{url}?metadataPrefix={metadata_prefix}&set={set}'.format(
url=self.url,
metadata_prefix=self.metadata_prefix,
set=self.set
)

def _last_run_file_path(self):
"""Render a path to a file where last run information is stored.
Returns:
string: path to last runs path
"""
lasts_run_path = self.settings['LAST_RUNS_PATH']
file_name = hashlib.sha1(self._make_alias()).hexdigest() + '.json'
return path.join(lasts_run_path, self.name, file_name)

def _load_last_run(self):
"""Return stored last run information
Returns:
Optional[dict]: last run information or None if don't exist
"""
file_path = self._last_run_file_path()
try:
with open(file_path) as f:
last_run = json.load(f)
logger.info('Last run file loaded: {}'.format(repr(last_run)))
return last_run
except IOError:
return None

def _save_run(self, started_at):
"""Store last run information
Args:
started_at (datetime.datetime)
Raises:
IOError: if writing the file is unsuccessful
"""
last_run_info = {
'spider': self.name,
'url': self.url,
'metadata_prefix': self.metadata_prefix,
'set': self.set,
'granularity': self.granularity.value,
'from_date': self.from_date,
'until_date': self.until_date,
'last_run_started_at': started_at.isoformat(),
'last_run_finished_at': datetime.utcnow().isoformat(),
}
file_path = self._last_run_file_path()
logger.info("Last run file saved to {}".format(file_path))
try:
makedirs(path.dirname(file_path))
except OSError as exc:
if exc.errno == EEXIST:
pass
else:
raise
with open(file_path, 'w') as f:
json.dump(last_run_info, f, indent=4)

@property
def _resume_from(self):
last_run = self._load_last_run()
resume_at = last_run['until_date'] or last_run['last_run_finished_at']
date_parsed = dateparser.parse(resume_at)
return self.granularity.format(date_parsed)
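
A minimal sketch (not part of the commit, illustrative values only) of what _resume_from computes from a stored last-run file: it prefers the recorded until_date, otherwise falls back to the finish time of the previous run, and formats the result at the spider's granularity.

from dateutil import parser as dateparser

last_run = {
    'until_date': None,                                     # harvest had no upper bound
    'last_run_finished_at': '2017-12-11T10:05:00.000000',   # illustrative timestamp
}

resume_at = last_run['until_date'] or last_run['last_run_finished_at']
date_parsed = dateparser.parse(resume_at)

print(date_parsed.strftime('%Y-%m-%d'))             # DATE granularity   -> 2017-12-11
print(date_parsed.strftime('%Y-%m-%dT%H:%M:%SZ'))   # SECOND granularity -> 2017-12-11T10:05:00Z
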
3 changes: 1 addition & 2 deletions tests/functional/cds/test_cds.py
@@ -17,7 +17,6 @@
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from tempfile import NamedTemporaryFile
from twisted.internet import reactor

from hepcrawl.testlib.fixtures import (
get_test_suite_path,
@@ -69,7 +68,7 @@ def _override(field_key, original_dict, backup_dict, new_value):


def test_cds(cds_oai_server):
f = NamedTemporaryFile('rw')
f = NamedTemporaryFile('r+')

settings = get_project_settings()
settings.set('FEED_FORMAT', 'json')
113 changes: 113 additions & 0 deletions tests/unit/test_oaipmh.py
@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
#
# This file is part of hepcrawl.
# Copyright (C) 2017 CERN.
#
# hepcrawl is a free software; you can redistribute it and/or modify it
# under the terms of the Revised BSD License; see LICENSE file for
# more details.

from datetime import datetime
import json
from mock import patch
from os import remove, rmdir
import pytest

from hepcrawl.spiders.oaipmh_spider import OAIPMHSpider, _Granularity
from scrapy.utils.project import get_project_settings


def override_dynamic_fields(run):
if 'last_run_finished_at' in run:
run['last_run_finished_at'] = '2017-12-08T23:55:54.794969'
return run


@pytest.fixture(scope='function')
def cleanup():
yield
remove('/tmp/last_runs/OAI-PMH/2cea86bbc1d329b4273a29dc603fb8c0bb91439c.json')
rmdir('/tmp/last_runs/OAI-PMH')
rmdir('/tmp/last_runs')


@pytest.fixture
def settings():
settings_patch = {
'LAST_RUNS_PATH': '/tmp/last_runs/'
}
settings = get_project_settings()
with patch.dict(settings, settings_patch):
yield settings


@pytest.fixture
def spider(settings):
spider = OAIPMHSpider('http://export.arxiv.org/oai2', settings=settings)
spider.from_date = '2017-12-08'
spider.set = 'physics:hep-th'
spider.metadata_prefix = 'marcxml'
yield spider


def test_last_run_file_path(spider):
expected = '/tmp/last_runs/OAI-PMH/2cea86bbc1d329b4273a29dc603fb8c0bb91439c.json'
result = spider._last_run_file_path()
assert expected == result


def test_store_and_load_last_run(spider, cleanup):
now = datetime.utcnow()
spider._save_run(started_at=now)

file_path = spider._last_run_file_path()
result = override_dynamic_fields(json.load(open(file_path)))

expected = override_dynamic_fields({
'spider': 'OAI-PMH',
'url': 'http://export.arxiv.org/oai2',
'metadata_prefix': 'marcxml',
'set': 'physics:hep-th',
'granularity': 'YYYY-MM-DD',
'from_date': '2017-12-08',
'until_date': None,
'last_run_started_at': now.isoformat(),
'last_run_finished_at': '2017-12-08T13:55:00.000000',
})

assert expected == result

result = override_dynamic_fields(spider._load_last_run())

assert expected == result


def test_load_inexisting(spider):
last_run = spider._load_last_run()
assert last_run == None


@pytest.mark.parametrize('until_date,last_run,expected,granularity', [
('2017-12-08T13:54:00.0', '2017-12-08T13:54:00.0', '2017-12-08', _Granularity.DATE),
('2017-12-08T13:54:00.0', '2017-12-08T13:54:00.0', '2017-12-08T13:54:00Z', _Granularity.SECOND),
('2017-12-08', '2017-12-08', '2017-12-08', _Granularity.DATE),
('2017-12-08', '2017-12-08', '2017-12-08T00:00:00Z', _Granularity.SECOND),
(None, '2017-12-10T13:54:00.0', '2017-12-10', _Granularity.DATE),
(None, '2017-12-10', '2017-12-10T00:00:00Z', _Granularity.SECOND),
])
def test_resume_from(spider, until_date, last_run, expected, granularity, cleanup):
spider.until_date = until_date
spider.granularity = granularity
spider._save_run(started_at=datetime.utcnow())

with open(spider._last_run_file_path(), 'r') as f:
run_record = json.load(f)

run_record['last_run_finished_at'] = last_run

with open(spider._last_run_file_path(), 'w+') as f:
json.dump(run_record, f)

result = spider._resume_from

assert expected == result
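
A short sketch (not part of the commit) of where the hash in the expected path of test_last_run_file_path comes from: _make_alias() builds an OAI-PMH-style query string from the fixture's url, metadataPrefix and set, and its SHA-1 digest names the last-run file (the code targets Python 2, where sha1() accepts a str directly).

import hashlib

alias = 'http://export.arxiv.org/oai2?metadataPrefix=marcxml&set=physics:hep-th'
print(hashlib.sha1(alias).hexdigest() + '.json')
# expected to match the file name asserted in test_last_run_file_path above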
