飞道的博客

IDEA本地调试Map-Reduce程序

421人阅读  评论(0)

环境准备

安装Hadoop

访问 Hadoop官网 ,下载Hadoop到Windows本地 ,本例中下载的是 hadoop-3.0.0.tar.gz 。

将 Hadoop 解压到合适的目录,并设置环境变量:

HADOOP_HOME=C:\DevTolls\hadoop-3.0.0

并在 Path 环境变量增加两项:

%HADOOP_HOME%\bin;%HADOOP_HOME%\sbin;

安装winutils插件

访问 winutils的Git项目地址 ,将下载的压缩包进行解压。

根据你的 hadoop 版本,将对应的目录下的 hadoop.dll和winutils.exe 复制到 hadoop 的 \bin 目录。 

若缺少 winutils.exe 的情况下启动 MapReduce 作业会出现如下错误:

2020-04-13 18:47:37,788 WARN [org.apache.hadoop.util.Shell] - Did not find winutils.exe: {}

java.io.FileNotFoundException: Could not locate Hadoop executable: C:\DevTolls\hadoop-3.0.0\bin\winutils.exe -see https://wiki.apache.org/hadoop/WindowsProblems 

Map-Reduce项目

新建Maven项目

打开 IDEA,进行如下操作新建一个Maven项目。

File ==> New ==> Project...,创建一个 Maven 工程,先设置好JDK,选择不使用模板(不需要勾选 Create from archetype),直接 Next 。 设置好 GroupId 、ArtifactId 和 Project name 完成项目创建。

引入Maven依赖

修改 pom.xml 文件,添加 Hadoop 相关的依赖,根据自己的实际情况,选择合适的版本。


  
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation= "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0 </modelVersion>
  6. <groupId>com.pengjunlee </groupId>
  7. <artifactId>wordcount-test </artifactId>
  8. <version>1.0-SNAPSHOT </version>
  9. <properties>
  10. <project.build.sourceEncoding>UTF-8 </project.build.sourceEncoding>
  11. <!--设置hadoop版本-->
  12. <hadoop.version>3.0.0 </hadoop.version>
  13. </properties>
  14. <dependencies>
  15. <!--hadoop 依赖-->
  16. <dependency>
  17. <groupId>junit </groupId>
  18. <artifactId>junit </artifactId>
  19. <version>4.12 </version>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.apache.hadoop </groupId>
  23. <artifactId>hadoop-client </artifactId>
  24. <version>${hadoop.version} </version>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.apache.hadoop </groupId>
  28. <artifactId>hadoop-common </artifactId>
  29. <version>${hadoop.version} </version>
  30. </dependency>
  31. <dependency>
  32. <groupId>org.apache.hadoop </groupId>
  33. <artifactId>hadoop-hdfs </artifactId>
  34. <version>${hadoop.version} </version>
  35. </dependency>
  36. </dependencies>
  37. </project>

编写代码 

