diff --git a/src/StompClient.php b/src/StompClient.php index 785d7fa..75fef90 100644 --- a/src/StompClient.php +++ b/src/StompClient.php @@ -78,7 +78,7 @@ public function __destruct() $this->pw = null; if ($this->stomp) { foreach ($this->queues as $queue) { - $this->stomp->unsubscribe($queue, ['id' => $stomp->getSessionId()]); + $this->stomp->unsubscribe($queue, ['id' => $this->stomp->getSessionId()]); } } $this->stomp = null; @@ -107,16 +107,22 @@ public function send( $result = true; try { $stomp = $this->getStomp(); - $stomp->begin($transactionId); - $stomp->send($destination, $body, $headers); - $stomp->commit($transactionId); + if (!$stomp->begin($transactionId)) { + throw new Exception('Transaction does not started'); + } + if (!$stomp->send($destination, $body, $headers)) { + throw new Exception('Message does not sended. Headers: ' . implode(', ', $headers)); + } + if (!$stomp->commit($transactionId)) { + throw new Exception('Transaction does not completed'); + } } catch (Exception $e) { $result = false; $this->getStomp()->abort($transactionId); // Логирование в случае, если установлен логгер - $this->putInLog(LogLevel::ERROR, 'Stomp transaction failed. Transaction id: ' . $transactionId, [ + $this->putInLog(LogLevel::ERROR, 'Stomp::send failed. Transaction id: ' . $transactionId, [ 'Message' => $e->getMessage(), 'Code' => $e->getCode(), 'File' => $e->getFile(), @@ -148,7 +154,7 @@ public function getNextFrame() 'Line' => $ex->getLine() ]); - throw $ex; + return null; } } @@ -162,7 +168,11 @@ public function ack($frame) $id = !empty($frame->headers['ack']) ? $frame->headers['ack'] : $frame; try { - return $this->getStomp()->ack($id, ['id' => $id]); + if (!$this->getStomp()->ack($id, ['id' => $id])) { + throw new Exception('Frame with id: ' . $id . ' does not acked'); + } + + return true; } catch (Exception $ex) { // Логирование в случае, если установлен логгер $this->putInLog(LogLevel::ERROR, 'Stomp::ack failed', [ @@ -173,21 +183,25 @@ public function ack($frame) 'frame' => $frame ]); - throw $ex; + return false; } } /** * @param $frame * - * @return mixed + * @return bool */ public function nack($frame) { $id = !empty($frame->headers['ack']) ? $frame->headers['ack'] : $frame; try { - return $this->getStomp()->nack($id, ['id' => $id]); + if (!$this->getStomp()->nack($id, ['id' => $id])) { + throw new Exception('Frame with id: ' . $id . ' does not nacked'); + } + + return true; } catch (Exception $ex) { // Логирование в случае, если установлен логгер $this->putInLog(LogLevel::ERROR, 'Stomp::nack failed', [ @@ -198,7 +212,7 @@ public function nack($frame) 'frame' => $frame ]); - throw $ex; + return false; } } @@ -215,9 +229,11 @@ public function subscribe($queues) } try { - $stomp = $this->getStomp(); foreach ($this->queues as $queue) { - $stomp->subscribe($queue, ['id' => $stomp->getSessionId()]); + $stomp = $this->getStomp(); + if (!$stomp->subscribe($queue, ['id' => $stomp->getSessionId()])) { + throw new Exception('Queue: ' . $queue); + } } } catch (Exception $ex) { // Логирование в случае, если установлен логгер @@ -245,9 +261,11 @@ public function unsubscribe() } try { - $stomp = $this->getStomp(); foreach ($this->queues as $queue) { - $stomp->unsubscribe($queue, ['id' => $stomp->getSessionId()]); + $stomp = $this->getStomp(); + if (!$stomp->unsubscribe($queue, ['id' => $stomp->getSessionId()])) { + throw new Exception('Queue: ' . $queue); + } } } catch (Exception $ex) { // Логирование в случае, если установлен логгер @@ -276,7 +294,7 @@ private function getStomp() } if (!empty($this->stomp->error())) { - $this->putInLog(LogLevel::INFO, 'Stomp::getStomp error', [ + $this->putInLog(LogLevel::ERROR, 'Stomp::getStomp error', [ 'error' => $this->stomp->error(), ]); @@ -307,7 +325,7 @@ private function initStomp() } catch (Exception $ex) { $errors[] = $ex->getMessage(); if ($i === count($this->hosts)) { - throw new StompClientException("StompClient cannot connect to: " . implode(', ',$this->hosts) . '.Errors: ' . implode(', ',$errors)); + throw new StompClientException("Cannot connect to: " . implode(', ',$this->hosts) . '.Errors: ' . implode(', ',$errors)); } } }