Fork me on GitHub

Flink CDC实现实时数仓

目录

  • 背景
  • 第一部分 Spark内存管理详解
  • 第二部分 Spark参数说明
  • 第三部分 Spark内存优化
  • 第四部分 常见线上问题解决
  • 参考文献及资料

背景

Flink 1.11 引入了 Flink SQL CDC,由阿里巴巴技术专家伍翀 (云邪)个人兴趣项目孵化而来。从最开始支持Mysql到现在支持各类数据库,具体见项目说明。

关于Flink CDC的原理介绍可以参考Flink中文社区文章介绍。本位以Mysql为例详细介绍实际使用,供大家参考。

实验环境:

  • os:ubuntu 16.4

第一部分 环境准备

目前对于Mysql版本支持如下:

Connector Database Driver
mysql-cdc MySQL: 5.6, 5.7, 8.0.xRDS MySQL: 5.6, 5.7, 8.0.xPolarDB MySQL: 5.6, 5.7, 8.0.xAurora MySQL: 5.6, 5.7, 8.0.xMariaDB: 10.xPolarDB X: 2.0.1 JDBC Driver: 8.0.2

1.1 Mysql开启binlog日志

检查Mysql数据库是否开启binlog

1
show variables like '%log_bin%'

如果回显如下,说明已经开启:

变量名 变量值
log_bin ON
log_bin_basename /var/lib/mysql/mysql-bin
log_bin_index /var/lib/mysql/mysql-bin.index
log_bin_trust_function_creators OFF
log_bin_use_v1_row_events OFF
sql_log_bin ON

如果没有开启,修改my.cnf配置文件增加下面的参数配置(重启生效):

1
2
3
# 开启binlog日志,binlog格式必须设置成:ROW模式
log_bin=/var/lib/mysql/mysql-bin
binlog-format=ROW

进一步还可以通过下面的命令查看binlog日志文件:

1
SHOW BINARY LOGS;

回显如下:

1
2
3
4
Log_name, File_size
mysql-bin.000003 13979
mysql-bin.000004 177
mysql-bin.000005 872

还使用show binlog events命令,查看binlog日志事件:

1
2
3
4
5
Log_name, Pos, Event_type, Server_id, End_log_pos, Info
'mysql-bin.000003', '742', 'Anonymous_Gtid', '223344', '807', 'SET @@SESSION.GTID_NEXT= \'ANONYMOUS\''

# 字段说明:
log_name:指定要查询的binlog文件名(不指定就是第一个binlog文件)

1.2 为CDC创建专用用户

Flink CDC 使用 Debezium 同步Mysql,所以用户权限需要满足Debezium组件要求。

  • Create the MySQL user(ex:FlinkCdc):
1
mysql> CREATE USER 'FlinkCdc'@'localhost' IDENTIFIED BY 'password';
  • Grant the required permissions to the user:
1
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'FlinkCdc' IDENTIFIED BY 'password';

注: The RELOAD permissions is not required any more when scan.incremental.snapshot.enabled is enabled (enabled by default).

Keyword Description
SELECT SELECT查询权限。只被用在初始化阶段。
RELOAD 执行 FLUSH 语句清除重新加载内部缓存。只被用在初始化阶段。
SHOW DATABASES 执行 SHOW DATABASE 语句。只被用在初始化阶段。
REPLICATION SLAVE 读取MySQL binlog。
REPLICATION CLIENT 执行SHOW MASTER STATUSSHOW SLAVE STATUSSHOW BINARY LOGS等语句。
  • 生效权限:
1
mysql> FLUSH PRIVILEGES;

1.3 注意事项

1.3.1 service id

原因:MySQL binlog 数据同步的原理是,CDC source 会伪装成 MySQL 集群的一个 slave(使用指定的 server id 作为唯一 id),然后从 MySQL 拉取 binlog 数据。如果一个 MySQL 集群中有多个 slave 有同样的 id,就会导致拉取数据错乱的问题。
解决方法:默认会随机生成一个 server id,容易有碰撞的风险。所以建议使用动态参数(table hint)在 query 中覆盖 server id。如下所示:
SELECT
FROM bill_info /
+ OPTIONS(‘server-id’=’123456’) */ ;

每个作业需显式配置不同的SERVER ID
每个同步数据库数据的客户端,都会有一个唯一ID,即SERVER ID。MySQL SERVER会根据该ID来维护网络连接以及Binlog位点。因此如果有大量不同的SERVER ID的客户端一起连接MySQL SERVER,可能导致MySQL SERVER的CPU陡增,影响线上业务稳定性。此外,多个作业共享相同的SERVER ID,会导致Binlog位点错乱,多读或少读数据。因此建议您通过动态Hints,在每个CDC作业都配置上不同的SERVER ID,例如SELECT FROM source_table /+ OPTIONS(‘server-id’=’123456’) */ ;。动态Hints详情请参见动态Hints

1.3.2 session 超时

第二部分 实践案例

第三部分

  • snapshot.mode

    Debezium 支持五种模式:

    1. initial :默认模式,在没有找到 offset 时(记录在 Kafka topic 的 connect-offsets 中,Kafka connect 框架维护),做一次 snapshot——遍历有 SELECT 权限的表,收集列名,并且将每个表的所有行 select 出来写入 Kafka;
    2. when_needed: 跟 initial 类似,只是增加了一种情况,当记录的 offset 对应的 binlog 位置已经在 MySQL 服务端被 purge 了时,就重新做一个 snapshot。
    3. never: 不做 snapshot,也就是不拿所有表的列名,也不导出表数据到 Kafka,这个模式下,要求从最开头消费 binlog,以获取完整的 DDL 信息,MySQL 服务端的 binlog 不能被 purge 过,否则由于 DML binlog 里只有 database name、table name、column type 却没有 column name,Debezium 会报错 Encountered change event for table some_db.some_table whose schema isn't known to this connector
    4. schema_only: 这种情况下会拿所有表的列名信息,但不会导出表数据到 Kafka,而且只从 Debezium 启动那刻起的 binlog 末尾开始消费,所以很适合不关心历史数据,只关心最近变更的场合。
    5. schema_only_recovery: 在 Debezium 的 schema_only 模式出错时,用这个模式恢复,一般不会用到。

https://www.jianshu.com/p/d6ac601438a5

附录

Connector Database Driver
mongodb-cdc MongoDB: 3.6, 4.x, 5.0 MongoDB Driver: 4.3.1
mysql-cdc MySQL: 5.6, 5.7, 8.0.xRDS MySQL: 5.6, 5.7, 8.0.xPolarDB MySQL: 5.6, 5.7, 8.0.xAurora MySQL: 5.6, 5.7, 8.0.xMariaDB: 10.xPolarDB X: 2.0.1 JDBC Driver: 8.0.27
oceanbase-cdc OceanBase CE: 3.1.x JDBC Driver: 5.7.4x
oracle-cdc Oracle: 11, 12, 19 Oracle Driver: 19.3.0.0
postgres-cdc PostgreSQL: 9.6, 10, 11, 12 JDBC Driver: 42.2.12
sqlserver-cdc Sqlserver: 2012, 2014, 2016, 2017, 2019 JDBC Driver: 7.2.2.jre8
tidb-cdc TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 JDBC Driver: 8.0.27

参考文献及资料

本文标题:Flink CDC实现实时数仓

文章作者:rong xiang

发布时间:2022年04月26日 - 13:04

最后更新:2022年10月25日 - 23:10

原始链接:https://zjrongxiang.github.io/posts/f65f7580/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

0%