本文环境 Hyperf2.1,PHP7.3,Mysql5.7
不懂的可以评论或联系我
著作权归OwenZhang所有。商业转载请联系OwenZhang获得授权,非商业转载请注明出处。
业务场景介绍
前天接到一个需求,后台管理系统,记录后台账号的操作记录信息。
由于是集团后台,这样操作者就很多,但操作日志却是很关键的信息,必须得写入数据库,这样多用户写入,我们就得用到异步队列进行消费,防止写入失败,如果队列进行消费指定参数后还是失败,就得写入日志进行钉钉消息推送,具体可以看我另外一篇文章:
lite-monitor 一款基于 shell 命令的监控系统 - owenzhang24的个人空间 - OSCHINA - 中文开源技术交流社区
需求产品原型图:
Hyperf & async-queue介绍
Hyperf 介绍
Hyperf 是基于 Swoole 4.5+
实现的高性能、高灵活性的 PHP 协程框架,内置协程服务器及大量常用的组件,性能较传统基于 PHP-FPM
的框架有质的提升,提供超高性能的同时,也保持着极其灵活的可扩展性,标准组件均基于 PSR 标准 实现,基于强大的依赖注入设计,保证了绝大部分组件或类都是 可替换
与 可复用
的。
async-queue 介绍
async-queue是Redis 异步队列,异步队列区别于 RabbitMQ
Kafka
等消息队列,它只提供一种 异步处理
和 异步延时处理
的能力,并 不能 严格地保证消息的持久化和 不支持 ACK 应答机制。
官方文档:Redis 异步队列 (hyperf.wiki)
async-queue 安装
使用composer将 async-queue 安装到你的项目中:
composer require hyperf/async-queue复制代码
插件环境要求:参考Packagist:hyperf/async-queue - Packagist
async-queue 配置
配置文件位于 config/autoload/async_queue.php
,如文件不存在可自行创建。
我的项目配置如下:
strict_types=1);/** * This file is part of Hyperf. * * @link https://www.hyperf.io * @document https://hyperf.wiki * @contact group@hyperf.io * @license https://github.com/hyperf/hyperf/blob/master/LICENSE */return [ 'default' => [ 'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class, 'channel' => 'plan_queue',//队列前缀 'timeout' => 5,//pop 消息的超时时间 'retry_seconds' => 5,//失败后重新尝试间隔 'handle_timeout' => 5,//消息处理超时时间 'processes' => 5, //增加异步队列进程数量,加快消费速度 ],];复制代码
(
其他配置可以参考官方文档
代码实例
消费队列接口类
函数说明:
-
__construct()
任务执行失败后的重试次数,即最大执行次数为 $maxAttempts+1 次 -
handle()
执行任务日志写入数据库
代码实例:
strict_types=1);namespace App\Job;use App\Model\HoutaiOperationLog;use Hyperf\AsyncQueue\Job;use Hyperf\Utils\ApplicationContext;class OperationLogAddJob extends Job{ public $params; /** * 任务执行失败后的重试次数,即最大执行次数为 $maxAttempts+1 次 * * @var int */ protected $maxAttempts = 0; public function __construct($params) { $this->params = $params; } /** * 执行任务 日志写入数据库 * @var int */ public function handle() { $container = ApplicationContext::getContainer(); $logger = $container->get(\Hyperf\Logger\LoggerFactory::class)->get('log', 'default'); try { $insertData = [ 'operation_type' => $this->params['operation_type'], 'operation_page' => $this->params['operation_page'], 'content' => $this->params['content'], 'status' => $this->params['status'], 'ip' => $this->params['ip'], 'created_id' => $this->params['created_id'], 'app' => $this->params['app'], 'created_at' => date('Y-m-d H:i:s'), ]; return HoutaiOperationLog::query()->insert($insertData); } catch (\Exception $e) { $logger->error('后台操作日志添加失败:' . $e->getMessage()); } return 'success'; }}复制代码
(
写入异步队列函数方法
操作日志添加队列函数方法
代码实例:
/** * Describe: 个人中心-操作日志添加队列 * Route: get /admin/operation_log_list * @param int $operationType 操作类型,1登录,2登出,3编辑修改,4新增创建,5删除,6导出数据,7导入数据(导出和导入暂时不做) * @param string $operationPage 操作页面 * @param string $content 操作内容 * @param int $status 操作状态,1成功,2失败 * Created by OwenZhang at 2022/01/17 10:14 */public function operationLogAdd(int $operationType, string $operationPage, string $content, int $status){ try { $driverFactory = ApplicationContext::getContainer()->get(DriverFactory::class); $driver = $driverFactory->get('default'); $params['operation_type'] = $operationType; $params['operation_page'] = $operationPage; $params['content'] = $content; $params['status'] = $status; $params['ip'] = Context::get('ip') ?? ''; $params['created_id'] = Context::get('system_user_id') ?? 0; $params['app'] = Context::get('app') ?? 0; return $driver->push(new OperationLogAddJob($params)); } catch (\Exception $e) { throw new BusinessException(9999, '操作日志添加失败:' . $e->getMessage()); }}复制代码
调用写入异步队列函数方法
具体业务类下写入
代码实例:
$this->operationLogService->operationLogAdd(operationLogService::TYPE_EDIT, '运营管理&配置-意见反馈', "【意见反馈】【{$statusName}】【ID{$ids}】", operationLogService::STATUS_SUCCESS);复制代码
Buy me a cup of coffee :)
觉得对你有帮助,就给我打赏吧,谢谢!