指尖上的记忆指尖上的记忆
首页
  • 基础
  • Laravel框架
  • Symfony框架
  • 基础
  • Gin框架
  • 基础
  • Spring框架
  • 命令
  • Nginx
  • Ai
  • Deploy
  • Docker
  • K8s
  • Micro
  • RabbitMQ
  • Mysql
  • PostgreSsql
  • Redis
  • MongoDb
  • Html
  • Js
  • 前端
  • 后端
  • Git
  • 知识扫盲
  • Golang
🌟 gitHub
首页
  • 基础
  • Laravel框架
  • Symfony框架
  • 基础
  • Gin框架
  • 基础
  • Spring框架
  • 命令
  • Nginx
  • Ai
  • Deploy
  • Docker
  • K8s
  • Micro
  • RabbitMQ
  • Mysql
  • PostgreSsql
  • Redis
  • MongoDb
  • Html
  • Js
  • 前端
  • 后端
  • Git
  • 知识扫盲
  • Golang
🌟 gitHub

最近做项目需要使用到消息队列,所以记录一下,本次消息队列基于redis

  • 安装需要的依赖
composer require symfony/messenger
  • 在App\Message下创建消息文件
<?php

namespace App\Message;

final class CourseMessage
{
    private string $user_course_id;

    public function __construct(string $user_course_id)
    {
        $this->user_course_id = $user_course_id;
    }

    public function getCourseId(): string
    {
        return $this->user_course_id;
    }
}
  • 在App\MessageHandler下创建消息处理函数
<?php

namespace App\MessageHandler;

use App\Entity\UserCourse;
use App\Message\CourseMessage;
use App\Service\TimeService;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

final class CourseMessageHandler implements MessageHandlerInterface
{
    private EntityManagerInterface $entityManager;
    private LoggerInterface $logger;

    public function __construct(EntityManagerInterface $entityManager, LoggerInterface $logger)
    {
        $this->entityManager = $entityManager;
        $this->logger = $logger;
    }


    // 通过__invoke处理相关的逻辑
    public function __invoke(CourseMessage $message)
    {
        // do something with your message
        $user_course_id = $message->getCourseId();
        $userCourse = $this->entityManager->getRepository(UserCourse::class)->find($user_course_id);

        $dateTime = TimeService::getDateTime(time());
        $userCourse->setFinishedAt($dateTime);
        $this->entityManager->getRepository(UserCourse::class)->save($userCourse, true);

        $this->logger->info("user course done!");
    }
}

关键是里面的 __invoke 方法,根据参数判断要消费的消息类型, 可以为同一个消息类,配置多个不同的handler, 但是invoke的参数一致, Symfony 会扫描所有被注册的 Handler 服务,自动根据类型提示找到对应的 Handler。
  • 在配置文件 config/packages/messager.yaml下配置
framework:
    messenger:
        failure_transport: failed

        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    use_notify: true
                    check_delayed_interval: 60000
                retry_strategy:
                    max_retries: 3
                    multiplier: 2
            failed: 'doctrine://default?queue_name=failed'
            sync: 'sync://'

        routing:
            Symfony\Component\Mailer\Messenger\SendEmailMessage: sync
            Symfony\Component\Notifier\Message\ChatMessage: async
            Symfony\Component\Notifier\Message\SmsMessage: async
            App\Message\CourseMessage: async

            # Route your messages to the transports
            # 'App\Message\YourMessage': async
  • 触发message
<?php

namespace App\Controller\Admin;

use App\Message\CourseMessage;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;

class IndexController extends AbstractController
{
    #[Route('/admin', name: 'admin')]
    public function index(Request $request, MessageBusInterface $bus): Response
    {
        //这里通过 $bus 手动触发 message
        $bus->dispatch(new CourseMessage(1));
        
        return $this->renderForm('admin/index.html.twig');
    }
}
  • 消费消息

1.通过命令行的方式

php bin/console messenger:consume async -vv

2.通过supervisor守护进程方式

通过supervisor操作:
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600 #async 为异步使用的transport
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
startretries=10
process_name=%(program_name)s_%(process_num)02d

官方文档(https://symfony.com/doc/current/messenger.html)