Publish工作原理
向Pulsar Publish数据,需要首先初始化一个producer,初始化的过程中可以为producer指定一些属性
pulsarClient.newProducer()
.enableBatching(true) // 开启batch
.batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) // batch的最大等待时间
.batchingMaxBytes(12800) // batch的最大大小
.batchingMaxMessages(10) // batch中最多保留的消息条数
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition) // Message Routing Mode
.roundRobinRouterBatchingPartitionSwitchFrequency(10) // batch开启时,分区切换频率
.maxPendingMessages(50000) // 最大缓存的生成请求数
.maxPendingMessagesAcrossPartitions(100000)// 分区topic配置,即所有分区的最大缓存数量
.blockIfQueueFull(true) // 当达到最大缓存数时是否block客户端
.sendTimeout(5, TimeUnit.SECONDS) // 发送超时时间
.topic("test")
.compressionType(CompressionType.SNAPPY) // 压缩类型
.producerName("producer-name")
.enableChunking(true) // 是否开启Chunking特性
.accessMode(ProducerAccessMode.Shared) // producer mode
.intercept(new ProducerInterceptor() { // 设置interceptor,实现自定义的功能
@Override
public void close() {}
@Override
public boolean eligible(Message message) {return false;}
@Override
public Message beforeSend(Producer producer, Message message) { return null;}
@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {}
})
.create();
Producer配置
Batch 配置
Producer可以配置Batch发送消息和单条发送消息,开启了batch之后,可以通过:
- 消息条数
- 消息大小
- batch等待时间
以上三个条件来产生batch,上述配置只要满足一个就会触发一个新的batch
Routing 配置
这个配置是针对分区topic的,可以为message设置路由策略来决定message会被发到哪个分区,包含三种:
- SinglePartition:随机选择一个分区,将所有消息写到这个分区
- RoundRobinPartition:轮询策略,会将消息轮询的发送到每个分区,如果producer没有开启batch,则每发送一条message,就会切换一个分区;如果开启了batch,则按照batch的最大等待时间batchingMaxPublishDelayMicros的倍数来决定,具体的倍数由参数 batchingPartitionSwitchFrequencyByPublishDelay 指定,即每经过 batchingMaxPublishDelayMicros * batchingPartitionSwitchFrequencyByPublishDelay 的时间,分区切换一次,在切换的时间内message被发送到同一个分区
- CustomPartition: 自定义分区策略,分区方式有MessageRouter决定,即需要自己实现MessageRouter
对于 SinglePartition 和 RoundRobinPartition,如果message设置了key,都会按照hash的方式决定message写到哪个分区。
默认配置RoundRobinPartition;如果存在customMessageRouter,则RoutingMode只能是CustomPartition。
Pending配置
Producer可以最多hold的message数量,如果超过这个配置,则按照blockIfQueueFull来决定是阻塞生产者或者抛出异常信息。
如果是分区topic,则每个分区可以hold的最大消息数量为:
Math.min(conf.getMaxPendingMessages(), conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
Chunk 配置
当一条Message的大小超出了broker配置的最大消息大小时,开启Chunk配置,可以让producer自动将消息split成为多个小的消息,并且按照顺序发送到Broker。如果消费者希望按照获取切分之前的Message,需要对producer和consumer进行以下配置:
- 目前只支持 non-shared subscription 和 persistent topic
- disable batch
- Pulsar-client 在接收到 ack之后才能继续发送消息到Broker(防止chunk乱序),所以可以调小maxPendingMessages来避免太高的内存占用
- 配置message 的 ttl/retention来清理不完整的chunk message, 有些场景比如producer或者broker重启,会导致broker接收到了不完整的chunk message,这一部分消息consumer无法ack,需要按照ttl或者retention被清理掉;或者通过配置ConsumerBuilder#expireTimeOfIncompleteChunkedMessage(long, TimeUnit)来执行message expire逻辑
- 消费者最好配置 receiverQueueSize 和 maxPendingChuckedMessage
Producer Mode 配置
生产者的模式,目前包括三种:
- Shared :默认多个producer可以同时向一个topic publish消息
- Exclusive:请求独占式生产,如果已经存在其他producer,则立即失败
- WaitForExclusive:producer的创建会pending,直至获取到独占访问权限
Producer 与 Broker 交互
完成了配置之后,就可以对Producer进行初始化,然后publish message 到broker,这一系列的过程需要和Broker进行不断的交互,主要的交互流程如下:
接下来就围绕Producer的整个初始化以及生产过程分析Producer/Broker交互的整个过程
创建ProducerImpl/PartitionedProducerImpl
首先是一些参数校验和补齐的工作:
- 校验是否batch和chunk同时开启,如果是,抛出异常
- 校验topic
然后获取元数据信息:
- 获取元数据信息
- 创建ProdducerImpl或者PartitionedProducerImpl
- 在Client的Producer缓存中保存Producer信息
获取元数据
Producer会向Broker发出元数据请求,通过lookup.getPartitionedTopicMetadata来完成,这个过程会涉及到几个交互命令,我们逐个进行分析(以BinaryProtoLookupService为例):
- 第一步,Producer根据serviceUrl指定的Broker地址连接到Broker;如果配置了多个Broker地址,第一次连接时会随机选择一个,之后按照配置的顺序选取下一个Broker地址,通过bootstrap.connect(remoteAddress)完成,不会发送任务Pulsar Command到Broker,链接创建完毕之后,会触发Netty Handler的channelActive方法:
- 如果keepAliveIntervalSeconds > 0,则每经过keepAliveIntervalSeconds的时间就启动keepAlive的任务,发送PING到Broker,Broker会返回PONG响应
- 启动 request 超时定时检查任务,检查requestTimeoutQueue中是否存在超时的请求,如果有则从pendingRequests移除对应的请求并将请求结果置为超时
- 发送CONNECT命令到Broker,并将Client状态置为 SentConnectFrame; Broker接收到CONNECT请求之后,会返回CONNECTED响应,并且携带Broker端的maxMessageSize配置信息
- 第二步,链接创建完毕之后,Producer 向 Broker 发送PARTITIONED_METADATA请求,Broker接收到请求之后,会获取topic的元数据信息,然后返回PARTITIONED_METADATA_RESPONSE 给 Producer
创建Producer实例
根据 PARTITIONED_METADATA_RESPONSE 中的分区数,决定创建那种类型的Producer(ProducerImpl或者PartitionedProducerImpl),我们这里以ProducerImpl为例:
- 将Producer状态置为Uninitialized
- 初始化 pendingMessages 和 pendingCallbacks 队列,用来存放 pending 的 Message 和 Callback
- 初始化 sequenceId,如果Producer 有配置 initialSequenceId,则将 lastSequenceIdPublished 和 lastSequenceIdPushed 都置为 initialSequenceId ,将msgIdGenerator 置为 initialSequenceId+1 ;如果没有配置,则 lastSequenceIdPublished 和 lastSequenceIdPushed 都置为-1,msgIdGenerator 置为0; lastSequenceIdPushed 表示已经 send 到 broker 的消息的 sequenceId,lastSequenceIdPublished 表示已经 publish 成功的 message 的 sequenceId
- 启动 sendRequest超时检查任务,从 pendingMessages选取第一个请求,
- 如果没有pending的Message,按照配置的超时时间新建一个新的超时检查任务
- 如果存在pending的request,则判断是否超时,
- 如果不超时,则计算剩余的等待时间,按照剩余时间new一个新的超时检查任务
- 如果超时,则释放 pendingMessages 中所有的请求,并且清空 pendingMessages 和 pendingCallbacks,如果开启了batch,丢弃batchContainer中的所有内容,然后按照配置的超时时间创建一个新的超时检查任务
- 计算出producer的创建超时时间
- 如果开启Batch,初始化一个BatchContainer
- 初始StatsRecorder
- 初始化ConnectionHandler,然后调用ConnectionHandler.grabCnx 查找topic所在的broker地址,然后建立到broker的连接
grabCnx
grabCnx分为两个阶段,首先是lookup topic所在的broker,然后创建producer到broker的连接。先看findBroker的过程,依旧是通过LookupService来查找:
- 首先根据serviceUrl向一个Broker发送 LOOKUP 请求,broker接收到lookup请求之后,首先根据请求的topic,查找对应的namespacebundle信息,然后查找bundle所在的broker地址,最后将结果返回给producer
- producer得到topic所在的broker地址之后,创建到对应broker的连接,连接建立之后,回调ProducerImpl的connectionOpened方法,执行接下来的初始化工作:
- 发送 PRODUCER 请求到broker,broker接收到 PRODUCER 请求之后,会进行以下操作:
- 获取Topic信息
- 检查 Topic 是否超出了BacklogQuota的限制,如果超过则返回异常,异常根据BacklogQuota.RetentionPolicy的不同而不同,如果policy是producer_request_hold,client接收到异常之后,会阻塞Producer的创建;如果是producer_exception则会抛出异常
- 创建服务端的Producer对象,并将Producer注册到Topic中
- 返回 PRODUCER_SUCCESS响应给Producer,会携带消息去重中的最大sequenceId
- 接收到PRODUCER_SUCCESS响应之后,Producer会根据sequenceId来初始化lastSequenceIdPublished以及msgIdGenerator,逻辑是如果msgIdGenerator为0并且producer没有指定InitialSequenceId,则将lastSequenceIdPublished置为sequenceId,msgIdGenerator置为sequenceId+1
- 如果开启了Batch,则初始化一个定时任务,根据配置的batchingMaxPublishDelayMicros时间来定时发送消息,发送是会将BatchContainer中的所有message遍历封装到send请求中发送到Broker,请求中会携带第一条message的sequenceId和最后一条mesage的sequenceId
- 重新发送pendingMessages中的消息
- 发送 PRODUCER 请求到broker,broker接收到 PRODUCER 请求之后,会进行以下操作:
到这里,producer的初始化工作全部完成。
生产数据
Producer初始化完成之后,就可以使用Producer来发送数据到broker。发送数据之前需要首先构造出Message,然后将Message发送到Broker。
TypedMessageBuilder<byte []> messageBuilder = producer.newMessage()
.key("Key" + key) // 分区路由使用的key,即partition key
.value("Value" + value) // 内容
.deliverAfter(100, TimeUnit.SECONDS) // 延迟投递
.eventTime(System.currentTimeMillis()) // eventtime时间
.orderingKey("orkerkey".getBytes()) // 用于SubscriptionType#Key_Shared模式的key,如果没有会使用partition key
.property("myKey-async","myValue-async"); // 消息属性
MessageIdImpl messageId = (MessageIdImpl) messageBuilder.sendAsync().get();
在构建Message的过程中可以指定一些参数,如上代码所示。生产数据的流程
- Producer首先判断是否超过MaxPendingMessage的限制,超出则抛出异常
- 判断是否是replicated的消息并且具有ProducerName,如果不是replicated的消息但是具有producerName,则抛出异常,因为producerName是在序列化send请求是设置的,在这里不应该存在producerName
- 判断是否是不支持Batch 或者 hasDeliverAtTime,这两种场景都需要单独发送,不需要batch,并且如果请求的大小超过了最大限制,则需要判断是否支持chunk,不支持则抛出异常
- 如果需要切分,那么切分之后会增加send请求数,再次判断是否超过MaxPendingMessage的限制,对每个chunk单独进行发送,切分之后的chunk具有相同的sequenceId(UUID,组合了producerName和sequenceId的字符串),不同的chunkId,相同的totalChunk,以及相同的压缩/非压缩size、producerName
- 如果是chunked message,不用考虑batch,因为初始化producer的时候已经做了校验,batch和chunk不能同时存在,直接将消息发送到Broker
- 如果是普通message,判断是否支持可以batch发送
- 如果可以Batch(开启batch,并且没有deliverAtTime),判断是否可以添加到当前Batch
- 如果可以添加到当前Batch(batch的消息大小和条数都可以容纳当前message),判断sequenceId是否小于等于 lastSequenceIdPushed
- 如果sequenceId <= lastSequenceIdPushed,说明message可能重复,isLastSequenceIdPotentialDuplicated = true,需要则先把BatchContainer中的消息打包发送到Broker,原因是把可能重复的消息和不重复的消息区分开,然后将当前Message添加到BatchContainer
- 否则,现将Message加到BatchContainer中,然后判断batch是否full,如果batch是full状态,则把BatchContainer中的消息打包发送到Broker
- 如果不能添加到当前Batch,则先把BatchContainer中的消息打包发送到Broker,然后将当前Message添加到BatchContainer
- 如果可以添加到当前Batch(batch的消息大小和条数都可以容纳当前message),判断sequenceId是否小于等于 lastSequenceIdPushed
- 如果不可以Batch,直接发送消息到Broker
- 如果可以Batch(开启batch,并且没有deliverAtTime),判断是否可以添加到当前Batch
Broker接收到send请求之后,将通过ManagedLedger将数据写入到Bookie,然后返回 SEND_RECEIPT 给client。 客户端拿到SEND_RECEIPT之后,可以获取messageId,包括LedgerId、EntryId (batchIndex)。
本文主要介绍了Pulsar producer的相关内容,包括一些关键配置和 producer 与 broker交互的流程。