读过程
如下图所示,假设HDFS客户端需要读取文件one.txt,文件one.txt被分为两个数据块,分别为BlockA和BlockB。副本数为3。BlockA存放在D2,D5,D7;BlockB存放在D3,D9,D11。
HDFS客户端与NameNode通信
- 因为NameNode存放了one.txt的数据块的元数据,所以客户端需要向NameNode发出请求,请求获取存放了one.txt的数据块的DataNode列表。
- NameNode收到客户端发来的请求,首先检查客户端用户的权限。如果客户端用户有足够的权限,NameNode接着检查请求的文件是否存在。如果存在,NameNode会发送存放了one.txt的数据块的DataNode列表(该列表的DataNode按照客户端到DataNode的距离由小到大排列)。同时,NameNode会给予客户端一个安全令牌,当客户端访问DataNode时,需要使用令牌进行认证。
HDFS客户端与DataNode通信
- 当客户端收到NameNode发过来的DataNode列表,客户端直接与DataNode通信。通过FSDataInputStream对象给距离客户端最近的DataNode(在D2的BlockA和在D3的BlockB)发送请求。其中,DFSInputStream管理客户端和DataNode之间的通信。
- 客户端用户向DataNode展示NameNode给的安全令牌,然后开始从DataNode读取数据,数据将以流的形式从DataNode流向客户端。
- 读取完所有数据块时,客户端调用close()方法关闭FSDataInputStream。
HDFS读数据的详细过程
- 客户端调用FileSystem的 open() 方法(在HDFS文件系统中DistributedFileSystem具体实现了FileSystem)。
- DistributedFileSystem通过RPC远程调用NameNode,请求获取存放了one.txt的开始几个数据块的DataNode的位置。然后NameNode返回存放该数据块的所有DataNode的地址(NameNode根据距离客户端的远近对DataNode进行排序)
- open()之后,DistributedFileSystem返回一个FSDataInputStream输入流给客户端。对于HDFS而言,具体的输入流是DFSInputStream,DistributedFileSystem会利用DFSInputStream实例化FSDataInputStream。
- 客户端调用 read() 方法读取数据。输入流DFSInputStream根据得到的排序结果,选择距离客户端最近的DataNode建立连接并读取数据。接着数据以流的形式从DataNode流向客户端。
- 当该数据块读取完毕时,DFSInputStream关闭与该DataNode的连接。然后向NameNode发出请求,获取下一个数据块的位置(如果客户端缓存中已经包含该数据块的信息,就不需要请求NameNode)
- 找到下一个数据块最佳的DataNode,读取数据。
- 如果DFSInputStream在读取的过程中,与正在通信的DataNode发生错误,客户端将选择其它距离它最近的DataNode。DFSInputStream也会标记那些发生故障的DataNode,避免下一个块再读取这些DataNode。同时,DFSInputStream会校验数据的检验和,如果发现有错误,会向NameNode汇报,然后读取其它的DataNode。
- 当客户端读取完全部数据,调用FSDataInputStream的 close() 关闭输入流。
代码实现
public static void read() throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf , "root");
FSDataInputStream fis = fs.open(new Path("/picture.jpg"));
FileOutputStream fos = new FileOutputStream("picture.jpg");
int numBytes = 0;
byte[] buffer = new byte[1024];
while ((numBytes = fis.read(buffer)) > 0) {
fos.write(buffer, 0, numBytes);
}
fis.close();
fos.close();
fs.close();
}
写数据
在写操作中,DFSOutputStream维护了两个队列,分别是数据队列(data queue)和确认队列(ack queue)。
假设要向HDFS写入文件two.txt(假装NameNode已经写好two.txt的信息,DataNode已经写入数据,静态图有点难画出这两个过程,大概意思懂就行)
HDFS客户端与NameNode通信
- 为了向HDFS写入文件two.txt,客户端需要向NameNode发起请求。NameNode首先检查客户端用户的权限,如果用户有足够的权限且不存在相同名字的文件,NameNode为该文件创建相应的记录(元数据)。如果文件已经存在,则文件创建失败,报错IOException。
- NameNode向客户端提供所有可以写入文件two.txt的DataNode的地址,同时NameNode会给予客户端一个安全令牌,在客户端写入DataNode前,使用令牌进行认证。
HDFS客户端与DataNode通信
- 客户端收到来自NameNode的DataNode列表和允许写权限后,客户端获取到DataNode的列表和允许写权限后,直接向列表中的第一个DataNode写数据。当一个DataNode正在写的时候,它会根据副本数因子向另一个DataNode复制副本。如果副本数因子为3,那么至少在不同的DataNode上有3个数据块的副本。
- 当副本创建完毕,DataNode会向客户端发送确认信息。每一个DataNode之间形成一个管道。
HDFS读数据的详细过程
- 客户端调用FileSystem的 create() 方法(在HDFS文件系统中DistributedFileSystem具体实现了FileSystem)。
- DistributedFileSystem通过RPC远程调用NameNode,在文件系统的namespace创建一个新文件two.txt。
- NameNode检查客户端用户的权限同时检查文件two.txt是否存在。如果用户有足够的权限以及没有同名的文件,NameNode则创建该文件并添加文件信息。
- 远程方法调用结束后,DistributedFileSystem会给客户端返回一个输出流FSDataOutputStream,而该对象封装了一个DFSOutputStream。客户端使用 write() 向HDFS写入数据。
- 当客户端开始写数据时,DFSOutputStream将客户端的数据分割成若干个数据包,同时将这些包写入一个内部的数据队列。其中,DataStreamer利用这个数据队列,负责向NameNode申请保存文件和副本数据块的若干个DataNode。这些DataNode会形成一个数据流管道,管道内的DataNode数由副本数因子决定。DataStreamer将数据包发送到管道里的第一个DataNode,第一个DataNode保存数据包并将其发送到第二个DataNode。同样的,第二个DataNode保存并将其发送到下一个DataNode,以此类推。
- DFSOutputStream同时还维护了一个确认队列,用于接收来自DataNode的确认信息,一旦DataNode创建了副本,就会发送确认,保证数据的完整性。这些确认信息沿着数据流管道逆流而上,依次经过各个DataNode并最终发送到客户端。当客户端收到确认,就将对应的包从确认队列删除,直到收到所有的确认。
- 客户端调用 close() 方法关闭输出流,就会将DataNode管道里剩余的数据全部flush掉,并且等待确认。当DFSOutputStream的确认队列收到所有确认,就会调用 complete() 告诉NameNode写入文件完毕。
写数据时DataNode发生故障会怎么样
写数据时,如果DataNode发生了故障,那么下面的操作将会发生,这些操作对客户端是透明的。
- 管道将会被关闭,确认队列的数据包会被加入到数据队列的前面,确保那些已经发送到DataNode的数据包不被丢失。
- 在正常DataNode上的当前数据块获得一个新的标识,然后这个标识被送到NameNode,以便发生故障的DataNode恢复了,可以删除这个DataNode上的数据块。
- 将发生故障的DataNode从管道里删除,并将其余正常的DataNode构建一个新的管道,剩余的数据将通过新的管道被写入正常的DataNode中。
- NameNode检测到数据块未被充分复制,就会安排在另一个DataNode上创建多一份副本。而其它即将到来的数据块则正常写入。
代码实现
public static void write() throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf , "root");
FileInputStream fis = new FileInputStream("picture.jpg");
FSDataOutputStream fos = fs.create(new Path("/picture.jpg"));
int numBytes = 0;
byte[] buffer = new byte[1024];
while ((numBytes = fis.read(buffer)) > 0) {
fos.write(buffer, 0, numBytes);
}
fis.close();
fos.close();
fs.close();
}
如有错误,请大家指出(ง •̀_•́)ง (*•̀ㅂ•́)و
转载:https://blog.csdn.net/H_X_P_/article/details/105777251
查看评论