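pgl_ddl_deploy: a PG logical replication extension that supports DDL replication
Tags: PG, extension, logical replication, pgl_ddl_deploy, DDL replication
- PG's built-in logical replication: https://www.dbaup.com/pgzhongdeluojifuzhilogical-replication.html
- Using the pglogical logical replication extension: https://www.dbaup.com/pgluojifuzhichajianzhipglogicalshiyongshuoming.html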
Introduction
Details: https://github.com/lhrbest/pgl_ddl_deploy
Transparent DDL replication for Postgres 9.5+ for both pglogical and native logical replication.
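The official community release of PostgreSQL does not support logical replication of DDL. Only DML (INSERT, UPDATE, DELETE, TRUNCATE) is replicated, and a table must have a primary key (or another replica identity), otherwise UPDATE and DELETE cannot be replicated. Note that logical replication requires wal_level to be set to logical.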
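The two prerequisites mentioned here can be checked with plain PostgreSQL commands. The following is a minimal sketch; the table name t_no_pk is hypothetical and only stands in for a table that has no primary key.

```sql
-- Check the current WAL level; logical replication needs 'logical'.
SHOW wal_level;

-- If it is not 'logical', change it (superuser only; takes effect after a server restart).
ALTER SYSTEM SET wal_level = 'logical';

-- For a table without a primary key, one option is to use the whole old row
-- as the replica identity so that UPDATE and DELETE can be replicated.
-- t_no_pk is a hypothetical table name.
ALTER TABLE t_no_pk REPLICA IDENTITY FULL;
```

REPLICA IDENTITY FULL is the simplest choice but writes more WAL per change; adding a primary key is usually preferable where possible.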
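Several third-party extensions can replicate DDL: BDR (commercial), pglogical (open source, relatively complex), and pgl_ddl_deploy.
The pgl_ddl_deploy extension adds DDL replication to PG logical replication. Its main features are:
- Any DDL SQL statement can be propagated directly to subscribers
- Tables can be added to replication automatically when they are created
- Filtering is supported, so that only selected schemas are replicated
- DDL can optionally be deployed on the subscriber in a lock-safe way
- ALTER TABLE statements can be filtered by subcommand tag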
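The latest version at the time of writing is 2.1.0. Since version 2.0 it supports DDL replication over native logical replication (earlier versions depended on the pglogical extension). For details, see: https://github.com/enova/pgl_ddl_deploy
When DDL replication is configured, the pgl_ddl_deploy.queue table must be replicated as well: the captured DDL is recorded in this table, and a function executes it on the subscriber.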
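The features listed above are driven by rows in pgl_ddl_deploy.set_configs together with the deploy(text) function that appears in the extension's object list. The sketch below is based on my reading of the upstream README and is not verified against this exact version: the column names set_name, include_schema_regex and driver, and the 'native' value of the pgl_ddl_deploy.driver type, should be checked with \d pgl_ddl_deploy.set_configs before use. It assumes a publication named pub1 already exists on the publisher.

```sql
-- Run on the publisher. Assumption: publication pub1 already exists.
-- Replicate DDL only for objects in the public schema, using native
-- logical replication rather than pglogical.
INSERT INTO pgl_ddl_deploy.set_configs (set_name, include_schema_regex, driver)
VALUES ('pub1', '^public$', 'native'::pgl_ddl_deploy.driver);

-- Create the event triggers that capture DDL for this configuration.
SELECT pgl_ddl_deploy.deploy('pub1');

-- Inspect what has been captured so far.
SELECT * FROM pgl_ddl_deploy.queue;
```

Using a regular expression for the schema filter keeps a single configuration row per publication; a broader pattern such as '.*' would cover every schema.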
Installation
    # CentOS: build from source (either clone the repository or download a release tarball)
    git clone https://github.com/enova/pgl_ddl_deploy.git

    wget https://github.com/enova/pgl_ddl_deploy/archive/refs/tags/v2.1.0.tar.gz
    tar -zxvf v2.1.0.tar.gz
    cd pgl_ddl_deploy-2.1.0
    # Point PGHOME at the PostgreSQL installation so that pg_config is on the PATH
    export PGHOME=/usr/lib/postgresql/13
    export PATH=$PGHOME/bin:$PATH
    USE_PGXS=1 make
    USE_PGXS=1 make install

    # Debian: install the packaged build
    apt-get install postgresql-13-pgl-ddl-deploy

    -- Then, in psql:
    create extension pgl_ddl_deploy;
Extension contents
    lhrdb3=# \dx+ pgl_ddl_deploy
    Objects in extension "pgl_ddl_deploy"
     Object description
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------
     event trigger auto_rep_ddl_create_1_pub1
     event trigger auto_rep_ddl_drop_1_pub1
     event trigger auto_rep_ddl_unsupp_1_pub1
     function pgl_ddl_deploy.add_ext_object(text,text)
     function pgl_ddl_deploy.add_role(oid)
     function pgl_ddl_deploy.add_table_to_replication(pgl_ddl_deploy.driver,name,regclass,boolean)
     function pgl_ddl_deploy.auto_rep_ddl_create_1_pub1()
     function pgl_ddl_deploy.auto_rep_ddl_drop_1_pub1()
     function pgl_ddl_deploy.auto_rep_ddl_unsupp_1_pub1()
     function pgl_ddl_deploy.blacklisted_tags()
     function pgl_ddl_deploy.common_exclude_alter_table_subcommands()
     function pgl_ddl_deploy.current_query()
     function pgl_ddl_deploy.deploy(integer)
     function pgl_ddl_deploy.deployment_check_count(integer,text,text,pgl_ddl_deploy.driver)
     function pgl_ddl_deploy.deployment_check(integer)
     function pgl_ddl_deploy.deployment_check(text)
     function pgl_ddl_deploy.deployment_check_wrapper(integer,text)
     function pgl_ddl_deploy.deploy(text)
     function pgl_ddl_deploy.disable(integer)
     function pgl_ddl_deploy.disable(text)
     function pgl_ddl_deploy.drop_ext_object(text,text)
     function pgl_ddl_deploy.enable(integer)
     function pgl_ddl_deploy.enable(text)
     function pgl_ddl_deploy.exclude_regex()
     function pgl_ddl_deploy.execute_queued_ddl()
     function pgl_ddl_deploy.fail_queued_attempt(integer,text)
     function pgl_ddl_deploy.get_altertable_subcmdtypes(pg_ddl_command)
     function pgl_ddl_deploy.get_command_tag(pg_ddl_command)
     function pgl_ddl_deploy.get_command_type(pg_ddl_command)
     function pgl_ddl_deploy.is_subscriber(pgl_ddl_deploy.driver,text[],name)
     function pgl_ddl_deploy.kill_blockers(pgl_ddl_deploy.signals,name,name)
     function pgl_ddl_deploy.lock_safe_executor(text)
     function pgl_ddl_deploy.log_unhandled(integer,text,integer,text,text,text,bigint)
     function pgl_ddl_deploy.notify_subscription_refresh(name,boolean)
     function pgl_ddl_deploy.override()
     function pgl_ddl_deploy.provider_node_name(pgl_ddl_deploy.driver)
     function pgl_ddl_deploy.queue_ddl_message_type()
     function pgl_ddl_deploy.raise_message(text,text)
     function pgl_ddl_deploy.replicate_ddl_command(text,text[])
     function pgl_ddl_deploy.rep_set_table_wrapper()
     function pgl_ddl_deploy.rep_set_wrapper()
     function pgl_ddl_deploy.resolve_exception(integer,text)
     function pgl_ddl_deploy.resolve_unhandled(integer,text)
     function pgl_ddl_deploy.retry_all_subscriber_logs()
     function pgl_ddl_deploy.retry_subscriber_log(integer)
     function pgl_ddl_deploy.schema_execute(integer,text)
     function pgl_ddl_deploy.schema_execute(text,text)
     function pgl_ddl_deploy.set_origin_subscriber_log_id()
     function pgl_ddl_deploy.set_tag_defaults()
     function pgl_ddl_deploy.sql_command_tags(text)
     function pgl_ddl_deploy.standard_create_tags()
     function pgl_ddl_deploy.standard_drop_tags()
     function pgl_ddl_deploy.standard_repset_only_tags()
     function pgl_ddl_deploy.subscriber_command(name,text[],name,name,text,text,integer,integer,boolean,pgl_ddl_deploy.signals,integer,pgl_ddl_deploy.driver,boolean)
     function pgl_ddl_deploy.toggle_ext_object(text,text,text)
     function pgl_ddl_deploy.undeploy(integer)
     function pgl_ddl_deploy.undeploy(text)
     function pgl_ddl_deploy.unique_tags()
     function pgl_ddl_deploy.unsupported_tags()
     sequence pgl_ddl_deploy.commands_id_seq
     sequence pgl_ddl_deploy.events_id_seq
     sequence pgl_ddl_deploy.exceptions_id_seq
     sequence pgl_ddl_deploy.killed_blockers_id_seq
     sequence pgl_ddl_deploy.set_configs_id_seq
     sequence pgl_ddl_deploy.subscriber_logs_id_seq
     sequence pgl_ddl_deploy.unhandled_id_seq
     table pgl_ddl_deploy.commands
     table pgl_ddl_deploy.events
     table pgl_ddl_deploy.exceptions
     table pgl_ddl_deploy.killed_blockers
     table pgl_ddl_deploy.queue
     table pgl_ddl_deploy.set_configs
     table pgl_ddl_deploy.subscriber_logs
     table pgl_ddl_deploy.unhandled
     type pgl_ddl_deploy.driver
     type pgl_ddl_deploy.signals
     view pgl_ddl_deploy.event_trigger_schema
    (77 rows)
Data dictionary
    select * from pgl_ddl_deploy.commands;
    select * from pgl_ddl_deploy.events;
    select * from pgl_ddl_deploy.exceptions;
    select * from pgl_ddl_deploy.killed_blockers;
    select * from pgl_ddl_deploy.queue;
    select * from pgl_ddl_deploy.set_configs;
    select * from pgl_ddl_deploy.subscriber_logs;
    select * from pgl_ddl_deploy.unhandled;
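Before querying these tables it can help to see which columns each one has. A minimal sketch using the standard information_schema catalog, which makes no assumption about the extension beyond its schema name:

```sql
-- List every column of the extension's tables and views, in definition order.
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'pgl_ddl_deploy'
ORDER BY table_name, ordinal_position;
```

In psql, \d pgl_ddl_deploy.set_configs gives the same information for a single table, along with its defaults and constraints.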
Example: native (built-in) logical replication
For the related environment setup, see: https://www.dbaup.com/pgluojifuzhichajianzhipglogicalshiyongshuoming.html
Environment setup
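Both nodes need the following setup (on the subscriber the INSERT can be skipped: the initial table synchronization copies the rows from the publisher, and existing rows with the same primary keys would conflict):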
    create database lhrdb3;
    \c lhrdb3
    create table t1(id int primary key, name text, reg_time timestamp);
    insert into t1 select generate_series(1,10000),'xxt',now();
    select count(*) from t1;

    create extension pgl_ddl_deploy;
Configure the publication and subscription
    -- Publisher
    CREATE PUBLICATION pub1 FOR ALL TABLES;
    select * from pg_publication;

    -- Subscriber
    create subscription sub1 connection
    'host=172.72.6.30 port=5432 dbname=lhrdb3 user=postgres password=lhr' publication pub1;
    select * from pg_subscription;

    -- 2. On the subscriber, create the replicated table and run the refresh command manually
    create table t1(id int primary key, name text, reg_time timestamp);
    ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION ;

    -- On the publisher, query the replication slots
    select * from pg_replication_slots;
Session output:
    lhrdb3=# CREATE PUBLICATION pub1 FOR all TABLES;
    CREATE PUBLICATION
    lhrdb3=# select * from pg_publication;
      oid  | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
    -------+---------+----------+--------------+-----------+-----------+-----------+-------------+------------
     17202 | pub1    |       10 | t            | t         | t         | t         | t           | f
    (1 row)

    lhrdb3=# create subscription sub1 connection
    lhrdb3-# 'host=172.72.6.30 port=5432 dbname=lhrdb3 user=postgres password=lhr' publication pub1;
    NOTICE:  created replication slot "sub1" on publisher
    CREATE SUBSCRIPTION
    lhrdb3=# select * from pg_subscription;
      oid  | subdbid | subname | subowner | subenabled |                             subconninfo                             | subslotname | subsynccommit | subpublications
    -------+---------+---------+----------+------------+---------------------------------------------------------------------+-------------+---------------+-----------------
     17308 |   16792 | sub1    |       10 | t          | host=172.72.6.30 port=5432 dbname=lhrdb3 user=postgres password=lhr | sub1        | off           | {pub1}
    (1 row)

    lhrdb3=# select count(*) from t1;
     count
    -------
     10000
    (1 row)

    lhrdb3=# select * from pg_replication_slots;
     slot_name |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
    -----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
     sub1      | pgoutput | logical   |  16939 | lhrdb3   | f         | t      |      19678 |      |          537 | 0/25CD268   | 0/25CD2A0           | reserved   |
The data has been synchronized.
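Beyond comparing row counts, the replication state can be confirmed from PostgreSQL's standard statistics views. This is a minimal sketch using only built-in catalogs; the comments say which node each query should run on.

```sql
-- On the publisher: one row per connected WAL sender, including the one
-- serving the subscription.
SELECT application_name, state, sent_lsn, replay_lsn
FROM pg_stat_replication;

-- On the subscriber: per-table synchronization state
-- ('r' means ready, that is, the initial copy has finished).
SELECT srrelid::regclass AS table_name, srsubstate
FROM pg_subscription_rel;

-- On the subscriber: progress of the apply worker.
SELECT subname, received_lsn, latest_end_lsn
FROM pg_stat_subscription;
```

A table that stays in a state other than 'r' for a long time usually points to a failed initial copy, which is reported in the subscriber's server log.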