初始化 PulsarService时,首先为负载均衡初始化一个单线程的线程池

this.loadManagerExecutor = Executors
                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager"));

然后执行 pulsarService的启动,这个过程会初始化并且启动 LoadManager。

1. LoadManager 初始化

LoadManager 的初始化是通过反射完成的,LoadManager 的类型在conf中配置,默认是 ModularLoadManagerImpl,有一点比较特别的地方需要注意:ModularLoadManagerImpl 并且不是一个LoadManager的子类。LoadManager的子类是SimpleLoadManagerImpl,而 ModularLoadManagerImpl 继承自 ModularLoadManager, 通过 ModularLoadManagerWrapper封装成 一个ModularLoadManagerImplModularLoadManagerWrapper 实现了LoadManager接口。经过这一步骤,就可以确定 Pulsar 使用哪种方式做负载管理。

// 如果是LoadManager,创建之后直接初始化
if (loadManagerInstance instanceof LoadManager) {
                final LoadManager casted = (LoadManager) loadManagerInstance;
                casted.initialize(pulsar);
                return casted;
  					// 如果是 ModularLoadManager,则封装成ModularLoadManagerWrapper,然后初始化
            } else if (loadManagerInstance instanceof ModularLoadManager) {
                final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
                casted.initialize(pulsar);
                return casted;
            }

下文以默认的配置 ModularLoadManagerWrapper 来讲述。

2. Leader选举服务

LeaderElectionService 负责从broker中选举出leader, 初始化过程主要是给 LeaderElectionService 初始化 listener,listener负责ledger 选举之后的回调处理。

protected void startLeaderElectionService() {
        this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
                state -> { // listener
                   ...
                });
				// 启动 ledger 选举服务,会发起一次新的选举
        leaderElectionService.start();
    }
...
  public void start() {
  			// 选举是会携带自己的LedgerBroker信息,其中保存了webServiceUrl信息
        leaderElection.elect(localValue).join();
    }
...
  public LeaderElectionService(CoordinationService cs, String localWebServiceAddress,
            Consumer<LeaderElectionState> listener) {
        this.leaderElection = cs.getLeaderElection(LeaderBroker.class, ELECTION_ROOT, listener);
        this.localValue = new LeaderBroker(localWebServiceAddress);
    }

在 LeaderElectionService 过程中,会首先初始化一个 LeaderElection和 localValue,localValue中保存了当前节点的 webServiceUrl信息。

我们看下 LeaderElection 的初始化过程,这里会使用 CoordinationService,初始化之后的 LedgerElection 可以看做是一个 ledger选举的controller,用来完成ledger的选举。

  @Override
    public <T> LeaderElection<T> getLeaderElection(Class<T> clazz, String path,
            Consumer<LeaderElectionState> stateChangesListener) {

        return (LeaderElection<T>) leaderElections.computeIfAbsent(path,
                key -> new LeaderElectionImpl<T>(store, clazz, path, stateChangesListener, executor));
    }
...
  LeaderElectionImpl(MetadataStoreExtended store, Class<T> clazz, String path,
            Consumer<LeaderElectionState> stateChangesListener,
                       ScheduledExecutorService executor) {
        this.path = path; // /loadbalance/leader
        this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
        this.store = store;  // local metadata store
        // cache ,基于metastore的类型,包含两个主要的内容,一个是store,一个是serDe用来序列化/反序列化读写store内容
        this.cache = store.getMetadataCache(clazz); 
        this.leaderElectionState = LeaderElectionState.NoLeader; // ledgerElection 初始状态
        this.internalState = InternalState.Init; // 
        this.stateChangesListener = stateChangesListener;
        this.executor = executor; // CoordinationServiceImpl 的线程池(单线程)

        store.registerListener(this::handlePathNotification);  //  /loadbalance/leader 路径listener
        store.registerSessionListener(this::handleSessionNotification);
    }

选举过程:

  • local zk 查找 /loadbalance/leader 路径下的内容,并添加watcher,用来监控节点是否被删除,如果删除则发起新的一轮选举

  • 如果 /loadbalance/leader 下没有内容,则当前broker为leadger,在节点写入当前broker的serviceUrl信息,然后回调listener的 brokerIsTheLeaderNow 方法,内容如下:

    [zk: localhost:2181(CONNECTED) 13] get /test/loadbalance/leader
    {"serviceUrl":"http://100.76.40.78:8080","leaderReady":false}
    
  • 如果/loadbalance/leader 有内容,则节点保存的信息为当前ledger信息,然后回调listener的brokerIsAFollowerNow方法

