-
Notifications
You must be signed in to change notification settings - Fork 0
/
threadpool.h
121 lines (118 loc) · 2.77 KB
/
threadpool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#pragma once
#include <list>
#include <thread>
#include <vector>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <future>
#include <atomic>
#include <stdexcept>
namespace std
{
#define THREADPOOL_MAX_NUM 6
#define QUEUE_MAX_NUM 6
class threadpool
{
using Task = function < void() >;
vector<thread> _pool;
queue<Task> _tasks;
mutex _lock;
mutex _drain_lock;
mutex _queue_full_lock;
condition_variable _task_cv;
condition_variable _drain_cv;
condition_variable _queue_cv;
atomic<bool> _run{ true };
atomic<int> _idlThrNum{ 0 };
atomic<int> _totalThrNum{ 0 };
public:
inline threadpool(unsigned short size = 6) { addThread(size); }
inline ~threadpool()
{
_run = false;
_task_cv.notify_all();
for (thread& thread : _pool) {
if (thread.joinable())
thread.join();
}
}
public:
//bind:-commit(std::bind(&Dog::sayHello, &dog));
//mem_fn:-commit(std::mem_fn(&Dog::sayHello), this)
template<class F, class... Args>
auto commit(F&& f, Args&&... args) ->future < decltype(f(args...)) >
{
if (!_run)
throw runtime_error("commit on ThreadPool is stopped.");
// check size of queue
{
unique_lock<mutex> lock{ _lock };
_queue_cv.wait(_lock, [this] {
return _tasks.size() < QUEUE_MAX_NUM;
});
}
using RetType = decltype(f(args...));
auto task = make_shared<packaged_task<RetType()>>(
bind(forward<F>(f), forward<Args>(args)...)
);
future<RetType> future = task->get_future();
{
lock_guard<mutex> lock{ _lock };
_tasks.emplace([task]() {
(*task)();
});
}
#ifdef THREADPOOL_AUTO_GROW
if (_idlThrNum < 1 && _pool.size() < THREADPOOL_MAX_NUM)
addThread(1);
#endif // !THREADPOOL_AUTO_GROW
_task_cv.notify_one();
return future;
}
void drainTask()
{
unique_lock<mutex> lock{ _drain_lock };
_drain_cv.wait(lock, [this] {
return _run && _tasks.empty() && (_idlThrNum == _totalThrNum);
});
}
int idlCount() { return _idlThrNum; }
int thrCount() { return (int)_pool.size(); }
#ifndef THREADPOOL_AUTO_GROW
private:
#endif // !THREADPOOL_AUTO_GROW
void addThread(unsigned short size)
{
for (; _pool.size() < THREADPOOL_MAX_NUM && size > 0; --size)
{
_pool.emplace_back([this] {
while (_run)
{
Task task;
{
unique_lock<mutex> lock{ _lock };
_task_cv.wait(lock, [this] {
return !_run || !_tasks.empty();
});
if (!_run && _tasks.empty())
return;
task = move(_tasks.front());
_tasks.pop();
_queue_cv.notify_one();
}
_idlThrNum--;
task();
_idlThrNum++;
if (_run && _tasks.empty() && _idlThrNum == _totalThrNum)
{
_drain_cv.notify_one();
}
}
});
_idlThrNum++;
_totalThrNum++;
}
}
};
}