1. Pulsar 事务介绍
在 pulsar 中可以通过 produce 异常重试、consume 异常不 ack 的方式保证数据的不丢失,即 atleast-once 语义;
Pulsar 提供了一个幂等(idempotent)producer 的特性,可以从 broker 侧对数据进行去重,但是幂等 producer 会有限制,只能保证单个 porducer 链接在单分区上的去重:
- 不能保证多个消息的原子性
- 不支持分区topic的去重
Pulsar 的事务增强了 Pulsar 的消息投递语义,可以支持多个 topic的原子写入和ack,pulsar事务允许:
- 生产者可以向多个topic生产一批数据,这批数据要么全部对consume可见,要么全部不可见(pulsar 只有一个默认的隔离级别,read-commit)
- 端到端的 exactly-once 语义,即consumer-process-produce操作的exactly-once
1.1 Pulsar 事务语义
- 事务中的所有操作是一个原子单元
- 所有的消息都会提交,或者都不提交
- 每个消息都会被处理一次,不会有丢失和重复
- 如果事务 abort,那么事务中的写入和 acks 都会回滚(事务操作设计两个部分,一个是生产,一个是消费)
- 一组消息可以从多个分区中接受、处理、并且写入到多个分区
- consumer 只能读取 commit 的消息,如果事务正在进行中或者已经丢弃,那么该事务产生的消息不会对consumer可见
- 可以跨多个分区写入数据
- 订阅的 acks 是原子性的
1.2 事务和流处理
pulsar 中的 stream processing 是指一个 consume-process-produce
的过程:
- 消费:从一个或者多个 topic 消费的算子,包含 pulsar 的消费者
- 处理:消息处理
- 生产:将结果消息写入到一个或者多个 pulsar topic 的中
2. Pulsar 事务原理
2.1 关键概念
-
Transaction coordinator:TC,运行在 broker 上的一个 Module
- 维护事务的整个生命周期保证事务状态的正确性
- 处理事务超时,保证事务超时之后会被丢弃
-
Transaction Log:所有的事务元数据都会持久化在 transaction log中(实际是一个 pulsar 的 topic),TC 崩溃时,可以从 transaction log 中恢复,transaction log 中记录的是事务的状态额不是实际的消息(message 还是保存在 topic partition中)
-
Transaction Buffer:TB,在一个事务中写入一个分区的多条消息保存在 TB 中(即每个分区都有一个 TB),在 TB 中的消息对 consumer 不可见(直到事务 commit 才可见,事务 abort 时会被丢弃);TB 中保存了所有正在进行和已经丢去的事务。所有的消息都写入到了实际的 pulsar topic 分区,当事务提交之后, TB 中的消息会物化 (对 consumer 可见),当事务 abort 时,TB 中的消息会被删除。
-
Transaction ID:事务ID (TnxID),定义了一个唯一的事务,TransactionID 是一个 128 位,高 16 位保存 TC 信息,剩余的部分用来保存一个 TC 内部单调递增的数字。
-
Penging acknowledge state : pending ack stats 维护了事务提交之前的所有的 message ack 信息,如果一个消息处于 pending ack stats 中,那么其他的事务不能 ack 这个消息(直到 这个消息从 pending ack stats中移除),pending ack state 信息会被持久化到 ack log中(实际上是 cursor ledger)。
2.2 事务 data flow
一个使用了事务的数据流可以被拆分为一下几个部分:
- 开始一个事务
- 在事务中生产消息
- 在事务中 ack 消息
- 结束事务
2.2.1 开始事务
- client 首先查找 TC
- TC 分配一个 TxnID, 并且在 transaction log 中记录对应的 TnxID 和 状态(此时是 Open),保存状态可以在 TC 崩溃时恢复
- transaction log 把持久化的结果返回给 TC
- TC 把 txnId 返回给 client
2.2.2 使用事务生产消息
这个阶段,client 已经进入了 transaction 中,开始重复执行 consume-processs-produce
操作。这个过程中间可能会包含多条消息写入和多个ack请求。
- 在 client 生产消息之前,会把发送一个增加事务 partition 的请求到 TC
- TC 会把分区变更持久化道 transaction log 中,这个信息表明了 当前事务处理了哪些partition
- transaction log 返回持久化的结果给 TC
- TC 发送增加分区的相应给 client
- client 开始生产消息,这个流程和一般的消息生产一样,唯一的不同是消息中携带了 txnId 信息
- broker 把消息写到一个 partition
2.2.3 使用事务 ack 消息
这个阶段中,会保存事务的订阅信息。
- client 向 TC 发送请求增加事务的订阅信息
- TC 把这个信息写到 transaction log
- transaction log 返回持久化结果给 TC
- tc 返回结果给 client
- client 开始 ack 消息,流程和正常的 ack 一样,唯一的不同是 ack 信息中包含了 txnId信息
- broker 接收 ack 请求,会在 pending ack data log (cursor)中记录 ack信息
2.2.4 事务结束
最后事务会被提交或者丢弃。
2.2.4.1 结束事务request
client 处理完一个事务之后,会发起一个 结束事务的请求
- client 发送结束事务的请求给 TC,请求中包含了提交或者丢弃的标识
- TC 向 transaction log 中写入 COMMITTING 或者 ABORTING 信息
- transaction log 返回持久化结果
2.2.4.2 事务 finalize
TC 开始处理提交或者丢弃请求,这个过程会涉及到事务中所有消息(所有分区)
- TC 同时在订阅和分区上提交事务
- 对于分区来讲,是在实际的消息中写入一个事务的marker消息,表示事务的提交或者丢弃;对于 pending ack log 来讲,会在 pending ack data log中写入一个事务(提交或者丢弃)的信息
- 分区和pending ack 返回结果给 broker,cursor 移动到下一个位置
2.2.4.3
- 当事务中所有的生产消息和ack信息都已经被 commit 或者 abort,TC 会将最终状态写入到 transaction log,transaction log 中这个事务相关的信息也可以被安全的删除。
- transaction log 返回结果给 TC
- TC 返回结果给 client