小言_互联网的博客

Hadoop3.2.1 【 HDFS 】源码分析 : 文件系统FSImage解析

612人阅读  评论(0)

Table of Contents

一.前言

二.构建&初始化

三.保存命名空间

3.1.saveFSImageInAllDirs()

3.2.FSImageSaver

3.3.saveFSImage(context, sd, nnf);

3.4.FSImageFormatProtobuf.Saver

3.5.saveInternal()方法

3.5.1. 写入文件头[ FSImageUtil.FILE_VERSION  ]

3.5.2. 构建FileSummary 对象

3.5.3. 处理数据的压缩方式:

3.5.4.SectionName

3.5.5.save*()方法

3.5.6. 将FileSummary写入文件

四. 加载FSImage  :  FSImage.loadFSImage()

五. 加载Edits 文件


 

 

一.前言

Namenode会定期将文件系统的命名空间(文件目录树、文件/ 目录元信息) 保存到fsimage文件中, 以防止Namenode掉电或者进程崩溃。 但如果Namenode实时地将内存中的元数据同步到fsimage文件中, 将会非常消耗资源且造成Namenode运行缓慢。 所以Namenode会先将命名空间的修改操作保存在editlog文件中, 然后定期合并fsimage和editlog文件。

FSImage类主要实现了以下功能:

保存命名空间——将当前时刻Namenode内存中的命名空间保存到fsimage文件中。

加载fsimage文件——将磁盘上fsimage文件中保存的命名空间加载到Namenode内存中, 这个操作是保存命名空间操作的逆操作。

加载editlog文件——Namenode加载了fsimage文件后, 内存中只包含了命名空间在保存fsimage文件时的信息, Namenode还需要加载后续对命名空间的修改操作,即editlog文件中记录的内容。 所以FSImage类还提供了加载editlog文件到Namenode内存中的功能.

 

 

 

 

二.构建&初始化

FSImage是在FSNamesystem中的loadFromDisk方法进行初始化的

 

 


  
  1. /**
  2. * Construct the FSImage. Set the default checkpoint directories.
  3. *
  4. * Setup storage and initialize the edit log.
  5. *
  6. * @param conf Configuration
  7. * @param imageDirs Directories the image can be stored in.
  8. * @param editsDirs Directories the editlog can be stored in.
  9. * @throws IOException if directories are invalid.
  10. */
  11. protected FSImage(Configuration conf,
  12. Collection<URI> imageDirs,
  13. List<URI> editsDirs)
  14. throws IOException {
  15. this.conf = conf;
  16. //构建NNStorage ==> NNStorage负责管理NameNode使用的 StorageDirectories。
  17. storage = new NNStorage(conf, imageDirs, editsDirs);
  18. // dfs.namenode.name.dir.restore 默认: false
  19. // 设置为true可使NameNode尝试恢复以前失败的dfs.NameNode.name.dir。
  20. // 启用后,将在检查点期间尝试恢复任何失败的目录。
  21. if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
  22. DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
  23. storage.setRestoreFailedStorage( true);
  24. }
  25. // 构建 FSEditLog
  26. this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
  27. //
  28. archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
  29. }

 

 

 

 

三.保存命名空间

FSImage类最重要的功能之一就是将当前时刻Namenode的命名空间保存到fsimage文件中。 

3.1.saveFSImageInAllDirs()

Namenode可以定义多个存储路径来保存fsimage文件, 对于每一个存储路径,saveFSImageInAllDirs()方法都会启动一个线程负责在这个路径上保存fsimage文件。 同时, 为了防止保存过程中出现错误, 命名空间信息首先会被保存在一个fsimage.ckpt文件中, 当保存操作全部完成之后, 才会将fsimage.ckpt重命名为fsimage文件。 之后saveFSImageInAllDirs()方法会清理Namenode元数据存储文件夹中过期的editlog文件和fsimage文件。


  
  1. /**
  2. * Namenode可以定义多个存储路径来保存fsimage文件,
  3. *
  4. * 1. 对于每一个存储路径,saveFSImageInAllDirs()方法都会启动一个线程负责在这个路径上保存fsimage文件。
  5. *
  6. * 2. 同时, 为了防止保存过程中出现错误, 命名空间信息首先会被保存在一个fsimage.ckpt文件中,
  7. * 当保存操作全部完成之后, 才会将fsimage.ckpt重命名为fsimage文件。
  8. *
  9. * 3. 之后saveFSImageInAllDirs()方法会清理Namenode元数据存储文件夹中过期的editlog文件和
  10. * fsimage文件。
  11. *
  12. * @param source
  13. * @param nnf
  14. * @param txid
  15. * @param canceler
  16. * @throws IOException
  17. */
  18. private synchronized void saveFSImageInAllDirs(FSNamesystem source,
  19. NameNodeFile nnf, long txid, Canceler canceler) throws IOException {
  20. StartupProgress prog = NameNode.getStartupProgress();
  21. prog.beginPhase(Phase.SAVING_CHECKPOINT);
  22. if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
  23. throw new IOException( "No image directories available!");
  24. }
  25. if (canceler == null) {
  26. canceler = new Canceler();
  27. }
  28. //构造保存命名空间操作的上下文
  29. SaveNamespaceContext ctx = new SaveNamespaceContext(source, txid, canceler);
  30. try {
  31. //在每一个保存路径上启动一个线程, 该线程使用FSImageSaver类保存fsimage文件
  32. List<Thread> saveThreads = new ArrayList<Thread>();
  33. // save images into current
  34. for (Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
  35. StorageDirectory sd = it.next();
  36. // 命名空间具体的保存操作是由FSImageSaver这个类来承担的,
  37. // FSImageSaver是FSImage中的内部类, 也是一个线程类,
  38. // 它的run()方法调用了saveFSImage()方法来保存fsimage文件。
  39. FSImageSaver saver = new FSImageSaver(ctx, sd, nnf);
  40. Thread saveThread = new Thread(saver, saver.toString());
  41. saveThreads.add(saveThread);
  42. saveThread.start();
  43. }
  44. //等待所有线程执行完毕
  45. waitForThreads(saveThreads);
  46. saveThreads.clear();
  47. storage.reportErrorsOnDirectories(ctx.getErrorSDs());
  48. //保存文件失败则抛出异常
  49. if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
  50. throw new IOException(
  51. "Failed to save in any storage directories while saving namespace.");
  52. }
  53. if (canceler.isCancelled()) {
  54. deleteCancelledCheckpoint(txid);
  55. ctx.checkCancelled(); // throws
  56. assert false : "should have thrown above!";
  57. }
  58. // 将fsimage.ckpt 改名为 fsimage
  59. renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf, false);
  60. // Since we now have a new checkpoint, we can clean up some
  61. // old edit logs and checkpoints.
  62. // Do not purge anything if we just wrote a corrupted FsImage.
  63. if (!exitAfterSave.get()) {
  64. //我们已经完成了fsimage的保存, 那么可以将存储上的一部分editlog和fsimage删除
  65. //如果没有成功,则失败.
  66. purgeOldStorage(nnf);
  67. archivalManager.purgeCheckpoints(NameNodeFile.IMAGE_NEW);
  68. }
  69. } finally {
  70. // Notify any threads waiting on the checkpoint to be canceled
  71. // that it is complete.
  72. //通知所有等待的线程
  73. ctx.markComplete();
  74. ctx = null;
  75. }
  76. prog.endPhase(Phase.SAVING_CHECKPOINT);
  77. }

 