另外,LeaderElectionService 停止时,如果是leadger,会将/loadbalance/leader 的内容删除。

LeaderElectionService的逻辑比较简单清晰,下面看一下两个回调的处理,回调的处理逻辑在初始化LeaderElectionservide时指定

state -> {					// 如果是 ledger
                    if (state == LeaderElectionState.Leading) {
                        LOG.info("This broker was elected leader");
                        if (getConfiguration().isLoadBalancerEnabled()) {
                            long loadSheddingInterval = TimeUnit.MINUTES
                                    .toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
                            long resourceQuotaUpdateInterval = TimeUnit.MINUTES
                                    .toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());

                            if (loadSheddingTask != null) {
                                loadSheddingTask.cancel(false);
                            }
                            if (loadResourceQuotaTask != null) {
                                loadResourceQuotaTask.cancel(false);
                            }
                            loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(
                                    new LoadSheddingTask(loadManager),
                                    loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
                            loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
                                    new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
                                    resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
                        }
                    } else { // 如果是 follower
                        if (leaderElectionService != null) {
                            LOG.info("This broker is a follower. Current leader is {}",
                                    leaderElectionService.getCurrentLeader());
                        }
                        if (loadSheddingTask != null) {
                            loadSheddingTask.cancel(false);
                            loadSheddingTask = null;
                        }
                        if (loadResourceQuotaTask != null) {
                            loadResourceQuotaTask.cancel(false);
                            loadResourceQuotaTask = null;
                        }
                    }
                });

2.1 如果是 leader

首先判断,是否开启负载均衡配置,如果开启的话,则开启两个任务,loadSheddingTask 和 loadResourceQuotaUpdaterTaks。

 loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(
                                    new LoadSheddingTask(loadManager),
                                    loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
 loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
                                    new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
                                    resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);

2.1.1 loadSheddingTask

负责broker节点负载的卸载工作,默认check周期是1分钟,有两种例外的情况

  • 如果当前只有一个broker,loadShedding任务不会执行
  • 为了防止频繁的 shed,会有一个等待时间(默认是30分钟),在这个时间内已经被 shed 的 bundle 不会再次被shed,结果在 recentlyUnloadedBundles

根据所有的负载信息和 loadSheddingStrategy 选出需要卸载的 bundle 列表,卸载策略主要有两种,OverloadShedder 和 ThresholdShedder。

  • OverloadShedder :一个基于当前资源使用率峰值进行shedding的方案,超过threshold的broker会执行shedding操作
  • ThresholdShedder:一个基于多种资源权重计算资源使用率,并且计算过程中会考虑历史值,然后综合考虑所有broker的负载均值,从负载超出均值(加一个上浮指标)的broker 算出 bundle 卸载到负载低的broker上,一次可以shedding多个bundle
2.1.1.1 OverloadShedder

leadger 可以获取到所有broker的负载信息,我们称之为 LoadData,其中包含了 BrokerData (broker负载信息), BundleData (bundle负载信息)

  1. 获取卸载的threshold,loadBalancerBrokerOverloadedThresholdPercentage 默认是85
  2. 遍历所有 broker 的负载信息,首先计算broker的最大资源使用率,取值为 cpu、内存、堆外内存、出流量、入流量 最大值
  3. 如果没有超过threshold,不执行卸载,超过的执行以下逻辑进行卸载
  4. 计算需要卸载的负载percentOfTrafficToOffload,currentUsage - threshold + 0.05(多卸载的部分)
  5. 然后计算需要卸载的流量信息,当前broker出、入流量和 * percentOfTrafficToOffload,作为 minimumThroughputToOffload
  6. 如果broker上只有一个bundle,不执行卸载
  7. 遍历 broker上的bundle负载信息
    1. 根据broker负载信息中的bundle信息,过滤掉HeartBeat namespace对应的bundle
    2. 然后根据bundle负载的shortTearm计算出入流量和,并生成<bundle, 流量throughput>的映射关系
    3. 过滤掉 recentlyUnloadedBundles 中的bundle
    4. 过滤掉LoadData中BundleData 不包含的bundle
    5. 根据流量throughput进行排序(升序)
    6. 计算出需要卸载的bundle信息,即从第一个bundle 开始累加,直至大于minimumThroughputToOffload
