初始化 PulsarService时,首先为负载均衡初始化一个单线程的线程池
this.loadManagerExecutor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager"));
然后执行 pulsarService的启动,这个过程会初始化并且启动 LoadManager。
1. LoadManager 初始化
LoadManager 的初始化是通过反射完成的,LoadManager
的类型在conf中配置,默认是 ModularLoadManagerImpl
,有一点比较特别的地方需要注意:ModularLoadManagerImpl
并且不是一个LoadManager
的子类。LoadManager
的子类是SimpleLoadManagerImpl,而 ModularLoadManagerImpl
继承自 ModularLoadManager
, 通过 ModularLoadManagerWrapper
封装成 一个ModularLoadManagerImpl
, ModularLoadManagerWrapper
实现了LoadManager
接口。经过这一步骤,就可以确定 Pulsar 使用哪种方式做负载管理。
// 如果是LoadManager,创建之后直接初始化
if (loadManagerInstance instanceof LoadManager) {
final LoadManager casted = (LoadManager) loadManagerInstance;
casted.initialize(pulsar);
return casted;
// 如果是 ModularLoadManager,则封装成ModularLoadManagerWrapper,然后初始化
} else if (loadManagerInstance instanceof ModularLoadManager) {
final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
casted.initialize(pulsar);
return casted;
}
下文以默认的配置 ModularLoadManagerWrapper
来讲述。
2. Leader选举服务
LeaderElectionService
负责从broker中选举出leader, 初始化过程主要是给 LeaderElectionService
初始化 listener,listener负责ledger 选举之后的回调处理。
protected void startLeaderElectionService() {
this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
state -> { // listener
...
});
// 启动 ledger 选举服务,会发起一次新的选举
leaderElectionService.start();
}
...
public void start() {
// 选举是会携带自己的LedgerBroker信息,其中保存了webServiceUrl信息
leaderElection.elect(localValue).join();
}
...
public LeaderElectionService(CoordinationService cs, String localWebServiceAddress,
Consumer<LeaderElectionState> listener) {
this.leaderElection = cs.getLeaderElection(LeaderBroker.class, ELECTION_ROOT, listener);
this.localValue = new LeaderBroker(localWebServiceAddress);
}
在 LeaderElectionService 过程中,会首先初始化一个 LeaderElection和 localValue,localValue中保存了当前节点的 webServiceUrl信息。
我们看下 LeaderElection 的初始化过程,这里会使用 CoordinationService,初始化之后的 LedgerElection 可以看做是一个 ledger选举的controller,用来完成ledger的选举。
@Override
public <T> LeaderElection<T> getLeaderElection(Class<T> clazz, String path,
Consumer<LeaderElectionState> stateChangesListener) {
return (LeaderElection<T>) leaderElections.computeIfAbsent(path,
key -> new LeaderElectionImpl<T>(store, clazz, path, stateChangesListener, executor));
}
...
LeaderElectionImpl(MetadataStoreExtended store, Class<T> clazz, String path,
Consumer<LeaderElectionState> stateChangesListener,
ScheduledExecutorService executor) {
this.path = path; // /loadbalance/leader
this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
this.store = store; // local metadata store
// cache ,基于metastore的类型,包含两个主要的内容,一个是store,一个是serDe用来序列化/反序列化读写store内容
this.cache = store.getMetadataCache(clazz);
this.leaderElectionState = LeaderElectionState.NoLeader; // ledgerElection 初始状态
this.internalState = InternalState.Init; //
this.stateChangesListener = stateChangesListener;
this.executor = executor; // CoordinationServiceImpl 的线程池(单线程)
store.registerListener(this::handlePathNotification); // /loadbalance/leader 路径listener
store.registerSessionListener(this::handleSessionNotification);
}
选举过程:
-
local zk 查找
/loadbalance/leader
路径下的内容,并添加watcher,用来监控节点是否被删除,如果删除则发起新的一轮选举 -
如果
/loadbalance/leader
下没有内容,则当前broker为leadger,在节点写入当前broker的serviceUrl信息,然后回调listener的brokerIsTheLeaderNow
方法,内容如下:[zk: localhost:2181(CONNECTED) 13] get /test/loadbalance/leader {"serviceUrl":"http://100.76.40.78:8080","leaderReady":false}
-
如果
/loadbalance/leader
有内容,则节点保存的信息为当前ledger信息,然后回调listener的brokerIsAFollowerNow
方法
另外,LeaderElectionService
停止时,如果是leadger,会将/loadbalance/leader
的内容删除。
LeaderElectionService
的逻辑比较简单清晰,下面看一下两个回调的处理,回调的处理逻辑在初始化LeaderElectionservide时指定
state -> { // 如果是 ledger
if (state == LeaderElectionState.Leading) {
LOG.info("This broker was elected leader");
if (getConfiguration().isLoadBalancerEnabled()) {
long loadSheddingInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
if (loadSheddingTask != null) {
loadSheddingTask.cancel(false);
}
if (loadResourceQuotaTask != null) {
loadResourceQuotaTask.cancel(false);
}
loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadSheddingTask(loadManager),
loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
}
} else { // 如果是 follower
if (leaderElectionService != null) {
LOG.info("This broker is a follower. Current leader is {}",
leaderElectionService.getCurrentLeader());
}
if (loadSheddingTask != null) {
loadSheddingTask.cancel(false);
loadSheddingTask = null;
}
if (loadResourceQuotaTask != null) {
loadResourceQuotaTask.cancel(false);
loadResourceQuotaTask = null;
}
}
});
2.1 如果是 leader
首先判断,是否开启负载均衡配置,如果开启的话,则开启两个任务,loadSheddingTask 和 loadResourceQuotaUpdaterTaks。
loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadSheddingTask(loadManager),
loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
2.1.1 loadSheddingTask
负责broker节点负载的卸载工作,默认check周期是1分钟,有两种例外的情况
- 如果当前只有一个broker,loadShedding任务不会执行
- 为了防止频繁的 shed,会有一个等待时间(默认是30分钟),在这个时间内已经被 shed 的 bundle 不会再次被shed,结果在
recentlyUnloadedBundles
根据所有的负载信息和 loadSheddingStrategy 选出需要卸载的 bundle 列表,卸载策略主要有两种,OverloadShedder 和 ThresholdShedder。
- OverloadShedder :一个基于当前资源使用率峰值进行shedding的方案,超过threshold的broker会执行shedding操作
- ThresholdShedder:一个基于多种资源权重计算资源使用率,并且计算过程中会考虑历史值,然后综合考虑所有broker的负载均值,从负载超出均值(加一个上浮指标)的broker 算出 bundle 卸载到负载低的broker上,一次可以shedding多个bundle
2.1.1.1 OverloadShedder
leadger 可以获取到所有broker的负载信息,我们称之为 LoadData,其中包含了 BrokerData (broker负载信息), BundleData (bundle负载信息)
- 获取卸载的threshold,loadBalancerBrokerOverloadedThresholdPercentage 默认是85
- 遍历所有 broker 的负载信息,首先计算broker的最大资源使用率,取值为 cpu、内存、堆外内存、出流量、入流量 最大值
- 如果没有超过threshold,不执行卸载,超过的执行以下逻辑进行卸载
- 计算需要卸载的负载percentOfTrafficToOffload,currentUsage - threshold + 0.05(多卸载的部分)
- 然后计算需要卸载的流量信息,当前broker出、入流量和 * percentOfTrafficToOffload,作为 minimumThroughputToOffload
- 如果broker上只有一个bundle,不执行卸载
- 遍历 broker上的bundle负载信息
- 根据broker负载信息中的bundle信息,过滤掉HeartBeat namespace对应的bundle
- 然后根据bundle负载的shortTearm计算出入流量和,并生成<bundle, 流量throughput>的映射关系
- 过滤掉 recentlyUnloadedBundles 中的bundle
- 过滤掉LoadData中BundleData 不包含的bundle
- 根据流量throughput进行排序(升序)
- 计算出需要卸载的bundle信息,即从第一个bundle 开始累加,直至大于minimumThroughputToOffload
2.1.1.2 ThresholdShedder
leader 可以获取到所有broker的负载信息,负载信息是从/loadbalance/brokers/brokerUrl
路径获取的,每个broker都会上报一份自己的负载到对应的路径下,总体的负载数据我们称之为 LoadData,其中包含了 BrokerData (broker负载信息), BundleData (bundle负载信息)
- 首先是根据负载信息计算出所有broker的负载均值,计算平均负载会考虑历史负载的比重,默认是90%,均值就是 总负载/broker数
- 计算总负载totalUsage,遍历所有的 broker 负载信息
- 查找历史负载值 historyUsage,如果不存在则不考虑历史负载,直接使用当前负载
- 根据 broker 每项资源负载及其占比来计算使用率 currentUsage,资源维度包括cpu、内存、堆外内存、出流量、入流量,算出单项资源使用率 * 比重 最大的一项作为该 broker的资源使用,
- totalUsage = historyUsage * 0.9 + currentUsage * 0.1(hisotryUsage为0时例外)
- 在获取 totalUsage 之后,使用 totalUsage/broker 数量,得到平均负载 avgUsage
- 计算总负载totalUsage,遍历所有的 broker 负载信息
- 根据 broker的负载,计算出超出(负载均值 + 卸载Threshold,默认是10%)的broker,这些作为需要卸载的broker
- 对每个需要卸载流量的 broker, 当前 broker 流量 * (当前负载 - 负载均值 - 卸载Threshold + 额外卸载值(默认5%))为需要卸载的流量
- 如果broker上只有一个bundle,不执行卸载
- 如果需要卸载的流量小于卸载阈值,默认是10MB,则不执行卸载
- 根据流量遍历 broker上的 bundle 负载信息
- 根据 broker 负载信息中的 bundle 信息,过滤掉 HeartBeat namespace 对应的 bundle
- 然后根据 bundle 负载的 shortTearm 计算出入流量和,并生成 <bundle, 流量 throughput>的映射关系
- 过滤掉 recentlyUnloadedBundles 中的 bundle
- 过滤掉 LoadData 中 BundleData 不包含的 bundle
- 根据流量 throughput 进行排序(升序)
- 计算出需要卸载的 bundle 信息,即从第一个 bundle 开始累加,直到 sum 值超过了最少卸载流量
- 对于需要 unlaod 的 bundle,使用 admin 去调用 namespace unload 接口,并将 unload 的信息保存在 recentlyUnloadedBundles 中
算法问题
- 一次卸载达不到预期效果
假设 总量是 10GB,当前是5GB,均值是1GB,即当前负载是50%,均值负载是 10%, threashold 是 10%,additional 是 5%
触发卸载的负载是20%
第一次需要卸载的流量是 5GB*(50-10-10+5)=1.75GB,下载以后剩余3.25GB,假设卸载之后的负载均值是1.1GB
那么当前broker负载,是32.5% - 11% - 10 % = 11.5%,依旧需要卸载
除了负载下载任务之外,还会启动一个loadResourceQuotaTask,默认执行周期为15分钟
2.1.2 loadResourceQuotaTask
loadResourceQuotaTask的是作为leader,将所有bundle的负载信息写到zk
-
loadResourceQuotaTask : 负责更新负载数据,
LoadManager#writeResourceQuotasToZooKeeper
,会将所有broker信息和bundle信息汇总,然后写到zk-
第一步是更新负载信息,
Map<String, BundleData> bundleData
,过程和初始化时根据zk更新本地所有broker的负载过程一样- 遍历所有的broker负载数据
- 遍历broker的bundle信息
- 如果在
bundleData
中存在这个bundle,则更新其信息 - 如果不存在,这从zk(
/loadbalance/bundle-data/public/default/0xa0000000_0xb0000000
和/loadbalance/resource-quota/namespace
)获取最新的数据(如果zk没有则生成一个默认的数据)
- 如果在
- 遍历broker的PreallocatedBundleData信息
- 如果在bundleData中存在,则从broker的预分配信息中删除这个bundle,另外也要从loadmanager的
preallocatedBundleToBroker
删除这个bundle
- 如果在bundleData中存在,则从broker的预分配信息中删除这个bundle,另外也要从loadmanager的
- 更新broker的
TimeAverageData
信息 - 更新loadmanager的
brokerToNamespaceToBundleRange
信息
- 遍历broker的bundle信息
- 遍历所有的broker负载数据
-
第二步将负载信息写到zk
-
将Bundle信息写到zk,路径是
/loadbalance/bundle-data/bundle
,如下:[zk: localhost:2181(CONNECTED) 6] get /mayo_test/loadbalance/bundle-data/public/default/0xa0000000_0xb0000000 {"shortTermData":{"maxSamples":10,"numSamples":0,"msgThroughputIn":50000.0,"msgThroughputOut":50000.0,"msgRateIn":50.0,"msgRateOut":50.0},"longTermData":{"maxSamples":1000,"numSamples":0,"msgThroughputIn":50000.0,"msgThroughputOut":50000.0,"msgRateIn":50.0,"msgRateOut":50.0},"topics":0}
-
将Broker信息写到zk,路径是
/loadbalance/broker-time-average/broker
,如下:[zk: localhost:2181(CONNECTED) 10] get /mayo_test/loadbalance/broker-time-average/100.76.40.78:8080 { "shortTermMsgThroughputIn":2.535843746704023E-5, "shortTermMsgThroughputOut":2.543994721421319E-5, "shortTermMsgRateIn":4.610624994007313E-7, "shortTermMsgRateOut":4.625444948038767E-7, "longTermMsgThroughputIn":0.020691195402680305, "longTermMsgThroughputOut":0.020757703211301066, "longTermMsgRateIn":3.7620355277600577E-4, "longTermMsgRateOut":3.7741278566001945E-4 }
-
-
-
brokerIsTheLeaderNow
TheLeaderNow :停止loadSheddingTask
和loadResourceQuotaTask
到这一步,leader选举和对应操作完成,可以看到如果broker被选为leader,会负责更新bundle信息,并且根据 loadshedding 的策略来选择需要shed的bundle信息。
2.2 如果是follower
会 cancle loadSheddingTask 和 loadResourceQuotaTask。
前面的描述中会用到每个broker的负载信息,这些信息是怎么保存的呢?
3. 启动负载管理服务
在 PulsarService 启动之时,会 startLoadManagementService,主要是两个内容,启动 loadManager和 loadReportTask,这里的loadReportTask 的任务就是将本地的负载信息上报到zk。
protected void startLoadManagementService() throws PulsarServerException {
LOG.info("Starting load management service ...");
// 启动 loadManager 服务,
this.loadManager.get().start();
if (config.isLoadBalancerEnabled()) {
LOG.info("Starting load balancer");
if (this.loadReportTask == null) {
long loadReportMinInterval = LoadManagerShared.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
// 启动负载上报工作,负载上报工作每 5s 执行一次
this.loadReportTask = this.loadManagerExecutor.scheduleAtFixedRate(
new LoadReportUpdaterTask(loadManager), loadReportMinInterval, loadReportMinInterval,
TimeUnit.MILLISECONDS);
}
}
}
3.1 启动 loadManager
这里的 loadManager 是 ModularLoadManagerWrapper,内部是调用 ModularLoadManagerImpl的start方法,每个broker 都会有一个 loadmanager 来进行负载管理。
// 获取 protocol handler 信息
Map<String, String> protocolData = pulsar.getProtocolDataToAdvertise();
// 初始化 LocalBrokerData,包含 webServiceUrl,brokerUrl,advertisedListner信息等,作为 lastData
lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
// 把 protocolData 信息保存在LocalBrokerData中
lastData.setProtocols(protocolData);
// 配置是否支持topic模式,持久化或者非持久化
lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
// lastData 保存了最新的broke让信息,包括 webServiceUrl/brokerUrl/advertisedListner/protocalHandle/isEnablePersistentTopics/isEnableNonPersistentTopics 等信息
// 重新初始化一个LocalBrokerData,作为local值,这里
localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners
localData.setProtocols(protocolData);
// 相比于lastData值,localData中会增加
localData.setBrokerVersionString(pulsar.getBrokerVersion());
// 配置是否支持topic模式,持久化或者非持久化
localData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
// lookup 服务地址
String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
+ (conf.getWebServicePort().isPresent() ? conf.getWebServicePort().get()
: conf.getWebServicePortTls().get());
// broker 负载对应的 zk 路径,/loadbalance/brokers/${brokerUrl}
brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
// 保存一段时间平均负载值的zk路径,/loadbalance/broker-time-average/lookupServiceAddress
final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress;
// 获取本地 broker 的一些负载信息,并更新到 localData 中
updateLocalBrokerData();
// 将 localData 更新到zk
brokerDataLock = brokersData.acquireLock(brokerZnodePath, localData).join();
// 将 timeAverage 负载信息更新到zk,此时的 timeAverage 信息没有内容
timeAverageBrokerDataCache.readModifyUpdateOrCreate(timeAverageZPath,
__ -> new TimeAverageBrokerData()).join();
// 更新所有
updateAll();
// 最后上报的时间
lastBundleDataUpdate = System.currentTimeMillis();
可以看到,loadmanager启动之后的最要工作就是把本地负载信息上报到zk,下面分步骤来详细看下
3.1.1 updateLocalBrokerData
3.1.1.1 获取资源使用率
机器资源使用率包括CPU/内存/网卡出入流量三个部分,以Linux系统为例。在 LoadManager 初始化时为操作系统指定一个 LinuxBrokerHostUsageImpl 对象,这个对象负责计算 broker 节点的资源使用率。
获取本地硬件资源使用率 systemResourceUsage
,硬件资源的获取是通过 LinuxBrokerHostUsageImpl(默认1分钟会运行一次)来实现的,会定期的获取网卡、内存、cpu资源使用情况;
public void calculateBrokerHostUsage() {
// 计算网卡使用率
List<String> nics = getNics();
double totalNicLimit = getTotalNicLimitKbps(nics);
double totalNicUsageTx = getTotalNicUsageTxKb(nics);
double totalNicUsageRx = getTotalNicUsageRxKb(nics);
double totalCpuLimit = getTotalCpuLimit();
long now = System.currentTimeMillis();
double elapsedSeconds = (now - lastCollection) / 1000d;
if (elapsedSeconds <= 0) {
log.warn("elapsedSeconds {} is not expected, skip this round of calculateBrokerHostUsage", elapsedSeconds);
return;
}
SystemResourceUsage usage = new SystemResourceUsage();
// 计算CPU使用率
double cpuUsage = getTotalCpuUsage(elapsedSeconds);
if (lastCollection == 0L) {
usage.setMemory(getMemUsage());
usage.setBandwidthIn(new ResourceUsage(0d, totalNicLimit));
usage.setBandwidthOut(new ResourceUsage(0d, totalNicLimit));
} else {
double nicUsageTx = (totalNicUsageTx - lastTotalNicUsageTx) / elapsedSeconds;
double nicUsageRx = (totalNicUsageRx - lastTotalNicUsageRx) / elapsedSeconds;
usage.setMemory(getMemUsage());
usage.setBandwidthIn(new ResourceUsage(nicUsageRx, totalNicLimit));
usage.setBandwidthOut(new ResourceUsage(nicUsageTx, totalNicLimit));
}
lastTotalNicUsageTx = totalNicUsageTx;
lastTotalNicUsageRx = totalNicUsageRx;
lastCollection = System.currentTimeMillis();
this.usage = usage;
usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit));
}
3.1.1.2 获取 bundle 信息
然后,从 pulsarStats 中获取所有 bundle 的状态信息 bundleStats,
public double msgRateIn;
public double msgThroughputIn;
public double msgRateOut;
public double msgThroughputOut;
public int consumerCount;
public int producerCount;
public long topics;
public long cacheSize;
3.1.1.3 更新资源使用和bundle信息
在获取了 机器资源使用情况 和 bundle 负载信息之后,会把 systemResourceUsage和 bundleStats 更新到 localData 中,并且会把负载信息记录在指标中。
对于bundle信息,还会和上一次的bundle信息对比,记录当前获取了哪些bundle的ownership以及丢失了哪些bundle的ownership,另外使用最新的bundle负载来计算broker最新的 msgThroughputIn/out,msgRateIn/out,numTopics,numBundles,numConsumers,numProducers等指标信息。
3.1.2 更新 localData 到zk
将统计到的loacalData负载信息上报到zk,这个过程会在/loadbalance/brokers/lookupServiceAddress
路径下,写入当前broker 的localData信息,localData包含了broker的url、advertisedListener等基本信,系统资源使用信息和bundle的状态信息。
注意: 数据类型为 LocalBrokerData
3.1.3 更细 timeAverage 负载信息
在路径 /loadbalance/broker-time-average/lookupServiceAddress
更新一个默认的 全新的平均负载信息
经过以上步骤,可以看到 broker 计算出了 最新的资源使用率,bundle信息,并且将这些信息保存到了zk。
注意:数据类型为TimeAverageBrokerData
3.1.4 Update All
这个过程会更新本地全量负载信息的broker以及bundle信息,这一步骤是为了更具最新的集群负载信息做负载均衡的决策
public void updateAll() {
if (log.isDebugEnabled()) {
log.debug("Updating broker and bundle data for loadreport");
}
updateAllBrokerData();
updateBundleData();
// broker has latest load-report: check if any bundle requires split
checkNamespaceBundleSplit();
}
public class LoadData {
/**
* Map from broker names to their available data.
*/
private final Map<String, BrokerData> brokerData;
/**
* Map from bundle names to their time-sensitive aggregated data.
*/
private final Map<String, BundleData> bundleData;
/**
* Map from recently unloaded bundles to the timestamp of when they were last loaded.
*/
private final Map<String, Long> recentlyUnloadedBundles;
broker 本地有个LoadData对象,其中保存了每个broker的负载信息,bundle的信息,以及最近unlaod的bundle信息(避免频繁卸载)
3.1.4.1 更新 brokerData (Map<String, BrokerData> brokerData)
更新 broker 数据时,首先从 /loadbalance/brokers
路径下获取 activeBroker 列表,然后对于每个active 的broker,从 /loadbalance/brokers/lookupServiceAddress
路径下获取每个broker上报的负载信息
- 如果不存在则从本地的全量负载信息LoadData 中删除对应的broker
- 如果 本地 全量负载信息中包含了这个broker的信息,则使用新获取的负载来更新本地信息(LocalBrokerData);不包含的话,则创建一个新的 LocalBrokerData,保存在全量负载信息中
- 最后做一次过滤,对于每个在全量负载中出现的broker,判断是否在activeBroker列表中,如果没有则从全量负载信息中删除
3.1.4.2 更新 bundleData(Map<String, BundleData> bundleData)
经过上一步,我们已经获取了每个broker的最新负载信息,然后可以获取每个broker上的bundle负载信息,使用这些信息去更新LoadData的bundleData信息。。
更新 Bundle 信息,在全量负载信息中,除了brokerData的信息之外,还有bundle的信息bundleData ,这里处理的内容是将上一步中获取的 全量的 brokerData 信息更新到 bundle信息 bundleData 中。
Bundle数据更新
首先是遍历 brokerData 中的所有的bundle信息,如果 bundleData 已经存在,则使用最新的bundle信息来更新 bundleData,
if (bundleData.containsKey(bundle)) {
// If we recognize the bundle, add these stats as a new sample.
bundleData.get(bundle).update(stats);
}
....
public void update(final NamespaceBundleStats newSample) {
shortTermData.update(newSample);
longTermData.update(newSample);
this.topics = (int) newSample.topics;
}
首先是更新 shortTermData 和 longShortTermData,操作类似,都是求均值的过程,
private double getUpdatedValue(final double oldAverage, final double newSample) {
return ((numSamples - 1) * oldAverage + newSample) / numSamples;
}
计算方法是使用 (历史值*采样数 + 新值 * 1) / 总的采样数,更新之后会得到最新的 msgThroughputIn、msgThroughputOut、msgRateIn、msgRateOut指标。
然后更新 bundle 的 topics (数量)信息。
如果 bundleData 不存在,首先从 zk 上查找 bundle data,如果找不到,则使用 bundle 的最新信息最为首次采样值:
- 首先从
/loadbalance/bundle-data/tenant/namespace/bundle
下获取负载信息,存在则返回; - 否则,从
/loadbalance/resource-quota/namespace/tenant/namespace/bundle
下获取 ResourceQuota 信息,如果存在,则使用其中的shortTerm和longTerm信息来初始化一个新的bundledata返回 - 否则,初始化一个全新的bundledata返回
- 获取到 bundleData 之后,使用最新的 bundle 信息来更新这个 bundleData,并且保存在LoadData的bundleData map中
。
preallocatedBundle清理 (todo: preallocate的逻辑)
首先获取 broker 上报的 PreallocatedBundleData信息,遍历素有 Preallocated的bunlde,如果
- 在 broker 上报信息中已经存在,并且在 全量信息的 bundle 信息中也存在,则从上报broker信息的 getPreallocatedBundleData 和 preallocatedBundleToBroker 两个map中删除bundle信息
然后,根据前面获得的最新的bundle 信息更新 loadData 中brokerData.timeAverageData;
最后更新,<Broker, <NameSpace,<Set
如果开启了自动spit,则会查找需要split的bundle执行split操作,split执行之后,会把响应的负载信息从loadData中删除掉。
注意:每个broker启动时,都会有UpdateAll的操作,即在启动之后,所有broker都会有一个全局的负载信息,这个信息反映了broker启动时的集群负载情况。
3.2 启动 loadReport任务
如果开启了 loadbalance 功能,broker还会启动一个 LoadReportUpdaterTask 任务,默认是5s执行一次。
任务的主要内容是,首先 更新本地的系统资源使用信息和bundle状态信息,然后将信息更新到zk上,这里来详细看下需要上报的内容,首先上报的内容是 LocalBrokerData 类型的对象,其中包含了
- 基本的 webServiceUrl、pulsarServiceUrl、topic模式(persistent、non-persistent topic)
- 系统物理资源使用情况:cpu、内存、堆外内存、网卡入、网卡出
- 实时指标,msgThroughputIn、msgThroughputOut、msgRateIn、msgRateOut
- 统计信息:numTopics、numBundles、numConsumers、numProducers
1的内容在初始化是就已经从配置中获取,2的内容会通过系统资源使用计算得到,3和4的内容则从 broker 的当前实时指标中的bundlestats 中计算获取。可以看到这个过程和LoadManager启动时的计算本机负载的逻辑是一样的。
注意:不是每个周期都会上报负载信息,而是会有 needBrokerDataUpdate 的判断,判断负载变化是否超过阈值,如果超过才会上报,否则不会上报到zk。
3.3 负载更新
现在还剩余一个问题,就是 loadReport 上报的信息是如何更新到leader,并且被leader用来做负载决策的?
其实,在 LoadManager 初始化时,就初始化了zk的一个nitofication类,
pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
这个Nitification会在负载信息变化时被nofify
public void handleDataNotification(Notification t) {
if (t.getPath().startsWith(LoadManager.LOADBALANCE_BROKERS_ROOT)) {
brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT)
.thenAccept(brokers -> {
reapDeadBrokerPreallocations(brokers);
});
try {
scheduler.submit(ModularLoadManagerImpl.this::updateAll);
} catch (RejectedExecutionException e) {
// Executor is shutting down
}
}
}
即 /loadbalance/brokers
下有任何改动,都会触发回调,首先处理broker下线的情况
3.3.1 broker下线
判断 active broker中是否还存在这个broker信息,不存在则执行相应的清理操作
3.3.2 负载更新
public void updateAll() {
if (log.isDebugEnabled()) {
log.debug("Updating broker and bundle data for loadreport");
}
updateAllBrokerData();
updateBundleData();
// broker has latest load-report: check if any bundle requires split
checkNamespaceBundleSplit();
}
和前文的update一样,经过更新,broker就获取了最新的负载信息,可以据此来执行负载策略。
3.4 小结
综上所述,broker 在启动时,会启动 loadReport 的任务,用于上报当前broker的资源使用情况和bundlestats信息,;并且会发起一个选举,选举会产生一个leader,这个leader 会启动两个任务,
loadSheddingTask 和 loadResourceQuotaTask,
- lodSheddingTask会选出需要卸载负载的bundle,然后调用admin接口进行卸载;
- loadResourceQuotaTask 则负责根据broker上报的bundle 信息跟新leader上的bundle 信息,并且写入到zk /mayo_test/loadbalance/bundle-data/public/default/0xa0000000_0xb0000000 和 /mayo_test/loadbalance/broker-time-average/100.76.40.78:8080 路径下。
4. 如何分配 bundle 给 broker
在 客户端执行 lookup 时,实际上并不是分配的单个topic,而是分配的一个bundle,这样可以显著降低元数据的数量。
4.1 LookUp 命令
比如在执行 lookup 命令时,
CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic)
.thenCompose(bundle -> findBrokerServiceUrl(bundle, options));
这里主要执行两个操作,第一步是 获取 topic 所在的 bundle,第二步是查找 bundle 所在的 broker。
4.1.1获取 topic 所在的 bundle
查找 topic 所在 bundle 也分为两步,第一步是获取 namespace 的 bundle 信息,第二步是计算得到 topic 所在的 bundle
4.1.1.1 查找 namespace 的 bundle
首先 从 local-policies 中获取,/admin/local-policies/tenant/namespace 路径的内容,格式如下:
[zk: localhost:2181(CONNECTED) 18] get /bookie_ps_test/admin/local-policies/test/test
{"bundles":{"boundaries":["0x00000000","0x40000000","0x80000000","0xc0000000","0xffffffff"],"numBundles":4}}
如果找不到的话,则从 policies 去获取 /admin/policies/tenant/namespace 的内容,这里包含了namespace的全部policies信息,格式如下:
[zk: localhost:2181(CONNECTED) 19] get /pulsar_ps_test/admin/policies/test/test
{"auth_policies":{"namespace_auth":{},"destination_auth":{},"subscription_auth_roles":{}},"replication_clusters":["ps_test"],"bundles":{"boundaries":["0x00000000","0x40000000","0x80000000","0xc0000000","0xffffffff"],"numBundles":4},"backlog_quota_map":{"destination_storage":{"limit":-1073741824,"policy":"producer_request_hold"}},"clusterDispatchRate":{},"topicDispatchRate":{"ps_test":{"dispatchThrottlingRateInMsg":0,"dispatchThrottlingRateInByte":0,"relativeToPublishRate":false,"ratePeriodInSecond":1}},"subscriptionDispatchRate":{"ps_test":{"dispatchThrottlingRateInMsg":0,"dispatchThrottlingRateInByte":0,"relativeToPublishRate":false,"ratePeriodInSecond":1}},"replicatorDispatchRate":{},"clusterSubscribeRate":{"ps_test":{"subscribeThrottlingRatePerConsumer":0,"ratePeriodInSecond":30}},"persistence":{"bookkeeperEnsemble":3,"bookkeeperWriteQuorum":2,"bookkeeperAckQuorum":2,"managedLedgerMaxMarkDeleteRate":0.0},"publishMaxMessageRate":{},"latency_stats_sample_rate":{},"message_ttl_in_seconds":36000,"deleted":false,"encryption_required":false,"subscription_auth_mode":"None","max_producers_per_topic":0,"max_consumers_per_topic":0,"max_consumers_per_subscription":0,"max_unacked_messages_per_consumer":50000,"max_unacked_messages_per_subscription":200000,"compaction_threshold":0,"offload_threshold":-1,"schema_auto_update_compatibility_strategy":"Full","schema_compatibility_strategy":"UNDEFINED","is_allow_auto_update_schema":true,"schema_validation_enforced":false}
然后从这个policies中获取 bundles 信息,根据这个信息构造一个新的 LocalPolicies,并且更新到zk。完成之后,在bundlecache中也就具有了这个 bundle 信息。
得到所有的bundle 信息之后,根据 topic的hashCode,看topic落在哪个bundle 方位内,这个bundle 就是topic所在的bundle。
4.1.2 查找 bundle 所在的brokerUrl
-
首先检查 ownerShipCache 中有没有这个 bundle 信息,如果有直接返回;
-
没有则继续查找,这个查找的过程就是 bundle 的分配过程,
searchForCandidateBroker(bundle, future, options);
- 先排除掉 heartbeat 和 sla namespace,我们只看正常的 namespace bundle 的查找,如果查找options中已经指定了当前broker为bundle的owner,直接返回当前broker的url信息;
- 否则继续查找,首先找到当前leader,作为执行负载均衡决定的节点,如果当前节点是leader 或者当前 ledger 不是 active 状态,则在当前节点做分配,这时会为bundle查找负载最低的broker节点
- 从 LoadData 的 bundle 信息找,查找
- 如果当前节点不是leader节点,并且leader节点是active,则会重定向到 leader节点