Firstly, resource types are officially unsupported by pthreads. A curl handle is a resource, so you should not store curl handles in the object scope of pthreads objects, since they might become corrupted.
Making it easy
pthreads provides an easy way to use workers...
The easiest way to spread execution across many threads is to use the built-in Pool class provided by pthreads.
The following code demonstrates how to pool a bunch of requests in a few background threads:
<?php
define("LOG", Mutex::create());

/* printf-style logger; the mutex keeps lines from different threads from interleaving */
function slog($message, $args = []) {
    $args = func_get_args();
    if (($message = array_shift($args))) {
        Mutex::lock(LOG);
        echo vsprintf("{$message}\n", $args);
        Mutex::unlock(LOG);
    }
}

class Request extends Threaded {
    public function __construct($url, $post = []) {
        $this->url = $url;
        $this->post = $post;
    }

    public function run() {
        /* the curl handle stays in a local variable and is never stored on $this */
        $curl = curl_init();
        curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($curl, CURLOPT_URL, $this->url);
        if ($this->post) {
            curl_setopt($curl, CURLOPT_POSTFIELDS, $this->post);
        }
        $response = curl_exec($curl);
        curl_close($curl);
        slog("%s returned %d bytes", $this->url, strlen($response));
    }

    public function getURL() { return $this->url; }
    public function getPost() { return $this->post; }

    protected $url;
    protected $post;
}

/* some dummy urls */
$max = 100;
$urls = [];
while (count($urls) < $max) {
    $urls[] = sprintf(
        "http://www.google.co.uk/?q=%s",
        md5(mt_rand()*count($urls)));
}

/* four workers share the submitted requests */
$pool = new Pool(4);
foreach ($urls as $url) {
    $pool->submit(new Request($url));
}

/* wait for all submitted requests to finish, then clean up */
$pool->shutdown();
Mutex::destroy(LOG);
?>
Your specific task requires that you now process the data. You can either write this functionality into a design like the one above, or use promises as described in the next section.
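If you take the first route and write the processing into the design above, one possible shape is a plain function that run() calls right after curl_exec(). The following is only a minimal sketch: the name summarizeResponse and the fields it returns are hypothetical and not part of pthreads. It deals only in scalars and arrays, so no resource ever reaches the object scope.

```php
<?php
/*
 * Hypothetical helper, not part of pthreads: reduces a response body
 * to a small array of scalars that is safe to log or pass around.
 */
function summarizeResponse($url, $response) {
    /* curl_exec() returns false on failure, so guard before measuring */
    $ok = is_string($response);

    return [
        "url"   => $url,
        "ok"    => $ok,
        "bytes" => $ok ? strlen($response) : 0,
        "hash"  => $ok ? md5($response) : null,
    ];
}

/*
 * Inside Request::run() this could be called right after curl_exec(), e.g.
 *   $summary = summarizeResponse($this->url, $response);
 * Here it is exercised directly so the sketch runs without pthreads.
 */
$summary = summarizeResponse("http://example.com/", "hello");
printf("%s: %d bytes\n", $summary["url"], $summary["bytes"]);
```

Keeping the processing in an ordinary function means it can be tested without threads, and the Request class only has to decide what to do with the result.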
Making it fancy
promises are a super fancy form of concurrency ...
Promises suit the nature of the task here:
- First: Make a request
- Then: Process response
The following code shows how to use pthreads/promises to make the same requests and process the responses:
<?php
namespace {
    require_once("vendor/autoload.php");

    use pthreads\PromiseManager;
    use pthreads\Promise;
    use pthreads\Promisable;
    use pthreads\Thenable;

    define("LOG", Mutex::create());

    /* printf-style logger; the mutex keeps lines from different threads from interleaving */
    function slog($message, $args = []) {
        $args = func_get_args();
        if (($message = array_shift($args))) {
            Mutex::lock(LOG);
            echo vsprintf("{$message}\n", $args);
            Mutex::unlock(LOG);
        }
    }

    /* will be used by everything to report errors when they occur */
    trait ErrorManager {
        public function onError(Promisable $promised) {
            slog("Oh noes: %s\n", (string) $promised->getError());
        }
    }

    class Request extends Promisable {
        use ErrorManager;

        public function __construct($url, $post = []) {
            $this->url = $url;
            $this->post = $post;
            $this->done = false;
        }

        public function onFulfill() {
            /* the curl handle stays in a local variable; only the response string is stored */
            $curl = curl_init();
            curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
            curl_setopt($curl, CURLOPT_URL, $this->url);
            if ($this->post) {
                curl_setopt($curl, CURLOPT_POSTFIELDS, $this->post);
            }
            $this->response = curl_exec($curl);
            curl_close($curl);
        }

        public function getURL() { return $this->url; }
        public function getPost() { return $this->post; }
        public function getResponse() { return $this->response; }
        public function setGarbage() { $this->garbage = true; }
        public function isGarbage() { return $this->garbage; }

        protected $url;
        protected $post;
        protected $response;
        protected $garbage;
    }

    class Process extends Thenable {
        use ErrorManager;

        public function onFulfilled(Promisable $request) {
            slog("%s returned %d bytes\n",
                $request->getURL(), strlen($request->getResponse()));
        }
    }

    /* some dummy urls */
    $max = 100;
    $urls = [];
    while (count($urls) < $max) {
        $urls[] = sprintf(
            "http://www.google.co.uk/?q=%s",
            md5(mt_rand()*count($urls)));
    }

    /* initialize manager for promises */
    $manager = new PromiseManager(4);

    /* create promises to make and process requests, one per url */
    foreach ($urls as $url) {
        $promise = new Promise($manager, new Request($url, []));
        $promise->then(
            new Process($promise));
    }

    /* force the manager to shutdown (fulfilling all promises first) */
    $manager->shutdown();

    /* destroy mutex */
    Mutex::destroy(LOG);
}
?>
Composer:
{
    "require": {
        "krakjoe/promises": ">=1.0.2"
    }
}
Note that Request has hardly changed; all that has been added is somewhere to hold the response and a means to detect whether the objects are garbage.
For details on garbage collection from pools, which applies to both examples:
The slog function exists only to make logged output readable.
Making it clear
pthreads is not a new PDO driver ...
Many people approach using pthreads as they would approach using a new PDO driver: they assume it works like the rest of PHP and that everything will be fine.
Everything might not be fine, and research is required. We are pushing the envelope, and in doing so some "restrictions" must be placed upon the architecture of pthreads to maintain stability. This can have some strange side effects.
While pthreads comes with exhaustive documentation in the PHP manual, most of which includes examples, I'm not yet able to add the following document to the manual.
The following document provides you with an understanding of the internals of pthreads. Everyone should read it; it's written for you.