导语
Pulsar 作为一个消息传输的解决方案,最基本的功能是提供了pub/sub模型的消息服务,即作为一个消息中间件的能力,本文主要以Java Client为例讲述生产者、消费者和Broker之间的交互过程。

1. Client 与 broker 交互流程

和常见的MQ一样,Topic(分区)由 Broker 持有,因此生产者和消费者首先需要创建连接到 Broker,然后查询对应发布或者订阅的 Topic 所在的 Broker 信息。
这个过程起始分为以下几个步骤:

  • 创建到 Broker 的连接
  • 连接创建完毕之后,向 Broker 查询 Topic 的分区数信息
  • 根据分区数,查询对应分区当前被哪个 Broker 持有
  • 创建到分区所在 Broker 的连接

当 producer/consumer 已经连接到具体分区所在的 Broker 之后,
生产者可以进行生产:

  • 向 Broker 发送请求注册 Producer
  • 发送数据

消费者可以进行消费:

  • 向 Broker 发送请求注册 Subscription
  • 请求数据
  • 发送 Ack 请求到 Broker

当 Broker 接收到上述请求之后,会一一处理并且回应:

  • 对于创建链接请求,响应创建成功
  • 对于分区查询请求,响应分区数
  • 对于分区所有 Broker 查询请求,响应 Broker URL
  • 对于生产者/消费者注册请求,响应生产者、消费者注册成功
  • 对于生产者注册请求,响应生产注册成功
  • 对于生产数据请求,响应生产已经接受
  • 对于消费数据请求,响应数据

总体上的生产者、消费者和Broker的交互逻辑如上所述,下面结合代码详细描述整个过程。

2. PulsarClient 初始化过程

初始化Pulsar Producer和Consumer都需要先初始化 Pulsar client。创建一个 pulsar client 的代码如下:

PulsarClient pulsarClient =
                PulsarClient.builder()
                        .serviceUrl(url)
                        .statsInterval(intervalInSecs, TimeUnit.SECONDS)
			.build();

这里会涉及到两个参数

  • serviceUrl: broker 地址列表,也可以通过ServiceUrlProvider提供
  • statsInterval: client状态采集的周期,默认是60s,比如生产者会采集生产速率,消费者会采集消费速率,当配置大于0时生效,如果小于等于0则不采集状态

初始化 PulsarClientImpl

当所有的参数设置完成之后,最后build()会初始化一个PulsarClientImpl对象。在这个过程中,首先会初始化 EventLoopGroup 和 ConnectionPool ,然后执行 PulsarClientImpl的初始化。

初始化EventLoopGroup

  • 根据平台是否支持epoll,决定初始化一个 EpollEventLoopGroup 或者NioEventLoopGroup(netty可以使用native transport来提升性能),并且设置EventLoopGroup的IO线程数。

一个EventLoopGroup包含多个EventLoop, Netty使用EventLoop来处理连接上的读写事件,而一个连接上的所有请求都保证在一个EventLoop中被处理,一个EventLoop中只有一个Thread,所以也就实现了一个连接上的所有事件只会在一个线程中被执行。

初始化ConnectionPool

主要是初始化一下内容

  • eventLoopGroup
  • 连接池缓存pool,pool用来保存 链接地址和ClientCnx的映射关系
  • 初始化 Netty Bootstrap,设置一些参数以及handler,PulsarChannelInitializer, 在链接建立之后,会初始化 Channel 的 handler pipeline,主要的handler是 ClientCnx
  • 初始化 Netty dnsResolver
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
            Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
        this.eventLoopGroup = eventLoopGroup;
        this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();

        pool = new ConcurrentHashMap<>();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup);
        bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));

        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs());
        bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
        bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);

        try {
            bootstrap.handler(new PulsarChannelInitializer(conf, clientCnxSupplier));
        } catch (Exception e) {
            log.error("Failed to create channel initializer");
            throw new PulsarClientException(e);
        }

        this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true)
                .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
    }

EventLoopGroup 和 ConnectionPool初始化完毕之后,继续进行PulsarClientImpl的初始化工作

  • 初始化lookup服务 :Lookup服务用来查找topic 的元数据信息,比如分区数、分区所在broker地址、Schema信息等,有两种类型,HttpLookupService 和 BinaryProtoLookupService
  • 初始化producer/consumer缓存(set)
  • 初始化MemoryLimitController:MemoryLimitController用来做生产消息的内存占用限制,内部会维护内存占用的计数器,当生产消息时,会增加计数;生产完毕时减少计数;如果计数达到上限时,根据blockIfQueueFull来决定阻塞请求或者抛出异常

完成初始化工作之后,将Client的状态置为Open