Publish工作原理

向Pulsar Publish数据,需要首先初始化一个producer,初始化的过程中可以为producer指定一些属性

pulsarClient.newProducer()
                .enableBatching(true) // 开启batch
                .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) // batch的最大等待时间
                .batchingMaxBytes(12800) // batch的最大大小
                .batchingMaxMessages(10) // batch中最多保留的消息条数
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) // Message Routing Mode
                .roundRobinRouterBatchingPartitionSwitchFrequency(10) // batch开启时,分区切换频率
                .maxPendingMessages(50000) // 最大缓存的生成请求数
                .maxPendingMessagesAcrossPartitions(100000)// 分区topic配置,即所有分区的最大缓存数量
                .blockIfQueueFull(true) // 当达到最大缓存数时是否block客户端
                .sendTimeout(5, TimeUnit.SECONDS) // 发送超时时间
                .topic("test") 
                .compressionType(CompressionType.SNAPPY) // 压缩类型
                .producerName("producer-name")
                .enableChunking(true) // 是否开启Chunking特性
                .accessMode(ProducerAccessMode.Shared) // producer mode
  							 .intercept(new ProducerInterceptor() { // 设置interceptor,实现自定义的功能
                    @Override
                    public void close() {}
                   
                    @Override
                    public boolean eligible(Message message) {return false;}

                    @Override
                    public Message beforeSend(Producer producer, Message message) { return null;}

                    @Override
                    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {}
                })
                .create();

Producer配置

Batch 配置

Producer可以配置Batch发送消息和单条发送消息,开启了batch之后,可以通过:

  • 消息条数
  • 消息大小
  • batch等待时间

以上三个条件来产生batch,上述配置只要满足一个就会触发一个新的batch

Routing 配置

这个配置是针对分区topic的,可以为message设置路由策略来决定message会被发到哪个分区,包含三种:

  • SinglePartition:随机选择一个分区,将所有消息写到这个分区
  • RoundRobinPartition:轮询策略,会将消息轮询的发送到每个分区,如果producer没有开启batch,则每发送一条message,就会切换一个分区;如果开启了batch,则按照batch的最大等待时间batchingMaxPublishDelayMicros的倍数来决定,具体的倍数由参数 batchingPartitionSwitchFrequencyByPublishDelay 指定,即每经过 batchingMaxPublishDelayMicros * batchingPartitionSwitchFrequencyByPublishDelay 的时间,分区切换一次,在切换的时间内message被发送到同一个分区
  • CustomPartition: 自定义分区策略,分区方式有MessageRouter决定,即需要自己实现MessageRouter

对于 SinglePartition 和 RoundRobinPartition,如果message设置了key,都会按照hash的方式决定message写到哪个分区。

默认配置RoundRobinPartition;如果存在customMessageRouter,则RoutingMode只能是CustomPartition。

Pending配置

Producer可以最多hold的message数量,如果超过这个配置,则按照blockIfQueueFull来决定是阻塞生产者或者抛出异常信息。

如果是分区topic,则每个分区可以hold的最大消息数量为:

Math.min(conf.getMaxPendingMessages(), conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);

Chunk 配置

当一条Message的大小超出了broker配置的最大消息大小时,开启Chunk配置,可以让producer自动将消息split成为多个小的消息,并且按照顺序发送到Broker。如果消费者希望按照获取切分之前的Message,需要对producer和consumer进行以下配置:

  • 目前只支持 non-shared subscriptionpersistent topic
  • disable batch
  • Pulsar-client 在接收到 ack之后才能继续发送消息到Broker(防止chunk乱序),所以可以调小maxPendingMessages来避免太高的内存占用
  • 配置message 的 ttl/retention来清理不完整的chunk message, 有些场景比如producer或者broker重启,会导致broker接收到了不完整的chunk message,这一部分消息consumer无法ack,需要按照ttl或者retention被清理掉;或者通过配置ConsumerBuilder#expireTimeOfIncompleteChunkedMessage(long, TimeUnit)来执行message expire逻辑
  • 消费者最好配置 receiverQueueSize 和 maxPendingChuckedMessage

Producer Mode 配置

生产者的模式,目前包括三种:

  • Shared :默认多个producer可以同时向一个topic publish消息
  • Exclusive:请求独占式生产,如果已经存在其他producer,则立即失败
  • WaitForExclusive:producer的创建会pending,直至获取到独占访问权限

Producer 与 Broker 交互

