导语
Pulsar 作为一个消息传输的解决方案,最基本的功能是提供了pub/sub模型的消息服务,即作为一个消息中间件的能力,本文主要以Java Client为例讲述生产者、消费者和Broker之间的交互过程。
1. Client 与 broker 交互流程
和常见的MQ一样,Topic(分区)由 Broker 持有,因此生产者和消费者首先需要创建连接到 Broker,然后查询对应发布或者订阅的 Topic 所在的 Broker 信息。
这个过程起始分为以下几个步骤:
- 创建到 Broker 的连接
- 连接创建完毕之后,向 Broker 查询 Topic 的分区数信息
- 根据分区数,查询对应分区当前被哪个 Broker 持有
- 创建到分区所在 Broker 的连接
当 producer/consumer 已经连接到具体分区所在的 Broker 之后,
生产者可以进行生产:
- 向 Broker 发送请求注册 Producer
- 发送数据
消费者可以进行消费:
- 向 Broker 发送请求注册 Subscription
- 请求数据
- 发送 Ack 请求到 Broker
当 Broker 接收到上述请求之后,会一一处理并且回应:
- 对于创建链接请求,响应创建成功
- 对于分区查询请求,响应分区数
- 对于分区所有 Broker 查询请求,响应 Broker URL
- 对于生产者/消费者注册请求,响应生产者、消费者注册成功
- 对于生产者注册请求,响应生产注册成功
- 对于生产数据请求,响应生产已经接受
- 对于消费数据请求,响应数据
总体上的生产者、消费者和Broker的交互逻辑如上所述,下面结合代码详细描述整个过程。
2. PulsarClient 初始化过程
初始化Pulsar Producer和Consumer都需要先初始化 Pulsar client。创建一个 pulsar client 的代码如下:
PulsarClient pulsarClient =
PulsarClient.builder()
.serviceUrl(url)
.statsInterval(intervalInSecs, TimeUnit.SECONDS)
.build();
这里会涉及到两个参数
- serviceUrl: broker 地址列表,也可以通过ServiceUrlProvider提供
- statsInterval: client状态采集的周期,默认是60s,比如生产者会采集生产速率,消费者会采集消费速率,当配置大于0时生效,如果小于等于0则不采集状态
初始化 PulsarClientImpl
当所有的参数设置完成之后,最后build()
会初始化一个PulsarClientImpl对象。在这个过程中,首先会初始化 EventLoopGroup 和 ConnectionPool ,然后执行 PulsarClientImpl的初始化。
初始化EventLoopGroup
- 根据平台是否支持epoll,决定初始化一个 EpollEventLoopGroup 或者NioEventLoopGroup(netty可以使用native transport来提升性能),并且设置EventLoopGroup的IO线程数。
一个EventLoopGroup包含多个EventLoop, Netty使用EventLoop来处理连接上的读写事件,而一个连接上的所有请求都保证在一个EventLoop中被处理,一个EventLoop中只有一个Thread,所以也就实现了一个连接上的所有事件只会在一个线程中被执行。
初始化ConnectionPool
主要是初始化一下内容
- eventLoopGroup
- 连接池缓存pool,pool用来保存 链接地址和ClientCnx的映射关系
- 初始化 Netty Bootstrap,设置一些参数以及handler,
PulsarChannelInitializer
, 在链接建立之后,会初始化 Channel 的 handler pipeline,主要的handler是 ClientCnx - 初始化 Netty dnsResolver
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
this.eventLoopGroup = eventLoopGroup;
this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
pool = new ConcurrentHashMap<>();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs());
bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
try {
bootstrap.handler(new PulsarChannelInitializer(conf, clientCnxSupplier));
} catch (Exception e) {
log.error("Failed to create channel initializer");
throw new PulsarClientException(e);
}
this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true)
.channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
}
EventLoopGroup 和 ConnectionPool初始化完毕之后,继续进行PulsarClientImpl的初始化工作
- 初始化lookup服务 :Lookup服务用来查找topic 的元数据信息,比如分区数、分区所在broker地址、Schema信息等,有两种类型,HttpLookupService 和 BinaryProtoLookupService
- 初始化producer/consumer缓存(set)
- 初始化MemoryLimitController:MemoryLimitController用来做生产消息的内存占用限制,内部会维护内存占用的计数器,当生产消息时,会增加计数;生产完毕时减少计数;如果计数达到上限时,根据blockIfQueueFull来决定阻塞请求或者抛出异常
完成初始化工作之后,将Client的状态置为Open。