小言_互联网的博客

MapReduce词频统计演练进阶

379人阅读  评论(0)

 

目录

创建词频统计映射器类

创建词频统计驱动器类

第一次测试运行 

修改词频统计映射器类WordCoutMapper类

修改词频统计驱动器WordCountDriver类

 第二次测试运行

创建词频统计归并器类

第三测试运行

修改词频统计归并器类

 第四次测试运行

修改词频统计驱动器类,设置分区数量

 第五次测试运行

打包jar包上传hadoop运行

创建新词频统计驱动器类

将三个类合并成一个类完成词频统计

在上一篇中我们体验了MapReduce的基本流程所以本次将会在此基础上再进行词频统计的深入演练

Hadoop的MapReduce基本流程体验_open_test01的博客-CSDN博客

创建词频统计映射器类

在项目包中新建WordCountMapper类

继承泛型参数解读:

KEYIN :输入的key类型:LongWritable

VALUEIN:输入的value类型:Text

KEYOUT:输出的key类型:LongWritable

VALUEOUT:输出的value类型:Text


  
  1. import org.apache.hadoop.io.LongWritable;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Mapper;
  4. import java.io.IOException;
  5. public class WordCountMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
  6. @Override
  7. protected void map (LongWritable key, Text value, Context context)
  8. throws IOException, InterruptedException {
  9. // 直接将键值对数据传到下一个阶段
  10. context.write(key, value);
  11. }
  12. }

创建词频统计驱动器类

在项目包中新建WordCountDriver类

这里使用BigData目录中的数据进行输入使用

 输出位置为新建的output目录


  
  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.FSDataInputStream;
  3. import org.apache.hadoop.fs.FileStatus;
  4. import org.apache.hadoop.fs.FileSystem;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IOUtils;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  12. import java.net.URI;
  13. public class WordCountDriver {
  14. public static void main (String[] args) throws Exception {
  15. // 创建配置对象
  16. Configuration conf = new Configuration();
  17. // 设置数据节点主机名属性
  18. conf.set( "dfs.client.use.datanode.hostname", "true");
  19. // 获取作业实例
  20. Job job = Job.getInstance(conf);
  21. // 设置作业启动类
  22. job.setJarByClass(WordCountDriver.class);
  23. // 设置Mapper类
  24. job.setMapperClass(WordCountMapper.class);
  25. // 设置map任务输出键类型
  26. job.setMapOutputKeyClass(LongWritable.class);
  27. // 设置map任务输出值类型
  28. job.setMapOutputValueClass(Text.class);
  29. // 定义uri字符串
  30. String uri = "hdfs://master:9000";
  31. // 创建输入目录
  32. Path inputPath = new Path(uri + "/BigData");
  33. // 创建输出目录
  34. Path outputPath = new Path(uri + "/output");
  35. // 获取文件系统
  36. FileSystem fs = FileSystem.get( new URI(uri), conf);
  37. // 删除输出目录(第二个参数设置是否递归)
  38. fs.delete(outputPath, true);
  39. // 给作业添加输入目录(允许多个)
  40. FileInputFormat.addInputPath(job, inputPath);
  41. // 给作业设置输出目录(只能一个)
  42. FileOutputFormat.setOutputPath(job, outputPath);
  43. // 等待作业完成
  44. job.waitForCompletion( true);
  45. // 输出统计结果
  46. System.out.println( "======统计结果======");
  47. FileStatus[] fileStatuses = fs.listStatus(outputPath);
  48. for ( int i = 1; i < fileStatuses.length; i++) {
  49. // 输出结果文件路径
  50. System.out.println(fileStatuses[i].getPath());
  51. // 获取文件系统数据字节输入流
  52. FSDataInputStream in = fs.open(fileStatuses[i].getPath());
  53. // 将结果文件显示在控制台
  54. IOUtils.copyBytes(in, System.out, 4096, false);
  55. }
  56. }
  57. }

第一次测试运行 

 注意:运行时先检查本地主机hadoop环境是否正常

 hadoop解压到本机目录下

 

本地环境变量配置

 下载对应版本的winutils.exehadoop.dll,放在hadoop安装目录的bin子目录里

 

 再将hadoop.dll放到C:/windows/system32文件夹下

 本地运行环境没有问题那么我们就可以开始运行了

