vendor/shopware/core/Framework/MessageQueue/DeadMessage/RequeueDeadMessagesService.php line 49

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\DeadMessage;
  3. use Psr\Log\LoggerInterface;
  4. use Shopware\Core\Framework\Context;
  5. use Shopware\Core\Framework\DataAbstractionLayer\EntityRepositoryInterface;
  6. use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
  7. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\EqualsFilter;
  8. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\RangeFilter;
  9. use Shopware\Core\Framework\Feature;
  10. use Shopware\Core\Framework\Log\Package;
  11. use Shopware\Core\Framework\MessageQueue\Message\RetryMessage;
  12. use Symfony\Component\Messenger\MessageBusInterface;
  13. /***
  14.  * @package core
  15.  * @deprecated tag:v6.5.0 - Will be removed, as we use the default symfony retry mechanism
  16.  */
  17. #[Package('core')]
  18. class RequeueDeadMessagesService
  19. {
  20.     private const MAX_RETRIES 3;
  21.     private EntityRepositoryInterface $deadMessageRepository;
  22.     private MessageBusInterface $bus;
  23.     private MessageBusInterface $encryptedBus;
  24.     private LoggerInterface $logger;
  25.     /**
  26.      * @internal
  27.      */
  28.     public function __construct(
  29.         EntityRepositoryInterface $deadMessageRepository,
  30.         MessageBusInterface $bus,
  31.         MessageBusInterface $encryptedBus,
  32.         LoggerInterface $logger
  33.     ) {
  34.         $this->deadMessageRepository $deadMessageRepository;
  35.         $this->bus $bus;
  36.         $this->encryptedBus $encryptedBus;
  37.         $this->logger $logger;
  38.     }
  39.     public function requeue(?string $messageClass null): void
  40.     {
  41.         Feature::triggerDeprecationOrThrow(
  42.             'v6.5.0.0',
  43.             Feature::deprecatedClassMessage(__CLASS__'v6.5.0.0')
  44.         );
  45.         $criteria $this->buildCriteria($messageClass);
  46.         $context Context::createDefaultContext();
  47.         $messages $this->deadMessageRepository->search($criteria$context)->getEntities();
  48.         $notFoundDeadMessages = [];
  49.         /** @var DeadMessageEntity $message */
  50.         foreach ($messages as $message) {
  51.             if (!class_exists($message->getOriginalMessageClass())) {
  52.                 $notFoundDeadMessages[] = ['id' => $message->getId()];
  53.                 continue;
  54.             }
  55.             if ($message->getErrorCount() > self::MAX_RETRIES) {
  56.                 $this->logger->warning(sprintf('Dropped the message %s after %d retries'$message->getOriginalMessageClass(), self::MAX_RETRIES), [
  57.                     'exception' => $message->getException(),
  58.                     'exceptionFile' => $message->getExceptionFile(),
  59.                     'exceptionLine' => $message->getExceptionLine(),
  60.                     'exceptionMessage' => $message->getExceptionMessage(),
  61.                 ]);
  62.                 $notFoundDeadMessages[] = ['id' => $message->getId()];
  63.                 continue;
  64.             }
  65.             $this->dispatchRetryMessage($message);
  66.         }
  67.         if (!empty($notFoundDeadMessages)) {
  68.             $this->deadMessageRepository->delete($notFoundDeadMessages$context);
  69.         }
  70.     }
  71.     private function buildCriteria(?string $messageClass): Criteria
  72.     {
  73.         $criteria = new Criteria();
  74.         $criteria->addFilter(new RangeFilter(
  75.             'nextExecutionTime',
  76.             [
  77.                 RangeFilter::LT => (new \DateTime())->format(\DATE_ATOM),
  78.             ]
  79.         ));
  80.         if ($messageClass) {
  81.             $criteria->addFilter(new EqualsFilter('originalMessageClass'$messageClass));
  82.         }
  83.         return $criteria;
  84.     }
  85.     private function dispatchRetryMessage(DeadMessageEntity $message): void
  86.     {
  87.         $retryMessage = new RetryMessage($message->getId());
  88.         if ($message->isEncrypted()) {
  89.             $this->encryptedBus->dispatch($retryMessage);
  90.         } else {
  91.             $this->bus->dispatch($retryMessage);
  92.         }
  93.     }
  94. }