Pulsar 事务实现

1.1 事务准备

1.1.1 初始化事务元数据

通过事务元数据初始化命令来执行元数据初始化操作:

bin/pulsar initialize-transaction-coordinator-metadata
					-cs 127.0.0.1:2181  # configuration store address
					-c standalone       # cluster name
					--initial-num-transaction-coordinators # tc 数量,即 tc topic 的分区数

这个过程会在 configuration store 中创建 TC topic 对应的 tenant(pulsar),namespace(pulsar/system)和分区 topic(pulsar/system/transaction_coordinator_assign,默认是16个分区)。

1.1.2 修改事务相关配置

开启事务需要在 broker.conf 中修改以下配置:

//mandatory configuration, used to enable transaction coordinator
transactionCoordinatorEnabled=true

//mandtory configuration, used to create systemTopic used for transaction buffer snapshot
systemTopicEnabled=true

//if you want to acknowledge batch messages in transactions,
acknowledgmentAtBatchIndexLevelEnabled=true

在初始化元数据并且修改了事务相关配置只有,Broker 启动时,会创建事务相关的一些对象

1.1.3 启动 Broker

在元数据创建完毕并且配置修改之后,启动 Broker,Broker 会创建事务相关的一些对象,

transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(getClient());
transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
transactionBufferClient = TransactionBufferClientImpl.create(this, transactionTimer,
                        config.getTransactionBufferClientMaxConcurrentRequests(),
                        config.getTransactionBufferClientOperationTimeoutInMills());
transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
                        .newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
                        transactionBufferClient, transactionTimer);
transactionBufferProvider = TransactionBufferProvider
                        .newProvider(config.getTransactionBufferProviderClassName());
transactionPendingAckStoreProvider = TransactionPendingAckStoreProvider
                        .newProvider(config.getTransactionPendingAckStoreProviderClassName());
3.1.3.1 SystemTopicBaseTxnBufferSnapshotService

SystemTopicBaseTxnBufferSnapshotServiceTransactionBufferSnapshotService 的实现类,对应的 system topic 是 tenant/namespace/__transaction_buffer_snapshot,主要用来创建 TransactionBufferSnapshot 所用的 writer(producer) 和 reader,并且使用 writer 和 reader 写入和读取 TransactionBufferSnapshot 信息,TransactionBufferSnapshot 的内容包括:topicName, 已提交事务的最大的 msgId, 以及丢弃的事务列表。

    private String topicName;
    private long maxReadPositionLedgerId;
    private long maxReadPositionEntryId;
    private List<AbortTxnMetadata> aborts;
1.1.3.2 transactionTimer

broker侧的事务处理需要的timer

1.1.3.3 TransactionBufferClientImpl

TC 使用 TransactionBufferClientImpl 来 commit 或者 abort 事务。TransactionBufferClientImpl 内部是通过 TransactionBufferHandlerImpl 来完成事务的提交或者终止操作。

1.1.3.4 TransactionMetadataStoreService

可以理解为 TC,其中包含了 MLTransactionLogImpl 用来保存事务状态信息。

1.1.3.5 TopicTransactionBufferProvider 和 MLPendingAckStoreProvider

1.2 事务使用

一个 consume-process-produce 过程的事务可以如下所示:

image.png

1.2.1 事务 Client 的创建

使用事务,需要在 PulsarClient 初始化时 enable,

pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .statsInterval(0, TimeUnit.SECONDS)
                .enableTransaction(true)
                .build();

开了事务的 PulsarClient 在初始化时,会在 client 侧初始化一个 TransactionCoordinatorClientImpl 并启动, TransactionCoordinatorClientImpl是 client 用来管理事务的类,可以创建新的事务、为事务增加分区、为事务增加订阅、commit 事务以及 abort 事务等。

