Skip to content

Commit

Permalink
Update: task_worker use msgqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed May 13, 2014
1 parent 198fc45 commit d70571f
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 54 deletions.
5 changes: 3 additions & 2 deletions examples/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//'open_eof_check' => true,
//'package_eof' => "\r\n",
'task_worker_num' => 2,
'task_ipc_mode' => 1,
//'dispatch_mode' => 2,
//'daemonize' => 1,
//'log_file' => '/tmp/swoole.log',
Expand All @@ -19,7 +20,7 @@ function my_onStart(swoole_server $serv)
swoole_set_process_name("php {$argv[0]}: master");
echo "MasterPid={$serv->master_pid}|Manager_pid={$serv->manager_pid}\n";
echo "Server: start.Swoole version is [".SWOOLE_VERSION."]\n";
$serv->addtimer(5000);
//$serv->addtimer(5000);
}

function my_log($msg)
Expand Down Expand Up @@ -74,7 +75,7 @@ function my_onReceive(swoole_server $serv, $fd, $from_id, $data)
}
elseif($cmd == "task")
{
$task_id = $serv->task("hello world", 0);
$task_id = $serv->task("hello world");
echo "Dispath AsyncTask: id=$task_id\n";
}
elseif($cmd == "taskwait")
Expand Down
5 changes: 0 additions & 5 deletions include/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,7 @@ struct swServer_s
* worker process num
*/
uint16_t worker_num;
/**
* task worker process num
*/
uint16_t task_worker_num;
uint16_t reactor_pipe_num; //每个reactor维持的pipe数量

uint8_t factory_mode;

/**
Expand Down
26 changes: 23 additions & 3 deletions include/swoole.h
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,20 @@ struct _swWorker

struct _swProcessPool
{
char reloading;
char reload_flag;
/**
* reloading
*/
uint8_t reloading;
uint8_t reload_flag;
/**
* use message queue IPC
*/
uint8_t use_msgqueue;
/**
* message queue key
*/
key_t msgqueue_key;

int worker_num;
int max_request;

Expand All @@ -638,9 +650,11 @@ struct _swProcessPool
int (*main_loop)(struct _swProcessPool *pool, swWorker *worker);

int round_id;

swWorker *workers;
swPipe *pipes;
swHashMap map;
swQueue queue;

void *ptr;
void *ptr2;
Expand Down Expand Up @@ -711,7 +725,7 @@ int swReactorKqueue_create(swReactor *reactor, int max_event_num);
int swReactorSelect_create(swReactor *reactor);

/*----------------------------Process Pool-------------------------------*/
int swProcessPool_create(swProcessPool *pool, int worker_num, int max_request);
int swProcessPool_create(swProcessPool *pool, int worker_num, int max_request, key_t msgqueue_key);
int swProcessPool_wait(swProcessPool *pool);
int swProcessPool_start(swProcessPool *pool);
void swProcessPool_shutdown(swProcessPool *pool);
Expand Down Expand Up @@ -843,6 +857,12 @@ typedef struct _swServerG{

uint8_t use_timerfd;
uint8_t use_signalfd;
uint8_t task_ipc_mode;

/**
* task worker process num
*/
uint16_t task_worker_num;

swServer *serv;
swFactory *factory;
Expand Down
15 changes: 11 additions & 4 deletions src/factory/FactoryProcess.c
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,14 @@ static int swFactoryProcess_manager_start(swFactory *factory)
}


if (serv->task_worker_num > 0)
if (SwooleG.task_worker_num > 0)
{
if (swProcessPool_create(&SwooleG.task_workers, serv->task_worker_num, serv->max_request)< 0)
key_t msgqueue_key = 0;
if (SwooleG.task_ipc_mode > 0)
{
msgqueue_key = serv->message_queue_key + 2;
}
if (swProcessPool_create(&SwooleG.task_workers, SwooleG.task_worker_num, serv->max_request, msgqueue_key)< 0)
{
swWarn("[Master] create task_workers fail");
return SW_ERR;
Expand Down Expand Up @@ -312,8 +317,10 @@ static int swFactoryProcess_manager_start(swFactory *factory)
object->workers[i].pid = pid;
}
}
//创建task_worker进程
if (serv->task_worker_num > 0)
/**
* create task worker pool
*/
if (SwooleG.task_worker_num > 0)
{
swProcessPool_start(&SwooleG.task_workers);
}
Expand Down
102 changes: 83 additions & 19 deletions src/network/ProcessPool.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ static void swProcessPool_free(swProcessPool *pool);
/**
* Process manager
*/
int swProcessPool_create(swProcessPool *pool, int worker_num, int max_request)
int swProcessPool_create(swProcessPool *pool, int worker_num, int max_request, key_t msgqueue_key)
{
bzero(pool, sizeof(swProcessPool));
pool->workers = sw_calloc(worker_num, sizeof(swWorker));
Expand All @@ -43,25 +43,42 @@ int swProcessPool_create(swProcessPool *pool, int worker_num, int max_request)
}

