Canal实现MySql数据监听


一、什么是canal

我们先看官网的介绍
 
canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
 
这句介绍有几个关键字:增量日志,增量数据订阅和消费。
 
这里我们可以简单地把canal理解为一个用来同步增量数据的一个工具。
 
接下来我们看一张官网提供的示意图:
 
 
canal的工作原理就是把自己伪装成MySQL slave,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。
 

二、canal能做什么

以下参考canal官网。
 
与其问canal能做什么,不如说数据同步有什么作用。
 
但是canal的数据同步不是全量的,而是增量。基于binary log增量订阅和消费,canal可以做:
 
数据库镜像
数据库实时备份
索引构建和实时维护
业务cache(缓存)刷新
带业务逻辑的增量数据处理

三、如何搭建canal

3.1 首先有一个MySQL服务器
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
 
我的Linux服务器安装的MySQL服务器是5.7版本。
 
MySQL的安装这里就不演示了,比较简单,网上也有很多教程。
 

mysql配置:

1.编辑mysql配置文件 $ sudo vim /etc/my.cnf

	[mysqld]  
	log-bin=mysql-bin #binlog文件名(也可以使用绝对路径)
	binlog-format=ROW #选择row模式  
	server_id=1 	  #实例唯一ID,不能和canal的slaveId重复

保存并退出,并重启mysql
	$ sudo service mysql restart
改了配置文件之后,重启MySQL,使用命令查看是否打开binlog模式:
在这里插入图片描述

2.创建 mysql账号密码(账号密码自定 权限自定) 

-- CREATE USER canal IDENTIFIED BY 'password'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
 
查看binlog日志文件列表:
 
查看当前正在写入的binlog文件:
 
MySQL服务器这边就搞定了,很简单。

3.2 RabbitMQ

RabbitMQ安装也没啥好说的,教程一大堆。

安装完成后使用默认guest登录,新建一个Exchange和Queues,并将它们绑定到一起去。

  • 新建Exchange

image-20200907192747481

  • 新建Queues

    同时新建路由键routing key备用。

  • 创建连接MQ的账号密码,同时顺手一起创建一个消费者账号canalConsumer用于SpringCloud项目中连接MQ所用。


3.3 Canal Server配置

canal server 模拟mysql从库并向mysql发送dump命令获取mysql binlog数据。