3.2.FSImageSaver

命名空间具体的保存操作是由FSImageSaver这个类来承担的, FSImageSaver是FSImage中的内部类, 也是一个线程类,它的run()方法调用了saveFSImage()方法来保存fsimage文件。 

 


  
  1. /**
  2. *
  3. * 命名空间具体的保存操作是由FSImageSaver这个类来承担的,
  4. * FSImageSaver是FSImage中的内部类, 也是一个线程类,
  5. * 它的run()方法调用了saveFSImage()方法来保存fsimage文件。
  6. *
  7. *
  8. *
  9. * FSImageSaver is being run in a separate thread when saving
  10. * FSImage. There is one thread per each copy of the image.
  11. *
  12. * FSImageSaver assumes that it was launched from a thread that holds
  13. * FSNamesystem lock and waits for the execution of FSImageSaver thread
  14. * to finish.
  15. * This way we are guaranteed that the namespace is not being updated
  16. * while multiple instances of FSImageSaver are traversing it
  17. * and writing it out.
  18. */
  19. private class FSImageSaver implements Runnable {
  20. private final SaveNamespaceContext context;
  21. private final StorageDirectory sd;
  22. private final NameNodeFile nnf;
  23. public FSImageSaver(SaveNamespaceContext context, StorageDirectory sd,
  24. NameNodeFile nnf) {
  25. this.context = context;
  26. this.sd = sd;
  27. this.nnf = nnf;
  28. }
  29. @Override
  30. public void run() {
  31. // Deletes checkpoint file in every storage directory when shutdown.
  32. Runnable cancelCheckpointFinalizer = () -> {
  33. try {
  34. deleteCancelledCheckpoint(context.getTxId());
  35. LOG.info( "FSImageSaver clean checkpoint: txid={} when meet " +
  36. "shutdown.", context.getTxId());
  37. } catch (IOException e) {
  38. LOG.error( "FSImageSaver cancel checkpoint threw an exception:", e);
  39. }
  40. };
  41. ShutdownHookManager. get().addShutdownHook(cancelCheckpointFinalizer,
  42. SHUTDOWN_HOOK_PRIORITY);
  43. try {
  44. //保存fsimage文件
  45. System. out.println( "context : "+ context);
  46. System. out.println( "sd : "+ sd);
  47. System. out.println( "nnf : "+ nnf);
  48. saveFSImage(context, sd, nnf);
  49. } catch (SaveNamespaceCancelledException snce) {
  50. LOG.info( "Cancelled image saving for " + sd.getRoot() +
  51. ": " + snce.getMessage());
  52. // don't report an error on the storage dir!
  53. } catch (Throwable t) {
  54. LOG.error( "Unable to save image for " + sd.getRoot(), t);
  55. context.reportErrorOnStorageDirectory(sd);
  56. try {
  57. deleteCancelledCheckpoint(context.getTxId());
  58. LOG.info( "FSImageSaver clean checkpoint: txid={} when meet " +
  59. "Throwable.", context.getTxId());
  60. } catch (IOException e) {
  61. LOG.error( "FSImageSaver cancel checkpoint threw an exception:", e);
  62. }
  63. }
  64. }
  65. @Override
  66. public String toString() {
  67. return "FSImageSaver for " + sd.getRoot() +
  68. " of type " + sd.getStorageDirType();
  69. }
  70. }

3.3.saveFSImage(context, sd, nnf);

saveFSImage()方法会使用一个FSImageFormat.Saver对象来完成保存操作,FSImageFormat.Saver类会以fsimage文件定义的格式保存Namenode的命名空间信息, 需要注意命名空空间信息会先写入fsimage.ckpt文件中。 saveFSImage()方法还会生成fsimage文件的md5校验文件, 以确保fsimage文件的正确性。 


  
  1. /**
  2. * 将 FS image 的内容保存到文件中。
  3. * Save the contents of the FS image to the file.
  4. */
  5. void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
  6. NameNodeFile dstType) throws IOException {
  7. //获取当前命名空间中记录的最新事务的txid
  8. long txid = context.getTxId();
  9. // fsimage文件
  10. File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
  11. File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
  12. // FSImageFormatProtobuf.Saver类负责保存fsimage
  13. FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
  14. //压缩类
  15. FSImageCompression compression = FSImageCompression.createCompression(conf);
  16. //调用Saver类保存fsimage文件
  17. long numErrors = saver.save(newFile, compression);
  18. if (numErrors > 0) {
  19. // The image is likely corrupted.
  20. LOG.error( "Detected " + numErrors + " errors while saving FsImage " +
  21. dstFile);
  22. exitAfterSave. set( true);
  23. }
  24. //保存MD5校验值
  25. MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
  26. storage.setMostRecentCheckpointInfo(txid, Time.now());
  27. }

