Enabling CDC for Different Database Types

MySQL

Enable binlog

  • To enable it permanently, add the following to the MySQL configuration file:

    [mysqld]
    server-id=1
    log-bin = /var/lib/mysql/mysql-bin
    binlog_format = ROW
    expire-logs-days=7
    max-binlog-size=500M
  • To check whether binlog is enabled:
show variables like '%log_bin%';

If you use a non-root account, it must be granted the replication privileges:

-- Grant replication privileges to the 'user' account
GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'user'@'%';
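
As a quick sanity check (standard MySQL statements; exact output varies by version), you can confirm that row-based logging is active and see the current binlog position the CDC source will read from:

-- Should return ROW
show variables like 'binlog_format';
-- Shows the current binlog file and offset
show master status;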

A whole-database sync example (dlink CDCSOURCE statement) that streams MySQL changes into Kafka:

EXECUTE CDCSOURCE wf-sa_task_sql WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'rds-mysql-instance4-svc.newdao-common',
  'port' = '3306',
  'username' = 'jnofcaag_580',
  'password' = 'PrPj_1561928',
  'scan.incremental.snapshot.enabled' = 'true',
  'jdbc.properties.tinyInt1isBit' = 'false',
  'jdbc.properties.transformedBitIsBoolean' = 'false',
  'jdbc.properties.autoReconnect' = 'true',
  'checkpoint' = '3009',
  'scan.startup.mode' = 'latest-offset',
  'parallelism' = '1',
  'database-name' = '20240909192654jnofcaag',
  'schema-name' = '20240909192654jnofcaag',
  'table-name' = '20240909192654jnofcaag\.orders',
  'sink.connector' = 'datastream-kafka',
  'sink.topic' = 'testflinkenv-20240909192654jnofcaag',
  'sink.brokers' = 'kafka.newdao-tenant-chensc2:9092',
  'sink.properties.transaction.timeout.ms' = '900000',
  'sink.key.json.ignore-parse-errors' = 'true',
  'sink.value.json.fail-on-missing-field' = 'false',
  'debezium.decimal.handling.mode' = 'double'
)
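
To confirm that change events are actually reaching the sink, you can consume the target topic with the stock Kafka console consumer (broker and topic taken from the example above; the script path depends on your Kafka installation):

# Read the sink topic from the beginning and print the emitted change events
kafka-console-consumer.sh --bootstrap-server kafka.newdao-tenant-chensc2:9092 --topic testflinkenv-20240909192654jnofcaag --from-beginning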

Oracle

Oracle is a special case, so the steps below are described in detail. Using Oracle 19c or a later version is strongly recommended.

Install and configure

Non-CDB mode

* Install Oracle
  Docker image: https://hub.docker.com/r/acktsw/oracle-xe-11g

docker run -d --name=oracle11g -p 1521:1521 -p 8013:8080 -e ORACLE_ALLOW_REMOTE=true -e ORACLE_DISABLE_ASYNCH_IO=true -v /usr/share/zoneinfo/Asia/Shanghai:/etc/localtime acktsw/oracle-xe-11g
# Administrator connection info
#  sid: XE
#  username: sys
#  password: oracle 

* Enter the Oracle container to configure it
docker exec -it oracle11g /bin/bash
* Start executing SQL
sqlplus /nolog
* Or, if a password is required
sqlplus sys/oracle@//localhost:1521/XE as sysdba
* Connect as DBA
CONNECT sys/oracle AS SYSDBA

* The following uses a Non-CDB database as an example; see the flink-cdc docs: https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/oracle-cdc
* Enable archive logging
alter system set db_recovery_file_dest_size = 10G;
* The /u01/app/oracle/oradata/XE/recovery_area directory must be created in advance
alter system set db_recovery_file_dest = '/u01/app/oracle/oradata/XE/recovery_area' scope=spfile;
* Restart
shutdown immediate;
startup mount;

* Enable archiving (the most critical step of the whole process)
alter database archivelog;
alter database open;
* Verify the archiving status
archive log list;
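
When archiving is enabled, the output typically looks like the following (paths and sequence numbers vary by environment):

    Database log mode              Archive Mode
    Automatic archival             Enabled
    Archive destination            USE_DB_RECOVERY_FILE_DEST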

* Create the LogMiner tablespace; the /u01/app/oracle/oradata/XE directory must exist beforehand with sufficient permissions (chmod -R 777)
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/XE/logminer_tbs.dbf' SIZE 250M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

* Create the user and grant the required privileges
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs;

GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANY TO flinkuser;

GRANT CREATE TABLE TO flinkuser;
-- not needed if scan.incremental.snapshot.enabled=true (the default)
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;

GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

* Connection info needed by flink-cdc, based on the configuration above:
  'schema-name' = 'flinkuser',
  'port' = '1521',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'XE',


* Enable supplemental log data for the whole database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
* Finally, enable supplemental log data on the SA_TASK table
ALTER TABLE FLINKUSER.SA_TASK ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
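
To double-check that supplemental logging is actually in effect (standard Oracle dictionary views; the owner below matches the user created above):

-- Database-level minimal supplemental logging should report YES
SELECT supplemental_log_data_min FROM v$database;
-- The table-level log group created by the ALTER TABLE above
SELECT log_group_name, table_name FROM dba_log_groups WHERE owner = 'FLINKUSER';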
* Common issues

* Internal errors such as missing files or memory problems:
  1. When something goes wrong, check /u01/app/oracle/diag/rdbms/xe/XE/trace/alert_XE.log first.
  2. Set up a scheduled job to clean archived logs regularly so they do not fill up the database (see the cleanup sketch in the CDB section below).

* CDC sync is slow; it can take around 2 minutes for a data event to fire:
  1. The archive log I/O mode may be asynchronous; try switching it to synchronous:
      SELECT name, value FROM v$parameter WHERE name = 'disk_asynch_io';
  2. It may also be caused by undo/redo log configuration.
  3. Oracle 19c improved this considerably and can deliver data events at second-level latency. While testing, you can also force a log switch, as sketched below.
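
To make pending changes reach LogMiner sooner while testing, force Oracle to flush the current redo log (standard commands; use sparingly in production):

-- Force a redo log switch
ALTER SYSTEM SWITCH LOGFILE;
-- Or archive the current log explicitly
ALTER SYSTEM ARCHIVE LOG CURRENT;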

CDB mode

* Install Oracle
  Docker image: https://hub.docker.com/r/banglamon/oracle193db:19.3.0-ee

docker run --name=oracle19c -p 21521:1521 -p 25500:5500 -e ORACLE_SID=XE -e ORACLE_PDB=ORCLPDB1 -e ORACLE_PWD=oracle -d banglamon/oracle193db:19.3.0-ee
# Administrator connection info
#  sid: XE
#  username: sys
#  password: oracle 
#  pdb: ORCLPDB1

* Enter the Oracle container to configure it
docker exec -it oracle19c /bin/bash
* Start executing SQL
sqlplus /nolog
* Connect as DBA
CONNECT sys/oracle AS SYSDBA
* Or equivalently
sqlplus sys/oracle@//localhost:1521/XE as sysdba


* The following uses a CDB database as an example; see the flink-cdc docs: https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/oracle-cdc
* Enable archive logging
alter system set db_recovery_file_dest_size = 10G;
* The /opt/oracle/oradata/XE/recovery_area directory must be created in advance with sufficient permissions (chmod -R 777)
alter system set db_recovery_file_dest = '/opt/oracle/oradata/XE/recovery_area' scope=spfile;
* Restart (or restart the docker container)
shutdown immediate;
startup mount;

* Enable archiving (the most critical step of the whole process)
alter database archivelog;
alter database open;
* Verify the archiving status
archive log list;

* Create the LogMiner tablespace; the /opt/oracle/oradata/XE directory must exist beforehand with sufficient permissions (chmod -R 777)
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/XE/logminer_tbs.dbf' SIZE 250M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

* Log in to the PDB and create the tablespace there as well
sqlplus sys/oracle@//localhost:1521/ORCLPDB1 as sysdba
* Or switch from the root container to the ORCLPDB1 container
alter session set container=ORCLPDB1;
-- Note: the PDB tablespace needs its own datafile; do not reuse the root container's datafile path (the ORCLPDB1 subdirectory must also be prepared in advance)
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/XE/ORCLPDB1/logminer_tbs.dbf' SIZE 250M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;



* Return to the root container to create the user
sqlplus sys/oracle@//localhost:1521/XE as sysdba
* Change the common user prefix from C## to C_
ALTER SYSTEM SET common_user_prefix = 'C_' SCOPE = spfile;
* If you hit ORA-32001: write to SPFILE requested but no SPFILE is in use
  run CREATE SPFILE FROM PFILE; then restart, and apply the setting again
* Restart (the database must be opened again before creating users below)
shutdown immediate;
startup mount;
alter database open;
show parameter common_user;

* Create the user
CREATE USER C_flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO C_flinkuser CONTAINER=ALL;
GRANT SET CONTAINER TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to C_flinkuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO C_flinkuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO C_flinkuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO C_flinkuser CONTAINER=ALL;
GRANT LOGMINING TO C_flinkuser CONTAINER=ALL;
GRANT CREATE TABLE TO C_flinkuser CONTAINER=ALL;
-- not needed if scan.incremental.snapshot.enabled=true (the default)
GRANT LOCK ANY TABLE TO C_flinkuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO C_flinkuser CONTAINER=ALL;

GRANT EXECUTE ON DBMS_LOGMNR TO C_flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO C_flinkuser CONTAINER=ALL;

GRANT SELECT ON V_$LOG TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO C_flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO C_flinkuser CONTAINER=ALL;

* Enable supplemental log data for the whole database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
* Tables need supplemental logging too; for example, for the SA_TASK table:
ALTER TABLE C_flinkuser.SA_TASK ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

* Connection info needed by flink-cdc, based on the configuration above:
  'schema-name' = 'C_flinkuser',
  'port' = '21521',
  'username' = 'C_flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'XE',
* A few queries that are handy in CDB mode

  * Query tablespaces
    SELECT TABLESPACE_NAME,FILE_NAME,BYTES,AUTOEXTENSIBLE FROM DBA_DATA_FILES;
  * List PDBs
    select con_id, dbid, NAME, OPEN_MODE from v$pdbs;
  * Show the current container name (to confirm you are in CDB mode)
    select sys_context ('USERENV', 'CON_NAME') from dual;
  * List all (non-Oracle-maintained) users
    select a.username,a.common,a.con_id,decode(a.CON_ID,1,'CDB$ROOT',b.pdb_name) name from cdb_users a,cdb_pdbs b 
    where a.con_id=b.con_id(+) and oracle_maintained='N'
    order by a.con_id,a.username;

* CDB mode has been fully tested and works on Oracle 19c

* Why change the common user prefix from C## to C_?
  Because dlink's whole-database sync SQL cannot contain the # character: #{xxx} is used as a variable placeholder inside dlink.

* Oracle needs plenty of memory: at least 8 GB

* If you hit LRM-00109: could not open parameter file '/u01/app/oracle/product/19.3.0/db100/dbs/initxe.ora'
  cd /u01/app/oracle/admin/XE/pfile
  cp init.ora.71202412325 /u01/app/oracle/product/19.3.0/db100/dbs/initxe.ora
  docker cp initxe.ora oracle19c-standard:/u01/app/oracle/product/19.3.0/db100/dbs/initxe.ora

* If archived logs fill up the disk, clean them with the following commands (a scheduled version is sketched below)
  rman target sys/oracle@//localhost:1521/XE
  crosscheck archivelog all;
  delete expired archivelog all;
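
A minimal sketch of the scheduled cleanup recommended above, assuming rman is on the PATH inside the container; the script path, 7-day retention window, and cron schedule are all illustrative:

#!/bin/bash
# Illustrative script: /opt/scripts/purge_archivelog.sh
# Validate the catalog, then delete expired entries and logs older than 7 days
rman target sys/oracle@//localhost:1521/XE <<EOF
crosscheck archivelog all;
delete noprompt expired archivelog all;
delete noprompt archivelog all completed before 'sysdate-7';
EOF

# Illustrative crontab entry (run daily at 02:00):
# 0 2 * * * /opt/scripts/purge_archivelog.sh >> /var/log/purge_archivelog.log 2>&1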

References

Oracle 12c+ user privilege management: https://blog.csdn.net/su377486/article/details/101570051

Creating tablespaces, users, and grants in Oracle PDB/CDB mode: https://www.cnblogs.com/zhangruifeng/p/14605817.html

Common CDB and PDB operations in Oracle 19c: https://blog.csdn.net/Ruishine/article/details/136783212

Regular cleanup of full archived logs: https://blog.csdn.net/bing_yuan/article/details/135235723

-- Whole-database sync example for Oracle
EXECUTE CDCSOURCE oracle-wf-sa-task-sql WITH
  (
    'connector' = 'oracle-cdc',
    'hostname' = '192.168.0.238',
    'port' = '21521',
    'username' = 'c_flinkuser',
    'password' = 'flinkpw',
    'scan.incremental.snapshot.enabled' = 'false',
    'jdbc.properties.tinyInt1isBit' = 'false',
    'jdbc.properties.transformedBitIsBoolean' = 'false',
    'jdbc.properties.autoReconnect' = 'true',
    'checkpoint' = '3009',
    'scan.startup.mode' = 'latest-offset',
    'parallelism' = '1',
    'database-name' = 'XE',
    'schema-name' = 'C_FLINKUSER',
    'table-name' = 'C_FLINKUSER\.ORDERS',
    'sink.connector' = 'datastream-kafka',
    'sink.topic' = 'test-oracle-kafka',
    'sink.brokers' = 'kafka:9092',
    'sink.properties.transaction.timeout.ms' = '900000',
    'sink.key.json.ignore-parse-errors' = 'true',
    'sink.value.json.fail-on-missing-field' = 'false',
    'debezium.decimal.handling.mode' = 'double'
  )

PostgreSQL

Installation

docker run -d -p 5432:5432 --name postgresql  -v /work/mywork/flink-env/postgresql/data:/var/lib/postgresql/data -e POSTGRES_PASSWORD=123456789 postgres:12.20

# Enter the container and connect to the database
docker exec -it postgresql /bin/bash
$ psql -h 127.0.0.1 -p 5432 -U postgres

Configuration

# Inside the container, edit postgresql.conf
vi /var/lib/postgresql/data/postgresql.conf

# Set the WAL level to logical (possible values: minimal, replica, logical)
wal_level = logical
# Raise the maximum number of replication slots (default 10); flink-cdc uses one slot per table by default
max_replication_slots = 20
# Raise the maximum number of WAL sender processes (default 10); keep it in line with the slots setting above
max_wal_senders = 20
# Terminate replication connections inactive for longer than this (default 60s; 0 disables the timeout); a somewhat larger value is safer
wal_sender_timeout = 180s
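
Note that changing wal_level only takes effect after a restart; a quick way to apply and verify it (assuming the container name from the install step):

# Restart the container so the new settings take effect
docker restart postgresql
# Verify the WAL level; this should print: logical
docker exec -it postgresql psql -U postgres -c "SHOW wal_level;"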

Install the postgres-decoderbufs plugin

# Reference: https://github.com/debezium/postgres-decoderbufs

# Run the following inside the container
# Install the build toolchain and libraries
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-12

apt-get install -f -y libproj-dev liblwgeom-dev
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" -y && apt-get update
apt-get install -y libprotobuf-c-dev

apt-get install -y protobuf-c-compiler

# Build and install the decoderbufs plugin
git clone https://github.com/debezium/postgres-decoderbufs.git
cd postgres-decoderbufs
make
make install
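
To confirm the plugin is usable, you can create and immediately drop a throwaway logical replication slot bound to the decoderbufs output plugin (the slot name is illustrative; these are standard PostgreSQL functions):

-- Fails if the decoderbufs plugin is not installed correctly
SELECT pg_create_logical_replication_slot('decoderbufs_test', 'decoderbufs');
-- Clean up the test slot
SELECT pg_drop_replication_slot('decoderbufs_test');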

Create a user and grant privileges

-- Create the test_db database
CREATE DATABASE test_db;

-- Connect to the newly created test_db database
\c test_db ;

-- Create the orders table
CREATE TABLE "public"."orders" (
  "order_id" varchar(254) COLLATE "pg_catalog"."default" NOT NULL,
  "order_date" timestamptz(6),
  "customer_name" varchar(254) COLLATE "pg_catalog"."default" NOT NULL,
  "price" numeric(18,2) NOT NULL,
  "product_id" int4 NOT NULL,
  "order_status" int4 NOT NULL,
  "growth_rate" float8
);
ALTER TABLE "public"."orders" ADD CONSTRAINT "orders_pkey" PRIMARY KEY ("order_id");

-- Create the user
CREATE USER pgsql_flinkuser WITH PASSWORD 'pgsql_flinkpw';

-- Grant the user the replication role
ALTER ROLE pgsql_flinkuser replication;

-- Grant the user the right to connect to the database
GRANT CONNECT ON DATABASE test_db to pgsql_flinkuser;

-- Grant the user privileges on all existing tables in the public schema
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO pgsql_flinkuser;
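
The grant above only covers tables that already exist; if tables will be created in public later, you may also want default privileges so the CDC user can read them (standard PostgreSQL syntax):

-- Also grant SELECT on tables created in public from now on
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO pgsql_flinkuser;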

Publish tables

-- Set puballtables = true for any existing publications
update pg_publication set puballtables=true where pubname is not null;

-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- Check which tables are published
select * from pg_publication_tables;

-- The following is critical: it must be run for every table that will be synced --
-- Set the replica identity to include the full before-image of updates and deletes (this ensures the orders table captures update and delete changes correctly during real-time sync; without it the replica identity may not carry the old row values, so update/delete events can lose row information and break sync accuracy)
ALTER TABLE orders REPLICA IDENTITY FULL;

-- Check the replica identity: 'f' (full) means it was set successfully; 'd' (default, primary key only) or 'n' (nothing) means it was not
select relreplident from pg_class where relname='orders';

Helper SQL queries

-- Check replication slot usage
SELECT * FROM pg_replication_slots;

-- Drop a replication slot
SELECT pg_drop_replication_slot('zd_org_goods_solt');

-- Count current connections per user
select usename, count(*) from pg_stat_activity group by usename order by count(*) desc;

-- Set a user's maximum number of connections
alter role odps_etl connection limit 200;

A whole-database sync example for PostgreSQL, using the user and table created above:

EXECUTE CDCSOURCE postgres-public WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '192.168.100.3',
  'port' = '5432',
  'username' = 'pgsql_flinkuser',
  'password' = 'pgsql_flinkpw',
  'scan.incremental.snapshot.enabled' = 'true',
  'jdbc.properties.tinyInt1isBit' = 'false',
  'jdbc.properties.transformedBitIsBoolean' = 'false',
  'jdbc.properties.autoReconnect' = 'true',
  'checkpoint' = '3009',
  'scan.startup.mode' = 'latest-offset',
  'parallelism' = '1',
  'database-name' = 'test_db',
  'schema-name' = 'public',
  'table-name' = 'public\.orders',
  'sink.connector' = 'datastream-kafka',
  'sink.topic' = 'testflinkenv-public',
  'sink.brokers' = '192.168.100.3:9092',
  'sink.properties.transaction.timeout.ms' = '900000',
  'sink.key.json.ignore-parse-errors' = 'true',
  'sink.value.json.fail-on-missing-field' = 'false',
  'debezium.decimal.handling.mode' = 'double'
)

SQL Server

Installation

docker run -d --name=sqlService -p 1433:1433 -v /etc/localtime:/etc/localtime -e MSSQL_AGENT_ENABLED=true -e MSSQL_PID=Standard -e ACCEPT_EULA=Y -e SA_PASSWORD=Password! mcr.microsoft.com/mssql/server:2019-latest

# Enter the container and connect to the database (-C trusts the container's self-signed TLS certificate)
docker exec -it sqlService /bin/bash
$ /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P Password! -C

Enable CDC

-- Create the database and enable CDC on it
CREATE DATABASE inventory;
GO
USE inventory;
EXEC sys.sp_cdc_enable_db;

-- Create the table and enable CDC on it
CREATE TABLE [dbo].[orders] (
  [order_id] varchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL PRIMARY KEY,
  [order_date] datetime  NULL,
  [customer_name] varchar(512) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
  [price] numeric(18,2)  NULL,
  [product_id] int  NULL,
  [order_status] int  NULL,
  [growth_rate] float(53)  NULL
)
GO
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'orders', @role_name = NULL, @supports_net_changes = 0;
GO
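
To verify that CDC is enabled at both levels (standard SQL Server catalog views, using the database and table created above):

-- is_cdc_enabled should be 1 for inventory
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'inventory';
GO
-- is_tracked_by_cdc should be 1 for orders
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'orders';
GO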

A whole-database sync example for SQL Server:

EXECUTE CDCSOURCE testflinkenv-dbo-2 WITH (
  'connector' = 'sqlserver-cdc',
  'hostname' = '192.168.0.238',
  'port' = '1433',
  'username' = 'sa',
  'password'='Password!',
  'scan.incremental.snapshot.enabled' = 'true',
  'jdbc.properties.tinyInt1isBit' = 'false',
  'jdbc.properties.transformedBitIsBoolean' = 'false',
  'jdbc.properties.autoReconnect' = 'true',
  'checkpoint' = '3009',
  'scan.startup.mode' = 'latest-offset',
  'parallelism' = '1',
  'database-name' = 'inventory',
  'schema-name' = 'dbo',
  'table-name' = 'dbo\.orders',
  'sink.connector' = 'datastream-kafka',
  'sink.topic' = 'testflinkenv-dbo',
  'sink.brokers' = 'kafka.newdao-tenant-chensc2:9092',
  'sink.properties.transaction.timeout.ms' = '900000',
  'sink.key.json.ignore-parse-errors' = 'true',
  'sink.value.json.fail-on-missing-field' = 'false',
  'debezium.decimal.handling.mode' = 'double'
)