1.下载解压项目
可从阿里项目下载最新版本 deployer :
[https://github.com/alibaba/canal/releases](https://github.com/alibaba/canal/releases)

2.配置项目:
	# 公共配置
	$ sudo vim conf/canal.properties
		
		canal.port= 11111 # canal server 运行端口,保证该端口为占用状态,或者使用其他未占用端口
	
	保存退出。
	
	# 实例配置
	$ sudo vim conf/example/instance.properties
		
		# position info
		canal.instance.master.address = 127.0.0.1:3306  # mysql连接
		
		canal.instance.dbUsername = canal  		# mysql账号
		canal.instance.dbPassword = canal		# 密码
		canal.instance.defaultDatabaseName = test	# 需要同步的库名
		canal.instance.connectionCharset = UTF-8	# mysql编码
         #模式设置为rabbitMq模式
         canal.serverMode = rabbitMQ
         
         ##################################################
         ######### 		    RabbitMQ	     #############
         ##################################################
         #地址,不需要端口
         rabbitmq.host = 127.0.0.1
         #当前Vhost
         rabbitmq.virtual.host = /
         #刚才配置的交换机
         rabbitmq.exchange = cannal-exchange
         #刚才配置的账号密码
         rabbitmq.username = cannal
         rabbitmq.password = cannal
	
	保存退出。
	
3.启动:
	$ sh bin/startup.sh
	
日志文件: $ less logs/canal/canal.log	 # canal server端运行日志
	  $ less logs/example/example.log   # canal client端连接日志
	  $ logs/example/meta.log 	    # 实例binlog 读取记录文件(记录变更位置,默认为新增变更(tail))

3.4 监听MQ获取消息

启动springBoot项目 监听MQ并打印

application.yml

  rabbitmq:
    host: ip地址
    port: 5672
    username: admin
    password: admin

pom.xml

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
package com.peng.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author peng
 * @date 2021/12/23 15:35
 */

@Component
public class DirectReceiver {

    @RabbitHandler
    @RabbitListener(queues = "canal.queue")
    public void process(byte[] b) {
        System.out.println("DirectReceiver消费者收到消息  : " + new String(b));
    }

}

启动项目,修改数据库字段,控制台输出:

DirectReceiver消费者收到消息  : {"data":[{"invoice_id":"19","invoice_title_type":"0","invoice_type":"1","name":"","company_name":"bb5","status":"1","tax_register_number":"","invoice_code":"","invoice_amount":"0","register_address":"","register_phone":"","bank_name":"","bank_no":"","contact_name":"","contact_phone":"","contact_address":"","contact_email":"","express_no":"","express_name":"","file_url":"","file_path":"","supplier_id":"0","owner_id":"10","user_id":"0","remark":"","yn":"0","create_time":"2021-10-21 14:33:29","update_time":"2021-12-23 14:08:40","create_pin":"","update_pin":""}],"database":"my_test","es":1640239720000,"id":38,"isDdl":false,"mysqlType":{"invoice_id":"bigint(10) unsigned","invoice_title_type":"tinyint(3)","invoice_type":"tinyint(3)","name":"varchar(20)","company_name":"varchar(50)","status":"tinyint(3)","tax_register_number":"varchar(20)","invoice_code":"varchar(20)","invoice_amount":"bigint(20)","register_address":"varchar(50)","register_phone":"varchar(50)","bank_name":"varchar(50)","bank_no":"varchar(50)","contact_name":"varchar(50)","contact_phone":"varchar(50)","contact_address":"varchar(255)","contact_email":"varchar(50)","express_no":"varchar(30)","express_name":"varchar(30)","file_url":"varchar(255)","file_path":"varchar(255)","supplier_id":"int(10)","owner_id":"int(10)","user_id":"int(10)","remark":"varchar(255)","yn":"tinyint(1)","create_time":"timestamp","update_time":"timestamp","create_pin":"varchar(50)","update_pin":"varchar(50)"},"old":[{"invoice_type":"0","update_time":"2021-12-23 14:03:03"}],"pkNames":["invoice_id"],"sql":"","sqlType":{"invoice_id":-5,"invoice_title_type":-6,"invoice_type":-6,"name":12,"company_name":12,"status":-6,"tax_register_number":12,"invoice_code":12,"invoice_amount":-5,"register_address":12,"register_phone":12,"bank_name":12,"bank_no":12,"contact_name":12,"contact_phone":12,"contact_address":12,"contact_email":12,"express_no":12,"express_name":12,"file_url":12,"file_path":12,"supplier_id":4,"owner_id":4,"user_id":4,"remark":12,"yn":-6,"create_time":93,"update_time":93,"create_pin":12,"update_pin":12},"table":"trade_invoice","ts":1640239720097,"type":"UPDATE"}
DirectReceiver消费者收到消息  : {"data":[{"log_id":"86329","url":"/peng/admin/blog/add","ip_address":"223.72.80.105","class_method":"com.peng.controller.Admin.BlogController.add","args":"[Blog(blId=null, title=Canal实现MySql数据监听, content=<div>一、什么是canal</div>\n<div>我们先看官网的介绍</div>\n<div>&nbsp;</div>\n<div>canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。</div>\n<div>&nbsp;</div>\n<div>这句介绍有几个关键字:增量日志,增量数据订阅和消费。</div>\n<div>&nbsp;</......","create_time":"2021-12-23 14:17:17"}],"database":"My_Blog_db","es":1640240237000,"id":39,"isDdl":false,"mysqlType":{"log_id":"bigint(11)","url":"varchar(255)","ip_address":"varchar(32)","class_method":"varchar(255)","args":"varchar(255)","create_time":"timestamp"},"old":null,"pkNames":["log_id"],"sql":"","sqlType":{"log_id":-5,"url":12,"ip_address":12,"class_method":12,"args":12,"create_time":93},"table":"t_request_log","ts":1640240237450,"type":"INSERT"}

数据中有数据库名database,表名table,操作类型type,时间等等关键信息,拥有了这些关键信息,那么对于数据我们自己想怎么玩就怎么玩了

参考:mysql+canal+rabbitMq+SpringCloud 实现数据库数据同步监听

 

  • 作者:低调做个路人 (扫码联系作者)
  • 发表时间:2021-12-23 14:17:17
  • 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  • 评论

    visitor
    不错
    cui
    学到了!