diff --git a/src/patches/clawpatch/fclaw2d_clawpatch_output_vtk.c b/src/patches/clawpatch/fclaw2d_clawpatch_output_vtk.c index 5d07be6f0..9ef5e74dc 100644 --- a/src/patches/clawpatch/fclaw2d_clawpatch_output_vtk.c +++ b/src/patches/clawpatch/fclaw2d_clawpatch_output_vtk.c @@ -31,6 +31,13 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define PATCH_DIM 2 #endif +/** Round up quotient x / y. */ +#define FCLAW_VTK_CEIL(x, y) (((x) + (y) - 1) / (y)) /**< round up the quotient + x / y; this is a local + macro used for + computations regarding + the VTK patch buffering */ + #if REFINE_DIM == 2 && PATCH_DIM == 2 #include @@ -59,6 +66,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include #include +#include typedef struct fclaw2d_vtk_state { @@ -91,6 +99,23 @@ typedef struct fclaw2d_vtk_state MPI_Offset mpibegin; #endif char *buf; + + /* The following three struct elements are dedicated to manage buffering + * patches. + * We track the number of buffered patches in \b num_buffered_patches and + * if the buffer contains more than \b patch_threshold many patches, the + * patches are written to disk and the buffer is flushed. + * The variable \b num_buffered_patches relates to the number of buffered + * patches during one fclaw2d_vtk_write_field call. + * If \b patch_threshold is -1, there are no intermediate writes but all + * patches during a fclaw2d_vtk_write_field are buffered and then written + * to the disk. + */ + sc_io_sink_t *sink; /**< the sink to manage the patches buffer */ + int num_buffered_patches; /**< the number of patches buffered during the + current fclaw2d_vtk_write_field call */ + int patch_threshold; /**< maximal number of patches that are buffered during + a call of fclaw2d_vtk_write_field */ } fclaw2d_vtk_state_t; @@ -194,28 +219,71 @@ fclaw2d_vtk_write_header (fclaw2d_domain_t * domain, fclaw2d_vtk_state_t * s) return retval ? -1 : 0; } -/** - * @brief Write the buffer to file +/** This function adds to a buffer and writes to file if a threshold is exceeded. + * + * This function assumes that the buffer consists of patch data of the size + * \b psize_field. This means it must hold + * \b s->sink->buffer_bytes % \b s->sink->buffer_bytes == 0. * - * @param s the vtk state - * @param psize_field the size of the buffer + * \param [in,out] s The VTK state. + * \param [in] psize_field The number of bytes, which are intended to + * add to the buffer. + * \param [in] threshold If the number of patches, which are already + * stored in the buffer + 1 is greater than + * \b threshold, the buffer is flushed to disk. + * Then the new patch is added to the buffer. + * -1 means that there is no threshold and + * the data just added to \b s->sink. */ static void -write_buffer (fclaw2d_vtk_state_t * s, int64_t psize_field) +add_to_buffer (fclaw2d_vtk_state_t * s, int64_t psize_field) { -#ifndef P4EST_ENABLE_MPIIO - size_t retvalz; + FCLAW_ASSERT (s != NULL); - retvalz = fwrite (s->buf, psize_field, 1, s->file); - SC_CHECK_ABORT (retvalz == 1, "VTK file write failed"); -#else int mpiret; +#ifdef P4EST_ENABLE_MPIIO MPI_Status mpistatus; +#else + size_t retvalz; +#endif + FCLAW_ASSERT (s->patch_threshold == -1 || s->patch_threshold > 0); + FCLAW_ASSERT (((int) s->sink->buffer_bytes) % psize_field == 0); - mpiret = MPI_File_write (s->mpifile, s->buf, psize_field, MPI_BYTE, - &mpistatus); - SC_CHECK_MPI (mpiret); + if (s->patch_threshold != -1 + && (s->num_buffered_patches + 1 > s->patch_threshold)) + { + FCLAW_ASSERT ((int) s->sink->buffer_bytes >= psize_field); + FCLAW_ASSERT (((int) s->sink->buffer_bytes) / psize_field == + s->num_buffered_patches); + /* buffer threshold exceeded */ + /* write the current buffer to disk */ +#ifdef P4EST_ENABLE_MPIIO + if (s->num_buffered_patches > 0) + { + mpiret = + MPI_File_write_all (s->mpifile, s->sink->buffer->array, + (int) s->sink->buffer_bytes, MPI_BYTE, + &mpistatus); + SC_CHECK_MPI (mpiret); + } +#else + retvalz = + fwrite (s->sink->buffer->array, s->sink->buffer_bytes, 1, + s->file); + SC_CHECK_ABORT (retvalz == 1, "VTK file write failed"); #endif + /* reset the sink */ + mpiret = sc_io_sink_complete (s->sink, NULL, NULL); + SC_CHECK_ABORT (mpiret == 0, "VTK buffer completion failed"); + /* reset the buffer */ + sc_array_reset (s->sink->buffer); + s->sink->buffer_bytes = 0; + s->num_buffered_patches = 0; + } + + mpiret = sc_io_sink_write (s->sink, s->buf, (size_t) psize_field); + SC_CHECK_ABORT (mpiret == 0, "VTK buffer write failed"); + ++s->num_buffered_patches; } static void @@ -226,7 +294,7 @@ write_position_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch, fclaw2d_vtk_state_t *s = (fclaw2d_vtk_state_t *) g->user; s->coordinate_cb (g->glob, patch, blockno, patchno, s->buf); - write_buffer (s, s->psize_position); + add_to_buffer (s, s->psize_position); } static void @@ -322,7 +390,7 @@ write_connectivity_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch, } #endif } - write_buffer (s, s->psize_connectivity); + add_to_buffer (s, s->psize_connectivity); } static void @@ -354,7 +422,7 @@ write_offsets_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch, *idata++ = k; } } - write_buffer (s, s->psize_offsets); + add_to_buffer (s, s->psize_offsets); } static void @@ -374,7 +442,7 @@ write_types_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch, *cdata++ = 12; #endif } - write_buffer (s, s->psize_types); + add_to_buffer (s, s->psize_types); } static void @@ -390,7 +458,7 @@ write_mpirank_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch, { *idata++ = domain->mpirank; } - write_buffer (s, s->psize_mpirank); + add_to_buffer (s, s->psize_mpirank); } static void @@ -406,7 +474,7 @@ write_blockno_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch, { *idata++ = blockno; } - write_buffer (s, s->psize_blockno); + add_to_buffer (s, s->psize_blockno); } static void @@ -437,7 +505,7 @@ write_patchno_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch, *idata++ = gpno; } } - write_buffer (s, s->psize_patchno); + add_to_buffer (s, s->psize_patchno); } static void @@ -449,7 +517,7 @@ write_meqn_cb (fclaw2d_domain_t * domain, fclaw2d_patch_t * patch, fclaw2d_vtk_state_t *s = (fclaw2d_vtk_state_t *) g->user; s->value_cb (g->glob, patch, blockno, patchno, s->buf); - write_buffer (s, s->psize_meqn); + add_to_buffer (s, s->psize_meqn); } static void @@ -460,9 +528,12 @@ fclaw2d_vtk_write_field (fclaw2d_global_t * glob, fclaw2d_vtk_state_t * s, fclaw2d_domain_t *domain = glob->domain; int64_t bcount; + sc_array_t buffer; #ifndef P4EST_ENABLE_MPIIO size_t retvalz; #else + int i; + int max_num_writes, local_num_writes; int mpiret; MPI_Offset mpipos; #ifdef P4EST_ENABLE_DEBUG @@ -472,6 +543,12 @@ fclaw2d_vtk_write_field (fclaw2d_global_t * glob, fclaw2d_vtk_state_t * s, #endif s->buf = P4EST_ALLOC (char, psize_field); + /* The buffer array will be resized when required. */ + sc_array_init_size (&buffer, 1, psize_field); + s->sink = sc_io_sink_new (SC_IO_TYPE_BUFFER, SC_IO_MODE_APPEND, + SC_IO_ENCODE_NONE, &buffer); + FCLAW_ASSERT (s->num_buffered_patches == 0); + s->num_buffered_patches = 0; #ifdef P4EST_ENABLE_MPIIO mpipos = s->mpibegin + offset_field; if (domain->mpirank > 0) @@ -495,12 +572,59 @@ fclaw2d_vtk_write_field (fclaw2d_global_t * glob, fclaw2d_vtk_state_t * s, retvalz = fwrite (&bcount, s->ndsize, 1, s->file); SC_CHECK_ABORT (retvalz == 1, "VTK file write failed"); #else - mpiret = MPI_File_write (s->mpifile, &bcount, 1, MPI_LONG, &mpistatus); + mpiret = + MPI_File_write (s->mpifile, &bcount, 1, MPI_LONG, &mpistatus); SC_CHECK_MPI (mpiret); #endif } fclaw2d_global_iterate_patches (glob, cb, s); + +#ifdef P4EST_ENABLE_MPIIO + if (s->num_buffered_patches > 0 || s->patch_threshold == -1) + { + /* write the remaining buffered bytes */ + mpiret = + MPI_File_write_all (s->mpifile, buffer.array, + (int) s->sink->buffer_bytes, MPI_BYTE, + &mpistatus); + SC_CHECK_MPI (mpiret); + } + + if (s->patch_threshold != -1) + { + /* Ensure that the collective function MPI_File_write_all is called + * equally often on each rank. + */ + max_num_writes = + FCLAW_VTK_CEIL (glob->domain->local_max_patches, + s->patch_threshold); + local_num_writes = + FCLAW_VTK_CEIL (glob->domain->local_num_patches, + s->patch_threshold); + FCLAW_ASSERT (max_num_writes - local_num_writes >= 0); + + for (i = 0; i < max_num_writes - local_num_writes; ++i) + { + /* This rank has less patches than the rank that holds the maximal + * number of patches. We compensate this by empty write calls to avoid + * a deadlock. + */ + mpiret = + MPI_File_write_all (s->mpifile, buffer.array, 0, MPI_BYTE, + &mpistatus); + SC_CHECK_MPI (mpiret); + } + } +#else + retvalz = fwrite (buffer.array, s->sink->buffer_bytes, 1, s->file); + SC_CHECK_ABORT (retvalz == 1, "VTK file write failed"); +#endif + + /* free buffers */ + sc_array_reset (&buffer); + sc_io_sink_destroy (s->sink); P4EST_FREE (s->buf); + s->num_buffered_patches = 0; #ifdef P4EST_ENABLE_MPIIO #ifdef P4EST_ENABLE_DEBUG @@ -662,6 +786,14 @@ fclaw2d_vtk_write_file (fclaw2d_global_t * glob, const char *basename, s->offset_end = s->ndsize + s->offset_meqn + s->psize_meqn * domain->global_num_patches; + s->buf = NULL; + s->sink = NULL; + /* The threshold may be adjusted; see the documentation of + * fclaw2d_vtk_state_t for further information. + */ + s->patch_threshold = -1; + s->num_buffered_patches = 0; + /* write header meta data and check for error */ retval = 0; if (domain->mpirank == 0)