WordcountMapper


  
  1. import org.apache.hadoop.io.IntWritable;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. import java.io.IOException;
  6. /**
  7. * KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,Long;
  8. * 在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而是用LongWritable
  9. * VALUEIN:默认情况下,是mr框架所读到的一行文本内容,String;此处用Text
  10. * KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String;此处用Text
  11. * VALUEOUT,是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,此处用IntWritable
  12. * @author Administrator
  13. */
  14. public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
  15. /**
  16. * map阶段的业务逻辑就写在自定义的map()方法中
  17. * maptask会对每一行输入数据调用一次我们自定义的map()方法
  18. */
  19. @Override
  20. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  21. // 1 将maptask传给我们的文本内容先转换成String
  22. String line = value.toString();
  23. // 2 根据空格将这一行切分成单词
  24. String[] words = line.split( " ");
  25. // 3 将单词输出为<单词,1>
  26. for(String word:words){
  27. // 将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会到相同的reducetask中
  28. context.write( new Text(word), new IntWritable( 1));
  29. }
  30. }
  31. }

 WordcountReducer


  
  1. import org.apache.hadoop.io.IntWritable;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. import java.io.IOException;
  5. /**
  6. * KEYIN , VALUEIN 对应mapper输出的KEYOUT, VALUEOUT类型
  7. * KEYOUT,VALUEOUT 对应自定义reduce逻辑处理结果的输出数据类型 KEYOUT是单词 VALUEOUT是总次数
  8. */
  9. public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  10. /**
  11. * key,是一组相同单词kv对的key
  12. */
  13. @Override
  14. protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  15. int count = 0;
  16. // 1 汇总各个key的个数
  17. for(IntWritable value:values){
  18. count +=value.get();
  19. }
  20. // 2输出该key的总次数
  21. context.write(key, new IntWritable(count));
  22. }
  23. }

 WordcountDriver


  
  1. package com.pengjunlee.wordcount;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. /**
  10. * 相当于一个yarn集群的客户端,
  11. * 需要在此封装我们的mr程序相关运行参数,指定jar包
  12. * 最后提交给yarn
  13. * @author Administrator
  14. */
  15. public class WordcountDriver {
  16. public static void main(String[] args) throws Exception {
  17. // 1 获取配置信息,或者job对象实例
  18. Configuration configuration = new Configuration();
  19. // 8 配置提交到yarn上运行,windows和Linux变量不一致
  20. // configuration.set("mapreduce.framework.name", "yarn");
  21. // configuration.set("yarn.resourcemanager.hostname", "node22");
  22. Job job = Job.getInstance(configuration);
  23. // 6 指定本程序的jar包所在的本地路径
  24. // job.setJar("/home/admin/wc.jar");
  25. job.setJarByClass(WordcountDriver.class);
  26. // 2 指定本业务job要使用的mapper/Reducer业务类
  27. job.setMapperClass(WordcountMapper.class);
  28. job.setReducerClass(WordcountReducer.class);
  29. // 3 指定mapper输出数据的kv类型
  30. job.setMapOutputKeyClass(Text.class);
  31. job.setMapOutputValueClass(IntWritable.class);
  32. // 4 指定最终输出的数据的kv类型
  33. job.setOutputKeyClass(Text.class);
  34. job.setOutputValueClass(IntWritable.class);
  35. // 5 指定job的输入原始文件所在目录
  36. FileInputFormat.setInputPaths(job, new Path(args[ 0]));
  37. FileOutputFormat.setOutputPath(job, new Path(args[ 1]));
  38. // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
  39. // job.submit();
  40. boolean result = job.waitForCompletion( true);
  41. System.exit(result? 0: 1);
  42. }
  43. }

log4j.properties

在 /resources 目录下添加 log4j.properties 配置。


  
  1. log4j.rootLogger=INFO, stdout
  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  4. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
  5. log4j.appender.logfile=org.apache.log4j.FileAppender
  6. log4j.appender.logfile.File=target/spring.log
  7. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
  8. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

启动测试

模拟数据

在项目根目录下新建一个 /input 文件夹,用来存放测试数据。

demo.txt 内容如下:


  
  1. hello world
  2. dog fish
  3. hadoop
  4. spark
  5. hello world
  6. dog fish
  7. hadoop
  8. spark
  9. hello world
  10. dog fish
  11. hadoop
  12. spark

 配置启动参数

按照如下指引,新建一个启动配置。其中 Program arguments 中需指定输入样本数据目录和统计结果输出目录(必须是一个不存在的目录,否则会报错)。

测试结果

启动 Application ,运行完成之后项目根目录下会多出来一个 /output 目录,里面存放了程序的执行结果。

part-r-00000文件内容如下:


  
  1. dog 3
  2. fish 3
  3. hadoop 3
  4. hello 3
  5. spark 3
  6. world 3

 


转载:https://blog.csdn.net/pengjunlee/article/details/105494757
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场