1. Ledger storage setup at Bookie startup

When a Bookie starts, it creates a LedgerStorage.

// Create the LedgerStorage
ledgerStorage = buildLedgerStorage(conf);
...
// Create the SyncThread
syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource);
// Initialize the ledger storage
ledgerStorage.initialize(
            conf,
            ledgerManager,
            ledgerDirsManager,
            indexDirsManager,
            stateManager,
            checkpointSource,//journal list
            syncThread,
            statsLogger,
            allocator);
// Create the HandleFactory
handles = new HandleFactoryImpl(ledgerStorage);


The code above shows what the Bookie does during initialization:

  • Create and initialize the ledgerStorage
  • Create the SyncThread
  • Create the HandleFactoryImpl

1.1 Creating and initializing LedgerStorage

The LedgerStorage is created by reflection from the class name given in the configuration; the default is SortedLedgerStorage. It is then initialized.
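Reflective creation from a configured class name can be sketched as follows. This is only an illustration: Storage, SortedStorage, and build are hypothetical stand-ins, not BookKeeper classes, and the sketch assumes the implementation has a no-argument constructor.

```java
// Hypothetical stand-ins; none of these types are BookKeeper classes.
class ReflectiveStorageFactory {
    interface Storage {
        String name();
    }

    public static class SortedStorage implements Storage {
        @Override
        public String name() {
            return "sorted";
        }
    }

    // Instantiate the configured class through its no-argument constructor.
    static Storage build(String className) {
        try {
            Class<? extends Storage> clazz = Class.forName(className).asSubclass(Storage.class);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot create storage " + className, e);
        }
    }

    public static void main(String[] args) {
        // In a real server the class name would come from the configuration.
        Storage storage = build(SortedStorage.class.getName());
        System.out.println(storage.name());
    }
}
```

Creating the object first and initializing it in a separate call, as the Bookie does, keeps the reflective step independent of the many collaborators that initialize needs.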

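First, a look at SortedLedgerStorage. It builds on InterleavedLedgerStorage (it wraps one rather than subclassing it, as the field below shows) and is composed of an EntryMemTable and an InterleavedLedgerStorage. Every entry is first added to the MemTable; once the MemTable is full, its data is flushed to the InterleavedLedgerStorage.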

public class SortedLedgerStorage
        implements LedgerStorage, CacheCallback, SkipListFlusher,
            CompactableLedgerStorage, EntryLogger.EntryLogListener {
    private static final Logger LOG = LoggerFactory.getLogger(SortedLedgerStorage.class);

    EntryMemTable memTable;
    private ScheduledExecutorService scheduler;
    private StateManager stateManager;
    private final InterleavedLedgerStorage interleavedLedgerStorage;
    ...
    @Override
    public void initialize(ServerConfiguration conf,
                           LedgerManager ledgerManager,
                           LedgerDirsManager ledgerDirsManager,
                           LedgerDirsManager indexDirsManager,
                           StateManager stateManager,
                           CheckpointSource checkpointSource,
                           Checkpointer checkpointer,
                           StatsLogger statsLogger,
                           ByteBufAllocator allocator)
            throws IOException {
        // Initialize InterleavedLedgerStorage
        interleavedLedgerStorage.initializeWithEntryLogListener(
            conf,
            ledgerManager,
            ledgerDirsManager,
            indexDirsManager,
            stateManager,
            checkpointSource,
            checkpointer,
            // Pass itself as the entry log listener, since it manages entry log rolling and checkpointing itself
            this,
            statsLogger,
            allocator);
        // Initialize the MemTable
        if (conf.isEntryLogPerLedgerEnabled()) {
            this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger);
        } else {
            this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger);
        }
        // Initialize the scheduler
        this.scheduler = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder()
                .setNameFormat("SortedLedgerStorage-%d")
                .setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) / 2).build());
        this.stateManager = stateManager;
    }

Initialization proceeds in this order:

  • Initialize InterleavedLedgerStorage
  • Initialize the MemTable
  • Initialize the scheduler

1.1.1 InterleavedLedgerStorage initialization

InterleavedLedgerStorage is initialized as follows:

void initializeWithEntryLogListener(ServerConfiguration conf,
                                        LedgerManager ledgerManager,
                                        LedgerDirsManager ledgerDirsManager,
                                        LedgerDirsManager indexDirsManager,
                                        StateManager stateManager,
                                        CheckpointSource checkpointSource,
                                        Checkpointer checkpointer,
                                        EntryLogListener entryLogListener,
                                        StatsLogger statsLogger,
                                        ByteBufAllocator allocator) throws IOException {
        initializeWithEntryLogger(
                conf,
                ledgerManager,
                ledgerDirsManager,
                indexDirsManager,
                stateManager,
                checkpointSource,
                checkpointer,
                // Create the EntryLogger
                new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE),
                        allocator),
                statsLogger);
    }

public void initializeWithEntryLogger(ServerConfiguration conf,
            LedgerManager ledgerManager,
            LedgerDirsManager ledgerDirsManager,
            LedgerDirsManager indexDirsManager,
            StateManager stateManager,
            CheckpointSource checkpointSource,//journal list
            Checkpointer checkpointer, // SyncThread
            EntryLogger entryLogger,
            StatsLogger statsLogger) throws IOException {
    checkNotNull(checkpointSource, "invalid null checkpoint source");
    checkNotNull(checkpointer, "invalid null checkpointer");
    this.entryLogger = entryLogger;
    this.entryLogger.addListener(this);
    this.checkpointSource = checkpointSource;
    this.checkpointer = checkpointer;
    // Create the LedgerCacheImpl
    ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
            null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
    // Create the GC thread
    gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger.scope("gc"));
    pageSize = conf.getPageSize();
    // Register a listener for the ledger directories
    ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
    // Expose Stats
    getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET);
    getEntryStats = statsLogger.getOpStatsLogger(STORAGE_GET_ENTRY);
    pageScanStats = statsLogger.getOpStatsLogger(STORAGE_SCRUB_PAGES_SCANNED);
    retryCounter = statsLogger.getCounter(STORAGE_SCRUB_PAGE_RETRIES);
}

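The initialization order is:

  • Create the EntryLogger
  • Initialize LedgerCacheImpl
  • Initialize the GC thread

1.1.1.1 Creating the EntryLogger

InterleavedLedgerStorage initialization starts by creating an EntryLogger, which manages the writing of BookKeeper entries. All entries are written to a common log, and the LedgerCache keeps pointers to the offset of each entry within the file. Entry log files are named by a long value.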

public EntryLogger(ServerConfiguration conf,
            LedgerDirsManager ledgerDirsManager, EntryLogListener listener, StatsLogger statsLogger,
            ByteBufAllocator allocator) throws IOException {
        // Maximum Netty frame size, with 500 bytes reserved for the protocol
        this.maxSaneEntrySize = conf.getNettyMaxFrameSizeBytes() - 500;
        this.allocator = allocator;
        this.ledgerDirsManager = ledgerDirsManager;
        this.conf = conf;
        entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
        if (listener != null) {
            addListener(listener);
        }
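        // Initialize the log file header.
        // The header is a 1 KB block at the start of each entry log file; it holds a fingerprint and metadata:
        // 1. Fingerprint: 4 bytes, "BKLO"
        // 2. Log file header version: 4 bytes
        // 3. Offset of the ledger map: 8 bytes
        // 4. Number of ledgers: 4 bytes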
        logfileHeader.writeBytes("BKLO".getBytes(UTF_8));
        logfileHeader.writeInt(HEADER_CURRENT_VERSION);
        logfileHeader.writerIndex(LOGFILE_HEADER_SIZE);

        // Scan all ledger directories for the largest logId
        long logId = INVALID_LID;
        for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
            if (!dir.exists()) {
                throw new FileNotFoundException(
                        "Entry log directory '" + dir + "' does not exist");
            }
            long lastLogId = getLastLogId(dir);
            if (lastLogId > logId) {
                logId = lastLogId;
            }
        }
        // Track the status of recent entry logs, starting from the next log id (logId + 1)
        this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId + 1);
        // Create the EntryLoggerAllocator
        this.entryLoggerAllocator = new EntryLoggerAllocator(conf, ledgerDirsManager, recentlyCreatedEntryLogsStatus,
                logId, allocator);
        // Create the EntryLogManager
        if (entryLogPerLedgerEnabled) {
            this.entryLogManager = new EntryLogManagerForEntryLogPerLedger(conf, ledgerDirsManager,entryLoggerAllocator, listeners, recentlyCreatedEntryLogsStatus, statsLogger);
        } else {
            this.entryLogManager = new EntryLogManagerForSingleEntryLog(conf, ledgerDirsManager, entryLoggerAllocator,listeners, recentlyCreatedEntryLogsStatus);
        }
    }
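The header layout described in the constructor's comments can be sketched with a plain ByteBuffer. This is an illustration under assumptions: the class and method names are hypothetical, the version and field values are made up, and the sketch fills in all four fields at once, whereas the constructor above writes only the fingerprint and the version.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the 1 KB entry log header layout.
class EntryLogHeaderSketch {
    static final int HEADER_SIZE = 1024;

    static ByteBuffer buildHeader(int version, long ledgersMapOffset, int ledgersCount) {
        ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
        header.put("BKLO".getBytes(StandardCharsets.UTF_8)); // bytes 0-3: fingerprint
        header.putInt(version);                              // bytes 4-7: header version
        header.putLong(ledgersMapOffset);                    // bytes 8-15: ledger map offset
        header.putInt(ledgersCount);                         // bytes 16-19: number of ledgers
        header.rewind();                                     // the rest of the block stays zero
        return header;
    }

    // Read the 4-byte fingerprint without moving the buffer position.
    static String fingerprint(ByteBuffer header) {
        byte[] magic = new byte[4];
        for (int i = 0; i < magic.length; i++) {
            magic[i] = header.get(i);
        }
        return new String(magic, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer header = buildHeader(1, 4096L, 3);
        System.out.println(fingerprint(header) + " v" + header.getInt(4)
                + " mapOffset=" + header.getLong(8) + " ledgers=" + header.getInt(16));
    }
}
```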

The EntryLogger constructor ends by creating an EntryLogManager, whose interface is:

interface EntryLogManager {

    // Add an entry to the corresponding entry log and return its position in that log
    long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException;

    // Return the active log channel for the given entryLogId, or null if there is none
    BufferedLogChannel getCurrentLogIfPresent(long entryLogId);

    // Return a writable ledger dir in which to create the next entry log
    File getDirForNextEntryLog(List<File> writableLedgerDirs);

    // Checkpoint
    void checkpoint() throws IOException;

    // Flush the current log and the rotated logs
    void flush() throws IOException;
		
    // Close the logs
    void close() throws IOException;

    // Force-close the logs
    void forceClose();

    // Prepare the entryLogger/entryLogManager before a SortedLedgerStorage checkpoint
    void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException;

    // Must be called before the EntryMemTable flush; it saves the entry log state at that point.
    // commitEntryMemTableFlush performs the corresponding work after the flush.
    void prepareEntryMemTableFlush();

    // Must be called after the EntryMemTable flush. Every commitEntryMemTableFlush is assumed to have a
    // matching prepareEntryMemTableFlush, with both calls issued from the same thread.
    boolean commitEntryMemTableFlush() throws IOException;

    // Create a new log for compaction
    BufferedLogChannel createNewLogForCompaction() throws IOException;
}

1.1.1.2 Initializing LedgerCacheImpl

The LedgerCache interface is:

// LedgerCache maintains the mapping to positions in the entry log files: (LedgerId, EntryId) -> (EntryLogId, Offset)
public interface LedgerCache extends Closeable {

    boolean setFenced(long ledgerId) throws IOException;
    boolean isFenced(long ledgerId) throws IOException;

    void setMasterKey(long ledgerId, byte[] masterKey) throws IOException;
    byte[] readMasterKey(long ledgerId) throws IOException, BookieException;
    boolean ledgerExists(long ledgerId) throws IOException;
    // Record the offset of an entry
    void putEntryOffset(long ledger, long entry, long offset) throws IOException;
    // Look up the offset of an entry
    long getEntryOffset(long ledger, long entry) throws IOException;

    void flushLedger(boolean doAll) throws IOException;
    long getLastEntry(long ledgerId) throws IOException;

    Long getLastAddConfirmed(long ledgerId) throws IOException;
    long updateLastAddConfirmed(long ledgerId, long lac) throws IOException;
    boolean waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC, Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
    void cancelWaitForLastAddConfirmedUpdate(long ledgerId, Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;

    void deleteLedger(long ledgerId) throws IOException;

    void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException;
    ByteBuf getExplicitLac(long ledgerId);

    // Thrown when a ledger has no index
    class NoIndexForLedger extends IOException {
        NoIndexForLedger(String reason, Exception cause) {
            super(reason, cause);
        }
    }

    // Represents an index page
    interface PageEntries {
        LedgerEntryPage getLEP() throws IOException;
        long getFirstEntry();
        long getLastEntry();
    }

    /**
     * Iterable over index pages -- returns PageEntries rather than individual
     * entries because getEntries() above needs to be able to throw an IOException.
     */
    interface PageEntriesIterable extends AutoCloseable, Iterable<PageEntries> {}

    PageEntriesIterable listEntries(long ledgerId) throws IOException;

    OfLong getEntriesIterator(long ledgerId) throws IOException;

    // Metadata of a ledger
    class LedgerIndexMetadata {
        public final byte[] masterKey;
        public final long size;
        public final boolean fenced;
        LedgerIndexMetadata(byte[] masterKey, long size, boolean fenced) {
            this.masterKey = masterKey;
            this.size = size;
            this.fenced = fenced;
        }

        public String getMasterKeyHex() {
            if (null == masterKey) {
                return "NULL";
            } else {
                return bytes2Hex(masterKey);
            }
        }
    }

    LedgerIndexMetadata readLedgerIndexMetadata(long ledgerId) throws IOException;
}

LedgerCacheImpl maps a ledger entry number (ledgerId, entryId) to its position (entryLogId, offset) in the entry log files.

public class LedgerCacheImpl implements LedgerCache {
    private static final Logger LOG = LoggerFactory.getLogger(LedgerCacheImpl.class);

    private final IndexInMemPageMgr indexPageManager;
    private final IndexPersistenceMgr indexPersistenceManager;
    private final int pageSize;
    private final int entriesPerPage;

    public LedgerCacheImpl(ServerConfiguration conf, SnapshotMap<Long, Boolean> activeLedgers, LedgerDirsManager ledgerDirsManager) throws IOException {
        this(conf, activeLedgers, ledgerDirsManager, NullStatsLogger.INSTANCE);
    }

    public LedgerCacheImpl(ServerConfiguration conf, SnapshotMap<Long, Boolean> activeLedgers, LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) throws IOException {
        // Size of an index page
        this.pageSize = conf.getPageSize();
        this.entriesPerPage = pageSize / 8;
        // Create the indexPersistenceManager
        this.indexPersistenceManager = new IndexPersistenceMgr(pageSize, entriesPerPage, conf, activeLedgers, ledgerDirsManager, statsLogger);
        // Create the indexPageManager
        this.indexPageManager = new IndexInMemPageMgr(pageSize, entriesPerPage, conf,
                indexPersistenceManager, statsLogger);
    }
}

Initializing LedgerCacheImpl creates both an IndexInMemPageMgr and an IndexPersistenceMgr.
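Since entriesPerPage is pageSize / 8, each index slot holds one 8-byte value. The arithmetic can be sketched as below, under two assumptions that the text above does not state: the (entryLogId, offset) pair is packed into a single long with the log id in the upper 32 bits, and the page size is 8192 bytes. All names here are hypothetical.

```java
// Hypothetical sketch of index page arithmetic; not BookKeeper code.
class IndexPageMath {
    static final int BYTES_PER_ENTRY = 8; // one long per entry

    // Assumption: log id in the upper 32 bits, offset in the lower 32 bits.
    static long pack(long entryLogId, long offset) {
        return (entryLogId << 32) | (offset & 0xFFFFFFFFL);
    }

    static long entryLogId(long location) {
        return location >>> 32;
    }

    static long offset(long location) {
        return location & 0xFFFFFFFFL;
    }

    static int entriesPerPage(int pageSize) {
        return pageSize / BYTES_PER_ENTRY;
    }

    // First entry id covered by the page that holds entryId.
    static long firstEntryOfPage(long entryId, int pageSize) {
        return entryId - entryId % entriesPerPage(pageSize);
    }

    // Byte position of the entry's slot inside its page.
    static int byteOffsetInPage(long entryId, int pageSize) {
        return (int) (entryId % entriesPerPage(pageSize)) * BYTES_PER_ENTRY;
    }

    public static void main(String[] args) {
        int pageSize = 8192; // assumed page size
        long location = pack(3L, 1024L);
        System.out.println("entries per page: " + entriesPerPage(pageSize));
        System.out.println("entry 2500 is in the page starting at entry " + firstEntryOfPage(2500L, pageSize)
                + ", at byte " + byteOffsetInPage(2500L, pageSize));
        System.out.println("log id " + entryLogId(location) + ", offset " + offset(location));
    }
}
```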

IndexPersistenceMgr is responsible for persisting the index contents.

public IndexPersistenceMgr(int pageSize,
                           int entriesPerPage,
                           ServerConfiguration conf,
                           SnapshotMap<Long, Boolean> activeLedgers,
                           LedgerDirsManager ledgerDirsManager,
                           StatsLogger statsLogger) throws IOException {
    // Maximum number of open files
    this.openFileLimit = conf.getOpenFileLimit();
    this.activeLedgers = activeLedgers;
    this.ledgerDirsManager = ledgerDirsManager;
    this.pageSize = pageSize;
    this.entriesPerPage = entriesPerPage;
    LOG.info("openFileLimit = {}", openFileLimit);
  
    // Scan the ledger dirs for ledger index files; an index file is named after the ledgerId in hexadecimal.
    // If a ledger dir contains the index file, the ledgerId is marked active.
    getActiveLedgers();


    int concurrencyLevel = Math.max(1, Math.max(conf.getNumAddWorkerThreads(), conf.getNumReadWorkerThreads()));
    // Create the file info caches
    fileInfoBackingCache = new FileInfoBackingCache(this::createFileInfoBackingFile,
            conf.getFileInfoFormatVersionToWrite());
    RemovalListener<Long, CachedFileInfo> fileInfoEvictionListener = this::handleLedgerEviction;
    writeFileInfoCache = buildCache(
        concurrencyLevel,
        conf.getFileInfoCacheInitialCapacity(),
        openFileLimit,
        conf.getFileInfoMaxIdleTime(),
        fileInfoEvictionListener);
    readFileInfoCache = buildCache(
        concurrencyLevel,
        2 * conf.getFileInfoCacheInitialCapacity(),
        2 * openFileLimit,
        conf.getFileInfoMaxIdleTime(),
        fileInfoEvictionListener);

    // Expose Stats
    persistenceMgrStats = new IndexPersistenceMgrStats(
        statsLogger,
        () -> writeFileInfoCache.size(),
        () -> readFileInfoCache.size()
    );
}

IndexInMemPageMgr manages the index pages held in memory:

public IndexInMemPageMgr(int pageSize,
                         int entriesPerPage,
                         ServerConfiguration conf,
                         IndexPersistenceMgr indexPersistenceManager,
                         StatsLogger statsLogger) {
    this.pageSize = pageSize;
    this.entriesPerPage = entriesPerPage;
    this.indexPersistenceManager = indexPersistenceManager;
    this.pageMapAndList = new InMemPageCollection(statsLogger);

    long maxDirectMemory = DirectMemoryUtils.maxDirectMemory();

    if (conf.getPageLimit() <= 0) {
        // By default, allocate a third of the direct memory to the page cache
        this.pageLimit = (int) ((maxDirectMemory / 3) / this.pageSize);
    } else {
        this.pageLimit = conf.getPageLimit();
    }
    LOG.info("maxDirectMemory = {}, pageSize = {}, pageLimit = {}",
            maxDirectMemory, pageSize, pageLimit);
    // Expose Stats
    this.ledgerCacheHitCounter = statsLogger.getCounter(LEDGER_CACHE_HIT);
    this.ledgerCacheMissCounter = statsLogger.getCounter(LEDGER_CACHE_MISS);
    this.ledgerCacheReadPageStats = statsLogger.getOpStatsLogger(LEDGER_CACHE_READ_PAGE);
    // Export sampled stats for index pages, ledgers.
    statsLogger.registerGauge(
            NUM_INDEX_PAGES,
            new Gauge<Integer>() {
                @Override
                public Integer getDefaultValue() {
                    return 0;
                }
                @Override
                public Integer getSample() {
                    return getNumUsedPages();
                }
            }
    );
}

1.1.1.3 Initializing the GC thread

The GC thread runs in the background and deletes entry log files that no longer contain any active ledger.

public GarbageCollectorThread(ServerConfiguration conf,
                              LedgerManager ledgerManager,
                              final CompactableLedgerStorage ledgerStorage,
                              StatsLogger statsLogger,
                              ScheduledExecutorService gcExecutor)
    throws IOException {
    this.gcExecutor = gcExecutor;
    this.conf = conf;

    this.entryLogger = ledgerStorage.getEntryLogger();
    this.ledgerStorage = ledgerStorage;
    this.gcWaitTime = conf.getGcWaitTime();

    this.numActiveEntryLogs = 0;
    this.totalEntryLogSize = 0L;
    this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage, conf, statsLogger);
    this.gcStats = new GarbageCollectorStats(
        statsLogger,
        () -> numActiveEntryLogs,
        () -> totalEntryLogSize,
        () -> garbageCollector.getNumActiveLedgers()
    );

    this.garbageCleaner = ledgerId -> {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("delete ledger : " + ledgerId);
            }
            gcStats.getDeletedLedgerCounter().inc();
            ledgerStorage.deleteLedger(ledgerId);
        } catch (IOException e) {
            LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
        }
    };

    // compaction parameters
    minorCompactionThreshold = conf.getMinorCompactionThreshold();
    minorCompactionInterval = conf.getMinorCompactionInterval() * SECOND;
    majorCompactionThreshold = conf.getMajorCompactionThreshold();
    majorCompactionInterval = conf.getMajorCompactionInterval() * SECOND;
    isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace();

    AbstractLogCompactor.LogRemovalListener remover = new AbstractLogCompactor.LogRemovalListener() {
        @Override
        public void removeEntryLog(long logToRemove) {
            GarbageCollectorThread.this.removeEntryLog(logToRemove);
        }
    };
    if (conf.getUseTransactionalCompaction()) {
        this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
    } else {
        this.compactor = new EntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
    }

    if (minorCompactionInterval > 0 && minorCompactionThreshold > 0) {
        if (minorCompactionThreshold > 1.0f) {
            throw new IOException("Invalid minor compaction threshold "
                                + minorCompactionThreshold);
        }
        if (minorCompactionInterval <= gcWaitTime) {
            throw new IOException("Too short minor compaction interval : "
                                + minorCompactionInterval);
        }
        enableMinorCompaction = true;
    }

    if (majorCompactionInterval > 0 && majorCompactionThreshold > 0) {
        if (majorCompactionThreshold > 1.0f) {
            throw new IOException("Invalid major compaction threshold "
                                + majorCompactionThreshold);
        }
        if (majorCompactionInterval <= gcWaitTime) {
            throw new IOException("Too short major compaction interval : "
                                + majorCompactionInterval);
        }
        enableMajorCompaction = true;
    }

    if (enableMinorCompaction && enableMajorCompaction) {
        if (minorCompactionInterval >= majorCompactionInterval
            || minorCompactionThreshold >= majorCompactionThreshold) {
            throw new IOException("Invalid minor/major compaction settings : minor ("
                                + minorCompactionThreshold + ", " + minorCompactionInterval
                                + "), major (" + majorCompactionThreshold + ", "
                                + majorCompactionInterval + ")");
        }
    }

    LOG.info("Minor Compaction : enabled=" + enableMinorCompaction + ", threshold="
           + minorCompactionThreshold + ", interval=" + minorCompactionInterval);
    LOG.info("Major Compaction : enabled=" + enableMajorCompaction + ", threshold="
           + majorCompactionThreshold + ", interval=" + majorCompactionInterval);

    lastMinorCompactionTime = lastMajorCompactionTime = System.currentTimeMillis();
}
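The GC thread's basic job, as described at the start of this subsection, is to find entry logs that hold no active ledger. A minimal sketch of that selection follows; the class, the method, and the map from entry log id to ledger ids are all hypothetical, and compaction is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: pick the entry logs that no active ledger refers to.
class EntryLogGcSketch {
    static List<Long> collectableLogs(Map<Long, Set<Long>> ledgersPerLog, Set<Long> activeLedgers) {
        List<Long> result = new ArrayList<>();
        for (Map.Entry<Long, Set<Long>> e : ledgersPerLog.entrySet()) {
            // A log can go once none of its ledgers is active any more.
            if (Collections.disjoint(e.getValue(), activeLedgers)) {
                result.add(e.getKey());
            }
        }
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Set<Long>> ledgersPerLog = new HashMap<>();
        ledgersPerLog.put(1L, new HashSet<Long>(Arrays.asList(10L, 11L)));
        ledgersPerLog.put(2L, new HashSet<Long>(Arrays.asList(12L)));
        Set<Long> active = new HashSet<Long>(Arrays.asList(10L));
        System.out.println(collectableLogs(ledgersPerLog, active));
    }
}
```

Logs that still hold some active data are the ones the minor and major compaction settings in the constructor are concerned with.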

1.1.2 MemTable initialization

EntryMemTable is built on EntrySkipList, which in turn is built on ConcurrentSkipListMap; it can be thought of simply as an ordered map.

public EntryMemTable(final ServerConfiguration conf, final CheckpointSource source,
                     final StatsLogger statsLogger) {
    this.checkpointSource = source;
    this.kvmap = newSkipList();
    this.snapshot = EntrySkipList.EMPTY_VALUE;
    this.conf = conf;
    this.size = new AtomicLong(0);
    this.allocator = new SkipListArena(conf);
    this.previousFlushSucceeded = new AtomicBoolean(true);
    // skip list size limit
    this.skipListSizeLimit = conf.getSkipListSizeLimit();

    if (skipListSizeLimit > (Integer.MAX_VALUE - 1) / 2) {
        // gives 2*1023MB for mem table.
        // consider a way to create semaphore with long num of permits
        // until that 1023MB should be enough for everything (tm)
        throw new IllegalArgumentException("skiplist size over " + ((Integer.MAX_VALUE - 1) / 2));
    }
    // double the size for snapshot in progress + incoming data
    this.skipListSemaphore = new Semaphore((int) skipListSizeLimit * 2);

    // Stats
    this.memTableStats = new EntryMemTableStats(statsLogger);
}
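The "ordered map" view of the MemTable can be sketched with a ConcurrentSkipListMap directly. The Key class and the ordering by ledger id and then entry id are assumptions made for this illustration, and the size accounting and semaphore from the constructor above are left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch of a skip-list backed memtable.
class MemTableSketch {
    static final class Key {
        final long ledgerId;
        final long entryId;

        Key(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    // Assumed ordering: by ledger id first, then by entry id.
    private static final Comparator<Key> ORDER =
            Comparator.<Key>comparingLong(k -> k.ledgerId).thenComparingLong(k -> k.entryId);

    private final ConcurrentSkipListMap<Key, byte[]> kvmap = new ConcurrentSkipListMap<>(ORDER);

    void add(long ledgerId, long entryId, byte[] data) {
        kvmap.putIfAbsent(new Key(ledgerId, entryId), data);
    }

    // Keys come back sorted no matter in which order they were added.
    List<String> keysInOrder() {
        List<String> keys = new ArrayList<>();
        for (Key k : kvmap.keySet()) {
            keys.add(k.ledgerId + ":" + k.entryId);
        }
        return keys;
    }

    public static void main(String[] args) {
        MemTableSketch table = new MemTableSketch();
        table.add(2L, 0L, new byte[0]);
        table.add(1L, 5L, new byte[0]);
        table.add(1L, 2L, new byte[0]);
        System.out.println(table.keysInOrder());
    }
}
```

Keeping entries sorted means a flush can write each ledger's entries next to each other, which is the point of putting a MemTable in front of the interleaved storage.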

1.2 Journal

1.2.1 Journal initialization

// instantiate the journals
journals = Lists.newArrayList();
for (int i = 0; i < journalDirectories.size(); i++) {
    journals.add(new Journal(i, journalDirectories.get(i),
            conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator));
}

Journal creation builds one Journal object per journal directory; each Journal holds a reference to the LedgerDirsManager. The Journal constructor below sets the parameters the journal needs at run time.

public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
        LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger, ByteBufAllocator allocator) {
    super("BookieJournal-" + conf.getBookiePort());
    this.allocator = allocator;

    if (conf.isBusyWaitEnabled()) {
        // With busy-wait enabled, lower latency can be achieved
        queue = new BlockingMpscQueue<>(conf.getJournalQueueSize());
        forceWriteRequests = new BlockingMpscQueue<>(conf.getJournalQueueSize());
    } else {
        queue = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
        forceWriteRequests = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
    }
		
    this.ledgerDirsManager = ledgerDirsManager;
    this.conf = conf;
    this.journalDirectory = journalDirectory;
    // Maximum size of a journal file
    this.maxJournalSize = conf.getMaxJournalSizeMB() * MB;
    // Pre-allocation size
    this.journalPreAllocSize = conf.getJournalPreAllocSizeMB() * MB;
    // Size of the journal write buffer
    this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB;
    // Whether the journal syncs data to disk (fsync) before acking
    this.syncData = conf.getJournalSyncData();
    // Number of old journal files to keep
    this.maxBackupJournals = conf.getMaxBackupJournals();
    // ForceWriteThread: a background thread that forces journal writes to disk
    this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites());
    // Maximum time a group may wait; after that a flush is triggered
    this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec());
    // Maximum bytes buffered in a group; reaching the threshold triggers a flush
    this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold();
    // Maximum number of entries buffered in a group; reaching the threshold triggers a flush
    this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold();
    this.journalFormatVersionToWrite = conf.getJournalFormatVersionToWrite();
    // Alignment size for journal writes
    this.journalAlignmentSize = conf.getJournalAlignmentSize();
    // Number of threads that handle journal callbacks
    if (conf.getNumJournalCallbackThreads() > 0) {
        this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(),
                                                     new DefaultThreadFactory("bookie-journal-callback"));
    } else {
        this.cbThreadPool = MoreExecutors.newDirectExecutorService();
    }
    // Whether to flush when the queue is empty; if a maximum group wait is set (group force write),
    // the empty-queue flush can be skipped
    this.flushWhenQueueEmpty = maxGroupWaitInNanos <= 0 || conf.getJournalFlushWhenQueueEmpty();
    // Whether to drop pages from the page cache after a force write
    this.removePagesFromCache = conf.getJournalRemovePagesFromCache();
    // Read the last log mark; this scans the ledger directories
    if (conf.getJournalDirs().length == 1) {
        lastMarkFileName = LAST_MARK_DEFAULT_NAME;
    } else {
        lastMarkFileName = LAST_MARK_DEFAULT_NAME + "." + journalIndex;
    }
    lastLogMark.readLog();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
    }

    // Expose Stats
    this.journalStats = new JournalStats(statsLogger);
}

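1.2.2 Journal startup

When the Bookie starts, it reads the journal contents, replays them, and then updates the LastLogMark object. LastLogMark records a file and a position within it, meaning that every entry before this position has already been persisted.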

// replay journals
try {
    readJournal();
} catch (IOException | BookieException ioe) {
    LOG.error("Exception while replaying journals, shutting down", ioe);
    shutdown(ExitCode.BOOKIE_EXCEPTION);
    return;
}

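Replay writes everything in the journal that needs replaying into the ledger storage:

  • Read the journal file contents and handle each record according to its type
    • METAENTRY_ID_LEDGER_KEY: read the master key and store the ledger's master key mapping in the journal and the ledger storage
    • METAENTRY_ID_FENCE_KEY: look up the corresponding handle and mark the ledger as fenced in the ledger storage
    • METAENTRY_ID_LEDGER_EXPLICITLAC: look up the corresponding handle and write the LAC
    • Otherwise, write the entry read from the journal into the ledger storage
  • After replay, update the journal's LastLogMark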
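The per-record dispatch in the list above can be sketched as below. It assumes each record starts with an 8-byte ledger id followed by an 8-byte entry id, and that the special records use reserved negative entry ids. The constant values are illustrative assumptions, and the returned strings only describe the action that would be taken.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of replay dispatch; the constant values are assumptions.
class JournalReplaySketch {
    static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
    static final long METAENTRY_ID_FENCE_KEY = -0x2000;
    static final long METAENTRY_ID_LEDGER_EXPLICITLAC = -0x8000;

    // Build a minimal record holding only the two header fields.
    static ByteBuffer record(long ledgerId, long entryId) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(ledgerId).putLong(entryId);
        buf.flip();
        return buf;
    }

    // Decide what replay would do with a record, based on its entry id.
    static String classify(ByteBuffer record) {
        long ledgerId = record.getLong(0);
        long entryId = record.getLong(8);
        if (entryId == METAENTRY_ID_LEDGER_KEY) {
            return "store master key of ledger " + ledgerId;
        } else if (entryId == METAENTRY_ID_FENCE_KEY) {
            return "fence ledger " + ledgerId;
        } else if (entryId == METAENTRY_ID_LEDGER_EXPLICITLAC) {
            return "set explicit LAC of ledger " + ledgerId;
        }
        return "add entry " + entryId + " of ledger " + ledgerId;
    }

    public static void main(String[] args) {
        System.out.println(classify(record(7L, METAENTRY_ID_FENCE_KEY)));
        System.out.println(classify(record(7L, 42L)));
    }
}
```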

The journal threads are then started, one journalThread per journal directory. A journal thread persists journal entries to the journal file and also rolls journal files; rolled files are later deleted by the SyncThread.

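When an entry is written to the bookie, it is also written to the journal: it is first put on the queue, and the journal thread and the ForceWriteThread then get it onto disk.

1.2.3 Writing entries to the journal

The journal thread keeps taking entries off the queue and writing them into the write cache of the journal file's BufferedChannel. Every entry taken from the queue goes into the write cache, but the write cache is not written to the FileChannel once per entry; entries are written in groups. Four conditions control this, and meeting any one of them triggers the write:

  • maxGroupWaitInNanos: nominally the longest a group waits before being written. The actual logic is slightly different: when an entry is found to have waited longer than maxGroupWaitInNanos, the thread tries to pick up more timed-out entries to write with it, and flushes once there are none left. Flush here means writing the write cache to the FileChannel
  • bufferedEntriesThreshold: the maximum number of buffered entries; exceeding it triggers a flush
  • bufferedWritesThreshold: the maximum amount of buffered data; exceeding it triggers a flush
  • flushWhenQueueEmpty: if no entry is waiting in the queue and flushing on an empty queue is allowed, flush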
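The four conditions can be condensed into one predicate. The sketch below is a simplification, not the journal thread's loop: it ignores the step of collecting further timed-out entries, the class and parameter names are hypothetical, and treating a threshold of zero or less as "disabled" is an assumption.

```java
// Hypothetical sketch of the group-flush decision.
class JournalFlushPolicy {
    private final long maxGroupWaitNanos;
    private final long bufferedWritesThreshold;
    private final long bufferedEntriesThreshold;
    private final boolean flushWhenQueueEmpty;

    JournalFlushPolicy(long maxGroupWaitNanos, long bufferedWritesThreshold,
                       long bufferedEntriesThreshold, boolean flushWhenQueueEmpty) {
        this.maxGroupWaitNanos = maxGroupWaitNanos;
        this.bufferedWritesThreshold = bufferedWritesThreshold;
        this.bufferedEntriesThreshold = bufferedEntriesThreshold;
        this.flushWhenQueueEmpty = flushWhenQueueEmpty;
    }

    // Any single satisfied condition is enough to flush the current group.
    boolean shouldFlush(long oldestWaitNanos, long bufferedBytes, long bufferedEntries, boolean queueEmpty) {
        if (bufferedEntries == 0) {
            return false; // nothing buffered, nothing to flush
        }
        if (maxGroupWaitNanos > 0 && oldestWaitNanos > maxGroupWaitNanos) {
            return true;
        }
        if (bufferedWritesThreshold > 0 && bufferedBytes > bufferedWritesThreshold) {
            return true;
        }
        if (bufferedEntriesThreshold > 0 && bufferedEntries > bufferedEntriesThreshold) {
            return true;
        }
        return queueEmpty && flushWhenQueueEmpty;
    }

    public static void main(String[] args) {
        JournalFlushPolicy policy = new JournalFlushPolicy(2_000_000L, 512 * 1024L, 0L, false);
        System.out.println(policy.shouldFlush(3_000_000L, 100L, 1L, false)); // waited too long
        System.out.println(policy.shouldFlush(1_000_000L, 100L, 1L, true));  // empty-queue flush disabled
    }
}
```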

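A flush writes the write cache to the FileChannel. It also creates a forceWrite request for the batch of flushed entries and puts it on forceWriteRequests; the ForceWriteThread then syncs the written data to disk. Note that a forceWrite request only signals whether a sync is needed and serves no other purpose.

The ForceWriteThread works as follows:

  • Take a request from forceWriteRequests
    • If enableGroupForceWrites is on, a special forceWrite request is generated as a marker and added to forceWriteRequests. The first request always syncs; whether a later request syncs depends on the type of the request before it: after an ordinary request it does not sync, after a marker it does.
    • If enableGroupForceWrites is off, every request in forceWriteRequests triggers a sync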
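The effect of markers on the number of syncs can be modelled in a few lines. This is a simplified model of the rule described above, not the ForceWriteThread itself: requests are reduced to a boolean (true for a marker), the names are hypothetical, and markers are assumed not to sync on their own.

```java
// Hypothetical model: count how many syncs a sequence of requests causes.
class GroupForceWriteSketch {
    static int countSyncs(boolean[] isMarker, boolean groupForceWrites) {
        int syncs = 0;
        boolean shouldForceWrite = true; // the first request always syncs
        for (boolean marker : isMarker) {
            if (!marker && shouldForceWrite) {
                syncs++;
            }
            // With grouping on, an ordinary request suppresses syncing
            // until the next marker has been seen.
            shouldForceWrite = !groupForceWrites || marker;
        }
        return syncs;
    }

    public static void main(String[] args) {
        // Three ordinary requests, one marker, then two ordinary requests.
        boolean[] requests = {false, false, false, true, false, false};
        System.out.println("grouped: " + countSyncs(requests, true));
        System.out.println("ungrouped: " + countSyncs(requests, false));
    }
}
```

In this model one sync covers every ordinary request queued since the previous marker, which is why grouping reduces the number of syncs.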

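The overall write path of a journal file is:

client -> queue -> writeCache -> FileChannel -> file. The queue -> writeCache -> FileChannel steps are driven by the journal thread, and the last step by the ForceWriteThread.

1.3 Creating the SyncThread

SyncThread is a background thread whose main job is to checkpoint the ledger storage. Once the ledger storage has completed a checkpoint, the journal files added before that checkpoint are cleaned up.

Once all data has been persisted to the ledger index files and entry logs, it is safe to persist the log mark to disk. The LogMark denotes a position in a journal file. If the bookie fails after the log mark has been persisted, it can replay journal entries starting from the log mark, so no data is lost.

After the LogMark is persisted, every journal file older than the log mark can be deleted. A number of journal files can be configured to be retained for manual recovery after a disaster.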
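The cleanup rule can be sketched as follows, assuming journal files are identified by increasing numeric ids and that the retained backups are the newest files below the mark. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of journal garbage collection after a checkpoint.
class JournalCleanupSketch {
    // Journals below the marked id may go, except the newest maxBackupJournals of them.
    static List<Long> deletable(List<Long> journalIds, long markedLogId, int maxBackupJournals) {
        List<Long> older = new ArrayList<>();
        for (long id : journalIds) {
            if (id < markedLogId) {
                older.add(id);
            }
        }
        Collections.sort(older);
        int deleteCount = Math.max(0, older.size() - maxBackupJournals);
        return new ArrayList<>(older.subList(0, deleteCount));
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        // With the mark in journal 4 and one backup kept, journals 1 and 2 can be deleted.
        System.out.println(deletable(ids, 4L, 1));
    }
}
```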

public SyncThread(ServerConfiguration conf,
                  LedgerDirsListener dirsListener,
                  LedgerStorage ledgerStorage,
                  CheckpointSource checkpointSource) {
    this.dirsListener = dirsListener;
    this.ledgerStorage = ledgerStorage;
    this.checkpointSource = checkpointSource;
    this.executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("SyncThread"));
}

1.4 Creating the HandleFactoryImpl

HandleFactoryImpl(LedgerStorage ledgerStorage) {
    this.ledgerStorage = ledgerStorage;
    this.ledgers = new ConcurrentLongHashMap<>();
    this.readOnlyLedgers = new ConcurrentLongHashMap<>();

    ledgerStorage.registerLedgerDeletionListener(this);
}

2. Data write path

To write an entry to a Bookie, the Netty server receives the AddEntry request, hands it to BookieRequestProcessor, and processing ends up in Bookie#addEntry.

public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey)
        throws IOException, BookieException, InterruptedException {
    long requestNanos = MathUtils.nowInNano();
    boolean success = false;
    int entrySize = 0;
    try {
        // Get the handle
        LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
        synchronized (handle) {
            if (handle.isFenced()) {
                throw BookieException
                        .create(BookieException.Code.LedgerFencedException);
            }
            entrySize = entry.readableBytes();
            // Write the entry
            addEntryInternal(handle, entry, ackBeforeSync, cb, ctx, masterKey);
        }
        success = true;
    } ...

        entry.release();
    }
}

The main logic here is:

  • Get the handle for the entry. The handle is a LedgerDescriptorImpl, which mainly wraps the ledgerStorage, the ledgerId, and the masterKey
  • Call addEntryInternal to write the entry

private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
                              boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey)
        throws IOException, BookieException, InterruptedException {
    long ledgerId = handle.getLedgerId();
    // Add the entry to the ledger storage through the handle
    long entryId = handle.addEntry(entry);

    bookieStats.getWriteBytes().add(entry.readableBytes());

    // journal `addEntry` should happen after the entry is added to ledger storage.
    // otherwise the journal entry can potentially be rolled before the ledger is created in ledger storage.
    if (masterKeyCache.get(ledgerId) == null) {
        // Force the load into masterKey cache
        byte[] oldValue = masterKeyCache.putIfAbsent(ledgerId, masterKey);
        if (oldValue == null) {
            // new handle, we should add the key to journal ensure we can rebuild
            ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length);
            bb.putLong(ledgerId);
            bb.putLong(METAENTRY_ID_LEDGER_KEY);
            bb.putInt(masterKey.length);
            bb.put(masterKey);
            bb.flip();
            // Log the master key record to the journal
            getJournal(ledgerId).logAddEntry(bb, false /* ackBeforeSync */, new NopWriteCallback(), null);
        }
    }

    if (LOG.isTraceEnabled()) {
        LOG.trace("Adding {}@{}", entryId, ledgerId);
    }
    // Log the entry to the journal
    getJournal(ledgerId).logAddEntry(entry, ackBeforeSync, cb, ctx);
}

The flow here is:

  • Write the entry to the ledger: this goes through the handle and actually calls LedgerStorage#addEntry
  • Write the data to the journal by calling Journal#logAddEntry
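One detail of addEntryInternal is worth isolating: the master key record is logged only on the first write to a ledger, because putIfAbsent returns null only for the first caller. The sketch below models the journal as a list of strings. Everything except the record layout (8 + 8 + 4 + key length bytes) is a hypothetical simplification, and the constant value is an assumption.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: the master key is journaled once per ledger.
class MasterKeyJournalSketch {
    static final long METAENTRY_ID_LEDGER_KEY = -0x1000; // assumed value

    private final ConcurrentMap<Long, byte[]> masterKeyCache = new ConcurrentHashMap<>();
    final List<String> journal = new ArrayList<>();

    // Same layout as in addEntryInternal: ledger id, meta entry id, key length, key.
    static ByteBuffer encodeMasterKey(long ledgerId, byte[] masterKey) {
        ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length);
        bb.putLong(ledgerId);
        bb.putLong(METAENTRY_ID_LEDGER_KEY);
        bb.putInt(masterKey.length);
        bb.put(masterKey);
        bb.flip();
        return bb;
    }

    void addEntry(long ledgerId, long entryId, byte[] masterKey) {
        if (masterKeyCache.putIfAbsent(ledgerId, masterKey) == null) {
            // First write to this ledger: journal the key so it can be rebuilt on replay.
            int size = encodeMasterKey(ledgerId, masterKey).remaining();
            journal.add("masterKey(ledger=" + ledgerId + ", bytes=" + size + ")");
        }
        journal.add("entry(ledger=" + ledgerId + ", id=" + entryId + ")");
    }

    public static void main(String[] args) {
        MasterKeyJournalSketch bookie = new MasterKeyJournalSketch();
        byte[] key = {1, 2, 3, 4};
        bookie.addEntry(1L, 0L, key);
        bookie.addEntry(1L, 1L, key);
        bookie.addEntry(2L, 0L, key);
        for (String record : bookie.journal) {
            System.out.println(record);
        }
    }
}
```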

2.1 LedgerStorage write path

SortedLedgerStorage is used as the example here:

public long addEntry(ByteBuf entry) throws IOException {
    long ledgerId = entry.getLong(entry.readerIndex() + 0);
    long entryId = entry.getLong(entry.readerIndex() + 8);
    long lac = entry.getLong(entry.readerIndex() + 16);
    // Call EntryMemTable#addEntry to write into the memtable
    memTable.addEntry(ledgerId, entryId, entry.nioBuffer(), this);
    // After the memtable write, call LedgerCache#updateLastAddConfirmed to update the LAC.
    // Note that the bookie does not compute the LAC itself; the LAC is carried in the entry
    interleavedLedgerStorage.ledgerCache.updateLastAddConfirmed(ledgerId, lac);
    return entryId;
}

There are two main steps here:

  • Write the data into the memtable
  • Update the LAC held in the ledgerCache of the LedgerStorage
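
The addEntry code above reads three 8-byte fields from the front of every entry: ledgerId at offset 0, entryId at offset 8, and the LAC at offset 16, all relative to the reader index. The sketch below shows the same absolute reads on a plain java.nio.ByteBuffer instead of a Netty ByteBuf, so that it has no dependencies. EntryHeader is a hypothetical helper name, not a BookKeeper class.

```java
import java.nio.ByteBuffer;

class EntryHeader {
    final long ledgerId;
    final long entryId;
    final long lac;

    EntryHeader(long ledgerId, long entryId, long lac) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.lac = lac;
    }

    /**
     * Parses the 24-byte prefix with absolute reads, so the buffer's
     * position is left untouched (mirroring getLong(readerIndex + n)).
     */
    static EntryHeader parse(ByteBuffer entry) {
        int base = entry.position();
        return new EntryHeader(
                entry.getLong(base),
                entry.getLong(base + 8),
                entry.getLong(base + 16));
    }

    public static void main(String[] args) {
        ByteBuffer entry = ByteBuffer.allocate(24 + 5);
        entry.putLong(7L).putLong(3L).putLong(2L).put("hello".getBytes());
        entry.flip();
        EntryHeader h = parse(entry);
        System.out.println(h.ledgerId + " " + h.entryId + " " + h.lac);
    }
}
```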

2.1.1 Writing data to the MemTable

// Note: the return value here is not exact
public long addEntry(long ledgerId, long entryId, final ByteBuffer entry, final CacheCallback cb)
        throws IOException {
    long size = 0;
    long startTimeNanos = MathUtils.nowInNano();
    boolean success = false;
    try {
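        // If the memtable has exceeded its size limit, or the previous flush failed, take a snapshot first.
        // Taking a snapshot moves the current kvmap into snapshot and then creates a fresh kvmap.
        // Both snapshot and kvmap are of type EntrySkipList.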
        if (isSizeLimitReached() || (!previousFlushSucceeded.get())) {
            Checkpoint cp = snapshot();
            if ((null != cp) || (!previousFlushSucceeded.get())) {
                // Notify the callback that the size limit was reached
                cb.onSizeLimitReached(cp);
            }
        }

        final int len = entry.remaining();
        if (!skipListSemaphore.tryAcquire(len)) {
            memTableStats.getThrottlingCounter().inc();
            final long throttlingStartTimeNanos = MathUtils.nowInNano();
            skipListSemaphore.acquireUninterruptibly(len);
            memTableStats.getThrottlingStats()
                .registerSuccessfulEvent(MathUtils.elapsedNanos(throttlingStartTimeNanos), TimeUnit.NANOSECONDS);
        }

        this.lock.readLock().lock();
        try {
            // Allocate memory for the entry and build an EntryKeyValue from it
            EntryKeyValue toAdd = cloneWithAllocator(ledgerId, entryId, entry);
            size = internalAdd(toAdd);
        } finally {
            this.lock.readLock().unlock();
        }
        success = true;
        return size;
    } finally {
        if (success) {
            memTableStats.getPutEntryStats()
                .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
        } else {
            memTableStats.getPutEntryStats()
                .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
        }
    }
}

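As the code above shows, writing to the MemTable works as follows:

  • If the size limit has been reached, or the previous flush did not succeed:
    • Take a snapshot first. Taking a snapshot moves the current kvmap into snapshot and then creates a fresh kvmap
    • Invoke the size-limit callback; the callback target is the SortedLedgerStorage
  • Wrap the entry in an EntryKeyValue
  • Add the EntryKeyValue to kvmap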
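
The snapshot-and-swap step can be sketched as follows. This is a heavily simplified, hypothetical model (MiniMemTable is not a BookKeeper class): it uses a ConcurrentSkipListMap keyed by (ledgerId, entryId) in place of EntrySkipList, coarse synchronized methods in place of the read/write lock and semaphore, and it leaves out the callback and checkpoint handling.

```java
import java.util.concurrent.ConcurrentSkipListMap;

class MiniMemTable {
    /** Sort key: ledgerId first, then entryId. */
    static final class Key implements Comparable<Key> {
        final long ledgerId;
        final long entryId;
        Key(long ledgerId, long entryId) { this.ledgerId = ledgerId; this.entryId = entryId; }
        @Override public int compareTo(Key o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    private final long sizeLimit;
    private long size;
    private ConcurrentSkipListMap<Key, byte[]> kvmap = new ConcurrentSkipListMap<>();
    private ConcurrentSkipListMap<Key, byte[]> snapshot = new ConcurrentSkipListMap<>();

    MiniMemTable(long sizeLimit) { this.sizeLimit = sizeLimit; }

    /** Moves kvmap into snapshot and starts a fresh kvmap; no-op if a snapshot is still pending. */
    synchronized boolean snapshot() {
        if (!snapshot.isEmpty() || kvmap.isEmpty()) {
            return false;
        }
        snapshot = kvmap;
        kvmap = new ConcurrentSkipListMap<>();
        size = 0;
        return true;
    }

    /** Returns the number of bytes added, or 0 if the key was already present. */
    synchronized long addEntry(long ledgerId, long entryId, byte[] data) {
        if (size >= sizeLimit) {
            snapshot(); // the real code would also invoke the size-limit callback here
        }
        if (kvmap.putIfAbsent(new Key(ledgerId, entryId), data) != null) {
            return 0;
        }
        size += data.length;
        return data.length;
    }

    /** Stands in for writing the snapshot to the entry log; returns how many entries were flushed. */
    synchronized int flushSnapshot() {
        int flushed = snapshot.size();
        snapshot = new ConcurrentSkipListMap<>();
        return flushed;
    }

    synchronized int liveEntries() { return kvmap.size(); }
    synchronized int snapshotEntries() { return snapshot.size(); }
}
```

The point of the swap is that writers keep appending to the new kvmap while the frozen snapshot is flushed in the background.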

The callback handling is explained in detail below.

2.1.1.1 Callback handling when the MemTable reaches its limit

The SortedLedgerStorage#onSizeLimitReached method is as follows:

public void onSizeLimitReached(final Checkpoint cp) throws IOException {
    LOG.info("Reached size {}", cp);
    scheduler.execute(new Runnable() {
        @Override
        public void run() {
            try {
                LOG.info("Started flushing mem table.");
                // Record in the EntryLogManager the id of the entry log in use before the flush
                interleavedLedgerStorage.getEntryLogger().prepareEntryMemTableFlush();
                // Flush the memTable
                memTable.flush(SortedLedgerStorage.this);
                if (interleavedLedgerStorage.getEntryLogger().commitEntryMemTableFlush()) {
                    interleavedLedgerStorage.checkpointer.startCheckpoint(cp);
                }
            } catch (Exception e) {
                stateManager.transitionToReadOnlyMode();
                LOG.error("Exception thrown while flushing skip list cache.", e);
            }
        }
    });
}

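The execution logic here is:

  • Before the flush: record the current entry log id in the EntryLogManager as logIdBeforeFlush
  • Flush: flush the memtable
  • After the flush: interleavedLedgerStorage.getEntryLogger() handles the post-flush work, and a checkpoint is started if the log was rolled

Before flush

The EntryLogManager records the id of the entry log that the memtable is about to be flushed into, as logIdBeforeFlush.

Running flush

The flush writes the contents of the MemTable into the file channel of the entry log file.

The main call chain is: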

EntryMemTable#flush ->

  EntryMemTable#flushSnapshot ->

    SortedLedgerStorage#process ->

      InterleavedLedgerStorage#processEntry ->

        1. EntryLogManagerForSingleEntryLog#addEntry

        2. LedgerCacheImpl#putEntryOffset

EntryLogManagerForSingleEntryLog#addEntry

public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
    int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
    // Get the log channel that the memTable is flushed into
    BufferedLogChannel logChannel = getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
    ByteBuf sizeBuffer = sizeBufferForAdd.get();
    sizeBuffer.clear();
    sizeBuffer.writeInt(entry.readableBytes());
    // Write the entry length first
    logChannel.write(sizeBuffer);

    long pos = logChannel.position();
    // Then write the entry itself
    logChannel.write(entry);
    // After the write, record this ledger's size in the EntryLogMetadata
    logChannel.registerWrittenEntry(ledger, entrySize);

    return (logChannel.getLogId() << 32L) | pos;
}
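
The value returned by addEntry packs two numbers into one long: the entry log id in the upper 32 bits and the byte position inside that log in the lower 32 bits. The sketch below shows the packing and the matching unpacking. The class and method names are hypothetical, and the sketch assumes the position always fits in 32 bits.

```java
class EntryLocation {
    /** Packs the log id into the high 32 bits and the position into the low 32 bits. */
    static long encode(long logId, long pos) {
        return (logId << 32L) | pos;
    }

    /** Recovers the entry log id from a packed location. */
    static long logId(long location) {
        return location >> 32L;
    }

    /** Recovers the byte position within the entry log from a packed location. */
    static long position(long location) {
        return location & 0xffffffffL;
    }

    public static void main(String[] args) {
        long location = encode(5L, 1024L);
        System.out.println("logId=" + logId(location) + ", pos=" + position(location));
    }
}
```

This packed location is what the index (LedgerCacheImpl#putEntryOffset in the call chain above) stores for each entry, so one lookup yields both the file and the offset to read from.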

The logic for obtaining the BufferedLogChannel is as follows:

@Override
synchronized BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySize,
        boolean rollLog) throws IOException {
    if (null == activeLogChannel) {
        // log channel can be null because the file is deferred to be created
        createNewLog(UNASSIGNED_LEDGERID, "because current active log channel has not initialized yet");
    }

    boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(activeLogChannel, entrySize)
            : readEntryLogHardLimit(activeLogChannel, entrySize);
    // Create new log if logSizeLimit reached or current disk is full
    boolean createNewLog = shouldCreateNewEntryLog.get();
    if (createNewLog || reachEntryLogLimit) {
        if (activeLogChannel != null) {
            activeLogChannel.flushAndForceWriteIfRegularFlush(false);
        }
        createNewLog(UNASSIGNED_LEDGERID,
            ": createNewLog = " + createNewLog + ", reachEntryLogLimit = " + reachEntryLogLimit);
        // Reset the flag
        if (createNewLog) {
            shouldCreateNewEntryLog.set(false);
        }
    }
    return activeLogChannel;
}
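
  • If the current EntryLogManager has no activeLogChannel, create a new log
  • If the entry log has reached its size limit (or shouldCreateNewEntryLog is set), first write the contents of the activeLogChannel writeBuffer to the file channel and force them to disk, then create a new log

Creating a new log file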

void createNewLog(long ledgerId, String reason) throws IOException {
    if (ledgerId != UNASSIGNED_LEDGERID) {
        log.info("Creating a new entry log file for ledger '{}' {}", ledgerId, reason);
    } else {
        log.info("Creating a new entry log file {}", reason);
    }

    BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId);
    // first tried to create a new log channel. add current log channel to ToFlush list only when
    // there is a new log channel. it would prevent that a log channel is referenced by both
    // *logChannel* and *ToFlush* list.
    if (null != logChannel) {

        // flush the internal buffer back to filesystem but not sync disk
        logChannel.flush();

        // Append ledgers map at the end of entry log
        logChannel.appendLedgersMap();

        BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
        setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
        log.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
                logChannel.getLogId(), rotatedLogChannels);
        for (EntryLogListener listener : listeners) {
            listener.onRotateEntryLog();
        }
    } else {
        setCurrentLogForLedgerAndAddToRotate(ledgerId,
                entryLoggerAllocator.createNewLog(selectDirForNextEntryLog()));
    }
}
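
  • If the current channel is not null, first write the channel's writeBuffer contents to the file channel, then write the ledgersMap information held in the EntryLogMetadata to the file channel

  • Create a new BufferedLogChannel (createNewLog) and add the old channel to the set of rotated log channels

    At this point a directory is chosen for the new BufferedLogChannel. The selection logic takes the writable dirs from the ledgerDirsManager, shuffles them, and picks the first one.

createNewLog eventually calls into: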

EntryLoggerAllocator#createNewLog ->

  EntryLoggerAllocator#allocateNewLog

// Allocate a new log file
private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog, String suffix) throws IOException {
    List<File> ledgersDirs = ledgerDirsManager.getAllLedgerDirs();
    String logFileName;
    // It would better not to overwrite existing entry log files
    File testLogFile = null;
    do {
        if (preallocatedLogId >= Integer.MAX_VALUE) {
            preallocatedLogId = 0;
        } else {
            ++preallocatedLogId;
        }
        logFileName = Long.toHexString(preallocatedLogId) + suffix;
        for (File dir : ledgersDirs) {
            testLogFile = new File(dir, logFileName);
            if (testLogFile.exists()) {
                // The file already exists: log a warning and try the next id
                log.warn("Found existed entry log " + testLogFile
                       + " when trying to create it as a new log.");
                testLogFile = null;
                break;
            }
        }
    } while (testLogFile == null);
    // Create the new file object
    File newLogFile = new File(dirForNextEntryLog, logFileName);
    // Open a file channel to the file
    FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
    // Create the BufferedLogChannel
    BufferedLogChannel logChannel = new BufferedLogChannel(byteBufAllocator, channel, conf.getWriteBufferBytes(),
            conf.getReadBufferBytes(), preallocatedLogId, newLogFile, conf.getFlushIntervalInBytes());
    logfileHeader.readerIndex(0);
    // Write the log header into the new log file
    logChannel.write(logfileHeader);
    // Record the id of the latest log file in every ledger directory
    for (File f : ledgersDirs) {
        setLastLogId(f, preallocatedLogId);
    }

    if (suffix.equals(EntryLogger.LOG_FILE_SUFFIX)) {
        recentlyCreatedEntryLogsStatus.createdEntryLog(preallocatedLogId);
    }

    log.info("Created new entry log file {} for logId {}.", newLogFile, preallocatedLogId);
    return logChannel;
}
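
The id allocation loop in allocateNewLog is worth isolating: the id is incremented (wrapping to 0 once it reaches Integer.MAX_VALUE), rendered in hexadecimal as the file name, and skipped if a file with that name already exists in any ledger directory. The following sketch models that loop with an in-memory set of existing names instead of real directories. LogIdAllocator is a hypothetical class, and the ".log" suffix is an assumption standing in for the suffix argument.

```java
import java.util.Set;

class LogIdAllocator {
    private long preallocatedLogId;

    LogIdAllocator(long lastUsedLogId) {
        this.preallocatedLogId = lastUsedLogId;
    }

    /** Advances to the next id whose hex file name is not already taken and returns that name. */
    String nextLogFileName(Set<String> existingNames) {
        String name;
        do {
            if (preallocatedLogId >= Integer.MAX_VALUE) {
                preallocatedLogId = 0; // wrap around instead of overflowing the 32-bit id space
            } else {
                ++preallocatedLogId;
            }
            name = Long.toHexString(preallocatedLogId) + ".log"; // assumed suffix
        } while (existingNames.contains(name));
        return name;
    }

    long currentLogId() {
        return preallocatedLogId;
    }
}
```

Keeping the id within 32 bits matters because the entry location packs the log id into the upper half of a long.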

That covers most of the logic involved in the MemTable#flush operation.

After flush
if (interleavedLedgerStorage.getEntryLogger().commitEntryMemTableFlush()) {
    interleavedLedgerStorage.checkpointer.startCheckpoint(cp);
}

The logic that runs after the memtable flush:

@Override
public boolean commitEntryMemTableFlush() throws IOException {
    long logIdAfterFlush = getCurrentLogId();
    /*
     * in any case that an entry log reaches the limit, we roll the log
     * and start checkpointing. if a memory table is flushed spanning
     * over two entry log files, we also roll log. this is for
     * performance consideration: since we don't wanna checkpoint a new
     * log file that ledger storage is writing to.
     */
    if (reachEntryLogLimit(activeLogChannel, 0L) || logIdAfterFlush != logIdBeforeFlush) {
        log.info("Rolling entry logger since it reached size limitation");
        createNewLog(UNASSIGNED_LEDGERID,
            "due to reaching log limit after flushing memtable : logIdBeforeFlush = "
                + logIdBeforeFlush + ", logIdAfterFlush = " + logIdAfterFlush);
        return true;
    }
    return false;
}

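The logic here is: if the active BufferedLogChannel has reached its size limit, or the flush spanned two entry log files (logIdAfterFlush != logIdBeforeFlush), a new log file is created and the method returns true.

A checkpoint is then required; it is performed on the SyncThread: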

 @VisibleForTesting
    public void checkpoint(Checkpoint checkpoint) {
        if (null == checkpoint) {
            // do nothing if checkpoint is null
            return;
        }

        try {
            ledgerStorage.checkpoint(checkpoint);
        } catch (NoWritableLedgerDirException e) {
            log.error("No writeable ledger directories", e);
            dirsListener.allDisksFull(true);
            return;
        } catch (IOException e) {
            log.error("Exception flushing ledgers", e);
            return;
        }

        try {
            checkpointSource.checkpointComplete(checkpoint, true);
        } catch (IOException e) {
            log.error("Exception marking checkpoint as complete", e);
            dirsListener.allDisksFull(true);
        }
    }

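There are two main parts:

  • The ledger storage performs the checkpoint

  • The journal (the checkpointSource) is notified that the checkpoint has completed and handles it in its checkpointComplete callback

The ledger storage checkpoint: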

@Override
public void checkpoint(final Checkpoint checkpoint) throws IOException {
    // Flush the memTable
    long numBytesFlushed = memTable.flush(this, checkpoint);
    // The main logic here is to create a new log file
    interleavedLedgerStorage.getEntryLogger().prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
  
...
  
public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException{
        if (numBytesFlushed > 0) {
            // if bytes are added between previous flush and this checkpoint,
            // it means bytes might live at current active entry log, we need
            // roll current entry log and then issue checkpoint to underlying
            // interleaved ledger storage.
            createNewLog(UNASSIGNED_LEDGERID,
                "due to preparing checkpoint : numBytesFlushed = " + numBytesFlushed);
        }
    }
  
  ... 
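 // Creating a new log file first writes the current writeBuffer contents (plus the ledgers map) to the file channel,
 // then creates the new log file and its file channel,
 // and finally adds the current active channel to the set of rotated channels.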
 void createNewLog(long ledgerId, String reason) throws IOException {
        if (ledgerId != UNASSIGNED_LEDGERID) {
            log.info("Creating a new entry log file for ledger '{}' {}", ledgerId, reason);
        } else {
            log.info("Creating a new entry log file {}", reason);
        }

        BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId);
        // first tried to create a new log channel. add current log channel to ToFlush list only when
        // there is a new log channel. it would prevent that a log channel is referenced by both
        // *logChannel* and *ToFlush* list.
        if (null != logChannel) {
            // Write to the file channel; not yet synced to disk
            logChannel.flush();
            logChannel.appendLedgersMap();
            // Create the new log channel
            BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
            // Add the current active channel to the rotated set and make the new channel the active one
            setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
            log.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
                    logChannel.getLogId(), rotatedLogChannels);
            for (EntryLogListener listener : listeners) {
                listener.onRotateEntryLog();
            }
        } else {
            setCurrentLogForLedgerAndAddToRotate(ledgerId,
                    entryLoggerAllocator.createNewLog(selectDirForNextEntryLog()));
        }
    }
 // Checkpoint the underlying InterleavedLedgerStorage
 interleavedLedgerStorage.checkpoint(checkpoint);
}
....
 private void flushOrCheckpoint(boolean isCheckpointFlush)// true
            throws IOException {
        boolean flushFailed = false;
        try {
            // First flush the LedgerCache
            ledgerCache.flushLedger(true);
        } catch (LedgerDirsManager.NoWritableLedgerDirException e) {
            throw e;
        } catch (IOException ioe) {
            LOG.error("Exception flushing Ledger cache", ioe);
            flushFailed = true;
        }

        try {
            // if it is just a checkpoint flush, we just flush rotated entry log files
            // in entry logger.
            if (isCheckpointFlush) {
                entryLogger.checkpoint();
            } else {
                entryLogger.flush();
            }
        } catch (LedgerDirsManager.NoWritableLedgerDirException e) {
            throw e;
        } catch (IOException ioe) {
            LOG.error("Exception flushing Ledger", ioe);
            flushFailed = true;
        }
        if (flushFailed) {
            throw new IOException("Flushing to storage failed, check logs");
        }
    }

There are two main operations here:

  • flushLedger on the ledgerCache
  • checkpoint on the EntryLogger

LedgerCacheImpl#flushLedger ->

  IndexInMemPageMgr#flushOneOrMoreLedgers

EntryLogger#checkpoint ->

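2.1.2 Updating the LAC in the LedgerCache

Summary

The flow for writing data into the MemTable:

  1. If the MemTable has reached its size limit
    1. The memTable first takes a snapshot
    2. After the snapshot, the in-memory data is flushed to the entry log
    3. If the entry log has reached its size limit, a new entry log is created
    4. When the flush completes and the entry log was rolled, a checkpoint is triggered on the SyncThread
  2. Write the entry into kvmap

2.2 Data write path of the journal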

public void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
                        boolean ackBeforeSync, WriteCallback cb, Object ctx)
        throws InterruptedException {
    //Retain entry until it gets written to journal
    entry.retain();

    journalStats.getJournalQueueSize().inc();
    queue.put(QueueEntry.create(
            entry, ackBeforeSync,  ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
            journalStats.getJournalAddEntryStats(),
            journalStats.getJournalQueueSize()));
}
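
logAddEntry itself only enqueues the entry; the actual write happens later on the journal thread, which takes entries off the queue, writes them, and then invokes each callback. The sketch below models that hand-off under several assumptions: MiniJournal, its WriteCallback interface, and drainOnce are hypothetical, an in-memory list stands in for the journal file, and the queue uses add() on an unbounded LinkedBlockingQueue rather than the blocking put() of the original, to keep the example free of checked exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MiniJournal {
    /** Hypothetical callback, invoked once the entry has been "written". */
    interface WriteCallback {
        void writeComplete(long ledgerId, long entryId);
    }

    static final class QueueEntry {
        final long ledgerId;
        final long entryId;
        final byte[] entry;
        final WriteCallback cb;
        QueueEntry(long ledgerId, long entryId, byte[] entry, WriteCallback cb) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.entry = entry;
            this.cb = cb;
        }
    }

    private final BlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<>();
    private final List<byte[]> journalFile = new ArrayList<>(); // stands in for the journal file

    /** Producer side: only enqueues; nothing is written yet. */
    void logAddEntry(long ledgerId, long entryId, byte[] entry, WriteCallback cb) {
        queue.add(new QueueEntry(ledgerId, entryId, entry, cb));
    }

    /** One iteration of the consumer side: write the whole batch, then run the callbacks. */
    int drainOnce() {
        List<QueueEntry> batch = new ArrayList<>();
        queue.drainTo(batch);
        for (QueueEntry e : batch) {
            journalFile.add(e.entry);
        }
        for (QueueEntry e : batch) {
            e.cb.writeComplete(e.ledgerId, e.entryId);
        }
        return batch.size();
    }

    int writtenEntries() {
        return journalFile.size();
    }
}
```

Batching the queued entries before acknowledging them is what lets a single disk sync cover many writes.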

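3 File structure

3.1 Entry Log

3.1.1 Header

The header is 1 KB in size and is written at the very beginning of the entry log file. It contains a fingerprint and metadata:

  • Fingerprint: the fixed string "BKLO", 4 bytes
  • Header version: 4 bytes
  • Ledger map offset: 8 bytes
  • Ledger count: 4 bytes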
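
The header fields listed above occupy the first 20 bytes of the 1 KB block; the sketch below lays them out in that order in a zero-filled buffer. EntryLogHeader and its build method are hypothetical names, and the field order is taken from the list above rather than from the BookKeeper source.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class EntryLogHeader {
    static final int LOGFILE_HEADER_SIZE = 1024;

    /** Builds a 1 KB header: "BKLO" | version | ledgers map offset | ledger count | zero padding. */
    static ByteBuffer build(int headerVersion, long ledgersMapOffset, int ledgersCount) {
        ByteBuffer header = ByteBuffer.allocate(LOGFILE_HEADER_SIZE);
        header.put("BKLO".getBytes(StandardCharsets.US_ASCII)); // bytes 0..3
        header.putInt(headerVersion);                           // bytes 4..7
        header.putLong(ledgersMapOffset);                       // bytes 8..15
        header.putInt(ledgersCount);                            // bytes 16..19
        header.clear(); // position = 0, limit = 1024; the remaining bytes stay zero
        return header;
    }

    public static void main(String[] args) {
        ByteBuffer header = build(1, 4096L, 3);
        System.out.println("header bytes: " + header.remaining());
    }
}
```

When a log is first created the offset and count are not yet known, so a writer would start with placeholder values and patch bytes 8 to 19 once the ledger map has been appended.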

3.1.2 LedgerMap information

The LedgerMap information is appended at the end of the entry log.

public class EntryLogMetadata {
    private final long entryLogId;
    private long totalSize;
    private long remainingSize;
    // ledgersMap maps each ledgerId to the total size of that ledger's entries in this log
    private final ConcurrentLongLongHashMap ledgersMap;
   }

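In memory, the LedgerMap is a ConcurrentLongLongHashMap holding <LedgerId, TotalEntrySize> pairs. When it is written to the entry log it is split into batches of 10,000 entries, and each batch has its own header. The header consists of:

  • The LedgerMap length: LedgerMap header length + per-entry length * number of entries (at most 10K) - 4
  • LedgerId: -1, because the ledgerMap holds size information for many ledgers and has no single ledger id
  • EntryId: -2, for the same reason as LedgerId
  • Ledger count: the number of ledgerMap entries in this batch (if the map holds more than 10K entries it must be split)

The logic for appending the LedgerMap at the tail of the entry log is as follows. First a buffer is allocated, of length LedgerMap header size + (size of each LedgerMap entry * maximum LedgerMap batch size). Then:

  • Write the length: 4 bytes, whose value is LedgerMapHeaderSize + LedgerMapEntrySize * BatchSize - 4, that is, excluding the length field itself
  • Write INVALID_LID: 8 bytes, a special ledger id with the value -1
  • Write LEDGERS_MAP_ENTRY_ID: 8 bytes, a special entry id with the value -2
  • Write batchSize: 4 bytes, set to Math.min(10000, size of the LedgerMap)
  • Write each LedgerMap entry: 16 bytes, ledgerId (8 bytes) + totalEntrySize (8 bytes)
  • Repeat the steps above for each batch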
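
The batch layout described above can be sketched as follows. This is an illustrative model, not the BookKeeper implementation: LedgersMapWriter is a hypothetical class, a plain Map<Long, Long> stands in for ConcurrentLongLongHashMap, and each batch gets its own exactly sized buffer instead of reusing one buffer sized for the maximum batch.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class LedgersMapWriter {
    static final int HEADER_SIZE = 4 + 8 + 8 + 4; // length, ledgerId, entryId, batch size
    static final int ENTRY_SIZE = 8 + 8;          // ledgerId, totalEntrySize
    static final int BATCH_SIZE = 10000;
    static final long INVALID_LID = -1L;
    static final long LEDGERS_MAP_ENTRY_ID = -2L;

    /** Serializes the map into one buffer per batch of at most BATCH_SIZE ledgers. */
    static List<ByteBuffer> serialize(Map<Long, Long> ledgersMap) {
        List<ByteBuffer> batches = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = ledgersMap.entrySet().iterator();
        int remaining = ledgersMap.size();
        while (remaining > 0) {
            int batchSize = Math.min(BATCH_SIZE, remaining);
            int totalSize = HEADER_SIZE + ENTRY_SIZE * batchSize;
            ByteBuffer buf = ByteBuffer.allocate(totalSize);
            buf.putInt(totalSize - 4);         // length excludes the length field itself
            buf.putLong(INVALID_LID);          // special ledger id
            buf.putLong(LEDGERS_MAP_ENTRY_ID); // special entry id
            buf.putInt(batchSize);
            for (int i = 0; i < batchSize; i++) {
                Map.Entry<Long, Long> e = it.next();
                buf.putLong(e.getKey());   // ledgerId
                buf.putLong(e.getValue()); // total size of that ledger's entries
            }
            buf.flip();
            batches.add(buf);
            remaining -= batchSize;
        }
        return batches;
    }
}
```

Because each batch starts with a length followed by a ledger id and an entry id, it has the same outer shape as an ordinary entry, so a scanner walking the log can skip it using the reserved ids -1 and -2.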

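As shown earlier, the entry log header contains the LedgerMap offset and the ledger count. These values are only known once the ledgerMap has been written, so after the LedgerMap is written, these two header fields are updated.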