PostgreSQL 14 到 Apache Doris 4.1.1 CDC 同步方案

PostgreSQL 14 到 Apache Doris 4.1.1 CDC 同步方案
PostgreSQL 14 到 Apache Doris 4.1.1 CDC 同步方案

PostgreSQL 14 到 Apache Doris 4.1.1 CDC 同步方案

本文档用于验证并实施 PostgreSQL 业务库 fudabd_common_plat_db.public.terminal_pos_202602 到 Apache Doris 分析库 fudabd_common_plat_db.terminal_pos 的数据同步。

实际验证环境要求

PostgreSQL 源端

  • PostgreSQL 版本:14。

  • 连接地址:127.0.0.1:15432

  • 登录账号:postgres

  • 登录密码:Fdbd@2013

  • 源数据库:fudabd_common_plat_db

  • 源 schema:public

  • 源表:fudabd_user

  • 主键字段:id

  • 已开启 logical replication。

  • pg_hba.conf 已允许 Doris 所在机器或容器访问数据库和 logical replication。

  • PostgreSQL 账号具备 logical replication、publication、slot 操作权限。

Doris 目标端

Apache Doris要求 最低docker镜像版本 4.1.1 ,此版本支持Stream Job;

  • Doris 版本:doris-4.1.1-rc01-b10073ad9ca

  • Doris @@version5.7.99

  • 目标数据库:fudabd_common_plat_db

  • 目标表:fudabd_user

  • 目标表当前不存在,需要由方案一手动创建。

  • 目标表需要支持 UPDATEDELETE,因此设计为 UNIQUE KEY 表。

同步要求

  • 需要先全量初始化,再持续增量同步。

  • 需要处理 INSERTUPDATEDELETE

  • 当前只验证 terminal_pos_202602 单表,暂不处理后续每月新增分表。

  • 单表数据量:日均约 500 万条。

  • 延迟目标:10 秒内。

Doris Streaming Job 支持判断

Apache Doris 4.x 官方文档提供 CREATE JOB ... ON STREAMING,支持以下两类模式:

  • TVF Mode:使用 cdc_stream(...) 读取 PostgreSQL CDC,再通过 INSERT INTO ... SELECT ... 写入指定 Doris 表,适合单表 SQL 映射同步。

当前环境是 4.1.1-rc01,建议正式执行前在当前 Doris 集群验证 Streaming Job 语法和 FE 配置。

参考文档:

前置检查

PostgreSQL 配置检查

当前已确认 postgresql.conf 包含以下配置:

wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
wal_sender_timeout = 0

执行以下 SQL 复核:

SHOW wal_level;
SHOW max_replication_slots;
SHOW max_wal_senders;

检查源表主键:

SELECT
    tc.table_schema,
    tc.table_name,
    kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
    ON tc.constraint_name = kcu.constraint_name
   AND tc.table_schema = kcu.table_schema
WHERE tc.constraint_type = 'PRIMARY KEY'
  AND tc.table_schema = 'public'
  AND tc.table_name = 'terminal_pos_202602';

Doris 配置检查

登录 Doris FE MySQL 协议端口后执行:

SELECT @@version_comment, @@version;
SHOW FRONTEND CONFIG LIKE 'max_streaming_job_num';

如果 max_streaming_job_num0 或 Streaming Job 语法不可用,需要先调整 Doris FE 配置或切换到正式 4.1.1 镜像版本。

PostgreSQL JDBC Driver 准备

Streaming Job 需要 PostgreSQL JDBC Driver。建议将驱动放到 Doris FE/BE 都可访问的 HTTP 地址。

示例:

postgresql-42.7.3.jar

本文 SQL 中使用 <PG_DRIVER_URL> 占位,执行前替换为实际地址,例如:

https://maven.aliyun.com/repository/public/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar

源表结构

