一、Hadoop
1.1 Hadoop 的发展历史
1.1.1 概念
Hadoop是一个由Apache基金会开发的分布式系统基础架构,主要解决海量数据的存储和计算问题,广义上Hadoop指的是Hadoop生态圈
1.1.2 谷歌的三驾马车
谷歌的三篇论文 | 对应的技术 |
---|---|
GFS | HDFS |
MapReduce | MapReduce |
BigTable | Hbase |
1.1.3 Hadoop 三大发行版本
- 免费开源版本 apache
- 软件收费版本 cloudera
- 免费开源版本 hortonworks
1.1.4 Hadoop的优点
-
扩容能力强:集群拓展方便
-
成本低:Hadoop可以运行在廉价的pc机上
-
效率高:动态并行移动数据,速度快
-
可靠性:自动维护数据的多份复制,独特的副本机制
- 数据自动保存多个副本
- 某个副本丢失可以自动恢复
1.1.5 Hadoop的组成
-
HDFS:高可靠,高吞吐的分布式文件系统
-
MapReduce:分布式离线并行计算框架
-
YARN:作业调度与集群资源管理框架
-
Common:支持其他模块的工具模块
1.2 Hadoop 生态系统
Components | Explanation |
---|---|
Sqoop | Hadoop(Hive)与传统数据库(Mysql)之间进行数据的传递 |
Flume | 高可用、高可靠分布式的海量日志采集、聚合和传输系统 |
Kafka | 高吞吐量的分布式发布订阅消息系统 |
Storm | 分布式实时流式计算(过时) |
Spark | 最流行的开源大数据内存计算框架 |
Oozie | 管理Hadoop作业(job)的工作流程调度管理系统 |
Hbase | 分布式、面向列的开源数据库(NoSQL) |
Hive | 基于Hadoop的数据仓库工具,提供简单地类SQL查询功能 |
二、HDFS
2.1 HDFS 概述
2.1.1 HDFS 起源
HDFS全称Hadoop Distribute File System,Hadoop的分布式文件系统;解决了海量数据的存储问题
2.1.2 HDFS的设计理念
- 故障检测和自动快速恢复是HDFS的核心架构目标
- 流式读取数据,批量处理数据,不适合用户交互式,注重数据访问的高吞吐
- 支持大文件,不支持小文件
- 对文件的要求是一次写入多次读出
- 移动计算代价比移动数据代价低
- 可移植性高
2.1.3 HDFS优缺点
- 优点
- 高容错性
- 数据自动保存多个副本
- 某个副本丢失后,可自动回复
- 适合大数据处理
- 数据规模大
- 文件规模大
- 可以运行在廉价的pc机上
- 高容错性
- 缺点
- 不适合低延时数据访问
- 无法高效存储小文件
- 一个block块在NameNode中占150byte(固定),过多小文件会占用NameNode内存
- 小文件的寻址时间大于读取时间,不符合HDFS设计目标
- 不支持并发写和随机写
- 一个文件只能有一个写,不允许多线程同时写
- 不支持随机写,但支持追加写
2.1.4 HDFS架构
- Master/Slave 架构
- HDFS采用Master/Slave架构,一般一个集群有一个NameNode和一定数目DataNode组成,Namenode 是 HDFS 集群主节点,Datanode 是 HDFS 集群从节点,两种角色各司其职,共同协调完成分布式的文件存储服务
- 分块存储
- HDFS中文件在物理上是分块存储,通过dfs.blocksize配置,2.x之后的版本默认128M
- HDFS中文件在逻辑上是连续的,提供一个文件目录树
- block块大小计算
- 理想寻址时间为10ms
- 理想读取文件时间为寻址时间的10%
- 市面上磁盘的存储速率为100M/s
- 因此block块大小:10ms/10% *100M/s = 100M -> 128M
- 命名空间(NameSpace)
- NameNode负责维护文件系统的命名空间
- NameNode会给客户端提供一个统一的抽象目录树
- NameNode 元数据
- 包括文件名,副本数,分块数,每个块的节点位置,权限等
- DataNode 数据存储
- 文件的各个block的存储管理由DataNode维护,集群启动时DataNode会汇报自己每个块的元数据,之后也会进行通信汇报block块信息
- NameNode 概述
- NameNode是HDFS的核心
- NameNode也称为Master
- NameNode存储HDFS的元数据:文件系统中所有的目录树,并跟踪整个集群的文件
- NameNode不存储实际数据
- NameNode知道HDFS中任何给定文件的块列表及其位置,使用此信息NameNode知道如何从块中构建文件
- NameNode不持久化存储每个文件中各个块所在的节点信息,这些信息会会在系统启动时从块中数据节点重建
- NameNode关闭,集群将无法访问
- NameNode所在的机器通常配置大量内存
- DataNode
- DataNode负责将实际数据存储在HDFS中
- DataNode也称为Slave
- NameNode和DataNode会保持不断通信
- DataNode启动时,会将自己发布到NameNode并汇报自己负责的块列表
- 某个DataNode关闭不会影响集群的正常使用
- DataNode每个3秒向NameNode发送心跳,若NameNode长时间(10分钟)没有接受到DataNode发送的心跳则认为该节点不可用,会将该节点的数据复制到其他节点,并且永远不再使用该节点(退出集群后再次上线可以恢复)
- DataNode所在的机器通常配置大量磁盘空间
2.1.5 安全模式
安全模式是HDFS所处的一种特殊状态,在这种状态下,文件系统只接受读数据请求,而不接受创建、删除、修改等变更请求。同时会对数据块进行大量校验,导致资源的分配和申请耗时远超预期。
在NameNode主节点启动时,HDFS首先进入安全模式,DataNode在启动的时候会向namenode汇报可用的block等状态,当整个系统达到安全标准时,HDFS自动离开安全模式。如果HDFS处于安全模式下,则文件block不能进行任何的副本复制操作,hdfs集群刚启动的时候,默认30S钟的时间是出于安全期的,只有过了30S之后,集群脱离了安全期,然后才可以对集群进行操作
safemode阈值由dfs.namenode.safemode.threshold-pct参数控制(缺省0.999),每个块的满足需求最小副本数由dfs.namenode.replication.min参数控制。因此在业务紧急要求恢复的时候,可以尝试将以下两个参数调低,使安全模式尽快结束:
dfs.namenode.replication.min——满足需要的最小副本数
dfs.namenode.safemode.threshold-pct——集群中满足正常配置的数据块比例
如果是为了防止某些异常数据导致始终校验不过的情况,可以尝试将上面的dfs.namenode.safemode.threshold-pct参数调为0或比0小的值后重启NameNode,这样永远不会进入安全模式;或者使用以下命令手动退出安全模式
hdfs dfsadmin -safemode
查看
hdfs dfsadmin -safemode get
离开
hdfs dfsadmin -safemode leave
2.2 HDFS 读写流程
2.2.1 HDFS 写流程
- 客户端向NameNode发送写数据请求(包含待上传文件名和将要上传的路径)
- NameNode检查路径是否存在,文件是否重名等(假设满足上传条件)
- NameNode向客户端响应数据,可以上传文件
- 客户端根据文件大小进行切分成一个个block块,并向NameNode发送提交即将上传block1的请求
- NameNode查询DataNode信息,规划block1的存储位置
- NameNode向客户端返回block1可以存储的数据节点ip列表
- 客户端直接请求数据节点1上传block1,数据节点1存储block1完毕并根据ip列表将block1发送给数据节点2,数据节点2存储完毕block1并根据ip列表将block1发送给数据节点3,数据节点3存储完成响应数据给数据节点2,数据节点2将响应数据给数据节点1,数据节点1将存储结果返回给NameNode和客户端
- 重复第四步上传下一个block
2.2.2 HDFS 读流程
- 客户端向NameNode请求下载文件
- NameNode返回目标文件的元数据
- 客户端根据元数据请求DataNode读取数据block
- DataNode向客户端传输数据
- 重复第三步,直到所有的块传输完成
- 客户端根据元数据组装block块完成读取数据#
2.2.3 网络拓扑与机架感知
1.引入
客户端上传数据到HDFS时,会上传到离当前客户端最近的数据节点,因此通过网络拓扑获取网络中的距离
2.网络拓扑
- 同一节点上的进程距离为0
- 同一机架上的不同节点距离为2
- 同一数据中心不同机架的节点距离为4
- 不同数据中心的节点距离为6
3.机架感知
以三个副本为例,第一个副本根据放在距离客户端最近的一个节点,第二个副本放在该节点同一机架的不同节点,第三个副本放在不同机架的随机节点
2.3 NameNode 工作机制
2.3.1 NameNode 元数据的存储位置
首先,为了提高HDFS的读写速率,必定将NameNode的元数据存储在内存中,但是内存一旦断电元数据将丢失,因此必须将内存中的元数据存储在磁盘中用于备份
2.3.2 Fsimage
Fsimage为内存元数据的备份。若内存的元数据发生改变,如果同时更新Fsimage会降低效率,如果不更新会发生数据不一致问题
2.3.3 edits
针对上述问题,最终逻辑是不更新Fsimage文件,为解决数据不一致问题,引入edits文件,该文件只记录操作并且采用追加写 的形式,即每当内存的元数据发生改变的同时记录本次操作记录追加到磁盘中的edits,这样内存元数据等于磁盘的Fsimage + edits
2.3.4 NameNode 工作机制
当NameNode启动时先滚动edits并生成一个空的edits.inprogress会将Fsimage和edits文件加载到内存中进行合并,之后的操作(增删)将追加到edits.inprogress中
2.4 Secondary NameNode 工作机制
2.4.1 2NN 工作机制
根据NameNode的工作机制,当edits的操作记录记录过多时不仅会降低追加效率,同时断电恢复时会花费大量时间,因此2NN将针对此问题进行解决,将触发检查条件时,2NN首先通知NameNode滚动edits生成新的eidts.inprogress(之后的操作记录将写在此文件)并通过http get的形式将磁盘的Fsimage和edits复制过来并加载到内存中进行合并,生成Fsimage.chkpoint文件,并通过http post形式拷贝给NameNode重命名为Fsimage后替换原来的Fsimage。
2.4.2 check point
-
每个一个小时执行一次
<property> <name>dfs.namenode.checkpoint.period</name> <value>3600</value> </property>
-
100万次操作执行一次(一分钟检查一次操作数)
<property> <name>dfs.namenode.checkpoint.txns</name> <value>1000000</value> <description>操作动作次数</description> </property> <property> <name>dfs.namenode.checkpoint.check.period</name> <value>60</value> <description> 1分钟检查一次操作次数</description> </property>
2.5 HDFS Shell 操作
shell | |||
---|---|---|---|
hadoop fs -ls path | -h 转换显示的字节数单位 | -R 递归显示 | 显示文件、目录信息 |
hadoop fs -mkdir path | -p 创建多级文件 | 在hdfs上创建目录 | |
hadoop fs -put src dst | -f 覆盖原文件上传 | -p 保留文件的信息 | 上传本地文件到hdfs |
hadoop fs -get src dst | -crc 为下载的文件写CRC校验和 | -f 覆盖原文件下载 | 下载hdfs文件到本地 |
hadoop fs -appendToFile src dst | 将一个文件追加到另一个文件之后 | ||
hadoop fs -cat path | 显示hdfs文件内容到控制台 | ||
hadoop fs -tail path | -f 监测文件追加的数据 | 显示最后一千字节内容 | |
hadoop fs -chgrp path | -R 递归修改 | 修改文件组 | |
hadoop fs -chmod path | -R 递归修改 | 修改文件权限 | |
hadoop fs -chown path | 修改文件所有者 | ||
hadoop fs -copyFromLocal src dst | 从本地文件系统拷贝文件到hdfs(类似-put) | ||
hadoop fs -copyToLocal src dst | 从hdfs文件拷贝到本地文件系统(类似-get) | ||
hadoop fs -cp path1 path2 | 从hdfs一个路径复制到另一个路径 | ||
hadoop fs -mv path1 path2 | 在hdfs目录中移动文件(剪切) | ||
hadoop fs -rm path | -r 递归删除 | 删除hdfs文件(夹) | |
hadoop fs -df path | -h 换单位 | 统计文件系统可用空间信息 | |
hadoop fs -du path | -h 换单位 | 显示文件大小 | |
hadoop fs -setrep | -R 递归改变 | 设置文件的副本数(优先级最高) | |
hadoop fs -help | 帮助文档 | ||
hadoop fs -count path | 统计当前路径文件夹个数,算上自身 | ||
hadoop fs -touchz file | 创建空白文件 |
三、MapReduce
3.1 MapReduce 入门
3.1.1 定义
MapReduce是一个分布式运算程序的编程框架,是开发“基于Hadoop的数据分析应用”的核心框架
3.1.2 核心功能
将用户编写的业务逻辑代码和自带默认组件整合一个完整的分布式运算程序,并发布到Hadoop集群上运行
3.1.3 优缺点
1.优点
- 易于编程
- 简单地实现一些接口,就可以完成一个分布式程序
- 良好的拓展性
- 当计算资源不足时,可以通过简单地增加机器来拓展计算能力
- 高容错性
- 一台机器宕机,上面的计算任务会转移到另一个节点上运行,任务不会失败
- 可以做PB级的海量数据的离线处理
2.缺点
- 不擅长实时计算
- 不擅长流式计算
- 不擅长DAG计算
3.1.4 MapReduce 核心思想
MapReduce实现分布式计算分成2个阶段
- 第一个阶段MapTask并发实例,完全并行运行,互不干扰
- 第二个阶段ReduceTask并发实例,完全并行运行,数据依赖上一个阶段所有MapTask并发实例输出
- MapReduce编程模型只能包含一个Map阶段一个Reduce阶段,但可以实现多个MapReduce串行运行
3.1.5 MapReduce 进程
- MrAppMaster:负责整个程序的过程调度及状态协调
- MapTask:负责Map阶段整个数据处理流程
- ReduceTask:负责Reduce阶段整个数据处理流程
3.1.6 常用数据序列化类型
Java类型 | Hadoop Writable类型 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
String | Text |
map | MapWritable |
array | ArrayWritable |
3.1.7 MapReduce 编程规范
1. Mapper 阶段
- 自定义Mapper继承hadoop的Mapper
- Mapper的输入数据以K-V的形式(K-V类型可自定义)
- Mapper中业务逻辑写在map()方法中
- Mapper的输出数据以K-V的形式(K-V类型可自定义)
- map()方法(MapTask进程)对每个<K,V>调用一次
2.Reducer 阶段
- 自定义Reducer继承hadoop的Reducer
- Reducer的输入数据类型对应Mapper的输出数据类型
- Reducer中业务逻辑写在reduce()方法中
- reduce()方法(ReduceTask进程)对每一组相同K的<K,V>组调用一次
3.Driver 阶段
提交封装了MapReduce程序相关运行参数的job对象
3.2 Hadoop 序列化
3.2.1 为什么不使用java的序列化
java的序列化是一个重量级序列化框架(Serializable),会携带很多额外信息,不利于Hadoop节点之间的告诉传输,因此Hadoop自己开发一套序列化机制(Writable)
3.2.2 Hadoop序列化特点
- 紧凑:高效使用存储空间
- 快速:读写数据额外开销小
- 可拓展:随着通信协议的升级而可升级
- 互操作:支持多语言的交互
3.2.3 自定义bean对象实现序列化
实现bean对象序列化步骤:
- 实现Writable接口
- 必须提供无参构造器(反射调用)
- 重写序列化方法[ write() ]
- 重写反序列化方法[ readFields() ]
- 序列化和反序列化顺序必须一致
- 建议重写toString()方法,用于显示结果到文件中
- 自定义bean实现Complarable接口可以放在key中传输,之后的归并排序则以此为基础
3.3 MapReduce 框架原理
3.3.1 InputFormat 数据输入
1.切片与MapTask并行度决定机制
切片:数据切片只是在逻辑上对输入进行分片,不会再磁盘上将次分片存储
block块:是HDFS物理上把数据分成一块一块
MapTask并行度 = 数据的切片数(默认为block数)
根据block对于一个个小文件也会占一个块,因此对于一个小文件也需要起一个MapTask导致效率过低,即MapTask的并行度并不是越多越好
- 一个Job的Map阶段并行度由客户端提交Job时的切片数决定
- 每一个split切片分配一个MapTask并行实例处理
- 默认情况,切片大小=BlockSize
- 切片时不考虑数据集整体,而是针对每个文件单独切片
- 读取多个文件时,切片以文件为单位
- 如:一个129M文件和一个100M文件,最终的切片时129M切两个切片,100M的文件单独算,单独切
- 这里的文件是逻辑上的文件,即上传到hdfs一个300M文件,不是针对物理存储的block文件
1)切片图解
2)解释
上述图片为一个300M文件和一个100M文件,若切片大小设置为100M,则300M切分为三个切片,100M为一个切片,但此种方法有缺陷,因为一个MapTask对应一个切片,但300M文件在hdfs分三个block块存储,因此MapTask读取数据涉及到了节点的网络IO影响效率,这也是为什么切片默认大小为block大小,可以有效降低网络IO
3.3.2 FileInputFormat 切片机制
1.类继承关系
Object
|--InputFormat
|--FileInputFormat
|--TextInputFormat
|--NLineInputFormat
|--KeyValueTextInputFormat
|--CombineFileFormat
2.源码解析
A base class for file-based {@link InputFormat}s.
InputFormat实现切片的源码
public abstract
List<InputSplit> getSplits(JobContext context
) throws IOException, InterruptedException;
public abstract
RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context
) throws IOException,
InterruptedException;
getSplits()
实现了数据切片的逻辑,createRecordReader()
将数据封装成k-v对形式传输给map,FileInputFormat
作为其实现类实现了getSplits()
方法(FileInputFormat没有去实现createRecordReader()
),核心逻辑是
Math.max(minSize, Math.min(maxSize, blockSize))
minSize default
- 1
maxSize default
- Long.MAX_VALUE
默认为blockSize大小,对于自定义切片大小:若定义大于blockSize则设置大于blockSize的minSize,若定义小于blockSize则设置小于blockSize的maxSize
3.切片详解
默认数据读入是通过FileInputFormat来实现,其切片流程如下:
- 数据存储目录寻址
- 遍历处理(规划切片)目录下每一个文件
- 遍历第一个文件ss.txt
- 获取文件大小
- 计算切片大小
- 开始进行切片划分
- 将切片信息写到一个切片规划文件中(起始位置,长度,所在节点列表)
- 提交切片规划(Yarn或Local),并根据切片规划计算开启的MapTask数
3.3.3 FileInputFormat 实现类
1.TextInputFormat
FileInputFormat的默认实现类,沿用父类的getSplits()
方法,并实现createRecordReader()
方法
public RecordReader<LongWritable, Text>
createRecordReader(InputSplit split,
TaskAttemptContext context) {
String delimiter = context.getConfiguration().get(
"textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
return new LineRecordReader(recordDelimiterBytes);
}
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
if (pos == 0) {
newSize = skipUtfByteOrderMark();
} else {
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
pos += newSize;
}
if ((newSize == 0) || (newSize < maxLineLength)) {
break;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
根据LineRecordReader
逻辑TextInputFormat是对每个切片数据一行封装成一个RecordReader,其中key是当前行第一个数据在文件中的偏移量类型为LongWritable,value是当前行数据(不包含任何终止符)类型为Text,并将其传给map()
2.KeyValueTextInputFormat
沿用父类的getSplits()
方法,并实现createRecordReader()
方法
public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
context.setStatus(genericSplit.toString());
return new KeyValueLineRecordReader(context.getConfiguration());
}
public KeyValueLineRecordReader(Configuration conf)
throws IOException {
lineRecordReader = new LineRecordReader();
String sepStr = conf.get(KEY_VALUE_SEPARATOR, "\t");
this.separator = (byte) sepStr.charAt(0);
}
/** Read key/value pair in a line. */
public synchronized boolean nextKeyValue()
throws IOException {
byte[] line = null;
int lineLen = -1;
if (lineRecordReader.nextKeyValue()) {
innerValue = lineRecordReader.getCurrentValue();
line = innerValue.getBytes();
lineLen = innerValue.getLength();
} else {
return false;
}
if (line == null)
return false;
if (key == null) {
key = new Text();
}
if (value == null) {
value = new Text();
}
int pos = findSeparator(line, 0, lineLen, this.separator);
setKeyValue(key, value, line, lineLen, pos);
return true;
}
其逻辑是沿用lineRecordReader.nextKeyValue()只不过将读取的一行数据按照KEY_VALUE_SEPARATOR
字符进行分割为两部分(即使匹配到多个分隔符也只分割一次),将第一部分封装成key类型为Text,将第二部分封装成value类型为Text,默认分隔符为\t
设置分割符并启用KeyValueInputFormat方式
// 设置分隔符
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
// 设置输入格式
job.setInputFormatClass(KeyValueTextInputFormat.class);
3.NLineInputFormat
NLineInputFormat作为FileInputFormat实现类重写了父类的getSplits()
方法
/**
* Logically splits the set of input files for the job, splits N lines
* of the input as one split.
*
* @see FileInputFormat#getSplits(JobContext)
*/
public List<InputSplit> getSplits(JobContext job)
throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
int numLinesPerSplit = getNumLinesPerSplit(job);
for (FileStatus status : listStatus(job)) {
splits.addAll(getSplitsForFile(status,
job.getConfiguration(), numLinesPerSplit));
}
return splits;
}
其逻辑是给定一个N,每N行为一个切片,由于没有实现createRecordReader()
方法,因此NLineInputFormat封装k-v的逻辑沿用TextInputFormat方式
设置行数N并启用NLineInputFormat方式
// 设置每个切片InputSplit中划分三条记录
NLineInputFormat.setNumLinesPerSplit(job, 3);
// 设置输入格式
job.setInputFormatClass(NLineInputFormat.class);
4.CombineTextInputFormat
上述实现类均无法解决小文件问题,不论按照上述哪种方法切片都解决不了小文件带来的问题,因为切片针对的是单独的文件而不是数据集,因此CombineTextInputFormat重写了getSplits()
方法
@Override
public List<InputSplit> getSplits(JobContext job)
throws IOException {
long minSizeNode = 0;
long minSizeRack = 0;
long maxSize = 0;
Configuration conf = job.getConfiguration();
// the values specified by setxxxSplitSize() takes precedence over the
// values that might have been specified in the config
if (minSplitSizeNode != 0) {
minSizeNode = minSplitSizeNode;
} else {
minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
}
if (minSplitSizeRack != 0) {
minSizeRack = minSplitSizeRack;
} else {
minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
}
if (maxSplitSize != 0) {
maxSize = maxSplitSize;
} else {
maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
// If maxSize is not configured, a single split will be generated per
// node.
}
if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
throw new IOException("Minimum split size pernode " + minSizeNode +
" cannot be larger than maximum split size " +
maxSize);
}
if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
throw new IOException("Minimum split size per rack " + minSizeRack +
" cannot be larger than maximum split size " +
maxSize);
}
if (minSizeRack != 0 && minSizeNode > minSizeRack) {
throw new IOException("Minimum split size per node " + minSizeNode +
" cannot be larger than minimum split " +
"size per rack " + minSizeRack);
}
// all the files in input set
List<FileStatus> stats = listStatus(job);
List<InputSplit> splits = new ArrayList<InputSplit>();
if (stats.size() == 0) {
return splits;
}
// In one single iteration, process all the paths in a single pool.
// Processing one pool at a time ensures that a split contains paths
// from a single pool only.
for (MultiPathFilter onepool : pools) {
ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
// pick one input path. If it matches all the filters in a pool,
// add it to the output set
for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
FileStatus p = iter.next();
if (onepool.accept(p.getPath())) {
myPaths.add(p); // add it to my output set
iter.remove();
}
}
// create splits for all files in this pool.
getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
}
// create splits for all files that are not in any pool.
getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
// free up rackToNodes map
rackToNodes.clear();
return splits;
}
从逻辑上将很多小文件规划到一个切片中,这样多个小文件就可以交给一MapTask处理,其切片机制如下
- 设置MaxInputSplitSize
- 将输入目录下所有文件大小,一次和设置的最大值比较,规则如下
- 若小于最大值,逻辑上划分一块
- 若大于最大值小于最大值的两倍,逻辑上将文件均分为两份
- 若大于最大值的两倍,先以最大值切一块,剩下部分再次按照上述逻辑继续
- 判断逻辑划分后的切片大小并进行合并,合并规则如下
- 若大于等于最大值,则单独形成一个切片
- 若小于最大值,则跟下一个文件进行合并共同形成一个切片
设置虚拟存储切片大小并启用CombineTextInputFormat方法
// 虚拟存储切片最大值设置4M
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
// 设置输入格式
job.setInputFormatClass(CombineTextInputFormat.class);
3.4 MapReduce 工作流程
3.4.1 MapTask 工作流程
1.规划阶段
客户端submit()前,根据配置信息形成一个任务分配规划,即切片规划;submit()提交首先验证输入输出路径,提交切片信息、jar包(集群模式会提交,本地模式不会提交)、配置文件等;根据切片数计算需要起MapTask的个数
2.Read 阶段
MapTask通过RecordReader逻辑从输入的InputSplit中解析出一个个key-value,自定义InputFormat将在这里调用
3.Map 阶段
将解析出的key-value交给自定义的map()函数,并产生一系列新的key-value
4.Collect 阶段
context.write()后内部会调用OutPutCollectior.collect()输出,并调用分区函数(默认HashPartitioner)对key进行分区后写入一个环形内存缓冲区中
@Override
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
collect()
方法中是一个同步方法在这里实现了写入环形缓冲区的逻辑
try {
// serialize key bytes into buffer
int keystart = bufindex;
keySerializer.serialize(key);
if (bufindex < keystart) {
// wrapped the key; must make contiguous
bb.shiftBufferedKey();
keystart = 0;
}
// serialize value bytes into buffer
final int valstart = bufindex;
valSerializer.serialize(value);
bb.write(b0, 0, 0);
同时向缓冲区的另一部分写入kv的元数据(分区信息,key的开始位置,value的开始位置,vlaue的长度)
// write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
5.Spill 阶段
当写入环形缓冲区的数据达到最大值的80%(默认环形缓冲区大小100M)会触发溢写操作spill,会将缓冲区的数据先按照partition进行排序再按照key进行排序,并将数据写入到磁盘中,此过程同步。
spillLock.lock();
try{
...
}finally{
spillLock.unlock();
}
溢写线程启动会锁定这80%的内存区域执行操作,MapTask的输出结果还可以往剩下的20%内存区域写互不影响,溢写线程启动后如果job设置了Combiner便在排序后落盘前执行,对相同的key的value进行累加(只能做累加,算平均值不能用Combiner)减少溢写到磁盘的数据量
6.Combine 阶段
合并溢写文件,可能map的输出数据量很大触发多次溢写操作则会生成很多临时文件,当整个map()数据处理结束会对磁盘中的溢写临时文件进行merge合并,最终的输出文件只有一个并为这个文件提供一个索引文件记录每个key对应数据的偏移量;在合并文件的时候以分区为单位进行合并,对于每个分区采用多轮递归合并方式(默认每轮10个文件io.sort.factor)并进行归并排序
3.4.2 ReduceTask 工作流程
1.Copy 阶段
简单地拉取数据ReduceTask根据自己的编号去对应的分区拉取数据到内存,如果数据过大超过内存数也会触发溢写操作,将数据写到磁盘中
2.Merge 阶段
和Copy阶段同时进行,ReduceTask会启动连个线程对内存和磁盘数据进行合并,方式内存使用过多和磁盘磁盘文件太多
3.Sort 阶段
把分散的数据文件再次合并成一个大文件,再进行一次归并排序
4.Reduce 阶段
reduce()将计算结果写到HDFS上
3.4.3 Shuffle 机制
1.概念
map()之后reduce()之前的所有MapTask和ReduceTask工作流程称为shuffle
2.shuffle 总结
map()的context.write()提交数据到collector(收集器),collector通过调用collect()对数据进行操作包括调用Partitioner的分区方法(默认HashPartitioner可自定义)根据k对kv进行分区后写入环形缓冲区(抽象概念本质是一个字节数据)当写入的数据达到环形缓冲区大小的80%触发溢写线程,线程启动后先对这80%的内存先按照分区数排序每个分区内单独按照key进行排序(快速排序),若检测到有Combiner则调用最终一次溢写生成一个临时文件,当map()方法结束MapTask对所有的溢写临时文件再次进行归并排序,若检测到Combiner则调用最终一个MapTask输出一个文件等待ReduceTask拉取。
当所有的MapTask结束后启动ReduceTask,ReduceTask根据自己的编号去对应的分区拉取数据到内存,若数据过多也会触发溢写操作,将数据写到磁盘,在拉取数据的过程中ReduceTask同时启动两个后台线程对内存数据和磁盘文件进行合并,最终对所有文件进行归并排序,若监测到Combiner则调用,最终根据key的不同将数据发送给reduce()
3.Partition 分区
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
在collectior中的collect中被调用,默认传入的numReduceTasks=1
@Override
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
因此默认分区为1,根据业务逻辑也可以自定义分区数,源码中的partitions是通过
partitions = jobContext.getNumReduceTasks();
实现自定义分区逻辑步骤如下:
- 自义定分区类继承Partition实现
getPartition()
方法
public class WorldCountPartition extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
int len = text.toString().length();
if (len > 4) {
return 0;
} else if (len == 4) {
return 1;
} else {
return 2;
}
}
}
- 配置分区类设置ReduceTask数
job.setPartitionerClass(WorldCountPartition.class);
job.setNumReduceTasks(3);
- 总结
- 使用默认分区机制不需要设置ReduceTask数(默认为1)
- 使用自定义分区必须设置ReduceTask数
- ReduceTask数 < 分区数,报java.io.IOException: Illegal partition for mr (2)
- ReduceTask数 > 分区数,最后一个part-r-xxxxx没有数据
- 保险起见自定义分区return几个分区值就设置几个ReduceTask,即使有的分区返回不了也只是多几个空白文件不会报错
4.排序机制
排序是MapReduce框架中最重要的操作之一,MapTask和ReduceTask均会对数据按照key进行排序,该操作是Hadoop默认操作,任何应用程序中的数据均会被排序不管逻辑上是否需要
w1:整个MR都有哪几次排序?
q1:对于MapTask,它会将处理的数据结果暂时放在环形缓冲区,当缓冲区达到阈值,对缓冲区的数据进行一次快速排序后将其写到磁盘,多次溢写生成很多文件,当map()结束时会对所有溢写文件进行归并排序;对于ReduceTask在读取完数据后对内存和磁盘的所有数据进行归并排序,因此总体上有三次排序
5.自定义排序案例
15688888888 333 444
15688888888 333 555
15688888888 333 666
15666666666 111 222
15666666666 111 222
15666666666 111 222
15666666666 111 222
上述数据分别是手机号、上行流量、下行流量,要求是按照总流量排序
step1:首先通过MR计算出所有手机号的总流量(此步骤省略)
15649868893 7702 15305 23007
15666666666 444 888 1332
15688888888 1665 2553 4218
step2:对第一次MR计算的结果在进行一次MR计算修改map()的输入k为flow对象,value为手机号码,对于k在shuffle过程中会进行排序,因此hadoop对序列化可排序封装了一个新的接口WritableComparable
,因此flow实体类如下:
package mr.flow;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Flow implements WritableComparable<Flow> {
private int upFlow;
private int downFlow;
private int sumFlow;
public Flow() {
}
public Flow(int upFlow, int downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
public int getUpFlow() {
return upFlow;
}
public void setUpFlow(int upFlow) {
this.upFlow = upFlow;
}
public int getDownFlow() {
return downFlow;
}
public void setDownFlow(int downFlow) {
this.downFlow = downFlow;
}
public int getSumFlow() {
return sumFlow;
}
public void setSumFlow(int sumFlow) {
this.sumFlow = sumFlow;
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(upFlow);
out.writeInt(downFlow);
out.writeInt(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readInt();
downFlow = in.readInt();
sumFlow = in.readInt();
}
@Override
public int compareTo(Flow o) {
return -Integer.compare(sumFlow, o.sumFlow);
}
}
Mapper阶段:
package mr.flow;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowSortMapper extends Mapper<LongWritable, Text, Flow, Text> {
Flow k = new Flow();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] lines = value.toString().split("\t");
k.setUpFlow(Integer.parseInt(lines[1]));
k.setDownFlow(Integer.parseInt(lines[2]));
k.setSumFlow(Integer.parseInt(lines[3]));
v.set(lines[0]);
context.write(k, v);
}
}
Reducer阶段:
package mr.flow;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowSortReducer extends Reducer<Flow, Text, Text, Flow> {
@Override
protected void reduce(Flow key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value, key);
}
}
}
Driver阶段:
package mr.flow;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowSortDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance();
job.setJarByClass(FlowSortDriver.class);
job.setMapperClass(FlowSortMapper.class);
job.setReducerClass(FlowSortReducer.class);
job.setMapOutputKeyClass(Flow.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Flow.class);
FileInputFormat.setInputPaths(job, new Path("output/part-r-00000"));
FileOutputFormat.setOutputPath(job, new Path("sortOutput"));
job.waitForCompletion(true);
}
}
3.5 MapReduce 应用
现在对于MapReduce的应用更加偏向于对数据的处理、清洗数据等,很少作为主力计算框架
3.5.1 Join
1.概念
即多表关联查询
2.Reduce Join
效率低,不使用此方法,大致流程是在join放在reduce阶段,但是reduce的输入value会有如下大坑:
- Iterable<T> values只能遍历一次
- 遍历values时候key也会跟着变
- 对于自定义的类型Iterable<Bean> values,MR为减少对象的创建,当遍历values时只会创建一次Bean对象,随后通过get/set方法对其复制遍历,因此当我们手动set某个属性时,MR只会为其保存最后一次set的值,为此MR提供了
BeanUtils.copyProperties()
方法用于对象的快速拷贝来解决此问题
上述的大坑的原因都是需要追源码
protected class ValueIterable implements Iterable<VALUEIN> {
private ValueIterator iterator = new ValueIterator();
@Override
public Iterator<VALUEIN> iterator() {
return iterator;
}
}
这就是为什么values只会遍历一次,因此遍历时底层调用的是iterator()
返回的是同一个迭代器,剩下的原因可以再ReduceContextImpl
类找到
3.Map Join
1)使用场景
一张表数据特别大,关联的表非常小
2)解决方案
在Map端缓存关联小表,在map()内部进行关联后直接数据,不使用Reduce阶段避免shuffle
3)核心方法
setup()
每个MapTask在执行map()之前都会先执行一次该方法,因此将缓存小表的逻辑写在这
/**
* Called once at the beginning of the task.
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
在Driver类中添加缓存文件
job.addCacheFile(new URI("xxx"));
在setup()中获取缓存文件
URI[] files = context.getCacheFiles()
4)源码
假设有很多很多的数据(大表)
用户id::电影id::用户评分
1::1::5
1::2::5
1::3::5
1::4::5
2::1::5
2::2::5
2::3::5
2::4::5
3::1::5
3::2::5
3::3::5
3::4::5
需要关联的小表
用户id::用户名::性别::年龄
1::小明::M::56
2::小红::N::20
3::小芳::M::18
电影id::电影名::上映时间
1::复仇者联盟1::2010
2::复仇者联盟2::2015
3::复仇者联盟3::2018
4::复仇者联盟4::2019
最终的输出结果
小明::复仇者联盟1::5
小明::复仇者联盟2::5
小明::复仇者联盟3::5
小明::复仇者联盟4::5
小红::复仇者联盟1::5
小红::复仇者联盟2::5
小红::复仇者联盟3::5
小红::复仇者联盟4::5
小芳::复仇者联盟1::5
小芳::复仇者联盟2::5
小芳::复仇者联盟3::5
小芳::复仇者联盟4::5
Mapper代码
package mr.mapjointhree;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class MapJoinThreeTableMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Map<String, String> user = new HashMap<>();
Map<String, String> movies = new HashMap<>();
Text k = new Text();
@Override
protected void setup(Context context) throws IOException {
URI[] cacheFiles = context.getCacheFiles();
BufferedReader brUser = new BufferedReader(new FileReader(cacheFiles[0].getPath()));
BufferedReader brMovies = new BufferedReader(new FileReader(cacheFiles[1].getPath()));
String line;
while ((line = brUser.readLine()) != null) {
String[] split = line.split("::");
user.put(split[0], split[1]);
}
while ((line = brMovies.readLine()) != null) {
String[] split = line.split("::");
movies.put(split[0], split[1]);
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("::");
k.set(user.get(split[0]) + "::" + movies.get(split[1]) + "::" + split[2]);
context.write(k, NullWritable.get());
}
}
Driver代码
package mr.mapjointhree;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class MapJoinThreeTableDriver {
public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance();
// 缓存小表文件
job.addCacheFile(new URI("input/user.txt"));
job.addCacheFile(new URI("input/movies.txt"));
job.setJarByClass(MapJoinThreeTableDriver.class);
job.setMapperClass(MapJoinThreeTableMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 不需要ReduceTask,设置为0
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path("input/ratings.txt"));
FileOutputFormat.setOutputPath(job, new Path("output"));
job.waitForCompletion(true);
}
}
3.5.2 计数器
1.概念
Hadoop为每个作业维护若干内置计数器,以描述多项指标,使用户可以监测以处理的数据量等,并在控制台输出
2.使用
/**
* Get the {@link Counter} for the given <code>groupName</code> and
* <code>counterName</code>.
* @param counterName counter name
* @return the <code>Counter</code> for the given <code>groupName</code> and
* <code>counterName</code>
*/
public Counter getCounter(String groupName, String counterName);
3.5.3 数据清洗(ETL)
类似Map Join,在Map阶段对数据进行清洗后直接数据避开Reduce即可,可以再使用上计数器。
3.5.4 Top N问题
数据如下,链接在文章底部
需求:统计搜索词top10
分析:
- 统计每个搜索词的词频
- 分析词频top10
统计词频Mapper
package mr.top10;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordFrequencyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
Text k = new Text();
LongWritable v = new LongWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split("\t");
k.set(words[3]);
v.set(Long.parseLong(words[4]) + Long.parseLong(words[4]));
context.write(k, v);
}
}
统计词频Reducer
package mr.top10;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordFrequencyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
LongWritable v = new LongWritable();
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
for (LongWritable value : values) {
count += value.get();
}
v.set(count);
context.write(key, v);
}
}
自定义分区
package mr.top10;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import java.util.regex.Pattern;
public class WordFrequencyPartitioner extends Partitioner<Text, LongWritable> {
@Override
public int getPartition(Text text, LongWritable longWritable, int numPartitions) {
if (Pattern.compile("[a-zA-Z]+").matcher(text.toString()).find())
return 0;
else if (Pattern.compile("[0-9]+").matcher(text.toString()).find())
return 1;
else
return 2;
}
}
Top N Bean
package mr.top10;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Top10Bean implements WritableComparable<Top10Bean> {
private String searchName;
private Long searchNum;
public Top10Bean() {
}
public Top10Bean(String searchName, Long searchNum) {
this.searchName = searchName;
this.searchNum = searchNum;
}
public String getSearchName() {
return searchName;
}
public void setSearchName(String searchName) {
this.searchName = searchName;
}
public Long getSearchNum() {
return searchNum;
}
public void setSearchNum(Long searchNum) {
this.searchNum = searchNum;
}
@Override
public int compareTo(Top10Bean o) {
/*
* 在这里处理当搜索量相同时按照所有词字典序
* 若仅比较搜索量会造成数据的覆盖在TreeSet里
* 同时也可以解决Bean在MR在reduce不合逻辑情况
* */
int compare = Long.compare(o.searchNum, searchNum);
if (compare == 0) {
return searchName.compareTo(o.searchName);
}
return compare;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(searchName);
out.writeLong(searchNum);
}
@Override
public void readFields(DataInput in) throws IOException {
searchName = in.readUTF();
searchNum = in.readLong();
}
@Override
public String toString() {
return searchName + "\t" + searchNum;
}
}
Top N Mapper
package mr.top10;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.TreeSet;
public class TopNMapper extends Mapper<LongWritable, Text, Text, Top10Bean> {
TreeSet<Top10Bean> cacheTop10 = new TreeSet<>();
@Override
protected void map(LongWritable key, Text value, Context context) {
String[] split = value.toString().split("\t");
cacheTop10.add(new Top10Bean(split[0], Long.parseLong(split[1])));
// TreeSet 默认有序,根据bean逻辑,最后一个最小
if (cacheTop10.size() > 10) {
cacheTop10.remove(cacheTop10.last());
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
Text top10 = new Text("top10");
for (Top10Bean bean : cacheTop10) {
//写死key为了让这些数据进入同一个reduce task
context.write(top10, bean);
}
}
}
Top N Reduce
package mr.top10;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.TreeSet;
public class TopNReducer extends Reducer<Text, Top10Bean, Top10Bean, NullWritable> {
TreeSet<Top10Bean> cacheTop10 = new TreeSet<>();
@Override
protected void reduce(Text key, Iterable<Top10Bean> values, Context context) throws IOException, InterruptedException {
for (Top10Bean value : values) {
//reduce遍历迭代器时只有一个对象,即value始终是一个
//需要将value拷贝一份放入cacheTop10
Top10Bean bean = new Top10Bean();
try {
BeanUtils.copyProperties(bean, value);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
cacheTop10.add(bean);
if (cacheTop10.size() > 10) {
cacheTop10.remove(cacheTop10.last());
}
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
for (Top10Bean bean : cacheTop10) {
context.write(bean, NullWritable.get());
}
}
}
Driver 启动类
package mr.top10;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class Driver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job1 = Job.getInstance();
job1.setJarByClass(Driver.class);
job1.setMapperClass(WordFrequencyMapper.class);
job1.setReducerClass(WordFrequencyReducer.class);
// 配置Combiner 提前合并
job1.setCombinerClass(WordFrequencyReducer.class);
// 配置分区
job1.setPartitionerClass(WordFrequencyPartitioner.class);
// 设置ReduceTask
job1.setNumReduceTasks(3);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(LongWritable.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(LongWritable.class);
//提高MapTask并行度
FileInputFormat.setMaxInputSplitSize(job1, 26214400);
FileInputFormat.setInputPaths(job1, new Path("input/bigtable"));
FileOutputFormat.setOutputPath(job1, new Path("output"));
boolean b = job1.waitForCompletion(true);
Job job2 = Job.getInstance();
job2.setJarByClass(Driver.class);
job2.setMapperClass(TopNMapper.class);
job2.setReducerClass(TopNReducer.class);
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(Top10Bean.class);
job2.setOutputKeyClass(Top10Bean.class);
job2.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job2, new Path("output/part-r-00002"));
FileOutputFormat.setOutputPath(job2, new Path("top10"));
if (b)
job2.waitForCompletion(true);
}
}
最终结果
人体艺术 22796
百度 17172
新亮剑 11292
馆陶县县长闫宁的父亲 8574
儿子与母亲不正当关系 7520
优酷 5738
黑狐 5600
武动乾坤 4996
龙门飞甲 4836
新亮剑全集 4602
3.6 MapReduce 开发总结
在编写MapReduce时需要考虑以下几个方面:
- 输入数据接口:InputFormat
- 默认实现类:TextInputFormat,一次读一行,k为该行起始偏移量;v该行内容
- KeyValueTextInputFormat,一次读一行,被分隔符分割为kv,默认分隔符为
\t
- NLineInoutFormat,按照指定行N来划分切片
- CombineTextInputFormat,合并小文件提高效率
- 自定义InputFormat
- 逻辑处理接口:Mapper
- setup()
- map()
- cleanup()
- Partitioner分区
- 默认实现类HashPartitioner,默认逻辑
key.hashCode()&Integer.MAXVALUE % numReduces
- 自定义分区,继承Partitioner重写
getPartition()
- 默认实现类HashPartitioner,默认逻辑
- Comparable排序
- 实现Comparable接口重写
compareTo()
- 若自定义Bean需要作为Key传输可以实现WritableComparable
- 实现Comparable接口重写
- Combiner合并
- 在不改变业务逻辑的情况下推荐使用,提交合并减少IO传输,提高效率
四、Yarn
yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作平台,MR等运算程序则相当于运行于操作系统上的应用程序
4.1 JobTracker 框架
4.1.1 执行流程
- 客户端提交job到JobTracker,JobTracekr是MapReduce核心,与集群中的所有节点(Task Tracker)定时通信(heartbeat),管理哪些程序应该跑在哪些节点上,同时管理所有job失败、重启等操作
- TaskTracker每个节点都有一个,主要监测自己所在节点的资源使用情况
- TaskTracker同时监视当前节点task运行情况,TaskTracker需要把这些信息通过heartbeat发送给JobTracker
4.1.2 缺点
- JobTracker存在单点故障
- 普遍认为老版本的hadoop只能支持4000节点
- 资源计算过于简单,仅使用map/reduce task个数来衡量资源占用不考虑cpu、内存等
- 把资源强制划分为map task slot和reduce task slot,当job只有map或者reduce会造成资源浪费
- 源码复杂,增加维护成本
- 只能运行MapReduce
4.2 Yarn 框架
hadoop 2.x 将MapReduce的资源调度框架(JobTracker)分离出来并加以优化即为yarn,相对于hadoop 1.x来说hadoop 2.x的MapReduce只作为计算框架提高了效率,解耦合。
4.2.1 yarn 成员
ResourceManager:简称RM,负责集群资源的同一管理和调度、处理客户端请求、监控集群中的NM
NodeManager:简称NM,负责自己所在节点的应用资源使用情况,并向RM汇报,接受并处理来自RM,AM的各种指令
ApplicationMaster:简称AM,每个应用程序都对应一个AM,主要负责应用程序的管理、向RM申请资源并分配给task,AM与NM通信来启动或停止task,运行在Container中
Container:封装了CPU,内存等资源的一个容器,相当于一个运行环境的抽象
4.2.2 执行流程
第一步:client向RM申请一个taskId
第二步:RM返回一个taskId和该job资源的提交路径
第三步:client将jar包、切片规划、配置文件到指定的资源提交路径
第四步:client提交资源完成后,向RM申请一个AM
第五步:RM收到申请后将job添加进资源调度器(默认容量调度器)
第六步:某个空闲的NM领取到该job
第七步:该NM创建Container并在里面启动运行AM
第八步:下载资源到本地
第九步:AM向RM申请运行MapTask的资源并领取
第十步:RM将任务分配给对应的NM,NM领取任务后创建Container
第11步:AM将向接受任务的NM发送启动脚本
第12步
- a:NM启动MapTask
- b:各个MapTask向AM汇报任务状态和进度
第13步
- a:AM等待所有MapTask完成后向RM申请运行ReduceTask资源(Container),并运行ReduceTask(和九十一样)
- b:ReduceTask向AM汇报任务状态和进度
第14步:等待所有的ReduceTask结束,AM向RM申请注销自己
4.2.3 三种资源调度器
1.FIFO Scheduler
将job放置在一个队列中,按顺序运行job;会造成大任务阻塞小任务,不适合共享集群
2.Capacity Scheduler
一个独立的专门队列保证小任务已提交就就可以启动;但会造成资源的浪费(没有小任务提交,这片资源不会被使用)
3.Fair Scheduler
不需要预留资源,调度器会在所有运行的作业之间动态平衡资源
五、补充
5.1 MapReduce 企业优化
5.1.1 MapReduce 为什么跑得慢?
-
计算机性能
CPU、磁盘、内存、网络等
-
IO操作
- 数据倾斜:某个Reduce处理数据量特别大,其他Reduce数据量小,导致资源分配不均
- map和reduce设置不合理
- map运行时间长,导致reduce等待过久
- 小文件过多
- spill次数过多
- 合并次数过多
5.1.2 MapReduce 优化方法
1.数据输入
- 在执行MapReduce任务前将小文件进行合并,减少map个数
- 采用CombinerTextInputFormat来作为输入,解决小文件问题
2.Map 阶段
- 减少spill次数通过
io.sort.mb
及sort.spill.percent
分别配置环形缓冲区内存上限和最大值阈值,从而减少soill次数,减少IO次数 - 在不影响业务逻辑情况下,先进性Combine处理,减少数据量
3.Reducer 阶段
- 合理配置map和renduce个数
- 减少reduce的使用,reduce在连接数据集时将产生大量网络消耗
4.IO 传输
采用压缩数据的方式减少IO传输消耗
5.数据倾斜问题
-
使用Combine,减少传输的数据量,若导致数据倾斜的key大量分布在不同的MapTask时不适用
-
导致数据倾斜的key大量分布在不同的mapper
-
局部聚合加全局聚合
- 第一次在 map 阶段对那些导致了数据倾斜的 key 加上 1 到 n 的随机前缀,这样本来相
同的 key 也会被分到多个 Reducer 中进行局部聚合,数量就会大大降低。
-
第二次 mapreduce,去掉 key 的随机前缀,进行全局聚合。
-
思想:二次 mr,第一次将 key 随机散列到不同 reducer 进行处理达到负载均衡目的。第
二次再根据去掉 key 的随机前缀,按原 key 进行 reduce 处理。
-
增加Reduce个数,增加并行度
-
自定义分区
-
转载:https://blog.csdn.net/qq_41858402/article/details/108207777