Subscription 工作原理

从Pulsar subscription 数据,需要首先初始化一个 consumer,初始化的过程中可以为consumer指定一些属性

 Consumer<byte[]> consumer = pulsarClient.newConsumer()
    .topic("persistent://public/default/topic")
    .subscriptionName("sub")
    .consumerName("consumer")
    .subscriptionMode(SubscriptionMode.Durable)
    .subscriptionType(SubscriptionType.Exclusive)
    .ackTimeout(3, TimeUnit.SECONDS)
    .ackTimeoutTickTime(5, TimeUnit.SECONDS)
    .acknowledgmentGroupTime(2, TimeUnit.SECONDS)
    .isAckReceiptEnabled(true)
    .maxPendingChuckedMessage(100)
    .autoAckOldestChunkedMessageOnQueueFull(true)
    .isAckReceiptEnabled(ackReceiptEnabled)
    .startMessageIdInclusive()// 是否包含seek的messageId
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) // 订阅的起始positino,默认lastest
    .enableRetry(true)
    .deadLetterPolicy(DeadLetterPolicy.builder()
            .maxRedeliverCount(5)
            .deadLetterTopic("persistent://public/default/dead-letter-topic")
            .retryLetterTopic("persistent://public/default/retry-letter-topic")
            .build())
    .receiverQueueSize(5000)
    .maxTotalReceiverQueueSizeAcrossPartitions(50000)
    .batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY)
    .priorityLevel(1)
    .enableBatchIndexAcknowledgment(true)
    .maxPendingChuckedMessage(1000)
    .subscriptionName("my-subscriber-name")
    .subscribe();

Consuemr配置

Consumer的配置相比于Producer要多一些,我们详细的来看一下:

基本配置

  • topic:订阅的topic名称
  • subscriptionName:订阅名称
  • consumerName:消费者名称

订阅模式

subscriptionMode:订阅模式,包括durable和non-durable两种,

  • durable的订阅,会有cursor来记录订阅的位置信息
  • non-durable的订阅则没有cursor

订阅类型

subscriptionType:订阅类型,包括:

  • Exclusive:一个订阅中只能有一个consumer,可以保证消费的顺序
  • Failover:一个订阅中可以存在多个consumer,只有一个consumer可以接收消息,当这个consumer断开连接之后,其他的consumer才可以接收消息,可以保证消费的顺序;对于多分区topic,每个分区都可以保证消费的顺序,多个分区会在所有可用的consumer上分配,对于一个分区,最多有一个active的consumer(类似于kafka、tubemq)
  • Shared:一个订阅中可以存在多个consumer,消息以 round-robin 的方式发送给所有连接的consumer,不能保证消费顺序
  • Key_Shared:一个订阅中可以存在多个consumer,所有具有相同key的消息会给发送给同一个consumer

ACK设置

  • ackTimeout : message ack的超时时间,配置时需要大于1s;默认ackTimeout是disable的,即所有已经投递的消息都不会重新投递;当enable ack timeout时,如果一个投递的消息在timeout的时间内没有ack,这个消息将被重新投递

  • ackTimeoutTickTime: UnAckedMessageTracker中ack 超时检查任务的周期,不能超过ackTimeout,这里之所以不用ackTimeout是为了提供一个更加细粒度的超时控制

  • acknowledgmentGroupTime:ack在默认情况下并不是每次都提交到broker,而是通过group的方式提交组合成一个ack请求提交到broker,默认group时间是100ms

  • enableBatchIndexAcknowledgment:是否开启batch index级别的ack,默认pulsar的ack级别是<LedgerId, EntryID>级别,这个配置开启之后,可以ack到batch内的index级别

ChunkMessage消费配置

  • maxPendingChuckedMessage: 最大可以缓存的chunk message数量
  • autoAckOldestChunkedMessageOnQueueFull: 是否自动对超出 maxPendingChuckedMessage的chunk message进行ack,如果设置为true,则自动ack,否则不自动ack(会有超时检查任务触发重新投递)
  • expireTimeOfIncompleteChunkedMessage:chunk message的超时时间,如果consumer超过这个时间没有接收到一个message的所有chunk message,consumer可以expire这些不完整的chunk