CREATE TABLE `fudabd_user` (
  `id` bigint NOT NULL COMMENT "用户ID",
  `user_name` varchar(50) NOT NULL COMMENT "用户名",
  `password` varchar(255) NOT NULL COMMENT "密码(应加密存储)",
  `age` tinyint NULL COMMENT "年龄",
  `create_time` datetime NULL COMMENT "创建时间",
  `update_time` datetime NULL COMMENT "更新时间",
  `is_deleted` int NULL DEFAULT "0" COMMENT "是否删除标记"
)

SQL 映射同步

推荐结论

当前验证目标是 fudabd_user 同步到 fudabd_user,且 Doris 目标表不存在、需要设计为 UNIQUE KEY,优先 SQL 映射同步。

该模式优势:

  • 可将源表 fudabd_user 明确写入目标表 fudabd_user

  • 可手动控制 Doris 表模型、字段类型、分桶数和副本数。

  • 可承接 PostgreSQL 主键表的 INSERTUPDATEDELETE CDC 变更。

创建 Doris 数据库

CREATE DATABASE IF NOT EXISTS fudabd_common_plat_db;
USE fudabd_common_plat_db;

创建 Doris 目标表

CREATE TABLE `fudabd_user` (
  `id` bigint NOT NULL COMMENT "用户ID",
  `user_name` varchar(50) NOT NULL COMMENT "用户名",
  `password` varchar(255) NOT NULL COMMENT "密码(应加密存储)",
  `age` tinyint NULL COMMENT "年龄",
  `create_time` datetime NULL COMMENT "创建时间",
  `update_time` datetime NULL COMMENT "更新时间",
  `is_deleted` int NULL DEFAULT "0" COMMENT "是否删除标记"
) ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"min_load_replica_num" = "-1",
"is_being_synced" = "false",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V3",
"compression" = "LZ4",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"group_commit_interval_ms" = "10000",
"group_commit_data_bytes" = "134217728",
"enable_mow_light_delete" = "false"
);;

参数建议:

  • 单 BE 验证环境使用 replication_num = 1

  • 生产多 BE 环境建议改为 replication_num = 3

  • 日均 500 万数据验证阶段可先使用 BUCKETS 32,生产环境根据 BE 数量和 tablet 大小调整为 3264

创建 SQL 映射 Streaming Job

执行前需要将 <PG_DRIVER_URL> 替换为 PostgreSQL JDBC Driver 的实际地址。

将 “offset” 从 “initial” 改为 “latest” 即可跳过全量快照,仅捕获增量变更:

