Question

Problem: I want to run several PHP worker processes that listen on an MQ server queue for asynchronous jobs. Simply running these processes as daemons on a server gives me no real control over the instances (load, status, lock-ups), except perhaps by dumping ps -aux. Because of that, I'm looking for some kind of runtime environment that lets me monitor and control the instances, either at the system (process) level or on a higher layer (some kind of Java-style app server).

Any pointers?

Solution

Here's some code that may be useful.

<?php
define('WANT_PROCESSORS', 5);
define('PROCESSOR_EXECUTABLE', '/path/to/your/processor');
set_time_limit(0);
$cycles = 0;
$run = true;
$reload = false;
declare(ticks = 30);

function signal_handler($signal) {
    switch($signal) {
    case SIGTERM :
        global $run;
        $run = false;
        break;
    case SIGHUP  :
        global $reload;
        $reload = true;
        break;
    }   
}

pcntl_signal(SIGTERM, 'signal_handler');
pcntl_signal(SIGHUP, 'signal_handler');

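function spawn_processor() {
    $pid = pcntl_fork();
    if($pid == -1) {
        // Fork failed. Never store -1: posix_kill(-1, ...) would signal every process we are allowed to signal.
        error_log("Could not fork a processor");
    } elseif($pid) {
        // Parent: remember the child's PID.
        global $processors;
        $processors[] = $pid;
    } else {
        // Child: start a new session, detach from the terminal, then become the worker.
        if(posix_setsid() == -1)
            die("Forked process could not detach from terminal\n");
        fclose(STDIN);
        fclose(STDOUT);
        fclose(STDERR);
        pcntl_exec(PROCESSOR_EXECUTABLE);
        // Only reached if pcntl_exec() failed.
        die('Failed to exec ' . PROCESSOR_EXECUTABLE . "\n");
    }
}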

function spawn_processors() {
    global $processors;
    if($processors)
        kill_processors();
    $processors = array();
    for($ix = 0; $ix < WANT_PROCESSORS; $ix++)
        spawn_processor();
}

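function kill_processors() {
    global $processors;
    foreach($processors as $processor)
        posix_kill($processor, SIGTERM);
    foreach($processors as $processor)
        pcntl_waitpid($processor, $status); // $status is a required by-reference argument
    // unset() would only drop this function's reference to the global, so reset the list instead.
    $processors = array();
}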

function check_processors() {
    global $processors;
    $valid = array();
    foreach($processors as $processor) {
        pcntl_waitpid($processor, $status, WNOHANG);
        if(posix_getsid($processor))
            $valid[] = $processor;
    }
    $processors = $valid;
    if(count($processors) > WANT_PROCESSORS) {
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            posix_kill($processors[$ix], SIGTERM);
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            pcntl_waitpid($processors[$ix], $status);
    } elseif(count($processors) < WANT_PROCESSORS) {
        for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++)
            spawn_processor();
    }
}

spawn_processors();

while($run) {
    $cycles++;
    if($reload) {
        $reload = false;
        kill_processors();
        spawn_processors();
    } else {
        check_processors();
    }
    usleep(150000);
}
kill_processors();
pcntl_wait($status);
?>
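
The supervisor above stops its processors with SIGTERM, so each worker should react to that signal by finishing its current job and then exiting. Here is a minimal sketch of such a worker loop. The function name run_worker() and the two callbacks are hypothetical placeholders for whatever your MQ client provides; they are not part of the answer above.

```php
<?php
/**
 * Hypothetical worker loop: handles jobs until SIGTERM arrives.
 * $fetch_job returns the next job or null when the queue is empty;
 * $handle_job processes one job. Returns the number of jobs handled.
 */
function run_worker(callable $fetch_job, callable $handle_job) {
    $stop = false;
    // The handler only sets a flag, so a job is never interrupted halfway.
    pcntl_signal(SIGTERM, function () use (&$stop) {
        $stop = true;
    });

    $handled = 0;
    while (true) {
        // Run handlers for any signals that arrived since the last iteration.
        pcntl_signal_dispatch();
        if ($stop)
            break;

        $job = $fetch_job();
        if ($job === null) {
            usleep(100000); // nothing queued; back off briefly
            continue;
        }
        $handle_job($job);
        $handled++;
    }
    return $handled;
}
```

Calling pcntl_signal_dispatch() at the top of the loop means a SIGTERM takes effect between jobs rather than in the middle of one, so no declare(ticks) is needed in the worker itself.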

OTHER TIPS

It sounds like you already have an MQ up and running on a *nix system and just want a way to manage workers.

A very simple way to do so is to use GNU screen. To start 10 workers you can use:

#!/bin/sh
# Start 10 detached screen sessions, one worker in each.
for x in $(seq 1 10); do
    screen -dmS worker_$x php /path/to/script.php worker$x
done

This will start 10 workers in the background, using screens named worker_1, worker_2, worker_3 and so on.

You can reattach to a screen by running screen -r with its name (for example, screen -r worker_1) and list the running workers with screen -list.

For more info this guide may be of help: http://www.kuro5hin.org/story/2004/3/9/16838/14935

Also try:

  • screen --help
  • man screen
  • or google.

For production servers I would normally recommend using the standard system startup scripts, but I have been running screen commands from the startup scripts for years with no problems.

Do you actually need it to be continuously running?

If you only want to spawn a new process on request, you can register it as a service in xinetd.

A pcntl plugin-type server daemon for PHP:

http://dev.pedemont.com/sonic/

Below is our working implementation of @chaos's answer. The signal-handling code was removed because this script usually lives for just a few milliseconds.

We also added two functions to save the PIDs between calls: restore_processors_state() and save_processors_state(). We used redis there, but you could use a file-based implementation instead.
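
For anyone who prefers files over redis, here is a rough sketch of what the two state functions might look like. The PID_FILE constant and its location are assumptions made for illustration; in practice you would choose a directory that only the worker user can write to.

```php
<?php
// Hypothetical location for the stored PID list (an assumption, not from the original script).
define('PID_FILE', sys_get_temp_dir() . '/worker_procs.json');

function restore_processors_state() {
    global $processors;
    $processors = array();
    if (!is_readable(PID_FILE))
        return; // first run, or the file was removed: start with an empty list
    $pids = json_decode(file_get_contents(PID_FILE), true);
    if (is_array($pids)) {
        // Keep integers only, so a corrupted file cannot inject odd values.
        $processors = array_values(array_filter($pids, 'is_int'));
    }
}

function save_processors_state() {
    global $processors;
    // LOCK_EX holds an exclusive lock on the file while it is being written.
    file_put_contents(PID_FILE, json_encode(array_values((array) $processors)), LOCK_EX);
}
```

These keep the same names and the same global $processors as the redis versions, so they should drop into the script without other changes.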

We run this script every minute using cron. Each run checks whether all the processes are alive; if some are not, it restarts them and then exits. To kill the existing processes, we simply run the script with the argument kill: php script.php kill.
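
Because every cron run is a fresh process, the stored PIDs are not its children, so pcntl_waitpid() cannot report on them and the liveness check has to rely on something else. The script below uses posix_getsid() for this. Another common idiom, sketched here, is to send signal 0 with posix_kill(). The helper names is_alive() and prune_dead() are made up for this example.

```php
<?php
/**
 * Return true if a process with this PID exists and we may signal it.
 * Signal 0 performs the existence and permission checks without delivering anything.
 */
function is_alive($pid) {
    // Reject 0 and negative values: kill() treats them as process groups, not single PIDs.
    return is_int($pid) && $pid > 0 && posix_kill($pid, 0);
}

/** Keep only the PIDs that still refer to a running process. */
function prune_dead(array $pids) {
    return array_values(array_filter($pids, 'is_alive'));
}
```

Note that either check only proves that some process has that PID. If a worker died and the operating system reused its PID for an unrelated process, the stale entry would still look alive.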

This is a very handy way of running workers without adding scripts to init.d.

<?php

include_once dirname( __FILE__ ) . '/path/to/bootstrap.php';

define('WANT_PROCESSORS', 5);
define('PROCESSOR_EXECUTABLE', dirname(__FILE__) . '/path/to/worker.php');
set_time_limit(0);

$run = true;
$reload = false;
declare(ticks = 30);

function restore_processors_state()
{
    global $processors;

    $redis = Zend_Registry::get('redis');
    $pids = $redis->hget('worker_procs', 'pids');

    if( !$pids )
    {
        $processors = array();
    }
    else
    {
        $processors = json_decode($pids, true);
    }
}

function save_processors_state()
{
    global $processors;

    $redis = Zend_Registry::get('redis');
    $redis->hset('worker_procs', 'pids', json_encode($processors));
}

function spawn_processor() {
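    $pid = pcntl_fork();
    if($pid == -1) {
        // Fork failed. Never store -1: posix_kill(-1, ...) would signal every process we are allowed to signal.
        error_log("Could not fork a processor");
    } elseif($pid) {
        global $processors;
        $processors[] = $pid;
    } else {
        if(posix_setsid() == -1)
            die("Forked process could not detach from terminal\n");
        fclose(STDIN);
        fclose(STDOUT);
        fclose(STDERR);
        pcntl_exec('/usr/bin/php', array(PROCESSOR_EXECUTABLE));
        // Only reached if pcntl_exec() failed.
        die('Failed to exec ' . PROCESSOR_EXECUTABLE . "\n");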
    }
}

function spawn_processors() {
    restore_processors_state();

    check_processors();

    save_processors_state();
}

function kill_processors() {
    global $processors;
    foreach($processors as $processor)
        posix_kill($processor, SIGTERM);
    foreach($processors as $processor)
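        pcntl_waitpid($processor, $trash); // PIDs restored from redis are not our children, so this returns -1 at once
    // unset() would only drop the local reference, and save_processors_state() would then store the stale PIDs again.
    $processors = array();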
}

function check_processors() {
    global $processors;
    $valid = array();
    foreach($processors as $processor) {
        pcntl_waitpid($processor, $status, WNOHANG);
        if(posix_getsid($processor))
            $valid[] = $processor;
    }
    $processors = $valid;
    if(count($processors) > WANT_PROCESSORS) {
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            posix_kill($processors[$ix], SIGTERM);
        for($ix = count($processors) - 1; $ix >= WANT_PROCESSORS; $ix--)
            pcntl_waitpid($processors[$ix], $trash);
    }
    elseif(count($processors) < WANT_PROCESSORS) {
        for($ix = count($processors); $ix < WANT_PROCESSORS; $ix++)
            spawn_processor();
    }
}

if( isset($argv) && count($argv) > 1 ) {
    if( $argv[1] == 'kill' ) {
        restore_processors_state();
        kill_processors();
        save_processors_state();

        exit(0);
    }
}

spawn_processors();
Licensed under: CC-BY-SA with attribution