forked from pypi/legacy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tasks.py
executable file
·89 lines (68 loc) · 2.61 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import os
import urlparse
import redis
import requests
from fncache import RedisLru
def purge_redis_cache(cache_redis_url, tags):
cache_redis = redis.StrictRedis.from_url(cache_redis_url)
lru_cache = RedisLru(cache_redis)
all_tags = set(tags)
for tag in set(all_tags):
print tag
lru_cache.purge(tag)
def purge_fastly_tags(domain, api_key, service_id, tags):
session = requests.session()
headers = {"Fastly-Key": api_key, "Accept": "application/json"}
all_tags = set(tags)
purges = {}
for tag in set(all_tags):
# Build the URL
url_path = "/service/%s/purge/%s" % (service_id, tag)
url = urlparse.urljoin(domain, url_path)
# Issue the Purge
resp = session.post(url, headers=headers)
resp.raise_for_status()
# Store the Purge ID so we can track it later
purges[tag] = resp.json()["id"]
# for tag, purge_id in purges.iteritems():
# # Ensure that the purge completed successfully
# url = urlparse.urljoin(domain, "/purge")
# status = session.get(url, params={"id": purge_id})
# status.raise_for_status()
# # If the purge completely successfully remove the tag from
# # our list.
# if status.json().get("results", {}).get("complete", None):
# all_tags.remove(tag)
import config
import store
from zope.pagetemplate.pagetemplatefile import PageTemplateFile
class PyPiPageTemplate(PageTemplateFile):
def pt_getContext(self, args=(), options={}, **kw):
"""Add our data into ZPT's defaults"""
rval = PageTemplateFile.pt_getContext(self, args=args)
options.update(rval)
return options
def rss_regen():
root = os.path.dirname(os.path.abspath(__file__))
conf = config.Config(os.path.join(root, "config.ini"))
template_dir = os.path.join(root, 'templates')
if conf.cache_redis_url:
cache_redis = redis.StrictRedis.from_url(conf.cache_redis_url)
else:
return True
(protocol, machine, path, x, x, x) = urlparse.urlparse(conf.url)
context = {}
context['store'] = store.Store(conf)
context['url_machine'] = '%s://%s'%(protocol, machine)
context['url_path'] = path
context['test'] = ''
if 'testpypi' in conf.url:
context['test'] = 'Test '
# generate the releases RSS
template = PyPiPageTemplate('rss.xml', template_dir)
content = template(**context)
cache_redis.set('rss~main', content)
# generate the packages RSS
template = PyPiPageTemplate('packages-rss.xml', template_dir)
content = template(**context)
cache_redis.set('rss~pkgs', content)