TransactionCoordinatorClientImpl 的启动过程如下:

  • 通过 lookupService 查找然后查找 TC topic 对应的分区信息(persistent://pulsar/system/transaction_coordinator_assign)
  • 对于每个 TC topic 分区,对应创建一个 TransactionMetaStoreHandler, TransactionMetaStoreHandler 可以向 TC 建立链接,发送事务相关请求并处理 TC 的响应,在 TransactionMetaStoreHandler 的初始化过程中,会建立到 TC topic 分区所在 broker 的链接
  • 链接创建之后,回调 TransactionMetaStoreHandler.connectionOpened ,这里会发送 TcClientConnectRequest 给 TC

Broker 接收到 TC 之后,会做一些事务的相关准备工作(TransactionMetadataStoreService.handleTcClientConnectRequest),(如果是TC的第一次client链接)之后会为这个 TCID 创建一个 TransactionMetadataStore, 这里的 store 实际上是 transactionLog 的 store,对应的 TransactionLogName 是 persistent://pulsar/system/__transaction_log_$TCID, TxnMetaStore 有两种类型包括 InMemTransactionMetadataStoreProviderMLTransactionMetadataStore, 其中保存的是 TransactionMetadataEntry 的数据,TransactionMetadataEntry 定义如下:

message TransactionMetadataEntry {
  optional TransactionMetadataOp metadata_op   = 1;
  optional uint64 txnid_least_bits    = 2 [default = 0];
  optional uint64 txnid_most_bits     = 3 [default = 0];
  optional TxnStatus expected_status  = 4;
  optional TxnStatus new_status       = 5;
  repeated string partitions          = 6;
  repeated pulsar.proto.Subscription subscriptions = 7;
  optional uint64 timeout_ms      = 8;
  optional uint64 start_time      = 9;
  optional uint64 last_modification_time = 10;
  optional uint64 max_local_txn_id = 11;
}

transactionMeta操作有四种类型:

  • NEW
  • ADD_PARTITION
  • ADD_SUBSCRIPTION
  • UPDATE

然后会创建 MLTransactionMetadataStore ,其中包含了 MLTransactionLogImpl(类似于一个topic,内部封装了 ManagedLedger 和 cursor), MLTransactionLogImpl保存了事务操作的元数据,在 MLTransactionMetadataStore 初始化是会使用 MLTransactionLogImpl replay 所有内容。

Client 创建完毕之后,就可以开始使用事务了。

1.2.2 创建一个新的事务

Client 创建事务可以指定一个事务的超时时间(默认时间是一分钟)

Transaction transaction = pulsarClient.newTransaction()
                    .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

创建的过程实际上就是调用 TransactionCoordinatorClientImpl.newTransactionAsync 来完成的,前文我们知道每个 TC topic 分区都对应一个 TransactionCoordinatorClientImpl , 那么在 new 一个 transaction时,具体使用的是哪个 TransactionCoordinatorClientImpl 呢?这里的选择逻辑很简单,会通过遍历的方式从TransactionCoordinatorClientImpl 数组中选取,即如果本次选择的是第 (n % tc 分区数)个 TransactionCoordinatorClientImpl,那么在通过 client 创建一个新的事务时,就会选取第 (n +1 % tc 分区数)个 TransactionCoordinatorClientImpl

然后构造一个 newTxn 的请求发送给 TC,我们看下 TC (Broker) 是如何处理 newTxn 请求的。

Broker 会创建一个TxnID 返回给 client,调用层级是 TransactionMetadataStoreService.newTransaction -> MLTransactionMetadataStore.newTransaction ,会创建一个新的 TransactionMetadataEntry,并且写入到 transactionLog 中。

// TxnidMostBits 是 tcId
long mostSigBits = tcID.getId(); 
// TxnIdLeastBits 是 sequenceIdGenerator 生产的一个递增值,从 0 开始
long leastSigBits = sequenceIdGenerator.generateSequenceId(); 
// txnID 包含 TxnidMostBits 和 TxnIdLeastBits 两部分内容,txnID 会返回给客户端
TxnID txnID = new TxnID(mostSigBits, leastSigBits);
long currentTimeMillis = System.currentTimeMillis();
// 构造 TransactionMetadataEntry
TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
                    .setTxnidMostBits(mostSigBits)
                    .setTxnidLeastBits(leastSigBits)
                    .setStartTime(currentTimeMillis)
                    .setTimeoutMs(timeOut)
                    .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW) // NEW 表示一个新的事务
                    .setLastModificationTime(currentTimeMillis)
                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); //MaxLocalTxnId 是 sequenceId