查看WordCountDriver类在IDEA的运行结果

 如果不想看到统计结果之前的大堆信息 可以修改log4j.properties文件,将INFO改为ERROR

在Hadoop WebUI界面查看结果文件

 

修改词频统计映射器类WordCoutMapper类

  • 行首数字对于我们做单词统计没有任何用处,只需要拿到每一行内容,按空格拆分成单词,每个单词计数1,因此,WordCoutMapper的输出应该是单词个数,于是,输出键类型为Text,输出值类型为IntWritable
  • 将每行按空格拆分成单词数组,输出<单词, 1>的键值对

 修改该方法的泛型参数键值类型


  
  1. public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  2. @Override
  3. protected void map (LongWritable key, Text value, Context context)
  4. throws IOException, InterruptedException {
  5. // 获取行内容
  6. String line = value.toString();
  7. // 按空格拆分得到单词数组
  8. String[] words = line.split( " ");
  9. // 遍历单词数组,生成输出键值对
  10. for ( int i = 0; i < words.length; i++) {
  11. context.write( new Text(words[i]), new IntWritable( 1));
  12. }
  13. }
  14. }

修改词频统计驱动器WordCountDriver类

修改map任务输出键值类型使其与WordCoutMapper类泛型参数键值类型对应

 第二次测试运行

输出结果以键值对形式出现

 

对于这样一组键值对,传递到reduce阶段,按键排序,其值构成迭代器


  
  1. < 1>
  2. af < 1, 1, 1>
  3. as < 1, 1>
  4. faf < 1>
  5. gaf < 1>

映射任务与归并任务示意图

 

创建词频统计归并器类

在项目包中创建WordCountReducer类

 


  
  1. import org.apache.hadoop.io.IntWritable;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. import java.io.IOException;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. public class WordCountReducer extends Reducer<Text, IntWritable, Text, Text> {
  8. @Override
  9. protected void reduce (Text key, Iterable<IntWritable> values, Context context)
  10. throws IOException, InterruptedException {
  11. // 定义整数数组列表
  12. List<Integer> integers = new ArrayList<>();
  13. // 遍历输入值迭代器
  14. for (IntWritable value : values) {
  15. // 将每个值添加到数组列表
  16. integers.add(value.get()); // 利用get()方法将hadoop数据类型转换成java数据类型
  17. }
  18. // 输出新的键值对,注意要将java字符串转换成hadoop的text类型
  19. context.write(key, new Text(integers.toString()));
  20. }
  21. }

修改 WordCountDriver类与WordCoutMapper类中的泛型参数键值类型

设置词频统计的Reducer类及其输出键类型和输出值类型(Text,Text)

在 WordCountDriver类中写入Reducer输出设置

第三测试运行

现在我们需要修改词频统计归并器,将每个键(单词)的值迭代器进行累加,得到每个单词出现的总次数

 

修改词频统计归并器类

修改WordCountReducer类

输出键值类型改为IntWritable,遍历值迭代器,累加得到单词出现次数


  
  1. import org.apache.hadoop.io.IntWritable;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. import java.io.IOException;
  5. public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  6. @Override
  7. protected void reduce (Text key, Iterable<IntWritable> values, Context context)
  8. throws IOException, InterruptedException {
  9. // 定义键出现次数
  10. int count = 0;
  11. // 遍历输入值迭代器
  12. for (IntWritable value : values) {
  13. count += value.get(); // 其实针对此案例,可用count++来处理
  14. }
  15. // 输出新的键值对,注意要将java的int类型转换成hadoop的IntWritable类型
  16. context.write(key, new IntWritable(count));
  17. }
  18. }

 修改WordCountDriver类

 第四次测试运行

可以看到每个单词出现的次数

 

修改词频统计驱动器类,设置分区数量

写入WordCountDriver类

 第五次测试运行

 并产生了3个结果文件

 

打包jar包上传hadoop运行

 

 拷贝WordCountDriverr 驱动类全类名

 hadoop中执行命令运行jar包

执行命令


  
  1. hadoop jar MRWordCount- 1.0-SNAPSHOT.jar mpr.WordCountDriver
  2. hadoop jar 使用的jar包 全类名

 

 

 

 运行结果如下

 

这样可以执行但是路径却是被写死了 我们用一种给main传递路径参数args[索引]

所以下面我们将对其修改

创建新词频统计驱动器类

