Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivanov Sergey committed Jan 12, 2018
1 parent 07de11d commit ab1da52
Showing 1 changed file with 35 additions and 17 deletions.
52 changes: 35 additions & 17 deletions src/StompClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public function __destruct()
$this->pw = null;
if ($this->stomp) {
foreach ($this->queues as $queue) {
$this->stomp->unsubscribe($queue, ['id' => $stomp->getSessionId()]);
$this->stomp->unsubscribe($queue, ['id' => $this->stomp->getSessionId()]);
}
}
$this->stomp = null;
Expand Down Expand Up @@ -107,16 +107,22 @@ public function send(
$result = true;
try {
$stomp = $this->getStomp();
$stomp->begin($transactionId);
$stomp->send($destination, $body, $headers);
$stomp->commit($transactionId);
if (!$stomp->begin($transactionId)) {
throw new Exception('Transaction does not started');
}
if (!$stomp->send($destination, $body, $headers)) {
throw new Exception('Message does not sended. Headers: ' . implode(', ', $headers));
}
if (!$stomp->commit($transactionId)) {
throw new Exception('Transaction does not completed');
}
} catch (Exception $e) {
$result = false;

$this->getStomp()->abort($transactionId);

// Логирование в случае, если установлен логгер
$this->putInLog(LogLevel::ERROR, 'Stomp transaction failed. Transaction id: ' . $transactionId, [
$this->putInLog(LogLevel::ERROR, 'Stomp::send failed. Transaction id: ' . $transactionId, [
'Message' => $e->getMessage(),
'Code' => $e->getCode(),
'File' => $e->getFile(),
Expand Down Expand Up @@ -148,7 +154,7 @@ public function getNextFrame()
'Line' => $ex->getLine()
]);

throw $ex;
return null;
}
}

Expand All @@ -162,7 +168,11 @@ public function ack($frame)
$id = !empty($frame->headers['ack']) ? $frame->headers['ack'] : $frame;

try {
return $this->getStomp()->ack($id, ['id' => $id]);
if (!$this->getStomp()->ack($id, ['id' => $id])) {
throw new Exception('Frame with id: ' . $id . ' does not acked');
}

return true;
} catch (Exception $ex) {
// Логирование в случае, если установлен логгер
$this->putInLog(LogLevel::ERROR, 'Stomp::ack failed', [
Expand All @@ -173,21 +183,25 @@ public function ack($frame)
'frame' => $frame
]);

throw $ex;
return false;
}
}

/**
* @param $frame
*
* @return mixed
* @return bool
*/
public function nack($frame)
{
$id = !empty($frame->headers['ack']) ? $frame->headers['ack'] : $frame;

try {
return $this->getStomp()->nack($id, ['id' => $id]);
if (!$this->getStomp()->nack($id, ['id' => $id])) {
throw new Exception('Frame with id: ' . $id . ' does not nacked');
}

return true;
} catch (Exception $ex) {
// Логирование в случае, если установлен логгер
$this->putInLog(LogLevel::ERROR, 'Stomp::nack failed', [
Expand All @@ -198,7 +212,7 @@ public function nack($frame)
'frame' => $frame
]);

throw $ex;
return false;
}
}

Expand All @@ -215,9 +229,11 @@ public function subscribe($queues)
}

try {
$stomp = $this->getStomp();
foreach ($this->queues as $queue) {
$stomp->subscribe($queue, ['id' => $stomp->getSessionId()]);
$stomp = $this->getStomp();
if (!$stomp->subscribe($queue, ['id' => $stomp->getSessionId()])) {
throw new Exception('Queue: ' . $queue);
}
}
} catch (Exception $ex) {
// Логирование в случае, если установлен логгер
Expand Down Expand Up @@ -245,9 +261,11 @@ public function unsubscribe()
}

try {
$stomp = $this->getStomp();
foreach ($this->queues as $queue) {
$stomp->unsubscribe($queue, ['id' => $stomp->getSessionId()]);
$stomp = $this->getStomp();
if (!$stomp->unsubscribe($queue, ['id' => $stomp->getSessionId()])) {
throw new Exception('Queue: ' . $queue);
}
}
} catch (Exception $ex) {
// Логирование в случае, если установлен логгер
Expand Down Expand Up @@ -276,7 +294,7 @@ private function getStomp()
}

if (!empty($this->stomp->error())) {
$this->putInLog(LogLevel::INFO, 'Stomp::getStomp error', [
$this->putInLog(LogLevel::ERROR, 'Stomp::getStomp error', [
'error' => $this->stomp->error(),
]);

Expand Down Expand Up @@ -307,7 +325,7 @@ private function initStomp()
} catch (Exception $ex) {
$errors[] = $ex->getMessage();
if ($i === count($this->hosts)) {
throw new StompClientException("StompClient cannot connect to: " . implode(', ',$this->hosts) . '.Errors: ' . implode(', ',$errors));
throw new StompClientException("Cannot connect to: " . implode(', ',$this->hosts) . '.Errors: ' . implode(', ',$errors));
}
}
}
Expand Down

0 comments on commit ab1da52

Please sign in to comment.