Pulsar 事务实现
1.1 事务准备
1.1.1 初始化事务元数据
通过事务元数据初始化命令来执行元数据初始化操作:
bin/pulsar initialize-transaction-coordinator-metadata
-cs 127.0.0.1:2181 # configuration store address
-c standalone # cluster name
--initial-num-transaction-coordinators # tc 数量,即 tc topic 的分区数
这个过程会在 configuration store 中创建 TC topic 对应的 tenant(pulsar
),namespace(pulsar/system
)和分区 topic(pulsar/system/transaction_coordinator_assign
,默认是16个分区)。
1.1.2 修改事务相关配置
开启事务需要在 broker.conf 中修改以下配置:
//mandatory configuration, used to enable transaction coordinator
transactionCoordinatorEnabled=true
//mandtory configuration, used to create systemTopic used for transaction buffer snapshot
systemTopicEnabled=true
//if you want to acknowledge batch messages in transactions,
acknowledgmentAtBatchIndexLevelEnabled=true
在初始化元数据并且修改了事务相关配置只有,Broker 启动时,会创建事务相关的一些对象
1.1.3 启动 Broker
在元数据创建完毕并且配置修改之后,启动 Broker,Broker 会创建事务相关的一些对象,
transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(getClient());
transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
transactionBufferClient = TransactionBufferClientImpl.create(this, transactionTimer,
config.getTransactionBufferClientMaxConcurrentRequests(),
config.getTransactionBufferClientOperationTimeoutInMills());
transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
.newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
transactionBufferClient, transactionTimer);
transactionBufferProvider = TransactionBufferProvider
.newProvider(config.getTransactionBufferProviderClassName());
transactionPendingAckStoreProvider = TransactionPendingAckStoreProvider
.newProvider(config.getTransactionPendingAckStoreProviderClassName());
3.1.3.1 SystemTopicBaseTxnBufferSnapshotService
SystemTopicBaseTxnBufferSnapshotService
是 TransactionBufferSnapshotService
的实现类,对应的 system topic 是 tenant/namespace/__transaction_buffer_snapshot
,主要用来创建 TransactionBufferSnapshot
所用的 writer(producer) 和 reader,并且使用 writer 和 reader 写入和读取 TransactionBufferSnapshot
信息,TransactionBufferSnapshot
的内容包括:topicName, 已提交事务的最大的 msgId, 以及丢弃的事务列表。
private String topicName;
private long maxReadPositionLedgerId;
private long maxReadPositionEntryId;
private List<AbortTxnMetadata> aborts;
1.1.3.2 transactionTimer
broker侧的事务处理需要的timer
1.1.3.3 TransactionBufferClientImpl
TC 使用 TransactionBufferClientImpl
来 commit 或者 abort 事务。TransactionBufferClientImpl
内部是通过 TransactionBufferHandlerImpl
来完成事务的提交或者终止操作。
1.1.3.4 TransactionMetadataStoreService
可以理解为 TC,其中包含了 MLTransactionLogImpl
用来保存事务状态信息。
1.1.3.5 TopicTransactionBufferProvider 和 MLPendingAckStoreProvider
1.2 事务使用
一个 consume-process-produce
过程的事务可以如下所示:
1.2.1 事务 Client 的创建
使用事务,需要在 PulsarClient 初始化时 enable,
pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.statsInterval(0, TimeUnit.SECONDS)
.enableTransaction(true)
.build();
开了事务的 PulsarClient 在初始化时,会在 client 侧初始化一个 TransactionCoordinatorClientImpl
并启动, TransactionCoordinatorClientImpl
是 client 用来管理事务的类,可以创建新的事务、为事务增加分区、为事务增加订阅、commit 事务以及 abort 事务等。
TransactionCoordinatorClientImpl
的启动过程如下:
- 通过 lookupService 查找然后查找 TC topic 对应的分区信息(persistent://pulsar/system/transaction_coordinator_assign)
- 对于每个 TC topic 分区,对应创建一个
TransactionMetaStoreHandler
,TransactionMetaStoreHandler
可以向 TC 建立链接,发送事务相关请求并处理 TC 的响应,在TransactionMetaStoreHandler
的初始化过程中,会建立到 TC topic 分区所在 broker 的链接 - 链接创建之后,回调
TransactionMetaStoreHandler.connectionOpened
,这里会发送TcClientConnectRequest
给 TC
Broker 接收到 TC 之后,会做一些事务的相关准备工作(TransactionMetadataStoreService.handleTcClientConnectRequest
),(如果是TC的第一次client链接)之后会为这个 TCID 创建一个 TransactionMetadataStore
, 这里的 store 实际上是 transactionLog 的 store,对应的 TransactionLogName 是 persistent://pulsar/system/__transaction_log_$TCID
, TxnMetaStore 有两种类型包括 InMemTransactionMetadataStoreProvider
和 MLTransactionMetadataStore
, 其中保存的是 TransactionMetadataEntry
的数据,TransactionMetadataEntry 定义如下:
message TransactionMetadataEntry {
optional TransactionMetadataOp metadata_op = 1;
optional uint64 txnid_least_bits = 2 [default = 0];
optional uint64 txnid_most_bits = 3 [default = 0];
optional TxnStatus expected_status = 4;
optional TxnStatus new_status = 5;
repeated string partitions = 6;
repeated pulsar.proto.Subscription subscriptions = 7;
optional uint64 timeout_ms = 8;
optional uint64 start_time = 9;
optional uint64 last_modification_time = 10;
optional uint64 max_local_txn_id = 11;
}
transactionMeta操作有四种类型:
- NEW
- ADD_PARTITION
- ADD_SUBSCRIPTION
- UPDATE
然后会创建 MLTransactionMetadataStore
,其中包含了 MLTransactionLogImpl
(类似于一个topic,内部封装了 ManagedLedger 和 cursor), MLTransactionLogImpl
保存了事务操作的元数据,在 MLTransactionMetadataStore
初始化是会使用 MLTransactionLogImpl
replay 所有内容。
Client 创建完毕之后,就可以开始使用事务了。
1.2.2 创建一个新的事务
Client 创建事务可以指定一个事务的超时时间(默认时间是一分钟)
Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
创建的过程实际上就是调用 TransactionCoordinatorClientImpl.newTransactionAsync
来完成的,前文我们知道每个 TC topic 分区都对应一个 TransactionCoordinatorClientImpl
, 那么在 new 一个 transaction时,具体使用的是哪个 TransactionCoordinatorClientImpl
呢?这里的选择逻辑很简单,会通过遍历的方式从TransactionCoordinatorClientImpl
数组中选取,即如果本次选择的是第 (n % tc 分区数)个 TransactionCoordinatorClientImpl
,那么在通过 client 创建一个新的事务时,就会选取第 (n +1 % tc 分区数)个 TransactionCoordinatorClientImpl
。
然后构造一个 newTxn
的请求发送给 TC,我们看下 TC (Broker) 是如何处理 newTxn
请求的。
Broker 会创建一个TxnID
返回给 client,调用层级是 TransactionMetadataStoreService.newTransaction
-> MLTransactionMetadataStore.newTransaction
,会创建一个新的 TransactionMetadataEntry,并且写入到 transactionLog 中。
// TxnidMostBits 是 tcId
long mostSigBits = tcID.getId();
// TxnIdLeastBits 是 sequenceIdGenerator 生产的一个递增值,从 0 开始
long leastSigBits = sequenceIdGenerator.generateSequenceId();
// txnID 包含 TxnidMostBits 和 TxnIdLeastBits 两部分内容,txnID 会返回给客户端
TxnID txnID = new TxnID(mostSigBits, leastSigBits);
long currentTimeMillis = System.currentTimeMillis();
// 构造 TransactionMetadataEntry
TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
.setTxnidMostBits(mostSigBits)
.setTxnidLeastBits(leastSigBits)
.setStartTime(currentTimeMillis)
.setTimeoutMs(timeOut)
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW) // NEW 表示一个新的事务
.setLastModificationTime(currentTimeMillis)
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); //MaxLocalTxnId 是 sequenceId
// 写入 Transaction log,持久化事务元数据信息
transactionLog.append(transactionMetadataEntry)
.whenComplete((position, throwable) -> {
if (throwable != null) {
completableFuture.completeExceptionally(throwable);
} else {
appendLogCount.increment();
// 构造 TxnMeta,并缓存在 txnMetaMap 中
// txnMetaMap 记录了每个事务和 TnxMeta 以及该事务在 TransactionLog messages 的映射关系
// tnxMeta 中保存了 txnID、当前时间和事务超时时间
TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
List<Position> positions = new ArrayList<>();
positions.add(position);
Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
txnMetaMap.put(leastSigBits, pair);
// 在 timeoutTracker 中保存事务的超时时间,然后在达到超时时间时触发事务的结束操作
this.timeoutTracker.addTransaction(leastSigBits, timeOut);
createdTransactionCount.increment();
// 返回 tnxID 给客户端
completableFuture.complete(txnID);
}
});
Broker 记录了事务初始化信息之后,给Client 返回一个 TxnID 的结果。Client 接收到 TxnID 之后,会对其做一次封装成为 TransactionImpl,表示客户端事务,其出示状态为 OPEN。
1.2.3 消费事务
消费者在初次订阅时,如果开启了事务,那么在初始化 subscription 时,会为每个订阅初始化一个 PendingAckHandle
, 默认是 PendingAckHandleImpl
实现,即 每个订阅都有一个 PendingAckHandleImpl
, 是1:1的关系。
PendingAckHandleImpl
是用来处理事务型 ack 消息的入口类(初始状态为 None),PendingAckHandleImpl
内部会创建一个 PendingAckStore
, PendingAckStore 实际上是一个 ManagedLedger + cursor + 原始订阅Cursor 的封装。
PendingAckHandleImpl
主要是处理事务类型的 ack,不让 ack 直接体作用到原始订阅的 cursor上,而是在事务 commit 时才会影响原始 cursor。 这里有两个比较重要的数据结构,用来处理 cumulative 和 individual 类型的 ack 信息。
-
individualAckOfTransaction
: <TxnID, HashMap<PositionImpl, PositionImpl>>, 记录了事务和 individual 的对应关系,value 中的两个 PositionImpl 都是 ack 的位置,对于非 Batch 消息(ack信息中没有 ackset),这两个 position 没有区别,都是 ack 的 MessageId 对应的 position 位置;对于 Batch 消息(ack 信息中包含了 ackset),那么 <PositionImpl, PositionImpl> 中的第一个 positoin 表示 位置(<LedgerId, EntryId>), 第二个 positionImpl 则包含了 ackset 信息,会将本次 ack 的信息和已经 ack 的信息合并 -
individualAckPositions
: <PositionImpl, MutablePair<PositionImpl, Integer>>,key 是 position,表示一个<LedgerId, EntryId> 坐标,value 部分是一个 pair, left 是 position, right 是 batchSize,其中 left 的 position 中主要用于batch message ack 冲突的判断,会判断 ack 和之前的 ack 的 ackset 有没有重叠; 对于非 batch 消息,直接判断individualAckPositions
中是否包含相同 position 的 ack 信息。 -
cumulativeAckOfTransaction
: <TxnID, PositionImpl>, 保存了 Txn 和 ack 位置的映射关系(cumulative 只有ack位置需要保存)
首先会检查 pendingAckStore 是否已经初始化过:
-
拼接 pendingAckStore 对应的 ManagedLedger name :
tenant/namespace/topic-subname__transaction_pending_ack
-
查看 metastore 中是否存在这个名称对应的 managedledger
- 如果存在,
- 修改状态,None -> Initializing
- 创建新的 PendingAckStore,首先创建
tenant/namespace/topic-subname__transaction_pending_ack
对应的 ML,然后给 ML 创建一个 cursor(name是 __pending_ack_state),获得 ML 和对应的 cursor 之后,new 一个MLPendingAckStore
, 内部包含了 pending ack 的ML /cursor 以及原始订阅的 cursor。PendingAckStore 中有一个pendingAckLogIndex
(<PositionImpl, PositionImpl>), key 是 ack 的最大位置,value 是 pendingAckStore log 的 位置,通过这个结构可以清理 pendingAck log 中无用的数据(pendingAckLogIndex 中 key 小于 原始订阅cursor 的 markdelete 的log可以删除) - Replay:从 cursor(pendingAckStore对应的 cursor)其实位置开始读取,一直读到 LAC 位置(中间会有一个 entryQueue 保存读取的结果,然后从 queue 中取出 entry 逐个处理),每个 entry 都对应于一个
PendingAckMetadataEntry
- 如果
PendingAckMetadataEntry
不是 ABORT 或者 COMMIT 操作,从中取出List<PendingAckMetadata>
, 每个PendingAckMetadata
携带了ledgerId/entryId/batchSize/ackSet 等信息,取出 msgId 最大的一个PendingAckMetadata
,使用这个 msgId 对应的 position 来更新maxAckPosition
;并且在 replay 过程中,如果处理的 MetadataEntry 条数大于 maxIndexLag,会在pendingAckLogIndex
记录maxAckPosition 和 metadataEntry 位置的映射关系(方便做pengdingAck log 的清理) - 回调
MLPendingAckReplayCallback.handleMetadataEntry
处理- 如果是 Abort 操作:
- 对于 cumulative 的 ack,将
cumulativeAckOfTransaction
置为 null(对于cumulative类型的ack,一个分区和一个订阅只有一个Txn); - 如果是 individual 类型的 ack , 并且在
individualAckOfTransaction
中 Txn 对应的信息,遍历所有的 <PositionImpl, PositionImpl> ,如果value 没有 ackset ,不是 batch消息,直接从individualAckPositions
删除这个记录;如果有 ackset,并且individualAckPositions
中包含 value 对应的信息,则取出 entry 的 ackset(thisBitSet,即需要 abort 的位点的ackset),然后从individualAckPositions
取出对应的ackset(otherBitSet),从 otherBitSet 中剔除 thisBitSet,执行完之后如果不存在 ack的位点,则从individualAckPositions
中删除对应位置,否则更新individualAckPositions
中的 ackset 信息;最后会从individualAckOfTransaction
中删除该 Txn 对应内容
- 对于 cumulative 的 ack,将
- 如果是 Commit 操作:
- cumultivea ack:从
cumulativeAckOfTransaction
中取出保存的ackPosition位置,调用原始订阅,ack 该位点,执行之后,将cumulativeAckOfTransaction
置为null - individual ack:从
individualAckOfTransaction
中取出该 Txn 对应的 pendingAck 信息,调用原始订阅执行ack,然后从individualAckOfTransaction
中删除 Txn 对应内容
- cumultivea ack:从
- 如果是 Ack 操作:
- cumultivea ack:如果 PendingAckMetadata 中记录的 position 位点大于原始订阅cursor 的md位置并且 position ack 的消息比
cumulativeAckOfTransaction
中记录的多,更新cumulativeAckOfTransaction
的value 为 position - individual ack:遍历 所有的PendingAckMetadata,使用这些信息恢复
individualAckOfTransaction
和individualAckPositions
- cumultivea ack:如果 PendingAckMetadata 中记录的 position 位点大于原始订阅cursor 的md位置并且 position ack 的消息比
- 如果是 Abort 操作:
- Replay 完成之后,就恢复了所有的 peningAck 信息,此时可以做 pendingAckStore log 的清理,具体逻辑是遍历
pendingAckLogIndex
,删除原始 cursor 订阅 markdelete 之前的 pendingAckStore log。
- 如果
- 如果不存在,直接返回 PendingAckHandleImpl, 之后会在 ack(individual/cumulative)/abort/commit时执行 PendingAckStore 的初始化
- 如果存在,
至此,pendingAckStore 的初始化完成,可以提供事务 ack 服务,可以看到主要的思想是通过 pendingAckStore log 来保存 ack 状态信息,直到事务提交才会讲这些 ack 提交到原始订阅或者在事务 abort 时,从pendingAckStore 中删除。
如果要在消费中使用事务,需要在 ack 指定 Txn(TransactionImpl
)。
consumer.acknowledgeAsync(message.getMessageId(), consumeTxn).get();
如果 ack 类型是 cumulative ack,会首先在本地 Txn 中注册当前 consumer,
public synchronized void registerCumulativeAckConsumer(ConsumerImpl<?> consumer) {
if (this.cumulativeAckConsumers == null) {
this.cumulativeAckConsumers = new HashMap<>();
}
cumulativeAckConsumers.put(consumer, 0);
}
然后会通过 TCClient 向 TC 注册 ackTopic ,这个过程在本地 Txn 中会保存一个 <(topic, subscription), ComplatableFuture> 的映射关系,表示key(topic, subscription)二元组是否完成注册。
注册请求的内容会被封装成一个Subscrption
对象,包含了 topic 和 subscrption 信息,然后发送给 broker
handler.addSubscriptionToTxn(txnID, Collections.singletonList(sub)); // sub 是 subscription 队形
Broker 接收到 AddSubscriptionToTxn
的请求之后,首先把 client 发送过来的 subscription 转化为 TransactionSubscription
transactionSubscriptionBuilder.subscription(subscription.getSubscription());
transactionSubscriptionBuilder.topic(subscription.getTopic());
然后通过 transactionMetadataStoreService.addAckedPartitionToTxn -> TransactionMetadataStore.addAckedPartitionToTxn 把 TransactionSubscription 添加到 TC 元数据中。
添加的过程如下:
- 构造一个新的 TransactionMetadataEntry,操作类型是
ADD_SUBSCRIPTION
, 以及 subscripton(topic 和 订阅名)信息 - 把 TransactionMetadataEntry 写入到 Transaction Log 中
- 从 txnMetaMap <Long, Pair<TxnMeta, List
>> 中获取 TCID 中事务对应的 Pair<TxnMeta, List > 信息 - 然后把 TransactionSubscription 信息加入到 TxnMeta 中,并且在 List
中添加 TransactionMetadataEntry 保存之后的 position信息
事务注册完成之后,可以调用 ack 把 ack 请求发送到 Broker,会携带 TxnID 信息
doTransactionAcknowledgeForResponse(messageId, ackType, null, properties,
new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
在 Broker 上的处理和 Transaction Buffer 类似,为了防止 broker crash 时的数据丢失,Broker 上有一个 pengdingAckStore 的组件来保存 pending ack 信息(每个订阅有一个 PendingAckHandleImpl,PendingAckHandleImpl 包含 PendingAckStore)。
pendingAckStore 对应的 topic 名字为 $tenant/$namespace/$originTopicName-subScriptionName-__transaction_pending_ack
,Broker 会根据这个 pendingAckTopicName
创建 ML 以及对应的 cursor,然后将 pendingAckTopic, pendingAckCursor,订阅的 cursor封装成 PendingAckStore,然后会执行 pendingAckStore 的 replay 操作。
PendingAckStore 恢复完毕杭州,就可以处理事务的 akc 请求,并且可以在 Broker crash 之后,从 pengdingAckStore 中恢复出 pending ack 的信息。
当 Broker 接收到 事务消息的 ack 请求之后,会根据 ackType 的不同做不同的处理。
如果是 cumulative ack(如果是 cumulative 类型的 ack,那么一个事务同一时间只有一个 position):
-
首先在 pendingAckStore log 中保存本次ack操作的元数据信息
-
cumulativeAckOfTransaction Pair<TxnID, PositionImpl> ,对于 cumulative 的 ack,首先会初始化一个 Pair 结构,这个结构中记录了 Txn 和 ack position
-
如果 pair 中记录的 position < 本次 (Txn) ack 的position,说明本次 ack 是更新的一个ack,则更新更新这个 pair 的 position
可以看到对于 cumulative 的 ack,这一阶段只是把 ack position 保存在了一个缓存中,并没有执行实际的 ack。
如果是 individual ack,individual 的 ack 类型,一次可以有 ack 多个 position,遍历每个 position
-
首先在 pendingAckStore log 中保存本次ack操作的元数据信息
-
构造每个 <position, batchSize> 映射关系
-
记录 unack 的消息数量(从 unack 指标中减掉)
-
从 pendingAck 中删除对应的 position,PendingAckHandleImpl.individualAcknowledgeMessage
- 首次在 pendingAckStore 中记录所有 ack position 的信息,对于每一个 ack 信息,构造一个
PendingAckMetadata
(包含 ackset,ledgerId,entryId,batchSize),然后所有的PendingAckMetadata
列表信息会封装在PendingAckMetadataEntry
中,PendingAckMetadataEntry 中还包含了 PendingAckOp(表示pendingAck 的类型),ackType以及TxnID- 然后把
PendingAckMetadataEntry
写入 pendingAckStore,写入之后处理回调,a) currentIndexLag + 1; b) 对于非 commit 或者 abort 类型的 pengdingAck,计算其 ack 的最大位置并更新 maxAckPosition,并且定期( currentIndexLag > maxIndexLag) 在 pendingAckLogIndex 中记录 maxAckPosition 和 pendingAckMetadataEntry 保存 position 的映射关系; c) 然后对 pendingAckLogIndex 保存的内容做清理,清理的过程如下:- 如果原始 topic 订阅的 markdelete 位置 大于 pendingAckLogIndex 的firstKey,说明在原始 topic 的订阅中数据已经 ack,那么 pendingAckLogIndex 中对应的 PendingAckMetadataEntry 可以删除,使用 pendingAckStrore 的cursor markdelete即可
- 然后把
- 首次在 pendingAckStore 中记录所有 ack position 的信息,对于每一个 ack 信息,构造一个
-
在 individualAckOfTransaction 中记录 Txn 和 ack position 的关系,在 individualAckPositions 保存 ack position 和 <Position, BatchSize>的关系
1.2.4 生产事务
如果需要把一个消息包含在一个 Txn 中发送,需要在发送消息时指定 Txn(TransactionImpl
),
producer.newMessage(tnx).value(msg).sendAsync()
首先,会在消息的 msgMetadata 中记录 TxnID 的信息,包括 TxnidMostBits
和 TxnidLeastBits
(TCID 和 TC 内部事务 ID),通过这样就可以在 msg 中携带事务 ID 相关信息。
由于一个事务中可能会包含多个 msg,如果是一个分区 topic, 就会涉及到多个分区,因此在发送消息之前,会在 Client 事务 TransactionImpl 的 registerPartitionMap 中记录当前事务对应的 topic 分区信息(<PartitionName , List
然后还会通过 tcClient 向 TC 发送 addPublishPartition 的请求。
tcClient.addPublishPartitionToTxnAsync(new TxnID(txnIdMostBits, txnIdLeastBits), Lists.newArrayList(topic))
Broker 接收到addParition 请求之后,会通过 TransactionMetaataStoreService 来添加事务到 Txn,这里就是添加到事务的元数据(TransactionLog)中。增加的流程如下:
- 首先根据 TxnID 从 txnMetaMap 中获取 txnID 对应的元数据信息(<TxnMeta, List
) - 然后构造一个新的 TransactionMetadataEntry,包含 TxnID,增加的分区信息,当前事务ID,MetadataOp 是 ADD_PARETITION
- 将 TransactionMetadataEntry 写入到 Transaction log,写入完成之后,返回一个 MessageID
- 更新第一步获取的元数据信息,将 partiton 信息添加到 TxnMeta ,并且把第三部的 MessageID,加入到 List
中 - broker 端处理 addPartition 完成,发送响应给 client
Producer 接收到 Broker 的响应之后,就可以继续执行发送消息的逻辑,这个过程和普通的消息生产没有区别,由于前文中在 MessageMeta 中保存了 TxnId 的信息,因此在发送的SendCMD 中也包含了 TxnID 的信息。
Broker 接收到 send 请求之后的处理,会和 Txn 相关。
如果 send 请求中有 txnidMostBits 和 txnidLeastBits,说明是事务消息,此时通过 producer.publishTxnMessage -> persistentTopic.publishTxnMessage 来处理
transactionBuffer.appendBufferToTxn(txnID, publishContext.getSequenceId(), headersAndPayload)
最终会使用 TransactionBuffer
来处理数据,每个 PersistentTopic 内部都有一个 TransactionBuffer
,以 TopicTransactionBuffer
为例,先看下 TransactionBuffer
初始化的流程
public TopicTransactionBuffer(PersistentTopic topic) {
super(State.None); // 状态为 None
this.topic = topic; // persistentTopic,原始 topic
// 每个 topic 都有一个对应的 TBSnapshot topic,名称为 $tenant/$namespace/__transaction_buffer_snapshot
// write 即为向这个 topic 写入的一个 Producer
this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
.getTransactionBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
this.timer = topic.getBrokerService().getPulsar().getTransactionTimer();
// snapshot 周期,按照事务数量决定
this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar()
.getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
// snapshot 周期,按照时间周期决定
this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
// 最大的 ReadPosition 为 LAC
this.maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
// 恢复TB,主要分为两个过程:
// 1. 从 TB snapshot 中恢复,通过一个 reader 读取 $tenant/$namespace/__transaction_buffer_snapshot 内容,
// 遍历所有内容,每个消息中包含了一个 TransactionBufferSnapshot,更具这些信息恢复 maxReadPosition 和丢弃的事务信息
// 2. 从原始 topic 中恢复,然后从 maxReadPosition 位置开始读取原始 topic 的信息,a)如果是一个 abrot 的 marker,
// 则保存到 aborts 中; 对于 marker 信息,无论是 commit 或者 abort 都说明事务已经完成,可以从 ongoing 的事务中
// 删除该事务信息,删除之后,判断是否还有 ongoing 的事务,并且更新 maxReadPosition; b)对于正常的 transaction 消息
// 如果这个消息对应的 txn 不在 ongoing 和 abort 的事务中,则在 ongoing 事务中保存一条新的记录,然后更新
// maxReadPosition
this.recover();
}
TransactionBuffer 初始化完成之后,可以使用 transactionBuffer 写入数据,首先会根据 lowWaterMarks <Long, Long>
(保存了 TCID 和其对应的事务ID信息)判断当前事务ID是否小于已经保存的对应 TCID 下的事务ID,小于说明有异常。否则,将消息写入到原始 topic 中,获得 msg 的 position 信息,如果在LinkedMap<TxnID, PositionImpl> ongoingTxns
和 LinkedMap<TxnID, PositionImpl> aborts
中不包括 txnID,则将 TnxID 和 positoin 保存在 ongoingTxns 中,并且更新 maxReadPosition
。
ongoingTxns 只保存了一个 Txn 第一条 message 的位置信息。
写入到原始 topic 之后,在 publishContext 里传入 txnID,然后返回msg 的 position 信息。
可以看到事务消息写入过程中,引入 TB 这个组件,数据写入时,会在 TB 中记录 Txn 和 该事务的第一条数据的映射关系,并且根据这个信息计算出 maxReadPosition(read-commit),另外还会记录 abort 的事务信息。在 TB 中有定时任务,将这些记录的信息定期的刷到存储($tenant/$namespace/__transaction_buffer_snapshot 中)。
多个事务可以并行处理,即一个 TB (属于一个topic分区)可以同时支持多个事务消息的写入,但是会根据最早写入数据的事务来决定消息的可见性,即未完成事务(ongoing)的第一条消息前面的一个msg对consumer可见(read-commit)
1.2.5 commit 事务
事务提交是指事务完成之后,由 client 发起的事务 commit 过程。事务提交是通过客户端 Txn 来完成的(TransactionImpl.commit())。
- 首先将事务状态置为 COMMITTING
- 校验所有的 send 和 ack 操作都已经完成,如果没完成则等待完成
- 如果所有 future 等待过程中有异常,则转向执行事务 abort 操作,结束
- 开始执行 commit,通过 TCClient 发送 commit 请求到 Broker
Broker 接收到事务 commit(EndTxn) 命令之后,会通过 TransactionMetadataStoreService 来 endTransaction。
第一步,获取 Txn 对应的事务元数据信息,从 TransactionMetadataStore 的 txnMetaMap 中获取对应 TxnID 的元数据内容,元数据中包含了事务相关的 生产分区和订阅信息。
第二步更新事务状态信息
- 如果 TxnMeta 是 Open 状态,则将其更新为 COMMITING 状态,并将新的 TransactionMetadataEntry 保存到 Transaction Log;然后更新 TxnMetaMap 中的元数据信息,a) 修改状态 b) 更新 transaction log 保存的 position 信息。
- 如果不是 Open 状态,则校验状态是否正常,如果当前状态不是 COMMITTING 状态,说明状态异常,抛出相应的 InvalidTxnStatusException
第三部则是 endTxnInTransactionBuffer,
- 获取 TxnId 对应的 TxnMeta 和 lowWatermark
- EndTxnOnSubscription:遍历 TxnMeta 上的所有订阅信息,对每一个订阅执行 endTxn,这里是通过 TBClient 发送
ndTxnOnSubscription
请求给 TB(每个topic都会有一个TB,这里的请求就是发送给该分区所在的broker),对应的 TB 接收到请求之后- 首先从 Topic 中获取到订阅信息,然后使用订阅来结束订阅,最终会使用 PendingAckHandle.commitTxn 来完成作业提交
- 如果是 cumulative 类型的订阅,首先在 pendingAckStore 中保存 pendingAckMetadataEntry来记录提交动作;然后通过原始订阅来 cumulative ack
- 如果是 individual 类型的订阅,首先在 pendingAckStore 中保存 pendingAckMetadataEntry来记录提交动作;然后从 individualAckOfTransaction 中取出相对应的 Individual position信息;从 individualAckOfTransaction 删除该 Txn;处理lowwatermark
- 首先从 Topic 中获取到订阅信息,然后使用订阅来结束订阅,最终会使用 PendingAckHandle.commitTxn 来完成作业提交
- EndTxnOnProducePartition:遍历 TxnMeta 上的所有partition信息,对于每个 paritition,执行 endTxn,和订阅类似,也是通过TBClient 发送请求到 TB,然后 end persistentTopic 上的 txn(TB.commit)
- 构建一个commitMarker,内部包含了 TxnID 信息
- 将这个 Marker 信息写入到原始topic中
- 更新 maxReadPosition 信息: 从 ongoingTxns 中删除 txn,如果依旧存在事务,maxReadPosition 位置是第一个事务的前一个entry,否则是 topic 的 LAC 位置
- 更新 lowwatermark信息
- 清理丢去的事务信息
- 根据 maxReadPositionAndAbortTimes 的值,决定是否执行 snapshot
第四步,在 TransactionLog 中,记录事务结束的信息(COMMITTING -> COMMITTED),并且更新 TxnMetaMap 中的信息。完成之后,说明整个事务结束,可以从 TxnMetaMap中删除该事务。
1.2.6 abort 事务
由 client 发起的事务 abort 过程。事务提交是通过客户端 Txn 来完成的(TransactionImpl.abort())。
- 首先将事务状态置为 ABORTING
- 校验所有的 send 和 ack 操作都已经完成,如果没完成则等待完成
- 遍历 cumulativeAckConsumers 中的所有 consumer,执行 incoming message 的清理
- 开始执行 abort,通过 TCClient 发送 abort 请求到 Broker
Broker 接收到事务 abort(EndTxn) 命令之后,会通过 TransactionMetadataStoreService 来 endTransaction。
第一步,获取 Txn 对应的事务元数据信息,从 TransactionMetadataStore 的 txnMetaMap 中获取对应 TxnID 的元数据内容,元数据中包含了事务相关的 生产分区和订阅信息。
第二步更新事务状态信息
- 如果 TxnMeta 是 Open 状态,则将其更新为 ABORTING 状态,并将新的 TransactionMetadataEntry 保存到 Transaction Log;
- 如果不是 Open 状态,则校验状态是否正常,如果当前状态不是 ABORTING 状态,说明状态异常,抛出相应的 InvalidTxnStatusException
第三部则是 abortTxnOnSubscription,
- 获取 TxnId 对应的 TxnMeta 和 lowWatermark
- EndTxnOnSubscription:遍历 TxnMeta 上的所有订阅信息,对每一个订阅执行 endTxn,这里是通过 TBClient 发送
ndTxnOnSubscription
请求给 TB(每个topic都会有一个TB,这里的请求就是发送给该分区所在的broker),对应的 TB 接收到请求之后- 如果是 failover 或者 exclusive 的订阅,获取订阅对应的 active consumer
- 首先从 Topic 中获取到订阅信息,然后使用订阅来结束订阅,最终会使用 PendingAckHandle.abortTxn 来完成作业提交
- 如果是 cumulative 类型的订阅,首先在 pendingAckStore 中保存 pendingAckMetadataEntry来记录 abort 动作;如果 cumulativeAckOfTransaction 的key 和abort 的txnId 一致,将 cumulativeAckOfTransaction 置为null
- 如果是 individual 类型的订阅,首先在 pendingAckStore 中保存 pendingAckMetadataEntry来记录 abort 动作;然后从 individualAckOfTransaction 中取出 pendingAckMessageForCurrentTxn 信息,遍历所有的 positioin,从 individualAckPositions 删除对应pendingAck 信息,然后从 individualAckOfTransaction 中删除 txn 信息;重新投递没有ack的消息;处理loawatermark
- EndTxnOnProducePartition:遍历 TxnMeta 上的所有partition信息,对于每个 paritition,执行 endTxn,和订阅类似,也是通过TBClient 发送请求到 TB,然后 end persistentTopic 上的 txn(TB.commit)
- 构建一个abortMarker,内部包含了 TxnID 信息
- 将这个 abortMarker 信息写入到原始topic中
- 将 abort 的事务添加到 aborts map 中
- 更新 maxReadPosition 信息: 从 ongoingTxns 中删除 txn,如果依旧存在事务,maxReadPosition 位置是第一个事务的前一个entry,否则是 topic 的 LAC 位置
- 清理丢去的事务信息
- 根据 maxReadPositionAndAbortTimes 的值,决定是否执行 snapshot
第四步,在 TransactionLog 中,记录事务结束的信息(ABORTING -> ABORTED),并且更新 TxnMetaMap 中的信息。完成之后,说明整个事务结束,可以从 TxnMetaMap中删除该事务。