From 9570dc51643e92eb9b19030d30af2c30e76d4cff Mon Sep 17 00:00:00 2001 From: jdhedden Date: Thu, 21 Apr 2016 09:41:19 -0400 Subject: [PATCH] Thread-Queue v3.06 --- Changes | 3 ++ MANIFEST | 1 + Makefile.PL | 4 +- README | 2 +- lib/Thread/Queue.pm | 42 +++++++++++++++--- t/11_limit.t | 101 ++++++++++++++++++++++++++++++++++++++++++++ t/99_pod.t | 1 + 7 files changed, 146 insertions(+), 8 deletions(-) create mode 100644 t/11_limit.t diff --git a/Changes b/Changes index a588886..1e6ce94 100755 --- a/Changes +++ b/Changes @@ -1,5 +1,8 @@ Revision history for Perl extension Thread::Queue. +3.06 Sat Aug 22 20:33:23 2015 + - Added queue limit feature as per suggestion by Mark Zealey + 3.05 Thu Mar 20 21:39:32 2014 - Sync with blead diff --git a/MANIFEST b/MANIFEST index 689a4dc..ecc0371 100755 --- a/MANIFEST +++ b/MANIFEST @@ -14,6 +14,7 @@ t/07_lock.t t/08_nothreads.t t/09_ended.t t/10_timed.t +t/11_limit.t t/99_pod.t t/test.pl examples/callback.pl diff --git a/Makefile.PL b/Makefile.PL index a898943..f0f7a8e 100755 --- a/Makefile.PL +++ b/Makefile.PL @@ -32,8 +32,8 @@ sub MY::postamble { return <<'_EXTRAS_'; fixfiles: - @dos2unix `cat MANIFEST` - @$(CHMOD) 644 `cat MANIFEST` + @dos2unix `cat MANIFEST | grep -v META` + @$(CHMOD) 644 `cat MANIFEST | grep -v META` @$(CHMOD) 755 examples/*.pl _EXTRAS_ } diff --git a/README b/README index da1df15..0db5db3 100755 --- a/README +++ b/README @@ -1,4 +1,4 @@ -Thread::Queue version 3.05 +Thread::Queue version 3.06 ========================== Thread-safe queues diff --git a/lib/Thread/Queue.pm b/lib/Thread/Queue.pm index 316644a..ebc1c31 100755 --- a/lib/Thread/Queue.pm +++ b/lib/Thread/Queue.pm @@ -3,7 +3,7 @@ package Thread::Queue; use strict; use warnings; -our $VERSION = '3.05'; +our $VERSION = '3.06'; $VERSION = eval $VERSION; use threads::shared 1.21; @@ -26,14 +26,29 @@ sub enqueue { my $self = shift; lock(%$self); + if ($$self{'ENDED'}) { require Carp; Carp::croak("'enqueue' method called on queue that has been 'end'ed"); } - push(@{$$self{'queue'}}, map { shared_clone($_) } @_) + + # Block if queue size exceeds any specified limit + my $queue = $$self{'queue'}; + cond_wait(%$self) while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'})); + + # Add items to queue, and then signal other threads + push(@$queue, map { shared_clone($_) } @_) and cond_signal(%$self); } +# Set or return the max. size for a queue +sub limit : lvalue +{ + my $self = shift; + lock(%$self); + $$self{'LIMIT'}; +} + # Return a count of the number of items on a queue sub pending { @@ -47,7 +62,7 @@ sub pending sub end { my $self = shift; - lock $self; + lock(%$self); # No more data is coming $$self{'ENDED'} = 1; # Try to release at least one blocked thread @@ -289,7 +304,7 @@ Thread::Queue - Thread-safe queues =head1 VERSION -This document describes Thread::Queue version 3.05 +This document describes Thread::Queue version 3.06 =head1 SYNOPSIS @@ -334,6 +349,9 @@ This document describes Thread::Queue version 3.05 # Work on $item } + # Set a size for a queue + $q->limit = 5; + # Get the second item in the queue without dequeuing anything my $item = $q->peek(1); @@ -423,7 +441,7 @@ Adds a list of items onto the end of the queue. Removes the requested number of items (default is 1) from the head of the queue, and returns them. If the queue contains fewer than the requested number of items, then the thread will be blocked until the requisite number -of items are available (i.e., until other threads more items). +of items are available (i.e., until other threads C more items). =item ->dequeue_nb() @@ -461,6 +479,20 @@ behaves the same as C. Returns the number of items still in the queue. Returns C if the queue has been ended (see below), and there are no more items in the queue. +=item ->limit + +Sets the size of the queue. If set, calls to C will block until +the number of pending items in the queue drops below the C. The +C does not prevent enqueuing items beyond that count: + + my $q = Thread::Queue->new(1, 2); + $q->limit = 4; + $q->enqueue(3, 4, 5); # Does not block + $q->enqueue(6); # Blocks until at least 2 items are dequeued + + my $size = $q->limit; # Returns the current limit (may return 'undef') + $q->limit = 0; # Queue size is now unlimited + =item ->end() Declares that no more items will be added to the queue. diff --git a/t/11_limit.t b/t/11_limit.t new file mode 100644 index 0000000..a2ab918 --- /dev/null +++ b/t/11_limit.t @@ -0,0 +1,101 @@ +use strict; +use warnings; + +use Config; + +BEGIN { + if (! $Config{'useithreads'}) { + print("1..0 # SKIP Perl not compiled with 'useithreads'\n"); + exit(0); + } + if (! $Config{'d_select'}) { + print("1..0 # SKIP 'select()' not available for testing\n"); + exit(0); + } +} + +use threads; +use Thread::Queue; + +use Test::More; + +plan tests => 8; + +my $q = Thread::Queue->new(); +my $rpt = Thread::Queue->new(); + +my $th = threads->create( sub { + # (1) Set queue limit, and report it + $q->limit = 3; + $rpt->enqueue($q->limit); + + # (3) Fetch an item from queue + my $item = $q->dequeue(); + is($item, 1, 'Dequeued item 1'); + # Report queue count + $rpt->enqueue($q->pending()); + + # q = (2, 3, 4, 5); r = (4) + + # (4) Enqueue more items - will block + $q->enqueue(6, 7); + # q = (5, 'foo', 6, 7); r = (4, 3, 4, 3) + + # (6) Get reports from main + my @items = $rpt->dequeue(5); + is_deeply(\@items, [4, 3, 4, 3, 'go'], 'Queue reports'); + + # Dequeue all items + @items = $q->dequeue_nb(99); + is_deeply(\@items, [5, 'foo', 6, 7], 'Queue items'); +}); + +# (2) Read queue limit from thread +my $item = $rpt->dequeue(); +is($item, $q->limit, 'Queue limit set'); +# Send items +$q->enqueue(1, 2, 3, 4, 5); + +# (5) Read queue count +$item = $rpt->dequeue; +# q = (2, 3, 4, 5); r = () +is($item, $q->pending(), 'Queue count'); +# Report back the queue count +$rpt->enqueue($q->pending); +# q = (2, 3, 4, 5); r = (4) + +# Read an item from queue +$item = $q->dequeue(); +is($item, 2, 'Dequeued item 2'); +# q = (3, 4, 5); r = (4) +# Report back the queue count +$rpt->enqueue($q->pending); +# q = (3, 4, 5); r = (4, 3) + +# 'insert' doesn't care about queue limit +$q->insert(3, 'foo'); +$rpt->enqueue($q->pending); +# q = (3, 4, 5, 'foo'); r = (4, 3, 4) + +# Read an item from queue +$item = $q->dequeue(); +is($item, 3, 'Dequeued item 3'); +# q = (3, 4, 5); r = (4) +# Report back the queue count +$rpt->enqueue($q->pending); +# q = (4, 5, 'foo'); r = (4, 3, 4, 3) + +# Read an item from queue +$item = $q->dequeue(); +is($item, 4, 'Dequeued item 4'); +# Thread is now unblocked + +# Handshake with thread +$rpt->enqueue('go'); + +# (7) - Done +$th->join; + +exit(0); + +# EOF diff --git a/t/99_pod.t b/t/99_pod.t index 8fda796..57c75fb 100755 --- a/t/99_pod.t +++ b/t/99_pod.t @@ -57,6 +57,7 @@ CPAN readonly baz dequeuing +enqueuing pre __END__