Skip to content

Commit

Permalink
Merge pull request #55 from darkwood-com/1.x-dev
Browse files Browse the repository at this point in the history
v1.2.2
  • Loading branch information
matyo91 authored Aug 30, 2024
2 parents c9d1224 + ede040c commit af333ff
Show file tree
Hide file tree
Showing 29 changed files with 320 additions and 79 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v1.2.2

- Flow can now use `Flow\JobInterface` as job input
- Add Symfony Bridge
- new `Flow\Attribute\AsJob` attribute allows cast job on function or class and embed it's name and description

## v1.2.1

- Add new Interface Flow\AsyncHandlerInterface to control the Event::SYNC step when processing an IP
Expand Down
4 changes: 2 additions & 2 deletions docs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"keywords": [
"flow"
],
"version": "1.2.1",
"version": "1.2.2",
"browserslist": [
"defaults"
],
Expand Down Expand Up @@ -91,4 +91,4 @@
}
]
}
}
}
30 changes: 30 additions & 0 deletions docs/src/content/en/docs/getting-started/job.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
title: "Job"
description: "Job."
lead: "Job."
date: 2020-10-13T15:21:01+02:00
lastmod: 2020-10-13T15:21:01+02:00
draft: false
images: []
menu:
docs:
parent: "getting-started"
weight: 10
toc: true
---

# Job

When using Flow, you can pass Closure or JobInterface, it's useful when you want to specialize your Job, that come with dependecy injection.

## ClosureJob

ClosureJob simplifies job handling by allowing the use of closures or custom job classes, providing a versatile solution for managing jobs in your application.

## YJob

The YJob class defines the Y combinator to recursively apply the job function, making it particularly useful in scenarios where you need to perform recursive tasks without explicitly writing recursive functions.

## Make your own Job

You can make your custom Job by implementing `Flow\JobInterface`.
1 change: 1 addition & 0 deletions docs/src/content/en/docs/getting-started/ressources.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Video of Y-Combinator : [https://www.youtube.com/watch?v=QSS_ZcO8Q1g](https://ww
- Combinator : [https://github.com/loophp/combinator](https://github.com/loophp/combinator)
- Lambda-php : [https://github.com/igorw/lambda-php](https://github.com/igorw/lambda-php)
- Deriving the y combinator in 7 easy steps : [https://gist.github.com/igstan/388351](https://gist.github.com/igstan/388351)
- Y combinator real life application: recursive memoization in clojure : [https://blog.klipse.tech/lambda/2016/08/10/y-combinator-app.html](https://blog.klipse.tech/lambda/2016/08/10/y-combinator-app.html)

## Messaging approach with East oriented code from [Frédéric Hardy](https://twitter.com/mageekguy)

Expand Down
5 changes: 3 additions & 2 deletions examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Flow\Flow\Flow;
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;
use Flow\Job\ClosureJob;

$driver = match (random_int(1, 4)) {
1 => new AmpDriver(),
Expand Down Expand Up @@ -49,7 +50,7 @@
return new DataB($dataA->id, $d, $dataA->c);
};

$job2 = static function (DataB $dataB) use ($driver): DataC {
$job2 = new ClosureJob(static function (DataB $dataB) use ($driver): DataC {
printf(".* #%d - Job 2 Calculating %d * %d\n", $dataB->id, $dataB->d, $dataB->e);

// simulating calculating some "heavy" operation from from 1 to 3 seconds
Expand All @@ -65,7 +66,7 @@
printf(".* #%d - Job 2 Result for %d * %d = %d and took %.01f seconds\n", $dataB->id, $dataB->d, $dataB->e, $f, $delay);

return new DataC($dataB->id, $f);
};
});

$job3 = static function (DataC $dataC): DataD {
printf("** #%d - Job 3 Result is %d\n", $dataC->id, $dataC->f);
Expand Down
23 changes: 13 additions & 10 deletions examples/yflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
use Flow\Flow\Flow;
use Flow\Flow\YFlow;
use Flow\Ip;
use Flow\Job\YJob;
use Flow\JobInterface;

$driver = match (random_int(1, 4)) {
1 => new AmpDriver(),
Expand All @@ -29,12 +31,14 @@ function factorial(int $n): int
return ($n <= 1) ? 1 : $n * factorial($n - 1);
}

function Ywrap(callable $func, callable $wrapperFunc): Closure
/**
* @return JobInterface<mixed, mixed>
*/
function Ywrap(callable $func, callable $wrapperFunc): JobInterface
{
$U = static fn ($f) => $f($f);
$Y = static fn (callable $f, callable $g) => $U(static fn (Closure $x) => $f($g(static fn ($y) => $U($x)($y))));
$wrappedFunc = static fn ($recurse) => $wrapperFunc(static fn (...$args) => $func($recurse)(...$args));

return $Y($func, $wrapperFunc);
return new YJob($wrappedFunc);
}

function memoWrapperGenerator(callable $f): Closure
Expand All @@ -50,7 +54,10 @@ function memoWrapperGenerator(callable $f): Closure
};
}

function Ymemo(callable $f): Closure
/**
* @return JobInterface<mixed, mixed>
*/
function Ymemo(callable $f): JobInterface
{
return Ywrap($f, 'memoWrapperGenerator');
}
Expand Down Expand Up @@ -110,17 +117,13 @@ function factorialYMemo(int $n): int
return new YFlowData($data->id, $data->number);
};

// Define the Y-Combinator
$U = static fn (Closure $f) => $f($f);
$Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y)));