完成了配置之后,就可以对Producer进行初始化,然后publish message 到broker,这一系列的过程需要和Broker进行不断的交互,主要的交互流程如下:

image.png

接下来就围绕Producer的整个初始化以及生产过程分析Producer/Broker交互的整个过程

创建ProducerImpl/PartitionedProducerImpl

首先是一些参数校验和补齐的工作:

  • 校验是否batch和chunk同时开启,如果是,抛出异常
  • 校验topic

然后获取元数据信息:

  • 获取元数据信息
  • 创建ProdducerImpl或者PartitionedProducerImpl
  • 在Client的Producer缓存中保存Producer信息

获取元数据

Producer会向Broker发出元数据请求,通过lookup.getPartitionedTopicMetadata来完成,这个过程会涉及到几个交互命令,我们逐个进行分析(以BinaryProtoLookupService为例):

  • 第一步,Producer根据serviceUrl指定的Broker地址连接到Broker;如果配置了多个Broker地址,第一次连接时会随机选择一个,之后按照配置的顺序选取下一个Broker地址,通过bootstrap.connect(remoteAddress)完成,不会发送任务Pulsar Command到Broker,链接创建完毕之后,会触发Netty Handler的channelActive方法:
    • 如果keepAliveIntervalSeconds > 0,则每经过keepAliveIntervalSeconds的时间就启动keepAlive的任务,发送PING到Broker,Broker会返回PONG响应
    • 启动 request 超时定时检查任务,检查requestTimeoutQueue中是否存在超时的请求,如果有则从pendingRequests移除对应的请求并将请求结果置为超时
    • 发送CONNECT命令到Broker,并将Client状态置为 SentConnectFrame; Broker接收到CONNECT请求之后,会返回CONNECTED响应,并且携带Broker端的maxMessageSize配置信息
  • 第二步,链接创建完毕之后,Producer 向 Broker 发送PARTITIONED_METADATA请求,Broker接收到请求之后,会获取topic的元数据信息,然后返回PARTITIONED_METADATA_RESPONSE 给 Producer

创建Producer实例

根据 PARTITIONED_METADATA_RESPONSE 中的分区数,决定创建那种类型的Producer(ProducerImpl或者PartitionedProducerImpl),我们这里以ProducerImpl为例:

  • 将Producer状态置为Uninitialized
  • 初始化 pendingMessages 和 pendingCallbacks 队列,用来存放 pending 的 Message 和 Callback
  • 初始化 sequenceId,如果Producer 有配置 initialSequenceId,则将 lastSequenceIdPublished 和 lastSequenceIdPushed 都置为 initialSequenceId ,将msgIdGenerator 置为 initialSequenceId+1 ;如果没有配置,则 lastSequenceIdPublished 和 lastSequenceIdPushed 都置为-1,msgIdGenerator 置为0; lastSequenceIdPushed 表示已经 send 到 broker 的消息的 sequenceId,lastSequenceIdPublished 表示已经 publish 成功的 message 的 sequenceId
  • 启动 sendRequest超时检查任务,从 pendingMessages选取第一个请求,
    • 如果没有pending的Message,按照配置的超时时间新建一个新的超时检查任务
    • 如果存在pending的request,则判断是否超时,
      • 如果不超时,则计算剩余的等待时间,按照剩余时间new一个新的超时检查任务
      • 如果超时,则释放 pendingMessages 中所有的请求,并且清空 pendingMessages 和 pendingCallbacks,如果开启了batch,丢弃batchContainer中的所有内容,然后按照配置的超时时间创建一个新的超时检查任务
  • 计算出producer的创建超时时间
  • 如果开启Batch,初始化一个BatchContainer
  • 初始StatsRecorder
  • 初始化ConnectionHandler,然后调用ConnectionHandler.grabCnx 查找topic所在的broker地址,然后建立到broker的连接
grabCnx