2.1.1.2 ThresholdShedder

leader 可以获取到所有broker的负载信息,负载信息是从/loadbalance/brokers/brokerUrl路径获取的,每个broker都会上报一份自己的负载到对应的路径下,总体的负载数据我们称之为 LoadData,其中包含了 BrokerData (broker负载信息), BundleData (bundle负载信息)

  1. 首先是根据负载信息计算出所有broker的负载均值,计算平均负载会考虑历史负载的比重,默认是90%,均值就是 总负载/broker数
    1. 计算总负载totalUsage,遍历所有的 broker 负载信息
      1. 查找历史负载值 historyUsage,如果不存在则不考虑历史负载,直接使用当前负载
      2. 根据 broker 每项资源负载及其占比来计算使用率 currentUsage,资源维度包括cpu、内存、堆外内存、出流量、入流量,算出单项资源使用率 * 比重 最大的一项作为该 broker的资源使用,
      3. totalUsage = historyUsage * 0.9 + currentUsage * 0.1(hisotryUsage为0时例外)
    2. 在获取 totalUsage 之后,使用 totalUsage/broker 数量,得到平均负载 avgUsage
  2. 根据 broker的负载,计算出超出(负载均值 + 卸载Threshold,默认是10%)的broker,这些作为需要卸载的broker
  3. 对每个需要卸载流量的 broker, 当前 broker 流量 * (当前负载 - 负载均值 - 卸载Threshold + 额外卸载值(默认5%))为需要卸载的流量
  4. 如果broker上只有一个bundle,不执行卸载
  5. 如果需要卸载的流量小于卸载阈值,默认是10MB,则不执行卸载
  6. 根据流量遍历 broker上的 bundle 负载信息
    1. 根据 broker 负载信息中的 bundle 信息,过滤掉 HeartBeat namespace 对应的 bundle
    2. 然后根据 bundle 负载的 shortTearm 计算出入流量和,并生成 <bundle, 流量 throughput>的映射关系
    3. 过滤掉 recentlyUnloadedBundles 中的 bundle
    4. 过滤掉 LoadData 中 BundleData 不包含的 bundle
    5. 根据流量 throughput 进行排序(升序)
    6. 计算出需要卸载的 bundle 信息,即从第一个 bundle 开始累加,直到 sum 值超过了最少卸载流量
  7. 对于需要 unlaod 的 bundle,使用 admin 去调用 namespace unload 接口,并将 unload 的信息保存在 recentlyUnloadedBundles 中
算法问题
  1. 一次卸载达不到预期效果

假设 总量是 10GB,当前是5GB,均值是1GB,即当前负载是50%,均值负载是 10%, threashold 是 10%,additional 是 5%

触发卸载的负载是20%

第一次需要卸载的流量是 5GB*(50-10-10+5)=1.75GB,下载以后剩余3.25GB,假设卸载之后的负载均值是1.1GB

那么当前broker负载,是32.5% - 11% - 10 % = 11.5%,依旧需要卸载

除了负载下载任务之外,还会启动一个loadResourceQuotaTask,默认执行周期为15分钟

2.1.2 loadResourceQuotaTask

