문제

I am currently testing gearman to process parallel tasks/requests from web frontend, the gearman cleint receive the requests with post params via Ajax then create the tasks and send them to gearman worker. 21 instances of worker process are runing to deal with multi requests from different clients at the same time. everything is working fine with one client request at a time, but when multi clients request at same time, the clients will get wrong results for the info they requested. for example if client A requested info for customer_id 123, and client B requetsed info for customer_id 456 and both requests fired at the same time, client A will get result of client B and client B will get result of client A. I tried to separate the functions on different workers but same issue exist. Please help me to find the issue on my code.

I was using CURL-multi with no issue (here) but recently decided to try and test gearman to see if I can get more performance.

Here is my code for the client and worker:

client code:

<?php

header("Expires: Mon, 26 Jul 1997 05:00:00 GMT");    // Date in the past
header("Cache-Control: no-store, no-cache, must-revalidate");  // HTTP/1.1
header("Cache-Control: post-check=0, pre-check=0", false);
header("Pragma: no-cache");


$order_id   = $_POST['order_id'];
$customer_id    = $_POST['customer_id'];

//some validation code here


class DataCollector extends GearmanClient{
    private $data     = array();
    private $tmpArr   = array();

    function addData($content){
        if($content){
            $this->tmpArr   = json_decode($content, true);
            $this->data     = array_merge($this->tmpArr, $this->data);
            //$this->data[] = json_decode($content, true);
        }
    }
    function getData(){
        return $this->data;
    }
    function outputData(){
        echo json_encode($this->getData());
    }

    function taskCompleted($task){

       $this->addData($task->data());

    }

}


$collector = new DataCollector();

$collector->addServer();

# set a function to be called when the work is complete
$collector->setCompleteCallback(array( $collector, "taskCompleted"));


//params to pass to worker
$queryStr = array(
      "order_id" => $order_id,
      "customer_id" => $customer_id
);

$postData = serialize( $queryStr );

# add tasks to be executed in parallel in Gearman server
$collector->addTask("getdata_orderDetails", $postData, null, "1");
$collector->addTask("getdata_customerDetails", $postData, null, "2");

# run the tasks in parallel
$collector->runTasks();


# output the data 
$collector->outputData();

?>

the worker code:

<?php

class Worker{
    private $worker;
    static $conn;

public function __construct(){

    try{
        self::$conn = oci_connect($user, $pass, $db); // create db connection
    }
    catch (Exception $e) {
        echo "ERROR: " . $e->getMessage();
    }

    $this->worker = new GearmanWorker();
    $this->worker->addServer();

    # Register functions
    $this->worker->addFunction("getdata_orderDetails",    array($this, "getdata_orderDetails_fn"));
    $this->worker->addFunction("getdata_customerDetails", array($this, "getdata_customerDetails_fn"));
}

public function run(){

    while (1) {
      //print "Waiting for job...\n";
      $this->worker->work(); 
      if ($this->worker->returnCode() != GEARMAN_SUCCESS) {
        echo "return_code: " . $this->worker->returnCode() . "\n";
        break;
      }
    }



}


static function getdata_orderDetails_fn($job){


        if(!self::$conn){
        $responseArr = array(
            'response_status'           => -1,         //failed
            'response_message'          => 'Database connection lost',
            'response_id'               => 'DatabaseConnectionErr'
            );

        return  json_encode($responseArr);
        }

    $postData = unserialize($job->workload());

    $order_id   = $postData['order_id'];
    $customer_id    = $postData['customer_id'];

    $sql = "select order_id, order_status, create_date from customer_order where order_id= :order_id";
    $stmt = oci_parse(self::$conn, $sql);
    oci_bind_by_name($stmt, ":order_id", $order_id, -1);
    oci_execute($stmt);
    oci_fetch($stmt);
    $order_id   = oci_result($stmt, 'ORDER_ID');
    $order_status   = oci_result($stmt, 'ORDER_STATUS');
    $create_date    = oci_result($stmt, 'CREATE_DATE');
    oci_free_statement($stmt);


    $responseArr = array(
        'response_status'   => 1,  
        'response_message'  => 'success',
        'response_id'       => 'order_details',
        'order_id'          => $order_id,
        'order_status'      => $order_status,
        'create_date'           => $create_date
    );

    // send result
    return  json_encode($responseArr);
}

static function getdata_customerDetails_fn($job){


        if(!self::$conn){
        $responseArr = array(
            'response_status'   => -1,         //failed
            'response_message'  => 'Database connection lost',
            'response_id'       => 'DatabaseConnectionErr'
            );

        return  json_encode($responseArr);
        }

    $postData = unserialize($job->workload());

    $order_id   = $postData['order_id'];
    $customer_id    = $postData['customer_id'];

    $sql = "select customer_id, customer_fname, customer_lname, customer_address, customer_contact where customer_id= :customer_id";
    $stmt = oci_parse(self::$conn, $sql);
    oci_bind_by_name($stmt, ":customer_id", $customer_id, -1);
    oci_execute($stmt);
    oci_fetch($stmt);
    $customer_id        = oci_result($stmt, 'CUSTOMER_ID');
    $customer_fname     = oci_result($stmt, 'CUSTOMER_FNAME');
    $customer_lname     = oci_result($stmt, 'CUSTOMER_LNAME');
    $customer_address   = oci_result($stmt, 'CUSTOMER_ADDRESS');
    $customer_contact   = oci_result($stmt, 'CUSTOMER_CONTACT');
    oci_free_statement($stmt);

    $responseArr = array(
        'response_id'       => 'customer_details',
        'response_status'   => 1,  
        'response_message'  => 'success',
        'customer_id'       => $customer_id,
        'customer_fname'    => $customer_fname,
        'customer_lname'    => $customer_lname,
        'customer_address'  => $customer_address,
        'customer_contact'  => $customer_contact
    );

    // send result
    return  json_encode($responseArr);

}

}//class


 //start worker
  $worker = new Worker();
  $worker->run();


?>
도움이 되었습니까?

해결책 2

I've solved the issue after getting help from gearman group: https://groups.google.com/forum/m/#!topic/gearman/q3EV7mvHKDs

the solution for my case is by making each task unique based on the $postData. This way if two or more clients requesting same info (eg order_id or customer_id), gearman will queue concurrent requests with same unique task id and will reply to all clients with same result (ie one request will be processed only):

$collector->addTask("getdata_orderDetails", $postData, null, md5($order_id);
$collector->addTask("getdata_customerDetails", $postData, null, md5($customer_id);

다른 팁

I believe problem is that your tasks ID are not unique. Try to assign unique ID to each task, or do not specify task ID at all:

$collector->addTask("getdata_orderDetails", $postData);
$collector->addTask("getdata_customerDetails", $postData);

# run the tasks in parallel
$collector->runTasks();
라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top