saveFSImage()方法构造了一个FSImageFormatProtobuf.Saver对象来保存命名空间,FSImageFormatProtobuf是一个工具类, 它提供了以protobuf格式读取和写入fsimage文件的方法。 


  
  1. /**
  2. * save()方法会打开fsimage文件的输出流并且获得文件通道,
  3. * 然后调用saveInternal()方法将命名空间保存到fsimage文件中。
  4. *
  5. * @return number of non-fatal errors detected while writing the image.
  6. * @throws IOException on fatal error.
  7. */
  8. long save(File file, FSImageCompression compression) throws IOException {
  9. FileOutputStream fout = new FileOutputStream(file);
  10. fileChannel = fout.getChannel();
  11. try {
  12. LOG.info( "Saving image file {} using {}", file, compression);
  13. long startTime = monotonicNow();
  14. // 保存到fsimage文件
  15. long numErrors = saveInternal( fout, compression, file.getAbsolutePath());
  16. LOG.info( "Image file {} of size {} bytes saved in {} seconds {}.", file,
  17. file.length(), (monotonicNow() - startTime) / 1000,
  18. (numErrors > 0 ? ( " with" + numErrors + " errors") : ""));
  19. return numErrors;
  20. } finally {
  21. fout.close();
  22. }
  23. }

 

3.4.FSImageFormatProtobuf.Saver

使用protobuf定义的fsimage文件的格式, 它包括了4个部分的信息

■ MAGIC: fsimage的文件头, 是“HDFSIMG1”这个字符串的二进制形式, MAGIC头标识了当前fsimage文件是使用protobuf格式序列化的。 FSImage类在读取fsimage文件时, 会先判断fsimage文件是否包含了MAGIC头, 如果包含了则使用protobuf格式反序列化fsimage文件。

■ SECTIONS: fsimage文件会将同一类型的Namenode元信息保存在一个section中,例如将文件系统元信息保存在NameSystemSection中, 将文件系统目录树中的所有INode信息保存在INodeSection中, 将快照信息保存在SnapshotSection中等。fsimage文件的第二个部分就是Namenode各类元信息对应的所有section, 每类section中都包含了对应Namenode元信息的属性.

■ FileSummary: FileSummary记录了fsimage文件的元信息, 以及fsimage文件保存的所有section的信息。 FileSummary中的ondiskVersion字段记录了fsimage文件的版本号(3.2.1 版本中这个字段的值为1) , layoutVersion字段记录了当前HDFS的文件
系统布局版本号, codec字段记录了fsimage文件的压缩编码, sections字段则记录了fsimage文件中各个section字段的元信息, 每个fsimage文件中记录的section在FileSummary中都有一个与之对应的section字段。 FileSummary的section字段记录了对应的fsimage中section的名称、 在fsimage文件中的长度, 以及这个section在fsimage中的起始位置。 FSImage类在读取fsimage文件时, 会先从fsimage中读取出FileSummary部分, 然后利用FileSummary记录的元信息指导fsimage文件的反序列化操作。

■ FileSummaryLength: FileSummaryLength记录了FileSummary在fsimage文件中所占的长度, FSImage类在读取fsimage文件时, 会首先读取FileSummaryLength获取FileSummary部分的长度, 然后根据这个长度从fsimage中反序列化出FileSummary

 

FSImageFormatProtobuf.Saver类就是以protobuf格式将Namenode的命名空间保存至fsimage文件的工具类.这个类的入口方法是save()方法。 save()方法会打开fsimage文件的输出流并且获得文件通道, 然后调用saveInternal()方法将命名空间保存到fsimage文件中.

3.5.saveInternal()方法

saveInternal()方法首先构造底层fsimage文件的输出流, 构造fsimage文件的描述类FileSummary, 然后在FileSummary中记录ondiskVersion、 layoutVersion、 codec等信息。

接下来saveInternal()方法依次向fsimage文件中写入命名空间信息、 inode信息、 快照信息、 安全信息、 缓存信息、 StringTable信息等。注意上述信息都是以section为单位写入的, 每个section的格式定义请参考fsimage.proto文件。