// 写入 Transaction log,持久化事务元数据信息
transactionLog.append(transactionMetadataEntry)
                    .whenComplete((position, throwable) -> {
                        if (throwable != null) {
                            completableFuture.completeExceptionally(throwable);
                        } else {
                            appendLogCount.increment();
                          	// 构造 TxnMeta,并缓存在 txnMetaMap 中
                            // txnMetaMap 记录了每个事务和 TnxMeta 以及该事务在 TransactionLog messages 的映射关系
                            // tnxMeta 中保存了 txnID、当前时间和事务超时时间
                            TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
                            List<Position> positions = new ArrayList<>();
                            positions.add(position);
                            Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
                            txnMetaMap.put(leastSigBits, pair);
                          	// 在 timeoutTracker 中保存事务的超时时间,然后在达到超时时间时触发事务的结束操作
                            this.timeoutTracker.addTransaction(leastSigBits, timeOut);
                            createdTransactionCount.increment();
                          	// 返回 tnxID 给客户端
                            completableFuture.complete(txnID);
                        }
                    });

Broker 记录了事务初始化信息之后,给Client 返回一个 TxnID 的结果。Client 接收到 TxnID 之后,会对其做一次封装成为 TransactionImpl,表示客户端事务,其出示状态为 OPEN

1.2.3 消费事务

消费者在初次订阅时,如果开启了事务,那么在初始化 subscription 时,会为每个订阅初始化一个 PendingAckHandle, 默认是 PendingAckHandleImpl 实现,即 每个订阅都有一个 PendingAckHandleImpl , 是1:1的关系。

PendingAckHandleImpl 是用来处理事务型 ack 消息的入口类(初始状态为 None),PendingAckHandleImpl 内部会创建一个 PendingAckStore , PendingAckStore 实际上是一个 ManagedLedger + cursor + 原始订阅Cursor 的封装。

PendingAckHandleImpl 主要是处理事务类型的 ack,不让 ack 直接体作用到原始订阅的 cursor上,而是在事务 commit 时才会影响原始 cursor。 这里有两个比较重要的数据结构,用来处理 cumulative 和 individual 类型的 ack 信息。

  • individualAckOfTransaction : <TxnID, HashMap<PositionImpl, PositionImpl>>, 记录了事务和 individual 的对应关系,value 中的两个 PositionImpl 都是 ack 的位置,对于非 Batch 消息(ack信息中没有 ackset),这两个 position 没有区别,都是 ack 的 MessageId 对应的 position 位置;对于 Batch 消息(ack 信息中包含了 ackset),那么 <PositionImpl, PositionImpl> 中的第一个 positoin 表示 位置(<LedgerId, EntryId>), 第二个 positionImpl 则包含了 ackset 信息,会将本次 ack 的信息和已经 ack 的信息合并

  • individualAckPositions: <PositionImpl, MutablePair<PositionImpl, Integer>>,key 是 position,表示一个<LedgerId, EntryId> 坐标,value 部分是一个 pair, left 是 position, right 是 batchSize,其中 left 的 position 中主要用于batch message ack 冲突的判断,会判断 ack 和之前的 ack 的 ackset 有没有重叠; 对于非 batch 消息,直接判断 individualAckPositions 中是否包含相同 position 的 ack 信息。

  • cumulativeAckOfTransaction: <TxnID, PositionImpl>, 保存了 Txn 和 ack 位置的映射关系(cumulative 只有ack位置需要保存)

