0
点赞
收藏
分享

微信扫一扫

canal快速监听数据库变化-监听多个数据库


目录

一、修改配置canal.properties配置文件

二、启动canal

三、配置example1

四、重启cancel 

五、python客户端监听

一、修改配置canal.properties配置文件

canal快速监听数据库变化-监听多个数据库_客户端

二、启动canal

会发现conf文件夹下多了一个example1文件夹

canal快速监听数据库变化-监听多个数据库_运维_02

三、配置example1

将example中的instance.properties复制一份到example1中

canal快速监听数据库变化-监听多个数据库_python_03

 修改数据源

canal快速监听数据库变化-监听多个数据库_服务器_04

四、重启cancel 

五、python客户端监听

启动两个客户端来分别监听

canal快速监听数据库变化-监听多个数据库_python_05

客户端一:

import time

from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from canal.protocol import CanalProtocol_pb2

client: Client = Client()
client.connect(host='192.168.1.71', port=11111)
client.check_valid(username=b'', password=b'')
client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')

if __name__ == '__main__':
    while True:
        message = client.get(100)
        entries = message['entries']
        for entry in entries:
            entry_type = entry.entryType
            if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
                continue
            row_change = EntryProtocol_pb2.RowChange()
            row_change.MergeFromString(entry.storeValue)
            event_type = row_change.eventType
            header = entry.header
            database = header.schemaName
            table = header.tableName
            event_type = header.eventType
            for row in row_change.rowDatas:
                format_data = dict()
                if event_type == EntryProtocol_pb2.EventType.DELETE:
                    format_data['event_type_name'] = '删除'
                    for column in row.beforeColumns:
                        format_data[column.name] = column.value
                elif event_type == EntryProtocol_pb2.EventType.INSERT:
                    format_data['event_type_name'] = '新增'
                    for column in row.afterColumns:
                        format_data[column.name] = column.value
                else:
                    format_data['event_type_name'] = '更新'
                    format_data['before'] = format_data['after'] = dict()
                    for column in row.beforeColumns:
                        format_data['before'][column.name] = column.value
                    for column in row.afterColumns:
                        format_data['after'][column.name] = column.value
                data = dict(
                    db=database,
                    table=table,
                    event_type=event_type,
                    data=format_data,
                )
                print(data)
        time.sleep(1)
    client.disconnect()

 客户端二:

import time

from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from canal.protocol import CanalProtocol_pb2

client: Client = Client()
client.connect(host='192.168.1.71', port=11111)
client.check_valid(username=b'', password=b'')
client.subscribe(client_id=b'1001', destination=b'example1', filter=b'.*\\..*')

if __name__ == '__main__':
    while True:
        message = client.get(100)
        entries = message['entries']
        for entry in entries:
            entry_type = entry.entryType
            if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
                continue
            row_change = EntryProtocol_pb2.RowChange()
            row_change.MergeFromString(entry.storeValue)
            event_type = row_change.eventType
            header = entry.header
            database = header.schemaName
            table = header.tableName
            event_type = header.eventType
            for row in row_change.rowDatas:
                format_data = dict()
                if event_type == EntryProtocol_pb2.EventType.DELETE:
                    format_data['event_type_name'] = '删除'
                    for column in row.beforeColumns:
                        format_data[column.name] = column.value
                elif event_type == EntryProtocol_pb2.EventType.INSERT:
                    format_data['event_type_name'] = '新增'
                    for column in row.afterColumns:
                        format_data[column.name] = column.value
                else:
                    format_data['event_type_name'] = '更新'
                    format_data['before'] = format_data['after'] = dict()
                    for column in row.beforeColumns:
                        format_data['before'][column.name] = column.value
                    for column in row.afterColumns:
                        format_data['after'][column.name] = column.value
                data = dict(
                    db=database,
                    table=table,
                    event_type=event_type,
                    data=format_data,
                )
                print(data)
        time.sleep(1)
    client.disconnect()

六、注意点

①如果两台客户端同时监听同一个数据库通道,日志只会发给其中一个客户端,先到先得,消费掉就不会再发。

举报

相关推荐

0 条评论