saveInternal()方法以section为单位写入元数据信息时, 还会在FileSummary中记录这个section的长度, 以及section在fsimage文件中的起始位置等信息。 当完成了所有section的写入后, FileSummary对象也就构造完毕了.saveInternal()最后会将FileSummary对象写入fsimage文件中。 


  
  1. /**
  2. * saveInternal()方法首先构造底层fsimage文件的输出流,
  3. * 构造fsimage文件的描述类 FileSummary ,
  4. *
  5. * 然后在FileSummary中记录ondiskVersion、 layoutVersion、 codec等信息。
  6. * 接下来saveInternal()方法依次向fsimage文件中写入
  7. * 1.命名空间信息、
  8. * 2.inode信息、
  9. * 3.快照信息、
  10. * 4.安全信息、
  11. * 5.缓存信息、
  12. * 6.StringTable
  13. * 信息等。
  14. *
  15. * 注意上述信息都是以section为单位写入的, 每个section的格式定义请参考fsimage.proto文件。
  16. * saveInternal()方法以section为单位写入元数据信息时,
  17. * 还会在FileSummary中记录这个section的长度,
  18. * 以及section在fsimage文件中的起始位置等信息。
  19. *
  20. * 当完成了所有section的写入后,
  21. * FileSummary对象也就构造完毕了,
  22. * saveInternal()最后会将
  23. * FileSummary对象写入fsimage文件中。
  24. *
  25. * @return number of non-fatal errors detected while writing the FsImage.
  26. * @throws IOException on fatal error.
  27. *
  28. */
  29. private long saveInternal(FileOutputStream fout, FSImageCompression compression, String filePath) throws IOException {
  30. StartupProgress prog = NameNode.getStartupProgress();
  31. //构造输出流, 一边写入数据, 一边写入校验值
  32. MessageDigest digester = MD5Hash.getDigester();
  33. int layoutVersion = context.getSourceNamesystem().getEffectiveLayoutVersion();
  34. underlyingOutputStream = new DigestOutputStream( new BufferedOutputStream(fout), digester);
  35. underlyingOutputStream.write(FSImageUtil.MAGIC_HEADER);
  36. fileChannel = fout.getChannel();
  37. // FileSummary为fsimage文件的描述部分, 也是protobuf定义的
  38. FileSummary.Builder b = FileSummary.newBuilder()
  39. .setOndiskVersion(FSImageUtil.FILE_VERSION)
  40. .setLayoutVersion(
  41. context.getSourceNamesystem().getEffectiveLayoutVersion());
  42. //获取压缩格式, 并装饰输出流
  43. codec = compression.getImageCodec();
  44. if (codec != null) {
  45. b.setCodec(codec.getClass().getCanonicalName());
  46. sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
  47. } else {
  48. sectionOutputStream = underlyingOutputStream;
  49. }
  50. //保存命名空间信息
  51. saveNameSystemSection(b);
  52. // Check for cancellation right after serializing the name system section.
  53. // Some unit tests, such as TestSaveNamespace#testCancelSaveNameSpace
  54. // depends on this behavior.
  55. // 检查是否取消了保存操作
  56. context.checkCancelled();
  57. Step step;
  58. // Erasure coding policies should be saved before inodes
  59. if (NameNodeLayoutVersion.supports(
  60. NameNodeLayoutVersion.Feature.ERASURE_CODING, layoutVersion)) {
  61. step = new Step(StepType.ERASURE_CODING_POLICIES, filePath);
  62. prog.beginStep(Phase.SAVING_CHECKPOINT, step);
  63. //保存 ErasureCoding 信息
  64. saveErasureCodingSection(b);
  65. prog.endStep(Phase.SAVING_CHECKPOINT, step);
  66. }
  67. //保存命名空间中的inode信息
  68. step = new Step(StepType.INODES, filePath);
  69. prog.beginStep(Phase.SAVING_CHECKPOINT, step);
  70. // Count number of non-fatal errors when saving inodes and snapshots.
  71. // 保存命名空间中的inode信息
  72. long numErrors = saveInodes(b);
  73. // 保存快照信息
  74. numErrors += saveSnapshots(b);
  75. prog.endStep(Phase.SAVING_CHECKPOINT, step);
  76. // 保存安全信息
  77. step = new Step(StepType.DELEGATION_TOKENS, filePath);
  78. prog.beginStep(Phase.SAVING_CHECKPOINT, step);
  79. saveSecretManagerSection(b);
  80. prog.endStep(Phase.SAVING_CHECKPOINT, step);
  81. // 保存缓存信息
  82. step = new Step(StepType.CACHE_POOLS, filePath);
  83. prog.beginStep(Phase.SAVING_CHECKPOINT, step);
  84. saveCacheManagerSection(b);
  85. prog.endStep(Phase.SAVING_CHECKPOINT, step);
  86. // 保存StringTable
  87. saveStringTableSection(b);
  88. // We use the underlyingOutputStream to write the header. Therefore flush
  89. // the buffered stream (which is potentially compressed) first.
  90. // flush输出流
  91. flushSectionOutputStream();
  92. FileSummary summary = b.build();
  93. //将FileSummary写入文件
  94. saveFileSummary(underlyingOutputStream, summary);
  95. //关闭底层输出流
  96. underlyingOutputStream.close();
  97. savedDigest = new MD5Hash(digester.digest());
  98. return numErrors;
  99. }

3.5.1. 写入文件头[ FSImageUtil.FILE_VERSION  ]

fsimage的文件头, 是“HDFSIMG1”这个字符串的二进制形式. MAGIC头标识了当前fsimage文件是使用protobuf格式序列化的。

3.5.2. 构建FileSummary 对象


  
  1. // FileSummary为fsimage文件的描述部分, 也是protobuf定义的 FILE_VERSION : 1
  2. FileSummary.Builder b = FileSummary.newBuilder()
  3. .setOndiskVersion(FSImageUtil.FILE_VERSION)
  4. .setLayoutVersion(
  5. context.getSourceNamesystem().getEffectiveLayoutVersion());

3.5.3. 处理数据的压缩方式:


  
  1. //获取压缩格式, 并装饰输出流
  2. codec = compression.getImageCodec();
  3. if (codec != null) {
  4. b.setCodec(codec.getClass().getCanonicalName());
  5. sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
  6. } else {
  7. sectionOutputStream = underlyingOutputStream;
  8. }

3.5.4.SectionName

SectionName是一个枚举类一共记录了12种类型. [按写入的fsimage的顺序进行排列]