CREATE JOB pg_fudabd_user_to_fudabd_user
ON STREAMING
DO
INSERT INTO fudabd_common_plat_db.fudabd_user (
  id ,
  user_name ,
  password ,
  age` tinyint ,
  create_time ,
  update_time ,
  is_deleted 
)
SELECT
  id,
  user_name ,
  password ,
  age,
  create_time ,
  update_time ,
  is_deleted 
FROM cdc_stream(
    "type" = "postgres",
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:15432/fudabd_common_plat_db",
    "driver_url" = "https://maven.aliyun.com/repository/public/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "Fdbd@2013",
    "database" = "fudabd_common_plat_db",
    "schema" = "public",
    "table" = "fudabd_user",
    "offset" = "initial"
);

JOB 运维命令

以下命令用于查看、暂停、恢复和删除前面创建的 Doris Streaming Job。

JOB

查看所有 INSERT 类型 JOB:

SELECT * FROM jobs("type" = "insert");

查看 SQL 映射同步 JOB:

SELECT *
FROM jobs("type" = "insert")
WHERE Name = 'pg_fudabd_user_to_fudabd_user';

查看 JOB 产生的 TASK

查看所有 INSERT 类型 TASK:

SELECT * FROM tasks("type" = "insert");

查看 SQL 映射同步 JOB 的 TASK:

SELECT *
FROM tasks("type" = "insert")
WHERE JobName = 'pg_fudabd_user_to_fudabd_user';

暂停 JOB

暂停 SQL 映射同步 JOB:

PAUSE JOB WHERE jobName = 'pg_fudabd_user_to_fudabd_user';

恢复启动 JOB

恢复 SQL 映射同步 JOB:

RESUME JOB WHERE jobName = 'pg_fudabd_user_to_fudabd_user';

删除 JOB

删除 SQL 映射同步 JOB:

DROP JOB WHERE jobName = 'pg_fudabd_user_to_fudabd_user';

PGSQL运维命令

-- 查看当前 Publication
SELECT
    p.pubname,
    n.nspname AS schema_name,
    c.relname AS table_name
FROM pg_publication p
JOIN pg_publication_rel pr
    ON p.oid = pr.prpubid
JOIN pg_class c
    ON pr.prrelid = c.oid
JOIN pg_namespace n
    ON c.relnamespace = n.oid;
        
-- 查看 Replication Slot
SELECT
    slot_name,
    plugin,
    slot_type,
    active,
    database,
    restart_lsn,
    confirmed_flush_lsn
FROM pg_replication_slots;
​
-- 如果 Slot 仍然 active,先终止连接
SELECT
    pid,
    usename,
    application_name,
    client_addr,
    state
FROM pg_stat_activity
WHERE backend_type = 'walsender';
​
SELECT pg_terminate_backend(pid); -- pid 需要手动替换
​
-- 删除 Replication Slot
SELECT pg_drop_replication_slot('slot_name');
​
-- 删除 Publication
-- 查看当前 Publication 进行替换
DROP PUBLICATION doris_pub_1781162893358; 
​
​
-- 确认 Slot 已删除
SELECT slot_name
FROM pg_replication_slots;
​
-- 检查 WAL 是否恢复正常
SELECT
    slot_name,
    active,
    pg_size_pretty(
        pg_wal_lsn_diff(
            pg_current_wal_lsn(),
            restart_lsn
        )
    ) AS retained_wal
FROM pg_replication_slots;
​

延迟与性能建议

  • 当前单表日均约 500 万条,验证阶段建议先使用 BUCKETS 32

  • 如果同步延迟超过 10 秒,优先检查 Doris Streaming Job 状态、BE compaction、tablet 数量、网络带宽和 PostgreSQL replication slot WAL 堆积。

  • 如果 Doris 集群 BE 数量较多,可将 BUCKETS 调整到 64,但不建议在 POC 初期过度增加 tablet 数。

  • 生产环境建议使用多 BE,并将 replication_num 调整为 3

  • PostgreSQL 侧需要持续监控 pg_replication_slots,避免 Doris Job 停止后 WAL 长时间堆积。

注意事项

STREAMING JOB 任务状态出现PENDING解决

问题根因

__internal_schema.streaming_job_meta 是 Doris 内部管理 Streaming Job 元数据的系统表,FE 未能自动创建它。所有 Streaming Job 操作都依赖此表,缺失则全部阻塞在 PENDING。

确认 __internal_schema 库及表现状

-- 查看内部库是否存在
SHOW DATABASES LIKE '__internal_schema';
​
-- 如果存在,查看里面有哪些表
SHOW TABLES FROM __internal_schema;

第一步:先停掉报错的 JOB 止血

TOP JOB pg_fudabd_user_to_fudabd_user;

第二步:手动创建缺失的内部表

CREATE TABLE IF NOT EXISTS `__internal_schema`.`streaming_job_meta` (
    `id` bigint NOT NULL,
    `job_id` bigint NOT NULL,
    `table_name` varchar(256) NOT NULL,
    `chunk_list` text
) UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
    "replication_num" = "1"
);

:warning: 这是基于日志中 SQL 推断的最小表结构。如果后续 FE 代码还访问了其他字段,会再次报错,届时根据错误信息用 ALTER TABLE 补充列即可。

第三步:删除旧 JOB 重新创建

3 个帖子 - 3 位参与者

阅读完整话题

来源: LinuxDo 最新话题查看原文