Skip to content

Commit

Permalink
Merge pull request #10 from vtsykun/event-loop
Browse files Browse the repository at this point in the history
Added ReactPHP EventLoop and standalone cron runner engine.
  • Loading branch information
vtsykun authored Aug 19, 2023
2 parents 647c18f + 369b96b commit e868550
Show file tree
Hide file tree
Showing 29 changed files with 1,395 additions and 76 deletions.
122 changes: 116 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,35 @@ This bundle provides interfaces for registering and handle scheduled tasks withi
[![License](https://poser.okvpn.org/okvpn/cron-bundle/license)](https://packagist.org/packages/okvpn/cron-bundle)

## Purpose
This is a more simpler alternative of existing cron bundle without doctrine deps.
This is a simpler alternative of existing cron bundle without doctrine deps.
Here also added support middleware for customization handling cron jobs across a cluster install:
(Send jobs to message queue, like Symfony Messenger; locking; etc.).
This allow to limit the number of parallel running processes and prioritized it.
This allows to limit the number of parallel running processes and prioritized it.

Features
--------

- Not need doctrine/database.
- Docker friendly, runs as background command without `crond`.
- Schedule tasks with one-millisecond precision.
- More ways to randomize crons with `@random 3600` and `jitter`.
- Integration with Symfony Messenger.
- Load a cron job from a different storage (config.yml, tagged services, commands).
- Support many engines to run cron (in parallel process, message queue, consistently), allow to use `random` expression.
- Support many engines to run cron (in parallel process, message queue, consistently).
- Support many types of cron handlers/command: (services, symfony commands, UNIX shell commands).
- Can be used along with timers, subscriber and async I/O with React EventLoop, like Redis subscriber [clue/redis-react](https://github.com/clue/reactphp-redis).
- Middleware and customization.

## Table of Contents

- [Install](#install)
- [Commands](#commands)
- [Registration a new scheduled task](#registration-a-new-scheduled-task)
- [Configuration](#full-configuration-reference)
- [Symfony Messenger Integration](#handle-cron-jobs-via-symfony-messenger)
- [Your own Scheduled Tasks Loader](#your-own-scheduled-tasks-loaders)
- [Handling cron jobs across a cluster](#handling-cron-jobs-across-a-cluster-or-custom-message-queue)
- [Use ReactPHP EventLoop](#use-reactphp-eventloop)

Install
------
Expand All @@ -54,26 +59,36 @@ return [

## Quick Usage

You can use `AsCron` attribute for autoconfigure.
You can use `AsCron` or `AsPeriodicTask` attribute for autoconfigure.

```php
<?php declare(strict_types=1);

namespace App\Service;

use Okvpn\Bundle\CronBundle\Attribute\AsCron;
use Okvpn\Bundle\CronBundle\Attribute\AsPeriodicTask;

#[AsCron('*/5 * * * *')]
#[AsCron('*/5 * * * *', messenger: true)]
class SyncAppWorker
{
public function __invoke(array $arguments = []): void
{
// code
}
}

#[AsCron('*/10 * * * *', jitter: 60)]
class Sync2AppWorker { /* ... */ } // Run each 10 minutes with 60 sec random delay

#[AsCron('@random 3600')]
class Sync3AppWorker { /* ... */ } // Run with random 0-3600 sec

#[AsPeriodicTask('30 seconds', jitter: 5)]
class Sync4AppWorker { /* ... */ } // Run each 30 sec with 5 sec random delay.
```

### Commands
## Commands

Runs the current cron schedule

Expand All @@ -98,6 +113,14 @@ php bin/console okvpn:debug:cron --execute-one=7

![debug](docs/image1.png)

#### Dry run cron tasks.

```
php bin/console okvpn:cron --dry-run --demand -vvv
```

![debug](docs/img2.png)

### Cron Expression

A CRON expression syntax was take from lib [dragonmantank/cron-expressions](https://github.com/dragonmantank/cron-expression#cron-expressions)
Expand Down Expand Up @@ -202,6 +225,8 @@ To use this option need to install symfony [lock component](https://symfony.com/
- `arguments` - Array of arguments, used to run symfony console commands or pass arguments to handler.
- `priority` - Sorting priority.
- `group` - Group name, see Cron Grouping section.
- `jitter` - Random delay 0-60 sec
- `interval` - Run periodic tasks by interval. Examples `10`, `10 seconds`, `1 day`.
- `messenger` - Send jobs into Messenger Bus. Default `false`. You also can specify transport here `{routing: async}`,
see [Symfony Routing Messages to a Transport](https://symfony.com/doc/current/messenger.html#routing-messages-to-a-transport)

Expand Down Expand Up @@ -235,6 +260,12 @@ okvpn_cron:
cron: "*/30 * * * *"
async: true
arguments: { '--transport': 15 } # command arguments or options
jitter: 60 # 60 sec random delay

-
command: 'app:cron:wrfda-grib2' # run the command with 20 sec interval and 10 sec random delay
interval: "20 seconds"
jitter: 10
```
## Full Configuration Reference
Expand All @@ -257,6 +288,9 @@ okvpn_cron:
# Stamps it's markers that will add to each tasks.
with_stamps:
- 'Packagist\WebBundle\Cron\WorkerStamp'

# service name for run cron in demand (Okvpn\Bundle\CronBundle\Runner\ScheduleLoopInterface)
loop_engine: ~

tasks: # Defined tasks via configuration
-
Expand Down Expand Up @@ -417,6 +451,82 @@ See example of customization
[one](https://github.com/vtsykun/packeton/tree/master/src/Cron/WorkerMiddleware.php),
[two](https://github.com/vtsykun/packeton/tree/master/src/Cron/CronWorker.php)


## Use ReactPHP EventLoop

You can add your own periodic tasks directly to `Loop`.
The bundle uses a simple wrapper `Okvpn\Bundle\CronBundle\Runner\ScheduleLoopInterface` for the library `react/event-loop`

```php
<?php
use Okvpn\Bundle\CronBundle\Event\LoopEvent;
use Okvpn\Bundle\CronBundle\Runner\TimerStorage;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;

class CronStartListener
{
#[AsEventListener('loopInit')]
public function loopInit(LoopEvent $event): void
{
$dataDogS = $this->getDDog();
$event->getLoop()->addPeriodicTimer(6.0, static function () use ($dataDogS) {
$dataDogS->set('crond', getmypid());
});
}
}
```

#### Configure ReactPHP adapter

Need to install [react/event-loop](https://github.com/reactphp/event-loop) if you want to use with async I/O, for example for handle websockets, redis.

```
composer req react/event-loop
```

```yaml
# Add file to config/packages/*
okvpn_cron:
loop_engine: okvpn_cron.react_loop # service name
```
```php
<?php
use Okvpn\Bundle\CronBundle\Event\LoopEvent;
use Okvpn\Bundle\CronBundle\Runner\TimerStorage;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Okvpn\Bundle\CronBundle\Runner\TimerStorage;
use React\EventLoop\Loop;

class CronStartListener
{
public function __construct(
private TimerStorage $timers,
) {
}

#[AsEventListener('loopInit')]
public function loopInit(LoopEvent $event): void
{
$redis = new RedisClient('127.0.0.1:6379');
$timers = $this->timers;
$loop = $event->getLoop();

$redis->on('message', static function (string $channel, string $payload) use ($timers, $loop) {
[$command, $args] = unserialize($payload);
if ($timers->hasTimer($envelope = $timers->find($command, $args))) {
[$timer] = $timers->getTimer($envelope);
$loop->futureTick($timer);
}
});

Loop::addPeriodicTimer(5.0, static function () use ($redis) {
$redis->ping();
});
}
}
```

License
-------

Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
},
"extra": {
"branch-alias": {
"dev-master": "1.0-dev"
"dev-master": "1.1-dev"
}
},
"suggest": {
Expand Down
Binary file added docs/img2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 4 additions & 1 deletion src/Attribute/AsCron.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,23 @@ final class AsCron
public $async;
public $options;
public $messenger;
public $jitter;

public function __construct(
string $cron,
bool $lock = null,
bool $async = null,
array $options = [],
bool $messenger = null
bool $messenger = null,
int $jitter = null,
) {
// Replace when update PHP > 7.2
$this->async = $async;
$this->lock = $lock;
$this->cron = $cron;
$this->options = $options;
$this->messenger = $messenger;
$this->jitter = $jitter;
}

public function getAttributes(): array
Expand Down
47 changes: 47 additions & 0 deletions src/Attribute/AsPeriodicTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

declare(strict_types=1);

namespace Okvpn\Bundle\CronBundle\Attribute;

#[\Attribute(\Attribute::TARGET_CLASS)]
class AsPeriodicTask
{
public $interval;
public $lock;
public $async;
public $options;
public $messenger;
public $jitter;

public function __construct(
/*int|string */ $interval,
bool $lock = null,
bool $async = null,
bool $messenger = null,
int $jitter = null,
array $options = [],
) {
$this->async = $async;
$this->lock = $lock;
$this->interval = $interval;
$this->options = $options;
$this->messenger = $messenger;
$this->jitter = $jitter;
}

public function getAttributes(): array
{
$attributes = get_object_vars($this);
foreach ($attributes as $name => $value) {
if (null === $value) {
unset($attributes[$name]);
}
}

$attributes = $attributes + $attributes['options'];
unset($attributes['options']);

return $attributes;
}
}
55 changes: 55 additions & 0 deletions src/Clock/OkvpnLoopClock.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Okvpn\Bundle\CronBundle\Clock;

/**
* Bridge for SF 4-5 support. Remove after update requires >= SF 6
*
* @author Nicolas Grekas <[email protected]>
*/
final class OkvpnLoopClock /*implements ClockInterface*/
{
private $timezone;

public function __construct(/*\DateTimeZone|string */$timezone = null)
{
if (\is_string($timezone = $timezone ?? \date_default_timezone_get())) {
$this->timezone = new \DateTimeZone($timezone);
} else {
$this->timezone = $timezone;
}
}

public function now(): \DateTimeImmutable
{
return new \DateTimeImmutable('now', $this->timezone);
}

public function sleep($seconds): void
{
if (0 < $s = (int) $seconds) {
\sleep($s);
}

if (0 < $us = $seconds - $s) {
\usleep((int) ($us * 1E6));
}
}

public function withTimeZone(/*\DateTimeZone|string*/ $timezone): static
{
$clone = clone $this;
$clone->timezone = \is_string($timezone) ? new \DateTimeZone($timezone) : $timezone;

return $clone;
}
}
Loading

0 comments on commit e868550

Please sign in to comment.