Question

I'm making my own PHP-gateway for STOMP (Apollo) because all existing solutions are either too bad or too complicated.

So when I'm sending a message to a queue without transaction (but with receipts), everything ok, see the log:

SEND
destination:/queue/teststst
content-type:text/plain
content-length:100
receipt:f1dd03450508d938e6eb8196a6d128c4

2149b4a936862b121f7928ed5c060152
75235e0fbc8a56970ede75d2147b538b
fece2e4403b52fd903b8e7a78b1918c6

RECEIPT
receipt-id:f1dd03450508d938e6eb8196a6d128c4

But when it comes to transactions, I get this (I removed handshake log, don't worry):

BEGIN
transaction:499cc8a062be1235d312e968e5f30802
receipt:f7c837aed5ee9efd8f27143d85061067

RECEIPT
receipt-id:f7c837aed5ee9efd8f27143d85061067


SEND
destination:/queue/teststst
content-type:text/plain
content-length:100
transaction:499cc8a062be1235d312e968e5f30802
receipt:7048ce3f8a01b55294f5e92fcd501c93

f4c356386a563be82f460900889ea07f
bdd8a503ac3e5ddf0d816f95e9448e1e
e5b7ea903beb7f6300249217e9824ef0

And then nothing. My wrapper tries to receive receipt frame, but there is just a fgets timeout, looks like the broker waits for more SEND frame data, but the process of generating the frame is identical, there is just one more header (transaction). All necessary EOLs and null octets are on their places.

Apollo v1.6, STOMP v1.2.

What could it be?..

UPDATE: Source code

<?php
class Stompler {
  const EOL = "\n";
  const NULL_OCTET = "\x00";
  const STATE_HEADER = 1;
  const STATE_BODY   = 2;

  protected $subscription = false;
  protected $transactionStack;
  protected $connection;
  protected $socket;
  protected $possibleFrameTypes = [
    'server' => [
      'MESSAGE',
      'RECEIPT',
      'ERROR',
      'CONNECTED',
    ],
  ];

  public function send($message, $queueName, $async = false) {
    $this->connect();

    $params = [
      'destination' => $queueName,
      'content-type' => 'text/plain',
      'content-length' => mb_strlen($message . static::EOL),
    ];

    if (isset($this->transactionStack) && !$this->transactionStack->isEmpty()) {
      $params['transaction'] = $this->transactionStack->top();
    }

    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }

    $this->sendFrame('send', $params, $message);

    if ($async === false) $this->checkReceipt($receiptId, 'send');
  }

  public function subscribe($queueName, $async = false, $autoAck = false) {
    $this->connect();

    if ($this->subscription === true) {
      throw new StomplerException('Another subscription has already been started');
    }

    $this->subscription = true;

    $params = [
      'id' => 1,
      'destination' => $queueName,
      'ack' => ($autoAck === true ? 'auto' : 'client'),
    ];

    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }

    $this->sendFrame('subscribe', $params);

    if ($async === false) $this->checkReceipt($receiptId, 'subscribe');
  }

  public function unsubscribe() {
    if ($this->subscription === false) {
      throw new StomplerException('No subscription');
    }

    $this->subscription = false;

    $this->sendFrame('unsubscribe', [
      'id' => 1,
    ]);
  }

  public function connect() {
    if (!empty($this->connection)) return;

    $config = [...];

            $this->socket = fsockopen('tcp://' . $config['host'], $config['port']);
            if (empty($this->socket)) throw new StomplerConnectionException;

            stream_set_timeout($this->socket, 2);

            $this->sendFrame('connect', [
          'accept-version' => '1.2',
          'login'          => $config['login'],
          'passcode'       => $config['password'],
          'virtual-host'   => 'srv',
          'host'           => 'srv',
        ]);
            $frame = $this->readFrame();

            if ($frame['name'] === 'ERROR') {
          throw new StomplerConnectionException("Could not connect to broker: '{$frame['headers']['message']}' ({$frame['body']})");
        }
            if ($frame['name'] !== 'CONNECTED') {
          throw new StomplerConnectionException;
        }

            $this->connection = $frame['headers']['session'];
        }

  public function ack($message, $async = false) {
    $id = is_array($message) ? $message['headers']['ack'] : $message;

    $params = [
      'id' => $id,
    ];

    if (isset($this->transactionStack) && !$this->transactionStack->isEmpty()) {
      $params['transaction'] = $this->transactionStack->top();
    }

    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }

    $this->sendFrame('ack', $params);
    if ($async === false) $this->checkReceipt($receiptId, 'ack');
  }

  public function nack($message, $async = false) {
    $id = is_array($message) ? $message['headers']['ack'] : $message;

    $params = [
      'id' => $id,
    ];

    if (isset($this->transactionStack) && !$this->transactionStack->isEmpty()) {
      $params['transaction'] = $this->transactionStack->top();
    }

    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }

    $this->sendFrame('nack', $params);
    if ($async === false) $this->checkReceipt($receiptId, 'nack');
  }

  public function begin($async = false) {
    $this->connect();

    if ($this->transactionStack === null) {
      $this->transactionStack = new \SplStack();
    }

    $this->transactionStack->unshift($this->generateRandom());

    $params = ['transaction' => $this->transactionStack->top()];

    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }

    $this->sendFrame('begin', $params);
    if ($async === false) $this->checkReceipt($receiptId, 'begin');
  }

  public function commit() {
    if (empty($this->transactionStack) || $this->transactionStack->isEmpty()) {
      throw new StomplerException('No transaction started');
    }

    $this->sendFrame('commit', ['transaction' => $this->transactionStack->pop()]);
  }

  public function abort($transactionId) {
    if (empty($this->transactionStack) || $this->transactionStack->isEmpty()) {
      throw new StomplerException('No transaction started');
    }

    $this->sendFrame('abort', ['transaction' => $this->transactionStack->pop()]);
  }

  public function readFrame($expectedFrameName = null) {
    $started = time();
    $frame = [
      'name' => null,
      'headers' => [],
      'body' => null,
    ];
    $state = false;
    $frameName = false;

    //echo '-------RECV--------' . PHP_EOL;
    while (true) {
      if (feof($this->socket) || ((time() - $started) > 1)) return false;

      $frameLine = fgets($this->socket);

      echo $frameLine;

      if ($state === static::STATE_HEADER) {
        $header = rtrim($frameLine, static::EOL);
        if (!empty($header)) {
          list($k, $v) = explode(':', $header);
          $frame['headers'][$k] = $v;
        }
      }
      if ($state === static::STATE_BODY) $frame['body'] .= $frameLine;

      if ($state === false) {
        $frameName = $frame['name'] = rtrim($frameLine, static::EOL);
        if (!in_array($frameName, $this->possibleFrameTypes['server'])) {
          if (empty($frameName)) return false;
          throw new StomplerUnknownFrameException($frameName);
        }
        if ($expectedFrameName !== null && $frameName !== mb_strtoupper($expectedFrameName)) {
          throw new StomplerUnexpectedFrameException($frameName);
        }
        $state = static::STATE_HEADER;
      }
      elseif ($state === static::STATE_HEADER && $frameLine === static::EOL) {
        $state = static::STATE_BODY;
      }
      elseif ($state === static::STATE_BODY && $this->detectNullOctet($frameLine)) {
        break;
      }
    }
    //echo '-------RECV--------' . PHP_EOL;

    if ($frame['body'] !== null) $frame['body'] = rtrim($frame['body'], static::EOL . static::NULL_OCTET);
    if ($frame['name'] === null) return false;

    return $frame;
  }

  private function sendFrame($frameName, $frameParams, $body = null) {
    $frame = $this->compileFrame($frameName, $frameParams, $body);
    //echo '=======SEND========' . PHP_EOL;
    echo $frame;
    //echo '=======SEND========' . PHP_EOL;
    $result = fwrite($this->socket, $frame);

    if (empty($result)) {
      $md = stream_get_meta_data($this->socket);
      if($md['timed_out']) throw new StomplerTimeoutConnectionException;
      throw new StomplerUnknownConnectionException;
    }
  }

  private function compileFrame($name, $headers, $body = null) {
    $result = mb_strtoupper($name) . static::EOL;

    foreach ($headers as $key => $value) {
      $result .= $key;
      if ($value !== false) $result .= ':' . $value;
      $result .= static::EOL;
    }

    if ($body) $result .= static::EOL . $body;

    $result .= static::EOL . static::NULL_OCTET;

    return $result;
  }

  private function detectNullOctet($string) {
    return strpos($string, static::NULL_OCTET) === (mb_strlen($string) - 2);
  }

  private function checkReceipt($receiptId, $frameName) {
    $frameName = mb_strtoupper($frameName);

    try {
      $receiptFrame = $this->readFrame('RECEIPT');
      if ($receiptFrame['headers']['receipt-id'] != $receiptId) {
        throw new StomplerException("Wrong receipt for {$frameName} frame (expected {$receiptFrame}, received {$receiptFrame['headers']['receipt-id']})");
      }
    }
    catch (StomplerUnexpectedFrameException $e) {
      throw new StomplerException("Could not receive receipt frame for {$frameName} frame (received {$e->getMessage()} frame)");
    }
  }

  private function generateRandom() {
    return md5(uniqid('', true));
  }

  public function __destruct() {
    if (empty($this->socket)) return;

    $this->connection = null;
    fclose($this->socket);
  }
}
Was it helpful?

Solution

Solved it. If you start a transaction, there is no need of receipt headers, thus the official STOMP documentation (maybe, this is Apollo-related issue) must be fixed.

OTHER TIPS

Are you sure your sending the stomp COMMIT frame? Receivers wont get transacted SEND frames until the related transaction commits.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top