$factorialYJobDeferBefore = static function (YFlowData $data) {
printf("...* #%d - Job 4 : Calculating factorialYJobDefer(%d)\n", $data->id, $data->number);

return new YFlowData($data->id, $data->number, $data->number);
};

$factorialYJobDefer = $Y(static function ($factorial) {
$factorialYJobDefer = new YJob(static function ($factorial) {
return static function ($args) use ($factorial) {
[$data, $defer] = $args;

Expand Down
16 changes: 16 additions & 0 deletions src/Attribute/AsJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Flow\Attribute;

use Attribute;

#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_FUNCTION | Attribute::TARGET_METHOD)]
class AsJob
{
public function __construct(
public string $name = '',
public string $description = '',
) {}
}
16 changes: 16 additions & 0 deletions src/Bridge/Symfony/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\DependencyInjection;

use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;

class Configuration implements ConfigurationInterface
{
public function getConfigTreeBuilder(): TreeBuilder
{
return new TreeBuilder('flow');
}
}
13 changes: 13 additions & 0 deletions src/Bridge/Symfony/DependencyInjection/FlowExtension.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\DependencyInjection;

use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Extension\Extension;

class FlowExtension extends Extension
{
public function load(array $configs, ContainerBuilder $container): void {}
}
9 changes: 9 additions & 0 deletions src/Bridge/Symfony/FlowBundle.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony;

use Symfony\Component\HttpKernel\Bundle\Bundle;

class FlowBundle extends Bundle {}
7 changes: 7 additions & 0 deletions src/Bridge/Symfony/Resources/config/services.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

declare(strict_types=1);

namespace Symfony\Component\DependencyInjection\Loader\Configurator;

return static function (ContainerConfigurator $container) {};
9 changes: 5 additions & 4 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\JobInterface;
use Revolt\EventLoop;
use Revolt\EventLoop\Driver;
use RuntimeException as NativeRuntimeException;
Expand Down Expand Up @@ -49,10 +50,10 @@ public function __construct(?Driver $driver = null)
/**
* @return Closure(TArgs): Future<TReturn>
*/
public function async(Closure $callback): Closure
public function async(Closure|JobInterface $callback): Closure
{
return static function (...$args) use ($callback) {
return async(static function (Closure $callback, array $args) {
return async(static function (Closure|JobInterface $callback, array $args) {
try {
return $callback(...$args, ...($args = []));
} catch (Throwable $exception) {
Expand Down Expand Up @@ -86,7 +87,7 @@ public function defer(Closure $callback): Future

public function await(array &$stream): void
{
$async = function (Closure $job) {
$async = function (Closure|JobInterface $job) {
return function (mixed $data) use ($job) {
$async = $this->async($job);

Expand All @@ -99,7 +100,7 @@ public function await(array &$stream): void
};
};

$defer = function (Closure $job) {
$defer = function (Closure|JobInterface $job) {
return function (Closure $map) use ($job) {
/** @var Closure(TReturn): mixed $map */
$future = $this->defer($job);
Expand Down
11 changes: 6 additions & 5 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\JobInterface;
use Throwable;

use function array_key_exists;
Expand All @@ -34,7 +35,7 @@ class FiberDriver implements DriverInterface
*/
private array $ticks = [];

public function async(Closure $callback): Closure
public function async(Closure|JobInterface $callback): Closure
{
return static function (...$args) use ($callback) {
return new Fiber(static function () use ($callback, $args) {
Expand Down Expand Up @@ -69,7 +70,7 @@ public function defer(Closure $callback): mixed
public function await(array &$stream): void
{
$async = function ($isTick) use (&$fiberDatas) {
return function (Closure $job) use (&$fiberDatas, $isTick) {
return function (Closure|JobInterface $job) use (&$fiberDatas, $isTick) {
return function (mixed $data) use (&$fiberDatas, $isTick, $job) {
$async = $this->async($job);

Expand Down Expand Up @@ -97,7 +98,7 @@ public function await(array &$stream): void
};

$defer = static function ($isTick) {
return static function (Closure $job) use ($isTick) {
return static function (Closure|JobInterface $job) use ($isTick) {
return static function (Closure $next) use ($isTick, $job) {
$fiber = new Fiber(static function () use ($isTick, $job, $next) {
try {
Expand Down Expand Up @@ -136,9 +137,9 @@ public function await(array &$stream): void
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure $job) use ($async) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure|JobInterface $job) use ($async) {
return $async(false)($job);
}, static function (Closure $job) use ($defer) {
}, static function (Closure|JobInterface $job) use ($defer) {
return $defer(false)($job);
}, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) and $stream['fnFlows'][$index]['errorJob'] !== null) {
Expand Down
7 changes: 4 additions & 3 deletions src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\JobInterface;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Promise\Deferred;
Expand Down Expand Up @@ -46,7 +47,7 @@ public function __construct(?LoopInterface $eventLoop = null)
$this->eventLoop = $eventLoop ?? Loop::get();
}

public function async(Closure $callback): Closure
public function async(Closure|JobInterface $callback): Closure
{
return static function (...$args) use ($callback) {
return async(static function () use ($callback, $args) {
Expand Down Expand Up @@ -81,7 +82,7 @@ public function defer(Closure $callback): Promise

public function await(array &$stream): void
{
$async = function (Closure $job) {
$async = function (Closure|JobInterface $job) {
return function (mixed $data) use ($job) {
$async = $this->async($job);

Expand All @@ -93,7 +94,7 @@ public function await(array &$stream): void
};
};

$defer = function (Closure $job) {
$defer = function (Closure|JobInterface $job) {
return function ($then) use ($job) {
$promise = $this->defer($job);
$promise->then($then);
Expand Down
7 changes: 4 additions & 3 deletions src/Driver/SpatieDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\JobInterface;
use RuntimeException as NativeRuntimeException;
use Spatie\Async\Pool;
use Throwable;
Expand Down Expand Up @@ -45,7 +46,7 @@ public function __construct()
}
}

public function async(Closure $callback): Closure
public function async(Closure|JobInterface $callback): Closure
{
return function (...$args) use ($callback) {
return function ($onResolve) use ($callback, $args) {
Expand All @@ -67,15 +68,15 @@ public function defer(Closure $callback): mixed

public function await(array &$stream): void
{
$async = function (Closure $job) {
$async = function (Closure|JobInterface $job) {
return function (mixed $data) use ($job) {
$async = $this->async($job);

return $async($data);
};
};

$defer = function (Closure $job) {
$defer = function (Closure|JobInterface $job) {
return function (Closure $onResolve) use ($job) {
$this->pool->add(static function () use ($job, $onResolve) {
return $job($onResolve, static function ($fn, $next) {
Expand Down
Loading

0 comments on commit af333ff

Please sign in to comment.