首先会检查 pendingAckStore 是否已经初始化过:

  • 拼接 pendingAckStore 对应的 ManagedLedger name : tenant/namespace/topic-subname__transaction_pending_ack

  • 查看 metastore 中是否存在这个名称对应的 managedledger

    • 如果存在,
      • 修改状态,None -> Initializing
      • 创建新的 PendingAckStore,首先创建 tenant/namespace/topic-subname__transaction_pending_ack 对应的 ML,然后给 ML 创建一个 cursor(name是 __pending_ack_state),获得 ML 和对应的 cursor 之后,new 一个 MLPendingAckStore, 内部包含了 pending ack 的ML /cursor 以及原始订阅的 cursor。PendingAckStore 中有一个 pendingAckLogIndex(<PositionImpl, PositionImpl>), key 是 ack 的最大位置,value 是 pendingAckStore log 的 位置,通过这个结构可以清理 pendingAck log 中无用的数据(pendingAckLogIndex 中 key 小于 原始订阅cursor 的 markdelete 的log可以删除)
      • Replay:从 cursor(pendingAckStore对应的 cursor)其实位置开始读取,一直读到 LAC 位置(中间会有一个 entryQueue 保存读取的结果,然后从 queue 中取出 entry 逐个处理),每个 entry 都对应于一个 PendingAckMetadataEntry
        • 如果 PendingAckMetadataEntry 不是 ABORT 或者 COMMIT 操作,从中取出 List<PendingAckMetadata>, 每个 PendingAckMetadata 携带了ledgerId/entryId/batchSize/ackSet 等信息,取出 msgId 最大的一个 PendingAckMetadata,使用这个 msgId 对应的 position 来更新 maxAckPosition;并且在 replay 过程中,如果处理的 MetadataEntry 条数大于 maxIndexLag,会在pendingAckLogIndex 记录maxAckPosition 和 metadataEntry 位置的映射关系(方便做pengdingAck log 的清理)
        • 回调 MLPendingAckReplayCallback.handleMetadataEntry 处理
          • 如果是 Abort 操作:
            • 对于 cumulative 的 ack,将 cumulativeAckOfTransaction 置为 null(对于cumulative类型的ack,一个分区和一个订阅只有一个Txn);
            • 如果是 individual 类型的 ack , 并且在 individualAckOfTransaction 中 Txn 对应的信息,遍历所有的 <PositionImpl, PositionImpl> ,如果value 没有 ackset ,不是 batch消息,直接从individualAckPositions删除这个记录;如果有 ackset,并且 individualAckPositions 中包含 value 对应的信息,则取出 entry 的 ackset(thisBitSet,即需要 abort 的位点的ackset),然后从individualAckPositions取出对应的ackset(otherBitSet),从 otherBitSet 中剔除 thisBitSet,执行完之后如果不存在 ack的位点,则从individualAckPositions 中删除对应位置,否则更新 individualAckPositions 中的 ackset 信息;最后会从 individualAckOfTransaction 中删除该 Txn 对应内容
          • 如果是 Commit 操作:
            • cumultivea ack:从 cumulativeAckOfTransaction 中取出保存的ackPosition位置,调用原始订阅,ack 该位点,执行之后,将 cumulativeAckOfTransaction 置为null
            • individual ack:从 individualAckOfTransaction 中取出该 Txn 对应的 pendingAck 信息,调用原始订阅执行ack,然后从 individualAckOfTransaction 中删除 Txn 对应内容
          • 如果是 Ack 操作:
            • cumultivea ack:如果 PendingAckMetadata 中记录的 position 位点大于原始订阅cursor 的md位置并且 position ack 的消息比cumulativeAckOfTransaction 中记录的多,更新 cumulativeAckOfTransaction 的value 为 position
            • individual ack:遍历 所有的PendingAckMetadata,使用这些信息恢复 individualAckOfTransactionindividualAckPositions
        • Replay 完成之后,就恢复了所有的 peningAck 信息,此时可以做 pendingAckStore log 的清理,具体逻辑是遍历pendingAckLogIndex ,删除原始 cursor 订阅 markdelete 之前的 pendingAckStore log。
    • 如果不存在,直接返回 PendingAckHandleImpl, 之后会在 ack(individual/cumulative)/abort/commit时执行 PendingAckStore 的初始化

至此,pendingAckStore 的初始化完成,可以提供事务 ack 服务,可以看到主要的思想是通过 pendingAckStore log 来保存 ack 状态信息,直到事务提交才会讲这些 ack 提交到原始订阅或者在事务 abort 时,从pendingAckStore 中删除。

如果要在消费中使用事务,需要在 ack 指定 Txn(TransactionImpl)。

  consumer.acknowledgeAsync(message.getMessageId(), consumeTxn).get();

如果 ack 类型是 cumulative ack,会首先在本地 Txn 中注册当前 consumer,

 public synchronized void registerCumulativeAckConsumer(ConsumerImpl<?> consumer) {
        if (this.cumulativeAckConsumers == null) {
            this.cumulativeAckConsumers = new HashMap<>();
        }
        cumulativeAckConsumers.put(consumer, 0);
}

然后会通过 TCClient 向 TC 注册 ackTopic ,这个过程在本地 Txn 中会保存一个 <(topic, subscription), ComplatableFuture> 的映射关系,表示key(topic, subscription)二元组是否完成注册。

注册请求的内容会被封装成一个Subscrption 对象,包含了 topic 和 subscrption 信息,然后发送给 broker

handler.addSubscriptionToTxn(txnID, Collections.singletonList(sub)); // sub 是 subscription 队形

Broker 接收到 AddSubscriptionToTxn 的请求之后,首先把 client 发送过来的 subscription 转化为 TransactionSubscription

 transactionSubscriptionBuilder.subscription(subscription.getSubscription());
 transactionSubscriptionBuilder.topic(subscription.getTopic());

