Pulsar Proxy 可以看做是 pulsar broker的代理,producer 和 consumer 可以不直接与 Broker 直接链接,而是通过 proxy 和 broker进行连接。

image.png

1. 使用方式

1.1 如何配置?

在使用 proxy 之前,需要对proxy进行相关的配置来让 proxy 发现 Broker。有两种方式:配置 Broker URL 或者 配置服务发现。

1.1.1 配置 Broker URL

配置方式如下,修改 conf/proxy.conf 文件

brokerServiceURL=pulsar://brokers.example.com:6650
brokerWebServiceURL=http://brokers.example.com:8080

即在 Proxy上配置链接的 Broker 地址,可以通过单个域名对应多个IP的方式来屏蔽单个broker 故障的问题。注意 proxy权限控制需要访问zk,所以如果使用这种方式时,需要在proxy上disable 权限控制(Broker的权限控制不受影响)

1.1.2 配置服务发现

配置服务发现的方式类似于 Broker 的元数据配置

metadataStoreUrl=my-zk-0:2181,my-zk-1:2181,my-zk-2:2181
configurationMetadataStoreUrl=my-zk-0:2184,my-zk-remote:2184

这种方式可以直接访问zk,所以安全性不如第一种方式。

1.2 启动 Proxy

使用如下命令启动 proxy 服务, 一个pulsar集群可以启动多个 proxy。

 bin/pulsar proxy \
  --metadata-store zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 \
  --configuration-metadata-store zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181

2. 原理

Proxy 启动时首先会进行参数的校验,需要至少满足一种配置方式,配置Broker URL 或者配置服务发现。

Proxy 的主要的逻辑都包含在 ProxyService 这个类中,在 proxy 启动时,会对应的启动 proxyService,对应的内部会启动一个 netty 的 server,即 ServerBootstrap,ServerBootstrap监听本机的服务端口。

当 client 和 proxy 建立连接之后,会初始化 channel(ServiceChannelInitializer),channel 初始化,会为channel 配置一系列的 handler,主要关注 ProxyConnection 这个 handler。

ProxyConenction 初始状态为Init

此时开始处理client的请求,以 producer 为例,依次会发送的请求为:

  • CONNECT
  • PARTITION_METADATA
  • LOOKUP
  • PRODUCER
  • SEND

当 proxy 接收到 CONNECT 请求时,此时 ProxyConnection 处于 Init 状态,直接使用 ProxyConenction.handleConnect方法进行处理。

此时需要区分 Connect 请求中是否包含 brokerUrl 信息,对于brokerURL 信息在2.2节之后会比较好理解。

2.1 包含 BrokerUlr 信息

包含 BrokerUrl 信息,此时会直接链接到对应 Broker。

  • ProxyConnection状态修改为 ProxyConnectingToBroker
  • 初始化 DirectProxyHandler,用来处理后续的请求
  • 通过 DirectProxyHandler 创建 到 Broker 的链接,内部也是一个Bootstrap
  • 为 BootStrap 设置 ChannelInitializer以及对应的handler,类型为ProxyBackendHandler,用来处理 proxy和broker之间的请求,ProxyBackendHandler 初始状态为Init
  • 创建到 Broker 的链接,并且记录 对应的channel 为 outboundChannel(DirectProxyHandler 的 outboundChannel,对应的 inboundChannel 为 ProxyConnection的channel,即 client 和 proxy 的 channel)
  • 链接创建完成之后,回调 channelActive,发送 CONNECT 请求到 Broker
  • 接收到 Broker CONNECTED 响应,此时ProxyBackendHandler状态为 Init,回调 ProxyBackendHandler.handleConnected 方法进行处理
  • 将 ProxyBackendHandler 状态置为 HandshakeCompleted,并为 inbound 和outbound 添加 ParserProxyHandler(记录一些状态信息的handler)
  • 此时完成了 proxy 到 broker 的connect
  • 将 ProxyConnection 状态置为 ProxyConnectionToBroker, 并发送Connected 响应给 client

至此,完成了 CONNECT 请求的处理,此时的 ProxyConnection 状态为 ProxyConnectionToBroker,这种状态下,接收到的所有请求都会通过 directProxyHandler 的 outboundhandler直接发送,这里的 outboudnhandler 即为 ProxyBackendHandler 中创建的到 Broker 的链接。

PARTITION_METADATA 请求为例,ProxyConnection 会直接通过 directProxyHandler 的 outboundhandler发送给 Broker,当 Broker 处理按请求之后,会将结果发送给 ProxyBackendHandler(状态为 HandshakeCompleted),此时将结果通过 inboundChannel (inboundChannel是client和proxy的channel)发送给 client。

剩余的所有请求都是类似处理,直接通过 outbound 和 inbound channel 进行转发。

2.2 不包含 BrokerUrl 信息

2.2.1 CONNECT 请求

如果client的connect 请求没有指定 brokerUrl,这创建一个新的 LookupProxyHandler,通过这个 handler 来处理转发逻辑,然后直接通知 client 已经完成 connect。

注意,此时 PorxyConnection 的状态没有变动,还是 init

2.2.2 PARTITION_METADATA 请求

继续处理 PARTITIONED_METADATA 请求,通过 LookupProxyHandler.handlePartitionMetadataResponse 来完成。获取元数据的请求需要发送给 Broker,首先要获取 Broker 的地址:

  • 首先获取当前 availableBroker 列表
  • 从 availableBroker 列表中(轮询)选择一个

获得一个可用的 Broker 之后,

  • 建立到 Broker 的链接
  • 发送 PARTITION_METADATA 请求
  • 发送 METADATA 结果给 client
  • 释放到 Broker 的链接

2.2.3 LOOKUP 请求

lookup 请求也是通过LookupProxyHandler来处理的,流程和 PARTITION_METADATA 请求类似,先要获取 Broker 的地址:

  • 首先获取当前 availableBroker 列表
  • 从 availableBroker 列表中(轮询)选择一个

获得一个可用的 Broker 之后,

  • 建立到 Broker 的链接
  • 发送 LOOKUP 请求
  • 发送 LOOKUP_RESPONSE 结果给 client, 注意: 返回的结果是 Broker 的 serviceUrl 信息,但是 client 不能和 Broker 直接建立链接,所以在Lookup 的响应中有一个标识位 ProxyThroughServiceUrl 会被设置为 true,标识这个 lookup_response 结果是一个 proxy返回的结果,此时客户端不会直接链接到 Broker,而是建立到 Proxy 的链接
  • 释放到 Broker 的链接

这里展开说一下,第三步的流程,是实现 proxy 能力的关键。
当 client 拿到 proxy 返回的 lookup 响应之后,会建立到 proxy 的链接,并且记录目标 BrokerUrl信息。当物理连接建立之后,会回调 channelActive 方法,然后回构建 CONNECT 请求,注意此时的CONNECT 请求有一个独特的处理,请求会携带 ProxyToBrokerUrl 信息,即CONNECT 中有实际的 Broker 指标信息。此时就回到了 2.1 中的处理流程。之后的处理流程和 2.1 描述一致。

3. 总结

Pulsar proxy 提供了一种跨网络访问集群的能力,可以在不直接暴漏所有 broker 服务的前提下堆外提供服务。