Enabling CDC for Different Database Types
MySQL
Enable binlog
- Permanent enablement
Add the following to the MySQL configuration file:
[mysqld]
server-id=1
log-bin = /var/lib/mysql/mysql-bin
binlog_format = ROW
expire-logs-days=7
max-binlog-size=500M
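After saving the configuration, restart MySQL so the settings take effect; a minimal sketch assuming a systemd-managed server (the service name may be mysql or mysqld depending on your install):
# Restart MySQL to apply the binlog settings
sudo systemctl restart mysqld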
- Verify it is enabled
show variables like '%log_bin%';
If you connect with a non-root account, the account must be granted replication privileges:
-- Grant replication privileges to the 'user' account
GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'user'@'%';
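To confirm the grant took effect, list the account's privileges:
-- Should now include REPLICATION CLIENT and REPLICATION SLAVE
SHOW GRANTS FOR 'user'@'%';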
MySQL Flink SQL
EXECUTE CDCSOURCE wf-sa_task_sql WITH (
'connector' = 'mysql-cdc',
'hostname' = 'rds-mysql-instance4-svc.newdao-common',
'port' = '3306',
'username' = 'jnofcaag_580',
'password' = 'PrPj_1561928',
'scan.incremental.snapshot.enabled' = 'true',
'jdbc.properties.tinyInt1isBit' = 'false',
'jdbc.properties.transformedBitIsBoolean' = 'false',
'jdbc.properties.autoReconnect' = 'true',
'checkpoint' = '3009',
'scan.startup.mode' = 'latest-offset',
'parallelism' = '1',
'database-name' = '20240909192654jnofcaag',
'schema-name' = '20240909192654jnofcaag',
'table-name' = '20240909192654jnofcaag\.orders',
'sink.connector' = 'datastream-kafka',
'sink.topic' = 'testflinkenv-20240909192654jnofcaag',
'sink.brokers' = 'kafka.newdao-tenant-chensc2:9092',
'sink.properties.transaction.timeout.ms' = '900000',
'sink.key.json.ignore-parse-errors' = 'true',
'sink.value.json.fail-on-missing-field' = 'false',
'debezium.decimal.handling.mode' = 'double'
)
Oracle
Oracle is a special case, so the steps below go into more detail; we strongly recommend Oracle 19c or later.
Installation and configuration
Non-CDB mode
* Install Oracle
Oracle image (Docker Hub): https://hub.docker.com/r/acktsw/oracle-xe-11g
docker run -d --name=oracle11g -p 1521:1521 -p 8013:8080 -e ORACLE_ALLOW_REMOTE=true -e ORACLE_DISABLE_ASYNCH_IO=true -v /usr/share/zoneinfo/Asia/Shanghai:/etc/localtime acktsw/oracle-xe-11g
# Administrator connection info
# sid: XE
# username: sys
# password: oracle
* Enter the container to configure Oracle
docker exec -it oracle11g /bin/bash
* Start a SQL session
sqlplus /nolog
* If a password is set:
sqlplus sys/oracle@//localhost:1521/XE as sysdba
* Connect as DBA
CONNECT sys/oracle AS SYSDBA
* The following uses a Non-CDB database as an example; see the flink-cdc docs: https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/oracle-cdc
* Enable archive logging
alter system set db_recovery_file_dest_size = 10G;
* The /u01/app/oracle/oradata/XE/recovery_area directory must exist in advance (a preparation sketch follows the next command)
alter system set db_recovery_file_dest = '/u01/app/oracle/oradata/XE/recovery_area' scope=spfile;
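A minimal sketch for preparing that directory inside the container (the path comes from the command above; wide-open permissions are only appropriate for a test setup):
# Run inside the container before restarting the database
mkdir -p /u01/app/oracle/oradata/XE/recovery_area
chmod -R 777 /u01/app/oracle/oradata/XE/recovery_area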
* Restart
shutdown immediate;
startup mount;
* Enable archiving (the most critical step in the whole process)
alter database archivelog;
alter database open;
* Verify archiving is enabled
archive log list;
* Create the logminer tablespace. The /u01/app/oracle/oradata/XE directory must exist in advance with sufficient permissions (chmod -R 777).
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/XE/logminer_tbs.dbf' SIZE 250M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
* Create the user and grant the required privileges
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANY TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
-- Not needed when scan.incremental.snapshot.enabled = true (the default)
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
* Connection info needed by flink-cdc, based on the setup above:
'schema-name' = 'flinkuser',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
* Enable supplemental logging for the whole database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
* Finally, enable supplemental logging on the SA_TASK table
ALTER TABLE FLINKUSER.SA_TASK ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
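To double-check that supplemental logging is active, query v$database (SUPPLEMENTAL_LOG_DATA_MIN is a standard column):
-- Should return YES after ALTER DATABASE ADD SUPPLEMENTAL LOG DATA
SELECT supplemental_log_data_min FROM v$database;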
* Common issues
* Internal errors such as missing files or memory problems:
1. When something goes wrong, check /u01/app/oracle/diag/rdbms/xe/XE/trace/alert_XE.log first.
2. Set up a scheduled job to purge archive logs regularly so they don't fill up the database (a sketch follows this list).
* CDC sync is slow; it typically takes about 2 minutes for a data event to fire:
1. The archive log I/O mode may be asynchronous; try switching it to synchronous:
SELECT name, value FROM v$parameter WHERE name = 'disk_asynch_io';
2. It may also be caused by other logging-related factors.
3. Oracle 19c optimized and improved this area; data events can arrive with second-level latency.
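A minimal sketch of the scheduled cleanup mentioned in the first issue above, assuming cron on the host and reusing the RMAN cleanup commands shown later in this section; the script path and 3-day retention window are illustrative only:
# /etc/cron.d/purge-archivelog (hypothetical): run nightly at 02:00
# 0 2 * * * root /opt/scripts/purge_archivelog.sh
# purge_archivelog.sh (sketch):
rman target sys/oracle@//localhost:1521/XE <<EOF
crosscheck archivelog all;
delete noprompt archivelog all completed before 'sysdate-3';
EOF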
CDB mode
* Install Oracle
Oracle image (Docker Hub): https://hub.docker.com/r/banglamon/oracle193db (tag 19.3.0-ee)
docker run --name=oracle19c -p 21521:1521 -p 25500:5500 -e ORACLE_SID=XE -e ORACLE_PDB=ORCLPDB1 -e ORACLE_PWD=oracle -d banglamon/oracle193db:19.3.0-ee
# Administrator connection info
# sid: XE
# username: sys
# password: oracle
# pdb: ORCLPDB1
* Enter the container to configure Oracle
docker exec -it oracle19c /bin/bash
* Start a SQL session
sqlplus /nolog
* Connect as DBA
CONNECT sys/oracle AS SYSDBA
* Or equivalently:
sqlplus sys/oracle@//localhost:1521/XE as sysdba
* The following uses a CDB database as an example; see the flink-cdc docs: https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/oracle-cdc
* Enable archive logging
alter system set db_recovery_file_dest_size = 10G;
* The /opt/oracle/oradata/XE/recovery_area directory must exist in advance with sufficient permissions (chmod -R 777)
alter system set db_recovery_file_dest = '/opt/oracle/oradata/XE/recovery_area' scope=spfile;
* Restart the database, or restart the Docker container
shutdown immediate;
startup mount;
* Enable archiving (the most critical step in the whole process)
alter database archivelog;
alter database open;
* Verify archiving is enabled
archive log list;
* Create the logminer tablespace. The /opt/oracle/oradata/XE directory must exist in advance with sufficient permissions (chmod -R 777).
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/XE/logminer_tbs.dbf' SIZE 250M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
* Log in to the PDB and create the tablespace there as well
sqlplus sys/oracle@//localhost:1521/ORCLPDB1 as sysdba
* Or switch from the root container to ORCLPDB1:
alter session set container=ORCLPDB1;
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/XE/logminer_tbs.dbf' SIZE 250M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
* Go back to the root container to create the user
sqlplus sys/oracle@//localhost:1521/XE as sysdba
* Change the common user prefix from C## to C_
ALTER SYSTEM SET common_user_prefix = 'C_' SCOPE = spfile;
* If you hit the error ORA-32001: write to SPFILE requested but no SPFILE is in use,
run CREATE SPFILE FROM PFILE; then restart the database and apply the setting again.
* Restart
shutdown immediate;
startup mount;
show parameter common_user;
* Create the user
CREATE USER C_flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO C_flinkuser CONTAINER=ALL;
GRANT SET CONTAINER TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to C_flinkuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO C_flinkuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO C_flinkuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO C_flinkuser CONTAINER=ALL;
GRANT LOGMINING TO C_flinkuser CONTAINER=ALL;
GRANT CREATE TABLE TO C_flinkuser CONTAINER=ALL;
-- Not needed when scan.incremental.snapshot.enabled = true (the default)
GRANT LOCK ANY TABLE TO C_flinkuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO C_flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO C_flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO C_flinkuser CONTAINER=ALL;
* Enable supplemental logging for the whole database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
* Tables also need supplemental logging; for example, for the SA_TASK table:
ALTER TABLE C_flinkuser.SA_TASK ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
* Connection info needed by flink-cdc, based on the setup above:
'schema-name' = 'C_flinkuser',
'port' = '21521',
'username' = 'C_flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
* Useful queries in CDB mode
* Query tablespaces
SELECT TABLESPACE_NAME,FILE_NAME,BYTES,AUTOEXTENSIBLE FROM DBA_DATA_FILES;
* List PDBs
select con_id, dbid, NAME, OPEN_MODE from v$pdbs;
* Check whether you are in CDB mode (returns the current container name)
select sys_context ('USERENV', 'CON_NAME') from dual;
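For reference: connected to the root container this returns CDB$ROOT; inside a PDB it returns the PDB name (e.g. ORCLPDB1).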
* List all users
select a.username,a.common,a.con_id,decode(a.CON_ID,1,'CDB$ROOT',b.pdb_name) name from cdb_users a,cdb_pdbs b
where a.con_id=b.con_id(+) and oracle_maintained='N'
order by a.con_id,a.username;
* CDB mode has been fully tested and works correctly on Oracle 19c.
* Why change the common user prefix from C## to C_?
Because dlink's whole-database sync SQL does not allow the # character; #{xxx} is reserved as a variable placeholder inside dlink.
* Oracle needs a lot of memory: at least 8 GB.
* If you see LRM-00109: could not open parameter file '/u01/app/oracle/product/19.3.0/db100/dbs/initxe.ora':
# Inside the container, copy the backed-up pfile into place
cd /u01/app/oracle/admin/XE/pfile
cp init.ora.71202412325 /u01/app/oracle/product/19.3.0/db100/dbs/initxe.ora
# Or copy a prepared initxe.ora in from the host
docker cp initxe.ora oracle19c:/u01/app/oracle/product/19.3.0/db100/dbs/initxe.ora
* If archive logs fill up the disk, clean them with the following commands:
rman target sys/oracle@//localhost:1521/XE
crosscheck archivelog all;
delete expired archivelog all;
References
Oracle 12c+ user privilege management: https://blog.csdn.net/su377486/article/details/101570051
Creating tablespaces, users, and grants under Oracle PDB/CDB: https://www.cnblogs.com/zhangruifeng/p/14605817.html
Common CDB and PDB operations in Oracle 19c: https://blog.csdn.net/Ruishine/article/details/136783212
Periodic cleanup of full archive logs: https://blog.csdn.net/bing_yuan/article/details/135235723
Oracle Flink SQL
-- Whole-database sync job
EXECUTE CDCSOURCE oracle-wf-sa-task-sql WITH
(
'connector' = 'oracle-cdc',
'hostname' = '192.168.0.238',
'port' = '21521',
'username' = 'c_flinkuser',
'password' = 'flinkpw',
'scan.incremental.snapshot.enabled' = 'false',
'jdbc.properties.tinyInt1isBit' = 'false',
'jdbc.properties.transformedBitIsBoolean' = 'false',
'jdbc.properties.autoReconnect' = 'true',
'checkpoint' = '3009',
'scan.startup.mode' = 'latest-offset',
'parallelism' = '1',
'database-name' = 'XE',
'schema-name' = 'C_FLINKUSER',
'table-name' = 'C_FLINKUSER\.ORDERS',
'sink.connector' = 'datastream-kafka',
'sink.topic' = 'test-oracle-kafka',
'sink.brokers' = 'kafka:9092',
'sink.properties.transaction.timeout.ms' = '900000',
'sink.key.json.ignore-parse-errors' = 'true',
'sink.value.json.fail-on-missing-field' = 'false',
'debezium.decimal.handling.mode' = 'double'
)
PostgreSQL
Installation
docker run -d -p 5432:5432 --name postgresql -v /work/mywork/flink-env/postgresql/data:/var/lib/postgresql/data -e POSTGRES_PASSWORD=123456789 postgres:12.20
# Enter the container and connect to the database
docker exec -it postgresql /bin/bash
$ psql -h 127.0.0.1 -p 5432 -U postgres
Configuration
# Inside the container, edit postgresql.conf
vi /var/lib/postgresql/data/postgresql.conf
# Set the WAL level to logical (options: minimal, replica, logical)
wal_level = logical
# Raise the maximum number of replication slots (default 10); flink-cdc uses one slot per table by default
max_replication_slots = 20
# Raise the maximum number of WAL sender processes (default 10); keep it in line with the slots setting above
max_wal_senders = 20
# Terminate replication connections inactive longer than this; a somewhat larger value is fine (default 60s; 0 disables the timeout)
wal_sender_timeout = 180s
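These parameters only take effect after a restart; with the container from the install step above:
# Restart PostgreSQL to apply the WAL settings
docker restart postgresql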
Install the postgres-decoderbufs plugin
# Reference: https://github.com/debezium/postgres-decoderbufs
# Run the following inside the container
# Install the build toolchain and libraries
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-12
apt-get install -f -y libproj-dev liblwgeom-dev
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" -y && apt-get update
apt-get install -y libprotobuf-c-dev
apt-get install -y protobuf-c-compiler
# Build the decoderbufs plugin
git clone https://github.com/debezium/postgres-decoderbufs.git
cd postgres-decoderbufs
make
make install
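Per the decoderbufs README linked above, the plugin then has to be registered in postgresql.conf and PostgreSQL restarted:
# Add to /var/lib/postgresql/data/postgresql.conf, then restart the container
shared_preload_libraries = 'decoderbufs'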
Create a user and grant privileges
-- Create the test_db database
CREATE DATABASE test_db;
-- Connect to the newly created test_db
\c test_db
-- Create the orders table
CREATE TABLE "public"."orders" (
"order_id" varchar(254) COLLATE "pg_catalog"."default" NOT NULL,
"order_date" timestamptz(6),
"customer_name" varchar(254) COLLATE "pg_catalog"."default" NOT NULL,
"price" numeric(18,2) NOT NULL,
"product_id" int4 NOT NULL,
"order_status" int4 NOT NULL,
"growth_rate" float8
);
ALTER TABLE "public"."orders" ADD CONSTRAINT "orders_pkey" PRIMARY KEY ("order_id");
-- Create the user
CREATE USER pgsql_flinkuser WITH PASSWORD 'pgsql_flinkpw';
-- Grant the user replication privileges
ALTER ROLE pgsql_flinkuser replication;
-- Grant the user permission to connect to the database
GRANT CONNECT ON DATABASE test_db to pgsql_flinkuser;
-- Grant the user full privileges on all tables in the public schema of the current database
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO pgsql_flinkuser;
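Optionally, grant privileges on tables created later in public as well, so new tables are readable without re-granting (standard PostgreSQL syntax; run as the role that creates the tables):
-- Apply to future tables in schema public
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO pgsql_flinkuser;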
Publish tables
-- Set puballtables to true for any existing publications
update pg_publication set puballtables=true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Check which tables are published
select * from pg_publication_tables;
-- This step is critical: every table to be synced must have the following applied --
-- Change the replica identity so that before-images of updated and deleted rows are logged (this ensures the orders table correctly captures update and delete changes during real-time sync; without it, update/delete events may lack the row data needed for accurate sync)
ALTER TABLE orders REPLICA IDENTITY FULL;
-- Check the replica identity ('f' means full, i.e. the setting took effect; anything else means full replica identity is not set)
select relreplident from pg_class where relname='orders';
Helper SQL queries
-- Check replication slot usage
SELECT * FROM pg_replication_slots;
-- Drop a replication slot
SELECT pg_drop_replication_slot('zd_org_goods_solt');
-- Count current connections per user
select usename, count(*) from pg_stat_activity group by usename order by count(*) desc;
-- Set a user's connection limit
alter role odps_etl connection limit 200;
PostgreSQL Flink SQL
EXECUTE CDCSOURCE postgres-public WITH (
'connector' = 'postgres-cdc',
'hostname' = '192.168.100.3',
'port' = '5432',
'username' = 'pgsql_flinkuser',
'password' = 'pgsql_flinkpw',
'scan.incremental.snapshot.enabled' = 'true',
'jdbc.properties.tinyInt1isBit' = 'false',
'jdbc.properties.transformedBitIsBoolean' = 'false',
'jdbc.properties.autoReconnect' = 'true',
'checkpoint' = '3009',
'scan.startup.mode' = 'latest-offset',
'parallelism' = '1',
'database-name' = 'test_db',
'schema-name' = 'public',
'table-name' = 'public\.orders',
'sink.connector' = 'datastream-kafka',
'sink.topic' = 'testflinkenv-public',
'sink.brokers' = '192.168.100.3:9092',
'sink.properties.transaction.timeout.ms' = '900000',
'sink.key.json.ignore-parse-errors' = 'true',
'sink.value.json.fail-on-missing-field' = 'false',
'debezium.decimal.handling.mode' = 'double'
)
SQL Server
Installation
docker run -d --name=sqlService -p 1433:1433 -v /etc/localtime:/etc/localtime -e MSSQL_AGENT_ENABLED=true -e MSSQL_PID=Standard -e ACCEPT_EULA=Y -e SA_PASSWORD=Password! mcr.microsoft.com/mssql/server:2019-latest
# Enter the container and connect to the database
docker exec -it sqlService /bin/bash
# sqlcmd 18 enforces encryption by default; -C trusts the server's self-signed certificate
$ /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P Password! -C
Enable CDC
-- Create the database and enable CDC on it
CREATE DATABASE inventory;
GO
USE inventory;
EXEC sys.sp_cdc_enable_db;
-- Create the table and enable CDC on it
CREATE TABLE [dbo].[orders] (
[order_id] varchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL PRIMARY KEY,
[order_date] datetime NULL,
[customer_name] varchar(512) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
[price] numeric(18,2) NULL,
[product_id] int NULL,
[order_status] int NULL,
[growth_rate] float(53) NULL
)
GO
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'orders', @role_name = NULL, @supports_net_changes = 0;
GO
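To verify CDC is enabled on both the database and the table (is_cdc_enabled and is_tracked_by_cdc are standard catalog columns):
-- Both queries should return 1 in the is_* column
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'inventory';
GO
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'orders';
GO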
SQL Server Flink SQL
EXECUTE CDCSOURCE testflinkenv-dbo-2 WITH (
'connector' = 'sqlserver-cdc',
'hostname' = '192.168.0.238',
'port' = '1433',
'username' = 'sa',
'password'='Password!',
'scan.incremental.snapshot.enabled' = 'true',
'jdbc.properties.tinyInt1isBit' = 'false',
'jdbc.properties.transformedBitIsBoolean' = 'false',
'jdbc.properties.autoReconnect' = 'true',
'checkpoint' = '3009',
'scan.startup.mode' = 'latest-offset',
'parallelism' = '1',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'dbo\.orders',
'sink.connector' = 'datastream-kafka',
'sink.topic' = 'testflinkenv-dbo',
'sink.brokers' = 'kafka.newdao-tenant-chensc2:9092',
'sink.properties.transaction.timeout.ms' = '900000',
'sink.key.json.ignore-parse-errors' = 'true',
'sink.value.json.fail-on-missing-field' = 'false',
'debezium.decimal.handling.mode' = 'double'
)