grabCnx分为两个阶段,首先是lookup topic所在的broker,然后创建producer到broker的连接。先看findBroker的过程,依旧是通过LookupService来查找:

  • 首先根据serviceUrl向一个Broker发送 LOOKUP 请求,broker接收到lookup请求之后,首先根据请求的topic,查找对应的namespacebundle信息,然后查找bundle所在的broker地址,最后将结果返回给producer
  • producer得到topic所在的broker地址之后,创建到对应broker的连接,连接建立之后,回调ProducerImpl的connectionOpened方法,执行接下来的初始化工作:
    • 发送 PRODUCER 请求到broker,broker接收到 PRODUCER 请求之后,会进行以下操作:
      • 获取Topic信息
      • 检查 Topic 是否超出了BacklogQuota的限制,如果超过则返回异常,异常根据BacklogQuota.RetentionPolicy的不同而不同,如果policy是producer_request_hold,client接收到异常之后,会阻塞Producer的创建;如果是producer_exception则会抛出异常
      • 创建服务端的Producer对象,并将Producer注册到Topic中
      • 返回 PRODUCER_SUCCESS响应给Producer,会携带消息去重中的最大sequenceId
    • 接收到PRODUCER_SUCCESS响应之后,Producer会根据sequenceId来初始化lastSequenceIdPublished以及msgIdGenerator,逻辑是如果msgIdGenerator为0并且producer没有指定InitialSequenceId,则将lastSequenceIdPublished置为sequenceId,msgIdGenerator置为sequenceId+1
    • 如果开启了Batch,则初始化一个定时任务,根据配置的batchingMaxPublishDelayMicros时间来定时发送消息,发送是会将BatchContainer中的所有message遍历封装到send请求中发送到Broker,请求中会携带第一条message的sequenceId和最后一条mesage的sequenceId
    • 重新发送pendingMessages中的消息

到这里,producer的初始化工作全部完成。

生产数据

Producer初始化完成之后,就可以使用Producer来发送数据到broker。发送数据之前需要首先构造出Message,然后将Message发送到Broker。

TypedMessageBuilder<byte []> messageBuilder = producer.newMessage()
                .key("Key" + key) // 分区路由使用的key,即partition key
                .value("Value" + value) // 内容
                .deliverAfter(100, TimeUnit.SECONDS) // 延迟投递
                .eventTime(System.currentTimeMillis()) // eventtime时间
                .orderingKey("orkerkey".getBytes()) // 用于SubscriptionType#Key_Shared模式的key,如果没有会使用partition key
                .property("myKey-async","myValue-async"); // 消息属性
MessageIdImpl messageId = (MessageIdImpl) messageBuilder.sendAsync().get();

在构建Message的过程中可以指定一些参数,如上代码所示。生产数据的流程

  • Producer首先判断是否超过MaxPendingMessage的限制,超出则抛出异常
  • 判断是否是replicated的消息并且具有ProducerName,如果不是replicated的消息但是具有producerName,则抛出异常,因为producerName是在序列化send请求是设置的,在这里不应该存在producerName
  • 判断是否是不支持Batch 或者 hasDeliverAtTime,这两种场景都需要单独发送,不需要batch,并且如果请求的大小超过了最大限制,则需要判断是否支持chunk,不支持则抛出异常
  • 如果需要切分,那么切分之后会增加send请求数,再次判断是否超过MaxPendingMessage的限制,对每个chunk单独进行发送,切分之后的chunk具有相同的sequenceId(UUID,组合了producerName和sequenceId的字符串),不同的chunkId,相同的totalChunk,以及相同的压缩/非压缩size、producerName
  • 如果是chunked message,不用考虑batch,因为初始化producer的时候已经做了校验,batch和chunk不能同时存在,直接将消息发送到Broker
  • 如果是普通message,判断是否支持可以batch发送
    • 如果可以Batch(开启batch,并且没有deliverAtTime),判断是否可以添加到当前Batch
      • 如果可以添加到当前Batch(batch的消息大小和条数都可以容纳当前message),判断sequenceId是否小于等于 lastSequenceIdPushed
        • 如果sequenceId <= lastSequenceIdPushed,说明message可能重复,isLastSequenceIdPotentialDuplicated = true,需要则先把BatchContainer中的消息打包发送到Broker,原因是把可能重复的消息和不重复的消息区分开,然后将当前Message添加到BatchContainer
        • 否则,现将Message加到BatchContainer中,然后判断batch是否full,如果batch是full状态,则把BatchContainer中的消息打包发送到Broker
      • 如果不能添加到当前Batch,则先把BatchContainer中的消息打包发送到Broker,然后将当前Message添加到BatchContainer
    • 如果不可以Batch,直接发送消息到Broker

Broker接收到send请求之后,将通过ManagedLedger将数据写入到Bookie,然后返回 SEND_RECEIPT 给client。 客户端拿到SEND_RECEIPT之后,可以获取messageId,包括LedgerId、EntryId (batchIndex)。

本文主要介绍了Pulsar producer的相关内容,包括一些关键配置和 producer 与 broker交互的流程。