然后通过 transactionMetadataStoreService.addAckedPartitionToTxn -> TransactionMetadataStore.addAckedPartitionToTxn 把 TransactionSubscription 添加到 TC 元数据中。

添加的过程如下:

  • 构造一个新的 TransactionMetadataEntry,操作类型是 ADD_SUBSCRIPTION, 以及 subscripton(topic 和 订阅名)信息
  • 把 TransactionMetadataEntry 写入到 Transaction Log 中
  • 从 txnMetaMap <Long, Pair<TxnMeta, List>> 中获取 TCID 中事务对应的 Pair<TxnMeta, List> 信息
  • 然后把 TransactionSubscription 信息加入到 TxnMeta 中,并且在 List 中添加 TransactionMetadataEntry 保存之后的 position信息

事务注册完成之后,可以调用 ack 把 ack 请求发送到 Broker,会携带 TxnID 信息

 doTransactionAcknowledgeForResponse(messageId, ackType, null, properties,
                    new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));

在 Broker 上的处理和 Transaction Buffer 类似,为了防止 broker crash 时的数据丢失,Broker 上有一个 pengdingAckStore 的组件来保存 pending ack 信息(每个订阅有一个 PendingAckHandleImpl,PendingAckHandleImpl 包含 PendingAckStore)。

pendingAckStore 对应的 topic 名字为 $tenant/$namespace/$originTopicName-subScriptionName-__transaction_pending_ack,Broker 会根据这个 pendingAckTopicName 创建 ML 以及对应的 cursor,然后将 pendingAckTopic, pendingAckCursor,订阅的 cursor封装成 PendingAckStore,然后会执行 pendingAckStore 的 replay 操作。

PendingAckStore 恢复完毕杭州,就可以处理事务的 akc 请求,并且可以在 Broker crash 之后,从 pengdingAckStore 中恢复出 pending ack 的信息。

当 Broker 接收到 事务消息的 ack 请求之后,会根据 ackType 的不同做不同的处理。

如果是 cumulative ack(如果是 cumulative 类型的 ack,那么一个事务同一时间只有一个 position):

  • 首先在 pendingAckStore log 中保存本次ack操作的元数据信息

  • cumulativeAckOfTransaction Pair<TxnID, PositionImpl> ,对于 cumulative 的 ack,首先会初始化一个 Pair 结构,这个结构中记录了 Txn 和 ack position

  • 如果 pair 中记录的 position < 本次 (Txn) ack 的position,说明本次 ack 是更新的一个ack,则更新更新这个 pair 的 position

可以看到对于 cumulative 的 ack,这一阶段只是把 ack position 保存在了一个缓存中,并没有执行实际的 ack。

如果是 individual ack,individual 的 ack 类型,一次可以有 ack 多个 position,遍历每个 position

  • 首先在 pendingAckStore log 中保存本次ack操作的元数据信息

  • 构造每个 <position, batchSize> 映射关系

  • 记录 unack 的消息数量(从 unack 指标中减掉)

  • 从 pendingAck 中删除对应的 position,PendingAckHandleImpl.individualAcknowledgeMessage

    • 首次在 pendingAckStore 中记录所有 ack position 的信息,对于每一个 ack 信息,构造一个PendingAckMetadata (包含 ackset,ledgerId,entryId,batchSize),然后所有的 PendingAckMetadata 列表信息会封装在 PendingAckMetadataEntry 中,PendingAckMetadataEntry 中还包含了 PendingAckOp(表示pendingAck 的类型),ackType以及TxnID
      • 然后把 PendingAckMetadataEntry 写入 pendingAckStore,写入之后处理回调,a) currentIndexLag + 1; b) 对于非 commit 或者 abort 类型的 pengdingAck,计算其 ack 的最大位置并更新 maxAckPosition,并且定期( currentIndexLag > maxIndexLag) 在 pendingAckLogIndex 中记录 maxAckPosition 和 pendingAckMetadataEntry 保存 position 的映射关系; c) 然后对 pendingAckLogIndex 保存的内容做清理,清理的过程如下:
        • 如果原始 topic 订阅的 markdelete 位置 大于 pendingAckLogIndex 的firstKey,说明在原始 topic 的订阅中数据已经 ack,那么 pendingAckLogIndex 中对应的 PendingAckMetadataEntry 可以删除,使用 pendingAckStrore 的cursor markdelete即可
  • 在 individualAckOfTransaction 中记录 Txn 和 ack position 的关系,在 individualAckPositions 保存 ack position 和 <Position, BatchSize>的关系