Deadletter / RetryLetter 配置

  • enableRetry :是否支持自动retry

  • deadLetterPolicy: 死信队列配置,可以配置最大 re-deliver 次数,超过这个次数之后,consumer会将数据写入到deadLetterTopic中,然后对message进行ack;如果enableRetry =true,consumer会自动订阅RetryLetterTopic,在调用 reconsumeLater 方法之后,会首先将数据写入到RetryLetterTopic中并对message进行ack,当retry次数超过max re-deliver次数之后,会将topic写入deadLetter中,并且对message进行ack

如果enableRetry =false,ack超时或者nack的次数大于 maxRedeliverCount ,就会将 message 写入DeadLetterTopic,并且ack,重试是在broker的RedeliveryTracker记录的;

如果enableRetry =true, 那么retry也会生效,如果调用了reconsumeLater方法,consumer会将message写到RetryLetterTopic中,并且进行ack (不需要等待 重试次数大于maxRedeliverCount),因为consumer会自动订阅 RetryLetterTopic 所以consumer可以从RetryLetterTopic中继续消费到这个message;当retry的次数超过了maxRedeliverCount时,会将message写入到DeadLetterTopic,并且 ack RetryLetterTopic

接收缓存队列配置

  • receiverQueueSize:consumer最大的接收缓存大小
  • maxTotalReceiverQueueSizeAcrossPartitions:分区topic的最大接收缓存

这两个配置和producer的配置类似,决定了consumer一次可以从broker的最大的数据接收量

优先级

  • priorityLevel: 0表示最高优先级。在Share模式的订阅中,broker会有限将数据dispatch到最高优先级的consumer中(如果这个consumer的接收队列没有满),当高优先级的consumer没有permits时,才会dispatch到低一级优先级的consumer。

对于Share的订阅,Consumer-A 优先级为0,Consumer-B 优先级为 1,那么broker会有限将数据dispatch到A,直到A的permits使用完才会向B发送数据。下面是一个示例:

Consumer PriorityLevel Permits

  • C1 0 2

  • C2 0 1

  • C3 0 1

  • C4 1 2

  • C5 1 1

Broker dispatch的顺序为C1,C2,C3,C1,C4,C5,C4

对于Failover订阅,Broker会根据优先级和consumerName的字典序选择active 的 consumer,

如果优先级一样,按照name的字典序选择,如下,active consumer为C1

Consumer PriorityLevel Name

  • C1 0 aaa
  • C2 0 bbb

如果优先级币一样,按照优先级选择,如下,active consumer 为C2

Consumer PriorityLevel Name

  • C1 1 aaa
  • C2 0 bbb

对于分区topic,Broker会将partition平均的分配给最高优先级的consumer。

BatchReceive 配置

  • batchReceivePolicy : batch receive策略可以现在在一个batch中的消息数量和大小,也可以指定一个等待消息的超时时间,batch receive会在数量、大小或者超时三个条件中任何一个触发的条件下完成。

自定义batch receive policy。

