0
点赞
收藏
分享

微信扫一扫

canal+rabbitmq解决mysql与redis缓存数据一致性问题

狐沐说 2022-05-02 阅读 75

文章目录

1 mysql

1.1 开启 MySQL的binlog

vi /etc/my.cnf
log-bin=mysql-bin #开启 binlog
binlog-format=ROW #选择 ROW 模式
server_id=1    #配置MySQL replaction需要定义,不要和canal的 slaveId重复

1.2 重启mysql

systemctl restart mysqld

1.3 查看binlog是否已被开启

SHOW VARIABLES LIKE 'log_bin';

在这里插入图片描述

1.4 修改密码策略

set global validate_password_policy=LOW;
set global validate_password_length=5;

1.5 新建canal用户并授权

DROP USER 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';  
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';  
FLUSH PRIVILEGES;

2 rabbitmq

因为使用Docker来安装rabbitmq比较方便,所以本文选用Docker进行安装,未安装Docker的请移步Docker从零基础入门到使用。

2.1 拉取rabbitmq镜像

docker pull rabbitmq:3.9.16-management

2.2 运行rabbitmq镜像

docker run -d --name rabbitmq-test --hostname my-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.9.16-management

2.3 进入Rabbitmq Management

在浏览器地址栏中输入 ip:15672,默认Username和Password均为guest
在这里插入图片描述

3 canal

3.1 下载canal

下载地址
在这里插入图片描述
下载之后通过XFTP或WinSCP上传到centos

3.2 创建解压目录并解压

  • 创建解压目录
mkdir /tmp/canal
  • 解压
tar -zxvf canal.deployer-1.1.5.tar.gz -C /tmp/canal/

3.3 修改配置文件

3.3.1 conf/canal.properties

 vi /tmp/canal/conf/canal.properties
canal.serverMode = rabbitMQ #设置服务器模式为rabbitMQ
rabbitmq.host =127.0.0.1 #ip
rabbitmq.virtual.host = / #虚拟主机
rabbitmq.exchange = mysql #交换机名称
rabbitmq.username = guest #用户名
rabbitmq.password =guest #密码
rabbitmq.deliveryMode = direct #交换机类型

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.3.2 conf/example/instance.properties

 vi /tmp/canal/conf/example/instance.properties
## mysql serverId 不能与mysql的server_id一样
canal.instance.mysql.slaveId = 1234
  
#mysql数据库ip:port
canal.instance.master.address = 127.0.0.1:3306 

#rabbitmq中exchange与queue进行绑定的路由键
canal.mq.topic=mysql-binlog

#mysql数据库账号密码
canal.instance.dbUsername = 
canal.instance.dbPassword = 

3.4 启动canal

  • 进入canal启动目录
cd /tmp/canal/bin

假如服务器内存小,则修改启动文件startup.sh的jvm参数,否则会出现canal无法启动问题,或者是运行着出现com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException

vi startup.sh

在这里插入图片描述

  • 启动canal
./startup.sh

4 Spring Boot集成rabbitmq

4.1 在pom.xml中添加maven依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4.2 yml文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true  # 开启confirm模式,确保消息成功发送到交换器
    listener:
      type: simple # 设置容器类型
      simple:
        default-requeue-rejected: false # basicReject或basicNack后不重新入队,使其进入死信队列
        acknowledge-mode: manual # 选择使用手动ack,不使用自动ack
        retry:
          enabled: true # 开启消息消费失败重试
          max-attempts: 5 # 重试次数
          initial-interval: 3000 # 重试时间间隔

