使用pgsync同步postgres与elasticsearch
pgsync
pgsync是一个中间件,用来将postgres中的数据同步至elasticsearch。
只需要定义一下schema,剩下的事都只可以交给它了。
使用需求
需要userepl权限的数据库账户
为了开启监听,pgsync需要创建replication slots,此操作需要userepl权限。
可使用sql语查询具有权限的用户:
1 | SELECT usename FROM pg_user WHERE userepl = true; |
logical decoding
为了监听到所有修改,需要将postgres.conf中的wal_level从默认的replication改为logical。
可使用sql语句查询当前设置:
1 | SHOW wal_level; |
修改wal_level可以直接修改postgres.conf
,也可使用ALTER SYSTEM。
注意:修改wal_level需要重启postgres才会生效。我们可以通过sql语句查看wal_level的定义。
1 | SELECT * FROM pg_settings WHERE name ='wal_level'; |
postgres.conf
postgres.conf的文件位置可使用sql语句查询:
1 | SHOW config_file; |
打开文件直接修改即可。
1 | wal_level = logical # minimal, replica, or logical |
ALTER SYSTEM
使用ALTER SYSTEM语句来修改。
1 | ALTER SYSTEM SET wal_level = logical; |
ALTER SYSTEM会将设置值存入postgres.auto.conf
,在启动时覆盖postgres.conf
中的设置。
max_replication_slots
要监听数据变化至少需要一个replication slot,不能为0。可使用sql语句查询当前最大个数:
1 | SHOW max_replication_slots; |
本地安装的默认值是10,阿里云上的RDS是64。
阿里云RDS
如使用阿里云RDS,可参考最佳实践相关文档:开发运维建议和逻辑订阅。
schema
schema是核心,在常规使用时,只需要修改schema文件就可以完成所有操作。
可参考官方文档中给出的各种例子。
运行pgsync
pgsync的使用分为两步,第一步创建triggers和logical replication slot,第二步启动同步守护。
bootstrap
1 | bootstrap --config /path/to/schema.json |
sync
1 | pgsync --config /optional/path/to/schema.json --daemon |
如果不想使用命令行,或者是在windows下使用,也可使用纯python,参见bootstrap和pgsync命令的实现。
环境变量
在使用python时要注意环境变量要否成功设置,可考虑使用python-dotenv在一开始就载入。
注意事项
打开逻辑订阅后会极大的增加硬盘使用量,一定要监控硬盘使用量,避免硬盘爆掉。
max_wal_size只是一个软限制,在高负载的情况下会超出这个值。
节点掉线会导致wal日志累积,slot会一直保留节点所需的日志直到节点再次上线。
在生产环境中使用replication slot,有以下建议:
- 增加xlog日志个数的监控,当xlog数量超过正常值时报警
- 做好对每个复制槽同步状态的监控,出现某个槽同步状态异常要及时处理,同步异常会造成lsn不向前推进
- 对于业务很空闲但是数据需要同步的库,可以自定义脚本,定期更新无用表,手工推进lsn
- 如果xlog已经堆积很多磁盘马上要爆炸的情况下,在考虑应急删掉复制槽之前要评估剩余空间是否还有足够富余,因为即使删掉复制槽,xlog也不是马上就会清理,删掉后主库vacuum也会产生较多xlog日志,一定要做好评估
- 增加pg_replication_slot()视图中restart_lsn的监控,对于落后较大和长期不推进的lsn进行告警
- 避免长事务,一般超过1天的事务理论上都可以取消掉的