Question

I wrote a PHP script which retrieved data via libcurl and processed it. It worked fine, but for performance reasons I changed it to use dozens of workers (threads). Performance improved more than 50-fold; however, php.exe now crashes every few minutes, and the faulting module listed is php_curl.dll. I have prior experience with multi-threading in C, but I have never used it in PHP before.

I googled around, and supposedly cURL is thread safe, according to this 2001 mailing-list post: http://curl.haxx.se/mail/lib-2001-01/0001.html However, I can't find any mention of whether php_curl is thread safe.

In case it matters, I am running PHP from the command line. My setup is Win7 x64, PHP 5.5.11 Thread Safe VC11 x86, PHP pthreads 2.0.4 for PHP 5.5 Thread Safe VC11 x86.

Here is some pseudocode showing what I am doing:

class MyWorker extends Worker
{
    ...
    public function run()
    {
        ...
        while(1)
        {
            ...
            runCURL();
            ...
            sleep(1);
        }
    }
}

function runCURL()
{
    static $curlHandle = null;
    ...
    if(is_null($curlHandle))
    {
        $curlHandle = curl_init();
        curl_setopt($curlHandle, CURLOPT_RETURNTRANSFER, TRUE);
        curl_setopt($curlHandle, CURLOPT_USERAGENT, "My User Agent String");
    }
    curl_setopt($curlHandle, CURLOPT_URL, "The URL");
    curl_setopt($curlHandle, CURLOPT_POSTFIELDS, $data);
    curl_setopt($curlHandle, CURLOPT_HTTPHEADER, $header);
    curl_setopt($curlHandle, CURLOPT_SSL_VERIFYPEER, false);

    $result = curl_exec($curlHandle);
    ...
}

Solution

Firstly, resource types are officially unsupported by pthreads. A curl handle is a resource, so you should not store curl handles in the object scope of pthreads objects, since they might become corrupted.
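Applied to the code in the question, this means each thread should create, use and close its own handle inside run(). The following is a minimal sketch, assuming the question's setup (thread-safe PHP 5.5 with pthreads 2.x and the curl extension). The constructor parameters, the URL, the timeout and the iteration count are hypothetical placeholders; the loop is bounded only so that the example terminates.

```php
<?php
// Sketch only: assumes ZTS PHP 5.5 + pthreads 2.x + the curl extension.
// The URL, timeout and iteration count are hypothetical placeholders.
class MyWorker extends Worker
{
    public function __construct($url, $iterations)
    {
        $this->url = $url;
        $this->iterations = $iterations;
        $this->bytes = 0;
    }

    public function run()
    {
        // The handle is a local variable of run(): this thread creates,
        // reuses and closes it, and it is never stored in object scope
        // or in a static variable shared between threads.
        $curl = curl_init();
        curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($curl, CURLOPT_USERAGENT, "My User Agent String");
        curl_setopt($curl, CURLOPT_TIMEOUT, 10); // hypothetical limit
        curl_setopt($curl, CURLOPT_URL, $this->url);

        for ($i = 0; $i < $this->iterations; $i++) {
            $result = curl_exec($curl);
            if ($result !== false) {
                // Only a plain integer is written back to the shared object.
                $this->bytes = $this->bytes + strlen($result);
            }
            sleep(1);
        }

        curl_close($curl);
    }

    public function getBytes() { return $this->bytes; }

    protected $url;
    protected $iterations;
    protected $bytes;
}

$workers = [];
for ($n = 0; $n < 4; $n++) {
    $workers[$n] = new MyWorker("http://example.com/", 3);
    $workers[$n]->start();
}

// shutdown() waits for run() and any stacked work to finish.
foreach ($workers as $worker) {
    $worker->shutdown();
    printf("worker fetched %d bytes\n", $worker->getBytes());
}
```

Each worker owns exactly one handle for its whole lifetime, so libcurl can still reuse connections on that handle, while no handle is ever touched by more than one thread.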

Making it easy

pthreads provides an easy way to use workers...

The easiest way to distribute work among many threads is to use the built-in Pool class provided by pthreads:

http://php.net/pool

The following code demonstrates how to pool a bunch of requests in a few background threads:

<?php
define("LOG", Mutex::create());

function slog($message, $args = []) {
    $args = func_get_args();
    if (($message = array_shift($args))) {
        Mutex::lock(LOG);
        echo vsprintf("{$message}\n", $args);
        Mutex::unlock(LOG);
    }
}

class Request extends Threaded {

    public function __construct($url, $post = []) {
        $this->url = $url;
        $this->post = $post;
    }

    public function run() {
        /* the curl handle is a local variable: it is created and closed
           inside run() and never stored in object scope */
        $curl = curl_init();

        curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($curl, CURLOPT_URL, $this->url);

        if ($this->post) {
            curl_setopt($curl, CURLOPT_POSTFIELDS, $this->post);
        }

        $response = curl_exec($curl);
        curl_close($curl);

        slog("%s returned %d bytes", $this->url, strlen($response));
    }

    public function getURL()      { return $this->url;      }
    public function getPost()     { return $this->post;     }

    protected $url;
    protected $post;
}

$max = 100;
$urls   = [];
while (count($urls) < $max) {
    $urls[] = sprintf(
        "http://www.google.co.uk/?q=%s", 
        md5(mt_rand()*count($urls)));
}

$pool = new Pool(4);

foreach ($urls as $url) {
    $pool->submit(new Request($url));
}

$pool->shutdown();

Mutex::destroy(LOG);
?>

Your specific task also requires you to process the data once it has been retrieved. You can either build that functionality into a design like the one above ... or

Making it fancy

Promises are a super fancy form of concurrency ...

Promises suit the nature of the task here:

  • First: Make a request
  • Then: Process response

The following code shows how to use pthreads/promises to make the same requests and process the responses:

<?php
namespace {
    require_once("vendor/autoload.php");

    use pthreads\PromiseManager;
    use pthreads\Promise;
    use pthreads\Promisable;
    use pthreads\Thenable;

    define("LOG", Mutex::create());

    function slog($message, $args = []) {
        $args = func_get_args();
        if (($message = array_shift($args))) {
            Mutex::lock(LOG);
            echo vsprintf("{$message}\n", $args);
            Mutex::unlock(LOG);
        }
    }

    /* will be used by everything to report errors when they occur */
    trait ErrorManager {
        public function onError(Promisable $promised) {
            slog("Oh noes: %s", (string) $promised->getError());
        }
    }

    class Request extends Promisable {
        use ErrorManager;

        public function __construct($url, $post = []) {
            $this->url = $url;
            $this->post = $post;
            $this->garbage = false;
        }

        public function onFulfill() {
            $curl = curl_init();

            curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
            curl_setopt($curl, CURLOPT_URL, $this->url);

            if ($this->post) {
                curl_setopt($curl, CURLOPT_POSTFIELDS, $this->post);
            }

            $this->response = curl_exec($curl);
            curl_close($curl);
        }

        public function getURL()      { return $this->url;      }
        public function getPost()     { return $this->post;     }
        public function getResponse() { return $this->response; }
        public function setGarbage()  { $this->garbage = true; }
        public function isGarbage()   { return $this->garbage; }

        protected $url;
        protected $post;
        protected $response;
        protected $garbage;
    }

    class Process extends Thenable {
        use ErrorManager;

        public function onFulfilled(Promisable $request) {
            slog("%s returned %d bytes",
                $request->getURL(), strlen($request->getResponse()));
        }
    }

    /* some dummy urls */
    $max = 100;
    $urls   = [];
    while (count($urls) < $max) {
        $urls[] = sprintf(
            "http://www.google.co.uk/?q=%s", 
            md5(mt_rand()*count($urls)));
    }

    /* initialize manager for promises */
    $manager = new PromiseManager(4);

    /* create promises to make and process requests */
    for ($id = 0; $id < $max; $id++) {
        $promise = new Promise($manager, new Request($urls[$id], []));
        $promise->then(
            new Process($promise));
    }

    /* force the manager to shutdown (fulfilling all promises first) */
    $manager->shutdown();

    /* destroy mutex */
    Mutex::destroy(LOG);
}
?>

Composer (composer.json) for the promises example:

{
    "require": {
        "krakjoe/promises": ">=1.0.2"
    }
}

Note that Request has hardly changed: all that has been added is somewhere to hold the response and a means to detect whether the object is garbage.

For details on garbage collection from pools, which applies to both examples:

http://php.net/pool.collect
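As a rough illustration of that page, here is a minimal sketch, assuming pthreads 2.x, in which each task flags itself as garbage when run() finishes and Pool::collect() is given a callback that returns true for tasks that may be released. The Task class and its garbage flag are hypothetical; they only mirror the isGarbage() idea from the Request class in the promises example.

```php
<?php
// Sketch only: assumes ZTS PHP 5.5 + pthreads 2.x.
// Task and its garbage flag are hypothetical illustrations.
class Task extends Threaded
{
    public function __construct($id)
    {
        $this->id = $id;
        $this->garbage = false;
    }

    public function run()
    {
        /* the real work (for example a curl request) would go here */
        $this->garbage = true; // mark this task as finished
    }

    public function isGarbage() { return $this->garbage; }

    protected $id;
    protected $garbage;
}

$pool = new Pool(4);
for ($i = 0; $i < 20; $i++) {
    $pool->submit(new Task($i));
}

// Release every submitted task for which the callback returns true;
// tasks that have not finished yet are kept by the pool.
$pool->collect(function ($task) {
    return $task->isGarbage();
});

$pool->shutdown();
```

In a long-running script, calling collect() periodically keeps the pool from holding on to every task it has ever been given.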

The slog function exists only to make logged output readable: the mutex stops lines echoed by different threads from interleaving.

Making it clear

pthreads is not a new PDO driver ...

Many people approach pthreads as they would approach a new PDO driver: they assume it works like the rest of PHP and that everything will be fine.

Everything might not be fine, and using pthreads requires research: we are pushing the envelope, and in doing so some "restrictions" must be placed upon the architecture of pthreads to maintain stability. These restrictions can have some strange side effects.

While pthreads comes with exhaustive documentation in the PHP manual, mostly made up of examples, I'm not yet able to attach the following document to the manual.

The following document explains the internals of pthreads. Everyone should read it; it's written for you.

https://gist.github.com/krakjoe/6437782

Licensed under: CC-BY-SA with attribution