序号 名称 Section对象 描述
1 NS_INFO NameSystemSection 命名空间信息
2 ERASURE_CODING ErasureCodingSection 擦除编码信息
3 INODE INodeSection 保存inode 信息
4 INODE_DIR INodeDirectorySection 构建目录信息
5 FILES_UNDERCONSTRUCTION FileUnderConstructionEntry 租约管理
6 SNAPSHOT SnapshotSection 保存快照信息
7 SNAPSHOT_DIFF SnapshotDiffSection 保存快照信息
8 INODE_REFERENCE INodeReferenceSection 引用信息
9 SECRET_MANAGER SecretManagerSection 安全信息
10 CACHE_MANAGER CacheManagerSection 缓存信息
11 STRING_TABLE StringTableSection 保存StringTable
12 EXTENDED_ACL    


 


  
  1. public enum SectionName {
  2. NS_INFO( "NS_INFO"),
  3. STRING_TABLE( "STRING_TABLE"),
  4. EXTENDED_ACL( "EXTENDED_ACL"),
  5. ERASURE_CODING( "ERASURE_CODING"),
  6. INODE( "INODE"),
  7. INODE_REFERENCE( "INODE_REFERENCE"),
  8. SNAPSHOT( "SNAPSHOT"),
  9. INODE_DIR( "INODE_DIR"),
  10. FILES_UNDERCONSTRUCTION( "FILES_UNDERCONSTRUCTION"),
  11. SNAPSHOT_DIFF( "SNAPSHOT_DIFF"),
  12. SECRET_MANAGER( "SECRET_MANAGER"),
  13. CACHE_MANAGER( "CACHE_MANAGER");
  14. private static final SectionName[] values = SectionName.values();
  15. public static SectionName fromString(String name) {
  16. for (SectionName n : values) {
  17. if (n.name.equals(name))
  18. return n;
  19. }
  20. return null;
  21. }
  22. private final String name;
  23. private SectionName(String name) {
  24. this.name = name;
  25. }
  26. }

3.5.5.save*()方法

saveInternal()方法调用了多个save*()方法来记录不同section的元数据信息, 这些方法除了在fsimage文件中写入对应种类的元数据信息外, 还会在FileSummary中记录section的大小, 以及在fsimage中的起始位置。

以saveINodes()方法举例, 改方法构造了一个FSImageFormatPBINode.Saver对象, 并调用这个对象对应的方法保存文件系统目录树中的INode信息、 INodeDirectory信息, 以及处于构建状态的文件信息


  
  1. // saveINodes()方法构造了一个FSImageFormatPBINode.Saver对象,
  2. // 并调用这个对象对应的方法保存文件系统目录树中的INode信息、 INodeDirectory信息,
  3. // 以及处于构建状态的文件信息
  4. private long saveInodes(FileSummary.Builder summary) throws IOException {
  5. FSImageFormatPBINode.Saver saver = new FSImageFormatPBINode.Saver( this,summary);
  6. // 保存INode信息是由FSImageFormatPBINode.Saver.serializeINodeSection()方法实现的
  7. saver.serializeINodeSection(sectionOutputStream);
  8. // 保存info目录信息
  9. saver.serializeINodeDirectorySection(sectionOutputStream);
  10. // 租约管理
  11. saver.serializeFilesUCSection(sectionOutputStream);
  12. return saver.getNumImageErrors();
  13. }

保存INode信息是由FSImageFormatPBINode.Saver.serializeINodeSection()方法实现的。

serializeINodeSection()方法会首先构造一个INodeSection对象, 记录文件系统目录树中保存的最后一个inode的inodeid, 以及命名空间中所有inode的个数。 之后迭代处理将所有inode信息写入fsimage文件中, 最后将INodeSection的属性信息记录在FileSummary中。


  
  1. /**
  2. * serializeINodeSection()方法会首先构造一个INodeSection对象,
  3. * 记录文件系统目录树中保存的最后一个inode的inodeid,
  4. * 以及命名空间中所有inode的个数。
  5. *
  6. * 之后迭代处理将所有inode信息写入fsimage文件中,
  7. * 最后将INodeSection的属性信息记录在FileSummary中。
  8. *
  9. * serializeINodeSection
  10. * @param out
  11. * @throws IOException
  12. */
  13. void serializeINodeSection(OutputStream out) throws IOException {
  14. INodeMap inodesMap = fsn.dir.getINodeMap();
  15. //构造一个 INodeSection, 保存最后一个inode的 inodeid, 以及这个命名空间中所有inode的个数
  16. INodeSection.Builder b = INodeSection.newBuilder()
  17. .setLastInodeId(fsn.dir.getLastInodeId()).setNumInodes(inodesMap.size());
  18. INodeSection s = b.build();
  19. //序列化至输出流
  20. s.writeDelimitedTo(out);
  21. int i = 0;
  22. //迭代处理inodeMap中所有的inode, 调用save()方法将inode信息保存到fsimage中
  23. Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
  24. while (iter.hasNext()) {
  25. INodeWithAdditionalFields n = iter.next();
  26. // 将所有inode信息写入fsimage文件中
  27. save(out, n);
  28. ++i;
  29. if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
  30. context.checkCancelled();
  31. }
  32. }
  33. //调用commitSection()方法在FileSummary中写入inode section
  34. parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE);
  35. }

