From 512430842333c173ec315f0aace5193f01255a4f Mon Sep 17 00:00:00 2001 From: Ed J Date: Fri, 25 Oct 2024 23:45:53 +0100 Subject: [PATCH] incorporate PDL::Parallel::threads --- .github/workflows/ci.yml | 5 + Basic/Makefile.PL | 2 + Basic/SIMD.pm | 536 +++++++++++++++ .../PDL-Parallel-threads/barrier-sync.pl | 74 ++ .../PDL-Parallel-threads/perl-barrier.pl | 59 ++ .../simple-parallelize.pl | 7 + .../test-memory-consumption.pl | 31 + Basic/t/ppt-01_ref_counting.t | 38 ++ Basic/t/ppt-02_non_threaded.t | 71 ++ Basic/t/ppt-03_name_munging.t | 26 + Basic/t/ppt-10_physical_piddles.t | 140 ++++ Basic/t/ppt-11_memory_mapped.t | 45 ++ Basic/t/ppt-20_simd.t | 90 +++ Basic/t/ppt-30_sharing_from_threads.t | 90 +++ Basic/threads.pm | 633 ++++++++++++++++++ Changes | 1 + MANIFEST | 13 + Makefile.PL | 4 +- 18 files changed, 1863 insertions(+), 2 deletions(-) create mode 100644 Basic/SIMD.pm create mode 100644 Basic/examples/PDL-Parallel-threads/barrier-sync.pl create mode 100644 Basic/examples/PDL-Parallel-threads/perl-barrier.pl create mode 100644 Basic/examples/PDL-Parallel-threads/simple-parallelize.pl create mode 100644 Basic/examples/PDL-Parallel-threads/test-memory-consumption.pl create mode 100644 Basic/t/ppt-01_ref_counting.t create mode 100644 Basic/t/ppt-02_non_threaded.t create mode 100644 Basic/t/ppt-03_name_munging.t create mode 100644 Basic/t/ppt-10_physical_piddles.t create mode 100644 Basic/t/ppt-11_memory_mapped.t create mode 100644 Basic/t/ppt-20_simd.t create mode 100644 Basic/t/ppt-30_sharing_from_threads.t create mode 100644 Basic/threads.pm diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b47a46c9f..e735df70e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,6 +24,7 @@ jobs: matrix: os: [ubuntu-latest] perl-version: ['5.10'] + perl-threaded: [false] include: - perl-version: '5.30' os: ubuntu-latest @@ -32,6 +33,9 @@ jobs: - perl-version: '5.30' os: ubuntu-latest disttest: true + - perl-version: '5.10' + os: 
ubuntu-latest + perl-threaded: true - perl-version: '5.30' os: ubuntu-latest eumm-blead: 1 @@ -55,6 +59,7 @@ jobs: with: target-setup-perl: true perl-version: ${{ matrix.perl-version }} + perl-threaded: ${{ matrix.perl-threaded }} # conditional config - name: Maybe gfortran diff --git a/Basic/Makefile.PL b/Basic/Makefile.PL index 87a14a200..f7cede635 100644 --- a/Basic/Makefile.PL +++ b/Basic/Makefile.PL @@ -10,6 +10,8 @@ my %pm = map { my $h = '$(INST_LIBDIR)/'; ( $_, $h . $_ ); } ( @pm_names, 'default.perldlrc' ); $pm{'PDLdb.pl'} = '$(INST_LIB)/PDLdb.pl'; +$pm{'threads.pm'} = '$(INST_LIB)/PDL/Parallel/threads.pm'; +$pm{'SIMD.pm'} = '$(INST_LIB)/PDL/Parallel/threads/SIMD.pm'; my %man3pods = map { my $h = '$(INST_MAN3DIR)/'; $h .= 'PDL::' if $_ !~ /PDL.pm$/; diff --git a/Basic/SIMD.pm b/Basic/SIMD.pm new file mode 100644 index 000000000..80ddc8087 --- /dev/null +++ b/Basic/SIMD.pm @@ -0,0 +1,536 @@ +package PDL::Parallel::threads::SIMD; + +use strict; +use warnings; +use Carp; + +our $VERSION = '0.02'; + +require Exporter; +our @ISA = qw(Exporter); + + +our @EXPORT_OK = qw(parallelize parallel_sync parallel_id); + +use threads qw(yield); +use threads::shared qw(cond_wait); + +our $N_threads = -1; + +################ +# barrier_sync # +################ + +our $barrier_count :shared = 0; +our $barrier_state :shared = 'ready'; + +sub parallel_sync () { + if ($N_threads < 1) { + carp("Cannot call parallel_sync outside of a parallelized block"); + return; + } + + yield until $barrier_state eq 'ready' or $barrier_state eq 'up'; + + lock($barrier_count); + $barrier_state = 'up'; + $barrier_count++; + + if ($barrier_count == $N_threads) { + $barrier_count--; + $barrier_state = 'down'; + cond_broadcast($barrier_count); + yield; + } + else { + cond_wait($barrier_count) while $barrier_state eq 'up'; + $barrier_count--; + $barrier_state = 'ready' if $barrier_count == 0 + } +} + +sub parallel_id () { + goto \&get_tid; +} + +sub get_tid { + carp("Cannot get parallel_id outside 
of a parallelized block"); +} + +################## +# SIMD launching # +################## + +sub run_it { + my ($tid, $subref) = @_; + no warnings 'redefine'; + local *get_tid = sub { $tid }; + $subref->(); + parallel_sync; +} + +# Takes a block, the number of threads, and any thread arguments +sub parallelize (&$) { + my ($sub, $requested_threads,) = @_; + # For now, prevent nested launches + croak("Cannot nest parallelized blocks (yet)") + unless $N_threads == -1; + croak("Must request a positive number of parallelized threads") + unless $requested_threads =~ /^\d/ and $requested_threads > 0; + # This does not need to be localized once nested launches gets + # figured out. But for now, localize it so it gets restored to -1 at + # the end of the function call. + local $N_threads = $requested_threads; +# local $barrier_count = 0; +# local $barrier_state = 'ready'; + + # Launch N-1 threads... + my @to_join = map { + threads->create(\&run_it, $_, $sub) + } (1..$N_threads-1); + # ... and execute the last thread in this, the main thread + run_it(0, $sub); + + # Reap the threads + for my $thr (@to_join) { + $thr->join; + } +} + +1; + +__END__ + +=head1 NAME + +PDL::Parallel::threads::SIMD - launch and synchronize +Single-Instruction-Multiple-Dataset code + +=head1 VERSION + +This documentation describes version 0.02 of PDL::Parallel::threads::SIMD. 
+ +=head1 SYNOPSIS + + use PDL::Parallel::threads::SIMD qw(parallelize parallel_sync parallel_id); + + # Launch five threads that all print a statement + parallelize { + my $pid = parallel_id; + print "Hello from parallel thread $pid\n"; + } 5; + + my @positions :shared; + + # Run 47 time steps, performing the calculations + # for the time steps in parallel + my $size = 100; + my $N_threads = 10; + my $stride = $size / $N_threads; + parallelize { + my $pid = parallel_id; + + # Set this thread's portion of the positions to zero + my $start = $stride * $pid; + my $end = $start + $stride - 1; + @positions[$start..$end] = (0) x $stride; + + for (1..47) { + # First make sure all the threads are lined up + parallel_sync; + + # Now calculate the next positions + $positions[$_] += $velocities[$_] for $start .. $end; + } + } $N_threads; + +=head1 DESCRIPTION + +In my experience, parallel algorithms are nearly always expressed in a form +called single-instruction, multiple-dataset (SIMD). That is, the exact same +code runs in multiple threads, and the only difference between the threads +is the data they manipulate. This is certainly the case for MPI +and CUDA, two high-performance parallel computing frameworks. The goal of +this module is to provide a means for you to write single-machine SIMD code +in Perl. It won't be as performant as MPI, CUDA, or OpenCL, but with a little +work I hope it can give decent results. In the very least, I hope it can +serve as a good pedagogical tool for understanding parallel algorithms. + +SIMD code needs three facilities: a fast mechanism for +data sharing, a means to enforce barrier synchronization, and an indication +of which thread is which, typically in the form of a thread id. 
This module +provides a way to realize the second and third of these; data sharing is +already available thanks to general Perl data sharing (not fast, but it is +easy to share data) and L, which provides a simple +way to share PDL data across threads in a way that is quite fast. + +=for soon +I + +The main element that this module provides is the L function, +which allows for a simple and obvious specification for your SIMD code. From +within the block, you can obtain the parallelized thread id, which is a +block-specific number between 0 and one less the number of threads executing +in your parallelized block. You obtain the parallel thread id by calling +L. Also from within the block, you can enforce a +barrier synchronization point using L. + +For example, here's a complete working script that demonstrates the use of +L and L: + + use PDL::Parallel::threads::SIMD qw(parallelize parallel_id); + parallelize { + my $pid = parallel_id; + print "Hello from parallel thread $pid\n" + } 10; + +When I run this on my machine, I get this output: + + Hello from parallel thread 1 + Hello from parallel thread 2 + Hello from parallel thread 3 + Hello from parallel thread 4 + Hello from parallel thread 5 + Hello from parallel thread 6 + Hello from parallel thread 7 + Hello from parallel thread 8 + Hello from parallel thread 0 + Hello from parallel thread 9 + +Look closely at that output and you should notice that between thread 8 and +9 comes thread 0. In general, parallel threads have no guarantee of ordering +and for longer parallelized blocks the eventual order for such a printout is +often essentially random. + +As you can see, the block that you provide to L gets executed +ten times, but within each block the value returned by L is a +unique integer between 0 and 9. Perl assigns a unique id to every thread +that it runs, so my use of the phrase I here is a +deliberate way to distinguish this id from Perl's thread id. 
Perl's thread +id will incrementally increase throughout the life of your program, +increasing with each thread that you spawn, but the +parallel thread id will always begin counting from zero for a given +parallelized block. + +Why would you want each thread to have a different number that is distinct +from its Perl-assigned thread id? The reason is that having such unique, +sequential, and normalized numbers makes it very easy for you to divide the +work between the threads in a simple and predictable way. For example, in +the code shown below, the bounds for the slice are calculated in a +thread-specific fashion based on the parallel thread id. + + use PDL; + use PDL::Parallel::threads qw(retrieve_pdls); + use PDL::Parallel::threads::SIMD qw(parallelize parallel_id); + use PDL::NiceSlice; + + # Load the data with 7 million elements into $to_sum... + # ... + # Share it. + $to_sum->share_as('to-sum'); + + # Also allocate some shared, temporary memory: + my $N_threads = 10; + zeroes($N_threads)->share_as('workspace'); + + my $stride = $to_sum->nelem / $N_threads; + + # Parallelize the sum: + parallelize { + my $pid = parallel_id; + my ($to_sum, $temporary) + = retrieve_pdls('to-sum', 'workspace'); + + # Calculate the thread-specific slice bounds + my $start = $stride * $pid; + my $end = $start + $stride - 1; + $end = $to_sum->nelem - 1 if $end >= $to_sum->nelem; + + # Perform this thread's sum + $temporary($pid) + .= $to_sum($start:$end)->sum; + } $N_threads; + + # This code will not run until that launch has returned + # for all threads, so at this point we can assume the + # workspace memory has been filled. + my $final_sum = retrieve_pdls('workspace')->sum; + + print "The sum is $final_sum\n"; + +As mentioned in the last comment in that example code, the last +L</parallelize> block will always finish executing before the next line of +Perl code in your script. In other words, all the threads perform a +barrier synchronization just before returning control to your code. 
Any why +would anybody want to force code to wait at a barrier, you ask? + +Nontrivial multi-threaded code must be able to set locations in the code +that all threads must reach before any threads go forward. This is called +barrier synchronization and is important when your threaded code has +multiple stages. A particularly important example of an algorithm that needs +the ability to set barrier synchronization points is a time-stepping +simulation. In that case, you need to make sure that all of your threads +have a chance to reach the "end" of the time step before moving to the next +time step, since ostensibly the results of one thread's time step depend on +the previous results of other threads. If the calculations for each thread +on one step depend on all (or at least some of) the threads having completed +a previous set of calculations, you should use a barrier synchronization +event by calling L. + +In light of the synchronization that occurs just before returning control to +your code, you can conceptualize the timeline of your code's execution as +follows: + + PDL::Parallel::threads::SIMD Execution + ====================================== + + main thread + | + | + | + V + th0 th1 th2 th3 th4 th5 ... th(N-1) + | | | | | | | + | | | | | | | + V V V V V V V + main thread + | + | + | + V + +This is in contrast to, say, CUDA, in which a thread-launch returns +immediately, thus allowing you to perform calculations on your CPU while you +wait for the GPU to finish: + + CUDA Execution + ============== + + main thread + | + | + | + | --> thread launch + | th0 th1 th2 th3 th4 th5 ... th(N-1) + | | | | | | | | + | | | | | | | | + | V V V V V V V + | + | + | + | + V + +It also contrasts with MPI, in which there is no I
to speak of: +all execution occurs in one thread or another, and any central coordination +needs to be specifically orchestrated through a chosen thread. + +It is important to note that if any thread makes a call to L, +I threads must make a call to L. Otherwise, the thread +will hang until, possibly, the next call for barrier synchronization, and +that could lead to I confusing apparent errors in logic. For example: + + ... + parallelize { + my $pid = parallel_id; + + # do some calculations + + # Do some thread-specific work + if ($pid < 5) { + # Notice the *two* barriers set up here: + parallel_sync; + # Do something that requires synchronization + parallel_sync; + } + else { + # THIS PART IS NECESSARY TO PREVENT PROBLEMS + # Call parallel_sync the same number of times + # in this else block as in the if block + parallel_sync; + parallel_sync; + } + } 10; + +As a general rule, avoid putting L in conditional blocks +like C statements. C loops are another possible problem if the +condition within the while loop (more specifically, the number of iterations +through the C loop) depends on thread-specific aspects of the data. +You can do it, of course, but you have to be very careful that I +threads make the same number of calls at the same algorithmically-intended +point in the execution. + +=head1 FUNCTIONS + +This module provides three functions: one for lanching a block of code in +parallel across multiple threads, and two that are meant to be called within +that block: a function for synchronizing the execution of the different +threads executing that block and a function to obtain the parallel block's +sequential id. + +=head2 parallelize + +=for ref + +Launches a block of code in parallel across a bunch of threads. + +=for usage + + parallelize BLOCK N_THREADS + +This function requires two arguments: the block to execute and the number of +threads to launch to execute this block, and returns nothing. 
This is the +means by which you specify the code that you want run in parallel. + +=head2 parallel_sync + +=for ref + +Synchronizes all threads at the given barrier. + +Usage: + +=for usage + + parallelize { + # ... + + parallel_sync; + + # ... + + } $N_threads; + +This function enforces barrier synchronization among all the threads in your +parallelized block. It takes no arguments and does not return anything. + +The barrier synchronization is tightly coupled with the L +function: you can only call C from the middle of a +L block. If you call C from outside a +L block, you will get an error. + +I need to include an example and exposition on when and why to synchronize... + +=head2 parallel_id + +=for ref + +Gives the thread's I. + +Usage: + +=for usage + + parallelize { + # ... + + my $pid = parallel_id; + + # ... + + } $N_threads; + +From within the L block, you obtain the current thread's +parallel id with this simple function. When called outside the scope of a +L block, the function simply croaks. + +=head1 DIAGNOSTICS + +This module does not croak. It does, however, issue a handful of warnings. + +=over + +=item C<< Cannot call parallel_sync outside of a parallelized block >> + +You tried to issue a barrier synchronization (L) outside the +context of a L block, but that's the only context where it +makes sense. + +=item C<< Cannot get parallel_id outside of a parallelized block >> + +You will get this warning when you ask for a parallel id from code that is +not executing in a parallel block. The resulting return value will be the +undefined value. + +=item C<< Cannot nest parallelized blocks (yet) >> + +This exception gets thrown when you have a L block within +another L block. That's not presently allowed, though I'm open +to any ideas for implementing it if you have any. 
:-) + +=item C<< Must request a positive number of parallelized threads >> + +If you send something that's not a positive integer as the number of threads +to launch on your parallelized block, you will get this error. Always +specify a positive integer number. + +=back + +=head1 LIMITATIONS + +I am actually quite pleased with how this module has turned out, but there +are certainly some limitations. For example, you cannot launch a parallel +block from within another parallel block. You can still create and join +threads, you just cannot do it with the L function. + +I'm sure there are plenty of limitations, but it's hard for me to see what +differentiates a design goal from a limitation. Feedback on this would be +much appreciated. + +=head1 BUGS + +None known at this point. + +=head1 SEE ALSO + +The basic module for Perl parallel computing is L. + +Work on this module was originally inspired by work on +L, so you might want to check that out. + +Modules related to scientific parallel computing include +L, L, L, +L and L. + +Other modules provide alternative parallel computing frameworks. These may +be less suitable for scientific computing, but will likely serve other +purposes: L, Gearman, L, L, +L. + +=head1 AUTHOR, COPYRIGHT, LICENSE + +This module was written by David Mertens. The documentation is copyright (C) +David Mertens, 2012. The source code is copyright (C) Northwestern University, +2012. All rights reserved. + +This module is free software; you can redistribute it and/or modify it under +the same terms as Perl itself. + +=head1 DISCLAIMER OF WARRANTY + +Parallel computing is hard to get right, and it can be exacerbated by errors +in the underlying software. Please do not use this software in anything that +is mission-critical unless you have tested and verified it yourself. I cannot +guarantee that it will perform perfectly under all loads. 
I hope this is +useful and I wish you well in your usage thereof, but BECAUSE THIS SOFTWARE +IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE SOFTWARE, TO THE +EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING +THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE SOFTWARE "AS IS" +WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT +NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +PARTICULAR PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE +SOFTWARE IS WITH YOU. SHOULD THE SOFTWARE PROVE DEFECTIVE, YOU ASSUME THE +COST OF ALL NECESSARY SERVICING, REPAIR, OR CORRECTION. + +IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL +ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE +THE SOFTWARE AS PERMITTED BY THE ABOVE LICENCE, BE LIABLE TO YOU FOR DAMAGES, +INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING +OUT OF THE USE OR INABILITY TO USE THE SOFTWARE (INCLUDING BUT NOT LIMITED +TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU +OR THIRD PARTIES OR A FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER +SOFTWARE), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. 
+ +=cut diff --git a/Basic/examples/PDL-Parallel-threads/barrier-sync.pl b/Basic/examples/PDL-Parallel-threads/barrier-sync.pl new file mode 100644 index 000000000..fc4a9ea67 --- /dev/null +++ b/Basic/examples/PDL-Parallel-threads/barrier-sync.pl @@ -0,0 +1,74 @@ +use strict; +use warnings; + +use PDL; +use PDL::NiceSlice; +use PDL::Parallel::threads qw(retrieve_pdls); +use PDL::Parallel::threads::SIMD qw(parallelize parallel_sync parallel_id); +my $pdl = zeroes(20); +$pdl->share_as('test'); +#undef($pdl); + +# Create and share a slice +my $slice = $pdl(10:15)->sever; +$slice->share_as('slice'); + +# Create and share a memory mapped ndarray +use PDL::IO::FastRaw; +my $mmap = mapfraw('foo.bin', {Creat => 1, Datatype => double, Dims => [50]}); +$mmap->share_as('mmap'); + +END { + unlink 'foo.bin'; + unlink 'foo.bin.hdr'; +} + +my $N_threads = 5; + +parallelize { + my $tid = parallel_id; + my $pdl = retrieve_pdls('test'); + + print "Thread id $tid says the ndarray is $pdl\n"; + parallel_sync; + + my $N_data_to_fix = $pdl->nelem / $N_threads; + my $idx = sequence($N_data_to_fix) * $N_threads + $tid; + $pdl($idx) .= $tid; + parallel_sync; + + print "After set, thread id $tid says the ndarray is $pdl\n"; + parallel_sync; + + $pdl->set($tid, 0); + parallel_sync; + + print "Thread id $tid says the ndarray is now $pdl\n"; +} $N_threads; + +print "mmap is $mmap\n"; +parallelize { + my $tid = parallel_id; + my $mmap = retrieve_pdls('mmap'); + + $mmap($tid) .= $tid; +} $N_threads; + +print "now mmap is $mmap\n"; + +parallelize { + my $tid = parallel_id; + my $pdl = retrieve_pdls('test'); + + print "Thread id is $tid\n"; + + my $N_data_to_fix = $pdl->nelem / $N_threads; + my $idx = sequence($N_data_to_fix - 1) * $N_threads + $tid; + use PDL::NiceSlice; + $pdl($idx) .= -$tid; + + my $slice = retrieve_pdls('slice'); + $slice($tid) .= -10 * $tid; +} $N_threads; + +print "Final ndarray value is $pdl\n"; diff --git a/Basic/examples/PDL-Parallel-threads/perl-barrier.pl 
b/Basic/examples/PDL-Parallel-threads/perl-barrier.pl new file mode 100644 index 000000000..15d7cfa19 --- /dev/null +++ b/Basic/examples/PDL-Parallel-threads/perl-barrier.pl @@ -0,0 +1,59 @@ +use strict; +use warnings; +use threads qw(yield); +use threads::shared qw(cond_wait); + +my $N_threads = 4; + +# Allocate the shared memory outside the threads +my @data :shared; + +# Needed for barrier_sync +my $barrier_count :shared = 0; +my $barrier_state :shared = 'ready'; + +# Launch the threads, then have this (parent) thread join the fray +threads->create(\&main) for (1..$N_threads-1); +main(); + +# Reap the remaining threads +for my $thr (threads->list) { + $thr->join; +} + +sub barrier_sync { + yield until $barrier_state eq 'ready' or $barrier_state eq 'up'; + + lock($barrier_count); + $barrier_state = 'up'; + $barrier_count++; + + if ($barrier_count == $N_threads) { + $barrier_count--; + $barrier_state = 'down'; + cond_broadcast($barrier_count); + yield; + } + else { + cond_wait($barrier_count) while $barrier_state eq 'up'; + $barrier_count--; + $barrier_state = 'ready' if $barrier_count == 0 + } +} + +# This is the code that actually does calculations +sub main { + my $tid = threads->self->tid; + + $data[$tid] = $tid; + barrier_sync; + + print "Thread id $tid says the array is ", join(', ', @data), "\n"; + barrier_sync; + + $data[$tid] = 0; + barrier_sync; + + print "Thread id $tid says the array is now ", join(', ', @data), "\n"; +} + diff --git a/Basic/examples/PDL-Parallel-threads/simple-parallelize.pl b/Basic/examples/PDL-Parallel-threads/simple-parallelize.pl new file mode 100644 index 000000000..e9f9cb3aa --- /dev/null +++ b/Basic/examples/PDL-Parallel-threads/simple-parallelize.pl @@ -0,0 +1,7 @@ +use strict; +use warnings; +use PDL::Parallel::threads::SIMD qw(parallelize parallel_id); + parallelize { + my $pid = parallel_id; + print "Hello from parallel thread $pid\n" + } 10; diff --git a/Basic/examples/PDL-Parallel-threads/test-memory-consumption.pl 
b/Basic/examples/PDL-Parallel-threads/test-memory-consumption.pl new file mode 100644 index 000000000..0438de094 --- /dev/null +++ b/Basic/examples/PDL-Parallel-threads/test-memory-consumption.pl @@ -0,0 +1,31 @@ +use strict; +use warnings; + +my $N_threads = $ARGV[0] || 2; + +use PDL; +use PDL::Parallel::threads qw(retrieve_pdls); +use PDL::Parallel::threads::SIMD qw(parallel_sync parallelize parallel_id); +zeroes(100_000_000)->share_as('test'); +use PDL::IO::FastRaw; +mapfraw('foo.dat', {Creat => 1, Dims => [$N_threads], Datatype => double}) + ->share_as('mapped'); + +print "Main thread is about to rest for 5 seconds\n"; +sleep 5; + +parallelize { + my $tid = parallel_id; + my ($pdl, $mapped) = retrieve_pdls('test', 'mapped'); + + print "Thread id $tid is about to sleep for 5 seconds\n"; + parallel_sync; + sleep 5; + parallel_sync; +} $N_threads; + + +END { + # Clean up the testing files + unlink $_ for qw(foo.dat foo.dat.hdr); +} diff --git a/Basic/t/ppt-01_ref_counting.t b/Basic/t/ppt-01_ref_counting.t new file mode 100644 index 000000000..d4c9dcc75 --- /dev/null +++ b/Basic/t/ppt-01_ref_counting.t @@ -0,0 +1,38 @@ +use strict; +use warnings; + +use Test::More; + +use PDL::LiteF; +use PDL::Parallel::threads qw(retrieve_pdls free_pdls); + +my $data = sequence(20); +is($data->datasv_refcount, 1, "Data's initial refcount for normal ndarray is 1"); + +my $copy = $data; +is($data->datasv_refcount, 1, "Shallow copy does not increase data's refcount"); + +$data->share_as('foo'); +is($data->datasv_refcount, 2, "Sharing data increases data's refcount"); + +my $shallow = retrieve_pdls('foo'); +is($data->datasv_refcount, 3, "Retrieving data increases data's refcount"); + +undef($shallow); +is($data->datasv_refcount, 2, "Undef'ing retrieved copy decreases data's refcount"); + +undef($copy); +is($data->datasv_refcount, 2, "Undef'ing one of two original copies does not decrease data's refcount"); + +undef($data); + +# At this point, there should be only one reference, but we 
can't actually +# know because we don't have a reference to an ndarray to check! Get a new +# shared copy: +$shallow = retrieve_pdls('foo'); +is($shallow->datasv_refcount, 2, "Getting rid of original does not destroy the data"); + +free_pdls('foo'); +is($shallow->datasv_refcount, 1, "Freeing memory only decrements refcount by one"); + +done_testing; diff --git a/Basic/t/ppt-02_non_threaded.t b/Basic/t/ppt-02_non_threaded.t new file mode 100644 index 000000000..a081023aa --- /dev/null +++ b/Basic/t/ppt-02_non_threaded.t @@ -0,0 +1,71 @@ +# Boilerplate +use strict; +use warnings; + +# Test declaration +use Test::More tests => 9; + +# Modules needed for actual testing +use PDL::LiteF; +use PDL::Parallel::threads qw(retrieve_pdls); + +######################## +# Direct comparison: 4 # +######################## + +# Create some memory with some irrational values. The goal here is to +# perform a strict comparison between floating point values that have +# something nontrivial across all its bits. +my $data = (sequence(10)+1)->sqrt->share_as('Test::Set1'); +my $to_compare = $data; +ok(all($to_compare == $data), 'A ndarray exactly equals itself') + or diag("Original is $data and comparison is $to_compare;\n" + . "original - comparison = " . ($data - $to_compare)); + +# Now retrieve the value from the "off-site" storage +$to_compare = retrieve_pdls('Test::Set1'); +is_deeply([$to_compare->dims], [$data->dims], 'Retrieved dims is correct') + or diag("Original dims are " . join(', ', $data->dims) + . " and retrieved dims are " . join', ', $to_compare->dims); + +ok($data->type == $to_compare->type, 'Retrieved type is correct') + or diag("Original type is " . $data->type + . " and retrieved type is " . $to_compare->type); + +ok(all($to_compare == $data), 'Retrieved value exactly equals original') + or diag("Original is $data and retrieved is $to_compare;\n" + . "original - retrieved = " . 
($data - $to_compare)); + +########################### +# Shared modifications: 2 # +########################### + +use PDL::NiceSlice; +# Modify the original, see if it is reflected in the retrieved copy +$data(3) .= -10; +ok(all($to_compare == $data), 'Modification to original is reflected in retrieved') + or diag("Original is $data and retrieved is $to_compare;\n" + . "original - retrieved = " . ($data - $to_compare)); + +$to_compare(8) .= -50; +ok(all($to_compare == $data), 'Modification to retrieved is reflected in original') + or diag("Original is $data and retrieved is $to_compare;\n" + . "original - retrieved = " . ($data - $to_compare)); + +############################### +# Undefine doesn't destroy: 3 # +############################### + +my $expected = pdl(1, -10, -50); # These need to line up with the +my $idx = pdl(0, 3, 8); # indices and values used/set above + +undef($to_compare); +ok(all($data($idx) == $expected), "Undeffing copy doesn't destroy data"); + +undef($data); +my $new = retrieve_pdls('Test::Set1'); +ok(all($new($idx) == $expected), "Can retrieve data even after undefing original"); + +PDL::Parallel::threads::free_pdls('Test::Set1'); +ok(all($new($idx) == $expected), "Reference counting works"); + diff --git a/Basic/t/ppt-03_name_munging.t b/Basic/t/ppt-03_name_munging.t new file mode 100644 index 000000000..659ffa811 --- /dev/null +++ b/Basic/t/ppt-03_name_munging.t @@ -0,0 +1,26 @@ +# Boilerplate +use strict; +use warnings; + +package My::Foo; +use PDL::LiteF; +use PDL::Parallel::threads qw(retrieve_pdls); +use Test::More; +use Test::Exception; + +sequence(20)->sqrt->share_as('test'); +my $short_name = retrieve_pdls('test'); +my $long_name; +lives_ok { $long_name = retrieve_pdls('My::Foo/test') } 'Retrieving fully ' + . 'resolved name does not croak (that is, they exist)'; +ok(all($short_name == $long_name), 'Regular names get auto-munged with the ' + . 
'current package name'); + +sequence(20)->share_as('??foo'); +lives_ok { retrieve_pdls('??foo') } 'Basic retrieval with funny name works'; +throws_ok { + retrieve_pdls('My::Foo/??foo') +} qr/retrieve_pdls could not find data associated with/ +, 'Names with weird characters are not auto-munged'; + +done_testing; diff --git a/Basic/t/ppt-10_physical_piddles.t b/Basic/t/ppt-10_physical_piddles.t new file mode 100644 index 000000000..ed19d8836 --- /dev/null +++ b/Basic/t/ppt-10_physical_piddles.t @@ -0,0 +1,140 @@ +use strict; +use warnings; + +BEGIN { + use Config; + if (! $Config{'useithreads'}) { + print("1..0 # Skip: Perl not compiled with 'useithreads'\n"); + exit(0); + } +} + +use threads; +use threads::shared; +use Test::More; +use Test::Exception; +use PDL::LiteF; +use PDL::Parallel::threads qw(retrieve_pdls); + +# This is a somewhat complicated test script. The goals are to test the +# following: +# 1) Can we share data for any data type? +# 2) Does each thread think it succeeded at setting the data? +# 3) Does the end result confirm that each thread changed the data? +# 4) Are we prevented from sharing slices? +# +# Here we allocate shared work space for each PDL data type. We then create +# a collection of threads and have each thread modify the contents of one +# part of the shared memory. +# +# While there, each thread does a number of things. It sets a value in the +# shared memory, it confirms that the now-set value is correct, and it +# builds the hash of expected values from such checks. That last part need +# not be done in the threads explicitly, but it makes it easier to write. :-) +# +# After all the threads return, we check that all the values agree with what +# we expect, which is fairly easy (though not entirely trivial) to construct +# by hand. I encorporate square-roots into the calculations to ensure good +# bit coverage of the tests, at least for the floating point numbers. 
+# +# The last step simply confirms that sharing slices croaks, a pretty easy +# pair of tests. + +# Allocate workspace with one extra slot (to verify zeroeth element troubles) +my $N_threads = 20; +my %workspaces = ( + c => sequence(byte, $N_threads, 2)->share_as('workspace_c'), + s => sequence(short, $N_threads, 2)->share_as('workspace_s'), + n => sequence(ushort, $N_threads, 2)->share_as('workspace_n'), + l => sequence(long, $N_threads, 2)->share_as('workspace_l'), + q => sequence(longlong, $N_threads, 2)->share_as('workspace_q'), + f => sequence(float, $N_threads, 2)->share_as('workspace_f'), + d => sequence($N_threads, 2)->share_as('workspace_d'), +); + +# Remove longlong if Perl doesn't like longlong types +eval { + pack('q*', 10); +} or do { + delete $workspaces{q}; +}; + +############################################### +# Spawn a bunch of threads that work together # +############################################### + +use PDL::NiceSlice; +my @success : shared; +my @expected : shared; +threads->create(sub { + my $tid = shift; + + my (%expected_hash, %success_hash, %bits_hash); + for my $type_letter (keys %workspaces) { + my $workspace = retrieve_pdls("workspace_$type_letter"); + + # Build this up one thread at a time + $expected_hash{$type_letter} = 1; + + # Have this thread touch one of the values, and have it double-check + # that the value is correctly set + my $tid_plus_1 = double($tid + 1); + my $five = double(5); + $workspace($tid) .= pdl($workspace->type, $tid_plus_1->sqrt + $five->sqrt); + my $to_test = zeros($workspace->type, 1); + $to_test(0) .= pdl($workspace->type, $tid_plus_1->sqrt + $five->sqrt); + $success_hash{$type_letter} + = ($workspace->at($tid,0) == $to_test->at(0)); + } + + # Make sure the results for each type have a space in shared memory + $expected[$tid] = shared_clone(\%expected_hash); + $success[$tid] = shared_clone(\%success_hash); + +}, $_) for 0..$N_threads-1; + +# Reap the threads +for my $thr (threads->list) { + $thr->join; 
+} + +######################## +# Now test the results # +######################## + +# Do all the threads think that they were successful at setting their value? +is_deeply(\@success, \@expected, 'All threads changed their local values'); +# Do the results of all but the zeroeth element agree with what we expect? + +for my $type_letter (keys %workspaces) { + my $workspace = $workspaces{$type_letter}; + my $type = $workspace->type; + # Allocate the expected results with the proper type + my $expected = zeroes($type, $N_threads, 2); + # Perform the arithmetic using double precision (on the right side of + # this asignment) before down-casting to the workspace's type + $expected .= (zeroes($N_threads, 2)->xvals + 1)->sqrt + pdl(5)->sqrt; + # Perform an exact comparison. The operations may have high bit coverage, + # but they should also be free from bit noise, I hope. + ok(all($workspace == $expected), "Sharing $type ndarrays works") + or diag("Got workspace of $workspace; expected $expected"); +} + +###################################################### +# Test croaking behavior for slices of various kinds # +###################################################### + +# Test what happens when we try to share a slice +my $slice = $workspaces{d}->(2:-3); +throws_ok { + $slice->share_as('slice'); +} qr/share_pdls: Could not share an ndarray under.*because the ndarray does not have any allocated memory/ +, 'Sharing a slice croaks'; + +my $rotation = $workspaces{d}->rotate(5); +throws_ok { + $rotation->share_as('rotation') +} qr/share_pdls: Could not share an ndarray under.*because the ndarray does not have any allocated memory/ +, 'Sharing a rotation (slice) croaks'; + +done_testing(); diff --git a/Basic/t/ppt-11_memory_mapped.t b/Basic/t/ppt-11_memory_mapped.t new file mode 100644 index 000000000..9c8eabddc --- /dev/null +++ b/Basic/t/ppt-11_memory_mapped.t @@ -0,0 +1,45 @@ +use strict; +use warnings; + +BEGIN { + use Config; + if (! 
$Config{'useithreads'}) { + print("1..0 # Skip: Perl not compiled with 'useithreads'\n"); + exit(0); + } +} + +use threads; +use PDL::LiteF; +use PDL::Parallel::threads qw(retrieve_pdls); +use PDL::IO::FastRaw; + +use Test::More; + +my $N_threads = 10; +mapfraw('foo.dat', {Creat => 1, Dims => [$N_threads], Datatype => double}) + ->share_as('workspace'); + +# Spawn a bunch of threads that do the work for us +use PDL::NiceSlice; +threads->create(sub { + my $tid = shift; + my $workspace = retrieve_pdls('workspace'); + $workspace($tid) .= sqrt($tid + 1); +}, $_) for 0..$N_threads-1; + +# Reap the threads +for my $thr (threads->list) { + $thr->join; +} + +my $expected = (sequence($N_threads) + 1)->sqrt; +my $workspace = retrieve_pdls('workspace'); +ok(all($expected == $workspace), 'Sharing memory mapped ndarrays works'); + +END { + # Clean up the testing files + unlink $_ for qw(foo.dat foo.dat.hdr); +} + +done_testing; diff --git a/Basic/t/ppt-20_simd.t b/Basic/t/ppt-20_simd.t new file mode 100644 index 000000000..08f8d65dd --- /dev/null +++ b/Basic/t/ppt-20_simd.t @@ -0,0 +1,90 @@ +use strict; +use warnings; + +BEGIN { + use Config; + if (! 
$Config{'useithreads'}) { + print("1..0 # Skip: Perl not compiled with 'useithreads'\n"); + exit(0); + } +} + +use Test::More; +use Test::Warn; + +#use PDL; +use PDL::Parallel::threads::SIMD qw(parallelize parallel_sync parallel_id); + +my $N_threads = 20; +use threads; + +# Test basic croaking behavior for function calls that should not work +warning_is { + my $pid = parallel_id; +} 'Cannot get parallel_id outside of a parallelized block' + , 'parallel_id not allowed outside of parallelize block'; + +warning_is { + parallel_sync; +} 'Cannot call parallel_sync outside of a parallelized block' + , 'parallel_sync not allowed outside of parallelize block'; + +# Create $N_threads threads that all run the parallelized block below +my @after_first_block : shared; +my @after_second_block : shared; +my @pids : shared; +my @recursive_simd_allowed : shared; + +my @workspace : shared; + +parallelize { + # Get the pid and log the presence + my $pid = parallel_id; + $pids[$pid] = 1; + + $workspace[$pid] = $pid + 1; + + # First barrier sync: make sure everybody has updated workspace + parallel_sync; + + # Make sure that the previous pid set the correct value before we reached + # this point. + my $pid_to_check = $pid - 1; + $pid_to_check = $N_threads - 1 if $pid_to_check < 0; + $after_first_block[$pid] = 1; + $after_first_block[$pid] = 0 + if $workspace[$pid_to_check] != $pid_to_check + 1; + + # Update the workspace value + $workspace[$pid_to_check] = -$pid; + + # Second barrier sync: make sure we could perform the first check and + # the assignment + parallel_sync; + + # Make sure that the newly changed value, from the other thread, is + # correct here. 
+ $pid_to_check = $pid + 1; + $pid_to_check = 0 if $pid_to_check == $N_threads; + $after_second_block[$pid] = 1; + $after_second_block[$pid] = 0 if $workspace[$pid] != -$pid_to_check; + + # Check recursive parallelized block + eval { + parallelize { + my $a = 1; + } 5; + $recursive_simd_allowed[$pid] = 1; + } or do { + $recursive_simd_allowed[$pid] = 0; + }; + +} $N_threads; + +my @expected = (1) x $N_threads; +is_deeply(\@after_first_block, \@expected, 'First barrier synchronization works'); +is_deeply(\@after_second_block, \@expected, 'Second barrier synchronization works'); +@expected = (0) x $N_threads; +is_deeply(\@recursive_simd_allowed, \@expected, 'Recursive paralleliztion not (yet) allowed'); + +done_testing; diff --git a/Basic/t/ppt-30_sharing_from_threads.t b/Basic/t/ppt-30_sharing_from_threads.t new file mode 100644 index 000000000..15a1c97e2 --- /dev/null +++ b/Basic/t/ppt-30_sharing_from_threads.t @@ -0,0 +1,90 @@ +use strict; +use warnings; + +BEGIN { + use Config; + if (! 
$Config{'useithreads'}) { + print("1..0 # Skip: Perl not compiled with 'useithreads'\n"); + exit(0); + } +} + +# Tests if the threads can create data and share amongst themselves + +use Test::More; +use Test::Exception; + +use PDL::LiteF; +use PDL::Parallel::threads qw(retrieve_pdls); +use PDL::Parallel::threads::SIMD qw(parallelize parallel_id parallel_sync); + +# Run the parallel block in which the threads create and share each other's +# data +my $N_threads = 5; +my @data_is_correct : shared; +my @could_get_data : shared; +my @bad_is_correct : shared; +parallelize { + my $pid = parallel_id; + + # Create data that is unique to this thread + my $pdl = ones(10) * $pid; + $pdl->share_as("data$pid"); + my $bad = ones(cfloat, 5); + $bad->badvalue(17); + $bad->setbadat(2); + $bad->share_as("bad$pid"); + + # We will get the data from the *previous* thread (modulo the number of + # threads, of course: circular boundary conditions) + my $thread_to_grab = $pid - 1; + $thread_to_grab = $N_threads - 1 if $pid == 0; + + # Synchronize; make sure all the threads have had a chance to create + # their data + parallel_sync; + + # This should be in an eval block in case the data pull fails + eval { + # Pull in the data: + my $to_test = retrieve_pdls("data$thread_to_grab"); + $could_get_data[$pid] = 1; + + # Make sure it's what we expected + $data_is_correct[$pid] = all($to_test == $thread_to_grab)->sclr + or diag("For thread $pid, expected ${thread_to_grab}s but got $to_test"); + + $to_test = retrieve_pdls("bad$thread_to_grab"); + my $isbad = $to_test->isbad; + $bad_is_correct[$pid] = all($isbad == pdl(0,0,1,0,0))->sclr || diag "got=$isbad\nto_test=$to_test"; + + 1; + } or do { + diag("data pull for pid $pid failed: $@"); + $could_get_data[$pid] = 0; + $data_is_correct[$pid] = 0; + $bad_is_correct[$pid] = 0; + }; + +} $N_threads; + +my @expected = (1) x $N_threads; +is_deeply(\@could_get_data, \@expected, + 'Threads could access data created by sibling threads') + or diag("expected 
all 1s, actually got @could_get_data"); +is_deeply(\@data_is_correct, \@expected, + 'Data created by sibling threads worked correctly') + or diag("expected all 1s, actually got @data_is_correct"); +is_deeply(\@bad_is_correct, \@expected, + 'Data created by sibling threads badflags survived correctly') + or diag("expected all 1s, actually got @data_is_correct"); + +# Make sure the retrieval causes a croak +for (1..$N_threads-1) { + throws_ok { + retrieve_pdls("data$_") + } qr/was created in a thread that has ended or is detached/ + , "Retrieving shared data created by already-terminated thread $_ croaks"; +} + +done_testing(); diff --git a/Basic/threads.pm b/Basic/threads.pm new file mode 100644 index 000000000..1ac2ec82f --- /dev/null +++ b/Basic/threads.pm @@ -0,0 +1,633 @@ +package PDL::Parallel::threads; + +use strict; +use warnings; +use Carp; +use PDL::LiteF; +use PDL::Exporter; +our $VERSION = '0.07'; + +our @ISA = ( 'PDL::Exporter' ); +our @EXPORT_OK = qw(share_pdls retrieve_pdls free_pdls); +our %EXPORT_TAGS = (Func=>\@EXPORT_OK); + +my $can_use_threads; +BEGIN { + $can_use_threads = eval { + require threads; + threads->import(); + require threads::shared; + threads::shared->import(); + 1; + }; +} + +# These are the means by which we share data across Perl threads. Note that +# we cannot share ndarrays directly across threads, but we can share arrays +# of scalars, scalars whose integer values are the pointers to ndarray data, +# etc. +my %datasv_pointers :shared; +my %dataref_svs; +my %dim_arrays :shared; +my %types :shared; +my %badflag :shared; +my %badvalue :shared; +my %nbytes :shared; +my %originating_tid :shared; + +# PDL data should not be naively copied by Perl. 
Tell PDL we know about this +$PDL::no_clone_skip_warning = 1; + +sub auto_package_name { + my $name = shift; + my ($package_name) = caller(1); + $name = "$package_name/$name" if $name =~ /^\w+$/; + return $name; +} + +sub share_pdls { + croak("share_pdls: expected key/value pairs") + unless @_ % 2 == 0; + my %to_store = @_; + + while (my ($name, $to_store) = each %to_store) { + $name = auto_package_name($name); + + # Make sure we're not overwriting already shared data + if (exists $datasv_pointers{$name}) { + croak("share_pdls: you already have data associated with '$name'"); + } + + if ( eval{$to_store->isa("PDL")} ) { + # Integers (which can be cast to and from + # pointers) are easily shared using threads::shared + # in a shared hash. This method provides a + # way to obtain the pointer to the datasv for + # the incoming ndarray, and it increments the + # SV's refcount. + $dataref_svs{$name} = eval{ + croak("the ndarray does not have any allocated memory\n") + if !$to_store->allocated; + $to_store->get_dataref; + }; + if ($@) { + my $error = $@; + chomp $error; + delete $datasv_pointers{$name}; + croak('share_pdls: Could not share an ndarray under ' + . "name '$name' because $error"); + } + $datasv_pointers{$name} = 0+$dataref_svs{$name}; + $to_store->set_donttouchdata($nbytes{$name} = $to_store->nbytes); # protect its memory + if ($can_use_threads) { + $dim_arrays{$name} = shared_clone([$to_store->dims]); + $originating_tid{$name} = threads->tid; + } + else { + $dim_arrays{$name} = [$to_store->dims]; + } + $types{$name} = $to_store->get_datatype; + $badflag{$name} = $to_store->badflag; + my $badval = $to_store->badvalue->sclr; + $badval = shared_clone([$badval->Re,$badval->Im]) if ref $badval ; + $badvalue{$name} = $badval; + } + else { + croak("share_pdls passed data under '$name' that it doesn't " + . "know how to store"); + } + } +} + + + +# Frees the memory associated with the given names. 
+sub free_pdls { + # Keep track of each name that is successfully freed + my @removed; + + for my $short_name (@_) { + my $name = auto_package_name($short_name); + + # If it's a regular ndarray, decrement the memory's refcount + if (exists $datasv_pointers{$name}) { + delete $dataref_svs{$name}; + delete $datasv_pointers{$name}; + delete $dim_arrays{$name}; + delete $types{$name}; + delete $badflag{$name}; + delete $badvalue{$name}; + delete $nbytes{$name}; + delete $originating_tid{$name}; + push @removed, $name; + } + # If its none of the above, indicate that we didn't free anything + else { + push @removed, ''; + } + } + + return @removed; +} + +# PDL method to share an individual ndarray +sub PDL::share_as { + my ($self, $name) = @_; + share_pdls(auto_package_name($name) => $self); + return $self; +} + +# Method to get an ndarray that points to the shared data associated with the +# given name(s). +sub retrieve_pdls { + return if @_ == 0; + + my @to_return; + for my $short_name (@_) { + my $name = auto_package_name($short_name); + + if (exists $datasv_pointers{$name}) { + # Make sure that the originating thread still exists, or the + # data will be gone. + if ($can_use_threads and $originating_tid{$name} > 0 + and not defined (threads->object($originating_tid{$name})) + ) { + croak("retrieve_pdls: '$name' was created in a thread that " + . "has ended or is detached"); + } + + # Create the new thinly wrapped ndarray + my $new_ndarray = PDL->new_around_datasv($datasv_pointers{$name}); + $new_ndarray->set_datatype($types{$name}); + $new_ndarray->badflag($badflag{$name}); + $new_ndarray->badvalue(ref $badvalue{$name} + ? 
Math::Complex->make(@{$badvalue{$name}}) + : $badvalue{$name}); + $new_ndarray->setdims(\@{$dim_arrays{$name}}); + $new_ndarray->set_donttouchdata($nbytes{$name}); # protect its memory + push @to_return, $new_ndarray; + } + else { + croak("retrieve_pdls could not find data associated with '$name'"); + } + } + + # In list context, return all the ndarrays + return @to_return if wantarray; + + # Scalar context only makes sense if they asked for a single name + return $to_return[0] if @_ == 1; + + # We're here if they asked for multiple names but assigned the result + # to a single scalar, which is probably not what they meant: + carp("retrieve_pdls: requested many ndarrays... in scalar context?"); + return $to_return[0]; +} + +1; + +__END__ + +=head1 NAME + +PDL::Parallel::threads - sharing PDL data between Perl threads + +=head1 SYNOPSIS + + use PDL; + use PDL::Parallel::threads qw(retrieve_pdls share_pdls); + + # Technically, this is pulled in for you by PDL::Parallel::threads, + # but using it in your code pulls in the named functions like async. + use threads; + + # Also, technically, you can use PDL::Parallel::threads with + # single-threaded programs, and even with perl's not compiled + # with thread support. + + # Create some shared PDL data + zeroes(1_000_000)->share_as('My::shared::data'); + + # Create an ndarray and share its data + my $test_data = sequence(100); + share_pdls(some_name => $test_data); # allows multiple at a time + $test_data->share_as('some_name'); # or use the PDL method + + # Kick off some processing in the background + async { + my ($shallow_copy) + = retrieve_pdls('some_name'); + + # thread-local memory + my $other_ndarray = sequence(20); + + # Modify the shared data: + $shallow_copy++; + }; + + # ... do some other stuff ... 
+ + # Rejoin all threads + for my $thr (threads->list) { + $thr->join; + } + + use PDL::NiceSlice; + print "First ten elements of test_data are ", + $test_data(0:9), "\n"; + +=head1 DESCRIPTION + +This module provides a means to share PDL data between different Perl +threads. In contrast to PDL's posix thread support (see L), +this module lets you work with Perl's built-in threading model. In contrast +to Perl's L, this module focuses on sharing I, not +I. + +Because this module focuses on sharing data, not variables, it does not use +attributes to mark shared variables. Instead, you must explicitly share your +data by using the L function or L PDL method that this +module introduces. Those both associate a name with your data, which you use +in other threads to retrieve the data with the L. Once your +thread has access to the ndarray data, any modifications will operate directly +on the shared memory, which is exactly what shared data is supposed to do. +When you are completely done using a piece of data, you need to explicitly +remove the data from the shared pool with the L function. +Otherwise your data will continue to consume memory until the originating +thread terminates, or put differently, you will have a memory leak. + +This module lets you share two sorts of ndarray data. You can share data for +an ndarray that is based on actual I, such as the result of +L. You can also share data using I files. +(Note: PDL v2.4.11 and higher support memory mapped ndarrays on all major +platforms, including Windows.) There are other sorts of ndarrays whose data +you cannot share. You cannot directly share ndarrays that have not +been physicalised, though a simple L, +L, or L will give you an ndarray +based on physical memory that you can share. Also, certain functions +wrap external data into ndarrays so you can manipulate them with PDL methods. +For example, see L and +L. 
These you cannot share directly, but +making a physical copy with L will give you +something that you can safely share. + +=head2 Physical Memory + +The mechanism by which this module achieves data sharing of physical memory +is remarkably cheap. It's even cheaper then a simple affine transformation. +The sharing works by creating a new shell of an ndarray for each call to +L and setting that ndarray's memory structure to point back to +the same locations of the original (shared) ndarray. This means that you can +share ndarrays that are created with standard constructors like +L, L, and L, or which +are the result of operations and function evaluations for which there is no +data flow, such as L (but not L), arithmetic, +L, and L. When in doubt, C your +ndarray before sharing and everything should work. + +There is an important nuance to sharing physical memory: The memory will +always be freed when the originating thread terminates, even if it terminated +cleanly. This can lead to segmentation faults when one thread exits and +frees its memory before another thread has had a chance to finish +calculations on the shared data. It is best to use barrier synchronization +to avoid this (via L), or to share data solely +from your main thread. + +=head2 Memory Mapped Data + +As of 0.07, data sharing of memory-mapped ndarrays is exactly the +same as any other. It has not been tested with L-mapped +ndarrays. + +=head2 Package and Name Munging + +C lets you associate your data with a specific text +name. Put differently, it provides a global namespace for data. Users of the +C programming language will immediately notice that this means there is +plenty of room for developers using this module to choose the same name for +their data. Without some combination of discipline and help, it would be +easy for shared memory names to clash. One solution to this would be to +require users (i.e. 
you) to choose names that include their current package, +such as C or, following L, +C instead of just C. This is sometimes +called name mangling. Well, I decided that this is such a good idea that +C does the second form of name mangling for you +automatically! Of course, you can opt out, if you wish. + +The basic rules are that the package name is prepended to the name of the +shared memory as long as the name is only composed of word characters, i.e. +names matching C. Here's an example demonstrating how this works: + + package Some::Package; + use PDL; + use PDL::Parallel::threads 'retrieve_pdls'; + + # Stored under '??foo' + sequence(20)->share_as('??foo'); + + # Shared as 'Some::Package/foo' + zeroes(100)->share_as('foo'); + + sub do_something { + # Retrieve 'Some::Package/foo' + my $copy_of_foo = retrieve_pdls('foo'); + + # Retrieve '??foo': + my $copy_of_weird_foo = retrieve_pdls('??foo'); + + # ... + } + + # Move to a different package: + package Other::Package; + use PDL::Parallel::threads 'retrieve_pdls'; + + sub something_else { + # Retrieve 'Some::Package/foo' + my $copy_of_foo = retrieve_pdls('Some::Package/foo'); + + # Retrieve '??foo': + my $copy_of_weird_foo = retrieve_pdls('??foo'); + + # ... + } + +The upshot of all of this is that if you use some module that also uses +C, namespace clashes are highly unlikely to occur +as long as you (and the author of that other module) use simple names, +like the sort of thing that works for variable names. + +=head1 FUNCTIONS + +This module provides three stand-alone functions and adds one new PDL method. + +=head2 share_pdls + +=for ref + +Shares ndarray data across threads using the given names. + +=for usage + + share_pdls (name => ndarray, name => ndarray, ...) + +This function takes key/value pairs where the value is the ndarray to store, +and the key is the name under which to store +the ndarray. You can later retrieve the memory +with the L method. 
+ +Sharing an ndarray with physical memory (or that is memory-mapped) +increments the data's reference count; +you can decrement the reference count by calling L on the given +C. In general this ends up doing what you mean, and freeing memory +only when you are really done using it. + +=for example + + my $data1 = zeroes(20); + my $data2 = ones(30); + share_pdls(foo => $data1, bar => $data2); + +This can be combined with constructors and fat commas to allocate a +collection of shared memory that you may need to use for your algorithm: + + share_pdls( + main_data => zeroes(1000, 1000), + workspace => zeroes(1000), + reduction => zeroes(100), + ); + +=for bad + +C preserves the badflag and badvalue on ndarrays. + +=head2 share_as + +=for ref + +Method to share an ndarray's data across threads under the given name. + +=for usage + + $pdl->share_as(name) + +This PDL method lets you directly share an ndarray. It does the exact same +thing as L, but its invocation is a little different: + +=for example + + # Directly share some constructed memory + sequence(20)->share_as('baz'); + + # Share individual ndarrays: + my $data1 = zeroes(20); + my $data2 = ones(30); + $data1->share_as('foo'); + $data2->share_as('bar'); + +Like many other PDL methods, this method returns the just-shared ndarray. +This can lead to some amusing ways of storing partial calculations partway +through a long chain: + + my $results = $input->sumover->share_as('pre_offset') + $offset; + + # Now you can get the result of the sumover operation + # before that offset was added, by calling: + my $pre_offset = retrieve_pdls('pre_offset'); + +This function achieves the same end as L: There's More Than One +Way To Do It, because it can make for easier-to-read code. In general I +recommend using the C method when you only need to share a single +ndarray memory space. + +=for bad + +C preserves the badflag and badvalue on ndarrays. 
+ +=head2 retrieve_pdls + +=for ref + +Obtain ndarrays providing access to the data shared under the given names. + +=for usage + + my ($copy1, $copy2, ...) = retrieve_pdls (name, name, ...) + +This function takes a list of names and returns a list of ndarrays that +provide access to the data shared under those names. In scalar context the +function returns the ndarray corresponding with the first named data set, +which is usually what you mean when you use a single name. If you specify +multiple names but call it in scalar context, you will get a warning +indicating that you probably meant to say something differently. + +=for example + + my $local_copy = retrieve_pdls('foo'); + my @both_ndarrays = retrieve_pdls('foo', 'bar'); + my ($foo, $bar) = retrieve_pdls('foo', 'bar'); + +=for bad + +C preserves the badflag and badvalue on ndarrays. + +=head2 free_pdls + +=for ref + +Frees the shared memory (if any) associated with the named shared data. + +=for usage + + free_pdls(name, name, ...) + +This function marks the memory associated with the given names as no longer +being shared, handling all reference counting and other low-level stuff. +You generally won't need to worry about the return value. But if you care, +you get a list of values---one for each name---where a successful removal +gets the name and an unsuccessful removal gets an empty string. + +So, if you say C and both removals were +successful, you will get C<('name1', 'name2')> as the return values. If +there was trouble removing C (because there is no memory associated +with that name), you will get C<('', 'name2')> instead. 
This means you +can handle trouble with perl Cs and other conditionals: + + my @to_remove = qw(name1 name2 name3 name4); + my @results = free_pdls(@to_remove); + if (not grep {$_ eq 'name2'} @results) { + print "That's weird; did you remove name2 already?\n"; + } + if (not $results[2]) { + print "Couldn't remove name3 for some reason\n"; + } + +=for bad + +This function simply removes an ndarray's memory from the shared pool. It +does not interact with bad values in any way. But then again, it does not +interfere with or screw up bad values, either. + +=head1 DIAGNOSTICS + +=over + +=item C<< share_pdls: expected key/value pairs >> + +You called C with an odd number of arguments, which means that +you could not have supplied key/value pairs. Double-check that every ndarray +(or filename) that you supply is preceded by its shared name. + +=item C<< share_pdls: you already have data associated with '$name' >> + +You tried to share some data under C<$name>, but some data is already +associated with that name. Typo? You can avoid namespace clashes with other +modules by using simple names and letting C mangle +the name internally for you. + +=item C<< share_pdls: Could not share an ndarray under name '$name' because ... >> + +=over + +=item C<< ... the ndarray does not have any allocated memory. >> + +You tried to share an ndarray that does not have any memory associated with it. + +=item C<< ... the ndarray's data does not come from the datasv. >> + +You tried to share an ndarray that has a funny internal structure, in which +the data does not point to the buffer portion of the datasv. I'm not sure +how that could happen without triggering a more specific error, so I hope +you know what's going on if you get this. :-) + +=back + +=item C<< share_pdls passed data under '$name' that it doesn't know how to +store >> + +C only knows how to store raw data +ndarrays. 
It'll croak if you try to share other kinds of ndarrays, and it'll +throw this error if you try to share anything else, like a hashref. + +=item C<< retrieve_pdls: '$name' was created in a thread that has ended or +is detached >> + +In some other thread, you added some data to the shared pool. If that thread +ended without you freeing that data (or the thread has become a detached +thread), then we cannot know if the data is available. You should always +free your data from the data pool when you're done with it, to avoid this +error. + +=item C<< retrieve_pdls could not find data associated with '$name' >> + +Pretty simple: either data has never been added under this name, or data +under this name has been removed. + +=item C<< retrieve_pdls: requested many ndarrays... in scalar context? >> + +This is just a warning. You requested multiple ndarrays (sent multiple names) +but you called the function in scalar context. Why do such a thing? + +=back + +=head1 LIMITATIONS + +You cannot share memory mapped files that require +features of L. That is a cool module that lets you pack +multiple ndarrays into a single file, but simple cross-thread sharing is not +trivial and is not (yet) supported. + +If you are dealing +with a physical ndarray, you have to be a bit careful +about how the memory gets freed. If you don't call C on the data, +it will persist in memory until the end of the originating thread, which +means you have a classic memory leak. If another thread +creates a thread-local copy of the data before the originating thread ends, +but then tries to access the data after the originating thread ends, +this will be fine as the reference count of the C will have +been increased. + +=head1 BUGS + +None known at this point. + +=head1 SEE ALSO + +L, L, L, L, L, +L + +=head1 AUTHOR, COPYRIGHT, LICENSE + +This module was written by David Mertens. The documentation is copyright (C) +David Mertens, 2012. The source code is copyright (C) Northwestern University, +2012. 
All rights reserved. + +This module is distributed under the same terms as Perl itself. + +=head1 DISCLAIMER OF WARRANTY + +Parallel computing is hard to get right, and it can be exacerbated by errors +in the underlying software. Please do not use this software in anything that +is mission-critical unless you have tested and verified it yourself. I cannot +guarantee that it will perform perfectly under all loads. I hope this is +useful and I wish you well in your usage thereof, but BECAUSE THIS SOFTWARE +IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE SOFTWARE, TO THE +EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING +THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE SOFTWARE "AS IS" +WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT +NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +PARTICULAR PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE +SOFTWARE IS WITH YOU. SHOULD THE SOFTWARE PROVE DEFECTIVE, YOU ASSUME THE +COST OF ALL NECESSARY SERVICING, REPAIR, OR CORRECTION. + +IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL +ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE +THE SOFTWARE AS PERMITTED BY THE ABOVE LICENCE, BE LIABLE TO YOU FOR DAMAGES, +INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING +OUT OF THE USE OR INABILITY TO USE THE SOFTWARE (INCLUDING BUT NOT LIMITED +TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU +OR THIRD PARTIES OR A FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER +SOFTWARE), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. 
+ +=cut diff --git a/Changes b/Changes index fc27e9561..9555adf17 100644 --- a/Changes +++ b/Changes @@ -19,6 +19,7 @@ - replace CallCopy mechanism with initialize called also as instance method - drop WITH_HDF, WITH_GD, WITH_DEVEL_REPL config; just try building - removed PDL::Config mechanism, leave vestigial file for back-compat and use env vars for libs/incs +- incorporate PDL::Parallel::threads 2.093 2024-09-29 - PDL.set_datatype now doesn't physicalise input, PDL.convert_type does diff --git a/MANIFEST b/MANIFEST index 41bdabf88..e6489a752 100644 --- a/MANIFEST +++ b/MANIFEST @@ -58,6 +58,10 @@ Basic/examples/InlinePdlpp/inlppminimal.pl Basic/examples/InlinePdlpp/Module/Makefile.PL Basic/examples/InlinePdlpp/Module/MyInlineMod.pm Basic/examples/InlinePdlpp/Module/t/myinlinemod.t +Basic/examples/PDL-Parallel-threads/barrier-sync.pl +Basic/examples/PDL-Parallel-threads/perl-barrier.pl +Basic/examples/PDL-Parallel-threads/simple-parallelize.pl +Basic/examples/PDL-Parallel-threads/test-memory-consumption.pl Basic/Gen/Inline/Makefile.PL Basic/Gen/Inline/MakePdlppInstallable.pm Basic/Gen/Inline/Pdlpp.pm @@ -203,6 +207,7 @@ Basic/Primitive/Makefile.PL Basic/Primitive/primitive.pd Basic/Primitive/xoshiro256plus.c Basic/Reduce.pm +Basic/SIMD.pm Basic/Slices/Makefile.PL Basic/Slices/slices.pd Basic/SourceFilter/Changes @@ -242,6 +247,13 @@ Basic/t/pdl_from_string.t Basic/t/pdlchar.t Basic/t/pp_croaking.t Basic/t/pp_line_numbers.t +Basic/t/ppt-01_ref_counting.t +Basic/t/ppt-02_non_threaded.t +Basic/t/ppt-03_name_munging.t +Basic/t/ppt-10_physical_piddles.t +Basic/t/ppt-11_memory_mapped.t +Basic/t/ppt-20_simd.t +Basic/t/ppt-30_sharing_from_threads.t Basic/t/primitive-append.t Basic/t/primitive-clip.t Basic/t/primitive-interpolate.t @@ -261,6 +273,7 @@ Basic/t/subclass.t Basic/t/thread.t Basic/t/thread_def.t Basic/t/ufunc.t +Basic/threads.pm Basic/Ufunc/Makefile.PL Basic/Ufunc/ufunc.pd Basic/utils/address-pseudonymise diff --git a/Makefile.PL b/Makefile.PL index 
9550473a3..e168db199 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -154,7 +154,7 @@ $text .= "\n" . ::coretarget($self); my $coretest = join ' ', map File::Spec->catfile('t', $_.'.t'), qw( 01-pptest autoload bad basic bool clump constructor core croak lvalue math matrix matrixops nat_complex ops-bitwise ops pdl_from_string - pdlchar pp_croaking pp_line_numbers primitive-* pthread reduce + pdlchar pp_croaking pp_line_numbers primitive-* ppt-* pthread reduce slice subclass thread thread_def ufunc ); $text .= <