4.3 RabbitConfig配置文件

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class RabbitConfig {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Bean
    public Exchange exchange() {
        // 创建一个Direct Exchange,设置为持久化,不自动删除
        return new DirectExchange("mysql", true, false);
    }

    @Bean
    public Exchange deadLetterExchange() {
        // 死信Exchange
        return new DirectExchange("dead.letter.exchange", true, false);
    }


    @Bean
    public Queue queue() {
        /**
         *  durable=true 持久化queue的元数据
         *  exclusive = false 队列不独占,允许多个消费者访问
         *  autoDelete = false 当最后一个消费者断开连接之后队列是否自动被删除
         */
        Map<String, Object> args = new HashMap<>(2);
        // 配置当前队列绑定的死信交换器
        args.put("x-dead-letter-exchange", "dead.letter.exchange");
        // 配置当前队列的死信队列路由key,如果不设置默认为当前队列的路由key
        args.put("x-dead-letter-routing-key", "dead.letter.routing.key");
        return new Queue("binlog", true, false, false, args);
    }

    @Bean
    public Queue deadLetterQueue() {
        // 死信Queue
        return new Queue("dead.letter.queue", true, false, false);
    }

    @Bean
    public Binding binding() {
        // 将上面的mysql Exchange与binlog Queue以"mysql-binlog"为路由键进行绑定,无参数
        return BindingBuilder
                .bind(queue())
                .to(exchange())
                .with("mysql-binlog")
                .noargs();
    }

    @Bean
    public Binding deadLetterBinding() {
        // 绑定死信Queue与死信Exchange
        return BindingBuilder
                .bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with("dead.letter.routing.key")
                .noargs();
    }

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        // 开启强制委托模式
        rabbitTemplate.setMandatory(true);
        // ack=true表示Exchange接收到了消息
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                    if (ack) {
                        logger.info("消息已发送到Exchange");
                    } else {
                        logger.error("消息未能发送到Exchange,{}", cause);
                    }
                }
        );
        // 当消息发送给Exchange后,Exchange路由到Queue失败时会执行ReturnCallBack
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
                logger.error("mq消息不可达,message:{},replyCode:{},replyText:{},exchange:{},routing:{}",
                        message, replyCode, replyText, exchange, routingKey)
        );
        return rabbitTemplate;
    }
}

4.4 CanalMessage.java

import lombok.Data;

import java.util.List;
import java.util.Map;

@Data
public class CanalMessage<T> {

    /**
     * 更新后的数据
     */
    private List<T> data;
    /**
     * 数据库名
     */
    private String database;
    /**
     * binlog executeTime, 执行耗时
     */
    private long es;
    /**
     * id
     */
    private int id;
    /**
     * 标识是否是ddl语句,比如create table/drop table
     */
    private boolean isDdl;

    /**
     * 更新前的有变更的列的数据
     */
    private List<Map<String, Object>> old;
    /**
     * 主键字段名
     */
    private List<String> pkNames;
    /**
     * ddl/query的sql语句
     */
    private String sql;

    /**
     * 表名
     */
    private String table;
    /**
     * dml build timeStamp
     */
    private long ts;
    /**
     * 事件类型:INSERT/UPDATE/DELETE
     */
    private String type;
}

4.5 RabbitmqListener.java

import com.alibaba.fastjson.JSON;
import com.company.springboot.canal.CanalMessage;
import com.company.springboot.sys.entity.User;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Random;

@Component
public class RabbitmqListener {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "binlog"), exchange = @Exchange(value = "mysql")))
    public void businessQueue(@Payload byte[] message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        try {
            // canal发送到rabbitmq的消息默认为二进制字节流,无法看懂,所以将二进制字节流转换为String类型
            String realMessage = new String(message, StandardCharsets.UTF_8);
            // 将String转换为对象类型
            CanalMessage<User> canalMessage = JSON.parseObject(realMessage, CanalMessage.class);
            // 只针对test数据库中的user表
            if ("test".equals(canalMessage.getDatabase()) && "user".equals(canalMessage.getTable())) {
                if ("UPDATE".equals(canalMessage.getType()) || "INSERT".equals(canalMessage.getType())) {
                    // userList不能直接等于canalMessage.getData(),否则会出现类型无法转换问题
                    List<User> userList = JSON.parseArray(JSON.parseObject(realMessage).getString("data"), User.class);
                    for (User user : userList) {
                        logger.info(user.toString());
                        redisTemplate.opsForValue().set("user::" + user.getId(), user, Duration.ofSeconds(60 * 60 + new Random().nextInt(60 * 10)));
                    }
                } else if ("DELETE".equals(canalMessage.getType())) {
                    List<User> userList = JSON.parseArray(JSON.parseObject(realMessage).getString("data"), User.class);
                    for (User user : userList) {
                        redisTemplate.delete("user::" + user.getId());
                    }
                }
            }
            // 手动ack,确认消息已被消费
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // requeue=false 表示被拒绝的消息进入死信队列
            channel.basicNack(deliveryTag, false, false);
            e.printStackTrace();
        }
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "dead.letter.queue"), exchange = @Exchange(value = "dead.letter.exchange")))
    public void deadLetterQueue(@Payload byte[] message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
       logger.info("死信队列业务逻辑");
    }
}
举报

相关推荐

0 条评论