目录
在上一篇中我们体验了MapReduce的基本流程所以本次将会在此基础上再进行词频统计的深入演练
Hadoop的MapReduce基本流程体验_open_test01的博客-CSDN博客
创建词频统计映射器类
在项目包中新建WordCountMapper类
继承泛型参数解读:
KEYIN :输入的key类型:LongWritable
VALUEIN:输入的value类型:Text
KEYOUT:输出的key类型:LongWritable
VALUEOUT:输出的value类型:Text
-
import org.apache.hadoop.io.LongWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Mapper;
-
-
import java.io.IOException;
-
-
public
class
WordCountMapper
extends
Mapper<LongWritable, Text, LongWritable, Text> {
-
@Override
-
protected
void
map
(LongWritable key, Text value, Context context)
-
throws IOException, InterruptedException {
-
// 直接将键值对数据传到下一个阶段
-
context.write(key, value);
-
}
-
}
创建词频统计驱动器类
在项目包中新建WordCountDriver类
这里使用BigData目录中的数据进行输入使用
输出位置为新建的output目录
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.FSDataInputStream;
-
import org.apache.hadoop.fs.FileStatus;
-
import org.apache.hadoop.fs.FileSystem;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.IOUtils;
-
import org.apache.hadoop.io.LongWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-
import java.net.URI;
-
-
public
class
WordCountDriver {
-
public
static
void
main
(String[] args)
throws Exception {
-
// 创建配置对象
-
Configuration
conf
=
new
Configuration();
-
// 设置数据节点主机名属性
-
conf.set(
"dfs.client.use.datanode.hostname",
"true");
-
-
// 获取作业实例
-
Job
job
= Job.getInstance(conf);
-
// 设置作业启动类
-
job.setJarByClass(WordCountDriver.class);
-
-
// 设置Mapper类
-
job.setMapperClass(WordCountMapper.class);
-
// 设置map任务输出键类型
-
job.setMapOutputKeyClass(LongWritable.class);
-
// 设置map任务输出值类型
-
job.setMapOutputValueClass(Text.class);
-
-
// 定义uri字符串
-
String
uri
=
"hdfs://master:9000";
-
// 创建输入目录
-
Path
inputPath
=
new
Path(uri +
"/BigData");
-
// 创建输出目录
-
Path
outputPath
=
new
Path(uri +
"/output");
-
-
// 获取文件系统
-
FileSystem
fs
= FileSystem.get(
new
URI(uri), conf);
-
// 删除输出目录(第二个参数设置是否递归)
-
fs.delete(outputPath,
true);
-
-
// 给作业添加输入目录(允许多个)
-
FileInputFormat.addInputPath(job, inputPath);
-
// 给作业设置输出目录(只能一个)
-
FileOutputFormat.setOutputPath(job, outputPath);
-
-
// 等待作业完成
-
job.waitForCompletion(
true);
-
-
// 输出统计结果
-
System.out.println(
"======统计结果======");
-
FileStatus[] fileStatuses = fs.listStatus(outputPath);
-
for (
int
i
=
1; i < fileStatuses.length; i++) {
-
// 输出结果文件路径
-
System.out.println(fileStatuses[i].getPath());
-
// 获取文件系统数据字节输入流
-
FSDataInputStream
in
= fs.open(fileStatuses[i].getPath());
-
// 将结果文件显示在控制台
-
IOUtils.copyBytes(in, System.out,
4096,
false);
-
}
-
}
-
}
第一次测试运行
注意:运行时先检查本地主机hadoop环境是否正常
hadoop解压到本机目录下
本地环境变量配置
下载对应版本的winutils.exe
和hadoop.dll
,放在hadoop安装目录的bin
子目录里
-
winutils/winutils.exe at master · cdarlint/winutils · GitHub
- https://github.com/cdarlint/winutils/blob/master/hadoop-3.2.2/bin/hadoop.dll
再将hadoop.dll放到C:/windows/system32文件夹下
本地运行环境没有问题那么我们就可以开始运行了
查看WordCountDriver类在IDEA的运行结果
如果不想看到统计结果之前的大堆信息 可以修改log4j.properties
文件,将INFO
改为ERROR
在Hadoop WebUI界面查看结果文件
修改词频统计映射器类WordCoutMapper类
- 行首数字对于我们做单词统计没有任何用处,只需要拿到每一行内容,按空格拆分成单词,每个单词计数1,因此,
WordCoutMapper
的输出应该是单词和个数,于是,输出键类型为Text
,输出值类型为IntWritable
。 - 将每行按空格拆分成单词数组,输出
<单词, 1>
的键值对
修改该方法的泛型参数键值类型
-
public
class
WordCountMapper
extends
Mapper<LongWritable, Text, Text, IntWritable> {
-
@Override
-
protected
void
map
(LongWritable key, Text value, Context context)
-
throws IOException, InterruptedException {
-
// 获取行内容
-
String
line
= value.toString();
-
// 按空格拆分得到单词数组
-
String[] words = line.split(
" ");
-
// 遍历单词数组,生成输出键值对
-
for (
int
i
=
0; i < words.length; i++) {
-
context.write(
new
Text(words[i]),
new
IntWritable(
1));
-
}
-
}
-
}
修改词频统计驱动器WordCountDriver类
修改map任务输出键值类型使其与WordCoutMapper类泛型参数键值类型对应
第二次测试运行
输出结果以键值对形式出现
对于这样一组键值对,传递到reduce阶段,按键排序,其值构成迭代器
-
<
1>
-
-
af <
1,
1,
1>
-
-
as <
1,
1>
-
-
faf <
1>
-
-
gaf <
1>
映射任务与归并任务示意图
创建词频统计归并器类
在项目包中创建WordCountReducer类
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Reducer;
-
-
import java.io.IOException;
-
import java.util.ArrayList;
-
import java.util.List;
-
-
public
class
WordCountReducer
extends
Reducer<Text, IntWritable, Text, Text> {
-
@Override
-
protected
void
reduce
(Text key, Iterable<IntWritable> values, Context context)
-
throws IOException, InterruptedException {
-
// 定义整数数组列表
-
List<Integer> integers =
new
ArrayList<>();
-
// 遍历输入值迭代器
-
for (IntWritable value : values) {
-
// 将每个值添加到数组列表
-
integers.add(value.get());
// 利用get()方法将hadoop数据类型转换成java数据类型
-
}
-
// 输出新的键值对,注意要将java字符串转换成hadoop的text类型
-
context.write(key,
new
Text(integers.toString()));
-
}
-
}
修改 WordCountDriver
类与WordCoutMapper类中的泛型参数键值类型
设置词频统计的Reducer类及其输出键类型和输出值类型(Text,Text)
在 WordCountDriver
类中写入Reducer输出设置
第三测试运行
现在我们需要修改词频统计归并器,将每个键(单词)的值迭代器进行累加,得到每个单词出现的总次数
修改词频统计归并器类
修改WordCountReducer类
输出键值类型改为IntWritable
,遍历值迭代器,累加得到单词出现次数
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Reducer;
-
-
import java.io.IOException;
-
-
public
class
WordCountReducer
extends
Reducer<Text, IntWritable, Text, IntWritable> {
-
@Override
-
protected
void
reduce
(Text key, Iterable<IntWritable> values, Context context)
-
throws IOException, InterruptedException {
-
// 定义键出现次数
-
int
count
=
0;
-
// 遍历输入值迭代器
-
for (IntWritable value : values) {
-
count += value.get();
// 其实针对此案例,可用count++来处理
-
}
-
// 输出新的键值对,注意要将java的int类型转换成hadoop的IntWritable类型
-
context.write(key,
new
IntWritable(count));
-
}
-
}
修改WordCountDriver类
第四次测试运行
可以看到每个单词出现的次数
修改词频统计驱动器类,设置分区数量
写入WordCountDriver类
第五次测试运行
并产生了3个结果文件
打包jar包上传hadoop运行
拷贝WordCountDriverr 驱动类全类名
hadoop中执行命令运行jar包
执行命令
-
hadoop jar MRWordCount-
1.0-SNAPSHOT.jar mpr.WordCountDriver
-
-
hadoop jar 使用的jar包 全类名
运行结果如下
这样可以执行但是路径却是被写死了 我们用一种给main传递路径参数args[索引]
所以下面我们将对其修改
创建新词频统计驱动器类
创建一个新的WordCountDriver类 并命名为WordCountDriverNew
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.FSDataInputStream;
-
import org.apache.hadoop.fs.FileStatus;
-
import org.apache.hadoop.fs.FileSystem;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.IOUtils;
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-
import java.net.URI;
-
-
public
class
WordCountDriverNew {
-
public
static
void
main
(String[] args)
throws Exception {
-
// 创建配置对象
-
Configuration
conf
=
new
Configuration();
-
// 设置数据节点主机名属性
-
conf.set(
"dfs.client.use.datanode.hostname",
"true");
-
-
// 获取作业实例
-
Job
job
= Job.getInstance(conf);
-
// 设置作业启动类
-
job.setJarByClass(WordCountDriverNew.class);
-
-
// 设置Mapper类
-
job.setMapperClass(WordCountMapper.class);
-
// 设置map任务输出键类型
-
job.setMapOutputKeyClass(Text.class);
-
// 设置map任务输出值类型
-
job.setMapOutputValueClass(IntWritable.class);
-
-
// 设置Reducer类
-
job.setReducerClass(WordCountReducer.class);
-
// 设置reduce任务输出键类型
-
job.setOutputKeyClass(Text.class);
-
// 设置reduce任务输出值类型
-
job.setOutputValueClass(IntWritable.class);
-
-
// 设置分区数量(reduce任务的数量,结果文件的数量)
-
job.setNumReduceTasks(
3);
-
-
// 定义uri字符串
-
String
uri
=
"hdfs://master:9000";
-
// 声明输入目录
-
Path
inputPath
=
null;
-
// 声明输出目录
-
Path
outputPath
=
null;
-
// 判断输入参数个数
-
if (args.length ==
0) {
-
// 创建输入目录
-
inputPath =
new
Path(uri +
"/wordcount/input");
-
// 创建输出目录
-
outputPath =
new
Path(uri +
"/wordcount/output");
-
}
else
if (args.length ==
2) {
-
// 创建输入目录
-
inputPath =
new
Path(uri + args[
0]);
-
// 创建输出目录
-
outputPath =
new
Path(uri + args[
1]);
-
}
else {
-
// 提示用户参数个数不符合要求
-
System.out.println(
"参数个数不符合要求,要么是0个,要么是2个!");
-
// 结束应用程序
-
return;
-
}
-
-
// 获取文件系统
-
FileSystem
fs
= FileSystem.get(
new
URI(uri), conf);
-
// 删除输出目录(第二个参数设置是否递归)
-
fs.delete(outputPath,
true);
-
-
// 给作业添加输入目录(允许多个)
-
FileInputFormat.addInputPath(job, inputPath);
-
// 给作业设置输出目录(只能一个)
-
FileOutputFormat.setOutputPath(job, outputPath);
-
-
// 等待作业完成
-
job.waitForCompletion(
true);
-
-
// 输出统计结果
-
System.out.println(
"======统计结果======");
-
FileStatus[] fileStatuses = fs.listStatus(outputPath);
-
for (
int
i
=
1; i < fileStatuses.length; i++) {
-
// 输出结果文件路径
-
System.out.println(fileStatuses[i].getPath());
-
// 获取文件系统数据字节输入流
-
FSDataInputStream
in
= fs.open(fileStatuses[i].getPath());
-
// 将结果文件显示在控制台
-
IOUtils.copyBytes(in, System.out,
4096,
false);
-
}
-
}
-
}
然后重新打包上传到hadoop执行
执行命令 这次将可以指定路径进行运行
-
hadoop jar MRWordCount-
1.0-SNAPSHOT.jar /BigData /outputs
-
-
hadoop jar 使用的jar包 全类名 /输入数据原文件 /输出路径目录
在webUI上查看运行结果
将三个类合并成一个类完成词频统计
新建WordCount类 并将功能合并 处理词频统计任务
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.conf.Configured;
-
import org.apache.hadoop.fs.FSDataInputStream;
-
import org.apache.hadoop.fs.FileStatus;
-
import org.apache.hadoop.fs.FileSystem;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.IOUtils;
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.LongWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hadoop.util.Tool;
-
import org.apache.hadoop.util.ToolRunner;
-
-
import java.io.IOException;
-
import java.net.URI;
-
-
public
class
WordCount
extends
Configured
implements
Tool {
-
-
public
static
class
WordCountMapper
extends
Mapper<LongWritable, Text, Text, IntWritable> {
-
@Override
-
protected
void
map
(LongWritable key, Text value, Context context)
-
throws IOException, InterruptedException {
-
// 获取行内容
-
String
line
= value.toString();
-
// 清洗所有英文标点符号(\p——属性[property],P——标点符号[Punctuation])
-
line = line.replaceAll(
"[\\pP]",
"");
-
// 按空格拆分得到单词数组
-
String[] words = line.split(
" ");
-
// 遍历单词数组,生成输出键值对
-
for (
int
i
=
0; i < words.length; i++) {
-
context.write(
new
Text(words[i]),
new
IntWritable(
1));
-
}
-
}
-
}
-
-
public
static
class
WordCountReducer
extends
Reducer<Text, IntWritable, Text, IntWritable> {
-
@Override
-
protected
void
reduce
(Text key, Iterable<IntWritable> values, Context context)
-
throws IOException, InterruptedException {
-
// 定义输出键出现次数
-
int
count
=
0;
-
// 历输出值迭代对象,统计其出现次数
-
for (IntWritable value : values) {
-
count = count + value.get();
-
}
-
// 生成键值对输出
-
context.write(key,
new
IntWritable(count));
-
}
-
}
-
-
@Override
-
public
int
run
(String[] strings)
throws Exception {
-
// 创建配置对象
-
Configuration
conf
=
new
Configuration();
-
// 设置数据节点主机名属性
-
conf.set(
"dfs.client.use.datanode.hostname",
"true");
-
-
// 获取作业实例
-
Job
job
= Job.getInstance(conf);
-
// 设置作业启动类
-
job.setJarByClass(WordCountDriver.class);
-
-
// 设置Mapper类
-
job.setMapperClass(WordCountMapper.class);
-
// 设置map任务输出键类型
-
job.setMapOutputKeyClass(Text.class);
-
// 设置map任务输出值类型
-
job.setMapOutputValueClass(IntWritable.class);
-
-
// 设置Reducer类
-
job.setReducerClass(WordCountReducer.class);
-
// 设置reduce任务输出键类型
-
job.setOutputKeyClass(Text.class);
-
// 设置reduce任务输出值类型
-
job.setOutputValueClass(IntWritable.class);
-
-
// 设置分区数量(reduce任务的数量,结果文件的数量)
-
job.setNumReduceTasks(
3);
-
-
// 定义uri字符串
-
String
uri
=
"hdfs://master:9000";
-
// 创建输入目录
-
Path
inputPath
=
new
Path(uri +
"/wordcount2/input");
-
// 创建输出目录
-
Path
outputPath
=
new
Path(uri +
"/wordcount2/output");
-
-
// 获取文件系统
-
FileSystem
fs
= FileSystem.get(
new
URI(uri), conf);
-
// 删除输出目录(第二个参数设置是否递归)
-
fs.delete(outputPath,
true);
-
-
// 给作业添加输入目录(允许多个)
-
FileInputFormat.addInputPath(job, inputPath);
-
// 给作业设置输出目录(只能一个)
-
FileOutputFormat.setOutputPath(job, outputPath);
-
-
// 等待作业完成
-
boolean
res
= job.waitForCompletion(
true);
-
-
// 输出统计结果
-
System.out.println(
"======统计结果======");
-
FileStatus[] fileStatuses = fs.listStatus(outputPath);
-
for (
int
i
=
1; i < fileStatuses.length; i++) {
-
// 输出结果文件路径
-
System.out.println(fileStatuses[i].getPath());
-
// 获取文件系统数据字节输入流
-
FSDataInputStream
in
= fs.open(fileStatuses[i].getPath());
-
// 将结果文件显示在控制台
-
IOUtils.copyBytes(in, System.out,
4096,
false);
-
}
-
-
if (res) {
-
return
0;
-
}
else {
-
return -
1;
-
}
-
}
-
-
public
static
void
main
(String[] args)
throws Exception {
-
int
res
= ToolRunner.run(
new
WordCount(), args);
-
System.exit(res);
-
}
-
}
查看运行结果
转载:https://blog.csdn.net/dafsq/article/details/128317636