欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)邀請(qǐng)碼778899分享:一文弄懂Flink CDC

柚子快報(bào)邀請(qǐng)碼778899分享:一文弄懂Flink CDC

http://yzkb.51969.com/

文章目錄

1.CDC概述2.CDC 的實(shí)現(xiàn)原理3.為什么選 Flink4.支持的連接器5.支持的 Flink 版本6.Flink CDC特性7.用法實(shí)例7.1DataStream API 的用法(推薦)7.2Table/SQL API的用法

1.CDC概述

CDC(Change Data Capture)是一種用于捕獲和處理數(shù)據(jù)源中的變化的技術(shù)。它允許實(shí)時(shí)地監(jiān)視數(shù)據(jù)庫(kù)或數(shù)據(jù)流中發(fā)生的數(shù)據(jù)變動(dòng),并將這些變動(dòng)抽取出來(lái),以便進(jìn)行進(jìn)一步的處理和分析。

傳統(tǒng)上,數(shù)據(jù)源的變化通常通過(guò)周期性地輪詢整個(gè)數(shù)據(jù)集進(jìn)行檢查來(lái)實(shí)現(xiàn)。但是,這種輪詢的方式效率低下且不能實(shí)時(shí)反應(yīng)變化。而 CDC 技術(shù)則通過(guò)在數(shù)據(jù)源上設(shè)置一種機(jī)制,使得變化的數(shù)據(jù)可以被實(shí)時(shí)捕獲并傳遞給下游處理系統(tǒng),從而實(shí)現(xiàn)了實(shí)時(shí)的數(shù)據(jù)變動(dòng)監(jiān)控。

Flink 作為一個(gè)強(qiáng)大的流式計(jì)算引擎,提供了內(nèi)置的 CDC 功能,能夠連接到各種數(shù)據(jù)源(如數(shù)據(jù)庫(kù)、消息隊(duì)列等),捕獲其中的數(shù)據(jù)變化,并進(jìn)行靈活的實(shí)時(shí)處理和分析。

通過(guò)使用 Flink CDC,我們可以輕松地構(gòu)建實(shí)時(shí)數(shù)據(jù)管道,對(duì)數(shù)據(jù)變動(dòng)進(jìn)行實(shí)時(shí)響應(yīng)和處理,為實(shí)時(shí)分析、實(shí)時(shí)報(bào)表和實(shí)時(shí)決策等場(chǎng)景提供強(qiáng)大的支持。

2.CDC 的實(shí)現(xiàn)原理

通常來(lái)講,CDC 分為主動(dòng)查詢和事件接收兩種技術(shù)實(shí)現(xiàn)模式。對(duì)于主動(dòng)查詢而言,用戶通常會(huì)在數(shù)據(jù)源表的某個(gè)字段中,保存上次更新的時(shí)間戳或版本號(hào)等信息,然后下游通過(guò)不斷的查詢和與上次的記錄做對(duì)比,來(lái)確定數(shù)據(jù)是否有變動(dòng),是否需要同步。這種方式優(yōu)點(diǎn)是不涉及數(shù)據(jù)庫(kù)底層特性,實(shí)現(xiàn)比較通用;缺點(diǎn)是要對(duì)業(yè)務(wù)表做改造,且實(shí)時(shí)性不高,不能確保跟蹤到所有的變更記錄,且持續(xù)的頻繁查詢對(duì)數(shù)據(jù)庫(kù)的壓力較大。事件接收模式可以通過(guò)觸發(fā)器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)來(lái)實(shí)現(xiàn)。當(dāng)數(shù)據(jù)源表發(fā)生變動(dòng)時(shí),會(huì)通過(guò)附加在表上的觸發(fā)器或者 binlog 等途徑,將操作記錄下來(lái)。下游可以通過(guò)數(shù)據(jù)庫(kù)底層的協(xié)議,訂閱并消費(fèi)這些事件,然后對(duì)數(shù)據(jù)庫(kù)變動(dòng)記錄做重放,從而實(shí)現(xiàn)同步。這種方式的優(yōu)點(diǎn)是實(shí)時(shí)性高,可以精確捕捉上游的各種變動(dòng);缺點(diǎn)是部署數(shù)據(jù)庫(kù)的事件接收和解析器(例如 Debezium、Canal 等),有一定的學(xué)習(xí)和運(yùn)維成本,對(duì)一些冷門的數(shù)據(jù)庫(kù)支持不夠。綜合來(lái)看,事件接收模式整體在實(shí)時(shí)性、吞吐量方面占優(yōu),如果數(shù)據(jù)源是 MySQL、PostgreSQL、MongoDB 等常見的數(shù)據(jù)庫(kù)實(shí)現(xiàn),建議使用Debezium來(lái)實(shí)現(xiàn)變更數(shù)據(jù)的捕獲(下圖來(lái)自Debezium 官方文檔如果使用的只有 MySQL,則還可以用Canal。

3.為什么選 Flink

從上圖可以看到,Debezium 官方架構(gòu)圖中,是通過(guò) Kafka Streams 直接實(shí)現(xiàn)的 CDC 功能。而我們這里更建議使用 Flink CDC 模塊,因?yàn)?Flink 相對(duì) Kafka Streams 而言,有如下優(yōu)勢(shì):

