0
点赞
收藏
分享

微信扫一扫

记录一下 ArrayBlockingQueue 消息堆积的问题


前言

由于之前这个系统的日志记录是被领导要求写表的,在不影响系统性能的前提下,日志的入库操作肯定是要改成异步进行的,当时利用 ArrayBlockingQueue+线程+AOP 简单的去实现了一下,但是初版代码测试下来发现了一个很严重的问题,就是日志丢失的问题,本文由此而来。

初步构思

代码实现逻辑实现很简单,利用 AOP 切面去记录用户的行为,最终调用对应的 DAO 去入库日志,切面如下。

/**
 * zzh:日志切面优先级第一
 */
@Slf4j
@Aspect
@Order(-999)
@Component
public class AspectLog {
    @Autowired
    private LogInfoService logInfoService;

    @Pointcut("execution(public * com.example.oraceldemo.controller..*(..)))")
    public void webPointCut() {
    }

    @Around("webPointCut()")
    public Object arround(ProceedingJoinPoint pjp) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        LogInfo logInfo = new LogInfo();
        logInfo.setOpTime(sdf.format(System.currentTimeMillis()))
                .setIp(request.getRemoteAddr())
                .setMethod(request.getRequestURL().toString());
        try {
            long startTime = System.currentTimeMillis();
            Object o = pjp.proceed();
            long endTime = System.currentTimeMillis();
            StringBuilder sb = new StringBuilder();
            sb.append("   ###请求URL: " + request.getRequestURL().toString());
            sb.append("   ###IP: " + request.getRemoteAddr());
            sb.append("   ###Params: " + Arrays.toString(pjp.getArgs()));
            sb.append("   ###CLASS_METHOD: " + pjp.getSignature().getDeclaringTypeName() + "." + pjp.getSignature().getName());
            sb.append("   ###耗时: " + (endTime - startTime) + "毫秒");
            log.info(sb.toString());
            logInfo.setStartTime(sdf.format(startTime))
                    .setEndTime(sdf.format(endTime))
                    .setReturnValue(JSONObject.toJSONString(o.toString()));
            logInfoService.put(logInfo);
            return o;
        } catch (Throwable e) {
            e.printStackTrace();
            logInfo.setErrors(JSONObject.toJSONString(e));
            log.error(e.toString());
            logInfoService.put(logInfo);
            return null;
        }
    }
}

DAO层代码

由于之前看过 NACOS 心跳健康监测的源码,感觉代码架构玩的比较 6 ,于是按照他那个设计思路,异步的将日志记录操作抽离出来,在项目启动的时候创建一个日志任务(可用线程池去优化),每当有日志需要记录的时候,就把日志对象堆积到 Notifier 里面,里面由一个阻塞队列去维护。去真正的执行入库日志的逻辑。

/**
 * <p>
 * 服务实现类
 * </p>
 *
 * @author 张子行
 * @since 2022-10-26
 */
@Service
public class LogInfoServiceImpl extends ServiceImpl<LogInfoMapper, LogInfo> implements LogInfoService {
    private notifier notifier = new Notifier();

    @Override
    public void put(LogInfo logInfo) {
        notifier.addTask(logInfo);
    }

    @PostConstruct
    private void init() {
        new Thread(notifier).start();
    }

    @Override
    public boolean save(LogInfo entity) {
        return super.save(entity);
    }
}

ArrayBlockingQueue发生堆积的地方

Notifier 中的 addTask 方法源源不断的提供需要写入的日志,当阻塞队列不为空的时候就会被唤醒,取出队列中的日志进行 db 插入操作。

@Slf4j
public class Notifier implements Runnable {
    private BlockingQueue<LogInfo> tasks = new ArrayBlockingQueue<>(1024 * 1024);

    public void addTask(LogInfo logInfo) {
        log.info("Notifier tasks.add() begin,{}", logInfo.getIp());
        tasks.add(logInfo);
        log.info("Notifier tasks.add() end,{}", logInfo.getIp());
    }

    /**
     * Runnable不能抛出异常,抛出异常线程无法执行!因此需要自己内部消化
     */
    @Override
    public void run() {
        for (; ; ) {
            try {
                log.info("BlockingQueue tasks take begin...");
                LogInfo logInfo = tasks.take();
                log.info("BlockingQueue tasks take end...{}"+logInfo.toString());
                LogInfoServiceImpl logInfoService = WebUtil.getBean(LogInfoServiceImpl.class);
                logInfoService.save(logInfo);
            } catch (InterruptedException e) {
                log.error("Notifier BlockingQueue tasks take InterruptedException, Thread interrupt status{}" + Thread.currentThread().isInterrupted());
            } catch (Exception e) {
                log.error("Notifier logInfoService save exceptionType :{},msg: {}", e.getClass(), e.getMessage());
            }
        }
    }
}

初版的代码如下

只对31行代码做了异常catch,而 33-34 行可能会出现一些 SQL 异常,但是之前我并未做异常catch 处理,然后 当 33-34 行出现异常的时候直接抛出到了 Runable 中去了,导致此线程运行终结,(这也是 Runable的一大特性了 )。导致消息没被正常消费,于是乎 ArrayBlockingQueue 中的消息越来越多,就有问题了。解决办法加一层 catch 捕获所有类型的异常,也就是在 Runable 中吞掉异常,或者手动捕获 33-34 行的异常,设置线程的中断状态,让其他业务感知到此线程被中断执行了,自定义处理逻辑。等等

记录一下 ArrayBlockingQueue 消息堆积的问题_ide

问题排查思路

起初我还以为是 InterruptedException 导致的线程中断,于是乎去看了一下 ArrayBlockingQueue 中的 put 与 take 方法的源码,也算有点收获,接下来也分析一下吧。记录于下文的面试专栏中。

面试专栏

为什么当 BlockingQueue 中没有消息的时候 take 方法会阻塞当前线程?

利用了 ReentrantLock 中的条件锁(Condition)实现的, ArrayBlockingQueue 里面维护了俩个 Condition 分别是 notEmpty 与 notFull ,当 ArrayBlockingQueue 阻塞队列进行 take 的时候,且队列中没有元素的时候会进行调用 notEmpty.await(); 方法阻塞当前线程,当 ArrayBlockingQueue 阻塞队列进行 add 的时候,会去调用 notEmpty.signal(); 去唤醒当前线程。加元素的时候去唤醒线程,取元素且元素数量为0的时候会去阻塞当前线程。这也是为什么 ArrayBlockingQueue 被称之为阻塞队列的原因。对应回答的源码剖析如下。

记录一下 ArrayBlockingQueue 消息堆积的问题_队列_02


ArrayBlockingQueue take核心源码,注意里面用到的是 ReentrantLock 中的 lockInterruptibly 方法(加锁可中断),当线程状态为中断状态时候,lock.lockInterruptibly() 这行代码就会抛出 InterruptedException 异常,具体的 ReentrantLock 为可中断锁的源码可以去看 浅聊一下,可中断锁(ReentrantLock),这也是为什么调用 take 方法的时候必须要去捕获一下 InterruptedException 异常的原因了。

记录一下 ArrayBlockingQueue 消息堆积的问题_阻塞队列_03


ArrayBlockingQueue add里面的核心代码,里面调用了 notEmpty.signal(); 这行代码,去唤醒当前线程。

记录一下 ArrayBlockingQueue 消息堆积的问题_阻塞队列_04


举报

相关推荐

0 条评论