Table of Contents
3.3.saveFSImage(context, sd, nnf);
3.4.FSImageFormatProtobuf.Saver
3.5.1. 写入文件头[ FSImageUtil.FILE_VERSION ]
四. 加载FSImage : FSImage.loadFSImage()
一.前言
Namenode会定期将文件系统的命名空间(文件目录树、文件/ 目录元信息) 保存到fsimage文件中, 以防止Namenode掉电或者进程崩溃。 但如果Namenode实时地将内存中的元数据同步到fsimage文件中, 将会非常消耗资源且造成Namenode运行缓慢。 所以Namenode会先将命名空间的修改操作保存在editlog文件中, 然后定期合并fsimage和editlog文件。
FSImage类主要实现了以下功能:
■ 保存命名空间——将当前时刻Namenode内存中的命名空间保存到fsimage文件中。
■ 加载fsimage文件——将磁盘上fsimage文件中保存的命名空间加载到Namenode内存中, 这个操作是保存命名空间操作的逆操作。
■ 加载editlog文件——Namenode加载了fsimage文件后, 内存中只包含了命名空间在保存fsimage文件时的信息, Namenode还需要加载后续对命名空间的修改操作,即editlog文件中记录的内容。 所以FSImage类还提供了加载editlog文件到Namenode内存中的功能.
二.构建&初始化
FSImage是在FSNamesystem中的loadFromDisk方法进行初始化的
-
-
/**
-
* Construct the FSImage. Set the default checkpoint directories.
-
*
-
* Setup storage and initialize the edit log.
-
*
-
* @param conf Configuration
-
* @param imageDirs Directories the image can be stored in.
-
* @param editsDirs Directories the editlog can be stored in.
-
* @throws IOException if directories are invalid.
-
*/
-
protected FSImage(Configuration conf,
-
Collection<URI> imageDirs,
-
List<URI> editsDirs)
-
throws IOException {
-
this.conf = conf;
-
-
//构建NNStorage ==> NNStorage负责管理NameNode使用的 StorageDirectories。
-
storage =
new NNStorage(conf, imageDirs, editsDirs);
-
-
// dfs.namenode.name.dir.restore 默认: false
-
// 设置为true可使NameNode尝试恢复以前失败的dfs.NameNode.name.dir。
-
// 启用后,将在检查点期间尝试恢复任何失败的目录。
-
if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
-
DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
-
storage.setRestoreFailedStorage(
true);
-
}
-
-
-
// 构建 FSEditLog
-
this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
-
-
//
-
archivalManager =
new NNStorageRetentionManager(conf, storage, editLog);
-
}
三.保存命名空间
FSImage类最重要的功能之一就是将当前时刻Namenode的命名空间保存到fsimage文件中。
3.1.saveFSImageInAllDirs()
Namenode可以定义多个存储路径来保存fsimage文件, 对于每一个存储路径,saveFSImageInAllDirs()方法都会启动一个线程负责在这个路径上保存fsimage文件。 同时, 为了防止保存过程中出现错误, 命名空间信息首先会被保存在一个fsimage.ckpt文件中, 当保存操作全部完成之后, 才会将fsimage.ckpt重命名为fsimage文件。 之后saveFSImageInAllDirs()方法会清理Namenode元数据存储文件夹中过期的editlog文件和fsimage文件。
-
/**
-
* Namenode可以定义多个存储路径来保存fsimage文件,
-
*
-
* 1. 对于每一个存储路径,saveFSImageInAllDirs()方法都会启动一个线程负责在这个路径上保存fsimage文件。
-
*
-
* 2. 同时, 为了防止保存过程中出现错误, 命名空间信息首先会被保存在一个fsimage.ckpt文件中,
-
* 当保存操作全部完成之后, 才会将fsimage.ckpt重命名为fsimage文件。
-
*
-
* 3. 之后saveFSImageInAllDirs()方法会清理Namenode元数据存储文件夹中过期的editlog文件和
-
* fsimage文件。
-
*
-
* @param source
-
* @param nnf
-
* @param txid
-
* @param canceler
-
* @throws IOException
-
*/
-
private synchronized void saveFSImageInAllDirs(FSNamesystem source,
-
NameNodeFile nnf, long txid, Canceler canceler)
throws IOException {
-
-
-
StartupProgress prog = NameNode.getStartupProgress();
-
prog.beginPhase(Phase.SAVING_CHECKPOINT);
-
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) ==
0) {
-
throw
new IOException(
"No image directories available!");
-
}
-
if (canceler ==
null) {
-
canceler =
new Canceler();
-
}
-
-
-
//构造保存命名空间操作的上下文
-
SaveNamespaceContext ctx =
new SaveNamespaceContext(source, txid, canceler);
-
-
try {
-
-
//在每一个保存路径上启动一个线程, 该线程使用FSImageSaver类保存fsimage文件
-
List<Thread> saveThreads =
new ArrayList<Thread>();
-
// save images into current
-
for (Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
-
StorageDirectory sd = it.next();
-
-
-
// 命名空间具体的保存操作是由FSImageSaver这个类来承担的,
-
// FSImageSaver是FSImage中的内部类, 也是一个线程类,
-
// 它的run()方法调用了saveFSImage()方法来保存fsimage文件。
-
FSImageSaver saver =
new FSImageSaver(ctx, sd, nnf);
-
-
Thread saveThread =
new Thread(saver, saver.toString());
-
-
-
saveThreads.add(saveThread);
-
saveThread.start();
-
}
-
-
//等待所有线程执行完毕
-
waitForThreads(saveThreads);
-
-
-
saveThreads.clear();
-
storage.reportErrorsOnDirectories(ctx.getErrorSDs());
-
-
//保存文件失败则抛出异常
-
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) ==
0) {
-
throw
new IOException(
-
"Failed to save in any storage directories while saving namespace.");
-
}
-
if (canceler.isCancelled()) {
-
deleteCancelledCheckpoint(txid);
-
ctx.checkCancelled();
// throws
-
assert
false :
"should have thrown above!";
-
}
-
-
-
// 将fsimage.ckpt 改名为 fsimage
-
renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf,
false);
-
-
-
-
// Since we now have a new checkpoint, we can clean up some
-
// old edit logs and checkpoints.
-
// Do not purge anything if we just wrote a corrupted FsImage.
-
if (!exitAfterSave.get()) {
-
-
//我们已经完成了fsimage的保存, 那么可以将存储上的一部分editlog和fsimage删除
-
//如果没有成功,则失败.
-
purgeOldStorage(nnf);
-
archivalManager.purgeCheckpoints(NameNodeFile.IMAGE_NEW);
-
}
-
}
finally {
-
// Notify any threads waiting on the checkpoint to be canceled
-
// that it is complete.
-
-
-
//通知所有等待的线程
-
ctx.markComplete();
-
ctx =
null;
-
}
-
-
-
prog.endPhase(Phase.SAVING_CHECKPOINT);
-
-
-
}
3.2.FSImageSaver
命名空间具体的保存操作是由FSImageSaver这个类来承担的, FSImageSaver是FSImage中的内部类, 也是一个线程类,它的run()方法调用了saveFSImage()方法来保存fsimage文件。
-
/**
-
*
-
* 命名空间具体的保存操作是由FSImageSaver这个类来承担的,
-
* FSImageSaver是FSImage中的内部类, 也是一个线程类,
-
* 它的run()方法调用了saveFSImage()方法来保存fsimage文件。
-
*
-
*
-
*
-
* FSImageSaver is being run in a separate thread when saving
-
* FSImage. There is one thread per each copy of the image.
-
*
-
* FSImageSaver assumes that it was launched from a thread that holds
-
* FSNamesystem lock and waits for the execution of FSImageSaver thread
-
* to finish.
-
* This way we are guaranteed that the namespace is not being updated
-
* while multiple instances of FSImageSaver are traversing it
-
* and writing it out.
-
*/
-
private
class FSImageSaver implements Runnable {
-
private
final SaveNamespaceContext context;
-
private
final StorageDirectory sd;
-
private
final NameNodeFile nnf;
-
-
public FSImageSaver(SaveNamespaceContext context, StorageDirectory sd,
-
NameNodeFile nnf) {
-
this.context = context;
-
this.sd = sd;
-
this.nnf = nnf;
-
}
-
-
@Override
-
public void run() {
-
// Deletes checkpoint file in every storage directory when shutdown.
-
Runnable cancelCheckpointFinalizer = () -> {
-
try {
-
deleteCancelledCheckpoint(context.getTxId());
-
LOG.info(
"FSImageSaver clean checkpoint: txid={} when meet " +
-
"shutdown.", context.getTxId());
-
}
catch (IOException e) {
-
LOG.error(
"FSImageSaver cancel checkpoint threw an exception:", e);
-
}
-
};
-
ShutdownHookManager.
get().addShutdownHook(cancelCheckpointFinalizer,
-
SHUTDOWN_HOOK_PRIORITY);
-
try {
-
//保存fsimage文件
-
System.
out.println(
"context : "+ context);
-
System.
out.println(
"sd : "+ sd);
-
System.
out.println(
"nnf : "+ nnf);
-
saveFSImage(context, sd, nnf);
-
}
catch (SaveNamespaceCancelledException snce) {
-
LOG.info(
"Cancelled image saving for " + sd.getRoot() +
-
": " + snce.getMessage());
-
// don't report an error on the storage dir!
-
}
catch (Throwable t) {
-
LOG.error(
"Unable to save image for " + sd.getRoot(), t);
-
context.reportErrorOnStorageDirectory(sd);
-
try {
-
deleteCancelledCheckpoint(context.getTxId());
-
LOG.info(
"FSImageSaver clean checkpoint: txid={} when meet " +
-
"Throwable.", context.getTxId());
-
}
catch (IOException e) {
-
LOG.error(
"FSImageSaver cancel checkpoint threw an exception:", e);
-
}
-
}
-
}
-
-
@Override
-
public String toString() {
-
return
"FSImageSaver for " + sd.getRoot() +
-
" of type " + sd.getStorageDirType();
-
}
-
}
3.3.saveFSImage(context, sd, nnf);
saveFSImage()方法会使用一个FSImageFormat.Saver对象来完成保存操作,FSImageFormat.Saver类会以fsimage文件定义的格式保存Namenode的命名空间信息, 需要注意命名空空间信息会先写入fsimage.ckpt文件中。 saveFSImage()方法还会生成fsimage文件的md5校验文件, 以确保fsimage文件的正确性。
-
-
/**
-
* 将 FS image 的内容保存到文件中。
-
* Save the contents of the FS image to the file.
-
*/
-
void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
-
NameNodeFile dstType) throws IOException {
-
-
//获取当前命名空间中记录的最新事务的txid
-
long txid = context.getTxId();
-
-
// fsimage文件
-
File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
-
-
File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
-
-
// FSImageFormatProtobuf.Saver类负责保存fsimage
-
FSImageFormatProtobuf.Saver saver =
new FSImageFormatProtobuf.Saver(context);
-
-
//压缩类
-
FSImageCompression compression = FSImageCompression.createCompression(conf);
-
-
//调用Saver类保存fsimage文件
-
long numErrors = saver.save(newFile, compression);
-
-
if (numErrors >
0) {
-
// The image is likely corrupted.
-
LOG.error(
"Detected " + numErrors +
" errors while saving FsImage " +
-
dstFile);
-
exitAfterSave.
set(
true);
-
}
-
-
//保存MD5校验值
-
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
-
storage.setMostRecentCheckpointInfo(txid, Time.now());
-
-
}
saveFSImage()方法构造了一个FSImageFormatProtobuf.Saver对象来保存命名空间,FSImageFormatProtobuf是一个工具类, 它提供了以protobuf格式读取和写入fsimage文件的方法。
-
/**
-
* save()方法会打开fsimage文件的输出流并且获得文件通道,
-
* 然后调用saveInternal()方法将命名空间保存到fsimage文件中。
-
*
-
* @return number of non-fatal errors detected while writing the image.
-
* @throws IOException on fatal error.
-
*/
-
long save(File file, FSImageCompression compression) throws IOException {
-
-
-
FileOutputStream fout =
new FileOutputStream(file);
-
-
fileChannel = fout.getChannel();
-
-
try {
-
LOG.info(
"Saving image file {} using {}", file, compression);
-
long startTime = monotonicNow();
-
-
// 保存到fsimage文件
-
long numErrors = saveInternal( fout, compression, file.getAbsolutePath());
-
-
-
LOG.info(
"Image file {} of size {} bytes saved in {} seconds {}.", file,
-
file.length(), (monotonicNow() - startTime) /
1000,
-
(numErrors >
0 ? (
" with" + numErrors +
" errors") :
""));
-
return numErrors;
-
}
finally {
-
fout.close();
-
}
-
}
3.4.FSImageFormatProtobuf.Saver
使用protobuf定义的fsimage文件的格式, 它包括了4个部分的信息
■ MAGIC: fsimage的文件头, 是“HDFSIMG1”这个字符串的二进制形式, MAGIC头标识了当前fsimage文件是使用protobuf格式序列化的。 FSImage类在读取fsimage文件时, 会先判断fsimage文件是否包含了MAGIC头, 如果包含了则使用protobuf格式反序列化fsimage文件。
■ SECTIONS: fsimage文件会将同一类型的Namenode元信息保存在一个section中,例如将文件系统元信息保存在NameSystemSection中, 将文件系统目录树中的所有INode信息保存在INodeSection中, 将快照信息保存在SnapshotSection中等。fsimage文件的第二个部分就是Namenode各类元信息对应的所有section, 每类section中都包含了对应Namenode元信息的属性.
■ FileSummary: FileSummary记录了fsimage文件的元信息, 以及fsimage文件保存的所有section的信息。 FileSummary中的ondiskVersion字段记录了fsimage文件的版本号(3.2.1 版本中这个字段的值为1) , layoutVersion字段记录了当前HDFS的文件
系统布局版本号, codec字段记录了fsimage文件的压缩编码, sections字段则记录了fsimage文件中各个section字段的元信息, 每个fsimage文件中记录的section在FileSummary中都有一个与之对应的section字段。 FileSummary的section字段记录了对应的fsimage中section的名称、 在fsimage文件中的长度, 以及这个section在fsimage中的起始位置。 FSImage类在读取fsimage文件时, 会先从fsimage中读取出FileSummary部分, 然后利用FileSummary记录的元信息指导fsimage文件的反序列化操作。■ FileSummaryLength: FileSummaryLength记录了FileSummary在fsimage文件中所占的长度, FSImage类在读取fsimage文件时, 会首先读取FileSummaryLength获取FileSummary部分的长度, 然后根据这个长度从fsimage中反序列化出FileSummary
FSImageFormatProtobuf.Saver类就是以protobuf格式将Namenode的命名空间保存至fsimage文件的工具类.这个类的入口方法是save()方法。 save()方法会打开fsimage文件的输出流并且获得文件通道, 然后调用saveInternal()方法将命名空间保存到fsimage文件中.
3.5.saveInternal()方法
saveInternal()方法首先构造底层fsimage文件的输出流, 构造fsimage文件的描述类FileSummary, 然后在FileSummary中记录ondiskVersion、 layoutVersion、 codec等信息。
接下来saveInternal()方法依次向fsimage文件中写入命名空间信息、 inode信息、 快照信息、 安全信息、 缓存信息、 StringTable信息等。注意上述信息都是以section为单位写入的, 每个section的格式定义请参考fsimage.proto文件。
saveInternal()方法以section为单位写入元数据信息时, 还会在FileSummary中记录这个section的长度, 以及section在fsimage文件中的起始位置等信息。 当完成了所有section的写入后, FileSummary对象也就构造完毕了.saveInternal()最后会将FileSummary对象写入fsimage文件中。
-
/**
-
* saveInternal()方法首先构造底层fsimage文件的输出流,
-
* 构造fsimage文件的描述类 FileSummary ,
-
*
-
* 然后在FileSummary中记录ondiskVersion、 layoutVersion、 codec等信息。
-
* 接下来saveInternal()方法依次向fsimage文件中写入
-
* 1.命名空间信息、
-
* 2.inode信息、
-
* 3.快照信息、
-
* 4.安全信息、
-
* 5.缓存信息、
-
* 6.StringTable
-
* 信息等。
-
*
-
* 注意上述信息都是以section为单位写入的, 每个section的格式定义请参考fsimage.proto文件。
-
* saveInternal()方法以section为单位写入元数据信息时,
-
* 还会在FileSummary中记录这个section的长度,
-
* 以及section在fsimage文件中的起始位置等信息。
-
*
-
* 当完成了所有section的写入后,
-
* FileSummary对象也就构造完毕了,
-
* saveInternal()最后会将
-
* FileSummary对象写入fsimage文件中。
-
*
-
* @return number of non-fatal errors detected while writing the FsImage.
-
* @throws IOException on fatal error.
-
*
-
*/
-
private long saveInternal(FileOutputStream fout, FSImageCompression compression, String filePath) throws IOException {
-
-
-
StartupProgress prog = NameNode.getStartupProgress();
-
-
//构造输出流, 一边写入数据, 一边写入校验值
-
MessageDigest digester = MD5Hash.getDigester();
-
-
int layoutVersion = context.getSourceNamesystem().getEffectiveLayoutVersion();
-
-
underlyingOutputStream =
new DigestOutputStream(
new BufferedOutputStream(fout), digester);
-
-
underlyingOutputStream.write(FSImageUtil.MAGIC_HEADER);
-
-
fileChannel = fout.getChannel();
-
-
// FileSummary为fsimage文件的描述部分, 也是protobuf定义的
-
FileSummary.Builder b = FileSummary.newBuilder()
-
.setOndiskVersion(FSImageUtil.FILE_VERSION)
-
.setLayoutVersion(
-
context.getSourceNamesystem().getEffectiveLayoutVersion());
-
-
//获取压缩格式, 并装饰输出流
-
codec = compression.getImageCodec();
-
-
-
if (codec !=
null) {
-
b.setCodec(codec.getClass().getCanonicalName());
-
sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
-
}
else {
-
sectionOutputStream = underlyingOutputStream;
-
}
-
-
//保存命名空间信息
-
saveNameSystemSection(b);
-
-
// Check for cancellation right after serializing the name system section.
-
// Some unit tests, such as TestSaveNamespace#testCancelSaveNameSpace
-
// depends on this behavior.
-
-
// 检查是否取消了保存操作
-
context.checkCancelled();
-
-
Step step;
-
-
// Erasure coding policies should be saved before inodes
-
if (NameNodeLayoutVersion.supports(
-
NameNodeLayoutVersion.Feature.ERASURE_CODING, layoutVersion)) {
-
-
step =
new Step(StepType.ERASURE_CODING_POLICIES, filePath);
-
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
-
-
//保存 ErasureCoding 信息
-
saveErasureCodingSection(b);
-
prog.endStep(Phase.SAVING_CHECKPOINT, step);
-
-
}
-
-
-
//保存命名空间中的inode信息
-
step =
new Step(StepType.INODES, filePath);
-
-
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
-
// Count number of non-fatal errors when saving inodes and snapshots.
-
// 保存命名空间中的inode信息
-
long numErrors = saveInodes(b);
-
-
// 保存快照信息
-
numErrors += saveSnapshots(b);
-
prog.endStep(Phase.SAVING_CHECKPOINT, step);
-
-
-
// 保存安全信息
-
step =
new Step(StepType.DELEGATION_TOKENS, filePath);
-
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
-
saveSecretManagerSection(b);
-
prog.endStep(Phase.SAVING_CHECKPOINT, step);
-
-
-
// 保存缓存信息
-
step =
new Step(StepType.CACHE_POOLS, filePath);
-
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
-
saveCacheManagerSection(b);
-
prog.endStep(Phase.SAVING_CHECKPOINT, step);
-
-
// 保存StringTable
-
saveStringTableSection(b);
-
-
// We use the underlyingOutputStream to write the header. Therefore flush
-
// the buffered stream (which is potentially compressed) first.
-
-
// flush输出流
-
flushSectionOutputStream();
-
-
FileSummary summary = b.build();
-
//将FileSummary写入文件
-
saveFileSummary(underlyingOutputStream, summary);
-
-
//关闭底层输出流
-
underlyingOutputStream.close();
-
-
savedDigest =
new MD5Hash(digester.digest());
-
return numErrors;
-
-
-
}
3.5.1. 写入文件头[ FSImageUtil.FILE_VERSION ]
fsimage的文件头, 是“HDFSIMG1”这个字符串的二进制形式. MAGIC头标识了当前fsimage文件是使用protobuf格式序列化的。
3.5.2. 构建FileSummary 对象
-
//
FileSummary为fsimage文件的描述部分,
也是protobuf定义的
FILE_VERSION :
1
-
FileSummary.Builder
b
=
FileSummary.newBuilder()
-
.setOndiskVersion(FSImageUtil.FILE_VERSION)
-
.setLayoutVersion(
-
context.getSourceNamesystem().getEffectiveLayoutVersion());
3.5.3. 处理数据的压缩方式:
-
//获取压缩格式,
并装饰输出流
-
codec =
compression.getImageCodec();
-
-
-
if
(codec != null) {
-
b.setCodec(codec.getClass().getCanonicalName());
-
sectionOutputStream =
codec.createOutputStream(underlyingOutputStream);
-
}
else {
-
sectionOutputStream =
underlyingOutputStream;
-
}
3.5.4.SectionName
SectionName是一个枚举类一共记录了12种类型. [按写入的fsimage的顺序进行排列]
序号 | 名称 | Section对象 | 描述 |
1 | NS_INFO | NameSystemSection | 命名空间信息 |
2 | ERASURE_CODING | ErasureCodingSection | 擦除编码信息 |
3 | INODE | INodeSection | 保存inode 信息 |
4 | INODE_DIR | INodeDirectorySection | 构建目录信息 |
5 | FILES_UNDERCONSTRUCTION | FileUnderConstructionEntry | 租约管理 |
6 | SNAPSHOT | SnapshotSection | 保存快照信息 |
7 | SNAPSHOT_DIFF | SnapshotDiffSection | 保存快照信息 |
8 | INODE_REFERENCE | INodeReferenceSection | 引用信息 |
9 | SECRET_MANAGER | SecretManagerSection | 安全信息 |
10 | CACHE_MANAGER | CacheManagerSection | 缓存信息 |
11 | STRING_TABLE | StringTableSection | 保存StringTable |
12 | EXTENDED_ACL |
-
public
enum SectionName {
-
NS_INFO(
"NS_INFO"),
-
STRING_TABLE(
"STRING_TABLE"),
-
EXTENDED_ACL(
"EXTENDED_ACL"),
-
ERASURE_CODING(
"ERASURE_CODING"),
-
INODE(
"INODE"),
-
INODE_REFERENCE(
"INODE_REFERENCE"),
-
SNAPSHOT(
"SNAPSHOT"),
-
INODE_DIR(
"INODE_DIR"),
-
FILES_UNDERCONSTRUCTION(
"FILES_UNDERCONSTRUCTION"),
-
SNAPSHOT_DIFF(
"SNAPSHOT_DIFF"),
-
SECRET_MANAGER(
"SECRET_MANAGER"),
-
CACHE_MANAGER(
"CACHE_MANAGER");
-
-
private
static
final SectionName[] values = SectionName.values();
-
-
public
static SectionName fromString(String name) {
-
for (SectionName
n : values) {
-
if (n.name.equals(name))
-
return n;
-
}
-
return
null;
-
}
-
-
private
final String name;
-
-
private SectionName(String name) {
-
this.name = name;
-
}
-
}
3.5.5.save*()方法
saveInternal()方法调用了多个save*()方法来记录不同section的元数据信息, 这些方法除了在fsimage文件中写入对应种类的元数据信息外, 还会在FileSummary中记录section的大小, 以及在fsimage中的起始位置。
以saveINodes()方法举例, 改方法构造了一个FSImageFormatPBINode.Saver对象, 并调用这个对象对应的方法保存文件系统目录树中的INode信息、 INodeDirectory信息, 以及处于构建状态的文件信息
-
// saveINodes()方法构造了一个FSImageFormatPBINode.Saver对象,
-
// 并调用这个对象对应的方法保存文件系统目录树中的INode信息、 INodeDirectory信息,
-
// 以及处于构建状态的文件信息
-
private long saveInodes(FileSummary.Builder summary) throws IOException {
-
-
FSImageFormatPBINode.Saver saver =
new FSImageFormatPBINode.Saver(
this,summary);
-
-
// 保存INode信息是由FSImageFormatPBINode.Saver.serializeINodeSection()方法实现的
-
saver.serializeINodeSection(sectionOutputStream);
-
// 保存info目录信息
-
saver.serializeINodeDirectorySection(sectionOutputStream);
-
// 租约管理
-
saver.serializeFilesUCSection(sectionOutputStream);
-
-
return saver.getNumImageErrors();
-
}
保存INode信息是由FSImageFormatPBINode.Saver.serializeINodeSection()方法实现的。
serializeINodeSection()方法会首先构造一个INodeSection对象, 记录文件系统目录树中保存的最后一个inode的inodeid, 以及命名空间中所有inode的个数。 之后迭代处理将所有inode信息写入fsimage文件中, 最后将INodeSection的属性信息记录在FileSummary中。
-
/**
-
* serializeINodeSection()方法会首先构造一个INodeSection对象,
-
* 记录文件系统目录树中保存的最后一个inode的inodeid,
-
* 以及命名空间中所有inode的个数。
-
*
-
* 之后迭代处理将所有inode信息写入fsimage文件中,
-
* 最后将INodeSection的属性信息记录在FileSummary中。
-
*
-
* serializeINodeSection
-
* @param out
-
* @throws IOException
-
*/
-
void serializeINodeSection(OutputStream out) throws IOException {
-
-
-
INodeMap inodesMap = fsn.dir.getINodeMap();
-
-
//构造一个 INodeSection, 保存最后一个inode的 inodeid, 以及这个命名空间中所有inode的个数
-
INodeSection.Builder b = INodeSection.newBuilder()
-
.setLastInodeId(fsn.dir.getLastInodeId()).setNumInodes(inodesMap.size());
-
-
INodeSection s = b.build();
-
-
//序列化至输出流
-
s.writeDelimitedTo(out);
-
-
int i =
0;
-
-
//迭代处理inodeMap中所有的inode, 调用save()方法将inode信息保存到fsimage中
-
Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
-
while (iter.hasNext()) {
-
INodeWithAdditionalFields n = iter.next();
-
// 将所有inode信息写入fsimage文件中
-
save(out, n);
-
++i;
-
if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL ==
0) {
-
context.checkCancelled();
-
}
-
}
-
-
//调用commitSection()方法在FileSummary中写入inode section
-
parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE);
-
}
save()方法首先将当前INode对象分为目录、 文件以及符号链接等几类, 然后调用各个类型对应的save()重载方法。 重载方法的实现也非常简单, 就是构造不同的protobuf Builder类, 然后设置相应字段的值, 并将序列化之后的对象写入fsimage文件的输出流中。 这里以INodeFile为例, 首先构造protobuf Builder类——INodeSection.INodeFile.Builder, 然后设置blocks——也就是当前文件有哪些数据块, 如果当前的INodeFile处于构建状态, 则设置对应的构建信息。 最后将序列化后的inode信息写入输出流中。
-
/**
-
* @param out
-
* @param n
-
* @throws IOException
-
*/
-
private void save(OutputStream out, INode n) throws IOException {
-
if (n.isDirectory()) {
-
save(
out, n.asDirectory());
-
}
else
if (n.isFile()) {
-
save(
out, n.asFile());
-
}
else
if (n.isSymlink()) {
-
save(
out, n.asSymlink());
-
}
-
}
-
-
private void save(OutputStream out, INodeDirectory n) throws IOException {
-
INodeSection.INodeDirectory.Builder b = buildINodeDirectory(n,
-
parent.getSaverContext());
-
INodeSection.INode r = buildINodeCommon(n)
-
.setType(INodeSection.INode.Type.DIRECTORY).setDirectory(b).build();
-
r.writeDelimitedTo(
out);
-
}
-
-
// 首先构造protobuf Builder类——INodeSection.INodeFile.Builder,
-
// 然后设置blocks——也就是当前文件有哪些数据块,
-
// 如果当前的INodeFile处于构建状态, 则设置对应的构建信息。
-
//
-
// 最后将序列化后的inode信息写入输出流中。
-
-
private void save(OutputStream out, INodeFile n) throws IOException {
-
INodeSection.INodeFile.Builder b = buildINodeFile(n,
-
parent.getSaverContext());
-
BlockInfo[] blocks = n.getBlocks();
-
-
if (blocks !=
null) {
-
for (Block block : n.getBlocks()) {
-
b.addBlocks(PBHelperClient.convert(block));
-
}
-
}
-
-
FileUnderConstructionFeature uc = n.getFileUnderConstructionFeature();
-
if (uc !=
null) {
-
INodeSection.FileUnderConstructionFeature f =
-
INodeSection.FileUnderConstructionFeature
-
.newBuilder().setClientName(uc.getClientName())
-
.setClientMachine(uc.getClientMachine()).build();
-
b.setFileUC(f);
-
}
-
-
INodeSection.INode r = buildINodeCommon(n)
-
.setType(INodeSection.INode.Type.FILE).setFile(b).build();
-
r.writeDelimitedTo(
out);
-
}
-
-
-
private void save(OutputStream out, INodeSymlink n) throws IOException {
-
INodeSection.INodeSymlink.Builder b = INodeSection.INodeSymlink
-
.newBuilder()
-
.setPermission(buildPermissionStatus(n))
-
.setTarget(ByteString.copyFrom(n.getSymlink()))
-
.setModificationTime(n.getModificationTime())
-
.setAccessTime(n.getAccessTime());
-
-
INodeSection.INode r = buildINodeCommon(n)
-
.setType(INodeSection.INode.Type.SYMLINK).setSymlink(b).build();
-
r.writeDelimitedTo(
out);
-
}
3.5.6. 将FileSummary写入文件
-
private static void saveFileSummary(OutputStream out, FileSummary summary)
-
throws IOException {
-
summary.writeDelimitedTo(
out);
-
int length = getOndiskTrunkSize(summary);
-
byte[] lengthBytes =
new
byte[
4];
-
ByteBuffer.wrap(lengthBytes).asIntBuffer().put(length);
-
out.write(lengthBytes);
-
}
四. 加载FSImage : FSImage.loadFSImage()
当Namenode启动时, 首先会将fsimage文件中记录的命名空间加载到Namenode内存中, 然后再一条一条地将editlog文件中记录的更新操作加载并合并到命名空间中。
整体代码流程图:
Namenode会等待各个Datanode向自己汇报数据块信息来组装blockMap, 从而离开安全模式。 Namenode每次启动时都会调用FSImage.loadFSImage()方法执行加载fsimage和editlog文件的操作。
这里需要关注两个地方, 一个是加载 FSImage 文件fsImage.recoverTransitionRead(startOpt, this, recovery); 另外一个是根据条件判断是否要合并edits文件[间隔周期1小时或者,edits数量操作100万.].
我们直接说重要的loadFSImage方法,前面其实还有些方法,但是太啰嗦了,直接看重点吧. 其实就是加载最后一个fsimage文件.
加载分两步:
1 获取加载器 FSImageFormat.LoaderDelegator .
2. 加载文件: loader.load(curFile, requireSameLayoutVersion);
-
/**
-
* Load in the filesystem image from file. It's a big list of
-
* filenames and blocks.
-
*/
-
private
void loadFSImage(File curFile, MD5Hash expectedMd5,
-
FSNamesystem target, MetaRecoveryContext recovery,
-
boolean requireSameLayoutVersion)
throws IOException {
-
// BlockPoolId is required when the FsImageLoader loads the rolling upgrade
-
// information. Make sure the ID is properly set.
-
target.setBlockPoolId(
this.getBlockPoolID());
-
//获取加载器 FSImageFormat.LoaderDelegator
-
FSImageFormat.LoaderDelegator loader = FSImageFormat.newLoader(conf, target);
-
//加载文件
-
loader.load(curFile, requireSameLayoutVersion);
-
-
// Check that the image digest we loaded matches up with what
-
// we expected
-
MD5Hash readImageMd5 = loader.getLoadedImageMd5();
-
if (expectedMd5 !=
null &&
-
!expectedMd5.equals(readImageMd5)) {
-
throw
new IOException(
"Image file " + curFile +
-
" is corrupt with MD5 checksum of " + readImageMd5 +
-
" but expecting " + expectedMd5);
-
}
-
-
long txId = loader.getLoadedImageTxId();
-
LOG.info(
"Loaded image for txid " + txId +
" from " + curFile);
-
lastAppliedTxId = txId;
-
storage.setMostRecentCheckpointInfo(txId, curFile.lastModified());
-
}
我们看一下load方法:
-
public void load(File file, boolean requireSameLayoutVersion)
-
throws IOException {
-
Preconditions.checkState(impl ==
null,
"Image already loaded!");
-
-
FileInputStream
is =
null;
-
try {
-
is =
new FileInputStream(file);
-
byte[] magic =
new
byte[FSImageUtil.MAGIC_HEADER.length];
-
IOUtils.readFully(
is, magic,
0, magic.length);
-
-
// fsimage文件中包括 magicHeader, 使用的是protobuf序列化方式
-
if (Arrays.
equals(magic, FSImageUtil.MAGIC_HEADER)) {
-
-
//构造FSImageFormatProtobuf.Loader加载fsimage文件
-
FSImageFormatProtobuf.Loader loader =
new FSImageFormatProtobuf.Loader(
-
conf, fsn, requireSameLayoutVersion);
-
impl = loader;
-
loader.load(file);
-
-
}
else {
-
//否则构造FSImageFormat.Loader加载fsimage文件
-
Loader loader =
new Loader(conf, fsn);
-
impl = loader;
-
loader.load(file);
-
}
-
}
finally {
-
IOUtils.cleanupWithLogger(LOG,
is);
-
}
-
}
这里面是构造FSImageFormatProtobuf.Loader对象,使用它的load方法加载fsimage文件. 在load方法中最终调用了loadInternal(raFile, fin);方法. 这个方法是加载fsimage文件的相对最底层的方法了.
在加载fsimage操作中, 最终会调用FSImageFormatProtobuf.Loader作为fsimage文件的加载类。 FSImageFormatProtobuf.Loader.loadInternal()方法执行了加载fsimage文件的操作, loadInternal()方法会打开fsimage文件通道, 然后读取fsimage文件中的FileSummary对象, FileSummary对象中记录了fsimage中保存的所有section的信息。loadInternal()会对FileSummary对象中保存的section排序, 然后遍历每个section并调用对应的方法从fsimage文件中加载这个section。
代码如下:
-
// loadInternal()方法会打开fsimage文件通道,
-
// 然后读取fsimage文件中的FileSummary对象,
-
// FileSummary对象中记录了fsimage中保存的所有section的信息。
-
// loadInternal()会对FileSummary对象中保存的section排序,
-
// 然后遍历每个section并调用对应的方法从fsimage文件中加载这个section。
-
private
void loadInternal(RandomAccessFile raFile, FileInputStream fin)
-
throws IOException {
-
if (!FSImageUtil.checkFileFormat(raFile)) {
-
throw
new IOException(
"Unrecognized file format");
-
}
-
-
// 从fsimage文件末尾加载FileSummary, 也就是fsimage文件内容的描述
-
FileSummary summary = FSImageUtil.loadSummary(raFile);
-
-
if (requireSameLayoutVersion && summary.getLayoutVersion() !=
-
HdfsServerConstants.NAMENODE_LAYOUT_VERSION) {
-
throw
new IOException(
"Image version " + summary.getLayoutVersion() +
-
" is not equal to the software version " +
-
HdfsServerConstants.NAMENODE_LAYOUT_VERSION);
-
}
-
-
//获取通道
-
FileChannel channel = fin.getChannel();
-
-
// 构造FSImageFormatPBINode.Loader和FSImageFormatPBSnapshot.
-
// Loader加载INode以及Snapshot
-
FSImageFormatPBINode.Loader inodeLoader =
new FSImageFormatPBINode.Loader(
-
fsn,
this);
-
FSImageFormatPBSnapshot.Loader snapshotLoader =
new FSImageFormatPBSnapshot.Loader(
-
fsn,
this);
-
-
//对fsimage文件描述中记录的sections进行排序
-
ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
-
.getSectionsList());
-
Collections.sort(sections,
new Comparator<FileSummary.Section>() {
-
@Override
-
public
int compare(FileSummary.Section s1, FileSummary.Section s2) {
-
SectionName n1 = SectionName.fromString(s1.getName());
-
SectionName n2 = SectionName.fromString(s2.getName());
-
if (n1 ==
null) {
-
return n2 ==
null ?
0 :
-1;
-
}
else
if (n2 ==
null) {
-
return
-1;
-
}
else {
-
return n1.ordinal() - n2.ordinal();
-
}
-
}
-
});
-
-
-
-
StartupProgress prog = NameNode.getStartupProgress();
-
/**
-
* beginStep() and the endStep() calls do not match the boundary of the
-
* sections. This is because that the current implementation only allows
-
* a particular step to be started for once.
-
*/
-
Step currentStep =
null;
-
-
//遍历每个section, 并调用对应的方法加载这个section
-
for (FileSummary.Section
s : sections) {
-
-
//在通道中定位这个section的起始位置
-
channel.position(s.getOffset());
-
InputStream
in =
new BufferedInputStream(
new LimitInputStream(fin,
-
s.getLength()));
-
-
in = FSImageUtil.wrapInputStreamForCompression(conf,
-
summary.getCodec(),
in);
-
-
String n = s.getName();
-
-
//调用对应的方法加载不同的section
-
switch (SectionName.fromString(n)) {
-
case
NS_INFO:
-
loadNameSystemSection(
in);
-
break;
-
case
STRING_TABLE:
-
loadStringTableSection(
in);
-
break;
-
case
INODE: {
-
currentStep =
new Step(StepType.INODES);
-
prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
-
inodeLoader.loadINodeSection(
in, prog, currentStep);
-
}
-
break;
-
case
INODE_REFERENCE:
-
snapshotLoader.loadINodeReferenceSection(
in);
-
break;
-
case
INODE_DIR:
-
inodeLoader.loadINodeDirectorySection(
in);
-
break;
-
case
FILES_UNDERCONSTRUCTION:
-
inodeLoader.loadFilesUnderConstructionSection(
in);
-
break;
-
case
SNAPSHOT:
-
snapshotLoader.loadSnapshotSection(
in);
-
break;
-
case
SNAPSHOT_DIFF:
-
snapshotLoader.loadSnapshotDiffSection(
in);
-
break;
-
case
SECRET_MANAGER: {
-
prog.endStep(Phase.LOADING_FSIMAGE, currentStep);
-
Step step =
new Step(StepType.DELEGATION_TOKENS);
-
prog.beginStep(Phase.LOADING_FSIMAGE, step);
-
loadSecretManagerSection(
in, prog, step);
-
prog.endStep(Phase.LOADING_FSIMAGE, step);
-
}
-
break;
-
case
CACHE_MANAGER: {
-
Step step =
new Step(StepType.CACHE_POOLS);
-
prog.beginStep(Phase.LOADING_FSIMAGE, step);
-
loadCacheManagerSection(
in, prog, step);
-
prog.endStep(Phase.LOADING_FSIMAGE, step);
-
}
-
break;
-
case
ERASURE_CODING:
-
Step step =
new Step(StepType.ERASURE_CODING_POLICIES);
-
prog.beginStep(Phase.LOADING_FSIMAGE, step);
-
loadErasureCodingSection(
in);
-
prog.endStep(Phase.LOADING_FSIMAGE, step);
-
break;
-
default:
-
LOG.warn(
"Unrecognized section {}", n);
-
break;
-
}
-
}
-
}
对于不同类型的section, loadInternal()方法会调用不同的方法加载这个section, 例如对于INodeSection会调用InodeLoader.loadINodeSection()方法加载。 load*()方法的实现都比较简单, 就是按照protobuf格式加载不同的section. 慢慢恢复/构建FSNamesystem对象中的内容.
下面是加载loadINodeSection的代码示例:
-
void loadINodeSection(InputStream in, StartupProgress prog,
-
Step currentStep)
throws IOException {
-
-
INodeSection s = INodeSection.parseDelimitedFrom(in);
-
-
fsn.dir.resetLastInodeId(s.getLastInodeId());
-
-
long numInodes = s.getNumInodes();
-
-
LOG.info(
"Loading " + numInodes +
" INodes.");
-
prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
-
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
-
for (
int i =
0; i < numInodes; ++i) {
-
INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
-
if (p.getId() == INodeId.ROOT_INODE_ID) {
-
// 加载root
-
loadRootINode(p);
-
}
else {
-
//加载子节点
-
INode n = loadINode(p);
-
dir.addToInodeMap(n);
-
}
-
counter.increment();
-
}
-
}
下面试loadINodeDirectorySection 方法, 构建目录体系
-
void loadINodeDirectorySection(InputStream
in)
throws IOException {
-
final List<INodeReference> refList = parent.getLoaderContext()
-
.getRefList();
-
while (
true) {
-
INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
-
.parseDelimitedFrom(
in);
-
// note that in is a LimitedInputStream
-
if (e ==
null) {
-
break;
-
}
-
INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
-
for (
long
id : e.getChildrenList()) {
-
INode child = dir.getInode(id);
-
addToParent(p, child);
-
}
-
for (
int
refId : e.getRefChildrenList()) {
-
INodeReference ref = refList.get(refId);
-
addToParent(p, ref);
-
}
-
}
-
}
五. 加载Edits 文件
FSImage.loadEdits()方法会构造一个FSEditLogLoader对象. 然后遍历Namenode所有存储路径上保存的editlog文件的输入流 并调用FSEditLogLoader.loadFSEdits()方法加载指定路径上的editlog文件。
-
/**
-
* FSImage.loadEdits()方法会构造一个FSEditLogLoader对象,
-
* 然后遍历Namenode所有存储路径上保存的editlog文件的输入流
-
* 并调用FSEditLogLoader.loadFSEdits()方法加载指定路径上的editlog文件。
-
*
-
* @param editStreams
-
* @param target
-
* @param maxTxnsToRead
-
* @param startOpt
-
* @param recovery
-
* @return
-
* @throws IOException
-
*/
-
public long loadEdits(Iterable<EditLogInputStream> editStreams,
-
FSNamesystem target, long maxTxnsToRead,
-
StartupOption startOpt, MetaRecoveryContext recovery)
-
throws IOException {
-
LOG.debug(
"About to load edits:\n " + Joiner.on(
"\n ").join(editStreams));
-
StartupProgress prog = NameNode.getStartupProgress();
-
prog.beginPhase(Phase.LOADING_EDITS);
-
-
//记录命名空间中加载的最新的事务id
-
long prevLastAppliedTxId = lastAppliedTxId;
-
long remainingReadTxns = maxTxnsToRead;
-
try {
-
-
//构造FSEditLogLoader对象用于加栽editlog文件
-
FSEditLogLoader loader =
new FSEditLogLoader(target, lastAppliedTxId);
-
-
-
//遍历所有存储路径上editlog文件对应的输入流
-
// Load latest edits
-
for (EditLogInputStream editIn : editStreams) {
-
LogAction logAction = loadEditLogHelper.record();
-
if (logAction.shouldLog()) {
-
String logSuppressed =
"";
-
if (logAction.getCount() >
1) {
-
logSuppressed =
"; suppressed logging for " +
-
(logAction.getCount() -
1) +
" edit reads";
-
}
-
LOG.info(
"Reading " + editIn +
" expecting start txid #" +
-
(lastAppliedTxId +
1) + logSuppressed);
-
}
-
try {
-
-
//调用FSEditLogLoader.loadFSEdits()从某个存储路径上的editlog文件加载修改操作
-
remainingReadTxns -= loader.loadFSEdits(editIn, lastAppliedTxId +
1,
-
remainingReadTxns, startOpt, recovery);
-
}
finally {
-
-
// lastAppliedTxId记录从editlog加载的最新的事务id
-
// Update lastAppliedTxId even in case of error, since some ops may
-
// have been successfully applied before the error.
-
lastAppliedTxId = loader.getLastAppliedTxId();
-
}
-
-
-
// If we are in recovery mode, we may have skipped over some txids.
-
if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID
-
&& recovery !=
null) {
-
lastAppliedTxId = editIn.getLastTxId();
-
}
-
if (remainingReadTxns <=
0) {
-
break;
-
}
-
}
-
}
finally {
-
//关闭所有editlog文件的输入流
-
FSEditLog.closeAllStreams(editStreams);
-
}
-
prog.endPhase(Phase.LOADING_EDITS);
-
return lastAppliedTxId - prevLastAppliedTxId;
-
}
转载:https://blog.csdn.net/zhanglong_4444/article/details/108444392