Skip to content

Commit 6a8c229

Browse files
committed
Remove indices in messenger table on MySQL to prevent deadlocks while removing messages when running multiple consumers
SELECT ... FOR UPDATE row locks also locks indices. Since locking rows and indices is not one atomic operation, this might cause deadlocks when running multiple workers. Removing indices on queue_name and available_at resolves this problem.
1 parent f11aab0 commit 6a8c229

File tree

2 files changed

+60
-3
lines changed

2 files changed

+60
-3
lines changed

Tests/Transport/Doctrine/ConnectionTest.php

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
1313

1414
use Doctrine\DBAL\Abstraction\Result as AbstractionResult;
15+
use Doctrine\DBAL\Configuration;
1516
use Doctrine\DBAL\Connection as DBALConnection;
1617
use Doctrine\DBAL\DBALException;
1718
use Doctrine\DBAL\Driver\Result as DriverResult;
@@ -23,8 +24,11 @@
2324
use Doctrine\DBAL\Query\QueryBuilder;
2425
use Doctrine\DBAL\Result;
2526
use Doctrine\DBAL\Schema\AbstractSchemaManager;
27+
use Doctrine\DBAL\Schema\Schema;
2628
use Doctrine\DBAL\Schema\SchemaConfig;
29+
use Doctrine\DBAL\Schema\TableDiff;
2730
use Doctrine\DBAL\Statement;
31+
use Doctrine\DBAL\Types\Types;
2832
use PHPUnit\Framework\TestCase;
2933
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
3034
use Symfony\Component\Messenger\Exception\TransportException;
@@ -402,4 +406,54 @@ public function providePlatformSql(): iterable
402406
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
403407
];
404408
}
409+
410+
/**
411+
* @dataProvider setupIndicesProvider
412+
*/
413+
public function testSetupIndices(string $platformClass, array $expectedIndices)
414+
{
415+
$driverConnection = $this->createMock(DBALConnection::class);
416+
$driverConnection->method('getConfiguration')->willReturn(new Configuration());
417+
418+
$schemaManager = $this->createMock(AbstractSchemaManager::class);
419+
$schema = new Schema();
420+
$expectedTable = $schema->createTable('messenger_messages');
421+
$expectedTable->addColumn('id', Types::BIGINT);
422+
$expectedTable->setPrimaryKey(['id']);
423+
// Make sure columns for indices exists so addIndex() will not throw
424+
foreach (array_unique(array_merge(...$expectedIndices)) as $columnName) {
425+
$expectedTable->addColumn($columnName, Types::STRING);
426+
}
427+
foreach ($expectedIndices as $indexColumns) {
428+
$expectedTable->addIndex($indexColumns);
429+
}
430+
$schemaManager->method('createSchema')->willReturn($schema);
431+
$driverConnection->method('getSchemaManager')->willReturn($schemaManager);
432+
433+
$platformMock = $this->createMock($platformClass);
434+
$platformMock
435+
->expects(self::once())
436+
->method('getAlterTableSQL')
437+
->with(self::callback(static function (TableDiff $tableDiff): bool {
438+
return 0 === \count($tableDiff->addedIndexes) && 0 === \count($tableDiff->changedIndexes) && 0 === \count($tableDiff->removedIndexes);
439+
}))
440+
->willReturn([]);
441+
$driverConnection->method('getDatabasePlatform')->willReturn($platformMock);
442+
443+
$connection = new Connection([], $driverConnection);
444+
$connection->setup();
445+
}
446+
447+
public function setupIndicesProvider(): iterable
448+
{
449+
yield 'MySQL' => [
450+
MySQL57Platform::class,
451+
[['delivered_at']],
452+
];
453+
454+
yield 'Other platforms' => [
455+
AbstractPlatform::class,
456+
[['queue_name'], ['available_at'], ['delivered_at']],
457+
];
458+
}
405459
}

Transport/Doctrine/Connection.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Doctrine\DBAL\Exception;
1818
use Doctrine\DBAL\Exception\TableNotFoundException;
1919
use Doctrine\DBAL\LockMode;
20+
use Doctrine\DBAL\Platforms\MySqlPlatform;
2021
use Doctrine\DBAL\Query\QueryBuilder;
2122
use Doctrine\DBAL\Result;
2223
use Doctrine\DBAL\Schema\Comparator;
@@ -386,7 +387,6 @@ private function getSchema(): Schema
386387
$table->addColumn('headers', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT)
387388
->setNotnull(true);
388389
$table->addColumn('queue_name', self::$useDeprecatedConstants ? Type::STRING : Types::STRING)
389-
->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
390390
->setNotnull(true);
391391
$table->addColumn('created_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
392392
->setNotnull(true);
@@ -395,8 +395,11 @@ private function getSchema(): Schema
395395
$table->addColumn('delivered_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
396396
->setNotnull(false);
397397
$table->setPrimaryKey(['id']);
398-
$table->addIndex(['queue_name']);
399-
$table->addIndex(['available_at']);
398+
// No indices on queue_name and available_at on MySQL to prevent deadlock issues when running multiple consumers.
399+
if (!$this->driverConnection->getDatabasePlatform() instanceof MySqlPlatform) {
400+
$table->addIndex(['queue_name']);
401+
$table->addIndex(['available_at']);
402+
}
400403
$table->addIndex(['delivered_at']);
401404

402405
return $schema;

0 commit comments

Comments
 (0)