loadResourceQuotaTask的是作为leader,将所有bundle的负载信息写到zk

  • loadResourceQuotaTask : 负责更新负载数据,LoadManager#writeResourceQuotasToZooKeeper,会将所有broker信息和bundle信息汇总,然后写到zk

    • 第一步是更新负载信息,Map<String, BundleData> bundleData,过程和初始化时根据zk更新本地所有broker的负载过程一样

      • 遍历所有的broker负载数据
        • 遍历broker的bundle信息
          • 如果在bundleData中存在这个bundle,则更新其信息
          • 如果不存在,这从zk(/loadbalance/bundle-data/public/default/0xa0000000_0xb0000000/loadbalance/resource-quota/namespace)获取最新的数据(如果zk没有则生成一个默认的数据)
        • 遍历broker的PreallocatedBundleData信息
          • 如果在bundleData中存在,则从broker的预分配信息中删除这个bundle,另外也要从loadmanager的preallocatedBundleToBroker删除这个bundle
        • 更新broker的TimeAverageData信息
        • 更新loadmanager的brokerToNamespaceToBundleRange 信息
    • 第二步将负载信息写到zk

      • 将Bundle信息写到zk,路径是/loadbalance/bundle-data/bundle ,如下:

        [zk: localhost:2181(CONNECTED) 6] get /mayo_test/loadbalance/bundle-data/public/default/0xa0000000_0xb0000000
        {"shortTermData":{"maxSamples":10,"numSamples":0,"msgThroughputIn":50000.0,"msgThroughputOut":50000.0,"msgRateIn":50.0,"msgRateOut":50.0},"longTermData":{"maxSamples":1000,"numSamples":0,"msgThroughputIn":50000.0,"msgThroughputOut":50000.0,"msgRateIn":50.0,"msgRateOut":50.0},"topics":0}
        
      • 将Broker信息写到zk,路径是/loadbalance/broker-time-average/broker,如下:

        [zk: localhost:2181(CONNECTED) 10] get /mayo_test/loadbalance/broker-time-average/100.76.40.78:8080
        {
        "shortTermMsgThroughputIn":2.535843746704023E-5,
        "shortTermMsgThroughputOut":2.543994721421319E-5,
        "shortTermMsgRateIn":4.610624994007313E-7,
        "shortTermMsgRateOut":4.625444948038767E-7, 
        "longTermMsgThroughputIn":0.020691195402680305,
        "longTermMsgThroughputOut":0.020757703211301066,
        "longTermMsgRateIn":3.7620355277600577E-4,
        "longTermMsgRateOut":3.7741278566001945E-4
        }
        
  • brokerIsTheLeaderNow TheLeaderNow :停止 loadSheddingTaskloadResourceQuotaTask

到这一步,leader选举和对应操作完成,可以看到如果broker被选为leader,会负责更新bundle信息,并且根据 loadshedding 的策略来选择需要shed的bundle信息。

2.2 如果是follower

会 cancle loadSheddingTask 和 loadResourceQuotaTask。


前面的描述中会用到每个broker的负载信息,这些信息是怎么保存的呢?

3. 启动负载管理服务

在 PulsarService 启动之时,会 startLoadManagementService,主要是两个内容,启动 loadManager和 loadReportTask,这里的loadReportTask 的任务就是将本地的负载信息上报到zk。

protected void startLoadManagementService() throws PulsarServerException {
        LOG.info("Starting load management service ...");
  			// 启动 loadManager 服务,
        this.loadManager.get().start();

        if (config.isLoadBalancerEnabled()) {
            LOG.info("Starting load balancer");
            if (this.loadReportTask == null) {
                long loadReportMinInterval = LoadManagerShared.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
                // 启动负载上报工作,负载上报工作每 5s 执行一次
                this.loadReportTask = this.loadManagerExecutor.scheduleAtFixedRate(
                        new LoadReportUpdaterTask(loadManager), loadReportMinInterval, loadReportMinInterval,
                        TimeUnit.MILLISECONDS);
            }
        }
    }

3.1 启动 loadManager

这里的 loadManager 是 ModularLoadManagerWrapper,内部是调用 ModularLoadManagerImpl的start方法,每个broker 都会有一个 loadmanager 来进行负载管理。

// 获取 protocol handler 信息
Map<String, String> protocolData = pulsar.getProtocolDataToAdvertise();
// 初始化 LocalBrokerData,包含 webServiceUrl,brokerUrl,advertisedListner信息等,作为 lastData
lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
        pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
// 把 protocolData 信息保存在LocalBrokerData中
lastData.setProtocols(protocolData);
// 配置是否支持topic模式,持久化或者非持久化
lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
// lastData 保存了最新的broke让信息,包括 webServiceUrl/brokerUrl/advertisedListner/protocalHandle/isEnablePersistentTopics/isEnableNonPersistentTopics 等信息

// 重新初始化一个LocalBrokerData,作为local值,这里
localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
        pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners
localData.setProtocols(protocolData);
// 相比于lastData值,localData中会增加                                
localData.setBrokerVersionString(pulsar.getBrokerVersion());
// 配置是否支持topic模式,持久化或者非持久化
localData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
// lookup 服务地址                                
String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
        + (conf.getWebServicePort().isPresent() ? conf.getWebServicePort().get()
                : conf.getWebServicePortTls().get());
                                
