0
点赞
收藏
分享

微信扫一扫

windows下配置canal

伽马星系 2022-07-14 阅读 70

MySQL开启binlog

# Check whether binary logging is enabled (the log_bin variable).
SHOW VARIABLES LIKE '%log_bin%';

如果log_bin为关闭状态(OFF),打开my.ini文件,添加如下配置,然后重启MySQL服务使其生效

[mysqld]
# Enable the binary log; canal poses as a replica and listens via the replication protocol.
# Option names are written with underscores throughout (MySQL treats '-' and '_' alike here).
server_id=1
log_bin=mysql-bin
log_bin_index=master-bin.index
binlog_format=ROW

下载canal

github比较慢,直接使用下载好的,这里使用的版本是1-4

mysql5.7没有user表执行以下语句初始化表

USE mysql;
# Inspect the existing accounts; only the columns relevant to authentication are listed.
SELECT host, user, plugin, authentication_string FROM user;
# Reset the root password to empty.
# WARNING: an empty root password is only acceptable on a local test instance.
UPDATE user SET authentication_string = '' WHERE user = 'root';
# Direct edits to the grant tables are not picked up until the privileges are reloaded.
FLUSH PRIVILEGES;

创建canal用户并授权

# Create the dedicated account that canal uses to connect to MySQL.
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
# Grant only what a replication client needs. The former GRANT ALL PRIVILEGES was dropped
# because it made this least-privilege grant meaningless and over-privileged the account.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

配置canal监听MySQL数据库

复制conf下的example文件夹并重命名,然后修改新文件夹里面的配置文件,配置好数据库相关信息
在这里插入图片描述

进入bin目录,双击bat文件

在这里插入图片描述
报错了,解决办法:
将bin文件夹中startup.bat里红框中的内容删除
在这里插入图片描述

启动客户端监听

引入maven依赖

    <!-- canal client and its protocol module, both pinned to version 1.1.5 -->
    <dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.5</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.5</version>
        </dependency>
    </dependencies>