创建一个新的WordCountDriver类 并命名为WordCountDriverNew


  
  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.FSDataInputStream;
  3. import org.apache.hadoop.fs.FileStatus;
  4. import org.apache.hadoop.fs.FileSystem;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IOUtils;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  12. import java.net.URI;
  13. public class WordCountDriverNew {
  14. public static void main (String[] args) throws Exception {
  15. // 创建配置对象
  16. Configuration conf = new Configuration();
  17. // 设置数据节点主机名属性
  18. conf.set( "dfs.client.use.datanode.hostname", "true");
  19. // 获取作业实例
  20. Job job = Job.getInstance(conf);
  21. // 设置作业启动类
  22. job.setJarByClass(WordCountDriverNew.class);
  23. // 设置Mapper类
  24. job.setMapperClass(WordCountMapper.class);
  25. // 设置map任务输出键类型
  26. job.setMapOutputKeyClass(Text.class);
  27. // 设置map任务输出值类型
  28. job.setMapOutputValueClass(IntWritable.class);
  29. // 设置Reducer类
  30. job.setReducerClass(WordCountReducer.class);
  31. // 设置reduce任务输出键类型
  32. job.setOutputKeyClass(Text.class);
  33. // 设置reduce任务输出值类型
  34. job.setOutputValueClass(IntWritable.class);
  35. // 设置分区数量(reduce任务的数量,结果文件的数量)
  36. job.setNumReduceTasks( 3);
  37. // 定义uri字符串
  38. String uri = "hdfs://master:9000";
  39. // 声明输入目录
  40. Path inputPath = null;
  41. // 声明输出目录
  42. Path outputPath = null;
  43. // 判断输入参数个数
  44. if (args.length == 0) {
  45. // 创建输入目录
  46. inputPath = new Path(uri + "/wordcount/input");
  47. // 创建输出目录
  48. outputPath = new Path(uri + "/wordcount/output");
  49. } else if (args.length == 2) {
  50. // 创建输入目录
  51. inputPath = new Path(uri + args[ 0]);
  52. // 创建输出目录
  53. outputPath = new Path(uri + args[ 1]);
  54. } else {
  55. // 提示用户参数个数不符合要求
  56. System.out.println( "参数个数不符合要求,要么是0个,要么是2个!");
  57. // 结束应用程序
  58. return;
  59. }
  60. // 获取文件系统
  61. FileSystem fs = FileSystem.get( new URI(uri), conf);
  62. // 删除输出目录(第二个参数设置是否递归)
  63. fs.delete(outputPath, true);
  64. // 给作业添加输入目录(允许多个)
  65. FileInputFormat.addInputPath(job, inputPath);
  66. // 给作业设置输出目录(只能一个)
  67. FileOutputFormat.setOutputPath(job, outputPath);
  68. // 等待作业完成
  69. job.waitForCompletion( true);
  70. // 输出统计结果
  71. System.out.println( "======统计结果======");
  72. FileStatus[] fileStatuses = fs.listStatus(outputPath);
  73. for ( int i = 1; i < fileStatuses.length; i++) {
  74. // 输出结果文件路径
  75. System.out.println(fileStatuses[i].getPath());
  76. // 获取文件系统数据字节输入流
  77. FSDataInputStream in = fs.open(fileStatuses[i].getPath());
  78. // 将结果文件显示在控制台
  79. IOUtils.copyBytes(in, System.out, 4096, false);
  80. }
  81. }
  82. }

然后重新打包上传到hadoop执行

执行命令 这次将可以指定路径进行运行


  
  1. hadoop jar MRWordCount- 1.0-SNAPSHOT.jar /BigData /outputs
  2. hadoop jar 使用的jar包 全类名 /输入数据原文件 /输出路径目录

 

 在webUI上查看运行结果

 

将三个类合并成一个类完成词频统计