client.newConsumer().batchReceivePolicy(BatchReceivePolicy.builder()
              .maxNumMessages(100)
              .maxNumBytes(5 * 1024 * 1024)
              .timeout(100, TimeUnit.MILLISECONDS);

Consumer 和 Broker 交互

完成了配置之后,就可以对Consumer(后续均使用ConsumerImpl为例,并且topic为Non-Paititioned topic)进行初始化,然后从broker消费数据,这一系列的过程需要和Broker进行不断的交互,主要的交互流程如下:

Consumer -> Broker: CONNECT
Broker -> Consumer: CONNECTED
Consumer -> Broker: PARTITIONED_METADATA
Broker -> Consumer: PARTITIONED_METADATA_RESPONSE
Consumer -> Broker: LOOKUP
Broker -> Consumer: LOOKUP_RESPONSE
Consumer -> Broker: SUBSCIBE
Broker -> Consumer: SUCCESS
Consumer -> Broker: FLOW
Broker -> Consumer: MESSAGE
Consumer - Broker: ACK
Broker -> Consumer: XXX
Consumer - Broker: ...
Broker -> Consumer: ...

创建ConsumerImpl

首先是一些参数校验和补齐的工作:

  • 校验topic和subscription name
  • 初始化DLQ相关配置
  • 校验 Compact topic,如果是读取Compact topic,那么订阅类型只能是Exclusive或者Failover
  • 校验ConsumerEventListener,用来监听Failover模式下active consumer切换时间

根据配置的Topic信息,决定创建那种类型的Topic,包括三种类型:配置一个Topic,配置多个Topic,Topic parttern,以一个topic为例:

和Producer一样首先要获取元数据信息,

  • 如果是一个分区Topic,则初始化MultiTopicsConsumerImpl,否则,初始化ConsumerImpl
  • 在consumers缓存中记录consumer信多个Topic信息
ConsumerImpl

如果Topic是一个非分区Topic,client会创建一个ConsumerImpl对象,并且做初始化的相关工作,关键的初始化如下:

  • 最大接收队列长度,maxReceiverQueueSize
  • incomingMessages,接收队列,用于存放broker 发送过来的message
  • unAckedChunkedMessageIdSequenceMap,保存没有完全消费的chunk message信息的map
  • 配置batch receive policy ,如果policy 中配置了超时时间,则会启动一个TimeoutTask,这个超时任务会定期检查pending的batch receive 请求,如果请求超时,那么会从incomming中取出message,封装在MessagesImpl中返回,这个过程会将所有的message添加到unAckTracker中跟踪ack状态
  • 设置 receiverQueueRefillThreshold,这个值用来重新出发FlOW请求,当permit数量大于这个值时就会想broker发送FLOW请求,是maxReceiveQueueSize的一半
  • 初始化NegativeAcksTracker,NegativeAcksTracker 用来跟踪nack的message
    • consumer调用nack时,NegativeAcksTracker 会记录messageId信息
    • 然后由一个定时任务去检查nack的时间是否超过nackDelayNanos,如果没有超过,则用剩余的时间初始化一个新的定时任务;如果超过,则触发nack的逻辑
      • 首先将chunkmessage和nack的message 都加到 messagesToRedeliver中
      • 如果订阅类型不是share,那么为了保证有序,需要重传所有 的消息,清空incomingqueue和unackTracker,然后向Broker发送 REDELIVER_UNACKNOWLEDGED_MESSAGES 请求,broker会重新投递所有的unack的消息
      • 如果订阅类型是share,那么首先从incomming queue中的移除所有需要messagesToRedeliver的所有message,然后向broker 发送REDELIVER_UNACKNOWLEDGED_MESSAGES ,并携带 需要重传的MessageId信息,broker会投递所有messageId对应的Message
  • 初始化chunk message相关内容,maxPendingChunkedMessage,pendingChunkedMessageUuidQueue,expireTimeOfIncompleteChunkedMessageMillis,autoAckOldestChunkedMessageOnQueueFull
  • 初始化unAckTracker,用来跟踪没有ack的message信息,当message在timeout时间内没有ack时,unAckTracker会触发消息的重新投递
  • 初始化acknowledgmentsGroupingTracker,ack在默认情况下并不是每次都提交到broker,而是通过group的方式提交组合成一个ack请求提交到broker,默认group时间是100ms

grabcnx

grabcnx的过程和producer一致,grabCnx分为两个阶段,首先是lookup topic所在的broker,然后创建consumer到broker的连接。先看findBroker的过程,依旧是通过LookupService来查找:

  • 首先根据serviceUrl向一个Broker发送 LOOKUP 请求,broker接收到lookup请求之后,首先根据请求的topic,查找对应的namespacebundle信息,然后查找bundle所在的broker地址,最后将结果返回给consumer
  • consumer得到topic所在的broker地址之后,创建到对应broker的连接,连接建立之后,回调ConsumerImpl的connectionOpened方法,执行接下来的初始化工作:
    • 清理内部的接收队列,并且为startMessageId赋值,如果incomming queue 有message,那么startMessageId为第一个message的上一个message(如果是batch message,返回batchIndex -1;否则返回 entryId -1);如果incomming queue 中没有message,则startMessageId为lastDequeuedMessageId(如果lastDequeuedMessageId != Position.earliest); 如果没有接收和处理过任务数据,则使用startMessageId
    • 清理possibleSendToDeadLetterTopicMessages
    • 发送Subscribe请求到Broker,如果是durable的订阅,那么subscribe请求中的startMessageId为空,因为对于durable的订阅,起始位置有broker的cursor决定;另外注意是否允许自动创建Topic, 由subscribe请求中携带的 forCreateTopic 和Broker的 isAllowAutoTopicCreation 共同;是否可以自动创建 Subscription,由 broker 配置 isAllowAutoSubscriptionCreation 和 topic 配置 autoSubscriptionCreationOverride 共同决定
      • Broker 接收到SubScribe请求之后,首先获取topic对应的Topic对象,然后调用Topic#Subscribe创建新的subscription,以duarable为例,会调用Topic#getDurableSubscription-> ML#asyncOpenCursor
        • 初始化一个 ManagedCursorImpl,主要是individualDeletedMessages,batchDeletedIndexes以及一些状态信息的初始化,cursor状态为uninitialized
        • 对cursor进行初始化操作,将curosr的markDeletePosition置为lac,readposition置为lac的下一个位置,cursor状态为No_Ledger
        • 为cursor创建一个ledger,根据订阅配置的消费起始位置初始化cursor,重置markDeletePosition和readposition
      • 通过ML创建cursor完毕之后,为这个订阅和cursor初始化一个subscription,以PersistentSubscription为例,初始化包括PersistentMessageExpiryMonitor,PersistentMessageExpiryMonitor用来处理消息的过期逻辑
      • 实例化一个broker侧的Consumer,consumer会跟踪individual ack的状态信息
      • 将consumer添加到subscription中,这个过程会根据订阅类型为subscription初始化dispatcher,
        • Exclusive,对应PersistentDispatcherSingleActiveConsumer
        • Shared,对应PersistentDispatcherMultipleConsumers
        • Failover,对应PersistentDispatcherSingleActiveConsumer
        • Key_Shared, 对应PersistentStickyKeyDispatcherMultipleConsumers
        • dispatcher初始化之后,将consumer记录在dispatcher中,
          • 对于Exclusive,会选择第一个consumer作为active的consumer;
          • 对于Failover,会选择第partitionIndex % consumersSize个consumer作为active的consumer,如果是费分区topic,则选择第一个consumer;
          • 然后会开始为active consumer读数据,过程如下
            • 如果是Exclusive,或者Failover并且activeConsumerFailoverDelayTimeMillis <0,对cursor进行rewind,即将cursor重置到mark delete的位置,将readposition置为markdelete的下一个位置
            • 如果是failover的consuemr,通知所有consumer,active consumer的变更
            • 然后尝试读消息,不过consumer初始化的permit数为0,所有不会执行读取
          • 如果是Shared,会将consumer保存在consumerList中,之后会根据优先级进行轮训发送
          • 如果是Key_Shared,则会根据key的hash值选择consumer,这里选择策略有ConsistentHashingStickyKeyConsumerSelector、HashRangeAutoSplitStickyKeyConsumerSelector和HashRangeExclusiveStickyKeyConsumerSelector,这里不展开详细说明。
      • consumer添加到subscription之后,进行检查backlogged cursor信息
        • 如果consumers不为空并且cursor的entry数量小于backloggedCursorThresholdEntries,则将cursor置为active
        • 否则将cursor置为inactive,active可以从cache中读数据,inactive的数据只能充bookkeeper中读取
      • 返回SUCCESS响应给Consumer
    • 接收到Broker响应之后,首先将状态设置为Ready,并将permit数量设置为0,如果是非分区的consumer或者是一个重连的分区consumer,将permit数增加为receiveQueue的大小,并且向broker 发送flow请求。

到这里,consumer的初始化工作全部完成。

FLOW

可以看到对于一个非分区的topic,初始化完成之后,就会想broker 发送 FLOW 请求来获取消息,flow请求中会携带permit数量,broker 接收到 FLOW 之后的处理流程如下:

  • 使用Broker侧的consumer对象,执行Consumer#flowPermits ,这个过程会增加consumer的permit值,然后 Subscription#consumerFlow->dispatcher#consumerFlow
  • 计算可以读取的entry数量
  • ManagedCursorImpl#asyncReadEntriesOrWait,如果有数据就读取,如果没有,则等待10ms之后再去读取,
  • ManagerLedger#asyncReadEntry -> EntryCacheImpl#asyncReadEntry,首先从cache中读取,否则从bookie读取entry
  • 过滤entry,将可以发送给consumer的数据准备好,过滤的原因包括:checksum 或者metadata corrupt,内部标记数据,延迟消息等
  • 发送数据给consuemr,发送是每个entry单独发送的,并且减少broker侧Consumer的permit值(代码是增加了一个负值)
  • 如果没有pending的读请求,发起一次新的读请求(push过程)

consumer接收到entry之后,如果entry中是一个message,将其加入到incommingqueue中;如果是batch message,则将解析出来的每个message加到incomming queue中;

receive

consumer下一步会调用receive方法处理message,处理过程如下:

  • 从incommingqueue中取出一个message
  • 增加consumer的permit
  • 在 unAckedMessageTracker 中记录messageId, 如果在ack 超时时间内没有ack,就会向broker 发送redeliver请求

ACK

执行完消费逻辑之后,可以进行ack,ack分为两种类型Cumulative和Individual,默认是Individual类型的ack。Individual会ack一条message,Cumulative会ack这个message之前的所有数据,Shared和Key_Shared模式不能使用Cumulative数据。ack默认不是每个请求都会发到broker,而是group ack的方式,聚集一批ack一起发送。

  • 如果是Individual,记录指标,重unAckTracker和PossibleSendToDeadLetterTopicMessages中删除messageId,
    • 如果acknowledgementGroupTimeMicros==0,说明不用等待直接发送ack到broker
    • 否则,将ack请求放在pendingIndividualAcks中,当ack数量大于MAX_ACK_GROUP_SIZE或者超时时,将pendingIndividualAcks的所有ack一起发送给broker
  • 如果是Cumulative,
    • 如果acknowledgementGroupTimeMicros==0,说明不用等待直接发送ack到broker
    • 否则,更新LAST_CUMULATIVE_ACK_UPDATER中的messageId信息,当超时时,发送Cumulative ack 给broker

Broker接收到ack请求之后的处理如下:

  • 如果是individual类型的ack,Subscription#acknowledgeMessage -> ManagedCursor#asyncDelete
    • 对于每个ack的position,
      • 查看individualDeletedMessages是否包含,如果包含,说明position已经全部ack,则从batchDeletedIndexes中删除
      • 在individualDeletedMessages中保存position信息,如果是batch消息则在batchDeletedIndexes保存响应的信息,如果对应的ackset是empty说明所有batchMessage中的singleMessage都已经ack,则从batchDeletedIndexes删除,并且individualDeletedMessages保存position的range信息
    • 检查individualDeletedMessages第一个range,如果range的下限小于等于markDeletePosition,说明第一个ranger的下限位置对应的已经ack,将markDeletePosition移动至range的上限位置,
    • 更新markDeletePosition,并且将individualDeletedMessages中所有在新markDeletePosition之前的信息删除
    • 更新readPosition
    • 持久化ack信息,包括position、IndividualDeletedMessages、BatchedEntryDeletionIndexInfo信息
  • 如果是cumulative类型的ack,只有一个ack的position
    • 如果支持batchIndex,从batchIndex中删除position之前的所有内容
    • 更新markDeletePosition为ack position的前一个position
    • 更新readPosition
    • 持久化ack信息,包括position、IndividualDeletedMessages、BatchedEntryDeletionIndexInfo信息

整个消费的流程大致如上文所述,MultiTopicsConsumerImpl以及PatternMultiTopicsConsumerImpl的没有涉及,另外消费过程中的一些细节和特性,比如Batch消息生产、消费,Chunk message的生产、消费,Key_Shared消费的数据分发模式,也没有展开描述,会在后续的文章中逐个讲述。