強(qiáng)大的流處理引擎: Flink 是一個(gè)強(qiáng)大的流處理引擎,具備高吞吐量、低延遲、Exactly-Once 語(yǔ)義等特性。它通過(guò)基于事件時(shí)間的處理模型,支持準(zhǔn)確和有序的數(shù)據(jù)處理,適用于實(shí)時(shí)數(shù)據(jù)處理和分析場(chǎng)景。這使得 Flink 成為實(shí)現(xiàn) CDC 的理想選擇。 內(nèi)置的 CDC 功能: Flink 提供了內(nèi)置的 CDC 功能,可以直接連接到各種數(shù)據(jù)源,捕獲數(shù)據(jù)變化,并將其作為數(shù)據(jù)流進(jìn)行處理。這消除了我們自行開發(fā)或集成 CDC 解決方案的需要,使得實(shí)現(xiàn) CDC 變得更加簡(jiǎn)單和高效。 多種數(shù)據(jù)源的支持: Flink CDC 支持與各種數(shù)據(jù)源進(jìn)行集成,如關(guān)系型數(shù)據(jù)庫(kù)(如MySQL、PostgreSQL)、消息隊(duì)列(如Kafka、RabbitMQ)、文件系統(tǒng)等。這意味著無(wú)論你的數(shù)據(jù)存儲(chǔ)在哪里,F(xiàn)link 都能夠輕松地捕獲其中的數(shù)據(jù)變化,并進(jìn)行進(jìn)一步的實(shí)時(shí)處理和分析。 靈活的數(shù)據(jù)處理能力: Flink 提供了靈活且強(qiáng)大的數(shù)據(jù)處理能力,可以通過(guò)編寫自定義的轉(zhuǎn)換函數(shù)、處理函數(shù)等來(lái)對(duì) CDC 數(shù)據(jù)進(jìn)行各種實(shí)時(shí)計(jì)算和分析。同時(shí),F(xiàn)link 還集成了 SQL 和 Table API,為用戶提供了使用 SQL 查詢語(yǔ)句或 Table API 進(jìn)行簡(jiǎn)單查詢和分析的方式。 完善的生態(tài)系統(tǒng): Flink 擁有活躍的社區(qū)和龐大的生態(tài)系統(tǒng),這意味著你可以輕松地獲取到豐富的文檔、教程、示例代碼和解決方案。此外,F(xiàn)link 還與其他流行的開源項(xiàng)目(如Apache Kafka、Elasticsearch)深度集成,提供了更多的功能和靈活性。

4.支持的連接器