// broker 负载对应的 zk 路径,/loadbalance/brokers/${brokerUrl}                   
brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
// 保存一段时间平均负载值的zk路径,/loadbalance/broker-time-average/lookupServiceAddress                            
final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress;
                                
// 获取本地 broker 的一些负载信息,并更新到 localData 中                           
updateLocalBrokerData();
// 将 localData 更新到zk                                
brokerDataLock = brokersData.acquireLock(brokerZnodePath, localData).join();
// 将 timeAverage 负载信息更新到zk,此时的 timeAverage 信息没有内容                              
timeAverageBrokerDataCache.readModifyUpdateOrCreate(timeAverageZPath,
        __ -> new TimeAverageBrokerData()).join();
// 更新所有                                
updateAll();
// 最后上报的时间                                
lastBundleDataUpdate = System.currentTimeMillis();

可以看到,loadmanager启动之后的最要工作就是把本地负载信息上报到zk,下面分步骤来详细看下

3.1.1 updateLocalBrokerData

3.1.1.1 获取资源使用率

机器资源使用率包括CPU/内存/网卡出入流量三个部分,以Linux系统为例。在 LoadManager 初始化时为操作系统指定一个 LinuxBrokerHostUsageImpl 对象,这个对象负责计算 broker 节点的资源使用率。

获取本地硬件资源使用率 systemResourceUsage ,硬件资源的获取是通过 LinuxBrokerHostUsageImpl(默认1分钟会运行一次)来实现的,会定期的获取网卡、内存、cpu资源使用情况;

 public void calculateBrokerHostUsage() {
   // 计算网卡使用率
        List<String> nics = getNics();
        double totalNicLimit = getTotalNicLimitKbps(nics);
        double totalNicUsageTx = getTotalNicUsageTxKb(nics);
        double totalNicUsageRx = getTotalNicUsageRxKb(nics);
        double totalCpuLimit = getTotalCpuLimit();

        long now = System.currentTimeMillis();
        double elapsedSeconds = (now - lastCollection) / 1000d;
        if (elapsedSeconds <= 0) {
            log.warn("elapsedSeconds {} is not expected, skip this round of calculateBrokerHostUsage", elapsedSeconds);
            return;
        }

        SystemResourceUsage usage = new SystemResourceUsage();
   // 计算CPU使用率
        double cpuUsage = getTotalCpuUsage(elapsedSeconds);

        if (lastCollection == 0L) {
            usage.setMemory(getMemUsage());
            usage.setBandwidthIn(new ResourceUsage(0d, totalNicLimit));
            usage.setBandwidthOut(new ResourceUsage(0d, totalNicLimit));
        } else {
            double nicUsageTx = (totalNicUsageTx - lastTotalNicUsageTx) / elapsedSeconds;
            double nicUsageRx = (totalNicUsageRx - lastTotalNicUsageRx) / elapsedSeconds;

            usage.setMemory(getMemUsage());
            usage.setBandwidthIn(new ResourceUsage(nicUsageRx, totalNicLimit));
            usage.setBandwidthOut(new ResourceUsage(nicUsageTx, totalNicLimit));
        }

        lastTotalNicUsageTx = totalNicUsageTx;
        lastTotalNicUsageRx = totalNicUsageRx;
        lastCollection = System.currentTimeMillis();
        this.usage = usage;
        usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit));
    }
3.1.1.2 获取 bundle 信息

然后,从 pulsarStats 中获取所有 bundle 的状态信息 bundleStats,

public double msgRateIn;
public double msgThroughputIn;
public double msgRateOut;
public double msgThroughputOut;
public int consumerCount;
public int producerCount;
public long topics;
public long cacheSize;
3.1.1.3 更新资源使用和bundle信息

在获取了 机器资源使用情况 和 bundle 负载信息之后,会把 systemResourceUsage和 bundleStats 更新到 localData 中,并且会把负载信息记录在指标中。

对于bundle信息,还会和上一次的bundle信息对比,记录当前获取了哪些bundle的ownership以及丢失了哪些bundle的ownership,另外使用最新的bundle负载来计算broker最新的 msgThroughputIn/out,msgRateIn/out,numTopics,numBundles,numConsumers,numProducers等指标信息。

3.1.2 更新 localData 到zk

将统计到的loacalData负载信息上报到zk,这个过程会在/loadbalance/brokers/lookupServiceAddress 路径下,写入当前broker 的localData信息,localData包含了broker的url、advertisedListener等基本信,系统资源使用信息和bundle的状态信息。

