1. Pulsar 事务介绍

在 pulsar 中可以通过 produce 异常重试、consume 异常不 ack 的方式保证数据的不丢失,即 atleast-once 语义;

Pulsar 提供了一个幂等(idempotent)producer 的特性,可以从 broker 侧对数据进行去重,但是幂等 producer 会有限制,只能保证单个 porducer 链接在单分区上的去重:

  • 不能保证多个消息的原子性
  • 不支持分区topic的去重

Pulsar 的事务增强了 Pulsar 的消息投递语义,可以支持多个 topic的原子写入和ack,pulsar事务允许:

  • 生产者可以向多个topic生产一批数据,这批数据要么全部对consume可见,要么全部不可见(pulsar 只有一个默认的隔离级别,read-commit
  • 端到端的 exactly-once 语义,即consumer-process-produce操作的exactly-once

1.1 Pulsar 事务语义

  • 事务中的所有操作是一个原子单元
    • 所有的消息都会提交,或者都不提交
    • 每个消息都会被处理一次,不会有丢失和重复
    • 如果事务 abort,那么事务中的写入和 acks 都会回滚(事务操作设计两个部分,一个是生产,一个是消费)
  • 一组消息可以从多个分区中接受、处理、并且写入到多个分区
    • consumer 只能读取 commit 的消息,如果事务正在进行中或者已经丢弃,那么该事务产生的消息不会对consumer可见
    • 可以跨多个分区写入数据
    • 订阅的 acks 是原子性的

1.2 事务和流处理

image.png

pulsar 中的 stream processing 是指一个 consume-process-produce 的过程:

  • 消费:从一个或者多个 topic 消费的算子,包含 pulsar 的消费者
  • 处理:消息处理
  • 生产:将结果消息写入到一个或者多个 pulsar topic 的中

2. Pulsar 事务原理

2.1 关键概念

  • Transaction coordinator:TC,运行在 broker 上的一个 Module

    • 维护事务的整个生命周期保证事务状态的正确性
    • 处理事务超时,保证事务超时之后会被丢弃
  • Transaction Log:所有的事务元数据都会持久化在 transaction log中(实际是一个 pulsar 的 topic),TC 崩溃时,可以从 transaction log 中恢复,transaction log 中记录的是事务的状态额不是实际的消息(message 还是保存在 topic partition中)

  • Transaction Buffer:TB,在一个事务中写入一个分区的多条消息保存在 TB 中(即每个分区都有一个 TB),在 TB 中的消息对 consumer 不可见(直到事务 commit 才可见,事务 abort 时会被丢弃);TB 中保存了所有正在进行和已经丢去的事务。所有的消息都写入到了实际的 pulsar topic 分区,当事务提交之后, TB 中的消息会物化 (对 consumer 可见),当事务 abort 时,TB 中的消息会被删除。

  • Transaction ID:事务ID (TnxID),定义了一个唯一的事务,TransactionID 是一个 128 位,高 16 位保存 TC 信息,剩余的部分用来保存一个 TC 内部单调递增的数字。

  • Penging acknowledge state : pending ack stats 维护了事务提交之前的所有的 message ack 信息,如果一个消息处于 pending ack stats 中,那么其他的事务不能 ack 这个消息(直到 这个消息从 pending ack stats中移除),pending ack state 信息会被持久化到 ack log中(实际上是 cursor ledger)。

2.2 事务 data flow

一个使用了事务的数据流可以被拆分为一下几个部分:

  • 开始一个事务
  • 在事务中生产消息
  • 在事务中 ack 消息
  • 结束事务

2.2.1 开始事务

image.png

  • client 首先查找 TC
  • TC 分配一个 TxnID, 并且在 transaction log 中记录对应的 TnxID 和 状态(此时是 Open),保存状态可以在 TC 崩溃时恢复
  • transaction log 把持久化的结果返回给 TC
  • TC 把 txnId 返回给 client

2.2.2 使用事务生产消息

image.png

这个阶段,client 已经进入了 transaction 中,开始重复执行 consume-processs-produce 操作。这个过程中间可能会包含多条消息写入和多个ack请求。

  • 在 client 生产消息之前,会把发送一个增加事务 partition 的请求到 TC
  • TC 会把分区变更持久化道 transaction log 中,这个信息表明了 当前事务处理了哪些partition
  • transaction log 返回持久化的结果给 TC
  • TC 发送增加分区的相应给 client
  • client 开始生产消息,这个流程和一般的消息生产一样,唯一的不同是消息中携带了 txnId 信息
  • broker 把消息写到一个 partition

2.2.3 使用事务 ack 消息

image.png

这个阶段中,会保存事务的订阅信息。

  • client 向 TC 发送请求增加事务的订阅信息
  • TC 把这个信息写到 transaction log
  • transaction log 返回持久化结果给 TC
  • tc 返回结果给 client
  • client 开始 ack 消息,流程和正常的 ack 一样,唯一的不同是 ack 信息中包含了 txnId信息
  • broker 接收 ack 请求,会在 pending ack data log (cursor)中记录 ack信息

2.2.4 事务结束

最后事务会被提交或者丢弃。

2.2.4.1 结束事务request

image.png

client 处理完一个事务之后,会发起一个 结束事务的请求

  • client 发送结束事务的请求给 TC,请求中包含了提交或者丢弃的标识
  • TC 向 transaction log 中写入 COMMITTING 或者 ABORTING 信息
  • transaction log 返回持久化结果
2.2.4.2 事务 finalize

image.png

TC 开始处理提交或者丢弃请求,这个过程会涉及到事务中所有消息(所有分区)

  • TC 同时在订阅和分区上提交事务
  • 对于分区来讲,是在实际的消息中写入一个事务的marker消息,表示事务的提交或者丢弃;对于 pending ack log 来讲,会在 pending ack data log中写入一个事务(提交或者丢弃)的信息
  • 分区和pending ack 返回结果给 broker,cursor 移动到下一个位置
2.2.4.3

image.png

  • 当事务中所有的生产消息和ack信息都已经被 commit 或者 abort,TC 会将最终状态写入到 transaction log,transaction log 中这个事务相关的信息也可以被安全的删除。
  • transaction log 返回结果给 TC
  • TC 返回结果给 client