Skip to content

Commit

Permalink
push task
Browse files Browse the repository at this point in the history
  • Loading branch information
millken committed Sep 18, 2018
1 parent c3bb4d5 commit 0e9df04
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 31 deletions.
36 changes: 23 additions & 13 deletions src/Application/Swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@
use Ypf\Swoole\CronManager;
use GuzzleHttp\Psr7\ServerRequest;
use function swoole_set_process_name;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;

class Swoole
class Swoole implements LoggerAwareInterface
{
private $server = null;
protected $workers;
protected static $instances = null;

use LoggerAwareTrait;

public function build()
{
$container = Application::getContainer();
Expand All @@ -29,6 +33,9 @@ public function build()
$this->server = new SwooleHttpServer($address, $port, SWOOLE_PROCESS, SWOOLE_TCP);

$this->server->set($options);
$logger = Application::getContainer()->get('logger');
$this->setLogger($logger);

static::$instances = &$this;

return $this;
Expand All @@ -39,6 +46,11 @@ public static function getInstance()
return static::$instances;
}

public static function getServer()
{
return static::$instances->server;
}

public function onRequest(SwooleHttpRequest $request, SwooleHttpResponse $swooleResponse): void
{
$_SERVER = [];
Expand Down Expand Up @@ -72,18 +84,16 @@ public function onRequest(SwooleHttpRequest $request, SwooleHttpResponse $swoole

public function onTask(SwooleHttpServer $server, int $task_id, int $source, string $data)
{
//echo sprintf("pid=%d, task_id=%d, from_id=%d, data=%s\n", getmypid(), $task_id, $source, $data);

$data = SwooleSerialize::unpack($data);
if (!isset($data['name'])) {
return;
}
$name = $data['name'];
assert(
$this->workers[$name],
new \UnexpectedValueException("No task worker registered for '{$name}")
);
$result = $workers[$name]->run($data['payload']);
// $this->logger->debug('task receive pid : {pid}, task_id : {task_id}, from_id= {from_id}, data= {data}', [
// 'pid' => getmypid(),
// 'task_id' => $task_id,
// 'from_id' => $source,
// 'data' => $data,
// ]);

$task = SwooleSerialize::unpack($data);
$unit = Application::getContainer()->get($task->getClass());
$result = $unit->run($task->getPayload());
$server->finish(SwooleSerialize::pack($result, 1));

return $result;
Expand Down
15 changes: 6 additions & 9 deletions src/Swoole/CronManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,27 @@
use Ypf\Application;
use Ypf\Controller\CronWorker;
use Cron\CronExpression;
use Swoole\Process as SwooleProcess;

class CronManager
{
private $queue;
private $job;

public function process()
public function __construct()
{
$process = new SwooleProcess([$this, 'start'], false, 1);
$process->start();
if (!class_exists(CronExpression::class)) {
throw new Exception('please use composer require dragonmantank/cron-expression');
}
}

public function start(SwooleProcess $worker)
public function process()
{
global $argv;
$processName = "php {$argv[0]}: cron manager";
\swoole_set_process_name($processName);
$workers = Application::getContainer()->get('workers');
foreach ($workers as $worker) {
$className = $worker[0];
$classReflection = new ReflectionClass($className);
if (!$classReflection->isSubclassOf(CronWorker::class)) {
throw new Exception('cron worker mustbe extends '.CronWorker::class);
throw new Exception('cron worker mustbe extends \Ypf\Controller\\'.CronWorker::class);
}
if (!isset($worker[1])) {
go(function () use ($classReflection) {
Expand Down
9 changes: 0 additions & 9 deletions src/Swoole/Server.php

This file was deleted.

48 changes: 48 additions & 0 deletions src/Swoole/Tasks/Server.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

declare(strict_types=1);

namespace Ypf\Swoole\Tasks;

use Swoole\Server as SwooleServer;
use Ypf\Application\Swoole as YAS;
use Swoole\Serialize;

class Server
{
public function push(Task $task)
{
$payload = Serialize::pack($task, 1);
YAS::getServer()->task(
$payload,
-1,
function (SwooleServer $server, $source, $data) use ($task) {
call_user_func($task->getCallback(), Serialize::unpack($data));
}
);
}

public function await(Task $task, float $timeout = 1)
{
$payload = Serialize::pack($task, 1);

return YAS::getServer()->taskwait($payload, (float) $timeout);
}

public function parallel(array $tasks, float $timeout = 10): array
{
$normalized = [];
$results = [];
foreach ($tasks as $idx => $task) {
/* @var Task $task */
$normalized[] = Serialize::pack($task, 1);
$results[$idx] = false;
}
$result = YAS::getServer()->taskWaitMulti($normalized, (float) $timeout);
foreach ($result as $index => $value) {
$results[$index] = $value;
}

return $results;
}
}
45 changes: 45 additions & 0 deletions src/Swoole/Tasks/Task.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

namespace Ypf\Swoole\Tasks;

class Task
{
private $class;
private $payload = null;
private $callback;

public function __construct(string $class, callable $callback = null)
{
$this->class = $class;
$this->callback = $callback;
}

public function getClass(): string
{
return $this->class;
}

public function withPayload($payload): void
{
$this->payload = $payload;
}

public function getPayload()
{
return $this->payload;
}

public function withCallback(callable $callback): void
{
$this->callback = $callback;
}

public function getCallback(): callable
{
return $this->callback ?? function () {
// Nothing to do
};
}
}

0 comments on commit 0e9df04

Please sign in to comment.