小言_互联网的博客

Hadoop中的计数器(累加器)

467人阅读  评论(0)

1、什么是计数器?

计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计。计数器还可辅助诊断系统故障。如果需要将日志信息传输到map 或reduce 任务, 更好的方法通常是看能否用一个计数器值来记录某一特定事件的发生。对于大型分布式作业而言,使用计数器更为方便。除了因为获取计数器值比输出日志更方便,还有根据计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。

2、Hadoop内置计数器列表

MapReduce任务计数器 org.apache.hadoop.mapreduce.TaskCounter
文件系统计数器 org.apache.hadoop.mapreduce.FileSystemCounter
FileInputFormat计数器 org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
FileOutputFormat计数器 org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
作业计数器 org.apache.hadoop.mapreduce.JobCounter

每次mapreduce执行完成之后,我们都会看到一些日志记录出来,其中最重要的一些日志记录如下截图:

  • 所有的这些都是MapReduce的计数器的功能,既然MapReduce当中有计数器的功能,我们如何实现自己的计数器???

3、需求:以前面文章介绍的排序以及序列化为案例,统计map接收到的数据记录条数。

4、实现:

第一种方式定义计数器:

  • 通过context上下文对象可以获取我们的计数器,进行记录
  • 通过context上下文对象,在map端使用计数器进行统计


  
  1. import org.apache.hadoop.io.LongWritable;
  2. import org.apache.hadoop.io.NullWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Counter;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import java.io.IOException;
  7. public class FlowSortMapper extends Mapper<LongWritable, Text, FlowSortBean, NullWritable> {
  8. private FlowSortBean flowSortBean;
  9. //初始化
  10. @Override
  11. protected void setup(Context context) throws IOException, InterruptedException {
  12. flowSortBean = new FlowSortBean();
  13. }
  14. @Override
  15. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  16. //自定义我们的计数器,这里实现了统计map输入数据的条数
  17. Counter counter = context.getCounter( "MR_COUNT", "MapRecordCounter");
  18. counter.increment( 1L);
  19. /**
  20. * 手机号 上行包 下行包 上行总流量 下行总流量
  21. * 13480253104 3 3 180 180
  22. */
  23. String[] split = value.toString().split( "\t");
  24. flowSortBean.setPhone(split[ 0]);
  25. flowSortBean.setUpPackNum(Integer.parseInt(split[ 1]));
  26. flowSortBean.setDownPackNum(Integer.parseInt(split[ 2]));
  27. flowSortBean.setUpPayLoad(Integer.parseInt(split[ 3]));
  28. flowSortBean.setDownPayLoad(Integer.parseInt(split[ 4]));
  29. context.write(flowSortBean, NullWritable.get());
  30. }
  31. }

运行程序之后就可以看到我们自定义的计数器在map阶段读取了七条数据 :

第二种方式定义计数器:

通过enum枚举类型来定义计数器,

统计reduce端数据的输入的key有多少个,对应的value有多少个


  
  1. import org.apache.hadoop.io.NullWritable;
  2. import org.apache.hadoop.mapreduce.Reducer;
  3. import java.io.IOException;
  4. public class FlowSortReducer extends Reducer<FlowSortBean, NullWritable, FlowSortBean, NullWritable> {
  5. public static enum Counter{
  6. REDUCE_INPUT_RECORDS
  7. }
  8. @Override
  9. protected void reduce(FlowSortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
  10. context.getCounter(Counter.REDUCE_INPUT_RECORDS).increment( 1L);
  11. //经过排序后的数据,直接输出即可
  12. context.write(key, NullWritable.get());
  13. }
  14. }

 


转载:https://blog.csdn.net/weixin_43230682/article/details/107917953
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场