1.2.4 生产事务

如果需要把一个消息包含在一个 Txn 中发送,需要在发送消息时指定 Txn(TransactionImpl),

	producer.newMessage(tnx).value(msg).sendAsync()

首先,会在消息的 msgMetadata 中记录 TxnID 的信息,包括 TxnidMostBitsTxnidLeastBits (TCID 和 TC 内部事务 ID),通过这样就可以在 msg 中携带事务 ID 相关信息。

由于一个事务中可能会包含多个 msg,如果是一个分区 topic, 就会涉及到多个分区,因此在发送消息之前,会在 Client 事务 TransactionImpl 的 registerPartitionMap 中记录当前事务对应的 topic 分区信息(<PartitionName , List >)。

然后还会通过 tcClient 向 TC 发送 addPublishPartition 的请求。

tcClient.addPublishPartitionToTxnAsync(new TxnID(txnIdMostBits, txnIdLeastBits), Lists.newArrayList(topic))

Broker 接收到addParition 请求之后,会通过 TransactionMetaataStoreService 来添加事务到 Txn,这里就是添加到事务的元数据(TransactionLog)中。增加的流程如下:

  • 首先根据 TxnID 从 txnMetaMap 中获取 txnID 对应的元数据信息(<TxnMeta, List)
  • 然后构造一个新的 TransactionMetadataEntry,包含 TxnID,增加的分区信息,当前事务ID,MetadataOp 是 ADD_PARETITION
  • 将 TransactionMetadataEntry 写入到 Transaction log,写入完成之后,返回一个 MessageID
  • 更新第一步获取的元数据信息,将 partiton 信息添加到 TxnMeta ,并且把第三部的 MessageID,加入到 List
  • broker 端处理 addPartition 完成,发送响应给 client

Producer 接收到 Broker 的响应之后,就可以继续执行发送消息的逻辑,这个过程和普通的消息生产没有区别,由于前文中在 MessageMeta 中保存了 TxnId 的信息,因此在发送的SendCMD 中也包含了 TxnID 的信息。

Broker 接收到 send 请求之后的处理,会和 Txn 相关。

如果 send 请求中有 txnidMostBits 和 txnidLeastBits,说明是事务消息,此时通过 producer.publishTxnMessage -> persistentTopic.publishTxnMessage 来处理

transactionBuffer.appendBufferToTxn(txnID, publishContext.getSequenceId(), headersAndPayload)

最终会使用 TransactionBuffer 来处理数据,每个 PersistentTopic 内部都有一个 TransactionBuffer,以 TopicTransactionBuffer 为例,先看下 TransactionBuffer 初始化的流程

public TopicTransactionBuffer(PersistentTopic topic) {
        super(State.None); // 状态为 None
        this.topic = topic; // persistentTopic,原始 topic
  			// 每个 topic 都有一个对应的 TBSnapshot topic,名称为 $tenant/$namespace/__transaction_buffer_snapshot
   			// write 即为向这个 topic 写入的一个 Producer
        this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
                .getTransactionBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
        this.timer = topic.getBrokerService().getPulsar().getTransactionTimer();
  			// snapshot 周期,按照事务数量决定
        this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar()
                .getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
  			// snapshot 周期,按照时间周期决定
        this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
                .getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
  			// 最大的 ReadPosition 为 LAC
        this.maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
  			// 恢复TB,主要分为两个过程:
  			// 1. 从 TB snapshot 中恢复,通过一个 reader 读取 $tenant/$namespace/__transaction_buffer_snapshot 内容,
  			// 遍历所有内容,每个消息中包含了一个 TransactionBufferSnapshot,更具这些信息恢复 maxReadPosition 和丢弃的事务信息
  			// 2. 从原始 topic 中恢复,然后从 maxReadPosition 位置开始读取原始 topic 的信息,a)如果是一个 abrot 的 marker,
  			// 则保存到 aborts 中; 对于 marker 信息,无论是 commit 或者 abort 都说明事务已经完成,可以从 ongoing 的事务中
  			// 删除该事务信息,删除之后,判断是否还有 ongoing 的事务,并且更新 maxReadPosition; b)对于正常的 transaction 消息
  			// 如果这个消息对应的 txn 不在 ongoing 和 abort 的事务中,则在 ongoing 事务中保存一条新的记录,然后更新 
  			// maxReadPosition 
        this.recover();
    }