注意: 数据类型为 LocalBrokerData

3.1.3 更细 timeAverage 负载信息

在路径 /loadbalance/broker-time-average/lookupServiceAddress 更新一个默认的 全新的平均负载信息

经过以上步骤,可以看到 broker 计算出了 最新的资源使用率,bundle信息,并且将这些信息保存到了zk。

注意:数据类型为TimeAverageBrokerData

3.1.4 Update All

这个过程会更新本地全量负载信息的broker以及bundle信息,这一步骤是为了更具最新的集群负载信息做负载均衡的决策

public void updateAll() {
    if (log.isDebugEnabled()) {
        log.debug("Updating broker and bundle data for loadreport");
    }
    updateAllBrokerData();
    updateBundleData();
    // broker has latest load-report: check if any bundle requires split
    checkNamespaceBundleSplit();
}

public class LoadData {
    /**
     * Map from broker names to their available data.
     */
    private final Map<String, BrokerData> brokerData;

    /**
     * Map from bundle names to their time-sensitive aggregated data.
     */
    private final Map<String, BundleData> bundleData;

    /**
     * Map from recently unloaded bundles to the timestamp of when they were last loaded.
     */
    private final Map<String, Long> recentlyUnloadedBundles;

broker 本地有个LoadData对象,其中保存了每个broker的负载信息,bundle的信息,以及最近unlaod的bundle信息(避免频繁卸载)

3.1.4.1 更新 brokerData (Map<String, BrokerData> brokerData)

更新 broker 数据时,首先从 /loadbalance/brokers 路径下获取 activeBroker 列表,然后对于每个active 的broker,从 /loadbalance/brokers/lookupServiceAddress 路径下获取每个broker上报的负载信息

  • 如果不存在则从本地的全量负载信息LoadData 中删除对应的broker
  • 如果 本地 全量负载信息中包含了这个broker的信息,则使用新获取的负载来更新本地信息(LocalBrokerData);不包含的话,则创建一个新的 LocalBrokerData,保存在全量负载信息中
  • 最后做一次过滤,对于每个在全量负载中出现的broker,判断是否在activeBroker列表中,如果没有则从全量负载信息中删除
3.1.4.2 更新 bundleData(Map<String, BundleData> bundleData)

经过上一步,我们已经获取了每个broker的最新负载信息,然后可以获取每个broker上的bundle负载信息,使用这些信息去更新LoadData的bundleData信息。。

更新 Bundle 信息,在全量负载信息中,除了brokerData的信息之外,还有bundle的信息bundleData ,这里处理的内容是将上一步中获取的 全量的 brokerData 信息更新到 bundle信息 bundleData 中。

Bundle数据更新

首先是遍历 brokerData 中的所有的bundle信息,如果 bundleData 已经存在,则使用最新的bundle信息来更新 bundleData,

if (bundleData.containsKey(bundle)) {
        // If we recognize the bundle, add these stats as a new sample.
        bundleData.get(bundle).update(stats);
}
....
public void update(final NamespaceBundleStats newSample) {
        shortTermData.update(newSample);
        longTermData.update(newSample);
        this.topics = (int) newSample.topics;
    }

首先是更新 shortTermData 和 longShortTermData,操作类似,都是求均值的过程,

 private double getUpdatedValue(final double oldAverage, final double newSample) {
        return ((numSamples - 1) * oldAverage + newSample) / numSamples;
    }

计算方法是使用 (历史值*采样数 + 新值 * 1) / 总的采样数,更新之后会得到最新的 msgThroughputIn、msgThroughputOut、msgRateIn、msgRateOut指标。

然后更新 bundle 的 topics (数量)信息。

如果 bundleData 不存在,首先从 zk 上查找 bundle data,如果找不到,则使用 bundle 的最新信息最为首次采样值:

  • 首先从 /loadbalance/bundle-data/tenant/namespace/bundle 下获取负载信息,存在则返回;
  • 否则,从 /loadbalance/resource-quota/namespace/tenant/namespace/bundle下获取 ResourceQuota 信息,如果存在,则使用其中的shortTerm和longTerm信息来初始化一个新的bundledata返回
  • 否则,初始化一个全新的bundledata返回
  • 获取到 bundleData 之后,使用最新的 bundle 信息来更新这个 bundleData,并且保存在LoadData的bundleData map中

preallocatedBundle清理 (todo: preallocate的逻辑)

首先获取 broker 上报的 PreallocatedBundleData信息,遍历素有 Preallocated的bunlde,如果

