-
Notifications
You must be signed in to change notification settings - Fork 3
/
enqueue.py
executable file
·58 lines (44 loc) · 1.37 KB
/
enqueue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#! /usr/bin/env python3
import sys
from sqlalchemy.sql import select
import hasher
import wmc
import db
def main():
try:
max_tasks = int(sys.argv[1])
except (IndexError, TypeError):
sys.exit('Usage: {} NUM_TASKS'.format(sys.argv[0]))
session = db.open_session()
count = 0
diff = 0
while max_tasks > 0:
# Find the next bunch of unqueued works,
stmt = select([db.Work.id]).where(
db.Work.status=='loaded'
).limit(min(max_tasks, 50))
work_ids = [row[0] for row in session.execute(stmt).fetchall()]
if not work_ids:
print('No more unqueued works in the database')
return
max_tasks -= len(work_ids)
count += len(work_ids)
diff += len(work_ids)
if diff >= 10000:
print('Queued works: {}'.format(count))
diff = 0
# We expect that this job is not run in parallel, so we don't
# have to worry about the status changing under our feet
try:
stmt = db.Work.__table__.update().where(
db.Work.id.in_(work_ids)
).values(status='queued')
session.execute(stmt)
wmc.process.apply_async((work_ids, ))
except:
session.rollback()
raise
else:
session.commit()
if __name__ == '__main__':
main()