Chunk message

在实际的生产环境环境中,会存在一些特定的消息超出默认最大消息配置(5MB)的情况,如果不进行处理,这部分消息就会因为超过了5MB而被broker拒绝处理。Pulsar 对于这种场景,使用 chunk message机制来满足大消息的需求。chunk message的原理比较简单,即在生产消息时,对超过了5MB的消息进行切分,切分之后的消息都会小于5MB,消息到了broker之后,broker不会对chunk 消息做任何特殊的处理,而是作为一个普通的消息,在消费时consumer会将消息拼接成一个完整的消息,然后进行处理,因此为了保证chunk message 都可以拼接得到一个完整的消息,chunk message不能支持shared模型的消费模式。

Chunk message 的生产

chunk message是对于超过最大消息限制的消息的一种特殊处理,因此不支持batch,如果支持了batch,那么batch会后的消息会超过最大消息的限制。

 checkArgument(!(conf.isBatchingEnabled() && conf.isChunkingEnabled()),
                "Batching and chunking of messages can't be enabled together");

如果要支持chunk message,还需要在构造producer时,将chunk message的开关打开,

producerBuilder.enableChunking(true);

对于超大消息的处理,首先根据ClientCnx.getMaxMessageSize计算需要将消息切分为几个chunk,

int totalChunks = canAddToBatch(msg) ? 1
                : Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()
                        + (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);

对于chunk 消息,为了可以识别是否属于一个大消息,还会添加一些其他的辅助信息

  if (uuid != null) {
                msgMetadata.setUuid(uuid);
 }
msgMetadata.setChunkId(chunkId)
                .setNumChunksFromMsg(totalChunks)
                .setTotalChunkMsgSize(compressedPayloadSize);
  • uuid,首先为了和其他的消息区分,一个大消息的多个chunk会有一个uuid做唯一标识
  • chunkId,表示当前chunk在整个大消息拆分之后的结果中的位置
  • totalChunks,表示这个大消息被切分为了几个chunk
  • totalChunkMsgSize,原始消息大小

通过以上这三个信息,就可以唯一的标识一个chunk message了。

chunk message生产成功之后会返回最后一个 chunk message 的Id,作为整体的messageId,

 LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, getHighestSequenceId(finalOp)));
        op.setMessageId(ledgerId, entryId, partitionIndex);
        // if message is chunked then call callback only on last chunk
        if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) {
            try {
                // Need to protect ourselves from any exception being thrown in the future handler from the
                // application
                op.sendComplete(null);
            } catch (Throwable t) {
                log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic,
                        producerName, sequenceId, t);
            }
        }
        ReferenceCountUtil.safeRelease(op.cmd);
        op.recycle();

并且sendcallback只会被调用一次。

Chunk message 的消费

消费时,会把所有的chunk接收到之后,拼接成为一个整体的message,然后交由consumer处理。在等待 chunk 消息的过程中,会把接收到的不完整的chunk message,保存在一个缓存chunkedMessagesMap中。

if (msgMetadata.getChunkId() == 0) {
  					// 接收到第一个 chunk 时,初始化接收所有 chunk 的 ByteBuf
            ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(msgMetadata.getTotalChunkMsgSize(),
                    msgMetadata.getTotalChunkMsgSize());
            int totalChunks = msgMetadata.getNumChunksFromMsg();
  					// 以 UUID 为 key,ChunkedMessageCtx 为value的 chunk message 缓存
            chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
                    (key) -> ChunkedMessageCtx.get(totalChunks, chunkedMsgBuffer));
            pendingChunkedMessageCount++;
            if (maxPendingChunkedMessage > 0 && pendingChunkedMessageCount > maxPendingChunkedMessage) {
                removeOldestPendingChunkedMessage();
            }
            pendingChunkedMessageUuidQueue.add(msgMetadata.getUuid());
        }
...
  static ChunkedMessageCtx get(int numChunksFromMsg, ByteBuf chunkedMsgBuffer) {
            ChunkedMessageCtx ctx = RECYCLER.get();
            ctx.totalChunks = numChunksFromMsg;
            ctx.chunkedMsgBuffer = chunkedMsgBuffer;
            ctx.chunkedMessageIds = new MessageIdImpl[numChunksFromMsg];
            ctx.receivedTime = System.currentTimeMillis();
            return ctx;
        }

之后没接收到一个 chunk message,都会添加到对应的ChunkedMessageCtx中,其中payload写到ctx.chunkedMsgBuffer,并且记录ctx.chunkedMessageIds,

    // if final chunk is not received yet then release payload and return
        if (msgMetadata.getChunkId() != (msgMetadata.getNumChunksFromMsg() - 1)) {
            compressedPayload.release();
            increaseAvailablePermits(cnx);
            return null;
        }

不是最后一个chunk,在执行完上述操作之后,release payload。

如果接收到了最后一个chunk message,那么我们已经获得了一个完整的消息,删除缓存的chunk message信息,然后返回完整message。

异常chunk message的处理

  • 如果接收到一个chunk message,但是在chunk message 中没有uuid对应的ChunkedMessageCtx,说明接收到了一个乱序或者其他消息的chunk message,丢弃。

  • 如果 chunkId 大于了总体chunk 数,说明是个未知 chunk,丢弃。

  • 如果chunkId不是上一个chunkId+1,说明乱序,丢弃。

chunk message 的expire

前文说到,说有的chunk message在拼接完成之前会保存在缓存中,那么如果一个message的一直没有收到所有的chunk,这些数据就会在内存中堆积。对于这部分的数据,consumer会启动一个过期清理的任务,默认1分钟执行一次,

 private void removeChunkMessage(String msgUUID, ChunkedMessageCtx chunkedMsgCtx, boolean autoAck) {
        if (chunkedMsgCtx == null) {
            return;
        }
        // clean up pending chuncked-Message
        chunkedMessagesMap.remove(msgUUID);
        if (chunkedMsgCtx.chunkedMessageIds != null) {
            for (MessageIdImpl msgId : chunkedMsgCtx.chunkedMessageIds) {
                if (msgId == null) {
                    continue;
                }
                if (autoAck) {
                    log.info("Removing chunk message-id {}", msgId);
                    doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
                } else {
                    trackMessage(msgId);
                }
            }
        }
        if (chunkedMsgCtx.chunkedMsgBuffer != null) {
            chunkedMsgCtx.chunkedMsgBuffer.release();
        }
        chunkedMsgCtx.recycle();
        pendingChunkedMessageCount--;
    }

清理任务会把过期的chunk message 从缓存中清楚,并且执行msg的ack,ack之后的数据不会再接收到。

另外,为了防止内存中 pending 状态的 chunk message 过多,consumer还可以配置maxPendingChunkedMessage(这是对于完整message的一个配置,不是值chunk message的数量),大于0时生效。当pending的 chunk message数超过这个配置之后,会将最早接收到的一个 chunkMessage从缓存中清楚,然后根据配置autoAckOldestChunkedMessageOnQueueFull决定是是否ack。