新建WordCount类 并将功能合并 处理词频统计任务

 


  
  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.conf.Configured;
  3. import org.apache.hadoop.fs.FSDataInputStream;
  4. import org.apache.hadoop.fs.FileStatus;
  5. import org.apache.hadoop.fs.FileSystem;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.IOUtils;
  8. import org.apache.hadoop.io.IntWritable;
  9. import org.apache.hadoop.io.LongWritable;
  10. import org.apache.hadoop.io.Text;
  11. import org.apache.hadoop.mapreduce.Job;
  12. import org.apache.hadoop.mapreduce.Mapper;
  13. import org.apache.hadoop.mapreduce.Reducer;
  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  16. import org.apache.hadoop.util.Tool;
  17. import org.apache.hadoop.util.ToolRunner;
  18. import java.io.IOException;
  19. import java.net.URI;
  20. public class WordCount extends Configured implements Tool {
  21. public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  22. @Override
  23. protected void map (LongWritable key, Text value, Context context)
  24. throws IOException, InterruptedException {
  25. // 获取行内容
  26. String line = value.toString();
  27. // 清洗所有英文标点符号(\p——属性[property],P——标点符号[Punctuation])
  28. line = line.replaceAll( "[\\pP]", "");
  29. // 按空格拆分得到单词数组
  30. String[] words = line.split( " ");
  31. // 遍历单词数组,生成输出键值对
  32. for ( int i = 0; i < words.length; i++) {
  33. context.write( new Text(words[i]), new IntWritable( 1));
  34. }
  35. }
  36. }
  37. public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  38. @Override
  39. protected void reduce (Text key, Iterable<IntWritable> values, Context context)
  40. throws IOException, InterruptedException {
  41. // 定义输出键出现次数
  42. int count = 0;
  43. // 历输出值迭代对象,统计其出现次数
  44. for (IntWritable value : values) {
  45. count = count + value.get();
  46. }
  47. // 生成键值对输出
  48. context.write(key, new IntWritable(count));
  49. }
  50. }
  51. @Override
  52. public int run (String[] strings) throws Exception {
  53. // 创建配置对象
  54. Configuration conf = new Configuration();
  55. // 设置数据节点主机名属性
  56. conf.set( "dfs.client.use.datanode.hostname", "true");
  57. // 获取作业实例
  58. Job job = Job.getInstance(conf);
  59. // 设置作业启动类
  60. job.setJarByClass(WordCountDriver.class);
  61. // 设置Mapper类
  62. job.setMapperClass(WordCountMapper.class);
  63. // 设置map任务输出键类型
  64. job.setMapOutputKeyClass(Text.class);
  65. // 设置map任务输出值类型
  66. job.setMapOutputValueClass(IntWritable.class);
  67. // 设置Reducer类
  68. job.setReducerClass(WordCountReducer.class);
  69. // 设置reduce任务输出键类型
  70. job.setOutputKeyClass(Text.class);
  71. // 设置reduce任务输出值类型
  72. job.setOutputValueClass(IntWritable.class);
  73. // 设置分区数量(reduce任务的数量,结果文件的数量)
  74. job.setNumReduceTasks( 3);
  75. // 定义uri字符串
  76. String uri = "hdfs://master:9000";
  77. // 创建输入目录
  78. Path inputPath = new Path(uri + "/wordcount2/input");
  79. // 创建输出目录
  80. Path outputPath = new Path(uri + "/wordcount2/output");
  81. // 获取文件系统
  82. FileSystem fs = FileSystem.get( new URI(uri), conf);
  83. // 删除输出目录(第二个参数设置是否递归)
  84. fs.delete(outputPath, true);
  85. // 给作业添加输入目录(允许多个)
  86. FileInputFormat.addInputPath(job, inputPath);
  87. // 给作业设置输出目录(只能一个)
  88. FileOutputFormat.setOutputPath(job, outputPath);
  89. // 等待作业完成
  90. boolean res = job.waitForCompletion( true);
  91. // 输出统计结果
  92. System.out.println( "======统计结果======");
  93. FileStatus[] fileStatuses = fs.listStatus(outputPath);
  94. for ( int i = 1; i < fileStatuses.length; i++) {
  95. // 输出结果文件路径
  96. System.out.println(fileStatuses[i].getPath());
  97. // 获取文件系统数据字节输入流
  98. FSDataInputStream in = fs.open(fileStatuses[i].getPath());
  99. // 将结果文件显示在控制台
  100. IOUtils.copyBytes(in, System.out, 4096, false);
  101. }
  102. if (res) {
  103. return 0;
  104. } else {
  105. return - 1;
  106. }
  107. }
  108. public static void main (String[] args) throws Exception {
  109. int res = ToolRunner.run( new WordCount(), args);
  110. System.exit(res);
  111. }
  112. }

查看运行结果


转载:https://blog.csdn.net/dafsq/article/details/128317636
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场