使用pgsync同步postgres与elasticsearch

pgsync

pgsync是一个中间件,用来将postgres中的数据同步至elasticsearch。

只需要定义一下schema,剩下的事都只可以交给它了。

使用需求

需要userepl权限的数据库账户

为了开启监听,pgsync需要创建replication slots,此操作需要userepl权限。

可使用sql语查询具有权限的用户:

1
SELECT usename FROM pg_user WHERE userepl = true;

logical decoding

为了监听到所有修改,需要将postgres.conf中的wal_level从默认的replication改为logical。

可使用sql语句查询当前设置:

1
SHOW wal_level;

修改wal_level可以直接修改postgres.conf,也可使用ALTER SYSTEM

注意:修改wal_level需要重启postgres才会生效。我们可以通过sql语句查看wal_level的定义。

1
SELECT * FROM pg_settings WHERE name ='wal_level';

postgres.conf
postgres.conf的文件位置可使用sql语句查询:

1
2
SHOW config_file;
--- /var/lib/postgresql/data/postgresql.conf

打开文件直接修改即可。

1
wal_level = logical                    # minimal, replica, or logical

ALTER SYSTEM
使用ALTER SYSTEM语句来修改。

1
ALTER SYSTEM SET wal_level = logical;

ALTER SYSTEM会将设置值存入postgres.auto.conf,在启动时覆盖postgres.conf中的设置。

max_replication_slots

要监听数据变化至少需要一个replication slot,不能为0。可使用sql语句查询当前最大个数:

1
SHOW max_replication_slots;

本地安装的默认值是10,阿里云上的RDS是64。

阿里云RDS

如使用阿里云RDS,可参考最佳实践相关文档:开发运维建议逻辑订阅

schema

schema是核心,在常规使用时,只需要修改schema文件就可以完成所有操作。

可参考官方文档中给出的各种例子

运行pgsync

pgsync的使用分为两步,第一步创建triggers和logical replication slot,第二步启动同步守护。

bootstrap

1
bootstrap --config /path/to/schema.json

sync

1
pgsync --config /optional/path/to/schema.json --daemon

如果不想使用命令行,或者是在windows下使用,也可使用纯python,参见bootstrappgsync命令的实现。

环境变量

在使用python时要注意环境变量要否成功设置,可考虑使用python-dotenv在一开始就载入。

注意事项

打开逻辑订阅后会极大的增加硬盘使用量,一定要监控硬盘使用量,避免硬盘爆掉。

max_wal_size只是一个软限制,在高负载的情况下会超出这个值。

节点掉线会导致wal日志累积,slot会一直保留节点所需的日志直到节点再次上线。

在生产环境中使用replication slot,有以下建议:

  1. 增加xlog日志个数的监控,当xlog数量超过正常值时报警
  2. 做好对每个复制槽同步状态的监控,出现某个槽同步状态异常要及时处理,同步异常会造成lsn不向前推进
  3. 对于业务很空闲但是数据需要同步的库,可以自定义脚本,定期更新无用表,手工推进lsn
  4. 如果xlog已经堆积很多磁盘马上要爆炸的情况下,在考虑应急删掉复制槽之前要评估剩余空间是否还有足够富余,因为即使删掉复制槽,xlog也不是马上就会清理,删掉后主库vacuum也会产生较多xlog日志,一定要做好评估
  5. 增加pg_replication_slot()视图中restart_lsn的监控,对于落后较大和长期不推进的lsn进行告警
  6. 避免长事务,一般超过1天的事务理论上都可以取消掉的