TransactionBuffer 初始化完成之后,可以使用 transactionBuffer 写入数据,首先会根据 lowWaterMarks <Long, Long> (保存了 TCID 和其对应的事务ID信息)判断当前事务ID是否小于已经保存的对应 TCID 下的事务ID,小于说明有异常。否则,将消息写入到原始 topic 中,获得 msg 的 position 信息,如果在LinkedMap<TxnID, PositionImpl> ongoingTxnsLinkedMap<TxnID, PositionImpl> aborts 中不包括 txnID,则将 TnxID 和 positoin 保存在 ongoingTxns 中,并且更新 maxReadPosition

ongoingTxns 只保存了一个 Txn 第一条 message 的位置信息。

写入到原始 topic 之后,在 publishContext 里传入 txnID,然后返回msg 的 position 信息。

可以看到事务消息写入过程中,引入 TB 这个组件,数据写入时,会在 TB 中记录 Txn 和 该事务的第一条数据的映射关系,并且根据这个信息计算出 maxReadPosition(read-commit),另外还会记录 abort 的事务信息。在 TB 中有定时任务,将这些记录的信息定期的刷到存储($tenant/$namespace/__transaction_buffer_snapshot 中)。

多个事务可以并行处理,即一个 TB (属于一个topic分区)可以同时支持多个事务消息的写入,但是会根据最早写入数据的事务来决定消息的可见性,即未完成事务(ongoing)的第一条消息前面的一个msg对consumer可见(read-commit)

1.2.5 commit 事务

事务提交是指事务完成之后,由 client 发起的事务 commit 过程。事务提交是通过客户端 Txn 来完成的(TransactionImpl.commit())。

  • 首先将事务状态置为 COMMITTING
  • 校验所有的 send 和 ack 操作都已经完成,如果没完成则等待完成
  • 如果所有 future 等待过程中有异常,则转向执行事务 abort 操作,结束
  • 开始执行 commit,通过 TCClient 发送 commit 请求到 Broker

Broker 接收到事务 commit(EndTxn) 命令之后,会通过 TransactionMetadataStoreService 来 endTransaction。

第一步,获取 Txn 对应的事务元数据信息,从 TransactionMetadataStore 的 txnMetaMap 中获取对应 TxnID 的元数据内容,元数据中包含了事务相关的 生产分区和订阅信息。

第二步更新事务状态信息

  • 如果 TxnMeta 是 Open 状态,则将其更新为 COMMITING 状态,并将新的 TransactionMetadataEntry 保存到 Transaction Log;然后更新 TxnMetaMap 中的元数据信息,a) 修改状态 b) 更新 transaction log 保存的 position 信息。
  • 如果不是 Open 状态,则校验状态是否正常,如果当前状态不是 COMMITTING 状态,说明状态异常,抛出相应的 InvalidTxnStatusException

第三部则是 endTxnInTransactionBuffer,

  • 获取 TxnId 对应的 TxnMeta 和 lowWatermark
  • EndTxnOnSubscription:遍历 TxnMeta 上的所有订阅信息,对每一个订阅执行 endTxn,这里是通过 TBClient 发送 ndTxnOnSubscription 请求给 TB(每个topic都会有一个TB,这里的请求就是发送给该分区所在的broker),对应的 TB 接收到请求之后
    • 首先从 Topic 中获取到订阅信息,然后使用订阅来结束订阅,最终会使用 PendingAckHandle.commitTxn 来完成作业提交
      • 如果是 cumulative 类型的订阅,首先在 pendingAckStore 中保存 pendingAckMetadataEntry来记录提交动作;然后通过原始订阅来 cumulative ack
      • 如果是 individual 类型的订阅,首先在 pendingAckStore 中保存 pendingAckMetadataEntry来记录提交动作;然后从 individualAckOfTransaction 中取出相对应的 Individual position信息;从 individualAckOfTransaction 删除该 Txn;处理lowwatermark
  • EndTxnOnProducePartition:遍历 TxnMeta 上的所有partition信息,对于每个 paritition,执行 endTxn,和订阅类似,也是通过TBClient 发送请求到 TB,然后 end persistentTopic 上的 txn(TB.commit)
    • 构建一个commitMarker,内部包含了 TxnID 信息
    • 将这个 Marker 信息写入到原始topic中
    • 更新 maxReadPosition 信息: 从 ongoingTxns 中删除 txn,如果依旧存在事务,maxReadPosition 位置是第一个事务的前一个entry,否则是 topic 的 LAC 位置
    • 更新 lowwatermark信息
    • 清理丢去的事务信息
    • 根据 maxReadPositionAndAbortTimes 的值,决定是否执行 snapshot

