diff --git a/src/poolboy.erl b/src/poolboy.erl index 7c5605e..db20541 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -38,7 +38,7 @@ -record(state, { supervisor :: undefined | pid(), - workers = [] :: [pid()], + workers :: undefined | pid_queue(), waiting :: pid_queue(), monitors :: ets:tid(), size = 5 :: non_neg_integer(), @@ -205,19 +205,20 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> workers = Workers, monitors = Monitors, overflow = Overflow, - max_overflow = MaxOverflow} = State, - case Workers of - [Pid | Left] -> + max_overflow = MaxOverflow, + strategy = Strategy} = State, + case get_worker_with_strategy(Workers, Strategy) of + {{value, Pid}, Left} -> MRef = erlang:monitor(process, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), {reply, Pid, State#state{workers = Left}}; - [] when MaxOverflow > 0, Overflow < MaxOverflow -> + {empty, _Left} when MaxOverflow > 0, Overflow < MaxOverflow -> {Pid, MRef} = new_worker(Sup, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), {reply, Pid, State#state{overflow = Overflow + 1}}; - [] when Block =:= false -> + {empty, _Left} when Block =:= false -> {reply, full, State}; - [] -> + {empty, _Left} -> MRef = erlang:monitor(process, FromPid), Waiting = queue:in({From, CRef, MRef}, State#state.waiting), {noreply, State#state{waiting = Waiting}} @@ -228,7 +229,7 @@ handle_call(status, _From, State) -> monitors = Monitors, overflow = Overflow} = State, StateName = state_name(State), - {reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State}; + {reply, {StateName, queue:len(Workers), Overflow, ets:info(Monitors, size)}, State}; handle_call(get_avail_workers, _From, State) -> Workers = State#state.workers, {reply, Workers, State}; @@ -266,10 +267,10 @@ handle_info({'EXIT', Pid, _Reason}, State) -> NewState = handle_worker_exit(Pid, State), {noreply, NewState}; [] -> - case lists:member(Pid, State#state.workers) of + case queue:member(Pid, State#state.workers) of true -> - W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers), - {noreply, State#state{workers = [new_worker(Sup) | W]}}; + W = filter_worker_by_pid(Pid, State#state.workers), + {noreply, State#state{workers = queue:in(new_worker(Sup), W)}}; false -> {noreply, State} end @@ -279,7 +280,8 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, State) -> - ok = lists:foreach(fun (W) -> unlink(W) end, State#state.workers), + Workers = queue:to_list(State#state.workers), + ok = lists:foreach(fun (W) -> unlink(W) end, Workers), true = exit(State#state.supervisor, shutdown), ok. @@ -304,26 +306,33 @@ new_worker(Sup, FromPid) -> Ref = erlang:monitor(process, FromPid), {Pid, Ref}. +get_worker_with_strategy(Workers, fifo) -> + queue:out(Workers); +get_worker_with_strategy(Workers, lifo) -> + queue:out_r(Workers). + dismiss_worker(Sup, Pid) -> true = unlink(Pid), supervisor:terminate_child(Sup, Pid). +filter_worker_by_pid(Pid, Workers) -> + queue:filter(fun (WPid) -> WPid =/= Pid end, Workers). + prepopulate(N, _Sup) when N < 1 -> - []; + queue:new(); prepopulate(N, Sup) -> - prepopulate(N, Sup, []). + prepopulate(N, Sup, queue:new()). prepopulate(0, _Sup, Workers) -> Workers; prepopulate(N, Sup, Workers) -> - prepopulate(N-1, Sup, [new_worker(Sup) | Workers]). + prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)). handle_checkin(Pid, State) -> #state{supervisor = Sup, waiting = Waiting, monitors = Monitors, - overflow = Overflow, - strategy = Strategy} = State, + overflow = Overflow} = State, case queue:out(Waiting) of {{value, {From, CRef, MRef}}, Left} -> true = ets:insert(Monitors, {Pid, CRef, MRef}), @@ -333,10 +342,7 @@ handle_checkin(Pid, State) -> ok = dismiss_worker(Sup, Pid), State#state{waiting = Empty, overflow = Overflow - 1}; {empty, Empty} -> - Workers = case Strategy of - lifo -> [Pid | State#state.workers]; - fifo -> State#state.workers ++ [Pid] - end, + Workers = queue:in(Pid, State#state.workers), State#state{workers = Workers, waiting = Empty, overflow = 0} end. @@ -353,15 +359,14 @@ handle_worker_exit(Pid, State) -> {empty, Empty} when Overflow > 0 -> State#state{overflow = Overflow - 1, waiting = Empty}; {empty, Empty} -> - Workers = - [new_worker(Sup) - | lists:filter(fun (P) -> P =/= Pid end, State#state.workers)], + W = filter_worker_by_pid(Pid, State#state.workers), + Workers = queue:in(new_worker(Sup), W), State#state{workers = Workers, waiting = Empty} end. state_name(State = #state{overflow = Overflow}) when Overflow < 1 -> #state{max_overflow = MaxOverflow, workers = Workers} = State, - case length(Workers) == 0 of + case queue:len(Workers) == 0 of true when MaxOverflow < 1 -> full; true -> overflow; false -> ready diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index 552f6e9..5b27024 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -127,13 +127,13 @@ transaction_timeout() -> pool_startup() -> %% Check basic pool operation. {ok, Pid} = new_pool(10, 5), - ?assertEqual(10, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(10, queue:len(pool_call(Pid, get_avail_workers))), poolboy:checkout(Pid), - ?assertEqual(9, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(9, queue:len(pool_call(Pid, get_avail_workers))), Worker = poolboy:checkout(Pid), - ?assertEqual(8, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(8, queue:len(pool_call(Pid, get_avail_workers))), checkin_worker(Pid, Worker), - ?assertEqual(9, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(9, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(1, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). @@ -141,23 +141,23 @@ pool_overflow() -> %% Check that the pool overflows properly. {ok, Pid} = new_pool(5, 5), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(7, length(pool_call(Pid, get_all_workers))), [A, B, C, D, E, F, G] = Workers, checkin_worker(Pid, A), checkin_worker(Pid, B), - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, C), checkin_worker(Pid, D), - ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, E), checkin_worker(Pid, F), - ?assertEqual(4, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(4, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, G), - ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). @@ -167,7 +167,7 @@ pool_empty() -> %% overflow is enabled. {ok, Pid} = new_pool(5, 2), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(7, length(pool_call(Pid, get_all_workers))), [A, B, C, D, E, F, G] = Workers, Self = self(), @@ -192,18 +192,18 @@ pool_empty() -> after 500 -> ?assert(false) end, - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, C), checkin_worker(Pid, D), - ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, E), checkin_worker(Pid, F), - ?assertEqual(4, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(4, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, G), - ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). @@ -213,7 +213,7 @@ pool_empty_no_overflow() -> %% disabled. {ok, Pid} = new_pool(5, 0), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), [A, B, C, D, E] = Workers, Self = self(), @@ -238,14 +238,14 @@ pool_empty_no_overflow() -> after 500 -> ?assert(false) end, - ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, C), checkin_worker(Pid, D), - ?assertEqual(4, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(4, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, E), - ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). @@ -256,16 +256,16 @@ worker_death() -> {ok, Pid} = new_pool(5, 2), Worker = poolboy:checkout(Pid), kill_worker(Worker), - ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), [A, B, C|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(7, length(pool_call(Pid, get_all_workers))), kill_worker(A), - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(6, length(pool_call(Pid, get_all_workers))), kill_worker(B), kill_worker(C), - ?assertEqual(1, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(4, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). @@ -277,9 +277,9 @@ worker_death_while_full() -> {ok, Pid} = new_pool(5, 2), Worker = poolboy:checkout(Pid), kill_worker(Worker), - ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), [A, B|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(7, length(pool_call(Pid, get_all_workers))), Self = self(), spawn(fun() -> @@ -306,7 +306,7 @@ worker_death_while_full() -> 1000 -> ?assert(false) end, kill_worker(B), - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(6, length(pool_call(Pid, get_all_workers))), ?assertEqual(6, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). @@ -318,9 +318,9 @@ worker_death_while_full_no_overflow() -> {ok, Pid} = new_pool(5, 0), Worker = poolboy:checkout(Pid), kill_worker(Worker), - ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), [A, B, C|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), Self = self(), spawn(fun() -> @@ -346,10 +346,10 @@ worker_death_while_full_no_overflow() -> 1000 -> ?assert(false) end, kill_worker(B), - ?assertEqual(1, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), kill_worker(C), - ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(3, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). @@ -359,7 +359,7 @@ pool_full_nonblocking_no_overflow() -> %% option to use non-blocking checkouts is used. {ok, Pid} = new_pool(5, 0), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(full, poolboy:checkout(Pid, false)), ?assertEqual(full, poolboy:checkout(Pid, false)), @@ -374,7 +374,7 @@ pool_full_nonblocking() -> %% option to use non-blocking checkouts is used. {ok, Pid} = new_pool(5, 5), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 9)], - ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(10, length(pool_call(Pid, get_all_workers))), ?assertEqual(full, poolboy:checkout(Pid, false)), A = hd(Workers), @@ -395,17 +395,17 @@ owner_death() -> receive after 500 -> exit(normal) end end), timer:sleep(1000), - ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). checkin_after_exception_in_transaction() -> {ok, Pool} = new_pool(2, 0), - ?assertEqual(2, length(pool_call(Pool, get_avail_workers))), + ?assertEqual(2, queue:len(pool_call(Pool, get_avail_workers))), Tx = fun(Worker) -> ?assert(is_pid(Worker)), - ?assertEqual(1, length(pool_call(Pool, get_avail_workers))), + ?assertEqual(1, queue:len(pool_call(Pool, get_avail_workers))), throw(it_on_the_ground), ?assert(false) end, @@ -414,7 +414,7 @@ checkin_after_exception_in_transaction() -> catch throw:it_on_the_ground -> ok end, - ?assertEqual(2, length(pool_call(Pool, get_avail_workers))), + ?assertEqual(2, queue:len(pool_call(Pool, get_avail_workers))), ok = pool_call(Pool, stop). pool_returns_status() ->