  • 在 broker 上报信息中已经存在,并且在 全量信息的 bundle 信息中也存在,则从上报broker信息的 getPreallocatedBundleData 和 preallocatedBundleToBroker 两个map中删除bundle信息

然后,根据前面获得的最新的bundle 信息更新 loadData 中brokerData.timeAverageData;

最后更新,<Broker, <NameSpace,<Set>>>,即记录 broker上有哪些Namespace,以及namespace 有哪些namespace bundle。

如果开启了自动spit,则会查找需要split的bundle执行split操作,split执行之后,会把响应的负载信息从loadData中删除掉。

注意:每个broker启动时,都会有UpdateAll的操作,即在启动之后,所有broker都会有一个全局的负载信息,这个信息反映了broker启动时的集群负载情况。

3.2 启动 loadReport任务

如果开启了 loadbalance 功能,broker还会启动一个 LoadReportUpdaterTask 任务,默认是5s执行一次。

任务的主要内容是,首先 更新本地的系统资源使用信息和bundle状态信息,然后将信息更新到zk上,这里来详细看下需要上报的内容,首先上报的内容是 LocalBrokerData 类型的对象,其中包含了

  1. 基本的 webServiceUrl、pulsarServiceUrl、topic模式(persistent、non-persistent topic)
  2. 系统物理资源使用情况:cpu、内存、堆外内存、网卡入、网卡出
  3. 实时指标,msgThroughputIn、msgThroughputOut、msgRateIn、msgRateOut
  4. 统计信息:numTopics、numBundles、numConsumers、numProducers

1的内容在初始化是就已经从配置中获取,2的内容会通过系统资源使用计算得到,3和4的内容则从 broker 的当前实时指标中的bundlestats 中计算获取。可以看到这个过程和LoadManager启动时的计算本机负载的逻辑是一样的。

注意:不是每个周期都会上报负载信息,而是会有 needBrokerDataUpdate 的判断,判断负载变化是否超过阈值,如果超过才会上报,否则不会上报到zk。

3.3 负载更新

现在还剩余一个问题,就是 loadReport 上报的信息是如何更新到leader,并且被leader用来做负载决策的?

其实,在 LoadManager 初始化时,就初始化了zk的一个nitofication类,

        pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);

这个Nitification会在负载信息变化时被nofify

public void handleDataNotification(Notification t) {
        if (t.getPath().startsWith(LoadManager.LOADBALANCE_BROKERS_ROOT)) {
            brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT)
                    .thenAccept(brokers -> {
                        reapDeadBrokerPreallocations(brokers);
                    });

            try {
                scheduler.submit(ModularLoadManagerImpl.this::updateAll);
            } catch (RejectedExecutionException e) {
                // Executor is shutting down
            }
        }
    }

/loadbalance/brokers 下有任何改动,都会触发回调,首先处理broker下线的情况

3.3.1 broker下线

判断 active broker中是否还存在这个broker信息,不存在则执行相应的清理操作

3.3.2 负载更新

public void updateAll() {
    if (log.isDebugEnabled()) {
        log.debug("Updating broker and bundle data for loadreport");
    }
    updateAllBrokerData();
    updateBundleData();
    // broker has latest load-report: check if any bundle requires split
    checkNamespaceBundleSplit();
}

和前文的update一样,经过更新,broker就获取了最新的负载信息,可以据此来执行负载策略。

3.4 小结

综上所述,broker 在启动时,会启动 loadReport 的任务,用于上报当前broker的资源使用情况和bundlestats信息,;并且会发起一个选举,选举会产生一个leader,这个leader 会启动两个任务,

loadSheddingTask 和 loadResourceQuotaTask,

  • lodSheddingTask会选出需要卸载负载的bundle,然后调用admin接口进行卸载;
  • loadResourceQuotaTask 则负责根据broker上报的bundle 信息跟新leader上的bundle 信息,并且写入到zk /mayo_test/loadbalance/bundle-data/public/default/0xa0000000_0xb0000000 和 /mayo_test/loadbalance/broker-time-average/100.76.40.78:8080 路径下。

4. 如何分配 bundle 给 broker

