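Pulsar supports cross-region replication, known as geo-replication. With it, Pulsar can replicate data between multiple clusters in different regions, either in both directions or in one direction only. This article walks through the source code to explain how Pulsar geo-replication works.
Message replication
When PulsarService starts, it registers a listener for cluster deletion. When a cluster is deleted, the broker iterates over all topics, closes each topic's replicator for that cluster, and removes the PulsarClient associated with the cluster.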
pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);
...
private void handleDeleteCluster(Notification notification) {
if (ClusterResources.pathRepresentsClusterName(notification.getPath())
&& notification.getType() == NotificationType.Deleted) {
final String clusterName = ClusterResources.clusterNameFromPath(notification.getPath());
getBrokerService().closeAndRemoveReplicationClient(clusterName);
}
}
When BrokerService starts, it schedules a periodic task that checks for changes to the replication policies. When the policies change, the replicators for the remote clusters are updated accordingly.
this.startCheckReplicationPolicies();
...
protected void startCheckReplicationPolicies() {
int interval = pulsar.getConfig().getReplicationPolicyCheckDurationSeconds();
if (interval > 0) {
messageExpiryMonitor.scheduleAtFixedRate(safeRun(this::checkReplicationPolicies), interval, interval,
TimeUnit.SECONDS);
}
}
...
public void checkReplicationPolicies() {
forEachTopic(Topic::checkReplication);
}
...
public CompletableFuture<Void> checkReplication() {
TopicName name = TopicName.get(topic);
if (!name.isGlobal()) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Policies> policiesFuture = brokerService.pulsar().getPulsarResources()
.getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenCompose(optPolicies -> {
if (!optPolicies.isPresent()) {
return FutureUtil.failedFuture(
new ServerMetadataException(
new MetadataStoreException.NotFoundException()));
}
return CompletableFuture.completedFuture(optPolicies.get());
});
CompletableFuture<Integer> ttlFuture = getMessageTTL();
return CompletableFuture.allOf(policiesFuture, ttlFuture)
.thenCompose(__ -> {
// Get the namespace policies
Policies policies = policiesFuture.join();
int newMessageTTLinSeconds = ttlFuture.join();
Set<String> configuredClusters;
if (policies.replication_clusters != null) {
configuredClusters = Sets.newTreeSet(policies.replication_clusters);
} else {
configuredClusters = Collections.emptySet();
}
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
log.info("Deleting topic [{}] because local cluster is not part of "
+ " global namespace repl list {}", topic, configuredClusters);
return deleteForcefully();
}
List<CompletableFuture<Void>> futures = Lists.newArrayList();
// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
}
// Create a replicator for every remote cluster that does not have one yet
if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
}
}
// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
// Update the TTL, which is used for the message expiry check
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
}
}
});
return FutureUtil.waitForAll(futures);
});
}
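The reconciliation performed by checkReplication can be summarized as a set difference between the configured clusters and the running replicators. The following is a minimal sketch under that reading; ReplicationPlan and its fields are hypothetical names used for illustration and are not Pulsar classes.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper (not a Pulsar class): diffs configured clusters against running replicators. */
final class ReplicationPlan {
    final boolean deleteLocalTopic; // local cluster is no longer in the replication list
    final Set<String> toStart;      // remote clusters that still need a replicator
    final Set<String> toStop;       // replicators whose cluster was removed from the list

    private ReplicationPlan(boolean deleteLocalTopic, Set<String> toStart, Set<String> toStop) {
        this.deleteLocalTopic = deleteLocalTopic;
        this.toStart = toStart;
        this.toStop = toStop;
    }

    static ReplicationPlan compute(Set<String> configured, Set<String> running, String localCluster) {
        // Mirrors the early return in checkReplication: a global topic is deleted locally
        // when the local cluster is not part of the configured replication clusters.
        if (!configured.contains(localCluster)) {
            return new ReplicationPlan(true, new TreeSet<>(), new TreeSet<>());
        }
        // Replicators to start: configured remote clusters without a running replicator.
        Set<String> toStart = new TreeSet<>(configured);
        toStart.remove(localCluster);
        toStart.removeAll(running);

        // Replicators to stop: running replicators whose cluster is no longer configured.
        Set<String> toStop = new TreeSet<>(running);
        toStop.remove(localCluster);
        toStop.removeAll(configured);
        return new ReplicationPlan(false, toStart, toStop);
    }
}
```

For example, with configured clusters {a, b, c}, running replicators {b, d}, and local cluster a, the plan starts a replicator for c and stops the one for d.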
Now look at how a replicator is created. The first step is to open a cursor for the replicator, which records the replication position.
CompletableFuture<Void> startReplicator(String remoteCluster) {
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();
String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
ledger.asyncOpenCursor(name, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
boolean isReplicatorStarted = addReplicationCluster(remoteCluster, cursor, localCluster);
if (isReplicatorStarted) {
future.complete(null);
} else {
future.completeExceptionally(new NamingException(
PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
}
}
@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(new PersistenceException(exception));
}
}, null);
return future;
}
After the cursor is opened, the replicator for the remote cluster is created.
protected boolean addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) {
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
brokerService);
} catch (NamingException | PulsarServerException e) {
isReplicatorStarted.set(false);
}
return null;
});
// clean up replicator if startup is failed
if (!isReplicatorStarted.get()) {
replicators.remove(remoteCluster);
}
return isReplicatorStarted.get();
}
...
public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException, PulsarServerException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
this.topic = topic;
this.cursor = cursor;
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName,
Codec.decode(cursor.getName()), cursor, null);
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
PENDING_MESSAGES_UPDATER.set(this, 0);
readBatchSize = Math.min(
producerQueueSize,
topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize());
readMaxSizeBytes = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes();
producerQueueThreshold = (int) (producerQueueSize * 0.9);
this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
startProducer();
}
The main job of the constructor is to create a producer that writes data to the remote cluster, and then to start that producer.
public synchronized void startProducer() {
log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster);
producerBuilder.createAsync().thenAccept(producer -> {
readEntries(producer);
}).exceptionally(ex -> {
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
long waitTimeMs = backOff.next();
log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);
// BackOff before retrying
brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
} else {
log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
}
return null;
});
}
...
protected void readMoreEntries() {
int availablePermits = getAvailablePermits();
if (availablePermits > 0) {
int messagesToRead = Math.min(availablePermits, readBatchSize);
// If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
messagesToRead = Math.max(messagesToRead, 1);
// Schedule read
if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this,
null, PositionImpl.latest);
}
} else if (availablePermits == -1) {
// no permits from rate limit
topic.getBrokerService().executor().schedule(
() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
}
}
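The replicator uses the cursor to read a batch of entries from the local topic. Once the read completes, each entry goes through several checks, and the entries that pass are sent to the remote cluster through the producer.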
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
int length = entry.getLength();
ByteBuf headersAndPayload = entry.getDataBuffer();
MessageImpl msg;
try {
msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
} catch (Throwable t) {
log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", topicName,
localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t);
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
entry.release();
continue;
}
// Handle markers when replicated subscriptions are enabled
if (isEnableReplicatedSubscriptions) {
checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);
}
// Skip entries that were replicated from another cluster
if (msg.isReplicated()) {
// Discard messages that were already replicated into this region
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
entry.release();
msg.recycle();
continue;
}
// Skip entries that name target clusters which do not include this replicator's remote cluster
if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(remoteCluster)) {
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
entry.release();
msg.recycle();
continue;
}
// Skip expired entries
if (msg.isExpired(messageTTLInSeconds)) {
msgExpired.recordEvent(0 /* no value stat */);
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Discarding expired message at position {}, replicateTo {}", topicName,
localCluster, remoteCluster, entry.getPosition(), msg.getReplicateTo());
}
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
entry.release();
msg.recycle();
continue;
}
// Skip entries that were read while the producer is not ready
if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
isLocalMessageSkippedOnce = true;
entry.release();
msg.recycle();
continue;
}
dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength()));
// Increment pending messages for messages produced locally
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
msgOut.recordEvent(headersAndPayload.readableBytes());
// Set replicatedFrom to mark the source cluster
msg.setReplicatedFrom(localCluster);
headersAndPayload.retain();
getSchemaInfo(msg).thenAccept(schemaInfo -> {
msg.setSchemaInfoForReplicator(schemaInfo);
// Send the message to the remote cluster through the producer
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
}).exceptionally(ex -> {
log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName,
localCluster, remoteCluster, ex);
return null;
});
atLeastOneMessageSentForReplication = true;
}
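The per-entry checks in the loop above can be condensed into one decision function. This is a simplified sketch, not the Pulsar implementation: ReplicationFilter and Decision are hypothetical names, the expiry rule (publish time plus TTL, with 0 meaning no TTL) is an assumption about what isExpired does, and the isLocalMessageSkippedOnce flag is left out.

```java
import java.util.List;

/** Hypothetical condensation of the per-entry checks; not a Pulsar class. */
final class ReplicationFilter {
    enum Decision {
        SEND,             // hand the message to the remote producer
        ACK_AND_SKIP,     // delete the position from the cursor, never send it
        SKIP_WITHOUT_ACK  // release the entry but leave the cursor untouched
    }

    static Decision decide(boolean alreadyReplicated, List<String> replicateTo, String remoteCluster,
                           long publishTimeMs, int ttlSeconds, long nowMs, boolean producerStarted) {
        // Messages that arrived through replication are not forwarded again.
        if (alreadyReplicated) {
            return Decision.ACK_AND_SKIP;
        }
        // A non-empty replicateTo list restricts the message to the listed clusters.
        if (!replicateTo.isEmpty() && !replicateTo.contains(remoteCluster)) {
            return Decision.ACK_AND_SKIP;
        }
        // Assumed expiry rule: a TTL of 0 disables expiry.
        if (ttlSeconds != 0 && nowMs > publishTimeMs + ttlSeconds * 1000L) {
            return Decision.ACK_AND_SKIP;
        }
        // The producer is not ready: the entry is dropped without moving the cursor.
        if (!producerStarted) {
            return Decision.SKIP_WITHOUT_ACK;
        }
        return Decision.SEND;
    }
}
```

The distinction between the two skip outcomes matters: the first three checks call cursor.asyncDelete, so those entries are never read again, while the last check only releases the entry.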
After the producer finishes writing to the remote cluster, the send callback updates the cursor to record the position that has been replicated.
public void sendComplete(Exception exception) {
if (exception != null && !(exception instanceof PulsarClientException.InvalidMessageException)) {
log.error("[{}][{} -> {}] Error producing on remote broker", replicator.topicName,
replicator.localCluster, replicator.remoteCluster, exception);
// cursor should be rewinded since it was incremented when readMoreEntries
replicator.cursor.rewind();
} else {
// Record the replicated position in the cursor
replicator.cursor.asyncDelete(entry.getPosition(), replicator, entry.getPosition());
}
entry.release();
int pending = PENDING_MESSAGES_UPDATER.decrementAndGet(replicator);
if (pending < replicator.producerQueueThreshold
&& HAVE_PENDING_READ_UPDATER.get(replicator) == FALSE
) {
if (pending == 0 || replicator.producer.isWritable()) {
// Read more entries
replicator.readMoreEntries();
}
}
recycle();
}
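The condition at the end of sendComplete decides when the replicator resumes reading. A small sketch of that predicate, assuming the 0.9 threshold factor from the PersistentReplicator constructor shown earlier; ReadTrigger is a hypothetical name.

```java
/** Hypothetical restatement of the "read more" condition in sendComplete; not a Pulsar class. */
final class ReadTrigger {
    static boolean shouldReadMore(int pending, int producerQueueSize,
                                  boolean havePendingRead, boolean producerWritable) {
        // Same formula as producerQueueThreshold in the constructor shown earlier.
        int threshold = (int) (producerQueueSize * 0.9);
        // Resume reading only when the queue has drained below the threshold, no read is
        // already in flight, and either nothing is pending or the producer can accept writes.
        return pending < threshold && !havePendingRead && (pending == 0 || producerWritable);
    }
}
```

With a producer queue size of 1000 the threshold is 900, so a callback that leaves 899 pending messages can trigger a new read while one that leaves 900 cannot.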
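Subscription state replication
Topic initialization
The broker has a configuration option for replicated subscriptions, which allows subscription state to be replicated between clusters.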
enableReplicatedSubscriptions
When a PersistentTopic is initialized, it checks the replicated subscription state:
public synchronized void checkReplicatedSubscriptionControllerState() {
AtomicBoolean shouldBeEnabled = new AtomicBoolean(false);
subscriptions.forEach((name, subscription) -> {
if (subscription.isReplicated()) {
shouldBeEnabled.set(true);
}
});
...
checkReplicatedSubscriptionControllerState(shouldBeEnabled.get());
...
}
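This step checks whether the topic has any replicated subscription. Whether a subscription is replicated is specified by the client when the subscription is created. If the topic has just been created or has no replicated subscription, shouldBeEnabled is false; if the topic has at least one replicated subscription, shouldBeEnabled is true.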
private synchronized void checkReplicatedSubscriptionControllerState(boolean shouldBeEnabled) {
boolean isCurrentlyEnabled = replicatedSubscriptionsController.isPresent();
boolean isEnableReplicatedSubscriptions =
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions) {
// Create the ReplicatedSubscriptionsController, passing the PersistentTopic and the local cluster name
replicatedSubscriptionsController = Optional.of(new ReplicatedSubscriptionsController(this,
brokerService.pulsar().getConfiguration().getClusterName()));
} else if (isCurrentlyEnabled && !shouldBeEnabled || !isEnableReplicatedSubscriptions) {
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
replicatedSubscriptionsController = Optional.empty();
}
}
A new replicatedSubscriptionsController is created for the PersistentTopic when all of the following hold:
- replicatedSubscriptionsController is Optional.empty()
- the broker configuration has enableReplicatedSubscriptions turned on
- shouldBeEnabled is true
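The two branches of checkReplicatedSubscriptionControllerState can be restated as a pure function, which makes the operator precedence in the else-if explicit. This is only an illustrative sketch; ControllerState and Action are hypothetical names.

```java
/** Hypothetical restatement of the controller enable/disable decision; not a Pulsar class. */
final class ControllerState {
    enum Action { CREATE, CLOSE, NONE }

    static Action decide(boolean shouldBeEnabled, boolean isCurrentlyEnabled, boolean brokerEnabled) {
        if (shouldBeEnabled && !isCurrentlyEnabled && brokerEnabled) {
            return Action.CREATE;
        }
        // In Java, && binds tighter than ||, so the original condition reads as
        // (isCurrentlyEnabled && !shouldBeEnabled) || !brokerEnabled.
        // When no controller exists, CLOSE is effectively a no-op (ifPresent does nothing).
        if ((isCurrentlyEnabled && !shouldBeEnabled) || !brokerEnabled) {
            return Action.CLOSE;
        }
        return Action.NONE;
    }
}
```

One consequence worth noting: when the broker-level switch is off, the close branch is taken regardless of the other two flags.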
Next, look at the logic of ReplicatedSubscriptionsController:
public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.PublishContext {
// topic
private final PersistentTopic topic;
// Name of the local cluster
private final String localCluster;
// The timestamp of when the last snapshot was initiated
private long lastCompletedSnapshotStartTime = 0;
// ID of the last completed snapshot
private String lastCompletedSnapshotId;
private volatile Position positionOfLastLocalMarker;
private final ScheduledFuture<?> timer;
private final ConcurrentMap<String, ReplicatedSubscriptionsSnapshotBuilder> pendingSnapshots =
new ConcurrentHashMap<>();
private static final Gauge pendingSnapshotsMetric = Gauge
.build("pulsar_replicated_subscriptions_pending_snapshots",
"Counter of currently pending snapshots")
.register();
public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) {
this.topic = topic;
this.localCluster = localCluster;
// Periodic task that triggers a new subscription snapshot
timer = topic.getBrokerService().pulsar().getExecutor()
.scheduleAtFixedRate(this::startNewSnapshot, 0,
topic.getBrokerService().pulsar().getConfiguration()
.getReplicatedSubscriptionsSnapshotFrequencyMillis(),
TimeUnit.MILLISECONDS);
}
ReplicatedSubscriptionsController keeps a cache of pending snapshots. On initialization it starts a periodic task that takes snapshots:
private void startNewSnapshot() {
// Remove timed-out entries from pendingSnapshots
cleanupTimedOutSnapshots();
if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime) {
return;
}
MutableBoolean anyReplicatorDisconnected = new MutableBoolean();
topic.getReplicators().forEach((cluster, replicator) -> {
if (!replicator.isConnected()) {
anyReplicatorDisconnected.setTrue();
}
});
if (anyReplicatorDisconnected.isTrue()) {
// Do not attempt to create snapshot when some of the clusters are not reachable
return;
}
pendingSnapshotsMetric.inc();
// Trigger a new snapshot
ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this,
topic.getReplicators().keys(), topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC());
// Put the builder for this local request into pendingSnapshots so that its timeout can be tracked
pendingSnapshots.put(builder.getSnapshotId(), builder);
builder.start();
}
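The periodic task first removes snapshot tasks that have timed out, then creates a new snapshot task and puts it into pendingSnapshots. Note that ReplicatedSubscriptionsSnapshotBuilder is initialized with the ReplicatedSubscriptionsController, the keys of the replicators (that is, all clusters except the local one), and the broker configuration object.
The snapshot logic is as follows: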
void start() {
startTimeMillis = clock.millis();
controller.writeMarker(
// Create a new snapshot request from a random ID and the local cluster name
Markers.newReplicatedSubscriptionsSnapshotRequest(snapshotId, controller.localCluster()));
}
...
// How the snapshot request is built
public static ByteBuf newReplicatedSubscriptionsSnapshotRequest(String snapshotId, String sourceCluster) {
ReplicatedSubscriptionsSnapshotRequest req = LOCAL_SNAPSHOT_REQUEST.get()
.clear()
.setSnapshotId(snapshotId) // id
.setSourceCluster(sourceCluster); // local cluster
ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(req.getSerializedSize());
try {
req.writeTo(payload);
return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST, Optional.empty(), payload);
} finally {
payload.release();
}
}
...
private static ByteBuf newMessage(MarkerType markerType, Optional<String> restrictToCluster, ByteBuf payload) {
MessageMetadata msgMetadata = LOCAL_MESSAGE_METADATA.get()
.clear()
.setPublishTime(System.currentTimeMillis())
.setProducerName("pulsar.marker")
.setSequenceId(0)
.setMarkerType(markerType.getValue());
// The marker type is REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST.
// restrictToCluster is Optional.empty() here, so the metadata of the request message has no replicateTo
// field. The message data is the ReplicatedSubscriptionsSnapshotRequest, holding the snapshotId and the local cluster name.
restrictToCluster.ifPresent(msgMetadata::addReplicateTo);
return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload);
}
ReplicatedSubscriptionsController then writes the marker to the local topic.
void writeMarker(ByteBuf marker) {
try {
topic.publishMessage(marker, this);
} finally {
marker.release();
}
}
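As shown above, once the topic is initialized, a periodic task keeps writing ReplicatedSubscriptionsSnapshotRequest markers into the current topic. The message that carries the request is replicated to the other clusters along with regular data, which is equivalent to sending a SnapshotRequest to those clusters.
Subscription initialization
When a subscription is created, it has to be marked as replicated or not. Two settings control this: whether the client set the replicated flag, and whether the broker has replicated subscriptions enabled.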
boolean replicatedSubscriptionState = replicatedSubscriptionStateArg;
if (replicatedSubscriptionState
&& !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
log.warn("[{}] Replicated Subscription is disabled by broker.", getName());
replicatedSubscriptionState = false;
}
The replicatedSubscriptionState parameter is then used when the subscription is created:
CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionState)
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
startMessageRollbackDurationSec);
...
private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
boolean replicated) {
checkNotNull(compactedTopic);
if (subscriptionName.equals(COMPACTION_SUBSCRIPTION)) {
return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor);
} else {
return new PersistentSubscription(this, subscriptionName, cursor, replicated);
}
}
If replicatedSubscriptionState is true, the resulting PersistentSubscription is a replicated subscription. This is where the replicated flag checked earlier gets set. The setter works as follows:
public boolean setReplicated(boolean replicated) {
ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();
if (!replicated || !config.isEnableReplicatedSubscriptions()) {
this.replicatedSubscriptionSnapshotCache = null;
} else if (this.replicatedSubscriptionSnapshotCache == null) {
this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
}
if (this.cursor != null) {
if (replicated) {
return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
} else {
return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY);
}
}
return false;
}
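In other words, if both the client and the broker enable subscription replication, a ReplicatedSubscriptionSnapshotCache is initialized for the PersistentSubscription.
Subscription replication
As shown earlier, after a topic is initialized, a periodic task keeps recording ReplicatedSubscriptionsSnapshotRequest markers in the local topic. During data replication, the replicator first reads a batch of entries and then iterates over them, sending each one to the remote cluster.
An entry that is read may carry a ReplicatedSubscriptionsSnapshotRequest. For such an entry, the replicator first checks whether the message was replicated from the remote cluster that this replicator corresponds to; if not, the marker is not processed. Marker handling therefore excludes SnapshotRequest markers that were generated locally: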
if (isEnableReplicatedSubscriptions) {
checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);
}
...
private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> msg, ByteBuf payload) {
if (!msg.getMessageBuilder().hasMarkerType()) {
// No marker is defined
return;
}
int markerType = msg.getMessageBuilder().getMarkerType();
// If the message has a replicatedFrom field and it names the remote cluster of this PersistentReplicator, continue.
// Otherwise:
// 1. The message has no replicatedFrom field, so it was produced in this cluster: skip the marker check.
// 2. remoteCluster differs from replicatedFrom, so the message did not come from this replicator's
// remote cluster: skip the marker check.
// In short: only markers replicated from this replicator's remote cluster are processed further.
if (!(msg.getMessageBuilder().hasReplicatedFrom()
&& remoteCluster.equals(msg.getMessageBuilder().getReplicatedFrom()))) {
return;
}
// Three marker types are handled here: Request, Response, and Update
switch (markerType) {
case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE:
case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE:
case MarkerType.REPLICATED_SUBSCRIPTION_UPDATE_VALUE:
topic.receivedReplicatedSubscriptionMarker(position, markerType, payload);
break;
default:
// Do nothing
}
}
Messages of each marker type **(replicated from the replicator's remote cluster)** are handled differently. The following sections look at each in turn.
Handling REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE
For a Request marker, a Response is created as follows:
private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
// Make sure the replicator is started
Replicator replicator = topic.getReplicators().get(request.getSourceCluster());
if (!replicator.isConnected()) {
topic.startReplProducers();
}
// Build the Response from the snapshotId, the source cluster (the remote cluster), the local cluster, and the local
// LAC. The remote cluster becomes the replicateTo attribute of the message. The Response is written locally and then
// replicated asynchronously, which amounts to sending the remote cluster a Snapshot Response (local LAC and cluster).
PositionImpl lastMsgId = (PositionImpl) topic.getLastPosition();
ByteBuf marker = Markers.newReplicatedSubscriptionsSnapshotResponse(
request.getSnapshotId(), // id
request.getSourceCluster(), // source remote cluster
localCluster, // local cluster
lastMsgId.getLedgerId(), lastMsgId.getEntryId()); // LAC
writeMarker(marker);
}
...
public static ByteBuf newReplicatedSubscriptionsSnapshotResponse(String snapshotId, String replyToCluster, String cluster, long ledgerId, long entryId) {
ReplicatedSubscriptionsSnapshotResponse response = LOCAL_SNAPSHOT_RESPONSE.get()
.clear()
.setSnapshotId(snapshotId);
response
.setCluster()
.setCluster(cluster) // local cluster
.setMessageId() // LAC
.setLedgerId(ledgerId)
.setEntryId(entryId);
ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(response.getSerializedSize());
try {
response.writeTo(payload);
return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE, Optional.of(replyToCluster),
payload);
} finally {
payload.release();
}
}
...
private static ByteBuf newMessage(MarkerType markerType, Optional<String> restrictToCluster, ByteBuf payload) {
MessageMetadata msgMetadata = LOCAL_MESSAGE_METADATA.get()
.clear()
.setPublishTime(System.currentTimeMillis())
.setProducerName("pulsar.marker")
.setSequenceId(0)
.setMarkerType(markerType.getValue()); // Response type
// Set replicateTo, which is the remote cluster here
restrictToCluster.ifPresent(msgMetadata::addReplicateTo);
return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload);
}
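First the LAC of the current topic is fetched, then a ReplicatedSubscriptionsSnapshotResponse is built containing the snapshotId, the local cluster name, and the LAC of the topic in the current cluster. A message is then built that carries the marker type and the replicateTo attribute (replicateTo means the message may only be sent to the specified cluster). Finally the message is written to the local topic.
In other words, after receiving a SnapshotRequest from the remote cluster (replicated to the local cluster through geo-replication), the local cluster produces a Response that contains the local cluster name and LAC, with replicateTo set in the message metadata. This can be seen as the reply to the remote cluster's request.
Handling REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE
The Response generated in the previous step is written to the local topic and then replicated to the corresponding remote cluster through geo-replication (because of its replicateTo attribute). For the cluster that receives the Response, this is the reply to its own Request.
It is handled as follows: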
synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
// The cluster in the response is the cluster that generated it, that is, the remote cluster
String cluster = response.getCluster().getCluster();
// Record the LAC of each remote cluster
responses.putIfAbsent(cluster, new MarkersMessageIdData().copyFrom(response.getCluster().getMessageId()));
// When ReplicatedSubscriptionsSnapshotBuilder is initialized, missingClusters contains all remote clusters
missingClusters.remove(cluster);
if (!missingClusters.isEmpty()) {
// Some clusters have not responded yet, so stop here
return;
}
// All clusters have responded, so the local cluster now has the LAC of every remote cluster
if (needTwoRounds && !firstRoundComplete) {
// Mark that 1st round is done and start a 2nd round
firstRoundComplete = true;
missingClusters.addAll(remoteClusters);
controller.writeMarker(
Markers.newReplicatedSubscriptionsSnapshotRequest(snapshotId, controller.localCluster()));
return;
}
// The snapshot is complete: build a Snapshot marker and write it to the local topic
PositionImpl p = (PositionImpl) position;
controller.writeMarker(
Markers.newReplicatedSubscriptionsSnapshot(snapshotId, controller.localCluster(),
p.getLedgerId(), p.getEntryId(), responses));
controller.snapshotCompleted(snapshotId);
double latencyMillis = clock.millis() - startTimeMillis;
snapshotMetric.observe(latencyMillis);
}
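The bookkeeping in receivedSnapshotResponse is a small state machine over the set of clusters that have not answered yet. The sketch below models only that part, with positions reduced to a {ledgerId, entryId} array; SnapshotRoundTracker and Result are hypothetical names, and whether two rounds are needed is passed in rather than derived.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the response bookkeeping in the snapshot builder; not a Pulsar class. */
final class SnapshotRoundTracker {
    enum Result { WAITING, SECOND_ROUND_STARTED, SNAPSHOT_COMPLETE }

    private final Set<String> remoteClusters;
    private final Set<String> missingClusters;
    private final Map<String, long[]> responses = new HashMap<>();
    private final boolean needTwoRounds;
    private boolean firstRoundComplete;

    SnapshotRoundTracker(Set<String> remoteClusters, boolean needTwoRounds) {
        this.remoteClusters = new HashSet<>(remoteClusters);
        this.missingClusters = new HashSet<>(remoteClusters);
        this.needTwoRounds = needTwoRounds;
    }

    synchronized Result onResponse(String cluster, long ledgerId, long entryId) {
        // putIfAbsent keeps the first position reported by each cluster, as in the source.
        responses.putIfAbsent(cluster, new long[] {ledgerId, entryId});
        missingClusters.remove(cluster);
        if (!missingClusters.isEmpty()) {
            return Result.WAITING;
        }
        if (needTwoRounds && !firstRoundComplete) {
            // First round done: wait for every remote cluster again.
            // The real builder writes a second snapshot request at this point.
            firstRoundComplete = true;
            missingClusters.addAll(remoteClusters);
            return Result.SECOND_ROUND_STARTED;
        }
        // The real builder writes the Snapshot marker to the local topic here.
        return Result.SNAPSHOT_COMPLETE;
    }

    synchronized Map<String, long[]> responses() {
        return new HashMap<>(responses);
    }
}
```

With two remote clusters and two rounds enabled, the tracker needs four responses in total before it reports completion, and the recorded positions are those from the first round.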
Here is how the Snapshot is built:
@SneakyThrows
public static ByteBuf newReplicatedSubscriptionsSnapshot(String snapshotId, String sourceCluster, long ledgerId,
long entryId, Map<String, MarkersMessageIdData> clusterIds) {
ReplicatedSubscriptionsSnapshot snapshot = LOCAL_SNAPSHOT.get()
.clear()
.setSnapshotId(snapshotId); // id
snapshot.setLocalMessageId() // <ledgerId, entryId> of the last response
.setLedgerId(ledgerId)
.setEntryId(entryId);
// LAC of each remote cluster
clusterIds.forEach((cluster, msgId) -> {
snapshot.addCluster()
.setCluster(cluster)
.setMessageId().copyFrom(msgId);
});
int size = snapshot.getSerializedSize();
ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(size);
try {
snapshot.writeTo(payload);
return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT, Optional.of(sourceCluster), payload);
} finally {
payload.release();
}
}
...
private static ByteBuf newMessage(MarkerType markerType, Optional<String> restrictToCluster, ByteBuf payload) {
MessageMetadata msgMetadata = LOCAL_MESSAGE_METADATA.get()
.clear()
.setPublishTime(System.currentTimeMillis())
.setProducerName("pulsar.marker")
.setSequenceId(0)
.setMarkerType(markerType.getValue());
restrictToCluster.ifPresent(msgMetadata::addReplicateTo);
return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload);
}
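A Snapshot contains:
- the snapshot id
- the ledgerId and entryId of the SnapshotResponse
- the LAC of each remote cluster
The Snapshot is then wrapped in a message whose metadata has the SNAPSHOT marker type and replicateTo set to the local cluster, and the message is written to the local topic. At this point the snapshot is complete: the flow that began when the local cluster initiated a snapshot (by writing a SNAPSHOT_REQUEST to the local topic) has finished, and the snapshotId is removed from pendingSnapshots.
Messages of the SNAPSHOT type get special handling at consumption time: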
else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a server-only marker
if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
}
entries.set(i, null);
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
Collections.emptyMap());
continue;
...
private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
// Remove the protobuf headers
Commands.skipMessageMetadata(headersAndPayload);
try {
ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(headersAndPayload);
subscription.processReplicatedSubscriptionSnapshot(snapshot);
} catch (Throwable t) {
log.warn("Failed to process replicated subscription snapshot at {} -- {}", pos, t.getMessage(), t);
return;
}
}
...
public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
if (snapshotCache != null) {
snapshotCache.addNewSnapshot(new ReplicatedSubscriptionsSnapshot().copyFrom(snapshot));
}
}
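The payload is first parsed into a ReplicatedSubscriptionsSnapshot object, and the subscription then stores it in snapshotCache. snapshotCache is a map with a bounded number of entries; when the limit is exceeded, the oldest snapshot is evicted.
Handling REPLICATED_SUBSCRIPTION_UPDATE_VALUE
Now look at how update markers are handled, starting with what triggers them: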
if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
this.updateLastMarkDeleteAdvancedTimestamp();
// Mark delete position advance
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
if (snapshotCache != null) {
ReplicatedSubscriptionsSnapshot snapshot = snapshotCache
.advancedMarkDeletePosition((PositionImpl) cursor.getMarkDeletedPosition());
if (snapshot != null) {
topic.getReplicatedSubscriptionController()
.ifPresent(c -> c.localSubscriptionUpdated(subName, snapshot));
}
}
}
...
public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(PositionImpl pos) {
ReplicatedSubscriptionsSnapshot snapshot = null;
while (!snapshots.isEmpty()) {
PositionImpl first = snapshots.firstKey();
if (first.compareTo(pos) > 0) {
// Snapshot is associated which an higher position, so it cannot be used now
break;
} else {
// This snapshot is potentially good. Continue the search for to see if there is a higher snapshot we
// can use
snapshot = snapshots.pollFirstEntry().getValue();
}
}
return snapshot;
}
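When a subscription in a cluster acknowledges messages, the snapshot update is triggered. The ack advances the cursor's markDelete position, after which a matching snapshot is taken from snapshotCache: the most recent snapshot whose position is less than or equal to the markDelete position. That snapshot and all earlier ones are removed from the cache in the process; only later snapshots need to be kept to guarantee that no data is lost.
Once the snapshot is obtained, a ReplicatedSubscriptionsUpdate message is built from it: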
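The cache lookup described here can be sketched with a sorted map. This is a simplified model, not the Pulsar class: SnapshotCacheSketch is a hypothetical name, positions are collapsed into a single long instead of a (ledgerId, entryId) pair, and the eviction rule (drop the oldest entry when full) follows the description of the cache given earlier.

```java
import java.util.TreeMap;

/** Hypothetical simplified model of the per-subscription snapshot cache; not a Pulsar class. */
final class SnapshotCacheSketch<S> {
    private final TreeMap<Long, S> snapshots = new TreeMap<>();
    private final int maxSnapshots;

    SnapshotCacheSketch(int maxSnapshots) {
        this.maxSnapshots = maxSnapshots;
    }

    synchronized void addNewSnapshot(long position, S snapshot) {
        // Bounded cache: evict the oldest snapshot when the limit is reached.
        if (snapshots.size() >= maxSnapshots) {
            snapshots.pollFirstEntry();
        }
        snapshots.put(position, snapshot);
    }

    /** Returns the latest snapshot at or before the mark-delete position, or null if none. */
    synchronized S advancedMarkDeletePosition(long markDeletePosition) {
        S result = null;
        // Drain every snapshot that is not ahead of the mark-delete position;
        // the last one drained is the most recent usable snapshot.
        while (!snapshots.isEmpty() && snapshots.firstKey() <= markDeletePosition) {
            result = snapshots.pollFirstEntry().getValue();
        }
        return result;
    }
}
```

With snapshots at positions 10, 20, and 30 and a mark-delete position of 25, the lookup returns the snapshot at 20 and removes both 10 and 20 from the cache, leaving 30 for a later ack.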
public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscriptionsSnapshot snapshot) {
Map<String, MarkersMessageIdData> clusterIds = new TreeMap<>();
for (int i = 0, size = snapshot.getClustersCount(); i < size; i++) {
ClusterMessageId cmid = snapshot.getClusterAt(i);
clusterIds.put(cmid.getCluster(), cmid.getMessageId());
}
ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds);
writeMarker(subscriptionUpdate);
}
The cluster information, consisting of cluster names and LACs, is restored from the snapshot, and a ReplicatedSubscriptionsUpdate is built from it:
public static ByteBuf newReplicatedSubscriptionsUpdate(String subscriptionName,
Map<String, MarkersMessageIdData> clusterIds) {
ReplicatedSubscriptionsUpdate update = LOCAL_SUBSCRIPTION_UPDATE.get()
.clear()
.setSubscriptionName(subscriptionName);
clusterIds.forEach((cluster, msgId) -> {
update.addCluster()
.setCluster(cluster)
.setMessageId().copyFrom(msgId);
});
ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(update.getSerializedSize());
try {
update.writeTo(payload);
return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_UPDATE, Optional.empty(), payload);
} finally {
payload.release();
}
}
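ReplicatedSubscriptionsUpdate contains the subscription name and the <clusterName, LAC> pairs of the current snapshot. It is wrapped in a message of type SUBSCRIPTION_UPDATE with replicateTo left unset and written to the local topic.
After this message is replicated to the remote cluster, the remote cluster handles it as follows: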
private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
MarkersMessageIdData updatedMessageId = null;
for (int i = 0, size = update.getClustersCount(); i < size; i++) {
ClusterMessageId cmid = update.getClusterAt(i);
if (localCluster.equals(cmid.getCluster())) {
updatedMessageId = cmid.getMessageId();
}
}
if (updatedMessageId == null) {
// No updates for this cluster, ignore
return;
}
Position pos = new PositionImpl(updatedMessageId.getLedgerId(), updatedMessageId.getEntryId());
PersistentSubscription sub = topic.getSubscription(update.getSubscriptionName());
if (sub != null) {
sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap());
} else {
// Subscription doesn't exist. We need to force the creation of the subscription in this cluster, because
topic.createSubscription(update.getSubscriptionName(),
InitialPosition.Latest, true /* replicateSubscriptionState */);
}
}
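The <cluster, LAC> pairs are parsed first, then the entry for the current cluster is looked up and used to update the subscription's markDelete position. This completes the subscription synchronization.
Replicated subscription example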
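The receiving side can be modeled as a lookup followed by a cumulative move of the mark-delete position. The sketch below is illustrative only: RemoteSubscriptionSketch is a hypothetical name, positions are {ledgerId, entryId} arrays, and ignoring positions that are not ahead of the current one is an assumption of the sketch rather than a statement about how the real cursor reacts.

```java
import java.util.Map;

/** Hypothetical model of applying a subscription update on the remote side; not a Pulsar class. */
final class RemoteSubscriptionSketch {
    private long ledgerId = -1;
    private long entryId = -1;

    /** Returns true if the mark-delete position moved forward. */
    boolean applyUpdate(String localCluster, Map<String, long[]> clusterPositions) {
        // Only the entry for this cluster is relevant; other clusters' positions are ignored.
        long[] pos = clusterPositions.get(localCluster);
        if (pos == null) {
            return false; // no update for this cluster
        }
        // Assumption of this sketch: only move forward, like a cumulative ack.
        boolean ahead = pos[0] > ledgerId || (pos[0] == ledgerId && pos[1] > entryId);
        if (ahead) {
            ledgerId = pos[0];
            entryId = pos[1];
        }
        return ahead;
    }

    long ledgerId() {
        return ledgerId;
    }

    long entryId() {
        return entryId;
    }
}
```

For instance, an update carrying the pair for cluster B with ledger 2 and entry 21 moves the subscription in cluster B to that position, while an update that lists only other clusters leaves it unchanged.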
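Assume two clusters, A and B, configured for geo-replication, with a topic test and a subscription sub that initially exists in cluster A.
When the topic is initialized in cluster A:
- The remote cluster and its Replicator are created in the topic.
- A ReplicatedSubscriptionsController is created, holding the localCluster name. The controller starts a periodic task that keeps building new ReplicatedSubscriptionsSnapshotBuilder instances.
- The builder creates a ReplicatedSubscriptionsSnapshotRequest containing the snapshotId and localCluster (A).
- A message is built whose metadata has type SNAPSHOT_REQUEST and whose value is the ReplicatedSubscriptionsSnapshotRequest.
- The message is written to the local topic. Suppose the topic already contains 10 messages at this point, so the LAC is <L1, 10> and the request lands at position <L1, 11>. Another 10 messages are then written, making the final LAC <L1, 21>.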
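When the subscription is created, a ReplicatedSubscriptionSnapshotCache is created for it.
When geo-replication in cluster A reaches the SNAPSHOT_REQUEST, local marker processing skips it (because only data replicated from a remote cluster is processed).
So, after cluster B has received the replicated messages and is in turn replicating to cluster A, it encounters the SNAPSHOT_REQUEST that came from cluster A. Suppose that in cluster B the replicated SNAPSHOT_REQUEST is at position <L2, 11> and the LAC is <L2, 21>.
Cluster B then fetches its current LAC and builds a ReplicatedSubscriptionsSnapshotResponse containing:
- the snapshotId of the SNAPSHOT_REQUEST
- localCluster (B)
- the ledgerId and entryId of cluster B's LAC, which is <L2, 21> here
- A message is built whose metadata contains replicateTo (clusterA) and the type SNAPSHOT_RESPONSE, and whose value is the ReplicatedSubscriptionsSnapshotResponse.
- The message is written to the topic. Suppose the Response is at position <L2, 25> and the new LAC is <L2, 30>.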
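Cluster B's data is replicated to cluster A. When cluster B's replicator reads the response, marker processing skips it; when cluster A processes the replicated message, it stores <clusterB, <L2,21>> in responses and then builds a new snapshot:
- A new ReplicatedSubscriptionsSnapshot is created,
- containing the snapshotId,
- the current position (that is, the position in cluster A of the Response replicated from clusterB, say <L1, 25>),
- and cluster B together with its LAC, <clusterB, <L2,21>>.
- A message is built with metadata type SNAPSHOT, replicateTo clusterA, and the SNAPSHOT as its value.
- The message is written to the topic in cluster A.
- One snapshot round is complete.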
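Because the snapshot's replicateTo is cluster A itself, the replicator to cluster B skips it when it reaches this message (replicateTo does not contain cluster B), so the snapshot is only consumed locally in cluster A.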
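For SNAPSHOT messages, the subscription sub in cluster A applies special handling during consumption. Before dispatching a message to the consumer, if the message is a SNAPSHOT marker, it is not delivered to the consumer but stored in snapshotCache instead, as a mapping from the position of the response that triggered the snapshot to the snapshot itself: <<L1, 25>, snapshot>.
When the subscription in cluster A handles an ack, it cleans up snapshotCache accordingly and picks the latest snapshot whose position is less than or equal to the markDelete position. It then builds a ReplicatedSubscriptionsUpdate from this snapshot and writes it to the local topic. After the message is replicated to the remote cluster, the remote cluster updates its cursor according to the LAC recorded for it in the snapshot.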