save()方法首先将当前INode对象分为目录、 文件以及符号链接等几类, 然后调用各个类型对应的save()重载方法。 重载方法的实现也非常简单, 就是构造不同的protobuf Builder类, 然后设置相应字段的值, 并将序列化之后的对象写入fsimage文件的输出流中。 这里以INodeFile为例, 首先构造protobuf Builder类——INodeSection.INodeFile.Builder, 然后设置blocks——也就是当前文件有哪些数据块, 如果当前的INodeFile处于构建状态, 则设置对应的构建信息。 最后将序列化后的inode信息写入输出流中。


  
  1. /**
  2. * @param out
  3. * @param n
  4. * @throws IOException
  5. */
  6. private void save(OutputStream out, INode n) throws IOException {
  7. if (n.isDirectory()) {
  8. save( out, n.asDirectory());
  9. } else if (n.isFile()) {
  10. save( out, n.asFile());
  11. } else if (n.isSymlink()) {
  12. save( out, n.asSymlink());
  13. }
  14. }
  15. private void save(OutputStream out, INodeDirectory n) throws IOException {
  16. INodeSection.INodeDirectory.Builder b = buildINodeDirectory(n,
  17. parent.getSaverContext());
  18. INodeSection.INode r = buildINodeCommon(n)
  19. .setType(INodeSection.INode.Type.DIRECTORY).setDirectory(b).build();
  20. r.writeDelimitedTo( out);
  21. }
  22. // 首先构造protobuf Builder类——INodeSection.INodeFile.Builder,
  23. // 然后设置blocks——也就是当前文件有哪些数据块,
  24. // 如果当前的INodeFile处于构建状态, 则设置对应的构建信息。
  25. //
  26. // 最后将序列化后的inode信息写入输出流中。
  27. private void save(OutputStream out, INodeFile n) throws IOException {
  28. INodeSection.INodeFile.Builder b = buildINodeFile(n,
  29. parent.getSaverContext());
  30. BlockInfo[] blocks = n.getBlocks();
  31. if (blocks != null) {
  32. for (Block block : n.getBlocks()) {
  33. b.addBlocks(PBHelperClient.convert(block));
  34. }
  35. }
  36. FileUnderConstructionFeature uc = n.getFileUnderConstructionFeature();
  37. if (uc != null) {
  38. INodeSection.FileUnderConstructionFeature f =
  39. INodeSection.FileUnderConstructionFeature
  40. .newBuilder().setClientName(uc.getClientName())
  41. .setClientMachine(uc.getClientMachine()).build();
  42. b.setFileUC(f);
  43. }
  44. INodeSection.INode r = buildINodeCommon(n)
  45. .setType(INodeSection.INode.Type.FILE).setFile(b).build();
  46. r.writeDelimitedTo( out);
  47. }
  48. private void save(OutputStream out, INodeSymlink n) throws IOException {
  49. INodeSection.INodeSymlink.Builder b = INodeSection.INodeSymlink
  50. .newBuilder()
  51. .setPermission(buildPermissionStatus(n))
  52. .setTarget(ByteString.copyFrom(n.getSymlink()))
  53. .setModificationTime(n.getModificationTime())
  54. .setAccessTime(n.getAccessTime());
  55. INodeSection.INode r = buildINodeCommon(n)
  56. .setType(INodeSection.INode.Type.SYMLINK).setSymlink(b).build();
  57. r.writeDelimitedTo( out);
  58. }

3.5.6. 将FileSummary写入文件


  
  1. private static void saveFileSummary(OutputStream out, FileSummary summary)
  2. throws IOException {
  3. summary.writeDelimitedTo( out);
  4. int length = getOndiskTrunkSize(summary);
  5. byte[] lengthBytes = new byte[ 4];
  6. ByteBuffer.wrap(lengthBytes).asIntBuffer().put(length);
  7. out.write(lengthBytes);
  8. }



 

四. 加载FSImage  :  FSImage.loadFSImage()

当Namenode启动时, 首先会将fsimage文件中记录的命名空间加载到Namenode内存中, 然后再一条一条地将editlog文件中记录的更新操作加载并合并到命名空间中。 

整体代码流程图:

Namenode会等待各个Datanode向自己汇报数据块信息来组装blockMap, 从而离开安全模式。 Namenode每次启动时都会调用FSImage.loadFSImage()方法执行加载fsimage和editlog文件的操作。 

这里需要关注两个地方, 一个是加载 FSImage 文件fsImage.recoverTransitionRead(startOpt, this, recovery); 另外一个是根据条件判断是否要合并edits文件[间隔周期1小时或者,edits数量操作100万.].

 

我们直接说重要的loadFSImage方法,前面其实还有些方法,但是太啰嗦了,直接看重点吧. 其实就是加载最后一个fsimage文件.

加载分两步:

1 获取加载器 FSImageFormat.LoaderDelegator .

2. 加载文件: loader.load(curFile, requireSameLayoutVersion);


  
  1. /**
  2. * Load in the filesystem image from file. It's a big list of
  3. * filenames and blocks.
  4. */
  5. private void loadFSImage(File curFile, MD5Hash expectedMd5,
  6. FSNamesystem target, MetaRecoveryContext recovery,
  7. boolean requireSameLayoutVersion) throws IOException {
  8. // BlockPoolId is required when the FsImageLoader loads the rolling upgrade
  9. // information. Make sure the ID is properly set.
  10. target.setBlockPoolId( this.getBlockPoolID());
  11. //获取加载器 FSImageFormat.LoaderDelegator
  12. FSImageFormat.LoaderDelegator loader = FSImageFormat.newLoader(conf, target);
  13. //加载文件
  14. loader.load(curFile, requireSameLayoutVersion);
  15. // Check that the image digest we loaded matches up with what
  16. // we expected
  17. MD5Hash readImageMd5 = loader.getLoadedImageMd5();
  18. if (expectedMd5 != null &&
  19. !expectedMd5.equals(readImageMd5)) {
  20. throw new IOException( "Image file " + curFile +
  21. " is corrupt with MD5 checksum of " + readImageMd5 +
  22. " but expecting " + expectedMd5);
  23. }
  24. long txId = loader.getLoadedImageTxId();
  25. LOG.info( "Loaded image for txid " + txId + " from " + curFile);
  26. lastAppliedTxId = txId;
  27. storage.setMostRecentCheckpointInfo(txId, curFile.lastModified());
  28. }

我们看一下load方法:


  
  1. public void load(File file, boolean requireSameLayoutVersion)
  2. throws IOException {
  3. Preconditions.checkState(impl == null, "Image already loaded!");
  4. FileInputStream is = null;
  5. try {
  6. is = new FileInputStream(file);
  7. byte[] magic = new byte[FSImageUtil.MAGIC_HEADER.length];
  8. IOUtils.readFully( is, magic, 0, magic.length);
  9. // fsimage文件中包括 magicHeader, 使用的是protobuf序列化方式
  10. if (Arrays. equals(magic, FSImageUtil.MAGIC_HEADER)) {
  11. //构造FSImageFormatProtobuf.Loader加载fsimage文件
  12. FSImageFormatProtobuf.Loader loader = new FSImageFormatProtobuf.Loader(
  13. conf, fsn, requireSameLayoutVersion);
  14. impl = loader;
  15. loader.load(file);
  16. } else {
  17. //否则构造FSImageFormat.Loader加载fsimage文件
  18. Loader loader = new Loader(conf, fsn);
  19. impl = loader;
  20. loader.load(file);
  21. }
  22. } finally {
  23. IOUtils.cleanupWithLogger(LOG, is);
  24. }
  25. }