在 客户端执行 lookup 时,实际上并不是分配的单个topic,而是分配的一个bundle,这样可以显著降低元数据的数量。

4.1 LookUp 命令

比如在执行 lookup 命令时,

CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic)
                .thenCompose(bundle -> findBrokerServiceUrl(bundle, options));

这里主要执行两个操作,第一步是 获取 topic 所在的 bundle,第二步是查找 bundle 所在的 broker。

4.1.1获取 topic 所在的 bundle

查找 topic 所在 bundle 也分为两步,第一步是获取 namespace 的 bundle 信息,第二步是计算得到 topic 所在的 bundle

4.1.1.1 查找 namespace 的 bundle

首先 从 local-policies 中获取,/admin/local-policies/tenant/namespace 路径的内容,格式如下:

[zk: localhost:2181(CONNECTED) 18] get /bookie_ps_test/admin/local-policies/test/test
{"bundles":{"boundaries":["0x00000000","0x40000000","0x80000000","0xc0000000","0xffffffff"],"numBundles":4}}

如果找不到的话,则从 policies 去获取 /admin/policies/tenant/namespace 的内容,这里包含了namespace的全部policies信息,格式如下:

[zk: localhost:2181(CONNECTED) 19] get /pulsar_ps_test/admin/policies/test/test
{"auth_policies":{"namespace_auth":{},"destination_auth":{},"subscription_auth_roles":{}},"replication_clusters":["ps_test"],"bundles":{"boundaries":["0x00000000","0x40000000","0x80000000","0xc0000000","0xffffffff"],"numBundles":4},"backlog_quota_map":{"destination_storage":{"limit":-1073741824,"policy":"producer_request_hold"}},"clusterDispatchRate":{},"topicDispatchRate":{"ps_test":{"dispatchThrottlingRateInMsg":0,"dispatchThrottlingRateInByte":0,"relativeToPublishRate":false,"ratePeriodInSecond":1}},"subscriptionDispatchRate":{"ps_test":{"dispatchThrottlingRateInMsg":0,"dispatchThrottlingRateInByte":0,"relativeToPublishRate":false,"ratePeriodInSecond":1}},"replicatorDispatchRate":{},"clusterSubscribeRate":{"ps_test":{"subscribeThrottlingRatePerConsumer":0,"ratePeriodInSecond":30}},"persistence":{"bookkeeperEnsemble":3,"bookkeeperWriteQuorum":2,"bookkeeperAckQuorum":2,"managedLedgerMaxMarkDeleteRate":0.0},"publishMaxMessageRate":{},"latency_stats_sample_rate":{},"message_ttl_in_seconds":36000,"deleted":false,"encryption_required":false,"subscription_auth_mode":"None","max_producers_per_topic":0,"max_consumers_per_topic":0,"max_consumers_per_subscription":0,"max_unacked_messages_per_consumer":50000,"max_unacked_messages_per_subscription":200000,"compaction_threshold":0,"offload_threshold":-1,"schema_auto_update_compatibility_strategy":"Full","schema_compatibility_strategy":"UNDEFINED","is_allow_auto_update_schema":true,"schema_validation_enforced":false}

然后从这个policies中获取 bundles 信息,根据这个信息构造一个新的 LocalPolicies,并且更新到zk。完成之后,在bundlecache中也就具有了这个 bundle 信息。

得到所有的bundle 信息之后,根据 topic的hashCode,看topic落在哪个bundle 方位内,这个bundle 就是topic所在的bundle。

4.1.2 查找 bundle 所在的brokerUrl

  1. 首先检查 ownerShipCache 中有没有这个 bundle 信息,如果有直接返回;

  2. 没有则继续查找,这个查找的过程就是 bundle 的分配过程,

searchForCandidateBroker(bundle, future, options);
  • 先排除掉 heartbeat 和 sla namespace,我们只看正常的 namespace bundle 的查找,如果查找options中已经指定了当前broker为bundle的owner,直接返回当前broker的url信息;
  • 否则继续查找,首先找到当前leader,作为执行负载均衡决定的节点,如果当前节点是leader 或者当前 ledger 不是 active 状态,则在当前节点做分配,这时会为bundle查找负载最低的broker节点
    • 从 LoadData 的 bundle 信息找,查找
  • 如果当前节点不是leader节点,并且leader节点是active,则会重定向到 leader节点