Skip to content

Commit

Permalink
Thread-Queue v3.06
Browse files Browse the repository at this point in the history
  • Loading branch information
jdhedden committed Apr 21, 2016
1 parent e84d4c9 commit 9570dc5
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 8 deletions.
3 changes: 3 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
Revision history for Perl extension Thread::Queue.

3.06 Sat Aug 22 20:33:23 2015
- Added queue limit feature as per suggestion by Mark Zealey

3.05 Thu Mar 20 21:39:32 2014
- Sync with blead

Expand Down
1 change: 1 addition & 0 deletions MANIFEST
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ t/07_lock.t
t/08_nothreads.t
t/09_ended.t
t/10_timed.t
t/11_limit.t
t/99_pod.t
t/test.pl
examples/callback.pl
Expand Down
4 changes: 2 additions & 2 deletions Makefile.PL
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ sub MY::postamble
{
return <<'_EXTRAS_';
fixfiles:
@dos2unix `cat MANIFEST`
@$(CHMOD) 644 `cat MANIFEST`
@dos2unix `cat MANIFEST | grep -v META`
@$(CHMOD) 644 `cat MANIFEST | grep -v META`
@$(CHMOD) 755 examples/*.pl
_EXTRAS_
}
Expand Down
2 changes: 1 addition & 1 deletion README
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Thread::Queue version 3.05
Thread::Queue version 3.06
==========================

Thread-safe queues
Expand Down
42 changes: 37 additions & 5 deletions lib/Thread/Queue.pm
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package Thread::Queue;
use strict;
use warnings;

our $VERSION = '3.05';
our $VERSION = '3.06';
$VERSION = eval $VERSION;

use threads::shared 1.21;
Expand All @@ -26,14 +26,29 @@ sub enqueue
{
my $self = shift;
lock(%$self);

if ($$self{'ENDED'}) {
require Carp;
Carp::croak("'enqueue' method called on queue that has been 'end'ed");
}
push(@{$$self{'queue'}}, map { shared_clone($_) } @_)

# Block if queue size exceeds any specified limit
my $queue = $$self{'queue'};
cond_wait(%$self) while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'}));

# Add items to queue, and then signal other threads
push(@$queue, map { shared_clone($_) } @_)
and cond_signal(%$self);
}

# Set or return the max. size for a queue
sub limit : lvalue
{
my $self = shift;
lock(%$self);
$$self{'LIMIT'};
}

# Return a count of the number of items on a queue
sub pending
{
Expand All @@ -47,7 +62,7 @@ sub pending
sub end
{
my $self = shift;
lock $self;
lock(%$self);
# No more data is coming
$$self{'ENDED'} = 1;
# Try to release at least one blocked thread
Expand Down Expand Up @@ -289,7 +304,7 @@ Thread::Queue - Thread-safe queues
=head1 VERSION
This document describes Thread::Queue version 3.05
This document describes Thread::Queue version 3.06
=head1 SYNOPSIS
Expand Down Expand Up @@ -334,6 +349,9 @@ This document describes Thread::Queue version 3.05
# Work on $item
}
# Set a size for a queue
$q->limit = 5;
# Get the second item in the queue without dequeuing anything
my $item = $q->peek(1);
Expand Down Expand Up @@ -423,7 +441,7 @@ Adds a list of items onto the end of the queue.
Removes the requested number of items (default is 1) from the head of the
queue, and returns them. If the queue contains fewer than the requested
number of items, then the thread will be blocked until the requisite number
of items are available (i.e., until other threads <enqueue> more items).
of items are available (i.e., until other threads C<enqueue> more items).
=item ->dequeue_nb()
Expand Down Expand Up @@ -461,6 +479,20 @@ behaves the same as C<dequeue_nb>.
Returns the number of items still in the queue. Returns C<undef> if the queue
has been ended (see below), and there are no more items in the queue.
=item ->limit
Sets the size of the queue. If set, calls to C<enqueue()> will block until
the number of pending items in the queue drops below the C<limit>. The
C<limit> does not prevent enqueuing items beyond that count:
my $q = Thread::Queue->new(1, 2);
$q->limit = 4;
$q->enqueue(3, 4, 5); # Does not block
$q->enqueue(6); # Blocks until at least 2 items are dequeued
my $size = $q->limit; # Returns the current limit (may return 'undef')
$q->limit = 0; # Queue size is now unlimited
=item ->end()
Declares that no more items will be added to the queue.
Expand Down
101 changes: 101 additions & 0 deletions t/11_limit.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use strict;
use warnings;

use Config;

BEGIN {
if (! $Config{'useithreads'}) {
print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
exit(0);
}
if (! $Config{'d_select'}) {
print("1..0 # SKIP 'select()' not available for testing\n");
exit(0);
}
}

use threads;
use Thread::Queue;

use Test::More;

plan tests => 8;

my $q = Thread::Queue->new();
my $rpt = Thread::Queue->new();

my $th = threads->create( sub {
# (1) Set queue limit, and report it
$q->limit = 3;
$rpt->enqueue($q->limit);

# (3) Fetch an item from queue
my $item = $q->dequeue();
is($item, 1, 'Dequeued item 1');
# Report queue count
$rpt->enqueue($q->pending());

# q = (2, 3, 4, 5); r = (4)

# (4) Enqueue more items - will block
$q->enqueue(6, 7);
# q = (5, 'foo', 6, 7); r = (4, 3, 4, 3)

# (6) Get reports from main
my @items = $rpt->dequeue(5);
is_deeply(\@items, [4, 3, 4, 3, 'go'], 'Queue reports');

# Dequeue all items
@items = $q->dequeue_nb(99);
is_deeply(\@items, [5, 'foo', 6, 7], 'Queue items');
});

# (2) Read queue limit from thread
my $item = $rpt->dequeue();
is($item, $q->limit, 'Queue limit set');
# Send items
$q->enqueue(1, 2, 3, 4, 5);

# (5) Read queue count
$item = $rpt->dequeue;
# q = (2, 3, 4, 5); r = ()
is($item, $q->pending(), 'Queue count');
# Report back the queue count
$rpt->enqueue($q->pending);
# q = (2, 3, 4, 5); r = (4)

# Read an item from queue
$item = $q->dequeue();
is($item, 2, 'Dequeued item 2');
# q = (3, 4, 5); r = (4)
# Report back the queue count
$rpt->enqueue($q->pending);
# q = (3, 4, 5); r = (4, 3)

# 'insert' doesn't care about queue limit
$q->insert(3, 'foo');
$rpt->enqueue($q->pending);
# q = (3, 4, 5, 'foo'); r = (4, 3, 4)

# Read an item from queue
$item = $q->dequeue();
is($item, 3, 'Dequeued item 3');
# q = (3, 4, 5); r = (4)
# Report back the queue count
$rpt->enqueue($q->pending);
# q = (4, 5, 'foo'); r = (4, 3, 4, 3)

# Read an item from queue
$item = $q->dequeue();
is($item, 4, 'Dequeued item 4');
# Thread is now unblocked

# Handshake with thread
$rpt->enqueue('go');

# (7) - Done
$th->join;

exit(0);

# EOF
1 change: 1 addition & 0 deletions t/99_pod.t
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ CPAN
readonly
baz
dequeuing
enqueuing
pre
__END__

0 comments on commit 9570dc5

Please sign in to comment.