Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.0 #75

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open

2.0 #75

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 39 additions & 38 deletions src/poolboy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@

-define(TIMEOUT, 5000).

-ifdef(pre17).
-type pid_queue() :: queue().
-else.
-type pid_queue() :: queue:queue().
-endif.

-type pool() ::
Name :: (atom() | pid()) |
{Name :: atom(), node()} |
Expand All @@ -30,7 +24,7 @@
-record(state, {
supervisor :: pid(),
workers :: [pid()],
waiting :: pid_queue(),
waiting :: ets:tid(),
monitors :: ets:tid(),
size = 5 :: non_neg_integer(),
overflow = 0 :: non_neg_integer(),
Expand Down Expand Up @@ -121,7 +115,7 @@ status(Pool) ->

init({PoolArgs, WorkerArgs}) ->
process_flag(trap_exit, true),
Waiting = queue:new(),
Waiting = ets:new(waiting, [private]),
Monitors = ets:new(monitors, [private]),
init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}).

Expand Down Expand Up @@ -149,11 +143,14 @@ handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) ->
{noreply, State}
end;

handle_cast({cancel_waiting, Pid}, State) ->
Waiting = queue:filter(fun ({{P, _}, Ref}) ->
P =/= Pid orelse not(erlang:demonitor(Ref))
end, State#state.waiting),
{noreply, State#state{waiting = Waiting}};
handle_cast({cancel_waiting, Pid}, State = #state{waiting = Waiting}) ->
case ets:match_object(Waiting, {{Pid, '_'}, '_'}, 1) of
{[{From, Ref}], _} ->
true = erlang:demonitor(Ref),
true = ets:delete(Waiting, From);
{[], _} -> true
end,
{noreply, State};

handle_cast(_Msg, State) ->
{noreply, State}.
Expand All @@ -177,8 +174,8 @@ handle_call({checkout, Block}, {FromPid, _} = From, State) ->
{reply, full, State};
[] ->
Ref = erlang:monitor(process, FromPid),
Waiting = queue:in({From, Ref}, State#state.waiting),
{noreply, State#state{waiting = Waiting}}
true = ets:insert(State#state.waiting, {From, Ref}),
{noreply, State}
end;

handle_call(status, _From, State) ->
Expand All @@ -203,15 +200,15 @@ handle_call(_Msg, _From, State) ->
Reply = {error, invalid_message},
{reply, Reply, State}.

handle_info({'DOWN', Ref, _, _, _}, State) ->
case ets:match(State#state.monitors, {'$1', Ref}) of
[[Pid]] ->
true = ets:delete(State#state.monitors, Pid),
handle_info({'DOWN', Ref, _, _, _}, #state{monitors = Monitors} = State) ->
case ets:match(Monitors, {'$1', Ref}, 1) of
{[[Pid]], _} ->
true = ets:delete(Monitors, Pid),
NewState = handle_checkin(Pid, State),
{noreply, NewState};
[] ->
Waiting = queue:filter(fun ({_, R}) -> R =/= Ref end, State#state.waiting),
{noreply, State#state{waiting = Waiting}}
{[], _} ->
true = ets:match_delete(State#state.waiting, {'_', Ref}),
{noreply, State}
end;
handle_info({'EXIT', Pid, _Reason}, State) ->
#state{supervisor = Sup,
Expand Down Expand Up @@ -280,37 +277,41 @@ handle_checkin(Pid, State) ->
waiting = Waiting,
monitors = Monitors,
overflow = Overflow} = State,
case queue:out(Waiting) of
{{value, {From, Ref}}, Left} ->
true = ets:insert(Monitors, {Pid, Ref}),
gen_server:reply(From, Pid),
State#state{waiting = Left};
{empty, Empty} when Overflow > 0 ->
case ets:first(Waiting) of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Waiting ets table is of type set so dequeues are random or rather hash based. This is interesting but likely leads to the large 99% latencies of LIFO queues because a process hashed to an unlucky bucket will effectively be at the end of the queue, with newer process jumping ahead of it.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was operating based on my assumption that the first key according to the table's internal order will be returned (Erlang docs) meant there would order based on time of insertion, not hashed based on the waiting pid. That's unfortunate if that's how it works.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will be disappointed I'm afraid :(.

'$end_of_table' when Overflow > 0 ->
ok = dismiss_worker(Sup, Pid),
State#state{waiting = Empty, overflow = Overflow - 1};
{empty, Empty} ->
State#state{overflow = Overflow - 1};
'$end_of_table' ->
Workers = [Pid | State#state.workers],
State#state{workers = Workers, waiting = Empty, overflow = 0}
State#state{workers = Workers, overflow = 0};
From ->
Ref = ets:lookup_element(Waiting, From, 2),
true = ets:delete(Waiting, From),
true = ets:insert(Monitors, {Pid, Ref}),
gen_server:reply(From, Pid),
State
end.

handle_worker_exit(Pid, State) ->
#state{supervisor = Sup,
waiting = Waiting,
monitors = Monitors,
overflow = Overflow} = State,
case queue:out(State#state.waiting) of
{{value, {{FromPid, _} = From, _}}, LeftWaiting} ->
case ets:first(Waiting) of
{FromPid, _} = From ->
true = ets:delete(Waiting, From),
MonitorRef = erlang:monitor(process, FromPid),
NewWorker = new_worker(State#state.supervisor),
true = ets:insert(Monitors, {NewWorker, MonitorRef}),
gen_server:reply(From, NewWorker),
State#state{waiting = LeftWaiting};
{empty, Empty} when Overflow > 0 ->
State#state{overflow = Overflow - 1, waiting = Empty};
{empty, Empty} ->
State;
'$end_of_table' when Overflow > 0 ->
State#state{overflow = Overflow - 1};
'$end_of_table' ->
Workers =
[new_worker(Sup)
| lists:filter(fun (P) -> P =/= Pid end, State#state.workers)],
State#state{workers = Workers, waiting = Empty}
State#state{workers = Workers}
end.

state_name(State = #state{overflow = Overflow}) when Overflow < 1 ->
Expand Down