zk 文件存储

zk 有 2 种文件,快照和事务日志,快照是某一时刻的全量数据,事务日志中记录了数据的修改事件。

快照的文件名是 snapshot.zxid,其中 zxid 是生成快照时 DataTree 已处理的最后一个事务 id(即 dataTree.lastProcessedZxid)

// org.apache.zookeeper.server.persistence.FileTxnSnapLog#save
/**
 * Writes a snapshot of the given data tree and session table into
 * {@code snapDir}.
 *
 * <p>The snapshot file name is built by {@code Util.makeSnapshotName} from
 * the tree's {@code lastProcessedZxid}, so the file is named after the last
 * zxid the tree had processed when the snapshot was taken.
 *
 * @param dataTree the data tree to serialize
 * @param sessionsWithTimeouts the session map serialized together with the tree
 * @throws IOException propagated from {@code snapLog.serialize}
 */
public void save(DataTree dataTree,
        ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
    throws IOException {
    final long snapshotZxid = dataTree.lastProcessedZxid;
    // The snapshot file is named snapshot.<lastProcessedZxid>.
    final String snapshotName = Util.makeSnapshotName(snapshotZxid);
    final File target = new File(snapDir, snapshotName);
    LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(snapshotZxid), target);
    snapLog.serialize(dataTree, sessionsWithTimeouts, target);
}

 事务日志的文件名是 log.zxid,zxid 是当前文件第一条日志的事务 id

// org.apache.zookeeper.server.persistence.FileTxnLog#append
/**
 * Appends one transaction entry to the current transaction log file,
 * creating a new log file first when no log stream is open.
 *
 * <p>A newly created file is named by {@code Util.makeLogName} from the zxid
 * of the transaction being appended, i.e. the zxid of the first entry the
 * file will contain. Each entry is written as a checksum followed by the
 * marshalled header and transaction bytes.
 *
 * @param hdr the transaction header; when {@code null} nothing is written
 * @param txn the transaction record
 * @return {@code false} if {@code hdr} is {@code null}, {@code true} otherwise
 * @throws IOException if the marshalled entry is null or empty, or if an
 *         underlying stream operation fails
 */
public synchronized boolean append(TxnHeader hdr, Record txn)
    throws IOException
{
    if (hdr == null) {
        return false;
    }

    final long zxid = hdr.getZxid();
    if (zxid > lastZxidSeen) {
        lastZxidSeen = zxid;
    } else {
        // A non-increasing zxid is only reported; the entry is still written.
        LOG.warn("Current zxid " + zxid
                + " is <= " + lastZxidSeen + " for "
                + hdr.getType());
    }

    if (logStream == null) {
        // Start a new transaction log file log.<zxid>, where zxid belongs to
        // the first entry written into it.
        final String logName = Util.makeLogName(zxid);
        if (LOG.isInfoEnabled()) {
            LOG.info("Creating new log file: " + logName);
        }
        logFileWrite = new File(logDir, logName);
        fos = new FileOutputStream(logFileWrite);
        logStream = new BufferedOutputStream(fos);
        oa = BinaryOutputArchive.getArchive(logStream);

        final FileHeader fileHeader = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
        fileHeader.serialize(oa, "fileheader");
        // The magic number must reach the file before any padding is applied.
        logStream.flush();
        filePadding.setCurrentSize(fos.getChannel().position());
        streamsToFlush.add(fos);
    }
    filePadding.padFile(fos.getChannel());

    final byte[] entryBytes = Util.marshallTxnEntry(hdr, txn);
    if (entryBytes == null || entryBytes.length == 0) {
        throw new IOException("Faulty serialization for header and txn");
    }

    // The checksum of the entry precedes the entry bytes in the file.
    final Checksum entryChecksum = makeChecksumAlgorithm();
    entryChecksum.update(entryBytes, 0, entryBytes.length);
    oa.writeLong(entryChecksum.getValue(), "txnEntryCRC");
    Util.writeTxnBytes(oa, entryBytes);

    return true;
}

 zk 加载数据:从 snap 文件和 log 文件解析出全量数据

// org.apache.zookeeper.server.persistence.FileTxnSnapLog#restore
/**
 * Rebuilds the in-memory state from disk: first loads a snapshot into the
 * data tree and session map, then hands over to
 * {@code fastForwardFromEdits} (presumably to apply the transaction log on
 * top of the snapshot — its body is not shown here).
 *
 * @param dt the data tree to populate
 * @param sessions the session map to populate
 * @param listener passed through to {@code fastForwardFromEdits}
 * @return the value returned by {@code fastForwardFromEdits}
 * @throws IOException propagated from snapshot loading or log processing
 */
public long restore(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    // Step 1: load the snapshot.
    snapLog.deserialize(dt, sessions);
    // Step 2: continue from the transaction log.
    final long restoredZxid = fastForwardFromEdits(dt, sessions, listener);
    return restoredZxid;
}

逆序最多选取 100 个 snap 文件,从最新的文件开始依次解析;只要有一个文件反序列化成功且校验和一致,就停止并使用该文件;如果所有候选文件都失败,则抛出 IOException

/**
 * Loads a snapshot into the given data tree and session map.
 *
 * <p>Up to 100 candidate snapshot files are obtained from
 * {@code findNValidSnapshots} and tried in list order. A candidate is
 * accepted when it deserializes without an {@link IOException} and the
 * Adler-32 checksum computed over the bytes read equals the checksum value
 * stored after the data. A failing candidate is logged and the next one is
 * tried.
 *
 * @param dt the data tree to populate
 * @param sessions the session map to populate
 * @return {@code -1} when there are no candidate snapshots; otherwise the
 *         zxid parsed from the accepted snapshot's file name, which is also
 *         stored in {@code dt.lastProcessedZxid}
 * @throws IOException if none of the candidates can be read and verified,
 *         or if closing a stream fails
 */
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
        throws IOException {
    // Only a bounded number of snapshots (100) is examined; if none of them
    // is usable we give up instead of scanning every file.
    final List<File> candidates = findNValidSnapshots(100);
    if (candidates.isEmpty()) {
        return -1L;
    }

    File validSnap = null;
    for (File candidate : candidates) {
        InputStream rawIn = null;
        CheckedInputStream checkedIn = null;
        try {
            LOG.info("Reading snapshot " + candidate);
            rawIn = new BufferedInputStream(new FileInputStream(candidate));
            checkedIn = new CheckedInputStream(rawIn, new Adler32());
            final InputArchive archive = BinaryInputArchive.getArchive(checkedIn);
            deserialize(dt, sessions, archive);
            // Capture the running checksum before reading the stored one,
            // since reading the stored value goes through the same stream.
            final long computed = checkedIn.getChecksum().getValue();
            final long stored = archive.readLong("val");
            if (stored != computed) {
                throw new IOException("CRC corruption in snapshot :  " + candidate);
            }
            validSnap = candidate;
            break;
        } catch (IOException e) {
            LOG.warn("problem reading snap file " + candidate, e);
        } finally {
            if (rawIn != null) {
                rawIn.close();
            }
            if (checkedIn != null) {
                checkedIn.close();
            }
        }
    }

    if (validSnap == null) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }
    // The zxid of the restored state is taken from the snapshot file name.
    dt.lastProcessedZxid = Util.getZxidFromName(validSnap.getName(), SNAPSHOT_FILE_PREFIX);
    return dt.lastProcessedZxid;
}

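对比已经解析出的最大 zxid,选择对应的 log 文件:保留文件名中 zxid 不小于该 zxid 的所有 log 文件,再加上文件名中 zxid 小于它的最后一个 log 文件(目标事务可能落在这个文件里),然后跳过其中 zxid 小于目标值的日志记录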

// org.apache.zookeeper.server.persistence.FileTxnLog.FileTxnIterator#init
/**
 * Selects the transaction log files to read and positions the iterator on
 * the first entry whose zxid is not less than {@code zxid}.
 *
 * <p>Files are taken in the order returned by {@code Util.sortDataDir}
 * (called with {@code false} — presumably newest first; confirm against
 * {@code Util}). Every file whose name-derived zxid is {@code >= zxid} is
 * kept, plus the first file encountered whose name-derived zxid is below
 * {@code zxid}; scanning stops there.
 *
 * @throws IOException propagated from file listing or from advancing the
 *         iterator
 */
void init() throws IOException {
    storedFiles = new ArrayList<File>();
    final List<File> candidates = Util.sortDataDir(
            FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX, false);
    for (File logFile : candidates) {
        final long fileZxid = Util.getZxidFromName(logFile.getName(), LOG_FILE_PREFIX);
        storedFiles.add(logFile);
        if (fileZxid < zxid) {
            // This is the last log file that starts below the target zxid;
            // nothing after it in the list is needed.
            break;
        }
    }

    goToNextLog();
    // Skip entries until one at or beyond the target zxid is reached, or
    // the logs are exhausted.
    boolean hasEntry = next();
    while (hasEntry && hdr.getZxid() < zxid) {
        hasEntry = next();
    }
}
原文地址:https://www.cnblogs.com/allenwas3/p/11840780.html