This commit is contained in:
Marek
2026-03-24 00:04:55 +01:00
commit c5229e48ed
4225 changed files with 511461 additions and 0 deletions

View File

@@ -0,0 +1,46 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Bridge\Doctrine\Messenger;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\Persistence\ManagerRegistry;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
/**
* @author Konstantin Myakshin <molodchick@gmail.com>
*
* @internal
*/
abstract class AbstractDoctrineMiddleware implements MiddlewareInterface
{
public function __construct(
protected ManagerRegistry $managerRegistry,
protected ?string $entityManagerName = null,
) {
}
final public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
try {
$entityManager = $this->managerRegistry->getManager($this->entityManagerName);
} catch (\InvalidArgumentException $e) {
throw new UnrecoverableMessageHandlingException($e->getMessage(), 0, $e);
}
return $this->handleForManager($entityManager, $envelope, $stack);
}
abstract protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope;
}

View File

@@ -0,0 +1,55 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Bridge\Doctrine\Messenger;
use Doctrine\Persistence\ManagerRegistry;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
/**
* Clears entity managers between messages being handled to avoid outdated data.
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
class DoctrineClearEntityManagerWorkerSubscriber implements EventSubscriberInterface
{
public function __construct(
private readonly ManagerRegistry $managerRegistry,
) {
}
public function onWorkerMessageHandled(): void
{
$this->clearEntityManagers();
}
public function onWorkerMessageFailed(): void
{
$this->clearEntityManagers();
}
public static function getSubscribedEvents(): array
{
return [
WorkerMessageHandledEvent::class => 'onWorkerMessageHandled',
WorkerMessageFailedEvent::class => 'onWorkerMessageFailed',
];
}
private function clearEntityManagers(): void
{
foreach ($this->managerRegistry->getManagers() as $manager) {
$manager->clear();
}
}
}

View File

@@ -0,0 +1,38 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Bridge\Doctrine\Messenger;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
/**
* Closes connection and therefore saves number of connections.
*
* @author Fuong <insidestyles@gmail.com>
*/
class DoctrineCloseConnectionMiddleware extends AbstractDoctrineMiddleware
{
protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope
{
try {
$connection = $entityManager->getConnection();
return $stack->next()->handle($envelope, $stack);
} finally {
if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
$connection->close();
}
}
}
}

View File

@@ -0,0 +1,57 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Bridge\Doctrine\Messenger;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\Persistence\ManagerRegistry;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\StackInterface;
/**
* Middleware to log when transaction has been left open.
*
* @author Grégoire Pineau <lyrixx@lyrixx.info>
*/
class DoctrineOpenTransactionLoggerMiddleware extends AbstractDoctrineMiddleware
{
private bool $isHandling = false;
public function __construct(
ManagerRegistry $managerRegistry,
?string $entityManagerName = null,
private readonly ?LoggerInterface $logger = null,
) {
parent::__construct($managerRegistry, $entityManagerName);
}
protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope
{
if ($this->isHandling) {
return $stack->next()->handle($envelope, $stack);
}
$this->isHandling = true;
$initialTransactionLevel = $entityManager->getConnection()->getTransactionNestingLevel();
try {
return $stack->next()->handle($envelope, $stack);
} finally {
if ($entityManager->getConnection()->getTransactionNestingLevel() > $initialTransactionLevel) {
$this->logger?->error('A handler opened a transaction but did not close it.', [
'message' => $envelope->getMessage(),
]);
}
$this->isHandling = false;
}
}
}

View File

@@ -0,0 +1,61 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Bridge\Doctrine\Messenger;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception as DBALException;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
/**
* Checks whether the connection is still open or reconnects otherwise.
*
* @author Fuong <insidestyles@gmail.com>
*/
class DoctrinePingConnectionMiddleware extends AbstractDoctrineMiddleware
{
protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope
{
if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
$this->pingConnection($entityManager);
}
return $stack->next()->handle($envelope, $stack);
}
private function pingConnection(EntityManagerInterface $entityManager): void
{
$connection = $entityManager->getConnection();
try {
$this->executeDummySql($connection);
} catch (DBALException) {
$connection->close();
// Attempt to reestablish the lazy connection by sending another query.
$this->executeDummySql($connection);
}
if (!$entityManager->isOpen()) {
$this->managerRegistry->resetManager($this->entityManagerName);
}
}
/**
* @throws DBALException
*/
private function executeDummySql(Connection $connection): void
{
$connection->executeQuery($connection->getDatabasePlatform()->getDummySelectSQL());
}
}

View File

@@ -0,0 +1,56 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Bridge\Doctrine\Messenger;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
/**
* Wraps all handlers in a single doctrine transaction.
*
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
*/
class DoctrineTransactionMiddleware extends AbstractDoctrineMiddleware
{
protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope
{
$entityManager->getConnection()->beginTransaction();
$success = false;
try {
$envelope = $stack->next()->handle($envelope, $stack);
$entityManager->flush();
$entityManager->getConnection()->commit();
$success = true;
return $envelope;
} catch (\Throwable $exception) {
if ($exception instanceof HandlerFailedException) {
// Remove all HandledStamp from the envelope so the retry will execute all handlers again.
// When a handler fails, the queries of allegedly successful previous handlers just got rolled back.
throw new HandlerFailedException($exception->getEnvelope()->withoutAll(HandledStamp::class), $exception->getWrappedExceptions());
}
throw $exception;
} finally {
$connection = $entityManager->getConnection();
if (!$success && $connection->isTransactionActive()) {
$connection->rollBack();
}
}
}
}