Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Streaming Serializer [WIP][Concept] #90

Open
wants to merge 5 commits into
base: feature/next
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ workspace:
base: /var/www/owncloud
path: apps/data_exporter

branches: [ master, release* ]
branches: [ master, release*, feature/next ]

clone:
git:
Expand Down
8 changes: 4 additions & 4 deletions lib/Exporter.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

use OCA\DataExporter\Extractor\FilesExtractor;
use OCA\DataExporter\Extractor\MetadataExtractor;
use OCA\DataExporter\Io\Serializer;
use Symfony\Component\Filesystem\Filesystem;

class Exporter {
Expand All @@ -47,10 +48,9 @@ public function __construct(Serializer $serializer, MetadataExtractor $metadataE
public function export($uid, $exportDirectoryPath) {
$exportPath = "$exportDirectoryPath/$uid";
$metaData = $this->metadataExtractor->extract($uid);
// Filesystem::dumpFile() used to create the target directory implicitly;
// a bare fopen() does not, so make sure it exists before opening the file.
$this->filesystem->mkdir($exportPath);

$metadataFile = "$exportPath/metadata.json";
$stream = \fopen($metadataFile, 'wb');
if ($stream === false) {
	throw new \RuntimeException("Could not open '$metadataFile' for writing");
}

try {
	$this->serializer->serializeToStream($metaData, $stream);
} finally {
	// Always release the handle, even when serialization throws.
	\fclose($stream);
}

$filesPath = \ltrim("$exportPath/files");
$this->filesExtractor->export($uid, $filesPath);
Expand Down
7 changes: 5 additions & 2 deletions lib/Importer.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

use OCA\DataExporter\Importer\ImportException;
use OCA\DataExporter\Importer\MetadataImporter;
use OCA\DataExporter\Io\Serializer;
use OCA\DataExporter\Model\Metadata;
use Symfony\Component\Filesystem\Filesystem;
use OCA\DataExporter\Importer\FilesImporter;
Expand Down Expand Up @@ -73,9 +74,11 @@ public function import($pathToExportDir, $alias = null) {
throw new ImportException("metadata.json not found in '$metaDataPath'");
}

$stream = \fopen($metaDataPath, 'rb');

/** @var Metadata $metadata */
$metadata = $this->serializer->deserialize(
\file_get_contents($metaDataPath),
$metadata = $this->serializer->deserializeStream(
$stream,
Metadata::class
);

Expand Down
7 changes: 4 additions & 3 deletions lib/InstanceExporter.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

use OCA\DataExporter\Exporter\InstanceExtractor;
use Symfony\Component\Filesystem\Filesystem;
use OCA\DataExporter\Io\Serializer;

/**
* Class InstanceExporter
Expand Down Expand Up @@ -69,9 +70,9 @@ public function __construct(Serializer $serializer, InstanceExtractor $instanceE
*/
public function export($exportDirectoryPath) {
	$instanceData = $this->instanceExtractor->extract();

	// Filesystem::dumpFile() used to create the directory implicitly;
	// a bare fopen() does not, so make sure it exists first.
	$this->filesystem->mkdir($exportDirectoryPath);

	$instanceDataFile = "$exportDirectoryPath/instancedata.json";
	$stream = \fopen($instanceDataFile, 'wb');
	if ($stream === false) {
		throw new \RuntimeException("Could not open '$instanceDataFile' for writing");
	}

	try {
		$this->serializer->serializeToStream($instanceData, $stream);
	} finally {
		// Close explicitly so the handle is released even if serialization throws.
		\fclose($stream);
	}
}
}
10 changes: 7 additions & 3 deletions lib/InstanceImporter.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
use OCA\DataExporter\Importer\InstanceDataImporter;
use OCA\DataExporter\Model\Instance;
use Symfony\Component\Filesystem\Filesystem;
use OCA\DataExporter\Io\Serializer;

class InstanceImporter {
/**
Expand Down Expand Up @@ -74,13 +75,16 @@ public function import($pathToExportDir) {
}

// Open read-only: a write mode such as 'wb' truncates the file before
// anything can be read from it, destroying the export being imported.
$instanceDataStream = \fopen($instanceDataPath, 'rb');
if ($instanceDataStream === false) {
	throw new \RuntimeException("Could not open '$instanceDataPath' for reading");
}

try {
	// Workaround, generator should be passed to importer for lazy io
	$instances = \iterator_to_array(
		$this->serializer->deserializeStream($instanceDataStream, Instance::class),
		false
	);
} finally {
	\fclose($instanceDataStream);
}

if (!isset($instances[0])) {
	throw new \RuntimeException("No instance data found in '$instanceDataPath'");
}

// Hand the importer the deserialized object itself; the generator is
// exhausted once it has been converted to an array.
/** @var Instance $instanceData */
$instanceData = $instances[0];

$this->instanceDataImporter->import($instanceData);
}
}
71 changes: 71 additions & 0 deletions lib/Io/JsonLinesEncoder.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

namespace OCA\DataExporter\Io;

use Symfony\Component\Serializer\Encoder\EncoderInterface;
use Symfony\Component\Serializer\Encoder\JsonEncode;
use Symfony\Component\Serializer\Exception\UnexpectedValueException;

/**
 * Serializer encoder for the JSON Lines format: one JSON document per line.
 *
 * The caller must pass the PHP type of the original value in the context as
 * $context[JsonLinesEncoder::class]['type_hint'] ('object' or 'array').
 * An 'object' is written as a single line; an 'array' is written as one
 * line per element.
 */
class JsonLinesEncoder implements EncoderInterface {
	const FORMAT = 'jsonl';

	/** @var JsonEncode */
	private $jsonEncoder;

	public function __construct() {
		// A single jsonl line is valid json, so the stock encoder is reused per line.
		$this->jsonEncoder = new JsonEncode();
	}

	/**
	 * Encodes data into JSON Lines.
	 *
	 * @param mixed $data Data to encode
	 * @param string $format Format name
	 * @param array $context Options that normalizers/encoders have access to;
	 *                       must contain the 'type_hint' entry described on the class
	 *
	 * @return string Zero or more newline-terminated JSON documents
	 *
	 * @throws \InvalidArgumentException If the type hint is missing or is neither 'object' nor 'array'
	 * @throws UnexpectedValueException May be raised by the wrapped JSON encoder
	 */
	public function encode($data, $format, array $context = []) {
		$typeHint = $this->getEncodingTypeHint($context);

		if ($typeHint === 'object') {
			return $this->encodeLine($data);
		}

		// Strict comparison: a loose in_array() accepts hints such as 0 or true
		// on PHP 7, which then fell through and silently produced no output.
		if ($typeHint !== 'array') {
			throw new \InvalidArgumentException('Only objects and arrays supported for jsonl encoding');
		}

		$jsonLines = '';
		foreach ($data as $line) {
			$jsonLines .= $this->encodeLine($line);
		}

		return $jsonLines;
	}

	/**
	 * Encodes one value as a single newline-terminated JSON document.
	 *
	 * @param mixed $value
	 *
	 * @return string
	 */
	private function encodeLine($value) {
		// Use a fixed "\n" instead of PHP_EOL so exported files are identical
		// on every platform; "\n" is the JSON Lines line separator.
		return $this->jsonEncoder->encode($value, 'json') . "\n";
	}

	/**
	 * Reads the mandatory type hint from the encoder context.
	 *
	 * @param array $context
	 *
	 * @return mixed The value stored under [self::class]['type_hint']
	 *
	 * @throws \InvalidArgumentException If no type hint was supplied
	 */
	private function getEncodingTypeHint($context) {
		if (!isset($context[self::class]['type_hint'])) {
			throw new \InvalidArgumentException('Missing typehint for jsonl encoder');
		}

		return $context[self::class]['type_hint'];
	}

	/**
	 * Checks whether the serializer can encode to given format.
	 *
	 * @param string $format Format name
	 *
	 * @return bool
	 */
	public function supportsEncoding($format) {
		return $format === self::FORMAT;
	}
}
64 changes: 48 additions & 16 deletions lib/Serializer.php → lib/Io/Serializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,24 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
*/
namespace OCA\DataExporter;
namespace OCA\DataExporter\Io;

use Symfony\Component\PropertyInfo\Extractor\PhpDocExtractor;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
use Symfony\Component\Serializer\Normalizer\DateTimeNormalizer;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;

/**
* Lazy jsonl (de)serialization from streams.
*/
class Serializer {

/** @var \Symfony\Component\Serializer\Serializer */
private $serializer;

public function __construct() {
$encoders = [new JsonEncoder()];
$encoders = [new JsonEncoder(), new JsonLinesEncoder()];
$normalizers = [
new DateTimeNormalizer(),
new ArrayDenormalizer(),
Expand All @@ -45,25 +48,54 @@ public function __construct() {
}

/**
* Serializes data in the appropriate format.
*
* @param mixed $data Any data
*
* @return string
* @param $jsonlStream
* @param $type
* @return \Generator
*/
public function serialize($data) {
return $this->serializer->serialize($data, 'json', []);
public function deserializeStream($jsonlStream, $type) {
	// Each line of the stream is an independent JSON document: decode it to
	// plain data first, then map that data onto an instance of $type.
	foreach ($this->readLines($jsonlStream) as $rawLine) {
		$decoded = $this->serializer->decode($rawLine, 'json');
		$object = $this->serializer->denormalize($decoded, $type);

		yield $object;
	}
}

/**
* Deserializes data into the given type.
*
* @param mixed $data
* @param string $type
* @param $data
* @param $toStream
*/
public function serializeToStream($data, $toStream) {
	// The jsonl encoder requires the PHP type of the original value in its
	// context to choose between single-line and line-per-element output.
	$ctx = [JsonLinesEncoder::class => ['type_hint' => \gettype($data)]];
	$norm = $this->serializer->normalize($data, 'jsonl');
	$jsonLines = $this->serializer->encode($norm, 'jsonl', $ctx);

	// fwrite() reports failure through its return value only; without this
	// check a failed or short write would leave a silently truncated export.
	$written = \fwrite($toStream, $jsonLines);
	if ($written !== \strlen($jsonLines)) {
		throw new \RuntimeException('Failed to write serialized data to stream');
	}
}

/**
 * Lazily reads a stream line by line.
 *
 * Every line is yielded as soon as it has been read, including its
 * trailing line break if present. No lines are buffered up front.
 *
 * Review note from the pull request: this method might not be needed as
 * streams should be lazy by default.
 *
 * @param resource $stream
 * @param int $lineBufSize No longer used; kept so the signature stays unchanged
 * @return \Generator
 */
private function readLines($stream, $lineBufSize = 256) {
	// Batching lines into an intermediate buffer yields exactly the same
	// sequence as reading them one by one, so read them one by one.
	while (($line = \fgets($stream)) !== false) {
		yield $line;
	}
}
}
76 changes: 76 additions & 0 deletions tests/integration/SerializerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?php

namespace OCA\DataExporter\Tests\Acceptance\SerializerTest;

use OCA\DataExporter\Io\Serializer;
use OCA\DataExporter\Model\File;
use Test\TestCase;

/**
 * Round-trip tests for the jsonl stream (de)serializer using in-memory streams.
 */
class SerializerTest extends TestCase {
	const TEST_JSONL = <<< JSONL
{"type":"file","path":"\/foo\/bar.txt","eTag":"12413rr","permissions":19}
{"type":"folder","path":"\/pics","eTag":"43t3t3g3g","permissions":20}

JSONL;

	/** @var Serializer */
	private $ser;
	private $testId;

	/** @var File[] Objects equivalent to the lines in TEST_JSONL, in the same order */
	private $testObjects;

	public function setUp() {
		parent::setUp();
		$this->testId = \bin2hex(\random_bytes(4));
		$this->ser = new Serializer();

		$this->testObjects = [
			(new File())
				->setPermissions(19)
				->setETag('12413rr')
				->setType(File::TYPE_FILE)
				->setPath('/foo/bar.txt'),
			(new File())
				->setPermissions(20)
				->setETag('43t3t3g3g')
				->setType(File::TYPE_FOLDER)
				->setPath('/pics'),
		];
	}

	/**
	 * Writing the test objects one by one must produce exactly TEST_JSONL.
	 */
	public function testSerialize() {
		$stream = \fopen('php://memory', 'rb+');

		// Serialize single objects
		foreach ($this->testObjects as $f) {
			$this->ser->serializeToStream($f, $stream);
		}

		\rewind($stream);
		$actual = \stream_get_contents($stream);
		\fclose($stream);

		$this->assertSame(self::TEST_JSONL, $actual);
	}

	/**
	 * Reading TEST_JSONL must yield objects equal to the test objects.
	 */
	public function testDeserialization() {
		$stream = \fopen('php://memory', 'rb+');
		\fwrite($stream, self::TEST_JSONL);
		\rewind($stream);

		/** @var File[] $expected */
		$expected = $this->testObjects;
		// Materialise the generator while the stream is still open so the
		// number of deserialized objects can be checked.
		/** @var File[] $actual */
		$actual = \iterator_to_array(
			$this->ser->deserializeStream($stream, File::class),
			false
		);
		\fclose($stream);

		// Without this the test would pass vacuously if nothing was yielded.
		$this->assertCount(\count($expected), $actual);

		foreach ($actual as $key => $obj) {
			$this->assertEquals($expected[$key]->getETag(), $obj->getETag());
			$this->assertEquals($expected[$key]->getPath(), $obj->getPath());
			$this->assertEquals($expected[$key]->getType(), $obj->getType());
			$this->assertEquals($expected[$key]->getPermissions(), $obj->getPermissions());
		}
	}
}