int i;
swPipe *pipe;
for (i = 0; i < worker_num; i++)
if (msgqueue_key > 0)
{
pipe = &pool->pipes[i];
if (swPipeUnsock_create(pipe, 1, SOCK_DGRAM) < 0)
if (swQueueMsg_create(&pool->queue, 1, msgqueue_key, 1) < 0)
{
return SW_ERR;
}
swProcessPool_worker(pool, i).pipe_master = pipe->getFd(pipe, 1);
swProcessPool_worker(pool, i).pipe_worker = pipe->getFd(pipe, 0);
pool->use_msgqueue = 1;
pool->msgqueue_key = msgqueue_key;
}
else
{
swPipe *pipe;
for (i = 0; i < worker_num; i++)
{
pipe = &pool->pipes[i];
if (swPipeUnsock_create(pipe, 1, SOCK_DGRAM) < 0)
{
return SW_ERR;
}
swProcessPool_worker(pool, i).pipe_master = pipe->getFd(pipe, 1);
swProcessPool_worker(pool, i).pipe_worker = pipe->getFd(pipe, 0);
}
}

for (i = 0; i < worker_num; i++)
{
swProcessPool_worker(pool, i).id = i;
swProcessPool_worker(pool, i).pool = pool;
}

pool->main_loop = swProcessPool_worker_start;
return SW_OK;
}

