From dd0c882334eb5849660e9ebc2e9c6972b1c251cb Mon Sep 17 00:00:00 2001 From: kostya Date: Fri, 24 Feb 2017 11:07:19 +0200 Subject: [PATCH 1/2] CRM-7648: Possible deadlocks causing queries during consumer work --- .../RedeliverOrphanMessagesDbalExtension.php | 4 +- .../RejectMessageOnExceptionDbalExtension.php | 3 +- .../Transport/Dbal/DbalMessageConsumer.php | 105 +++++++++++++++--- 3 files changed, 95 insertions(+), 17 deletions(-) diff --git a/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RedeliverOrphanMessagesDbalExtension.php b/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RedeliverOrphanMessagesDbalExtension.php index 279fc951512..5427b3edd86 100644 --- a/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RedeliverOrphanMessagesDbalExtension.php +++ b/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RedeliverOrphanMessagesDbalExtension.php @@ -51,7 +51,7 @@ public function onBeforeReceive(Context $context) $sql = sprintf( 'UPDATE %s SET consumer_id=NULL, delivered_at=NULL, redelivered=:isRedelivered '. - 'WHERE delivered_at <= :deliveredAt', + 'WHERE delivered_at <= :deliveredAt AND consumer_id=:consumerId', $connection->getTableName() ); @@ -60,10 +60,12 @@ public function onBeforeReceive(Context $context) [ 'isRedelivered' => true, 'deliveredAt' => time() - $this->orphanTime, + 'consumerId' => $context->getMessageConsumer()->getId(), ], [ 'isRedelivered' => Type::BOOLEAN, 'deliveredAt' => Type::INTEGER, + 'consumerId' => Type::STRING, ] ); diff --git a/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RejectMessageOnExceptionDbalExtension.php b/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RejectMessageOnExceptionDbalExtension.php index 67926a4c113..19828d8db15 100644 --- a/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RejectMessageOnExceptionDbalExtension.php +++ b/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RejectMessageOnExceptionDbalExtension.php @@ -22,7 +22,8 @@ public function onInterrupted(Context $context) $context->getMessageConsumer()->reject($context->getMessage(), true); $context->getLogger()->debug( - '[RejectMessageOnExceptionDbalExtension] Execution was interrupted and message was rejected' + '[RejectMessageOnExceptionDbalExtension] Execution was interrupted and message was rejected. {id}', + ['id' => $context->getMessage()->getId()] ); } } diff --git a/src/Oro/Component/MessageQueue/Transport/Dbal/DbalMessageConsumer.php b/src/Oro/Component/MessageQueue/Transport/Dbal/DbalMessageConsumer.php index eaded32cd38..7b37f700c64 100644 --- a/src/Oro/Component/MessageQueue/Transport/Dbal/DbalMessageConsumer.php +++ b/src/Oro/Component/MessageQueue/Transport/Dbal/DbalMessageConsumer.php @@ -130,15 +130,45 @@ public function acknowledge(MessageInterface $message) { InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class); - $affectedRows = $this->dbal->delete($this->connection->getTableName(), ['id' => $message->getId()], [ - 'id' => Type::INTEGER, - ]); - - if (1 !== $affectedRows) { - throw new \LogicException(sprintf( - 'Expected record was removed but it is not. id: "%s"', - $message->getId() - )); + $this->dbal->beginTransaction(); + + $sql = sprintf( + 'SELECT id FROM %s WHERE id=:id FOR UPDATE', + $this->connection->getTableName() + ); + + $row = $this->dbal->executeQuery( + $sql, + ['id' => $message->getId(),], + ['id' => Type::INTEGER,] + )->fetch(); + + if (count($row)) { + $affectedRows = null; + try { + $affectedRows = $this->dbal->delete($this->connection->getTableName(), ['id' => $message->getId()], [ + 'id' => Type::INTEGER, + ]); + $this->dbal->commit(); + } catch (\Exception $e) { + sleep(1); + try { + $affectedRows = $this->dbal->delete( + $this->connection->getTableName(), + ['id' => $message->getId()], + ['id' => Type::INTEGER,] + ); + $this->dbal->commit(); + } catch (\Exception $e) { + $this->dbal->rollBack(); + } + } + if (1 !== $affectedRows) { + throw new \LogicException(sprintf( + 'Expected record was removed but it is not. id: "%s"', + $message->getId() + )); + } } } @@ -151,13 +181,50 @@ public function reject(MessageInterface $message, $requeue = false) { InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class); - $affectedRows = $this->dbal->delete($this->connection->getTableName(), ['id' => $message->getId()]); + $this->dbal->beginTransaction(); - if (1 !== $affectedRows) { - throw new \LogicException(sprintf( - 'Expected record was removed but it is not. id: "%s"', - $message->getId() - )); + $sql = sprintf( + 'SELECT id FROM %s WHERE id=:id FOR UPDATE', + $this->connection->getTableName() + ); + + $row = $this->dbal->executeQuery( + $sql, + [ + 'id' => $message->getId(), + ], + [ + 'id' => Type::INTEGER, + ] + )->fetch(); + + if (count($row)) { + $affectedRows = null; + try { + $affectedRows = $this->dbal->delete($this->connection->getTableName(), ['id' => $message->getId()], [ + 'id' => Type::INTEGER, + ]); + $this->dbal->commit(); + } catch (\Exception $e) { + sleep(1); + try { + $affectedRows = $this->dbal->delete( + $this->connection->getTableName(), + ['id' => $message->getId()], + ['id' => Type::INTEGER,] + ); + $this->dbal->commit(); + } catch (\Exception $e) { + + $this->dbal->rollBack(); + } + } + if (1 !== $affectedRows) { + throw new \LogicException(sprintf( + 'Expected record was removed but it is not. id: "%s"', + $message->getId() + )); + } } if ($requeue) { @@ -308,4 +375,12 @@ protected function convertMessage(array $dbalMessage) return $message; } + + /** + * @return string + */ + public function getId() + { + return $this->consumerId; + } } From ed82286db9ea0a1a4ca9bbdfe5a10770fac5490b Mon Sep 17 00:00:00 2001 From: kostya Date: Fri, 24 Feb 2017 11:34:14 +0200 Subject: [PATCH 2/2] CRM-7648: Possible deadlocks causing queries during consumer work -fixed tests --- ...deliverOrphanMessagesDbalExtensionTest.php | 15 +- .../RedeliverOrphanMessagesDbalExtension.php | 1 - .../RejectMessageOnExceptionDbalExtension.php | 2 +- ...deliverOrphanMessagesDbalExtensionTest.php | 24 ++-- ...ectMessageOnExceptionDbalExtensionTest.php | 6 +- .../Dbal/DbalMessageConsumerTest.php | 128 ++++++++++++++++-- .../Transport/Dbal/DbalMessageConsumer.php | 33 ++--- 7 files changed, 163 insertions(+), 46 deletions(-) diff --git a/src/Oro/Bundle/MessageQueueBundle/Tests/Functional/Consumption/Dbal/Extension/RedeliverOrphanMessagesDbalExtensionTest.php b/src/Oro/Bundle/MessageQueueBundle/Tests/Functional/Consumption/Dbal/Extension/RedeliverOrphanMessagesDbalExtensionTest.php index f6022ea1536..91673a9f359 100644 --- a/src/Oro/Bundle/MessageQueueBundle/Tests/Functional/Consumption/Dbal/Extension/RedeliverOrphanMessagesDbalExtensionTest.php +++ b/src/Oro/Bundle/MessageQueueBundle/Tests/Functional/Consumption/Dbal/Extension/RedeliverOrphanMessagesDbalExtensionTest.php @@ -6,6 +6,8 @@ use Oro\Component\MessageQueue\Consumption\Context; use Oro\Component\MessageQueue\Consumption\Dbal\Extension\RedeliverOrphanMessagesDbalExtension; use Oro\Component\MessageQueue\Test\DbalSchemaExtensionTrait; +use Oro\Component\MessageQueue\Transport\Dbal\DbalDestination; +use Oro\Component\MessageQueue\Transport\Dbal\DbalMessageConsumer; use Psr\Log\NullLogger; class RedeliverOrphanMessagesDbalExtensionTest extends WebTestCase @@ -35,8 +37,15 @@ public function testShouldRedeliverOrphanMessages() $connection = $this->createConnection('message_queue'); $dbal = $connection->getDBALConnection(); + // test + $session = $connection->createSession(); + $context = new Context($session); + $context->setLogger(new NullLogger()); + $consumer = new DbalMessageConsumer($session, new DbalDestination('default')); + $context->setMessageConsumer($consumer); + $dbal->insert('message_queue', [ - 'consumer_id' => 'consumer-id', + 'consumer_id' => $consumer->getId(), 'delivered_at' => strtotime('-1 year'), 'redelivered' => false, 'queue' => 'queue', @@ -47,10 +56,6 @@ public function testShouldRedeliverOrphanMessages() //guard $this->assertGreaterThan(0, $id); - // test - $context = new Context($connection->createSession()); - $context->setLogger(new NullLogger()); - $extension = new RedeliverOrphanMessagesDbalExtension(); $extension->onBeforeReceive($context); diff --git a/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RedeliverOrphanMessagesDbalExtension.php b/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RedeliverOrphanMessagesDbalExtension.php index 5427b3edd86..14b4d710737 100644 --- a/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RedeliverOrphanMessagesDbalExtension.php +++ b/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RedeliverOrphanMessagesDbalExtension.php @@ -54,7 +54,6 @@ public function onBeforeReceive(Context $context) 'WHERE delivered_at <= :deliveredAt AND consumer_id=:consumerId', $connection->getTableName() ); - $affectedRows = $dbal->executeUpdate( $sql, [ diff --git a/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RejectMessageOnExceptionDbalExtension.php b/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RejectMessageOnExceptionDbalExtension.php index 19828d8db15..e8cbcfab192 100644 --- a/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RejectMessageOnExceptionDbalExtension.php +++ b/src/Oro/Component/MessageQueue/Consumption/Dbal/Extension/RejectMessageOnExceptionDbalExtension.php @@ -23,7 +23,7 @@ public function onInterrupted(Context $context) $context->getLogger()->debug( '[RejectMessageOnExceptionDbalExtension] Execution was interrupted and message was rejected. {id}', - ['id' => $context->getMessage()->getId()] + ['id' => $context->getMessage()->getMessageId()] ); } } diff --git a/src/Oro/Component/MessageQueue/Tests/Unit/Consumption/DBAL/Extension/RedeliverOrphanMessagesDbalExtensionTest.php b/src/Oro/Component/MessageQueue/Tests/Unit/Consumption/DBAL/Extension/RedeliverOrphanMessagesDbalExtensionTest.php index c1677d7d5be..5a85cf90fb4 100644 --- a/src/Oro/Component/MessageQueue/Tests/Unit/Consumption/DBAL/Extension/RedeliverOrphanMessagesDbalExtensionTest.php +++ b/src/Oro/Component/MessageQueue/Tests/Unit/Consumption/DBAL/Extension/RedeliverOrphanMessagesDbalExtensionTest.php @@ -5,6 +5,7 @@ use Oro\Component\MessageQueue\Consumption\Context; use Oro\Component\MessageQueue\Consumption\Dbal\Extension\RedeliverOrphanMessagesDbalExtension; use Oro\Component\MessageQueue\Transport\Dbal\DbalConnection; +use Oro\Component\MessageQueue\Transport\Dbal\DbalMessageConsumer; use Oro\Component\MessageQueue\Transport\Dbal\DbalSession; use Oro\Component\MessageQueue\Transport\Null\NullSession; use Psr\Log\LoggerInterface; @@ -23,38 +24,39 @@ public function testShouldRedeliverOrphanMessages() ->expects($this->once()) ->method('executeUpdate') ->with('UPDATE tableName SET consumer_id=NULL, delivered_at=NULL, redelivered=:isRedelivered '. - 'WHERE delivered_at <= :deliveredAt') - ->will($this->returnValue(3)) - ; + 'WHERE delivered_at <= :deliveredAt AND consumer_id=:consumerId') + ->will($this->returnValue(3)); $connection = $this->createConnectionMock(); $connection ->expects($this->once()) ->method('getDBALConnection') - ->will($this->returnValue($dbal)) - ; + ->will($this->returnValue($dbal)); $connection ->expects($this->once()) ->method('getTableName') - ->will($this->returnValue('tableName')) - ; + ->will($this->returnValue('tableName')); $session = $this->createSessionMock(); $session ->expects($this->once()) ->method('getConnection') - ->will($this->returnValue($connection)) - ; + ->will($this->returnValue($connection)); $logger = $this->createLoggerMock(); $logger ->expects($this->once()) ->method('alert') - ->with('[RedeliverOrphanMessagesDbalExtension] Orphans were found and redelivered. number: 3') - ; + ->with('[RedeliverOrphanMessagesDbalExtension] Orphans were found and redelivered. number: 3'); + $consumer = $this->createMock(DbalMessageConsumer::class); + $consumer + ->expects($this->once()) + ->method('getId') + ->willReturn(123); $context = new Context($session); $context->setLogger($logger); + $context->setMessageConsumer($consumer); $extension = new RedeliverOrphanMessagesDbalExtension(); $extension->onBeforeReceive($context); diff --git a/src/Oro/Component/MessageQueue/Tests/Unit/Consumption/DBAL/Extension/RejectMessageOnExceptionDbalExtensionTest.php b/src/Oro/Component/MessageQueue/Tests/Unit/Consumption/DBAL/Extension/RejectMessageOnExceptionDbalExtensionTest.php index a0b7a9201f6..2388263d125 100644 --- a/src/Oro/Component/MessageQueue/Tests/Unit/Consumption/DBAL/Extension/RejectMessageOnExceptionDbalExtensionTest.php +++ b/src/Oro/Component/MessageQueue/Tests/Unit/Consumption/DBAL/Extension/RejectMessageOnExceptionDbalExtensionTest.php @@ -49,12 +49,16 @@ public function testShouldDoNothingIfMessageIsMissing() public function testShouldRejectMessage() { $message = new NullMessage(); + $message->setMessageId(123); $logger = $this->createLoggerMock(); $logger ->expects($this->once()) ->method('debug') - ->with('[RejectMessageOnExceptionDbalExtension] Execution was interrupted and message was rejected') + ->with( + '[RejectMessageOnExceptionDbalExtension] Execution was interrupted and message was rejected. {id}', + ['id' => '123'] + ) ; $consumer = $this->createMessageConsumerMock(); diff --git a/src/Oro/Component/MessageQueue/Tests/Unit/Transport/Dbal/DbalMessageConsumerTest.php b/src/Oro/Component/MessageQueue/Tests/Unit/Transport/Dbal/DbalMessageConsumerTest.php index 337153dcb8e..a7bc397ea14 100644 --- a/src/Oro/Component/MessageQueue/Tests/Unit/Transport/Dbal/DbalMessageConsumerTest.php +++ b/src/Oro/Component/MessageQueue/Tests/Unit/Transport/Dbal/DbalMessageConsumerTest.php @@ -103,13 +103,23 @@ public function testCouldSetAndGetPollingInterval() public function testAcknowledgeShouldShouldDeleteRecordFromDb() { + $statment = $this->createMock(Statement::class); + $statment + ->expects($this->once()) + ->method('fetch') + ->will($this->returnValue(1)); + $dbal = $this->createDBALConnectionMock(); + $dbal + ->expects($this->once()) + ->method('executeQuery') + ->with('SELECT id FROM tableName WHERE id=:id FOR UPDATE', ['id' => 123]) + ->willReturn($statment); $dbal ->expects($this->once()) ->method('delete') ->with('tableName', ['id' => 123]) - ->will($this->returnValue(1)) - ; + ->will($this->returnValue(1)); $connection = $this->createConnectionMock(); $connection @@ -118,7 +128,7 @@ public function testAcknowledgeShouldShouldDeleteRecordFromDb() ->will($this->returnValue($dbal)) ; $connection - ->expects($this->once()) + ->expects($this->exactly(2)) ->method('getTableName') ->will($this->returnValue('tableName')) ; @@ -139,7 +149,18 @@ public function testAcknowledgeShouldShouldDeleteRecordFromDb() public function testAcknowledgeShouldThrowIfMessageWasNotRemoved() { + $statment = $this->createMock(Statement::class); + $statment + ->expects($this->once()) + ->method('fetch') + ->will($this->returnValue(1)); + $dbal = $this->createDBALConnectionMock(); + $dbal + ->expects($this->once()) + ->method('executeQuery') + ->with('SELECT id FROM tableName WHERE id=:id FOR UPDATE', ['id' => 123]) + ->willReturn($statment); $dbal ->expects($this->once()) ->method('delete') @@ -154,7 +175,7 @@ public function testAcknowledgeShouldThrowIfMessageWasNotRemoved() ->will($this->returnValue($dbal)) ; $connection - ->expects($this->once()) + ->expects($this->exactly(2)) ->method('getTableName') ->will($this->returnValue('tableName')) ; @@ -175,7 +196,7 @@ public function testAcknowledgeShouldThrowIfMessageWasNotRemoved() $consumer->acknowledge($message); } - public function testRejectShouldThrowIfInstanceOfMessageIsInvalid() + public function stestRejectShouldThrowIfInstanceOfMessageIsInvalid() { $connection = $this->createConnectionMock(); $connection @@ -204,7 +225,18 @@ public function testRejectShouldThrowIfInstanceOfMessageIsInvalid() public function testRejectShouldShouldDeleteRecordFromDb() { + $statment = $this->createMock(Statement::class); + $statment + ->expects($this->once()) + ->method('fetch') + ->will($this->returnValue(1)); + $dbal = $this->createDBALConnectionMock(); + $dbal + ->expects($this->once()) + ->method('executeQuery') + ->with('SELECT id FROM tableName WHERE id=:id FOR UPDATE', ['id' => 123]) + ->willReturn($statment); $dbal ->expects($this->once()) ->method('delete') @@ -219,7 +251,7 @@ public function testRejectShouldShouldDeleteRecordFromDb() ->will($this->returnValue($dbal)) ; $connection - ->expects($this->once()) + ->expects($this->exactly(2)) ->method('getTableName') ->will($this->returnValue('tableName')) ; @@ -240,7 +272,18 @@ public function testRejectShouldShouldDeleteRecordFromDb() public function testRejectShouldThrowIfMessageWasNotRemoved() { + $statment = $this->createMock(Statement::class); + $statment + ->expects($this->once()) + ->method('fetch') + ->will($this->returnValue(1)); + $dbal = $this->createDBALConnectionMock(); + $dbal + ->expects($this->once()) + ->method('executeQuery') + ->with('SELECT id FROM tableName WHERE id=:id FOR UPDATE', ['id' => 123]) + ->willReturn($statment); $dbal ->expects($this->once()) ->method('delete') @@ -255,7 +298,7 @@ public function testRejectShouldThrowIfMessageWasNotRemoved() ->will($this->returnValue($dbal)) ; $connection - ->expects($this->once()) + ->expects($this->exactly(2)) ->method('getTableName') ->will($this->returnValue('tableName')) ; @@ -279,7 +322,18 @@ public function testRejectShouldThrowIfMessageWasNotRemoved() public function testRejectShouldShouldInsertNewMessageIfRequeueIsTrue() { + $statment = $this->createMock(Statement::class); + $statment + ->expects($this->once()) + ->method('fetch') + ->will($this->returnValue(1)); + $dbal = $this->createDBALConnectionMock(); + $dbal + ->expects($this->once()) + ->method('executeQuery') + ->with('SELECT id FROM tableName WHERE id=:id FOR UPDATE', ['id' => 123]) + ->willReturn($statment); $dbal ->expects($this->once()) ->method('delete') @@ -310,7 +364,7 @@ public function testRejectShouldShouldInsertNewMessageIfRequeueIsTrue() ->will($this->returnValue($dbal)) ; $connection - ->expects($this->exactly(2)) + ->expects($this->exactly(3)) ->method('getTableName') ->will($this->returnValue('tableName')) ; @@ -333,9 +387,65 @@ public function testRejectShouldShouldInsertNewMessageIfRequeueIsTrue() $consumer->reject($message, true); } + public function testRejectAndAckShouldThrowIfRecordWasNotFoundForDeleting() + { + $statment = $this->createMock(Statement::class); + $statment + ->expects($this->once()) + ->method('fetch') + ->will($this->returnValue(null)); + + $dbal = $this->createDBALConnectionMock(); + $dbal + ->expects($this->once()) + ->method('executeQuery') + ->with('SELECT id FROM tableName WHERE id=:id FOR UPDATE', ['id' => 123]) + ->willReturn($statment); + + $connection = $this->createConnectionMock(); + $connection + ->expects($this->once()) + ->method('getDBALConnection') + ->will($this->returnValue($dbal)) + ; + $connection + ->expects($this->once()) + ->method('getTableName') + ->will($this->returnValue('tableName')) + ; + + $session = $this->createSessionMock(); + $session + ->expects($this->once()) + ->method('getConnection') + ->will($this->returnValue($connection)) + ; + + $message = new DbalMessage(); + $message->setId(123); + $this->expectException(\LogicException::class); + $this->expectExceptionMessage( + 'Expected record was removed but it is not. id: "123"' + ); + $consumer = new DbalMessageConsumer($session, new DbalDestination('queue')); + $consumer->reject($message, true); + $consumer->acknowledge($message); + } + public function testRejectShouldThrowIfRecordWasNotInserted() { + $statment = $this->createMock(Statement::class); + $statment + ->expects($this->once()) + ->method('fetch') + ->will($this->returnValue(1)); + $dbal = $this->createDBALConnectionMock(); + $dbal + ->expects($this->once()) + ->method('executeQuery') + ->with('SELECT id FROM tableName WHERE id=:id FOR UPDATE', ['id' => null]) + ->willReturn($statment); $dbal ->expects($this->once()) ->method('delete') @@ -354,7 +464,7 @@ public function testRejectShouldThrowIfRecordWasNotInserted() ->will($this->returnValue($dbal)) ; $connection - ->expects($this->exactly(2)) + ->expects($this->exactly(3)) ->method('getTableName') ->will($this->returnValue('tableName')) ; diff --git a/src/Oro/Component/MessageQueue/Transport/Dbal/DbalMessageConsumer.php b/src/Oro/Component/MessageQueue/Transport/Dbal/DbalMessageConsumer.php index 7b37f700c64..d75d5c2e9b8 100644 --- a/src/Oro/Component/MessageQueue/Transport/Dbal/DbalMessageConsumer.php +++ b/src/Oro/Component/MessageQueue/Transport/Dbal/DbalMessageConsumer.php @@ -2,6 +2,7 @@ namespace Oro\Component\MessageQueue\Transport\Dbal; use Doctrine\DBAL\Connection; +use Doctrine\DBAL\DBALException; use Doctrine\DBAL\Types\Type; use Oro\Component\MessageQueue\Transport\Exception\InvalidMessageException; use Oro\Component\MessageQueue\Transport\MessageConsumerInterface; @@ -142,9 +143,8 @@ public function acknowledge(MessageInterface $message) ['id' => $message->getId(),], ['id' => Type::INTEGER,] )->fetch(); - + $affectedRows = null; if (count($row)) { - $affectedRows = null; try { $affectedRows = $this->dbal->delete($this->connection->getTableName(), ['id' => $message->getId()], [ 'id' => Type::INTEGER, @@ -163,12 +163,12 @@ public function acknowledge(MessageInterface $message) $this->dbal->rollBack(); } } - if (1 !== $affectedRows) { - throw new \LogicException(sprintf( - 'Expected record was removed but it is not. id: "%s"', - $message->getId() - )); - } + } + if (1 !== $affectedRows) { + throw new \LogicException(sprintf( + 'Expected record was removed but it is not. id: "%s"', + $message->getId() + )); } } @@ -197,9 +197,8 @@ public function reject(MessageInterface $message, $requeue = false) 'id' => Type::INTEGER, ] )->fetch(); - + $affectedRows = null; if (count($row)) { - $affectedRows = null; try { $affectedRows = $this->dbal->delete($this->connection->getTableName(), ['id' => $message->getId()], [ 'id' => Type::INTEGER, @@ -215,18 +214,16 @@ public function reject(MessageInterface $message, $requeue = false) ); $this->dbal->commit(); } catch (\Exception $e) { - $this->dbal->rollBack(); } } - if (1 !== $affectedRows) { - throw new \LogicException(sprintf( - 'Expected record was removed but it is not. id: "%s"', - $message->getId() - )); - } } - + if (1 !== $affectedRows) { + throw new \LogicException(sprintf( + 'Expected record was removed but it is not. id: "%s"', + $message->getId() + )); + } if ($requeue) { $dbalMessage = [ 'body' => $message->getBody(),