柚子快報(bào)邀請(qǐng)碼778899分享:一文弄懂Flink CDC
柚子快報(bào)邀請(qǐng)碼778899分享:一文弄懂Flink CDC
文章目錄
1.CDC概述2.CDC 的實(shí)現(xiàn)原理3.為什么選 Flink4.支持的連接器5.支持的 Flink 版本6.Flink CDC特性7.用法實(shí)例7.1DataStream API 的用法(推薦)7.2Table/SQL API的用法
1.CDC概述
CDC(Change Data Capture)是一種用于捕獲和處理數(shù)據(jù)源中的變化的技術(shù)。它允許實(shí)時(shí)地監(jiān)視數(shù)據(jù)庫(kù)或數(shù)據(jù)流中發(fā)生的數(shù)據(jù)變動(dòng),并將這些變動(dòng)抽取出來(lái),以便進(jìn)行進(jìn)一步的處理和分析。
傳統(tǒng)上,數(shù)據(jù)源的變化通常通過(guò)周期性地輪詢整個(gè)數(shù)據(jù)集進(jìn)行檢查來(lái)實(shí)現(xiàn)。但是,這種輪詢的方式效率低下且不能實(shí)時(shí)反應(yīng)變化。而 CDC 技術(shù)則通過(guò)在數(shù)據(jù)源上設(shè)置一種機(jī)制,使得變化的數(shù)據(jù)可以被實(shí)時(shí)捕獲并傳遞給下游處理系統(tǒng),從而實(shí)現(xiàn)了實(shí)時(shí)的數(shù)據(jù)變動(dòng)監(jiān)控。
Flink 作為一個(gè)強(qiáng)大的流式計(jì)算引擎,提供了內(nèi)置的 CDC 功能,能夠連接到各種數(shù)據(jù)源(如數(shù)據(jù)庫(kù)、消息隊(duì)列等),捕獲其中的數(shù)據(jù)變化,并進(jìn)行靈活的實(shí)時(shí)處理和分析。
通過(guò)使用 Flink CDC,我們可以輕松地構(gòu)建實(shí)時(shí)數(shù)據(jù)管道,對(duì)數(shù)據(jù)變動(dòng)進(jìn)行實(shí)時(shí)響應(yīng)和處理,為實(shí)時(shí)分析、實(shí)時(shí)報(bào)表和實(shí)時(shí)決策等場(chǎng)景提供強(qiáng)大的支持。
2.CDC 的實(shí)現(xiàn)原理
通常來(lái)講,CDC 分為主動(dòng)查詢和事件接收兩種技術(shù)實(shí)現(xiàn)模式。對(duì)于主動(dòng)查詢而言,用戶通常會(huì)在數(shù)據(jù)源表的某個(gè)字段中,保存上次更新的時(shí)間戳或版本號(hào)等信息,然后下游通過(guò)不斷的查詢和與上次的記錄做對(duì)比,來(lái)確定數(shù)據(jù)是否有變動(dòng),是否需要同步。這種方式優(yōu)點(diǎn)是不涉及數(shù)據(jù)庫(kù)底層特性,實(shí)現(xiàn)比較通用;缺點(diǎn)是要對(duì)業(yè)務(wù)表做改造,且實(shí)時(shí)性不高,不能確保跟蹤到所有的變更記錄,且持續(xù)的頻繁查詢對(duì)數(shù)據(jù)庫(kù)的壓力較大。事件接收模式可以通過(guò)觸發(fā)器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)來(lái)實(shí)現(xiàn)。當(dāng)數(shù)據(jù)源表發(fā)生變動(dòng)時(shí),會(huì)通過(guò)附加在表上的觸發(fā)器或者 binlog 等途徑,將操作記錄下來(lái)。下游可以通過(guò)數(shù)據(jù)庫(kù)底層的協(xié)議,訂閱并消費(fèi)這些事件,然后對(duì)數(shù)據(jù)庫(kù)變動(dòng)記錄做重放,從而實(shí)現(xiàn)同步。這種方式的優(yōu)點(diǎn)是實(shí)時(shí)性高,可以精確捕捉上游的各種變動(dòng);缺點(diǎn)是部署數(shù)據(jù)庫(kù)的事件接收和解析器(例如 Debezium、Canal 等),有一定的學(xué)習(xí)和運(yùn)維成本,對(duì)一些冷門的數(shù)據(jù)庫(kù)支持不夠。綜合來(lái)看,事件接收模式整體在實(shí)時(shí)性、吞吐量方面占優(yōu),如果數(shù)據(jù)源是 MySQL、PostgreSQL、MongoDB 等常見的數(shù)據(jù)庫(kù)實(shí)現(xiàn),建議使用Debezium來(lái)實(shí)現(xiàn)變更數(shù)據(jù)的捕獲(下圖來(lái)自Debezium 官方文檔如果使用的只有 MySQL,則還可以用Canal。
3.為什么選 Flink
從上圖可以看到,Debezium 官方架構(gòu)圖中,是通過(guò) Kafka Streams 直接實(shí)現(xiàn)的 CDC 功能。而我們這里更建議使用 Flink CDC 模塊,因?yàn)?Flink 相對(duì) Kafka Streams 而言,有如下優(yōu)勢(shì):
強(qiáng)大的流處理引擎: Flink 是一個(gè)強(qiáng)大的流處理引擎,具備高吞吐量、低延遲、Exactly-Once 語(yǔ)義等特性。它通過(guò)基于事件時(shí)間的處理模型,支持準(zhǔn)確和有序的數(shù)據(jù)處理,適用于實(shí)時(shí)數(shù)據(jù)處理和分析場(chǎng)景。這使得 Flink 成為實(shí)現(xiàn) CDC 的理想選擇。 內(nèi)置的 CDC 功能: Flink 提供了內(nèi)置的 CDC 功能,可以直接連接到各種數(shù)據(jù)源,捕獲數(shù)據(jù)變化,并將其作為數(shù)據(jù)流進(jìn)行處理。這消除了我們自行開發(fā)或集成 CDC 解決方案的需要,使得實(shí)現(xiàn) CDC 變得更加簡(jiǎn)單和高效。 多種數(shù)據(jù)源的支持: Flink CDC 支持與各種數(shù)據(jù)源進(jìn)行集成,如關(guān)系型數(shù)據(jù)庫(kù)(如MySQL、PostgreSQL)、消息隊(duì)列(如Kafka、RabbitMQ)、文件系統(tǒng)等。這意味著無(wú)論你的數(shù)據(jù)存儲(chǔ)在哪里,F(xiàn)link 都能夠輕松地捕獲其中的數(shù)據(jù)變化,并進(jìn)行進(jìn)一步的實(shí)時(shí)處理和分析。 靈活的數(shù)據(jù)處理能力: Flink 提供了靈活且強(qiáng)大的數(shù)據(jù)處理能力,可以通過(guò)編寫自定義的轉(zhuǎn)換函數(shù)、處理函數(shù)等來(lái)對(duì) CDC 數(shù)據(jù)進(jìn)行各種實(shí)時(shí)計(jì)算和分析。同時(shí),F(xiàn)link 還集成了 SQL 和 Table API,為用戶提供了使用 SQL 查詢語(yǔ)句或 Table API 進(jìn)行簡(jiǎn)單查詢和分析的方式。 完善的生態(tài)系統(tǒng): Flink 擁有活躍的社區(qū)和龐大的生態(tài)系統(tǒng),這意味著你可以輕松地獲取到豐富的文檔、教程、示例代碼和解決方案。此外,F(xiàn)link 還與其他流行的開源項(xiàng)目(如Apache Kafka、Elasticsearch)深度集成,提供了更多的功能和靈活性。
4.支持的連接器
連接器數(shù)據(jù)庫(kù)Drivermongodb-cdcMongoDB: 3.6, 4.x, 5.0MongoDB Driver: 4.3.4mysql-cdcMySQL: 5.6, 5.7, 8.0.xRDS MySQL: 5.6, 5.7, 8.0.xPolarDB MySQL: 5.6, 5.7, 8.0.xAurora MySQL: 5.6, 5.7, 8.0.xMariaDB: 10.xPolarDB X: 2.0.1JDBC Driver: 8.0.28oceanbase-cdcOceanBase CE: 3.1.x, 4.xOceanBase EE: 2.x, 3.x, 4.xOceanBase Driver: 2.4.xoracle-cdcOracle: 11, 12, 19, 21Oracle Driver: 19.3.0.0postgres-cdcPostgreSQL: 9.6, 10, 11, 12, 13, 14JDBC Driver: 42.5.1sqlserver-cdcSqlserver: 2012, 2014, 2016, 2017, 2019JDBC Driver: 9.4.1.jre8tidb-cdcTiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0JDBC Driver: 8.0.27db2-cdcDb2: 11.5Db2 Driver: 11.5.0.0vitess-cdcVitess: 8.0.x, 9.0.xMySql JDBC Driver: 8.0.26
5.支持的 Flink 版本
Flink CDC版本Flink 版本_1.0.01.11.*1.1.01.11.*1.2.01.12.*1.3.01.12.*1.4.01.13.*2.0.*1.13.*2.1.*1.13.*2.2.*1.13.* , 1.14.*2.3.*1.13.* , 1.14.* , 1.15.* , 1.16.02.4.*1.13.* , 1.14.* , 1.15.* , 1.16.* , 1.17.0
6.Flink CDC特性
支持讀取數(shù)據(jù)庫(kù)快照,即使出現(xiàn)故障也能繼續(xù)讀取binlog,并進(jìn)行Exactly-once處理DataStream API 的 CDC 連接器,用戶可以在單個(gè)作業(yè)中使用多個(gè)數(shù)據(jù)庫(kù)和表的更改,而無(wú)需部署 Debezium 和 KafkaTable/SQL API 的 CDC 連接器,用戶可以使用 SQL DDL 創(chuàng)建 CDC 源來(lái)監(jiān)視單個(gè)表上的更改
下表顯示了連接器的當(dāng)前特性:
連接器無(wú)鎖讀并行讀一次性語(yǔ)義讀增量快照讀MongoDB-CDC????mysql-cdc????Oracle-CDC????Postgres-CDC????sqlserver-cdc????Oceanbase-CDC????TiDB-CDC????db2-cdc????vitess-cdc????
7.用法實(shí)例
7.1DataStream API 的用法(推薦)
請(qǐng)嚴(yán)格按照上面的《5.支持的 Flink 版本》搭配來(lái)使用Flink CDC
請(qǐng)?zhí)崆伴_啟MySQL中的binlog,配置my.cnf文件,重啟mysqld服務(wù)即可
my.cnf
[client]
default_character_set=utf8
[mysqld]
server-id=1
collation_server=utf8_general_ci
character_set_server=utf8
log-bin=mysql-bin
binlog_format=row
expire_logs_days=30
ddl&dml.sql
create table test_cdc
(
id int not null
primary key,
name varchar(100) null,
age int null
);
INSERT INTO flink.test_cdc (id, name, age) VALUES (1, 'Daniel', 25);
INSERT INTO flink.test_cdc (id, name, age) VALUES (2, 'David', 38);
INSERT INTO flink.test_cdc (id, name, age) VALUES (3, 'James', 16);
INSERT INTO flink.test_cdc (id, name, age) VALUES (4, 'Robert', 27);
FlinkDSCDC.java
package com.daniel.util;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Author Daniel
* @Date: 2023/7/25 10:03
* @Description DataStream API CDC
**/
public class FlinkDSCDC {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DebeziumSourceFunction
.hostname("localhost")
.port(3306)
.username("root")
.password("123456")
.databaseList("flink")
// 這里一定要是db.table的形式
.tableList("flink.test_cdc")
.deserializer(new StringDebeziumDeserializationSchema())
.startupOptions(StartupOptions.initial())
.build();
DataStreamSource
dataStreamSource.print();
env.execute("FlinkDSCDC");
}
}
UPDATE flink.test_cdc t SET t.age = 24 WHERE t.id = 1;
UPDATE flink.test_cdc t SET t.name = 'Andy' WHERE t.id = 3;
打印出的日志
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=1,name=Daniel,age=25},after=Struct{id=1,name=Daniel,age=24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=7989,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=4}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=3,name=James,age=16},after=Struct{id=3,name=Andy,age=16},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=8113,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
可以得出的結(jié)論:
日志中的數(shù)據(jù)變化操作類型(op)可以表示為 ‘u’,表示更新操作。在第一條日志中,發(fā)生了一個(gè)更新操作,對(duì)應(yīng)的記錄的 key 是 id=1,更新前的數(shù)據(jù)是 {id=1, name=Daniel, age=25},更新后的數(shù)據(jù)是 {id=1, name=Daniel, age=24}。在第二條日志中,也發(fā)生了一個(gè)更新操作,對(duì)應(yīng)的記錄的 key 是 id=3,更新前的數(shù)據(jù)是 {id=3, name=James, age=16},更新后的數(shù)據(jù)是 {id=3, name=Andy, age=16}。每條日志還提供了其他元數(shù)據(jù)信息,如數(shù)據(jù)源(source)、版本號(hào)(version)、連接器名稱(connector)、時(shí)間戳(ts_ms)等。這些信息可以幫助我們追蹤記錄的來(lái)源和處理過(guò)程。日志中的 sourceOffset 包含了一些關(guān)鍵信息,如事務(wù)ID(transaction_id)、文件名(file)、偏移位置(pos)等。這些信息可以用于確保數(shù)據(jù)的準(zhǔn)確順序和一致性。
7.2Table/SQL API的用法
FlinkSQLCDC.java
package com.daniel.util;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @Author Daniel
* @Date: 2023/7/25 15:25
* @Description
**/
public class FlinkSQLCDC {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE test_cdc (" +
" id int primary key," +
" name STRING," +
" age int" +
") WITH (" +
" 'connector' = 'mysql-cdc'," +
" 'scan.startup.mode' = 'latest-offset'," +
" 'hostname' = 'localhost'," +
" 'port' = '3306'," +
" 'username' = 'root'," +
" 'password' = '123456'," +
" 'database-name' = 'flink'," +
" 'table-name' = 'test_cdc'" +
")");
Table table = tableEnv.sqlQuery("select * from test_cdc");
DataStream
dataStreamSource.print();
env.execute("FlinkSQLCDC");
}
}
UPDATE flink.test_cdc t SET t.age = 55 WHERE t.id = 2;
UPDATE flink.test_cdc t SET t.age = 22 WHERE t.id = 3;
UPDATE flink.test_cdc t SET t.name = 'Alice' WHERE t.id = 4;
UPDATE flink.test_cdc t SET t.age = 18 WHERE t.id = 1;
INSERT INTO flink.test_cdc (id, name, age) VALUES (5, 'David', 29);
打印出的日志
(false,-U[2, David, 38])
(true,+U[2, David, 55])
(false,-U[3, Andy, 16])
(true,+U[3, Andy, 22])
(false,-U[4, Robert, 27])
(true,+U[4, Alice, 27])
(false,-U[1, Daniel, 24])
(true,+U[1, Daniel, 18])
(true,+I[5, David, 29])
柚子快報(bào)邀請(qǐng)碼778899分享:一文弄懂Flink CDC
文章鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。