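Pulsar provides a cross-region capability called Geo-Replication (GEO). With GEO, Pulsar can replicate data between multiple clusters in different regions, either in both directions or in one direction only. This article walks through the source code to explain how Pulsar GEO replication works.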

Message replication

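First, when PulsarService starts, it registers a listener for cluster deletion. When a cluster is deleted, the broker iterates over all topics, closes the replicator for that cluster in each topic, and removes the pulsarClient associated with the cluster.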


pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);
...
private void handleDeleteCluster(Notification notification) {
        if (ClusterResources.pathRepresentsClusterName(notification.getPath())
                && notification.getType() == NotificationType.Deleted) {
            final String clusterName = ClusterResources.clusterNameFromPath(notification.getPath());
            getBrokerService().closeAndRemoveReplicationClient(clusterName);
        }
}

Next, when BrokerService starts, it schedules a periodic task that checks for changes to the replication policies. When the policies change, the replicators for the remote clusters are updated accordingly.

this.startCheckReplicationPolicies();
...
protected void startCheckReplicationPolicies() {
        int interval = pulsar.getConfig().getReplicationPolicyCheckDurationSeconds();
        if (interval > 0) {
            messageExpiryMonitor.scheduleAtFixedRate(safeRun(this::checkReplicationPolicies), interval, interval,
                    TimeUnit.SECONDS);
        }
} 
...
  public void checkReplicationPolicies() {
        forEachTopic(Topic::checkReplication);
    }
...
  public CompletableFuture<Void> checkReplication() {
        TopicName name = TopicName.get(topic);
        if (!name.isGlobal()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Policies> policiesFuture = brokerService.pulsar().getPulsarResources()
                .getNamespaceResources()
                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
                .thenCompose(optPolicies -> {
                            if (!optPolicies.isPresent()) {
                                return FutureUtil.failedFuture(
                                        new ServerMetadataException(
                                                new MetadataStoreException.NotFoundException()));
                            }

                            return CompletableFuture.completedFuture(optPolicies.get());
                        });
        CompletableFuture<Integer> ttlFuture = getMessageTTL();
        return CompletableFuture.allOf(policiesFuture, ttlFuture)
                .thenCompose(__ -> {
                    // Get the policies
                    Policies policies = policiesFuture.join();
                    int newMessageTTLinSeconds = ttlFuture.join();

                    Set<String> configuredClusters;
                    if (policies.replication_clusters != null) {
                        configuredClusters = Sets.newTreeSet(policies.replication_clusters);
                    } else {
                        configuredClusters = Collections.emptySet();
                    }
                    String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
                    if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
                        log.info("Deleting topic [{}] because local cluster is not part of "
                                + " global namespace repl list {}", topic, configuredClusters);
                        return deleteForcefully();
                    }
                    List<CompletableFuture<Void>> futures = Lists.newArrayList();
                    // Check for missing replicators
                    for (String cluster : configuredClusters) {
                        if (cluster.equals(localCluster)) {
                            continue;
                        }
                        // Create a replicator for every remote cluster that does not have one yet
                        if (!replicators.containsKey(cluster)) {
                            futures.add(startReplicator(cluster));
                        }
                    }
                    // Check for replicators to be stopped
                    replicators.forEach((cluster, replicator) -> {
                        // Update the TTL, which is used for message expiry checks
                        ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
                        if (!cluster.equals(localCluster)) {
                            if (!configuredClusters.contains(cluster)) {
                                futures.add(removeReplicator(cluster));
                            }
                        }
                    });
                    return FutureUtil.waitForAll(futures);
                });
    }
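
The reconciliation performed by checkReplication() can be condensed into a small set computation. The following is a minimal sketch under stated assumptions, not Pulsar code: the class ReplicatorReconcileSketch and its fields are hypothetical names introduced for illustration, and plain string sets stand in for the policies and the replicators map.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper (not a Pulsar class) that mirrors the decisions made in checkReplication(). */
final class ReplicatorReconcileSketch {
    /** True when the local cluster is no longer in the replication list, so the topic would be deleted. */
    final boolean deleteTopic;
    /** Remote clusters that are configured but have no replicator yet. */
    final Set<String> toStart = new TreeSet<>();
    /** Remote clusters that have a replicator but are no longer configured. */
    final Set<String> toStop = new TreeSet<>();

    private ReplicatorReconcileSketch(boolean deleteTopic) {
        this.deleteTopic = deleteTopic;
    }

    static ReplicatorReconcileSketch reconcile(Set<String> configuredClusters,
                                               Set<String> runningReplicators,
                                               String localCluster) {
        if (!configuredClusters.contains(localCluster)) {
            // The local cluster was removed from the list: nothing to start or stop
            return new ReplicatorReconcileSketch(true);
        }
        ReplicatorReconcileSketch plan = new ReplicatorReconcileSketch(false);
        for (String cluster : configuredClusters) {
            if (!cluster.equals(localCluster) && !runningReplicators.contains(cluster)) {
                plan.toStart.add(cluster);
            }
        }
        for (String cluster : runningReplicators) {
            if (!cluster.equals(localCluster) && !configuredClusters.contains(cluster)) {
                plan.toStop.add(cluster);
            }
        }
        return plan;
    }
}
```

For example, with configured clusters {us-east, us-west, eu}, running replicators {us-west, ap} and us-east as the local cluster, the plan starts a replicator for eu and stops the one for ap.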

Now let's look at how a Replicator is created. The first step is to open a cursor for the Replicator, which records the replication position.

 CompletableFuture<Void> startReplicator(String remoteCluster) {
        log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
        final CompletableFuture<Void> future = new CompletableFuture<>();

        String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
        ledger.asyncOpenCursor(name, new OpenCursorCallback() {
            @Override
            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
                boolean isReplicatorStarted = addReplicationCluster(remoteCluster, cursor, localCluster);
                if (isReplicatorStarted) {
                    future.complete(null);
                } else {
                    future.completeExceptionally(new NamingException(
                            PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
                }
            }

            @Override
            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                future.completeExceptionally(new PersistenceException(exception));
            }

        }, null);

        return future;
    }

Once the cursor has been opened, the replicator for the remote cluster is created.

protected boolean addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) {
        AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
        replicators.computeIfAbsent(remoteCluster, r -> {
            try {
                return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
                        brokerService);
            } catch (NamingException | PulsarServerException e) {
                isReplicatorStarted.set(false);
            }
            return null;
        });
        // clean up replicator if startup is failed
        if (!isReplicatorStarted.get()) {
            replicators.remove(remoteCluster);
        }
        return isReplicatorStarted.get();
    }
...
  public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
                                BrokerService brokerService) throws NamingException, PulsarServerException {
        super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
        this.topic = topic;
        this.cursor = cursor;
        this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName,
                Codec.decode(cursor.getName()), cursor, null);
        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
        PENDING_MESSAGES_UPDATER.set(this, 0);
        readBatchSize = Math.min(
                producerQueueSize,
                topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize());
        readMaxSizeBytes = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes();
        producerQueueThreshold = (int) (producerQueueSize * 0.9);
        this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
        startProducer();
    }

The main work during creation is to build a producer that writes data to the remote cluster, and then to start that producer.

public synchronized void startProducer() {
        log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster);
        producerBuilder.createAsync().thenAccept(producer -> {
            readEntries(producer);
        }).exceptionally(ex -> {
            if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
                long waitTimeMs = backOff.next();
                log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
                        localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);

                // BackOff before retrying
                brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
            } else {
                log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
                        localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
            }
            return null;
        });

    }
...
protected void readMoreEntries() {
        int availablePermits = getAvailablePermits();
        if (availablePermits > 0) {
            int messagesToRead = Math.min(availablePermits, readBatchSize);
            // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
            messagesToRead = Math.max(messagesToRead, 1);
            // Schedule read
            if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
                cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this,
                        null, PositionImpl.latest);
            } 
        } else if (availablePermits == -1) {
            // no permits from rate limit
            topic.getBrokerService().executor().schedule(
                () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
        } 
    }

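The cursor is asked to read a batch of entries from the local topic. When the read completes, each entry in the batch goes through several checks, and the entries that pass are sent to the remote cluster through the producer.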

for (int i = 0; i < entries.size(); i++) {
                Entry entry = entries.get(i);
                int length = entry.getLength();
                ByteBuf headersAndPayload = entry.getDataBuffer();
                MessageImpl msg;
                try {
                    msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
                } catch (Throwable t) {
                    log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", topicName,
                            localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t);
                    cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    continue;
                }
                // Marker handling when replicated subscriptions are enabled
                if (isEnableReplicatedSubscriptions) {
                    checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);
                }
                // Skip entries that were replicated from another cluster
                if (msg.isReplicated()) {
                    // Discard messages that were already replicated into this region
                    cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    msg.recycle();
                    continue;
                }
                // Skip entries whose replicateTo list is set and does not include this replicator's remote cluster
                if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(remoteCluster)) {
                    cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    msg.recycle();
                    continue;
                }
                // Skip expired entries
                if (msg.isExpired(messageTTLInSeconds)) {
                    msgExpired.recordEvent(0 /* no value stat */);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}][{} -> {}] Discarding expired message at position {}, replicateTo {}", topicName,
                                localCluster, remoteCluster, entry.getPosition(), msg.getReplicateTo());
                    }
                    cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    msg.recycle();
                    continue;
                }
                // Skip entries read while the producer is not ready
                if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
                    isLocalMessageSkippedOnce = true;
                    entry.release();
                    msg.recycle();
                    continue;
                }
                dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength()));
                // Increment pending messages for messages produced locally
                PENDING_MESSAGES_UPDATER.incrementAndGet(this);
                msgOut.recordEvent(headersAndPayload.readableBytes());
                // Set replicatedFrom to mark the cluster the message originates from
                msg.setReplicatedFrom(localCluster);
                headersAndPayload.retain();

                getSchemaInfo(msg).thenAccept(schemaInfo -> {
                    msg.setSchemaInfoForReplicator(schemaInfo);
                    // Send the message to the remote cluster through the producer
                    producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
                }).exceptionally(ex -> {
                    log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName,
                            localCluster, remoteCluster, ex);
                    return null;
                });
                atLeastOneMessageSentForReplication = true;
            }
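
The order of the per-entry checks can be summarized as a pure decision function. This is only a sketch: ReplicationFilterSketch and Decision are hypothetical names, a null replicatedFrom stands for a locally produced message, and the expiry rule (a TTL of 0 disables expiry, otherwise the publish time plus the TTL is compared with the current time) is an assumption about what msg.isExpired() does. The producer-state check is left out because it depends on replicator state rather than on the message.

```java
import java.util.List;

/** Hypothetical helper (not a Pulsar class) that mirrors the per-entry checks of the replicator. */
final class ReplicationFilterSketch {
    enum Decision { SKIP_ALREADY_REPLICATED, SKIP_NOT_TARGETED, SKIP_EXPIRED, SEND }

    static Decision decide(String replicatedFrom, List<String> replicateTo, long publishTimeMs,
                           int ttlSeconds, long nowMs, String remoteCluster) {
        if (replicatedFrom != null) {
            // The message already came from another cluster; forwarding it again would create a loop
            return Decision.SKIP_ALREADY_REPLICATED;
        }
        if (!replicateTo.isEmpty() && !replicateTo.contains(remoteCluster)) {
            // The message is restricted to clusters other than this replicator's target
            return Decision.SKIP_NOT_TARGETED;
        }
        if (ttlSeconds > 0 && nowMs > publishTimeMs + ttlSeconds * 1000L) {
            // Assumed expiry rule, see the lead-in
            return Decision.SKIP_EXPIRED;
        }
        return Decision.SEND;
    }
}
```

In the original loop every skipped entry is also deleted from the cursor, so the replication position still advances past it.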

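After the producer finishes writing to the remote cluster, the send callback hands the entry back to the cursor, which records the position that has been replicated.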

  public void sendComplete(Exception exception) {
            if (exception != null && !(exception instanceof PulsarClientException.InvalidMessageException)) {
                log.error("[{}][{} -> {}] Error producing on remote broker", replicator.topicName,
                        replicator.localCluster, replicator.remoteCluster, exception);
                // cursor should be rewinded since it was incremented when readMoreEntries
                replicator.cursor.rewind();
            } else {
                // Record the replicated position in the cursor
                replicator.cursor.asyncDelete(entry.getPosition(), replicator, entry.getPosition());
            }
            entry.release();

            int pending = PENDING_MESSAGES_UPDATER.decrementAndGet(replicator);
            if (pending < replicator.producerQueueThreshold 
                    && HAVE_PENDING_READ_UPDATER.get(replicator) == FALSE 
            ) {
                if (pending == 0 || replicator.producer.isWritable()) {
                    // Read more entries
                    replicator.readMoreEntries();
                }
            }

            recycle();
        }
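
Taken together, the constructor, readMoreEntries() and sendComplete() form a simple backpressure loop. The sketch below models that loop with plain integers. ReplicatorBackpressureSketch is a hypothetical class, and the definition of available permits (free slots in the producer queue) is an assumption, since getAvailablePermits() is not shown above; rate limiting is ignored.

```java
/** Hypothetical model (not a Pulsar class) of the replicator's read and send backpressure. */
final class ReplicatorBackpressureSketch {
    final int producerQueueSize;
    final int readBatchSize;
    final int producerQueueThreshold;
    int pending;              // messages handed to the producer but not yet acknowledged
    boolean havePendingRead;  // true while a cursor read is in flight

    ReplicatorBackpressureSketch(int producerQueueSize, int dispatcherMaxReadBatchSize) {
        this.producerQueueSize = producerQueueSize;
        this.readBatchSize = Math.min(producerQueueSize, dispatcherMaxReadBatchSize);
        this.producerQueueThreshold = (int) (producerQueueSize * 0.9);
    }

    /** Assumption: permits are the free slots left in the producer queue. */
    int availablePermits() {
        return Math.max(producerQueueSize - pending, 0);
    }

    /** Number of entries to request from the cursor, or 0 when no read should be issued. */
    int messagesToRead() {
        int permits = availablePermits();
        return permits > 0 ? Math.max(Math.min(permits, readBatchSize), 1) : 0;
    }

    /** Mirrors sendComplete(): returns true when another read should be triggered. */
    boolean onSendComplete(boolean producerWritable) {
        pending--;
        return pending < producerQueueThreshold
                && !havePendingRead
                && (pending == 0 || producerWritable);
    }
}
```

With a producer queue of 1000 and a maximum read batch of 100, reads are capped at 100 entries and are resumed from the send callback only once fewer than 900 messages are pending.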

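Subscription state replication

Initializing the topic

First, the broker has a configuration option for replicated subscriptions, which allows subscription state to be replicated across clusters.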

enableReplicatedSubscriptions

When a PersistentTopic is initialized, it checks the replicated subscription state.

 public synchronized void checkReplicatedSubscriptionControllerState() {
        AtomicBoolean shouldBeEnabled = new AtomicBoolean(false);
        subscriptions.forEach((name, subscription) -> {
            if (subscription.isReplicated()) {
                shouldBeEnabled.set(true);
            }
        });
				...
        checkReplicatedSubscriptionControllerState(shouldBeEnabled.get());
   			...
    }

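This step checks whether any subscription is Replicated. Whether a subscription is Replicated is specified by the client when the subscription is created. If the topic has just been created or has no subscriptions, there is no replicated subscription and shouldBeEnabled is false. If the topic has a replicated subscription, shouldBeEnabled is true.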

 private synchronized void checkReplicatedSubscriptionControllerState(boolean shouldBeEnabled) {
        boolean isCurrentlyEnabled = replicatedSubscriptionsController.isPresent();
        boolean isEnableReplicatedSubscriptions =
                brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
        if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions) {
            // Initialize the ReplicatedSubscriptionsController with the PersistentTopic and the local cluster name
            replicatedSubscriptionsController = Optional.of(new ReplicatedSubscriptionsController(this,
                    brokerService.pulsar().getConfiguration().getClusterName()));
        } else if (isCurrentlyEnabled && !shouldBeEnabled || !isEnableReplicatedSubscriptions) {
            replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
            replicatedSubscriptionsController = Optional.empty();
        }
    }

If all of the following hold:

  • replicatedSubscriptionsController is Optional.empty()
  • enableReplicatedSubscriptions is turned on in the broker configuration
  • shouldBeEnabled is true

then a new replicatedSubscriptionsController is created for the PersistentTopic.
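
The two branches of checkReplicatedSubscriptionControllerState(boolean) can be written as a small decision function, which also makes the operator precedence of the second condition explicit (in Java, && binds tighter than ||). This is a sketch only: ControllerStateSketch and Action are hypothetical names.

```java
/** Hypothetical helper (not a Pulsar class) that mirrors the controller enable/disable decision. */
final class ControllerStateSketch {
    enum Action { CREATE, CLOSE, KEEP }

    static Action decide(boolean shouldBeEnabled, boolean currentlyEnabled, boolean brokerEnabled) {
        if (shouldBeEnabled && !currentlyEnabled && brokerEnabled) {
            return Action.CREATE;
        }
        // Same grouping as the original expression: (a && !b) || !c
        if ((currentlyEnabled && !shouldBeEnabled) || !brokerEnabled) {
            return Action.CLOSE;
        }
        return Action.KEEP;
    }
}
```

Note that CLOSE is also returned whenever the broker option is off, even if no controller exists. In the original code this is harmless because the close goes through Optional.ifPresent.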

Let's look at the logic of ReplicatedSubscriptionsController.

public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.PublishContext {
    // The topic
    private final PersistentTopic topic;
    // Cluster name (the local cluster)
    private final String localCluster;
    // The timestamp of when the last snapshot was initiated
    private long lastCompletedSnapshotStartTime = 0;
    // ID of the last completed snapshot
    private String lastCompletedSnapshotId;
    private volatile Position positionOfLastLocalMarker;
    private final ScheduledFuture<?> timer;
    private final ConcurrentMap<String, ReplicatedSubscriptionsSnapshotBuilder> pendingSnapshots =
            new ConcurrentHashMap<>();
    private static final Gauge pendingSnapshotsMetric = Gauge
            .build("pulsar_replicated_subscriptions_pending_snapshots",
                    "Counter of currently pending snapshots")
            .register();

    public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) {
        this.topic = topic;
        this.localCluster = localCluster;
        // Periodic task that triggers a new subscription snapshot
        timer = topic.getBrokerService().pulsar().getExecutor()
                .scheduleAtFixedRate(this::startNewSnapshot, 0,
                        topic.getBrokerService().pulsar().getConfiguration()
                                .getReplicatedSubscriptionsSnapshotFrequencyMillis(),
                        TimeUnit.MILLISECONDS);
    }

ReplicatedSubscriptionsController keeps a cache of pending snapshots. On initialization it starts a periodic task that takes snapshots.

private void startNewSnapshot() {
        // Clean up the timed-out entries in pendingSnapshots
        cleanupTimedOutSnapshots();
        if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime) {
            return;
        }
        MutableBoolean anyReplicatorDisconnected = new MutableBoolean();
        topic.getReplicators().forEach((cluster, replicator) -> {
            if (!replicator.isConnected()) {
                anyReplicatorDisconnected.setTrue();
            }
        });
        if (anyReplicatorDisconnected.isTrue()) {
            // Do not attempt to create snapshot when some of the clusters are not reachable
            return;
        }
        pendingSnapshotsMetric.inc();
        // Trigger a new snapshot
        ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this,
                topic.getReplicators().keys(), topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC());
        // Put the local request into pendingSnapshots so that its timeout can be tracked
        pendingSnapshots.put(builder.getSnapshotId(), builder);
        builder.start();
    }

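The periodic task first cleans up timed-out snapshot tasks, then creates a new snapshot task and puts it into pendingSnapshots. Note that the ReplicatedSubscriptionsSnapshotBuilder is initialized with the ReplicatedSubscriptionsController, the keys of the replicators (that is, all clusters except the local one), and the broker configuration object.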

The snapshot logic is as follows:

 void start() {
        startTimeMillis = clock.millis();
        controller.writeMarker(
                // Create a new snapshot request from a random ID and the local cluster name
                Markers.newReplicatedSubscriptionsSnapshotRequest(snapshotId, controller.localCluster()));
    }
...
  // Building the snapshot request
  public static ByteBuf newReplicatedSubscriptionsSnapshotRequest(String snapshotId, String sourceCluster) {
        ReplicatedSubscriptionsSnapshotRequest req = LOCAL_SNAPSHOT_REQUEST.get()
                .clear()
                .setSnapshotId(snapshotId) // id
                .setSourceCluster(sourceCluster); // local cluster
        ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(req.getSerializedSize());
        try {
            req.writeTo(payload);
            return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST, Optional.empty(), payload);
        } finally {
            payload.release();
        }
    }
...
private static ByteBuf newMessage(MarkerType markerType, Optional<String> restrictToCluster, ByteBuf payload) {
        MessageMetadata msgMetadata = LOCAL_MESSAGE_METADATA.get()
                .clear()
                .setPublishTime(System.currentTimeMillis())
                .setProducerName("pulsar.marker")
                .setSequenceId(0)
                .setMarkerType(markerType.getValue()); 
        // Marker type: REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST.
        // restrictToCluster is Optional.empty() here, so the metadata of the message carrying the
        // ReplicatedSubscriptionsSnapshotRequest has no replicateTo field. The message payload is the
        // ReplicatedSubscriptionsSnapshotRequest itself, containing the snapshotId and the local cluster name.
        restrictToCluster.ifPresent(msgMetadata::addReplicateTo);
        return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload);
    }

The ReplicatedSubscriptionsController then writes the marker into the local topic.

void writeMarker(ByteBuf marker) {
        try {
            topic.publishMessage(marker, this);
        } finally {
            marker.release();
        }
    }

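As we can see, once the topic is initialized and the controller has been created, a periodic task keeps writing ReplicatedSubscriptionsSnapshotRequest markers into the current topic. The message carrying the request is replicated to the other clusters along with regular data, which effectively sends a SnapshotRequest to those clusters.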

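Initializing the subscription

When a subscription is created, it must be decided whether the subscription is replicated. Two settings control this: whether the client requested a replicated subscription, and whether the broker has replicated subscriptions enabled.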

 boolean replicatedSubscriptionState = replicatedSubscriptionStateArg;

            if (replicatedSubscriptionState
                    && !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
                log.warn("[{}] Replicated Subscription is disabled by broker.", getName());
                replicatedSubscriptionState = false;
            }

The replicatedSubscriptionState value is then used when the subscription is created.

 CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
                    getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
                            replicatedSubscriptionState)
                    : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
                    startMessageRollbackDurationSec);


...
  
  private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
            boolean replicated) {
        checkNotNull(compactedTopic);
        if (subscriptionName.equals(COMPACTION_SUBSCRIPTION)) {
            return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor);
        } else {
            return new PersistentSubscription(this, subscriptionName, cursor, replicated);
        }
    }

If replicatedSubscriptionState is true, the resulting PersistentSubscription is a replicated one. The replicated check used earlier relies on the value set at this point. The setter looks like this:

 public boolean setReplicated(boolean replicated) {
        ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();

        if (!replicated || !config.isEnableReplicatedSubscriptions()) {
            this.replicatedSubscriptionSnapshotCache = null;
        } else if (this.replicatedSubscriptionSnapshotCache == null) {
            this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
                    config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
        }

        if (this.cursor != null) {
            if (replicated) {
                return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
            } else {
                return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY);
            }
        }

        return false;
    }

In other words, if both the client and the broker enable replicated subscriptions, a ReplicatedSubscriptionSnapshotCache is initialized for the PersistentSubscription.

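Replicating the subscription

As shown earlier, after the topic is initialized a periodic task regularly records ReplicatedSubscriptionsSnapshotRequest markers in the local topic. During data replication, the replicator first reads a batch of entries and then iterates over the batch, sending the entries to the remote cluster.

An entry that is read may carry a ReplicatedSubscriptionsSnapshotRequest. For marker messages, the replicator first checks whether the message was replicated from the cluster that this Replicator points to. If it was not, the marker is skipped. Marker handling therefore does not cover SnapshotRequest markers that were generated locally.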

  if (isEnableReplicatedSubscriptions) {
        checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);
  }
...
  private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> msg, ByteBuf payload) {
        if (!msg.getMessageBuilder().hasMarkerType()) {
            // No marker is defined
            return;
        }
        int markerType = msg.getMessageBuilder().getMarkerType();
        // Continue only if the message carries a replicatedFrom field and it matches the remote cluster
        // of this PersistentReplicator.
        // Otherwise:
        //     1. The message has no replicatedFrom field: it was produced in the local cluster,
        //        so the marker check is skipped.
        //     2. replicatedFrom differs from remoteCluster: the message was not replicated from this
        //        replicator's remote cluster, so the marker check is skipped.
        if (!(msg.getMessageBuilder().hasReplicatedFrom()
                && remoteCluster.equals(msg.getMessageBuilder().getReplicatedFrom()))) {
            return;
        }
        // Three marker types are handled here: Request, Response and Update
        switch (markerType) {
        case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE:
        case MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE:
        case MarkerType.REPLICATED_SUBSCRIPTION_UPDATE_VALUE:
            topic.receivedReplicatedSubscriptionMarker(position, markerType, payload);
            break;
        default:
            // Do nothing
        }
    }
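
The origin check at the top of checkReplicatedSubscriptionMarker() boils down to one predicate. A minimal sketch, assuming a null replicatedFrom represents a message produced in the local cluster; MarkerOriginSketch is a hypothetical name used for illustration.

```java
/** Hypothetical helper (not a Pulsar class) for the marker origin check. */
final class MarkerOriginSketch {
    /**
     * Returns true only for marker messages that were replicated from the cluster
     * this replicator points to.
     */
    static boolean shouldProcessMarker(boolean hasMarkerType, String replicatedFrom, String remoteCluster) {
        if (!hasMarkerType) {
            return false; // regular data message, no marker handling
        }
        return replicatedFrom != null && remoteCluster.equals(replicatedFrom);
    }
}
```

For a replicator pointing to us-west, a marker replicated from us-west is processed, while a locally generated marker or one replicated from eu is ignored.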

Markers of different types (coming from the replicator's remote cluster) are handled differently. Let's look at each in turn.

Handling REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE

For a Request marker, a Response is created as follows:

private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
        // Make sure the replicator is started
        Replicator replicator = topic.getReplicators().get(request.getSourceCluster());
        if (!replicator.isConnected()) {
            topic.startReplProducers();
        }
  
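        // Build the Response, containing the snapshotId, the source cluster (the remote cluster), the local
        // cluster and the local cluster's LAC. The remote cluster is used for the ReplicateTo property of the
        // message. The Response message is written locally and then asynchronously replicated to the remote
        // cluster, which amounts to sending it a snapshot Response (with this cluster's LAC and cluster name).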
        PositionImpl lastMsgId = (PositionImpl) topic.getLastPosition();
        ByteBuf marker = Markers.newReplicatedSubscriptionsSnapshotResponse(
                request.getSnapshotId(), // id
                request.getSourceCluster(), // source remote cluster 
                localCluster, // local cluster
                lastMsgId.getLedgerId(), lastMsgId.getEntryId()); // LAC
        writeMarker(marker);
    }
...
     public static ByteBuf newReplicatedSubscriptionsSnapshotResponse(String snapshotId, String replyToCluster, String cluster, long ledgerId, long entryId) {
        ReplicatedSubscriptionsSnapshotResponse response = LOCAL_SNAPSHOT_RESPONSE.get()
                .clear()
                .setSnapshotId(snapshotId);
        response
                .setCluster()
                .setCluster(cluster) // local cluster
                .setMessageId() // LAC
                .setLedgerId(ledgerId)
                .setEntryId(entryId);
        ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(response.getSerializedSize());
        try {
            response.writeTo(payload);
            return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE, Optional.of(replyToCluster),
                    payload);
        } finally {
            payload.release();
        }
    }
...
   private static ByteBuf newMessage(MarkerType markerType, Optional<String> restrictToCluster, ByteBuf payload) {
        MessageMetadata msgMetadata = LOCAL_MESSAGE_METADATA.get()
                .clear()
                .setPublishTime(System.currentTimeMillis())
                .setProducerName("pulsar.marker")
                .setSequenceId(0)
                .setMarkerType(markerType.getValue()); // Response type
        // Set the replicateTo field, which here is the remote cluster
        restrictToCluster.ifPresent(msgMetadata::addReplicateTo);
        return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload);
    }

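First the LAC of the current topic is fetched, then a ReplicatedSubscriptionsSnapshotResponse is built, containing the snapshotId, the local cluster name, and the LAC of the topic in the current cluster. Next a Message is built with the marker type and the ReplicateTo field (ReplicateTo means the message may only be sent to the specified cluster). Finally the message is written to the local topic.

In other words, when a SnapshotRequest from a remote cluster arrives (replicated to the local cluster through GEO), the local cluster produces a Response that contains the local cluster name and LAC, with ReplicateTo set in the message metadata. It can be seen as the reply to the remote cluster's request.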

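Handling REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE

The Response generated in the previous step for the remote cluster's Request is written to the local topic and then replicated by GEO to the corresponding remote cluster (because of the replicateTo field). For the cluster that receives the Response, this is the reply to its Request.

It is handled as follows: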

 synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
        // The cluster in the response is the one that generated the Response, that is, the remote cluster
        String cluster = response.getCluster().getCluster();
        // Record the LAC reported by each remote cluster
        responses.putIfAbsent(cluster, new MarkersMessageIdData().copyFrom(response.getCluster().getMessageId()));
        // When the ReplicatedSubscriptionsSnapshotBuilder is created, missingClusters contains all remote clusters
        missingClusters.remove(cluster);
   
        if (!missingClusters.isEmpty()) {
            // Some clusters have not responded yet, so stop here
            return;
        }
        // All clusters have responded, so the local cluster now has the LAC of every remote cluster
        if (needTwoRounds && !firstRoundComplete) {
            // Mark that 1st round is done and start a 2nd round
            firstRoundComplete = true;
            missingClusters.addAll(remoteClusters);

            controller.writeMarker(
                    Markers.newReplicatedSubscriptionsSnapshotRequest(snapshotId, controller.localCluster()));
            return;
        }
        // The snapshot is complete: build a Snapshot and write it to the local topic
        PositionImpl p = (PositionImpl) position;
        controller.writeMarker(
                Markers.newReplicatedSubscriptionsSnapshot(snapshotId, controller.localCluster(),
                        p.getLedgerId(), p.getEntryId(), responses));
        controller.snapshotCompleted(snapshotId);
        double latencyMillis = clock.millis() - startTimeMillis;
        snapshotMetric.observe(latencyMillis);
    }
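
The bookkeeping in receivedSnapshotResponse() can be modelled without any Pulsar types. The sketch below is hypothetical: SnapshotBuilderSketch and Outcome are illustrative names, positions are stored as {ledgerId, entryId} arrays, and whether two rounds are needed is passed in as a parameter because the rule that sets needTwoRounds is not shown above.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model (not a Pulsar class) of how snapshot responses are collected. */
final class SnapshotBuilderSketch {
    enum Outcome { WAITING, SECOND_ROUND_STARTED, COMPLETED }

    private final List<String> remoteClusters;
    private final Set<String> missingClusters;
    private final Map<String, long[]> responses = new TreeMap<>();
    private final boolean needTwoRounds;
    private boolean firstRoundComplete;

    SnapshotBuilderSketch(Collection<String> remoteClusters, boolean needTwoRounds) {
        this.remoteClusters = new ArrayList<>(remoteClusters);
        this.missingClusters = new TreeSet<>(remoteClusters);
        this.needTwoRounds = needTwoRounds;
    }

    Outcome onResponse(String cluster, long ledgerId, long entryId) {
        // Only the first position reported by each cluster is kept
        responses.putIfAbsent(cluster, new long[] {ledgerId, entryId});
        missingClusters.remove(cluster);
        if (!missingClusters.isEmpty()) {
            return Outcome.WAITING;
        }
        if (needTwoRounds && !firstRoundComplete) {
            // Every cluster answered once: wait for a second answer from each of them
            firstRoundComplete = true;
            missingClusters.addAll(remoteClusters);
            return Outcome.SECOND_ROUND_STARTED;
        }
        return Outcome.COMPLETED;
    }

    Map<String, long[]> responses() {
        return responses;
    }
}
```

With two remote clusters and two rounds, the second response of the first round restarts the wait, and the snapshot completes only after both clusters have answered again. Because of putIfAbsent, the recorded positions are the ones from the first round.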

Let's look at how the Snapshot is built.

@SneakyThrows
public static ByteBuf newReplicatedSubscriptionsSnapshot(String snapshotId, String sourceCluster, long ledgerId,
        long entryId, Map<String, MarkersMessageIdData> clusterIds) {
    ReplicatedSubscriptionsSnapshot snapshot = LOCAL_SNAPSHOT.get()
            .clear()
            .setSnapshotId(snapshotId); // id
    snapshot.setLocalMessageId() // <ledgerId, entryId> of the last response
            .setLedgerId(ledgerId)
            .setEntryId(entryId);
    // LAC of each remote cluster
    clusterIds.forEach((cluster, msgId) -> {
        snapshot.addCluster()
                .setCluster(cluster)
                .setMessageId().copyFrom(msgId);
    });
    int size = snapshot.getSerializedSize();
    ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(size);
    try {
        snapshot.writeTo(payload);
        return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT, Optional.of(sourceCluster), payload);
    } finally {
        payload.release();
    }
}
...
private static ByteBuf newMessage(MarkerType markerType, Optional<String> restrictToCluster, ByteBuf payload) {
        MessageMetadata msgMetadata = LOCAL_MESSAGE_METADATA.get()
                .clear()
                .setPublishTime(System.currentTimeMillis())
                .setProducerName("pulsar.marker")
                .setSequenceId(0)
                .setMarkerType(markerType.getValue());
        restrictToCluster.ifPresent(msgMetadata::addReplicateTo);
        return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload);
    }

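A snapshot contains:

  • the snapshot id
  • the ledgerId and entryId of the position at which the final SnapshotResponse sits in the local topic
  • the LAC of each remote cluster

The snapshot is then wrapped in a message whose metadata carries the SNAPSHOT marker type and a replicateTo set to the local cluster, and the message is written to the local topic. At this point the snapshot is complete: the flow that began when the local cluster initiated it (by writing a SNAPSHOT_REQUEST to the local topic) has finished, so the snapshotId is removed from pendingSnapshots.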
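
The response-collection logic in the builder excerpt can be modeled in a few lines. What follows is a minimal sketch and not Pulsar's implementation: SnapshotBuilderSketch and its methods are hypothetical names, positions are plain pairs of long values, the needTwoRounds flag is simply passed in by the caller, and a second-round response overwrites the first-round one.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical, simplified model of the snapshot builder's response handling. */
final class SnapshotBuilderSketch {
    private final Set<String> remoteClusters;
    private final Set<String> missingClusters;
    // cluster name -> {ledgerId, entryId} reported by that cluster
    private final Map<String, long[]> responses = new TreeMap<>();
    private final boolean needTwoRounds;
    private boolean firstRoundComplete = false;
    private int requestsWritten = 1; // the initial SNAPSHOT_REQUEST

    SnapshotBuilderSketch(Set<String> remoteClusters, boolean needTwoRounds) {
        this.remoteClusters = new HashSet<>(remoteClusters);
        this.missingClusters = new HashSet<>(remoteClusters);
        this.needTwoRounds = needTwoRounds;
    }

    /** Records one response; returns the collected positions once the snapshot is complete. */
    Optional<Map<String, long[]>> onResponse(String cluster, long ledgerId, long entryId) {
        if (!missingClusters.remove(cluster)) {
            return Optional.empty(); // unexpected or duplicate response in this round
        }
        responses.put(cluster, new long[] {ledgerId, entryId});
        if (!missingClusters.isEmpty()) {
            return Optional.empty(); // still waiting for other clusters
        }
        if (needTwoRounds && !firstRoundComplete) {
            // First round done: wait for every cluster again and issue a second request
            firstRoundComplete = true;
            missingClusters.addAll(remoteClusters);
            requestsWritten++;
            return Optional.empty();
        }
        return Optional.of(responses);
    }

    int requestsWritten() {
        return requestsWritten;
    }

    public static void main(String[] args) {
        SnapshotBuilderSketch builder = new SnapshotBuilderSketch(Collections.singleton("B"), false);
        Optional<Map<String, long[]>> snapshot = builder.onResponse("B", 2, 21);
        System.out.println(snapshot.isPresent()); // prints "true"
    }
}
```

With a single remote cluster and one round, the first response already completes the snapshot; with two rounds, the same cluster has to answer twice before a snapshot is emitted.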

SNAPSHOT markers get special handling when they are consumed:

else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
                PositionImpl pos = (PositionImpl) entry.getPosition();
                // Message metadata was corrupted or the message was a server-only marker
                if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
                    processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
                }
                entries.set(i, null);
                entry.release();
                subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
                        Collections.emptyMap());
                continue;
  ...
     private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
        // Remove the protobuf headers
        Commands.skipMessageMetadata(headersAndPayload);

        try {
            ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(headersAndPayload);
            subscription.processReplicatedSubscriptionSnapshot(snapshot);
        } catch (Throwable t) {
            log.warn("Failed to process replicated subscription snapshot at {} -- {}", pos, t.getMessage(), t);
            return;
        }
    }
  ...
    public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
        ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
        if (snapshotCache != null) {
            snapshotCache.addNewSnapshot(new ReplicatedSubscriptionsSnapshot().copyFrom(snapshot));
        }
    }

First the payload is parsed into a ReplicatedSubscriptionsSnapshot object, and the subscription stores it in snapshotCache. snapshotCache is a map with a bounded number of entries; when the number of snapshots exceeds the limit, the oldest snapshot is evicted.
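
The bounded-cache behavior can be illustrated with a sorted map. This is a rough sketch under stated assumptions: BoundedSnapshotCacheSketch is a hypothetical class, a position is reduced to a single long, and a snapshot is reduced to its id string; the real cache stores ReplicatedSubscriptionsSnapshot objects keyed by position.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for the subscription's snapshot cache. */
final class BoundedSnapshotCacheSketch {
    // position (simplified to one long) -> snapshot id, sorted by position
    private final NavigableMap<Long, String> snapshots = new TreeMap<>();
    private final int maxSnapshots;

    BoundedSnapshotCacheSketch(int maxSnapshots) {
        this.maxSnapshots = maxSnapshots;
    }

    synchronized void addNewSnapshot(long position, String snapshotId) {
        snapshots.put(position, snapshotId);
        if (snapshots.size() > maxSnapshots) {
            snapshots.pollFirstEntry(); // evict the snapshot with the lowest position
        }
    }

    synchronized int size() {
        return snapshots.size();
    }

    synchronized Long oldestPosition() {
        return snapshots.isEmpty() ? null : snapshots.firstKey();
    }

    public static void main(String[] args) {
        BoundedSnapshotCacheSketch cache = new BoundedSnapshotCacheSketch(2);
        cache.addNewSnapshot(25, "s1");
        cache.addNewSnapshot(40, "s2");
        cache.addNewSnapshot(55, "s3"); // exceeds the limit, so s1 is evicted
        // prints "2 snapshots, oldest at 40"
        System.out.println(cache.size() + " snapshots, oldest at " + cache.oldestPosition());
    }
}
```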

REPLICATED_SUBSCRIPTION_UPDATE_VALUE request

Now let's look at how the update_value request is handled, starting with what triggers it:

 if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
            this.updateLastMarkDeleteAdvancedTimestamp();
            // Mark delete position advance
            ReplicatedSubscriptionSnapshotCache snapshotCache  = this.replicatedSubscriptionSnapshotCache;
            if (snapshotCache != null) {
                ReplicatedSubscriptionsSnapshot snapshot = snapshotCache
                        .advancedMarkDeletePosition((PositionImpl) cursor.getMarkDeletedPosition());
                if (snapshot != null) {
                    topic.getReplicatedSubscriptionController()
                            .ifPresent(c -> c.localSubscriptionUpdated(subName, snapshot));
                }
            }
        }
...
   public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(PositionImpl pos) {
        ReplicatedSubscriptionsSnapshot snapshot = null;
        while (!snapshots.isEmpty()) {
            PositionImpl first = snapshots.firstKey();
            if (first.compareTo(pos) > 0) {
                // Snapshot is associated with a higher position, so it cannot be used now
                break;
            } else {
                // This snapshot is potentially good. Continue the search to see if there is a higher snapshot we
                // can use
                snapshot = snapshots.pollFirstEntry().getValue();
            }
        }
        return snapshot;
    }

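When a subscription in a cluster acknowledges messages, the ack advances the cursor's markDelete (MD) position, and that in turn triggers a snapshot lookup. After the advance, the subscription pulls from snapshotCache the latest snapshot whose position is less than or equal to the MD position. Every snapshot up to and including that one is removed from the cache; keeping only the later ones is enough to guarantee that no data is lost.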
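
A small worked example makes the lookup concrete. The sketch below assumes positions are single long values and snapshots are id strings; MarkDeleteAdvanceSketch and advance are hypothetical names that mirror the loop in advancedMarkDeletePosition.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

final class MarkDeleteAdvanceSketch {
    /** Removes every snapshot at or below markDelete and returns the last one removed, or null. */
    static String advance(NavigableMap<Long, String> snapshots, long markDelete) {
        String usable = null;
        while (!snapshots.isEmpty() && snapshots.firstKey() <= markDelete) {
            // Potentially usable; keep polling in case a higher snapshot also qualifies
            usable = snapshots.pollFirstEntry().getValue();
        }
        return usable;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> snapshots = new TreeMap<>();
        snapshots.put(25L, "s1");
        snapshots.put(40L, "s2");
        snapshots.put(55L, "s3");
        System.out.println(advance(snapshots, 45L)); // prints "s2"
        System.out.println(snapshots.keySet());      // prints "[55]"
    }
}
```

With the markDelete position at 45, both s1 and s2 qualify, s2 is returned because it is the higher of the two, and only s3 remains cached for a later advance.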

With the snapshot in hand, a ReplicatedSubscriptionsUpdate message is built from its contents:

public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscriptionsSnapshot snapshot) {
        Map<String, MarkersMessageIdData> clusterIds = new TreeMap<>();
        for (int i = 0, size = snapshot.getClustersCount(); i < size; i++) {
            ClusterMessageId cmid = snapshot.getClusterAt(i);
            clusterIds.put(cmid.getCluster(), cmid.getMessageId());
        }
        ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds);
        writeMarker(subscriptionUpdate);
    }

The cluster information, i.e. each clusterName and its LAC, is recovered from the snapshot and used to build a ReplicatedSubscriptionsUpdate request:

 public static ByteBuf newReplicatedSubscriptionsUpdate(String subscriptionName,
        Map<String, MarkersMessageIdData> clusterIds) {
        ReplicatedSubscriptionsUpdate update = LOCAL_SUBSCRIPTION_UPDATE.get()
                .clear()
                .setSubscriptionName(subscriptionName);
        clusterIds.forEach((cluster, msgId) -> {
            update.addCluster()
                    .setCluster(cluster)
                    .setMessageId().copyFrom(msgId);
        });
        ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(update.getSerializedSize());

        try {
            update.writeTo(payload);
            return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_UPDATE, Optional.empty(), payload);
        } finally {
            payload.release();
        }
    }

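ReplicatedSubscriptionsUpdate contains the subscription name and the <clusterName, LAC> pairs of the chosen snapshot. It is wrapped in a message of type SUBSCRIPTION_UPDATE with no replicateTo restriction (Optional.empty()) and written to the local topic.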

Once this message has been replicated to a remote cluster, the remote cluster handles it as follows:

 private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
        MarkersMessageIdData updatedMessageId = null;
        for (int i = 0, size = update.getClustersCount(); i < size; i++) {
            ClusterMessageId cmid = update.getClusterAt(i);
            if (localCluster.equals(cmid.getCluster())) {
                updatedMessageId = cmid.getMessageId();
            }
        }
        if (updatedMessageId == null) {
            // No updates for this cluster, ignore
            return;
        }
        Position pos = new PositionImpl(updatedMessageId.getLedgerId(), updatedMessageId.getEntryId());
        PersistentSubscription sub = topic.getSubscription(update.getSubscriptionName());
        if (sub != null) {
            sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap());
        } else {
            // Subscription doesn't exist. We need to force the creation of the subscription in this cluster, because
            topic.createSubscription(update.getSubscriptionName(),
                    InitialPosition.Latest, true /* replicateSubscriptionState */);
        }
    }

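The handler first extracts the <cluster, LAC> pairs and finds the LAC that belongs to the current cluster, then moves the subscription's MarkDelete position to it with a cumulative ack. If the subscription does not exist yet in this cluster, it is created instead. That completes the synchronization of the subscription.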
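
The effect of an update on the receiving cluster can be sketched as follows. This is an illustrative model only: SubscriptionUpdateSketch is a hypothetical class, positions are {ledgerId, entryId} arrays, a cumulative ack is modeled as "move markDelete forward, never backward", and creating a subscription at InitialPosition.Latest is modeled by starting it at a latest position supplied to the constructor.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a remote cluster applying a ReplicatedSubscriptionsUpdate. */
final class SubscriptionUpdateSketch {
    private final String localCluster;
    private final long[] latestPosition; // stand-in for InitialPosition.Latest
    // subscription name -> markDelete position as {ledgerId, entryId}
    private final Map<String, long[]> markDeleteBySubscription = new HashMap<>();

    SubscriptionUpdateSketch(String localCluster, long latestLedgerId, long latestEntryId) {
        this.localCluster = localCluster;
        this.latestPosition = new long[] {latestLedgerId, latestEntryId};
    }

    void addSubscription(String name, long ledgerId, long entryId) {
        markDeleteBySubscription.put(name, new long[] {ledgerId, entryId});
    }

    long[] markDelete(String name) {
        return markDeleteBySubscription.get(name);
    }

    void onUpdate(String subscription, Map<String, long[]> clusterPositions) {
        long[] target = clusterPositions.get(localCluster);
        if (target == null) {
            return; // no entry for this cluster, ignore the update
        }
        long[] current = markDeleteBySubscription.get(subscription);
        if (current == null) {
            // Subscription does not exist yet: create it instead of acknowledging
            markDeleteBySubscription.put(subscription, latestPosition.clone());
        } else if (compare(target, current) > 0) {
            markDeleteBySubscription.put(subscription, target.clone()); // cumulative ack
        }
    }

    private static int compare(long[] a, long[] b) {
        int byLedger = Long.compare(a[0], b[0]);
        return byLedger != 0 ? byLedger : Long.compare(a[1], b[1]);
    }

    public static void main(String[] args) {
        SubscriptionUpdateSketch clusterB = new SubscriptionUpdateSketch("B", 2, 30);
        clusterB.addSubscription("sub", 2, 0);
        Map<String, long[]> update = new HashMap<>();
        update.put("B", new long[] {2, 21});
        clusterB.onUpdate("sub", update);
        long[] md = clusterB.markDelete("sub");
        System.out.println(md[0] + ":" + md[1]); // prints "2:21"
    }
}
```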

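REPLICATED SUBSCRIPTION example

Suppose there are two clusters, A and B, configured for GEO replication, with one topic, test, and one subscription, sub, which initially exists in cluster A.

First, when the topic is initialized in cluster A: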

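  • The remote cluster and its Replicator are created in the topic.
  • A ReplicatedSubscriptionsController is created. It holds the localCluster information and starts a scheduled task that keeps building new ReplicatedSubscriptionsSnapshotBuilder instances.
  • The builder creates a ReplicatedSubscriptionsSnapshotRequest containing the snapshotId and the localCluster (A).
  • A message is built whose metadata type is SNAPSHOT_REQUEST and whose value is the ReplicatedSubscriptionsSnapshotRequest.
  • The message is written to the local topic. Suppose 10 entries have already been written at this point, so the LAC is <L1, 10> and the request lands at position <L1, 11>; then 10 more entries are written, making the final LAC <L1, 21>.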

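When the subscription is created, a ReplicatedSubscriptionSnapshotCache is created for it.

When GEO replication in cluster A reaches the SNAPSHOT_REQUEST, cluster A skips it locally, because only markers replicated from a remote cluster are processed.

So once cluster B has received the replicated messages and its own replication toward cluster A reads them, it finds the SNAPSHOT_REQUEST that came from cluster A. Suppose that in cluster B the replicated SNAPSHOT_REQUEST sits at position <L2, 11> and the LAC is <L2, 21>.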

Cluster B now reads its own LAC and builds a ReplicatedSubscriptionsSnapshotResponse, which contains:

  • the snapshotId of the SNAPSHOT_REQUEST
  • the localCluster (B)
  • the ledgerId and entryId of cluster B's LAC, here <L2, 21>
  • A message is then built whose metadata carries replicateTo (clusterA) and the type SNAPSHOT_RESPONSE, and whose value is the ReplicatedSubscriptionsSnapshotResponse.
  • The message is written to the topic. Suppose the response lands at position <L2, 25> and the new LAC is <L2, 30>.
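
The request-handling step described above can be sketched as a pure function. The names here (SnapshotRequestHandlerSketch, Response, onRequest) are hypothetical, and the rule that a cluster ignores its own request is modeled as a simple comparison of cluster names; it is a sketch of the flow in this walkthrough, not Pulsar's code.

```java
import java.util.Optional;

/** Hypothetical model of how a cluster answers a SNAPSHOT_REQUEST marker. */
final class SnapshotRequestHandlerSketch {

    /** Simplified stand-in for the snapshot response plus its replicateTo metadata. */
    static final class Response {
        final String snapshotId;
        final String cluster;     // the responding (local) cluster
        final String replicateTo; // the only cluster that should receive the response
        final long ledgerId;      // LAC of the responding cluster
        final long entryId;

        Response(String snapshotId, String cluster, String replicateTo, long ledgerId, long entryId) {
            this.snapshotId = snapshotId;
            this.cluster = cluster;
            this.replicateTo = replicateTo;
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    static Optional<Response> onRequest(String localCluster, String snapshotId, String sourceCluster,
                                        long lacLedgerId, long lacEntryId) {
        if (localCluster.equals(sourceCluster)) {
            return Optional.empty(); // our own request: nothing to answer
        }
        // Reply with the local LAC, addressed to the cluster that asked
        return Optional.of(new Response(snapshotId, localCluster, sourceCluster, lacLedgerId, lacEntryId));
    }

    public static void main(String[] args) {
        // Cluster A reads its own request and skips it
        System.out.println(onRequest("A", "snap-1", "A", 1, 21).isPresent()); // prints "false"
        // Cluster B reads the replicated request while its LAC is <L2, 21>
        Response r = onRequest("B", "snap-1", "A", 2, 21).get();
        // prints "B -> A @ 2:21"
        System.out.println(r.cluster + " -> " + r.replicateTo + " @ " + r.ledgerId + ":" + r.entryId);
    }
}
```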

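Cluster B's messages are replicated to cluster A. Cluster B itself skips the response when it reads it locally; when cluster A processes the replicated response, it stores <clusterB, <L2, 21>> in responses and then builds a new snapshot:

  • A new ReplicatedSubscriptionsSnapshot is created, containing
    • the snapshotId,
    • the current position (the position in cluster A of the response replicated from clusterB, say <L1, 25>),
    • cluster B and its LAC, <clusterB, <L2, 21>>.
  • A message is built with metadata type SNAPSHOT, replicateTo clusterA, and the snapshot as its value.
  • The message is written to cluster A's topic.
  • One snapshot round is complete.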

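Because the SNAPSHOT marker's replicateTo is clusterA, the marker stays in cluster A's topic rather than being replicated to cluster B; what reaches cluster B later is the ReplicatedSubscriptionsUpdate derived from it.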

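SNAPSHOT markers get special treatment while the subscription sub in cluster A is consuming. Before a message is sent to the consumer, if it turns out to be a SNAPSHOT message it is not delivered to the consumer but stored in snapshotCache instead, keyed by the position of the response that completed the snapshot: <<L1, 25>, snapshot>.

When cluster A handles an ack on the subscription, it prunes snapshotCache accordingly and picks the latest snapshot whose position is less than or equal to the markDelete position. It builds a ReplicatedSubscriptionsUpdate from that snapshot and writes it to the local topic. Once the message has been replicated to the remote cluster, the remote cluster uses the LAC recorded for it in the snapshot to update its cursor.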