Skip to content

Commit

Permalink
Control readers.ept thread queue more carefully to prevent it from fi…
Browse files Browse the repository at this point in the history
…lling too fast (PDAL#4359)

* Make sure we don't fill the queue between readers and the main thread with too many points.
Close PDAL#4357

* Remove garbage

---------

Co-authored-by: Andrew Bell <[email protected]>
  • Loading branch information
hobu and abellgithub authored Mar 15, 2024
1 parent ecf5a55 commit 4f63a07
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions io/EptReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ void EptReader::handleOriginQuery()
{
BOX3D q(toBox3d(found.at("bounds")));
// Bloat the query bounds slightly (we'll choose one tick of the EPT's
// scale) to make sure we don't miss any points due to them being
// scale) to make sure we don't miss any points due to them being
// precisely on the bounds edge.
q.grow(
(std::max)(
Expand Down Expand Up @@ -553,6 +553,8 @@ void EptReader::addDimensions(PointLayoutPtr layout)
// stick the tile on the queue and notify the main thread.
void EptReader::load(const ept::Overlap& overlap)
{
using namespace std::chrono_literals;

m_p->pool->add([this, overlap]()
{
// Read the tile.
Expand All @@ -571,9 +573,23 @@ void EptReader::load(const ept::Overlap& overlap)
// Put the tile on the output queue. Note that if the tile has
// an error and ignoreUnreadable isn't set, this will be fatal
// but that will occur downstream outside of this pool thread.
std::unique_lock<std::mutex> l(m_p->mutex);
m_p->contents.push(std::move(tile));
l.unlock();

// Loop to push the tile on the queue until there is room.
while (true)
{
{
std::lock_guard<std::mutex> l(m_p->mutex);
if (m_p->contents.size() < m_p->pool->numThreads())
{
m_p->contents.push(std::move(tile));
break;
}
}
// No room on queue, sleep. Could do a condition variable but that's
// more complex and probably makes no difference in most cases where
// this would come up.
std::this_thread::sleep_for(50ms);
}
}
else
{
Expand All @@ -597,17 +613,17 @@ void EptReader::ready(PointTableRef table)
m_pointIdDim = table.layout()->findDim("EptPointId");

if (
m_queryOriginId != -1 &&
m_queryOriginId != -1 &&
!table.layout()->hasDim(Dimension::Id::OriginId))
{
// In this case we can't compare the OriginId for each point since the
// EPT data does not have that attribute saved. We will keep the
// EPT data does not have that attribute saved. We will keep the
// spatial query to limit the data to the extents of the requested
// origin, but if other origins overlap these extents, then their points
// will also be included.
m_queryOriginId = -1;

log()->get(LogLevel::Warning) <<
log()->get(LogLevel::Warning) <<
"An origin query was given but no OriginId dimension exists - " <<
"points from other origins may be included" << std::endl;
}
Expand Down Expand Up @@ -818,7 +834,7 @@ bool EptReader::processPoint(PointRef& dst, const ept::TileContents& tile)
PointId pointId = m_pointId++;

PointRef p(t, pointId);
if (m_queryOriginId != -1 &&
if (m_queryOriginId != -1 &&
p.getFieldAs<int64_t>(Id::OriginId)!= m_queryOriginId)
{
return false;
Expand Down

0 comments on commit 4f63a07

Please sign in to comment.