連接器數(shù)據(jù)庫(kù)Drivermongodb-cdcMongoDB: 3.6, 4.x, 5.0MongoDB Driver: 4.3.4mysql-cdcMySQL: 5.6, 5.7, 8.0.xRDS MySQL: 5.6, 5.7, 8.0.xPolarDB MySQL: 5.6, 5.7, 8.0.xAurora MySQL: 5.6, 5.7, 8.0.xMariaDB: 10.xPolarDB X: 2.0.1JDBC Driver: 8.0.28oceanbase-cdcOceanBase CE: 3.1.x, 4.xOceanBase EE: 2.x, 3.x, 4.xOceanBase Driver: 2.4.xoracle-cdcOracle: 11, 12, 19, 21Oracle Driver: 19.3.0.0postgres-cdcPostgreSQL: 9.6, 10, 11, 12, 13, 14JDBC Driver: 42.5.1sqlserver-cdcSqlserver: 2012, 2014, 2016, 2017, 2019JDBC Driver: 9.4.1.jre8tidb-cdcTiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0JDBC Driver: 8.0.27db2-cdcDb2: 11.5Db2 Driver: 11.5.0.0vitess-cdcVitess: 8.0.x, 9.0.xMySql JDBC Driver: 8.0.26

5.支持的 Flink 版本

Flink CDC版本Flink 版本_1.0.01.11.*1.1.01.11.*1.2.01.12.*1.3.01.12.*1.4.01.13.*2.0.*1.13.*2.1.*1.13.*2.2.*1.13.* , 1.14.*2.3.*1.13.* , 1.14.* , 1.15.* , 1.16.02.4.*1.13.* , 1.14.* , 1.15.* , 1.16.* , 1.17.0

6.Flink CDC特性

支持讀取數(shù)據(jù)庫(kù)快照,即使出現(xiàn)故障也能繼續(xù)讀取binlog,并進(jìn)行Exactly-once處理DataStream API 的 CDC 連接器,用戶可以在單個(gè)作業(yè)中使用多個(gè)數(shù)據(jù)庫(kù)和表的更改,而無(wú)需部署 Debezium 和 KafkaTable/SQL API 的 CDC 連接器,用戶可以使用 SQL DDL 創(chuàng)建 CDC 源來(lái)監(jiān)視單個(gè)表上的更改

下表顯示了連接器的當(dāng)前特性:

連接器無(wú)鎖讀并行讀一次性語(yǔ)義讀增量快照讀MongoDB-CDC????mysql-cdc????Oracle-CDC????Postgres-CDC????sqlserver-cdc????Oceanbase-CDC????TiDB-CDC????db2-cdc????vitess-cdc????

7.用法實(shí)例

7.1DataStream API 的用法(推薦)

請(qǐng)嚴(yán)格按照上面的《5.支持的 Flink 版本》搭配來(lái)使用Flink CDC

1.13.0

1.8

1.8

com.ververica

flink-connector-mysql-cdc

${flinkcdc.version}

org.apache.flink

flink-clients_2.12

${flink.version}

org.apache.flink

flink-java

${flink.version}

org.apache.flink

flink-scala_2.12

${flink.version}

org.apache.flink

flink-streaming-java_2.12

${flink.version}

org.apache.flink

flink-streaming-scala_2.12

${flink.version}

org.apache.flink

flink-table-common

${flink.version}

org.apache.flink

flink-table-planner-blink_2.12

${flink.version}

org.apache.flink

flink-table-api-java-bridge_2.12

${flink.version}

請(qǐng)?zhí)崆伴_啟MySQL中的binlog,配置my.cnf文件,重啟mysqld服務(wù)即可

my.cnf

[client]

default_character_set=utf8

[mysqld]

server-id=1

collation_server=utf8_general_ci

character_set_server=utf8

log-bin=mysql-bin

binlog_format=row

expire_logs_days=30

ddl&dml.sql

create table test_cdc

(

id int not null

primary key,

name varchar(100) null,

age int null

);

INSERT INTO flink.test_cdc (id, name, age) VALUES (1, 'Daniel', 25);

INSERT INTO flink.test_cdc (id, name, age) VALUES (2, 'David', 38);

INSERT INTO flink.test_cdc (id, name, age) VALUES (3, 'James', 16);

INSERT INTO flink.test_cdc (id, name, age) VALUES (4, 'Robert', 27);

FlinkDSCDC.java

package com.daniel.util;

import com.ververica.cdc.connectors.mysql.MySqlSource;

import com.ververica.cdc.connectors.mysql.table.StartupOptions;

import com.ververica.cdc.debezium.DebeziumSourceFunction;

import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**

* @Author Daniel

* @Date: 2023/7/25 10:03

* @Description DataStream API CDC

**/