这里面是构造FSImageFormatProtobuf.Loader对象,使用它的load方法加载fsimage文件. 在load方法中最终调用了loadInternal(raFile, fin);方法. 这个方法是加载fsimage文件的相对最底层的方法了.

在加载fsimage操作中, 最终会调用FSImageFormatProtobuf.Loader作为fsimage文件的加载类。 FSImageFormatProtobuf.Loader.loadInternal()方法执行了加载fsimage文件的操作, loadInternal()方法会打开fsimage文件通道, 然后读取fsimage文件中的FileSummary对象, FileSummary对象中记录了fsimage中保存的所有section的信息。loadInternal()会对FileSummary对象中保存的section排序, 然后遍历每个section并调用对应的方法从fsimage文件中加载这个section。

 

代码如下:


  
  1. // loadInternal()方法会打开fsimage文件通道,
  2. // 然后读取fsimage文件中的FileSummary对象,
  3. // FileSummary对象中记录了fsimage中保存的所有section的信息。
  4. // loadInternal()会对FileSummary对象中保存的section排序,
  5. // 然后遍历每个section并调用对应的方法从fsimage文件中加载这个section。
  6. private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
  7. throws IOException {
  8. if (!FSImageUtil.checkFileFormat(raFile)) {
  9. throw new IOException( "Unrecognized file format");
  10. }
  11. // 从fsimage文件末尾加载FileSummary, 也就是fsimage文件内容的描述
  12. FileSummary summary = FSImageUtil.loadSummary(raFile);
  13. if (requireSameLayoutVersion && summary.getLayoutVersion() !=
  14. HdfsServerConstants.NAMENODE_LAYOUT_VERSION) {
  15. throw new IOException( "Image version " + summary.getLayoutVersion() +
  16. " is not equal to the software version " +
  17. HdfsServerConstants.NAMENODE_LAYOUT_VERSION);
  18. }
  19. //获取通道
  20. FileChannel channel = fin.getChannel();
  21. // 构造FSImageFormatPBINode.Loader和FSImageFormatPBSnapshot.
  22. // Loader加载INode以及Snapshot
  23. FSImageFormatPBINode.Loader inodeLoader = new FSImageFormatPBINode.Loader(
  24. fsn, this);
  25. FSImageFormatPBSnapshot.Loader snapshotLoader = new FSImageFormatPBSnapshot.Loader(
  26. fsn, this);
  27. //对fsimage文件描述中记录的sections进行排序
  28. ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
  29. .getSectionsList());
  30. Collections.sort(sections, new Comparator<FileSummary.Section>() {
  31. @Override
  32. public int compare(FileSummary.Section s1, FileSummary.Section s2) {
  33. SectionName n1 = SectionName.fromString(s1.getName());
  34. SectionName n2 = SectionName.fromString(s2.getName());
  35. if (n1 == null) {
  36. return n2 == null ? 0 : -1;
  37. } else if (n2 == null) {
  38. return -1;
  39. } else {
  40. return n1.ordinal() - n2.ordinal();
  41. }
  42. }
  43. });
  44. StartupProgress prog = NameNode.getStartupProgress();
  45. /**
  46. * beginStep() and the endStep() calls do not match the boundary of the
  47. * sections. This is because that the current implementation only allows
  48. * a particular step to be started for once.
  49. */
  50. Step currentStep = null;
  51. //遍历每个section, 并调用对应的方法加载这个section
  52. for (FileSummary.Section s : sections) {
  53. //在通道中定位这个section的起始位置
  54. channel.position(s.getOffset());
  55. InputStream in = new BufferedInputStream( new LimitInputStream(fin,
  56. s.getLength()));
  57. in = FSImageUtil.wrapInputStreamForCompression(conf,
  58. summary.getCodec(), in);
  59. String n = s.getName();
  60. //调用对应的方法加载不同的section
  61. switch (SectionName.fromString(n)) {
  62. case NS_INFO:
  63. loadNameSystemSection( in);
  64. break;
  65. case STRING_TABLE:
  66. loadStringTableSection( in);
  67. break;
  68. case INODE: {
  69. currentStep = new Step(StepType.INODES);
  70. prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
  71. inodeLoader.loadINodeSection( in, prog, currentStep);
  72. }
  73. break;
  74. case INODE_REFERENCE:
  75. snapshotLoader.loadINodeReferenceSection( in);
  76. break;
  77. case INODE_DIR:
  78. inodeLoader.loadINodeDirectorySection( in);
  79. break;
  80. case FILES_UNDERCONSTRUCTION:
  81. inodeLoader.loadFilesUnderConstructionSection( in);
  82. break;
  83. case SNAPSHOT:
  84. snapshotLoader.loadSnapshotSection( in);
  85. break;
  86. case SNAPSHOT_DIFF:
  87. snapshotLoader.loadSnapshotDiffSection( in);
  88. break;
  89. case SECRET_MANAGER: {
  90. prog.endStep(Phase.LOADING_FSIMAGE, currentStep);
  91. Step step = new Step(StepType.DELEGATION_TOKENS);
  92. prog.beginStep(Phase.LOADING_FSIMAGE, step);
  93. loadSecretManagerSection( in, prog, step);
  94. prog.endStep(Phase.LOADING_FSIMAGE, step);
  95. }
  96. break;
  97. case CACHE_MANAGER: {
  98. Step step = new Step(StepType.CACHE_POOLS);
  99. prog.beginStep(Phase.LOADING_FSIMAGE, step);
  100. loadCacheManagerSection( in, prog, step);
  101. prog.endStep(Phase.LOADING_FSIMAGE, step);
  102. }
  103. break;
  104. case ERASURE_CODING:
  105. Step step = new Step(StepType.ERASURE_CODING_POLICIES);
  106. prog.beginStep(Phase.LOADING_FSIMAGE, step);
  107. loadErasureCodingSection( in);
  108. prog.endStep(Phase.LOADING_FSIMAGE, step);
  109. break;
  110. default:
  111. LOG.warn( "Unrecognized section {}", n);
  112. break;
  113. }
  114. }
  115. }

