新增MySQL实时采集

本节介绍如何在实时计算中配置MySQL实时采集任务。
实时计算支持MySQL数据库的实时采集,当MySQL发生insert、update等操作时,实时采集组件可根据MySQL的Binlog信息立即获取其操作,并实时将变更信息同步至目标数据源。

实时采集-参数配置

image.png
配置 说明

任务类型

MySQL实时采集支持两种类型:通过Binlog实时采集、通过间隔轮询进行采集

数据源

选择对应数据类型的已有数据源

是否分表

选择分表,可以设置多对多分表、分组写入;
目前主要针对Hive数据源实现多对多分表写入;

image::3-4-2021-13-45-46-PM.png[]

选择数据源表,支持多表合并写入一张表;
多表合并写入时,无法保证完全根据业务顺序写入;

采集起点

用户可选择从任务运行时开始、按时间选择、按文件选择三种采集起点
从任务运行时开始:任务提交处于"运行中"时,开始实时采集;
按时间选择:以用户选择的时间作为采集起点(非完全精准);
按文件选择:根据选择用户的binlog、LogMiner作为起始文件进行采集;

数据操作

支持Insert、Update、Delete三种数据操作类型选择,当发生insert、update等操作时,实时采集组件可根据Binlog等信息立即获取其操作,并实时将变更信息同步至目标数据源

格式转换

勾选后,系统将会多层嵌套格式的JSON分解为单层结构,例如:{"a":1, "b": {"c":3}},将会被分解为:{"a":1,"b_c":3}

高级配置

以JSON格式添加高级参数,例如对关系型数据库可配置fetchSize,每类数据源支持不同的参数

如何开启Mysql_Binlog功能

1.修改配置文件

server_id=109
log_bin = /var/lib/mysql/mysql-bin
binlog_format = ROW
expire_logs_days = 30

2.添加权限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';

3.问题排查

采集mysql binlog 发现采集不到数据
(1)查看binlog是否开启
show variables like '%log_bin%' ;
image.png
(2)binlog_format 是否设置为ROW
注意 binlog_format 必须设置为 ROW, 因为在 STATEMENT 或 MIXED 模式下, Binlog 只会记录和传输 SQL 语句(以减少日志大小),而不包含具体数据,我们也就无法保存了。
(3)从节点通过一个专门的账号连接主节点,这个账号需要拥有全局的 REPLICATION 权限。我们可以使用 GRANT 命令创建这样的账号:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT
ON . TO 'canal'@'%' IDENTIFIED BY 'canal';
参考:https://blog.csdn.net/zjerryj/article/details/77152226

间隔轮询-参数配置

image.png
配置 说明

任务类型

MySQL实时采集支持两种类型:通过Binlog实时采集、通过间隔轮询进行采集

数据源

选择对应数据类型的已有数据源

Schema

选择数据源下指定Schema

选择数据源表,支持多表合并写入一张表;
多表合并写入时,无法保证完全根据业务顺序写入;

增量标识字段

用户需选择增量标示字段,每次同步时,系统自动记录增量标识的最大值,下次运行时,会从上一次的最大值继续同步数据,实现增量同步;支持将数值类型、Timestamp类型作为增量标识字段

采集起点

用户根据选择的增量标示字段设定相应的采集起点,若不填则默认从头开始拉取数据,输入格式请在"数据预览"中参考所选增量标示字段内容。采集时不包含采集起点,例如采集起点为40 则采集开始时不会包含id=40这一条数据。

轮询时间间隔

手动设定轮询时间间隔,单位为秒

高级配置

以JSON格式添加高级参数,例如对关系型数据库可配置fetchSize,每类数据源支持不同的参数

从mysql—binlog采集数据到kafka时,Flink1.12与FLink1.10的数据输出格式不一致,不影响数据结果。

操作步骤

具体操作可参考快速开始中的 实时采集