目录
- 背景
- 第一部分 Spark内存管理详解
- 第二部分 Spark参数说明
- 第三部分 Spark内存优化
- 第四部分 常见线上问题解决
- 参考文献及资料
背景
Flink 1.11 引入了 Flink SQL CDC,由阿里巴巴技术专家伍翀 (云邪)个人兴趣项目孵化而来。从最开始支持Mysql到现在支持各类数据库,具体见项目说明。
关于Flink CDC的原理介绍可以参考Flink中文社区文章介绍。本位以Mysql为例详细介绍实际使用,供大家参考。
实验环境:
- os:ubuntu 16.4
第一部分 环境准备
目前对于Mysql版本支持如下:
Connector | Database | Driver |
---|---|---|
mysql-cdc | MySQL: 5.6, 5.7, 8.0.xRDS MySQL: 5.6, 5.7, 8.0.xPolarDB MySQL: 5.6, 5.7, 8.0.xAurora MySQL: 5.6, 5.7, 8.0.xMariaDB: 10.xPolarDB X: 2.0.1 | JDBC Driver: 8.0.2 |
1.1 Mysql
开启binlog
日志
检查Mysql数据库是否开启binlog
:
1 | show variables like '%log_bin%' |
如果回显如下,说明已经开启:
变量名 | 变量值 |
---|---|
log_bin | ON |
log_bin_basename | /var/lib/mysql/mysql-bin |
log_bin_index | /var/lib/mysql/mysql-bin.index |
log_bin_trust_function_creators | OFF |
log_bin_use_v1_row_events | OFF |
sql_log_bin | ON |
如果没有开启,修改my.cnf
配置文件增加下面的参数配置(重启生效):
1 | 开启binlog日志,binlog格式必须设置成:ROW模式 |
进一步还可以通过下面的命令查看binlog
日志文件:
1 | SHOW BINARY LOGS; |
回显如下:
1 | Log_name, File_size |
还使用show binlog events
命令,查看binlog
日志事件:
1 | Log_name, Pos, Event_type, Server_id, End_log_pos, Info |
1.2 为CDC创建专用用户
Flink CDC
使用 Debezium
同步Mysql
,所以用户权限需要满足Debezium
组件要求。
- Create the MySQL user(ex:FlinkCdc):
1 | CREATE USER 'FlinkCdc'@'localhost' IDENTIFIED BY 'password'; |
- Grant the required permissions to the user:
1 | GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'FlinkCdc' IDENTIFIED BY 'password'; |
注: The RELOAD permissions is not required any more when scan.incremental.snapshot.enabled
is enabled (enabled by default).
Keyword | Description |
---|---|
SELECT |
SELECT 查询权限。只被用在初始化阶段。 |
RELOAD |
执行 FLUSH 语句清除重新加载内部缓存。只被用在初始化阶段。 |
SHOW DATABASES |
执行 SHOW DATABASE 语句。只被用在初始化阶段。 |
REPLICATION SLAVE |
读取MySQL binlog。 |
REPLICATION CLIENT |
执行SHOW MASTER STATUS 、SHOW SLAVE STATUS 、SHOW BINARY LOGS 等语句。 |
- 生效权限:
1 | FLUSH PRIVILEGES; |
1.3 注意事项
1.3.1 service id
原因:MySQL binlog 数据同步的原理是,CDC source 会伪装成 MySQL 集群的一个 slave(使用指定的 server id 作为唯一 id),然后从 MySQL 拉取 binlog 数据。如果一个 MySQL 集群中有多个 slave 有同样的 id,就会导致拉取数据错乱的问题。
解决方法:默认会随机生成一个 server id,容易有碰撞的风险。所以建议使用动态参数(table hint)在 query 中覆盖 server id。如下所示:
SELECT
FROM bill_info /+ OPTIONS(‘server-id’=’123456’) */ ;
每个作业需显式配置不同的SERVER ID
每个同步数据库数据的客户端,都会有一个唯一ID,即SERVER ID。MySQL SERVER会根据该ID来维护网络连接以及Binlog位点。因此如果有大量不同的SERVER ID的客户端一起连接MySQL SERVER,可能导致MySQL SERVER的CPU陡增,影响线上业务稳定性。此外,多个作业共享相同的SERVER ID,会导致Binlog位点错乱,多读或少读数据。因此建议您通过动态Hints,在每个CDC作业都配置上不同的SERVER ID,例如SELECT FROM source_table /+ OPTIONS(‘server-id’=’123456’) */ ;。动态Hints详情请参见动态Hints
1.3.2 session 超时
第二部分 实践案例
第三部分
snapshot.mode
Debezium 支持五种模式:
initial
:默认模式,在没有找到 offset 时(记录在 Kafka topic 的connect-offsets
中,Kafka connect 框架维护),做一次 snapshot——遍历有 SELECT 权限的表,收集列名,并且将每个表的所有行 select 出来写入 Kafka;when_needed
: 跟initial
类似,只是增加了一种情况,当记录的 offset 对应的 binlog 位置已经在 MySQL 服务端被 purge 了时,就重新做一个 snapshot。never
: 不做 snapshot,也就是不拿所有表的列名,也不导出表数据到 Kafka,这个模式下,要求从最开头消费 binlog,以获取完整的 DDL 信息,MySQL 服务端的 binlog 不能被 purge 过,否则由于 DML binlog 里只有 database name、table name、column type 却没有 column name,Debezium 会报错Encountered change event for table some_db.some_table whose schema isn't known to this connector
;schema_only
: 这种情况下会拿所有表的列名信息,但不会导出表数据到 Kafka,而且只从 Debezium 启动那刻起的 binlog 末尾开始消费,所以很适合不关心历史数据,只关心最近变更的场合。schema_only_recovery
: 在 Debezium 的 schema_only 模式出错时,用这个模式恢复,一般不会用到。
https://www.jianshu.com/p/d6ac601438a5
附录
Connector | Database | Driver |
---|---|---|
mongodb-cdc | MongoDB: 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 |
mysql-cdc | MySQL: 5.6, 5.7, 8.0.xRDS MySQL: 5.6, 5.7, 8.0.xPolarDB MySQL: 5.6, 5.7, 8.0.xAurora MySQL: 5.6, 5.7, 8.0.xMariaDB: 10.xPolarDB X: 2.0.1 | JDBC Driver: 8.0.27 |
oceanbase-cdc | OceanBase CE: 3.1.x | JDBC Driver: 5.7.4x |
oracle-cdc | Oracle: 11, 12, 19 | Oracle Driver: 19.3.0.0 |
postgres-cdc | PostgreSQL: 9.6, 10, 11, 12 | JDBC Driver: 42.2.12 |
sqlserver-cdc | Sqlserver: 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 |
tidb-cdc | TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 | JDBC Driver: 8.0.27 |
参考文献及资料
- Flink CDC 2.0 正式发布,详解核心改进,链接:https://flink-learning.org.cn/article/detail/3ebe9f20774991c4d5eeb75a141d9e1e
- Working with MySQL Binary Logs,链接:https://maxchadwick.xyz/blog/working-with-mysql-binary-logs
- debezium,链接:https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-creating-user