最简单的demo

    /**
     * Minimal canal client demo: connects to the canal server on this host,
     * subscribes, fetches a single batch of entries and prints them.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String ip = AddressUtils.getHostIp();
        // "gold" is the destination name; presumably it matches the instance folder
        // copied from conf/example -- verify against your canal configuration.
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(ip, 11111), "gold", "canal", "canal");
        canalConnector.connect();
        try {
            canalConnector.subscribe();
            int batch = 1000;
            Message message = canalConnector.get(batch);
            List<CanalEntry.Entry> entries = message.getEntries();
            System.out.println(entries);
        } finally {
            // Release the connection even when subscribe/get throws.
            canalConnector.disconnect();
        }
    }

数据格式


### 随便修改数据库中的一条数据,打印entries,可以发现storeValue里面的数据是经过protobuf序列化的二进制内容,无法直接阅读
[
header {
version: 1
logfileName: "binlog.000005"
logfileOffset: 842
serverId: 1
serverenCode: "UTF-8"
executeTime: 1656644165000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 72
}
entryType: TRANSACTIONBEGIN
storeValue: " \a"
,

header {
version: 1
logfileName: "binlog.000005"
logfileOffset: 980
serverId: 1
serverenCode: "UTF-8"
executeTime: 1656644165000
sourceType: MYSQL
schemaName: "blog"
tableName: "m_user"
eventLength: 391
eventType: UPDATE
props {
key: "rowsCount"
value: "1"
}
}
entryType: ROWDATA
storeValue: "\bl\020\002P\000b\221\a\n&\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0000\000B\0011R\nbigint(20)\n,\b\001\020\f\032\busername \000(\0000\000B\tmarkerhubR\vvarchar(64)\n\213\001\b\002\020\f\032\006avatar \000(\0000\000Bihttps://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload/images/5a9f48118166308daba8b6da7e466aab.jpgR\fvarchar(255)\n(\b\003\020\f\032\005email \000(\0000\000B\b11111111R\vvarchar(64)\nC\b\004\020\f\032\bpassword \000(\0000\000B 96e79218965eb72c92a549dd5a330112R\vvarchar(64)\n\035\b\005\020\004\032\006status \000(\0000\000B\0010R\006int(5)\n2\b\006\020]\032\acreated \000(\0000\000B\0232020-04-20 10:44:01R\bdatetime\n \b\a\020]\032\nlast_login \000(\0000\001R\bdatetime\022&\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0000\000B\0011R\nbigint(20)\022,\b\001\020\f\032\busername \000(\0000\000B\tmarkerhubR\vvarchar(64)\022\213\001\b\002\020\f\032\006avatar \000(\0000\000Bihttps://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload/images/5a9f48118166308daba8b6da7e466aab.jpgR\fvarchar(255)\022)\b\003\020\f\032\005email \000(\0010\000B\t111111111R\vvarchar(64)\022C\b\004\020\f\032\bpassword \000(\0000\000B 96e79218965eb72c92a549dd5a330112R\vvarchar(64)\022\035\b\005\020\004\032\006status \000(\0000\000B\0010R\006int(5)\0222\b\006\020]\032\acreated \000(\0000\000B\0232020-04-20 10:44:01R\bdatetime\022 \b\a\020]\032\nlast_login \000(\0000\001R\bdatetime"
,

header {
version: 1
logfileName: "binlog.000005"
logfileOffset: 1371
serverId: 1
serverenCode: "UTF-8"
executeTime: 1656644165000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\00235"
]

处理storeValue

// Filter the entries by type: transaction begin/end entries are ignored here,
// everything else is decoded and printed.
// NOTE: iterates `entries`, the list obtained from message.getEntries() in the demo.
for (CanalEntry.Entry entry : entries) {
    CanalEntry.EntryType entryType = entry.getEntryType();
    if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
            || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
        continue;
    }
    // storeValue is not human-readable as-is; parse it into a RowChange first.
    try {
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        System.out.println(rowChange);
    } catch (InvalidProtocolBufferException e) {
        e.printStackTrace();
    }
}

RowChange具体格式:

tableId: 108
eventType: UPDATE
isDdl: false
rowDatas {
beforeColumns {
index: 0
sqlType: -5
name: "id"
isKey: true
updated: false
isNull: false
value: "1"
mysqlType: "bigint(20)"
}
beforeColumns {
index: 1
sqlType: 12
name: "username"
isKey: false
updated: false
isNull: false
value: "markerhub"
mysqlType: "varchar(64)"
}
beforeColumns {
index: 2
sqlType: 12
name: "avatar"
isKey: false
updated: false
isNull: false
value: "https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload/images/5a9f48118166308daba8b6da7e466aab.jpg"
mysqlType: "varchar(255)"
}
beforeColumns {
index: 3
sqlType: 12
name: "email"
isKey: false
updated: false
isNull: false
value: "111111111"
mysqlType: "varchar(64)"
}
beforeColumns {
index: 4
sqlType: 12
name: "password"
isKey: false
updated: false
isNull: false
value: "96e79218965eb72c92a549dd5a330112"
mysqlType: "varchar(64)"
}
beforeColumns {
index: 5
sqlType: 4
name: "status"
isKey: false
updated: false
isNull: false
value: "0"
mysqlType: "int(5)"
}
beforeColumns {
index: 6
sqlType: 93
name: "created"
isKey: false
updated: false
isNull: false
value: "2020-04-20 10:44:01"
mysqlType: "datetime"
}
beforeColumns {
index: 7
sqlType: 93
name: "last_login"
isKey: false
updated: false
isNull: true
mysqlType: "datetime"
}
afterColumns {
index: 0
sqlType: -5
name: "id"
isKey: true
updated: false
isNull: false
value: "1"
mysqlType: "bigint(20)"
}
afterColumns {
index: 1
sqlType: 12
name: "username"
isKey: false
updated: false
isNull: false
value: "markerhub"
mysqlType: "varchar(64)"
}
afterColumns {
index: 2
sqlType: 12
name: "avatar"
isKey: false
updated: false
isNull: false
value: "https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload/images/5a9f48118166308daba8b6da7e466aab.jpg"
mysqlType: "varchar(255)"
}
afterColumns {
index: 3
sqlType: 12
name: "email"
isKey: false
updated: true
isNull: false
value: "111111"
mysqlType: "varchar(64)"
}
afterColumns {
index: 4
sqlType: 12
name: "password"
isKey: false
updated: false
isNull: false
value: "96e79218965eb72c92a549dd5a330112"
mysqlType: "varchar(64)"
}
afterColumns {
index: 5
sqlType: 4
name: "status"
isKey: false
updated: false
isNull: false
value: "0"
mysqlType: "int(5)"
}
afterColumns {
index: 6
sqlType: 93
name: "created"
isKey: false
updated: false
isNull: false
value: "2020-04-20 10:44:01"
mysqlType: "datetime"
}
afterColumns {
index: 7
sqlType: 93
name: "last_login"
isKey: false
updated: false
isNull: true
mysqlType: "datetime"
}
}
举报

相关推荐

0 条评论