对于不同类型的section, loadInternal()方法会调用不同的方法加载这个section, 例如对于INodeSection会调用InodeLoader.loadINodeSection()方法加载。 load*()方法的实现都比较简单, 就是按照protobuf格式加载不同的section. 慢慢恢复/构建FSNamesystem对象中的内容.

 

下面是加载loadINodeSection的代码示例:


  
  1. void loadINodeSection(InputStream in, StartupProgress prog,
  2. Step currentStep) throws IOException {
  3. INodeSection s = INodeSection.parseDelimitedFrom(in);
  4. fsn.dir.resetLastInodeId(s.getLastInodeId());
  5. long numInodes = s.getNumInodes();
  6. LOG.info( "Loading " + numInodes + " INodes.");
  7. prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
  8. Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
  9. for ( int i = 0; i < numInodes; ++i) {
  10. INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
  11. if (p.getId() == INodeId.ROOT_INODE_ID) {
  12. // 加载root
  13. loadRootINode(p);
  14. } else {
  15. //加载子节点
  16. INode n = loadINode(p);
  17. dir.addToInodeMap(n);
  18. }
  19. counter.increment();
  20. }
  21. }

下面试loadINodeDirectorySection 方法, 构建目录体系


  
  1. void loadINodeDirectorySection(InputStream in) throws IOException {
  2. final List<INodeReference> refList = parent.getLoaderContext()
  3. .getRefList();
  4. while ( true) {
  5. INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
  6. .parseDelimitedFrom( in);
  7. // note that in is a LimitedInputStream
  8. if (e == null) {
  9. break;
  10. }
  11. INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
  12. for ( long id : e.getChildrenList()) {
  13. INode child = dir.getInode(id);
  14. addToParent(p, child);
  15. }
  16. for ( int refId : e.getRefChildrenList()) {
  17. INodeReference ref = refList.get(refId);
  18. addToParent(p, ref);
  19. }
  20. }
  21. }

 

五. 加载Edits 文件

FSImage.loadEdits()方法会构造一个FSEditLogLoader对象. 然后遍历Namenode所有存储路径上保存的editlog文件的输入流  并调用FSEditLogLoader.loadFSEdits()方法加载指定路径上的editlog文件。


  
  1. /**
  2. * FSImage.loadEdits()方法会构造一个FSEditLogLoader对象,
  3. * 然后遍历Namenode所有存储路径上保存的editlog文件的输入流
  4. * 并调用FSEditLogLoader.loadFSEdits()方法加载指定路径上的editlog文件。
  5. *
  6. * @param editStreams
  7. * @param target
  8. * @param maxTxnsToRead
  9. * @param startOpt
  10. * @param recovery
  11. * @return
  12. * @throws IOException
  13. */
  14. public long loadEdits(Iterable<EditLogInputStream> editStreams,
  15. FSNamesystem target, long maxTxnsToRead,
  16. StartupOption startOpt, MetaRecoveryContext recovery)
  17. throws IOException {
  18. LOG.debug( "About to load edits:\n " + Joiner.on( "\n ").join(editStreams));
  19. StartupProgress prog = NameNode.getStartupProgress();
  20. prog.beginPhase(Phase.LOADING_EDITS);
  21. //记录命名空间中加载的最新的事务id
  22. long prevLastAppliedTxId = lastAppliedTxId;
  23. long remainingReadTxns = maxTxnsToRead;
  24. try {
  25. //构造FSEditLogLoader对象用于加栽editlog文件
  26. FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);
  27. //遍历所有存储路径上editlog文件对应的输入流
  28. // Load latest edits
  29. for (EditLogInputStream editIn : editStreams) {
  30. LogAction logAction = loadEditLogHelper.record();
  31. if (logAction.shouldLog()) {
  32. String logSuppressed = "";
  33. if (logAction.getCount() > 1) {
  34. logSuppressed = "; suppressed logging for " +
  35. (logAction.getCount() - 1) + " edit reads";
  36. }
  37. LOG.info( "Reading " + editIn + " expecting start txid #" +
  38. (lastAppliedTxId + 1) + logSuppressed);
  39. }
  40. try {
  41. //调用FSEditLogLoader.loadFSEdits()从某个存储路径上的editlog文件加载修改操作
  42. remainingReadTxns -= loader.loadFSEdits(editIn, lastAppliedTxId + 1,
  43. remainingReadTxns, startOpt, recovery);
  44. } finally {
  45. // lastAppliedTxId记录从editlog加载的最新的事务id
  46. // Update lastAppliedTxId even in case of error, since some ops may
  47. // have been successfully applied before the error.
  48. lastAppliedTxId = loader.getLastAppliedTxId();
  49. }
  50. // If we are in recovery mode, we may have skipped over some txids.
  51. if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID
  52. && recovery != null) {
  53. lastAppliedTxId = editIn.getLastTxId();
  54. }
  55. if (remainingReadTxns <= 0) {
  56. break;
  57. }
  58. }
  59. } finally {
  60. //关闭所有editlog文件的输入流
  61. FSEditLog.closeAllStreams(editStreams);
  62. }
  63. prog.endPhase(Phase.LOADING_EDITS);
  64. return lastAppliedTxId - prevLastAppliedTxId;
  65. }


 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


转载:https://blog.csdn.net/zhanglong_4444/article/details/108444392
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场