第四步,在 TransactionLog 中,记录事务结束的信息(COMMITTING -> COMMITTED),并且更新 TxnMetaMap 中的信息。完成之后,说明整个事务结束,可以从 TxnMetaMap中删除该事务。

1.2.6 abort 事务

由 client 发起的事务 abort 过程。事务提交是通过客户端 Txn 来完成的(TransactionImpl.abort())。

  • 首先将事务状态置为 ABORTING
  • 校验所有的 send 和 ack 操作都已经完成,如果没完成则等待完成
  • 遍历 cumulativeAckConsumers 中的所有 consumer,执行 incoming message 的清理
  • 开始执行 abort,通过 TCClient 发送 abort 请求到 Broker

Broker 接收到事务 abort(EndTxn) 命令之后,会通过 TransactionMetadataStoreService 来 endTransaction。

第一步,获取 Txn 对应的事务元数据信息,从 TransactionMetadataStore 的 txnMetaMap 中获取对应 TxnID 的元数据内容,元数据中包含了事务相关的 生产分区和订阅信息。

第二步更新事务状态信息

  • 如果 TxnMeta 是 Open 状态,则将其更新为 ABORTING 状态,并将新的 TransactionMetadataEntry 保存到 Transaction Log;
  • 如果不是 Open 状态,则校验状态是否正常,如果当前状态不是 ABORTING 状态,说明状态异常,抛出相应的 InvalidTxnStatusException

第三部则是 abortTxnOnSubscription,

  • 获取 TxnId 对应的 TxnMeta 和 lowWatermark
  • EndTxnOnSubscription:遍历 TxnMeta 上的所有订阅信息,对每一个订阅执行 endTxn,这里是通过 TBClient 发送 ndTxnOnSubscription 请求给 TB(每个topic都会有一个TB,这里的请求就是发送给该分区所在的broker),对应的 TB 接收到请求之后
    • 如果是 failover 或者 exclusive 的订阅,获取订阅对应的 active consumer
    • 首先从 Topic 中获取到订阅信息,然后使用订阅来结束订阅,最终会使用 PendingAckHandle.abortTxn 来完成作业提交
      • 如果是 cumulative 类型的订阅,首先在 pendingAckStore 中保存 pendingAckMetadataEntry来记录 abort 动作;如果 cumulativeAckOfTransaction 的key 和abort 的txnId 一致,将 cumulativeAckOfTransaction 置为null
      • 如果是 individual 类型的订阅,首先在 pendingAckStore 中保存 pendingAckMetadataEntry来记录 abort 动作;然后从 individualAckOfTransaction 中取出 pendingAckMessageForCurrentTxn 信息,遍历所有的 positioin,从 individualAckPositions 删除对应pendingAck 信息,然后从 individualAckOfTransaction 中删除 txn 信息;重新投递没有ack的消息;处理loawatermark
  • EndTxnOnProducePartition:遍历 TxnMeta 上的所有partition信息,对于每个 paritition,执行 endTxn,和订阅类似,也是通过TBClient 发送请求到 TB,然后 end persistentTopic 上的 txn(TB.commit)
    • 构建一个abortMarker,内部包含了 TxnID 信息
    • 将这个 abortMarker 信息写入到原始topic中
    • 将 abort 的事务添加到 aborts map 中
    • 更新 maxReadPosition 信息: 从 ongoingTxns 中删除 txn,如果依旧存在事务,maxReadPosition 位置是第一个事务的前一个entry,否则是 topic 的 LAC 位置
    • 清理丢去的事务信息
    • 根据 maxReadPositionAndAbortTimes 的值,决定是否执行 snapshot

第四步,在 TransactionLog 中,记录事务结束的信息(ABORTING -> ABORTED),并且更新 TxnMetaMap 中的信息。完成之后,说明整个事务结束,可以从 TxnMetaMap中删除该事务。