Skip to content

Commit

Permalink
Merge pull request #7914 from laboro/ticket/CRM-7648_Possible_deadloc…
Browse files Browse the repository at this point in the history
…ks_causing_queries_during_consumer_work_2_0

CRM-7648: Possible deadlocks causing queries during consumer work
  • Loading branch information
tumbochka authored Feb 28, 2017
2 parents 2773041 + ed82286 commit 76c8d6c
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
use Oro\Component\MessageQueue\Consumption\Context;
use Oro\Component\MessageQueue\Consumption\Dbal\Extension\RedeliverOrphanMessagesDbalExtension;
use Oro\Component\MessageQueue\Test\DbalSchemaExtensionTrait;
use Oro\Component\MessageQueue\Transport\Dbal\DbalDestination;
use Oro\Component\MessageQueue\Transport\Dbal\DbalMessageConsumer;
use Psr\Log\NullLogger;

class RedeliverOrphanMessagesDbalExtensionTest extends WebTestCase
Expand Down Expand Up @@ -35,8 +37,15 @@ public function testShouldRedeliverOrphanMessages()
$connection = $this->createConnection('message_queue');
$dbal = $connection->getDBALConnection();

// test
$session = $connection->createSession();
$context = new Context($session);
$context->setLogger(new NullLogger());
$consumer = new DbalMessageConsumer($session, new DbalDestination('default'));
$context->setMessageConsumer($consumer);

