PostgreSQL 14 到 Apache Doris 4.1.1 CDC 同步方案
本文档用于验证并实施 PostgreSQL 业务库 fudabd_common_plat_db.public.terminal_pos_202602 到 Apache Doris 分析库 fudabd_common_plat_db.terminal_pos 的数据同步。
实际验证环境要求
PostgreSQL 源端
-
PostgreSQL 版本:14。
-
连接地址:
127.0.0.1:15432。 -
登录账号:
postgres。 -
登录密码:
Fdbd@2013。 -
源数据库:
fudabd_common_plat_db。 -
源 schema:
public。 -
源表:
fudabd_user。 -
主键字段:
id。 -
已开启 logical replication。
-
pg_hba.conf已允许 Doris 所在机器或容器访问数据库和 logical replication。 -
PostgreSQL 账号具备 logical replication、publication、slot 操作权限。
Doris 目标端
Apache Doris要求 最低docker镜像版本 4.1.1 ,此版本支持Stream Job;
-
Doris 版本:
doris-4.1.1-rc01-b10073ad9ca。 -
Doris
@@version:5.7.99。 -
目标数据库:
fudabd_common_plat_db。 -
目标表:
fudabd_user。 -
目标表当前不存在,需要由方案一手动创建。
-
目标表需要支持
UPDATE、DELETE,因此设计为UNIQUE KEY表。
同步要求
-
需要先全量初始化,再持续增量同步。
-
需要处理
INSERT、UPDATE、DELETE。 -
当前只验证
terminal_pos_202602单表,暂不处理后续每月新增分表。 -
单表数据量:日均约 500 万条。
-
延迟目标:10 秒内。
Doris Streaming Job 支持判断
Apache Doris 4.x 官方文档提供 CREATE JOB ... ON STREAMING,支持以下两类模式:
TVF Mode:使用cdc_stream(...)读取 PostgreSQL CDC,再通过INSERT INTO ... SELECT ...写入指定 Doris 表,适合单表 SQL 映射同步。
当前环境是 4.1.1-rc01,建议正式执行前在当前 Doris 集群验证 Streaming Job 语法和 FE 配置。
参考文档:
-
CREATE STREAMING JOB:CREATE STREAMING JOB - Apache Doris -
PostgreSQL SQL Mapping Sync:PostgreSQL CDC with SQL Mapping - Apache Doris
-
PostgreSQL Auto Table Creation Sync:PostgreSQL CDC with Auto Table Creation - Apache Doris
-
Doris 4.1.1 Release Notes:Release 4.1.1 - Apache Doris
前置检查
PostgreSQL 配置检查
当前已确认 postgresql.conf 包含以下配置:
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
wal_sender_timeout = 0
执行以下 SQL 复核:
SHOW wal_level;
SHOW max_replication_slots;
SHOW max_wal_senders;
检查源表主键:
SELECT
tc.table_schema,
tc.table_name,
kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
WHERE tc.constraint_type = 'PRIMARY KEY'
AND tc.table_schema = 'public'
AND tc.table_name = 'terminal_pos_202602';
Doris 配置检查
登录 Doris FE MySQL 协议端口后执行:
SELECT @@version_comment, @@version;
SHOW FRONTEND CONFIG LIKE 'max_streaming_job_num';
如果 max_streaming_job_num 为 0 或 Streaming Job 语法不可用,需要先调整 Doris FE 配置或切换到正式 4.1.1 镜像版本。
PostgreSQL JDBC Driver 准备
Streaming Job 需要 PostgreSQL JDBC Driver。建议将驱动放到 Doris FE/BE 都可访问的 HTTP 地址。
示例:
postgresql-42.7.3.jar
本文 SQL 中使用 <PG_DRIVER_URL> 占位,执行前替换为实际地址,例如:
https://maven.aliyun.com/repository/public/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar
源表结构
CREATE TABLE `fudabd_user` (
`id` bigint NOT NULL COMMENT "用户ID",
`user_name` varchar(50) NOT NULL COMMENT "用户名",
`password` varchar(255) NOT NULL COMMENT "密码(应加密存储)",
`age` tinyint NULL COMMENT "年龄",
`create_time` datetime NULL COMMENT "创建时间",
`update_time` datetime NULL COMMENT "更新时间",
`is_deleted` int NULL DEFAULT "0" COMMENT "是否删除标记"
)
SQL 映射同步
推荐结论
当前验证目标是 fudabd_user 同步到 fudabd_user,且 Doris 目标表不存在、需要设计为 UNIQUE KEY,优先 SQL 映射同步。
该模式优势:
-
可将源表
fudabd_user明确写入目标表fudabd_user。 -
可手动控制 Doris 表模型、字段类型、分桶数和副本数。
-
可承接 PostgreSQL 主键表的
INSERT、UPDATE、DELETECDC 变更。
创建 Doris 数据库
CREATE DATABASE IF NOT EXISTS fudabd_common_plat_db;
USE fudabd_common_plat_db;
创建 Doris 目标表
CREATE TABLE `fudabd_user` (
`id` bigint NOT NULL COMMENT "用户ID",
`user_name` varchar(50) NOT NULL COMMENT "用户名",
`password` varchar(255) NOT NULL COMMENT "密码(应加密存储)",
`age` tinyint NULL COMMENT "年龄",
`create_time` datetime NULL COMMENT "创建时间",
`update_time` datetime NULL COMMENT "更新时间",
`is_deleted` int NULL DEFAULT "0" COMMENT "是否删除标记"
) ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"min_load_replica_num" = "-1",
"is_being_synced" = "false",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V3",
"compression" = "LZ4",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"group_commit_interval_ms" = "10000",
"group_commit_data_bytes" = "134217728",
"enable_mow_light_delete" = "false"
);;
参数建议:
-
单 BE 验证环境使用
replication_num = 1。 -
生产多 BE 环境建议改为
replication_num = 3。 -
日均 500 万数据验证阶段可先使用
BUCKETS 32,生产环境根据 BE 数量和 tablet 大小调整为32或64。
创建 SQL 映射 Streaming Job
执行前需要将 <PG_DRIVER_URL> 替换为 PostgreSQL JDBC Driver 的实际地址。
将 “offset” 从 “initial” 改为 “latest” 即可跳过全量快照,仅捕获增量变更:
CREATE JOB pg_fudabd_user_to_fudabd_user
ON STREAMING
DO
INSERT INTO fudabd_common_plat_db.fudabd_user (
id ,
user_name ,
password ,
age` tinyint ,
create_time ,
update_time ,
is_deleted
)
SELECT
id,
user_name ,
password ,
age,
create_time ,
update_time ,
is_deleted
FROM cdc_stream(
"type" = "postgres",
"jdbc_url" = "jdbc:postgresql://127.0.0.1:15432/fudabd_common_plat_db",
"driver_url" = "https://maven.aliyun.com/repository/public/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar",
"driver_class" = "org.postgresql.Driver",
"user" = "postgres",
"password" = "Fdbd@2013",
"database" = "fudabd_common_plat_db",
"schema" = "public",
"table" = "fudabd_user",
"offset" = "initial"
);
JOB 运维命令
以下命令用于查看、暂停、恢复和删除前面创建的 Doris Streaming Job。
JOB
查看所有 INSERT 类型 JOB:
SELECT * FROM jobs("type" = "insert");
查看 SQL 映射同步 JOB:
SELECT *
FROM jobs("type" = "insert")
WHERE Name = 'pg_fudabd_user_to_fudabd_user';
查看 JOB 产生的 TASK
查看所有 INSERT 类型 TASK:
SELECT * FROM tasks("type" = "insert");
查看 SQL 映射同步 JOB 的 TASK:
SELECT *
FROM tasks("type" = "insert")
WHERE JobName = 'pg_fudabd_user_to_fudabd_user';
暂停 JOB
暂停 SQL 映射同步 JOB:
PAUSE JOB WHERE jobName = 'pg_fudabd_user_to_fudabd_user';
恢复启动 JOB
恢复 SQL 映射同步 JOB:
RESUME JOB WHERE jobName = 'pg_fudabd_user_to_fudabd_user';
删除 JOB
删除 SQL 映射同步 JOB:
DROP JOB WHERE jobName = 'pg_fudabd_user_to_fudabd_user';
PGSQL运维命令
-- 查看当前 Publication
SELECT
p.pubname,
n.nspname AS schema_name,
c.relname AS table_name
FROM pg_publication p
JOIN pg_publication_rel pr
ON p.oid = pr.prpubid
JOIN pg_class c
ON pr.prrelid = c.oid
JOIN pg_namespace n
ON c.relnamespace = n.oid;
-- 查看 Replication Slot
SELECT
slot_name,
plugin,
slot_type,
active,
database,
restart_lsn,
confirmed_flush_lsn
FROM pg_replication_slots;
-- 如果 Slot 仍然 active,先终止连接
SELECT
pid,
usename,
application_name,
client_addr,
state
FROM pg_stat_activity
WHERE backend_type = 'walsender';
SELECT pg_terminate_backend(pid); -- pid 需要手动替换
-- 删除 Replication Slot
SELECT pg_drop_replication_slot('slot_name');
-- 删除 Publication
-- 查看当前 Publication 进行替换
DROP PUBLICATION doris_pub_1781162893358;
-- 确认 Slot 已删除
SELECT slot_name
FROM pg_replication_slots;
-- 检查 WAL 是否恢复正常
SELECT
slot_name,
active,
pg_size_pretty(
pg_wal_lsn_diff(
pg_current_wal_lsn(),
restart_lsn
)
) AS retained_wal
FROM pg_replication_slots;
延迟与性能建议
-
当前单表日均约 500 万条,验证阶段建议先使用
BUCKETS 32。 -
如果同步延迟超过 10 秒,优先检查 Doris Streaming Job 状态、BE compaction、tablet 数量、网络带宽和 PostgreSQL replication slot WAL 堆积。
-
如果 Doris 集群 BE 数量较多,可将
BUCKETS调整到64,但不建议在 POC 初期过度增加 tablet 数。 -
生产环境建议使用多 BE,并将
replication_num调整为3。 -
PostgreSQL 侧需要持续监控
pg_replication_slots,避免 Doris Job 停止后 WAL 长时间堆积。
注意事项
STREAMING JOB 任务状态出现PENDING解决
问题根因
__internal_schema.streaming_job_meta 是 Doris 内部管理 Streaming Job 元数据的系统表,FE 未能自动创建它。所有 Streaming Job 操作都依赖此表,缺失则全部阻塞在 PENDING。
确认 __internal_schema 库及表现状
-- 查看内部库是否存在
SHOW DATABASES LIKE '__internal_schema';
-- 如果存在,查看里面有哪些表
SHOW TABLES FROM __internal_schema;
第一步:先停掉报错的 JOB 止血
TOP JOB pg_fudabd_user_to_fudabd_user;
第二步:手动创建缺失的内部表
CREATE TABLE IF NOT EXISTS `__internal_schema`.`streaming_job_meta` (
`id` bigint NOT NULL,
`job_id` bigint NOT NULL,
`table_name` varchar(256) NOT NULL,
`chunk_list` text
) UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
这是基于日志中 SQL 推断的最小表结构。如果后续 FE 代码还访问了其他字段,会再次报错,届时根据错误信息用 ALTER TABLE 补充列即可。
第三步:删除旧 JOB 重新创建
3 个帖子 - 3 位参与者