Skip to content

Commit

Permalink
added: bugfix for serax file joins (#55)
Browse files Browse the repository at this point in the history
* added: bugfix for serax file joins
* removed sorter test
  • Loading branch information
Thijzer authored Sep 25, 2023
1 parent e3fe33c commit c5935cb
Show file tree
Hide file tree
Showing 20 changed files with 402,677 additions and 333 deletions.
26 changes: 15 additions & 11 deletions src/Component/Common/Cursor/ContinuousBufferFetcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,28 @@ class ContinuousBufferFetcher
private string $indexReference;
private array $buffer = [];
private CursorInterface $cursor;
private ZoneIndexer $indexer;
private ZoneFileIndexer $indexer;
private bool $allowFileIndexRemoval;

/**
 * Wires the fetcher to its cursor, creates its indexer, and records the lookup settings.
 *
 * NOTE(review): two signature lines and two indexer assignments appear here because this is a
 * rendered diff; presumably the first line of each pair is the removed version - confirm against the commit.
 *
 * @param CursorInterface $cursor                cursor handed to the indexer's init() and read when the buffer is filled
 * @param string          $indexReference        reference passed to the indexer's init() together with the cursor
 * @param bool            $allowFileIndexRemoval when true, get() unsets the returned item from the buffer and
 *                                               removes its entry from the indexer; defaults to false
 */
public function __construct(CursorInterface $cursor, string $indexReference)
public function __construct(CursorInterface $cursor, string $indexReference, bool $allowFileIndexRemoval = false)
{
$this->indexer = new ZoneIndexer();
$this->indexer = new ZoneFileIndexer();
$this->cursor = $cursor;
$this->indexReference = $indexReference;
$this->allowFileIndexRemoval = $allowFileIndexRemoval;
}

public function get(string $reference)
{
$this->indexer->init($this->cursor, $this->indexReference);

$index = $this->indexer->getIndexByReference($reference);
if (null === $index) {
$fileIndex = $this->indexer->getFileIndexByReference($reference);
if (null === $fileIndex) {
return false;
}

$zone = $this->indexer->getZoneByIndex($index);
if (false === $this->itemInBuffer($index, $zone)) {
$zone = $this->indexer->getZoneByFileIndex($fileIndex);
if (false === $this->itemInBuffer($fileIndex, $zone)) {
// new item to load in
$this->loadBufferFromZone($zone);

Expand All @@ -38,9 +40,11 @@ public function get(string $reference)
}

// clear memory
$item = $this->buffer[$zone][$index] ?? false;
unset($this->buffer[$zone][$index]);
$this->indexer->depleteIndex($reference, $index);
$item = $this->buffer[$zone][$fileIndex] ?? false;
if ($this->allowFileIndexRemoval) {
unset($this->buffer[$zone][$fileIndex]);
$this->indexer->removeFileIndex($reference, $fileIndex);
}

return $item;
}
Expand All @@ -54,7 +58,7 @@ private function loadBufferFromZone(int $zone): void
{
$range = $this->indexer->getRangeFromZone($zone);

$this->cursor->seek(current($range));
$this->cursor->seek(current($range)); # reset line number
while ($row = $this->cursor->current()) {
$this->buffer[$zone][$this->cursor->key()] = $row;
if (\count($this->buffer[$zone]) === \count($range)) {
Expand Down
4 changes: 2 additions & 2 deletions src/Component/Common/Cursor/OldCachedZoneFetcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class OldCachedZoneFetcher

/** @var array */
private $ranges = [];
/** @var ZoneIndexer */
/** @var ZoneFileIndexer */
private $indexes;

public function __construct(CursorInterface $cursor, string $reference)
Expand Down Expand Up @@ -44,7 +44,7 @@ public function get(string $reference)
}
}

$i = $this->indexes->getIndexByReference($reference);
$i = $this->indexes->getFileIndexByReference($reference);

return $this->ranges[$zone][$i] ?? false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Misery\Component\Common\Cursor;

class ZoneIndexer
class ZoneFileIndexer
{
private const MEDIUM_CACHE_SIZE = 10000;

Expand All @@ -15,8 +15,8 @@ public function init(CursorInterface $cursor, string $reference): void
// prep indexes
$cursor->loop(function ($row) use ($cursor, $reference) {
if ($row) {
$index = (int) $cursor->key();
$zone = (int) (($index -1) / self::MEDIUM_CACHE_SIZE);
$index = (int) $cursor->key(); # line number
$zone = (int) (($index -1) / self::MEDIUM_CACHE_SIZE); # grouping number
$referenceValue = $row[$reference];
$this->indexes[crc32($referenceValue)] = $index;
$this->zones[$index] = $zone;
Expand All @@ -26,18 +26,18 @@ public function init(CursorInterface $cursor, string $reference): void
}
}

/**
 * Forgets one indexed entry: drops the zone recorded for $index and the index stored for $reference.
 *
 * @param string $reference reference value whose crc32 hash is the key in the index map
 *                          (init() builds that key from the row's reference field)
 * @param int    $index     key into the zone map; init() fills it with (int) $cursor->key()
 */
public function depleteIndex(string $reference, int $index): void
public function removeFileIndex(string $reference, int $index): void
{
unset($this->zones[$index]);
// the index map is keyed by the crc32 hash of the reference value, not by the value itself
unset($this->indexes[crc32($reference)]);
}

/**
 * Looks up the index stored for a reference value.
 *
 * NOTE(review): keys are crc32 hashes, so two different reference values with the same hash
 * would share one slot (the later assignment in init() wins) - verify this is acceptable.
 *
 * @param string $reference reference value to look up; hashed with crc32 to form the key
 *
 * @return int|null the stored index (init() stores (int) $cursor->key()), or null when nothing is stored for the hash
 */
public function getIndexByReference(string $reference)
public function getFileIndexByReference(string $reference)
{
return $this->indexes[crc32($reference)] ?? null;
}

/**
 * Returns the zone recorded for an index.
 *
 * @param int $index key into the zone map; init() fills it with (int) $cursor->key()
 *
 * @return int|null the zone number init() computed for that index, or null when the index is not recorded
 */
public function getZoneByIndex(int $index)
public function getZoneByFileIndex(int $index)
{
return $this->zones[$index] ?? null;
}
Expand Down
9 changes: 1 addition & 8 deletions src/Component/Parser/ItemParserFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@
namespace Misery\Component\Parser;

use Assert\Assert;
use Misery\Component\Combine\ItemCombine;
use Misery\Component\Common\Collection\ArrayCollection;
use Misery\Component\Common\Cursor\CachedCursor;
use Misery\Component\Common\Cursor\OldCachedZoneFetcher;
use Misery\Component\Common\Cursor\ContinuousBufferFetcher;
use Misery\Component\Common\Cursor\CursorInterface;
use Misery\Component\Common\Cursor\FunctionalCursor;
use Misery\Component\Common\FileManager\InMemoryFileManager;
use Misery\Component\Common\FileManager\LocalFileManager;
use Misery\Component\Common\Registry\RegisteredByNameInterface;
use Misery\Component\Filter\ColumnReducer;
use Misery\Component\Reader\ItemCollection;
use Misery\Component\Writer\CsvWriter;

class ItemParserFactory implements RegisteredByNameInterface
{
Expand All @@ -35,8 +29,7 @@ public function createFromConfiguration(
$mainParser = $this->createFromConfiguration($configuration, $manager);

foreach ($joins as $join) {
$fetcher = clone new OldCachedZoneFetcher($this->createFromConfiguration($join, $manager), $join['link_join']);
#$fetcher = clone new ContinuousBufferFetcher($this->createFromConfiguration($join, $manager), $join['link_join']);
$fetcher = clone new ContinuousBufferFetcher($this->createFromConfiguration($join, $manager), $join['link_join'], $join['allow_fileindex_removal'] ?? false);
$mainParser = new FunctionalCursor($mainParser, function ($row) use ($fetcher, $join) {
$masterID = $row[$join['link']];
$item = $fetcher->get($masterID) ?? [];
Expand Down
4 changes: 2 additions & 2 deletions tests/Component/Functions/ArrayFunctionsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public function test_impossible_values(): void
public function test_flatten_performance(): void
{
$file = new \SplFileObject(__DIR__ . '/../../examples/users.csv');
$reader = new ItemReader(new CachedCursor(new FunctionalCursor(new CsvParser($file, ','), function($item) {
$reader = new ItemReader(new CachedCursor(new FunctionalCursor(new CsvParser($file, ';'), function($item) {
return ArrayFunctions::unflatten($item);
})));

Expand All @@ -88,7 +88,7 @@ public function test_flatten_performance(): void
public function test_unflatten_performance(): void
{
$file = new \SplFileObject(__DIR__ . '/../../examples/users.csv');
$reader = new ItemReader(new CachedCursor(new CsvParser($file, ',')));
$reader = new ItemReader(new CachedCursor(new CsvParser($file, ';')));

// approx 300.000 lines test
$tracker = new TimeTracker();
Expand Down
Loading

0 comments on commit c5935cb

Please sign in to comment.