最近做项目需要使用到消息队列,所以记录一下,本次消息队列基于redis
- 安装需要的依赖
composer require symfony/messenger
- 在App\Message下创建消息文件
<?php
namespace App\Message;
final class CourseMessage
{
private string $user_course_id;
public function __construct(string $user_course_id)
{
$this->user_course_id = $user_course_id;
}
public function getCourseId(): string
{
return $this->user_course_id;
}
}
- 在App\MessageHandler下创建消息处理函数
<?php
namespace App\MessageHandler;
use App\Entity\UserCourse;
use App\Message\CourseMessage;
use App\Service\TimeService;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
final class CourseMessageHandler implements MessageHandlerInterface
{
private EntityManagerInterface $entityManager;
private LoggerInterface $logger;
public function __construct(EntityManagerInterface $entityManager, LoggerInterface $logger)
{
$this->entityManager = $entityManager;
$this->logger = $logger;
}
// 通过__invoke处理相关的逻辑
public function __invoke(CourseMessage $message)
{
// do something with your message
$user_course_id = $message->getCourseId();
$userCourse = $this->entityManager->getRepository(UserCourse::class)->find($user_course_id);
$dateTime = TimeService::getDateTime(time());
$userCourse->setFinishedAt($dateTime);
$this->entityManager->getRepository(UserCourse::class)->save($userCourse, true);
$this->logger->info("user course done!");
}
}
关键是里面的 __invoke 方法,根据参数判断要消费的消息类型, 可以为同一个消息类,配置多个不同的handler, 但是invoke的参数一致, Symfony 会扫描所有被注册的 Handler 服务,自动根据类型提示找到对应的 Handler。
- 在配置文件 config/packages/messager.yaml下配置
framework:
messenger:
failure_transport: failed
transports:
# https://symfony.com/doc/current/messenger.html#transport-configuration
async:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
use_notify: true
check_delayed_interval: 60000
retry_strategy:
max_retries: 3
multiplier: 2
failed: 'doctrine://default?queue_name=failed'
sync: 'sync://'
routing:
Symfony\Component\Mailer\Messenger\SendEmailMessage: sync
Symfony\Component\Notifier\Message\ChatMessage: async
Symfony\Component\Notifier\Message\SmsMessage: async
App\Message\CourseMessage: async
# Route your messages to the transports
# 'App\Message\YourMessage': async
- 触发message
<?php
namespace App\Controller\Admin;
use App\Message\CourseMessage;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;
class IndexController extends AbstractController
{
#[Route('/admin', name: 'admin')]
public function index(Request $request, MessageBusInterface $bus): Response
{
//这里通过 $bus 手动触发 message
$bus->dispatch(new CourseMessage(1));
return $this->renderForm('admin/index.html.twig');
}
}
- 消费消息
1.通过命令行的方式
php bin/console messenger:consume async -vv
2.通过supervisor守护进程方式
通过supervisor操作:
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600 #async 为异步使用的transport
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
startretries=10
process_name=%(program_name)s_%(process_num)02d
官方文档(https://symfony.com/doc/current/messenger.html)