public class FlinkDSCDC {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

DebeziumSourceFunction sourceFunction = MySqlSource.builder()

.hostname("localhost")

.port(3306)

.username("root")

.password("123456")

.databaseList("flink")

// 這里一定要是db.table的形式

.tableList("flink.test_cdc")

.deserializer(new StringDebeziumDeserializationSchema())

.startupOptions(StartupOptions.initial())

.build();

DataStreamSource dataStreamSource = env.addSource(sourceFunction);

dataStreamSource.print();

env.execute("FlinkDSCDC");

}

}

UPDATE flink.test_cdc t SET t.age = 24 WHERE t.id = 1;

UPDATE flink.test_cdc t SET t.name = 'Andy' WHERE t.id = 3;

打印出的日志

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=1,name=Daniel,age=25},after=Struct{id=1,name=Daniel,age=24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=7989,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=4}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=3,name=James,age=16},after=Struct{id=3,name=Andy,age=16},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=8113,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

可以得出的結(jié)論:

日志中的數(shù)據(jù)變化操作類型(op)可以表示為 ‘u’,表示更新操作。在第一條日志中,發(fā)生了一個(gè)更新操作,對(duì)應(yīng)的記錄的 key 是 id=1,更新前的數(shù)據(jù)是 {id=1, name=Daniel, age=25},更新后的數(shù)據(jù)是 {id=1, name=Daniel, age=24}。在第二條日志中,也發(fā)生了一個(gè)更新操作,對(duì)應(yīng)的記錄的 key 是 id=3,更新前的數(shù)據(jù)是 {id=3, name=James, age=16},更新后的數(shù)據(jù)是 {id=3, name=Andy, age=16}。每條日志還提供了其他元數(shù)據(jù)信息,如數(shù)據(jù)源(source)、版本號(hào)(version)、連接器名稱(connector)、時(shí)間戳(ts_ms)等。這些信息可以幫助我們追蹤記錄的來(lái)源和處理過(guò)程。日志中的 sourceOffset 包含了一些關(guān)鍵信息,如事務(wù)ID(transaction_id)、文件名(file)、偏移位置(pos)等。這些信息可以用于確保數(shù)據(jù)的準(zhǔn)確順序和一致性。

7.2Table/SQL API的用法

FlinkSQLCDC.java

package com.daniel.util;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import org.apache.flink.types.Row;

/**

* @Author Daniel

* @Date: 2023/7/25 15:25

* @Description

**/

public class FlinkSQLCDC {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.executeSql("CREATE TABLE test_cdc (" +

" id int primary key," +

" name STRING," +

" age int" +

") WITH (" +

" 'connector' = 'mysql-cdc'," +

" 'scan.startup.mode' = 'latest-offset'," +

" 'hostname' = 'localhost'," +

" 'port' = '3306'," +

" 'username' = 'root'," +

" 'password' = '123456'," +

" 'database-name' = 'flink'," +

" 'table-name' = 'test_cdc'" +

")");

Table table = tableEnv.sqlQuery("select * from test_cdc");

DataStream> dataStreamSource = tableEnv.toRetractStream(table, Row.class);

dataStreamSource.print();

env.execute("FlinkSQLCDC");

}

}

UPDATE flink.test_cdc t SET t.age = 55 WHERE t.id = 2;

UPDATE flink.test_cdc t SET t.age = 22 WHERE t.id = 3;

UPDATE flink.test_cdc t SET t.name = 'Alice' WHERE t.id = 4;

UPDATE flink.test_cdc t SET t.age = 18 WHERE t.id = 1;

INSERT INTO flink.test_cdc (id, name, age) VALUES (5, 'David', 29);

打印出的日志

(false,-U[2, David, 38])

(true,+U[2, David, 55])

(false,-U[3, Andy, 16])

(true,+U[3, Andy, 22])

(false,-U[4, Robert, 27])

(true,+U[4, Alice, 27])

(false,-U[1, Daniel, 24])

(true,+U[1, Daniel, 18])

(true,+I[5, David, 29])

柚子快報(bào)邀請(qǐng)碼778899分享:一文弄懂Flink CDC

http://yzkb.51969.com/

文章鏈接

評(píng)論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/18359854.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問(wèn)

文章目錄