-
Notifications
You must be signed in to change notification settings - Fork 0
/
task_queue.hpp
427 lines (382 loc) · 13.6 KB
/
task_queue.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
/*------------------------------------------------------------------------------
*
* This file is part of rendermq
*
* Author: [email protected]
*
* Copyright 2010-1 Mapquest, Inc. All Rights reserved.
*
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
*-----------------------------------------------------------------------------*/
#ifndef TASK_QUEUE_HPP
#define TASK_QUEUE_HPP
#include "tile_protocol.hpp"
#include "logging/logger.hpp"
// stl
#include <iostream>
#include <vector>
#include <ctime> // for std::time_t
// boost
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/identity.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/composite_key.hpp>
#include <boost/optional.hpp>
namespace rendermq
{
using namespace boost::multi_index;
struct task : tile_protocol
{
typedef std::vector<std::pair<tile_protocol,std::string> > cont_type;
typedef cont_type::const_iterator iterator;
explicit task(tile_protocol const& t, int priority=0)
: tile_protocol(t),
priority_(priority),
timestamp_(std::time(0)),
processed_(false) {}
task(int x,int y, int z, const std::string &style, protoCmd status, protoFmt fmt, std::time_t last_mod_, std::time_t req_last_mod_, int priority=0)
: tile_protocol(status,x,y,z,0,style,fmt,last_mod_,req_last_mod_),
priority_(priority),
timestamp_(std::time(0)),
processed_(false) {}
void set_priority(int priority)
{
priority_ = priority;
}
int priority() const
{
return priority_;
}
void add_subscriber(tile_protocol const& tile, std::string const& addr)
{
if (&tile == this) {
LOG_WARNING("Adding self to own subscribers vector - this shouldn't happen");
}
subscribers_.push_back(std::make_pair(tile,addr));
}
std::pair<iterator,iterator> subscribers() const
{
return std::make_pair(subscribers_.begin(),subscribers_.end());
}
void set_processed(bool b)
{
processed_ = b;
}
bool processed() const
{
return processed_;
}
void set_timestamp(std::time_t const& t)
{
timestamp_ = t;
}
std::time_t timestamp() const
{
return timestamp_;
}
int priority_;
std::time_t timestamp_;
cont_type subscribers_;
bool processed_;
};
/* two tasks compare equal when they refer to the same position
 * (x, y, z) and the same style. priority, timestamp, subscribers and
 * the processed flag take no part in the comparison.
 */
inline bool operator==(task const& t0, task const& t1)
{
   if (t0.x != t1.x) return false;
   if (t0.y != t1.y) return false;
   if (t0.z != t1.z) return false;
   return t0.style == t1.style;
}
/* hashes a task by delegating to the hash_value overload for its
 * tile_protocol base, so only the tile_protocol part of the task
 * contributes to the hash.
 */
inline std::size_t hash_value( task const& t)
{
   tile_protocol const& base = t;
   return hash_value(base);
}
// empty tag types. they carry no data and exist only to name the indices
// of the task_queue container (through tag<...>), so that an index can be
// selected by name with get<...>() / index<...>.
// names the index ordered on task::priority_.
struct priority {};
// names the hashed, unique index keyed on the task itself.
struct metatile {};
// names the index ordered on task::timestamp_.
struct timestamp {};
/* a priority queue of tasks, sorted by priority (highest priority at the
* *front* of the queue) and unique by position and style parameters. a
* separate index additionally orders the tasks by timestamp.
*/
class task_queue
{
typedef multi_index_container<task,
indexed_by<
// sort on priority
ordered_non_unique<tag<priority>,
member<task,int, &task::priority_>,
std::greater<int> > ,
// hash index on x,y,z & style
hashed_unique<tag<metatile>,
identity<task> >,
// index to order by timestamp
ordered_non_unique<tag<timestamp>,
member<task,std::time_t, &task::timestamp_>,
std::less<int> >
> > cont_type;
typedef cont_type::index<rendermq::priority>::type priority_index_type;
typedef priority_index_type::iterator priority_index_iterator;
/* boost multi-index needs all modifications to the entries in the
* data structure to happen through these functor objects, so that
* the multi-index container can make the appropriate changes to
* the index after the modification happens.
*/
// sets the priority of a task
struct priority_modifier
{
priority_modifier(int priority)
: priority_(priority) {}
void operator() (task & t)
{
t.set_priority(priority_);
}
int priority_;
};
// called when a new task is to be added. this functor may need to
// adjust the priority of a task and the formats requested, as when
// tasks are added they may have higher priority than the one
// they're being merged with, or be requesting different formats.
// this ensures that high priority tasks still get processed
// quickly, even if they're merged with existing low priority tasks.
struct add_subscriber
{
add_subscriber(tile_protocol const& tile, std::string const& addr,int priority)
: tile_(tile),
addr_(addr),
priority_(priority) {}
void operator() (task & t)
{
if (priority_ > t.priority())
t.set_priority(priority_);
t.add_subscriber(tile_,addr_);
// union all the requested formats for the same metatile
t.format = static_cast<protoFmt>(t.format | tile_.format);
}
tile_protocol const& tile_;
std::string const& addr_;
int priority_;
};
// marks the task as being processed.
struct processed_fun
{
void operator() (task & t)
{
t.set_processed(true);
}
};
// unmarks the task as being processed and resets the timestamp so
// that it doesn't immediately get resubmitted again.
struct unprocessed_fun
{
void operator() (task & t)
{
std::time_t current_time(std::time(0));
t.set_processed(false);
t.set_timestamp(current_time);
}
};
public:
/* sets the task identified by the tile parameter as being processed.
*
* this means that the task will not appear as available via the
* front() function unless it is resubmitted via the
* resubmit_older_than() function, which means it won't get send out
* to other workers.
*/
void set_processed(tile_protocol const& tile)
{
typedef cont_type::index<rendermq::metatile>::type meta_index_type;
meta_index_type & index = queue.get<rendermq::metatile>();
meta_index_type::iterator itr = index.find(task(tile,0));
if (itr!=index.end())
{
processed_fun op;
index.modify(itr,op);
}
}
/* resets all tasks in the queue which have been marked as being
* processed for at least a timeout number of seconds.
*
* this is used to detect jobs which are running longer than expected,
* possibly due to worker failure, and make them available to be
* processed by other workers.
*
* jobs are resubmitted only when they have been in the queue for at
* least timeout seconds, are marked as being processed and are not
* bulk requests.
*/
void resubmit_older_than (int timeout)
{
typedef cont_type::index<rendermq::timestamp>::type index_type;
index_type & index = queue.get<rendermq::timestamp>();
index_type::iterator itr = index.begin();
index_type::iterator end = index.end();
std::time_t now = std::time(0);
for (; itr!=end; ++itr)
{
if (itr->status != cmdRenderBulk && now - itr->timestamp() >= timeout &&
itr->processed())
{
LOG_INFO(boost::format("Resubmitting task: %1%") % static_cast<tile_protocol>(*itr));
unprocessed_fun op;
index.modify(itr,op);
}
}
}
/* add a new task to the queue with the given priority, possibly
* merging it with tasks for the same metatile which are already on
* the queue.
*
* the address given is stored with the task and can be used to
* route the finished job back to the endpoint which originated
* it.
*
* returns true when the task was newly-added, false if the task
* was merged with another already in the queue.
*/
bool push(tile_protocol const& tile, std::string const& address, int priority)
{
tile_protocol meta(tile);
meta.x &= ~(METATILE-1);
meta.y &= ~(METATILE-1);
// because jobs can come in at any time, including while the job
// is out being rendered by a worker, it's necessary to play it
// safe and always tell the worker to render. otherwise the
// worker might think that no data is required back (bulk) and
// then this broker wouldn't have anything to send to the
// handler.
meta.status = cmdRender;
std::pair<cont_type::iterator,bool> result = queue.insert(rendermq::task(meta,priority));
if (result.first != queue.end())
{
add_subscriber sub(tile,address,priority);
queue.modify(result.first,sub);
}
return result.second;
}
/* remove the highest priority item from the queue.
*
* note that this doesn't pay any attention to whether the item is
* being processed or not, and should be used with care. a better
* approach may be to use erase() with the tile/task which you want
* to remove.
*/
void pop()
{
typedef cont_type::index<rendermq::priority>::type priority_index_type;
priority_index_type & index = queue.get<rendermq::priority>();
priority_index_type::iterator itr = index.begin();
priority_index_type::iterator end = index.end();
if (itr!=end) index.erase(itr);
}
/* remove a specific task from the queue.
*
* note that this removes the whole task - not just a single
* subscribed instance.
*
* returns true if the task existed, and was removed. false if the
* task was not found in the queue.
*/
bool erase(tile_protocol const& tile)
{
typedef cont_type::index<rendermq::metatile>::type meta_index_type;
meta_index_type & index = queue.get<rendermq::metatile>();
meta_index_type::iterator itr = index.find(task(tile,0));
if (itr!=index.end())
{
index.erase(*itr);
return true;
}
return false;
}
/* returns the task corresponding to a given tile, or an empty
* optional if it could not be found.
*
* note that the tile parameter must be metatile-aligned, or it
* won't match any tasks.
*/
boost::optional<task const&> get(tile_protocol const& tile) const
{
typedef cont_type::index<rendermq::metatile>::type meta_index_type;
meta_index_type const& index = queue.get<rendermq::metatile>();
meta_index_type::iterator itr = index.find(task(tile,0));
boost::optional<task const&> result;
if (itr!=index.end()) return boost::optional<task const&>(*itr);
return result;
}
/* returns an iterator range through all the tasks in the queue.
*/
std::pair<priority_index_iterator,priority_index_iterator> tasks() const
{
priority_index_type const& index = queue.get<rendermq::priority>();
priority_index_iterator itr = index.begin();
priority_index_iterator end = index.end();
return std::make_pair<priority_index_iterator,priority_index_iterator>(itr,end);
}
/* returns the highest priority unprocessed task, if there is
* one. otherwise returns an empty optional.
*
* FIXME: method name is misleading, should be front_unprocessed()?
*/
boost::optional<task const&> front() const
{
typedef cont_type::index<rendermq::priority>::type priority_index_type;
priority_index_type const& index = queue.get<rendermq::priority>();
priority_index_type::iterator itr = index.begin();
priority_index_type::iterator end = index.end();
boost::optional<task const&> result;
for (;itr!=end;++itr)
{
if (!itr->processed())
return boost::optional<task const&>(*itr);
}
return result;
}
/* returns the number of tasks in the queue, total.
*
* see count_unprocessed() if you want the number of available,
* unprocessed tasks in the queue.
*/
size_t size() const
{
return queue.size();
}
/* returns the number of available tasks in the queue.
*/
size_t count_unprocessed() const
{
size_t count=0;
typedef cont_type::index<rendermq::priority>::type priority_index_type;
priority_index_type const& index = queue.get<rendermq::priority>();
priority_index_type::iterator itr = index.begin();
priority_index_type::iterator end = index.end();
for (; itr!=end;++itr)
{
if (!itr->processed()) ++count;
}
return count;
}
/* removes all tasks from the queue.
*/
void clear() { queue.clear();}
private:
// the queue itself.
cont_type queue;
};
} // namespace rendermq
#endif // TASK_QUEUE_HPP