GEO 简介
GEO replication 提供了数据在多个集群之间进行复制的能力。
上图描述了三个集群,并且集群之间配置了不同的 GEO replication 策略,其中
- Cluster-A 和 Cluster-B 是双向复制,两个集群中的 topic 数据都会复制到对端集群,即集群 A 的数据会被复制到集群 B,集群 B 的数据也会被复制到集群 A,A、B 两个集群都有对方的全部数据;
- Cluster-A 和 Cluster-C 是单向复制:A 集群的数据会被复制到 C 集群,C 集群的数据不会被复制到 A 集群;
- Cluster-B 和 Cluster-C 没有复制关系:集群 B 和 C 之间不会产生任何数据复制
上述描述数据同步/复制的一个典型的场景,GEO replication 中的另外一个场景就是订阅状态同步。
订阅状态同步的场景
订阅同步的一个典型的应用常见是集群容灾,正常情况下只有主集群提供写入和消费服务,主集群故障之后,生产和消费会切换到备集群。
生产的切换是无缝的,切换集群之后可以继续写入;消费比生产会复杂一些,如果只同步数据,在集群切换之后,备集群的订阅会重复消费历史数据,为了解决这个问题,就需要在两个集群之间同步订阅的状态,目前订阅同步的主要信息就是订阅的 MarkDeletePosition(MDP) 信息。
如上图:在主、备两个集群之间,每个 topic(分区)的 ledger 并不是一一对应的,比如在主集群中,订阅 sub-00 消费到了一条消息,这个消息所在的 ledger 是 ledger-x;经过复制之后,在备集群中这条消息对应的 ledger 是 ledger-y,这里 ledger-x 和 ledger-y 没有直接关系,所以订阅状态(MDP)不能简单的直接映射。
GEO 订阅状态同步原理
订阅状态的同步,大体上可以分为两个主要的步骤:
- 第一步是实现两个集群之间 MessageId(可以理解为 offset 信息)的映射,即在主集群的一条消息的 MessageId 复制到备集群之后的 MessageId;
- 第二步是在主集群中一个订阅 ack 数据时,如果有 (MDP) 的变动,根据第一步中的主、备集群 MessageId 的映射关系,将主集群的 MDP 信息映射到备集群订阅的 MDP 中。
下面我们来详细看下整个流程。
MessageId 映射
MessageId 映射最直观的方法,就是维护主、备集群中每个 Message 的映射关系,但是这种方案的需要维护的映射关系太多,代价太大。
Pulsar 采用的方式是一个定时任务的方式,每隔一段时间同步一次主、备集群 LAC 信息之间的关系。假设集群 A 向集群 B 复制数据和订阅状态信息。
首先,集群 A 会定时生产一个 SnapshotRequest 信息,写入到本地 topic(分区)中,这个信息会随着数据复制写入到集群 B 的 topic 中。
B 集群会处理 SnapshotRequest 信息,然后将本地 topic(分区)的 LAC(LAC-B) 信息封装在 SnapshotRespnse 中,写入到本地 topic 中,通过 GEO replciation 复制到 A 集群。
集群 A 在处理 SnapshotRespnse 时,记录 SnapshotRespnse 在本地的 MessageId(LocalMessageId) 和 LAC-B 的映射关系,由于 A -> SnapshotRequest -> B -> SnapshotRespnse -> A 的操作顺序,可以保证集群 A 订阅的 MDP 大于 LocalMessageId 时,LAC-B 对应的数据一定是被消费过的,通过这种方式实现了两个集群之间 MessageId 的映射关系。
订阅信息同步
集群 A 中的订阅会不断消费、ack,当 ack 触发了 MDP 的移动时,集群 A 会检查 LocalMessageId 是否小于 MDP,如果发现小于,说明需要更新集群 B 订阅的 MDP 信息,此时集群 A 会根据映射关系,找到 LAC-B 信息,然后构造一个 ReplicatedSubscriptionsUpdate 消息,写入到本地 topic,这个 ReplicatedSubscriptionsUpdate 消息会通过 GEO 复制到集群 B。
集群 B 接收到 ReplicatedSubscriptionsUpdate 消息之后,会解析出 LAC 和订阅信息,然后更新订阅的 MDP。
至此,就完成了订阅状态的一次复制流程。
总结与思考
Pulsar 的订阅状态复制,依赖于原生的 GEO replication 机制,并且需要主备集群之间双向的交互,所以对于单向复制的 GEO 集群,订阅状态是不能实现订阅状态同步的;
另外,当前的订阅状态同步,只考虑了 MDP 信息,实际上对于一个订阅(尤其是 Shared 和 Key-Shared 类型的订阅),订阅的 individuallyDeletedMessages 信息也是很重要的,尤其是在有大量 consumer 都使用 Individual ack 的场景,如果不同步 individuallyDeletedMessages 信息,就会导致数据的重复。
由于 IndiviindividuallyDeletedMessages 记录的是每个 message 的 ack 情况,所以要解决这个问题就需要
- 记录主、备集群每个 MessageId 的映射关系,比如在复制消息属性中记录原始消息的 MessageId 信息
- 单独复制 IndiviindividuallyDeletedMessages 到 备集群
具备了 IndiviindividuallyDeletedMessages 和 MessageId 映射关系之后,就可以实现每个 MessageId 级别的订阅状态同步。