/**
* start
* start workers
*/
int swProcessPool_start(swProcessPool *pool)
{
Expand All @@ -88,11 +105,31 @@ int swProcessPool_dispatch(swProcessPool *pool, swEventData *data, int worker_id
{
worker_id = (pool->round_id++)%pool->worker_num;
}
swWorker *worker = &swProcessPool_worker(pool, worker_id);
ret = swWrite(worker->pipe_master, data, sizeof(data->info) + data->info.len);
if (ret < 0)

struct
{
long mtype;
swEventData buf;
} in;

if (pool->use_msgqueue)
{
in.mtype = worker_id + 1;
memcpy(&in.buf, data, sizeof(data->info) + data->info.len);
ret = pool->queue.in(&pool->queue, (swQueue_data *) &in, sizeof(data->info) + data->info.len);
if (ret < 0)
{
swWarn("msgsnd failed. Error: %s[%d]", strerror(errno), errno);
}
}
else
{
swWarn("sendto unix socket failed. Error: %s[%d]", strerror(errno), errno);
swWorker *worker = &swProcessPool_worker(pool, worker_id);
ret = swWrite(worker->pipe_master, data, sizeof(data->info) + data->info.len);
if (ret < 0)
{
swWarn("sendto unix socket failed. Error: %s[%d]", strerror(errno), errno);
}
}
return ret;
}
Expand Down Expand Up @@ -142,7 +179,12 @@ pid_t swProcessPool_spawn(swWorker *worker)

static int swProcessPool_worker_start(swProcessPool *pool, swWorker *worker)
{
swEventData buf;
struct
{
long mtype;
swEventData buf;
} out;

int n, ret;
int task_n, worker_task_always = 0;

Expand All @@ -155,18 +197,40 @@ static int swProcessPool_worker_start(swProcessPool *pool, swWorker *worker)
{
task_n = pool->max_request;
}

//使用from_fd保存task_worker的id
buf.info.from_fd = worker->id;
out.buf.info.from_fd = worker->id;

if (SwooleG.task_ipc_mode > 1)
{
out.mtype = worker->id + 1;
}
else
{
out.mtype = 0;
}

while (SwooleG.running > 0 && task_n > 0)
{
n = read(worker->pipe_worker, &buf, sizeof(buf));
if (n < 0)
if (pool->use_msgqueue)
{
swWarn("[Worker#%d]read pipe fail. Error: %s [%d]", worker->id, strerror(errno), errno);
continue;
n = pool->queue.out(&pool->queue, (swQueue_data *) &out, sizeof(out.buf));
if (n < 0)
{
swWarn("[Worker#%d]deQueue failed. Error: %s [%d]", worker->id, strerror(errno), errno);
continue;
}
}
else
{
n = read(worker->pipe_worker, &out.buf, sizeof(out.buf));
if (n < 0)
{
swWarn("[Worker#%d]read pipe failed. Error: %s [%d]", worker->id, strerror(errno), errno);
continue;
}
}
ret = pool->onTask(pool, &buf);
ret = pool->onTask(pool, &out.buf);
if (ret > 0 && !worker_task_always)
{
task_n--;
Expand Down
9 changes: 6 additions & 3 deletions src/network/ReactorProcess.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ int swReactorProcess_start(swServer *serv)
}
int ret, i;
swProcessPool pool;
swProcessPool_create(&pool, serv->worker_num, serv->max_request);
if (swProcessPool_create(&pool, serv->worker_num, serv->max_request, 0) < 0)
{
return SW_ERR;
}
pool.main_loop = swReactorProcess_loop;
pool.ptr = serv;

Expand Down Expand Up @@ -88,9 +91,9 @@ int swReactorProcess_start(swServer *serv)
}
SwooleG.event_workers = &pool;
//task workers
if (serv->task_worker_num > 0)
if (SwooleG.task_worker_num > 0)
{
if (swProcessPool_create(&SwooleG.task_workers, serv->task_worker_num, serv->max_request)< 0)
if (swProcessPool_create(&SwooleG.task_workers, SwooleG.task_worker_num, serv->max_request, serv->message_queue_key + 2)< 0)
{
swWarn("[Master] create task_workers fail");
return SW_ERR;
Expand Down
22 changes: 13 additions & 9 deletions src/network/Server.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ static int swServer_check_callback(swServer *serv)
return SW_ERR;
}
//AsyncTask
if (serv->task_worker_num > 0)
if (SwooleG.task_worker_num > 0)
{
if (serv->onTask == NULL)
{
Expand Down Expand Up @@ -436,14 +436,15 @@ int swServer_start(swServer *serv)
serv->ipc_mode = SW_IPC_MSGQUEUE;
#endif

if (serv->message_queue_key == 0)
{
char path_buf[128];
char *path_ptr = getcwd(path_buf, 128);
serv->message_queue_key = ftok(path_ptr, 1);
}

if (serv->ipc_mode == SW_IPC_MSGQUEUE)
{
if (serv->message_queue_key == 0)
{
char path_buf[128];
char *path_ptr = getcwd(path_buf, 128);
serv->message_queue_key = ftok(path_ptr, 1);
}
SwooleG.use_timerfd = 0;
SwooleG.use_signalfd = 0;
}
Expand Down Expand Up @@ -484,7 +485,7 @@ int swServer_start(swServer *serv)
}

//for taskwait
if (serv->task_worker_num > 0 && serv->worker_num > 0)
if (SwooleG.task_worker_num > 0 && serv->worker_num > 0)
{
SwooleG.task_result = sw_shm_calloc(serv->worker_num, sizeof(swEventData));
SwooleG.task_notify = sw_calloc(serv->worker_num, sizeof(swPipe));
Expand Down Expand Up @@ -721,7 +722,7 @@ int swServer_free(swServer *serv)
}

//master pipe
if (serv->task_worker_num > 0)
if (SwooleG.task_worker_num > 0)
{
swProcessPool_shutdown(&SwooleG.task_workers);
}
Expand Down Expand Up @@ -804,6 +805,9 @@ void swTaskWorker_onWorkerStart(swProcessPool *pool, int worker_id)
serv->onWorkerStart(serv, worker_id + serv->worker_num);
}

/**
* in worker process
*/
int swTaskWorker_onFinish(swReactor *reactor, swEvent *event)
{
swServer *serv = reactor->ptr;
Expand Down
1 change: 1 addition & 0 deletions src/queue/Msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ int swQueueMsg_create(swQueue *p, int blocking, int msg_key, long type)
msg_id = msgget(msg_key, IPC_CREAT | O_EXCL | 0666);
if (msg_id < 0)
{
swWarn("msgget() failed. Error: %s[%d]", strerror(errno), errno);
return SW_ERR;
}
else
Expand Down
Loading

0 comments on commit d70571f

Please sign in to comment.