$dbal->insert('message_queue', [
'consumer_id' => 'consumer-id',
'consumer_id' => $consumer->getId(),
'delivered_at' => strtotime('-1 year'),
'redelivered' => false,
'queue' => 'queue',
Expand All @@ -47,10 +56,6 @@ public function testShouldRedeliverOrphanMessages()
//guard
$this->assertGreaterThan(0, $id);

// test
$context = new Context($connection->createSession());
$context->setLogger(new NullLogger());

$extension = new RedeliverOrphanMessagesDbalExtension();
$extension->onBeforeReceive($context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,20 @@ public function onBeforeReceive(Context $context)

$sql = sprintf(
'UPDATE %s SET consumer_id=NULL, delivered_at=NULL, redelivered=:isRedelivered '.
'WHERE delivered_at <= :deliveredAt',
'WHERE delivered_at <= :deliveredAt AND consumer_id=:consumerId',
$connection->getTableName()
);

$affectedRows = $dbal->executeUpdate(
$sql,
[
'isRedelivered' => true,
'deliveredAt' => time() - $this->orphanTime,
'consumerId' => $context->getMessageConsumer()->getId(),
],
[
'isRedelivered' => Type::BOOLEAN,
'deliveredAt' => Type::INTEGER,
'consumerId' => Type::STRING,
]
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ public function onInterrupted(Context $context)
$context->getMessageConsumer()->reject($context->getMessage(), true);

$context->getLogger()->debug(
'[RejectMessageOnExceptionDbalExtension] Execution was interrupted and message was rejected'
'[RejectMessageOnExceptionDbalExtension] Execution was interrupted and message was rejected. {id}',
['id' => $context->getMessage()->getMessageId()]
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Oro\Component\MessageQueue\Consumption\Context;
use Oro\Component\MessageQueue\Consumption\Dbal\Extension\RedeliverOrphanMessagesDbalExtension;
use Oro\Component\MessageQueue\Transport\Dbal\DbalConnection;
use Oro\Component\MessageQueue\Transport\Dbal\DbalMessageConsumer;
use Oro\Component\MessageQueue\Transport\Dbal\DbalSession;
use Oro\Component\MessageQueue\Transport\Null\NullSession;
use Psr\Log\LoggerInterface;
Expand All @@ -23,38 +24,39 @@ public function testShouldRedeliverOrphanMessages()
->expects($this->once())
->method('executeUpdate')
->with('UPDATE tableName SET consumer_id=NULL, delivered_at=NULL, redelivered=:isRedelivered '.
'WHERE delivered_at <= :deliveredAt')
->will($this->returnValue(3))
;
'WHERE delivered_at <= :deliveredAt AND consumer_id=:consumerId')
->will($this->returnValue(3));

$connection = $this->createConnectionMock();
$connection
->expects($this->once())
->method('getDBALConnection')
->will($this->returnValue($dbal))
;
->will($this->returnValue($dbal));
$connection
->expects($this->once())
->method('getTableName')
->will($this->returnValue('tableName'))
;
->will($this->returnValue('tableName'));

$session = $this->createSessionMock();
$session
->expects($this->once())
->method('getConnection')
->will($this->returnValue($connection))
;
->will($this->returnValue($connection));

$logger = $this->createLoggerMock();
$logger
->expects($this->once())
->method('alert')
->with('[RedeliverOrphanMessagesDbalExtension] Orphans were found and redelivered. number: 3')
;
->with('[RedeliverOrphanMessagesDbalExtension] Orphans were found and redelivered. number: 3');

$consumer = $this->createMock(DbalMessageConsumer::class);
$consumer
->expects($this->once())
->method('getId')
->willReturn(123);
$context = new Context($session);
$context->setLogger($logger);
$context->setMessageConsumer($consumer);

$extension = new RedeliverOrphanMessagesDbalExtension();
$extension->onBeforeReceive($context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,16 @@ public function testShouldDoNothingIfMessageIsMissing()
public function testShouldRejectMessage()
{
$message = new NullMessage();
$message->setMessageId(123);

$logger = $this->createLoggerMock();
$logger
->expects($this->once())
->method('debug')
->with('[RejectMessageOnExceptionDbalExtension] Execution was interrupted and message was rejected')
->with(
'[RejectMessageOnExceptionDbalExtension] Execution was interrupted and message was rejected. {id}',
['id' => '123']
)
;

$consumer = $this->createMessageConsumerMock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,23 @@ public function testCouldSetAndGetPollingInterval()

public function testAcknowledgeShouldShouldDeleteRecordFromDb()
{
$statment = $this->createMock(Statement::class);
$statment
->expects($this->once())
->method('fetch')
->will($this->returnValue(1));

$dbal = $this->createDBALConnectionMock();
$dbal
->expects($this->once())
->method('executeQuery')
->with('SELECT id FROM tableName WHERE id=:id FOR UPDATE', ['id' => 123])
->willReturn($statment);
$dbal
->expects($this->once())
->method('delete')
->with('tableName', ['id' => 123])
->will($this->returnValue(1))
;
->will($this->returnValue(1));

$connection = $this->createConnectionMock();
$connection
Expand All @@ -118,7 +128,7 @@ public function testAcknowledgeShouldShouldDeleteRecordFromDb()
->will($this->returnValue($dbal))
;
$connection
->expects($this->once())
->expects($this->exactly(2))
->method('getTableName')
->will($this->returnValue('tableName'))
;
Expand All @@ -139,7 +149,18 @@ public function testAcknowledgeShouldShouldDeleteRecordFromDb()

public function testAcknowledgeShouldThrowIfMessageWasNotRemoved()
{
$statment = $this->createMock(Statement::class);
$statment
->expects($this->once())
->method('fetch')
->will($this->returnValue(1));

$dbal = $this->createDBALConnectionMock();
$dbal
->expects($this->once())
->method('executeQuery')
->with('SELECT id FROM tableName WHERE id=:id FOR UPDATE', ['id' => 123])
->willReturn($statment);
$dbal
->expects($this->once())
->method('delete')
Expand All @@ -154,7 +175,7 @@ public function testAcknowledgeShouldThrowIfMessageWasNotRemoved()
->will($this->returnValue($dbal))
;
$connection
->expects($this->once())
->expects($this->exactly(2))
->method('getTableName')
->will($this->returnValue('tableName'))
;
Expand All @@ -175,7 +196,7 @@ public function testAcknowledgeShouldThrowIfMessageWasNotRemoved()
$consumer->acknowledge($message);
}

public function testRejectShouldThrowIfInstanceOfMessageIsInvalid()
public function stestRejectShouldThrowIfInstanceOfMessageIsInvalid()
{
$connection = $this->createConnectionMock();
$connection
Expand Down Expand Up @@ -204,7 +225,18 @@ public function testRejectShouldThrowIfInstanceOfMessageIsInvalid()

public function testRejectShouldShouldDeleteRecordFromDb()
{
$statment = $this->createMock(Statement::class);
$statment
->expects($this->once())
->method('fetch')
->will($this->returnValue(1));

$dbal = $this->createDBALConnectionMock();
$dbal
->expects($this->once())
->method('executeQuery')
->with('SELECT id FROM tableName WHERE id=:id FOR UPDATE', ['id' => 123])
->willReturn($statment);
$dbal
->expects($this->once())
->method('delete')
Expand All @@ -219,7 +251,7 @@ public function testRejectShouldShouldDeleteRecordFromDb()
->will($this->returnValue($dbal))
;
$connection
->expects($this->once())
->expects($this->exactly(2))
->method('getTableName')
->will($this->returnValue('tableName'))
;
Expand All @@ -240,7 +272,18 @@ public function testRejectShouldShouldDeleteRecordFromDb()

public function testRejectShouldThrowIfMessageWasNotRemoved()
{
$statment = $this->createMock(Statement::class);
$statment
->expects($this->once())
->method('fetch')
->will($this->returnValue(1));

$dbal = $this->createDBALConnectionMock();
$dbal
->expects($this->once())
->method('executeQuery')
->with('SELECT id FROM tableName WHERE id=:id FOR UPDATE', ['id' => 123])
->willReturn($statment);
$dbal
->expects($this->once())
->method('delete')
Expand All @@ -255,7 +298,7 @@ public function testRejectShouldThrowIfMessageWasNotRemoved()
->will($this->returnValue($dbal))
;
$connection
->expects($this->once())
->expects($this->exactly(2))
->method('getTableName')
->will($this->returnValue('tableName'))
;
Expand All @@ -279,7 +322,18 @@ public function testRejectShouldThrowIfMessageWasNotRemoved()

public function testRejectShouldShouldInsertNewMessageIfRequeueIsTrue()
{
$statment = $this->createMock(Statement::class);
$statment
->expects($this->once())
->method('fetch')
->will($this->returnValue(1));

$dbal = $this->createDBALConnectionMock();
$dbal
->expects($this->once())
->method('executeQuery')
->with('SELECT id FROM tableName WHERE id=:id FOR UPDATE', ['id' => 123])
->willReturn($statment);
$dbal
->expects($this->once())
->method('delete')
Expand Down Expand Up @@ -310,7 +364,7 @@ public function testRejectShouldShouldInsertNewMessageIfRequeueIsTrue()
->will($this->returnValue($dbal))
;
$connection
->expects($this->exactly(2))
->expects($this->exactly(3))
->method('getTableName')
->will($this->returnValue('tableName'))
;
Expand All @@ -333,9 +387,65 @@ public function testRejectShouldShouldInsertNewMessageIfRequeueIsTrue()
$consumer->reject($message, true);
}

public function testRejectAndAckShouldThrowIfRecordWasNotFoundForDeleting()
{
$statment = $this->createMock(Statement::class);
$statment
->expects($this->once())
->method('fetch')
->will($this->returnValue(null));

$dbal = $this->createDBALConnectionMock();
$dbal
->expects($this->once())
->method('executeQuery')
->with('SELECT id FROM tableName WHERE id=:id FOR UPDATE', ['id' => 123])
->willReturn($statment);

$connection = $this->createConnectionMock();
$connection
->expects($this->once())
->method('getDBALConnection')
->will($this->returnValue($dbal))
;
$connection
->expects($this->once())
->method('getTableName')
->will($this->returnValue('tableName'))
;

$session = $this->createSessionMock();
$session
->expects($this->once())
->method('getConnection')
->will($this->returnValue($connection))
;

$message = new DbalMessage();
$message->setId(123);
$this->expectException(\LogicException::class);
$this->expectExceptionMessage(
'Expected record was removed but it is not. id: "123"'
);
$consumer = new DbalMessageConsumer($session, new DbalDestination('queue'));
$consumer->reject($message, true);
$consumer->acknowledge($message);
}

public function testRejectShouldThrowIfRecordWasNotInserted()
{
$statment = $this->createMock(Statement::class);
$statment
->expects($this->once())
->method('fetch')
->will($this->returnValue(1));

$dbal = $this->createDBALConnectionMock();
$dbal
->expects($this->once())
->method('executeQuery')
->with('SELECT id FROM tableName WHERE id=:id FOR UPDATE', ['id' => null])
->willReturn($statment);
$dbal
->expects($this->once())
->method('delete')
Expand All @@ -354,7 +464,7 @@ public function testRejectShouldThrowIfRecordWasNotInserted()
->will($this->returnValue($dbal))
;
$connection
->expects($this->exactly(2))
->expects($this->exactly(3))
->method('getTableName')
->will($this->returnValue('tableName'))
;
Expand Down
Loading

0 comments on commit 76c8d6c

Please sign in to comment.