Today we again compute the top four most popular movies, this time using a Combiner.
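The mapper below deserializes each line of rating.json with Jackson into a com.sort2.UserRateTop object. That class is not shown in the original post; assuming each input line is a JSON record along the lines of {"movie":"1193","rate":"5","timeStamp":"978300760","uid":"1"} (the field names are an assumption based on the usual version of this exercise), a minimal Jackson-compatible sketch of the POJO could look like this:

// Sketch of the UserRateTop POJO (assumed fields; the original com.sort2.UserRateTop is not shown).
// Jackson maps each JSON line of rating.json onto these fields by name.
package com.sort2;

public class UserRateTop {
    private String movie;
    private String rate;
    private String timeStamp;
    private String uid;

    public String getMovie() { return movie; }
    public void setMovie(String movie) { this.movie = movie; }
    public String getRate() { return rate; }
    public void setRate(String rate) { this.rate = rate; }
    public String getTimeStamp() { return timeStamp; }
    public void setTimeStamp(String timeStamp) { this.timeStamp = timeStamp; }
    public String getUid() { return uid; }
    public void setUid(String uid) { this.uid = uid; }
}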
import com.sort2.UserRateTop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
public class RateHot {

    public static class RateHotMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        ObjectMapper objectMapper = new ObjectMapper();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // parse the JSON line and pull out the movie field
            String line = value.toString();
            UserRateTop userRateTop = objectMapper.readValue(line, UserRateTop.class);
            String movie = userRateTop.getMovie();
            // emit (movie, 1); RateHotCombiner pre-aggregates these counts on the map side
            // before they reach RateHotReduce, so part of the summing that earlier versions
            // did only in the reduce phase now already happens in the map phase
            context.write(new Text(movie), new IntWritable(1));
        }
    }
    public static class RateHotReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        TreeMap<IntWritable, Text> map;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // TreeMap ordered by an anonymous Comparator so that larger counts come first
            map = new TreeMap<>(new Comparator<IntWritable>() {
                @Override
                public int compare(IntWritable o1, IntWritable o2) {
                    // reverse the natural order: descending by count
                    return o2.compareTo(o1);
                }
            });
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count = count + value.get();
            }
            // put the total into the TreeMap so it stays sorted by count
            // (note: movies with identical counts overwrite one another here)
            map.put(new IntWritable(count), new Text(key));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            // how many of the top movies to write out, taken from the job configuration
            int suibian = conf.getInt("suibian", 3);
            for (int i = 0; i < suibian; i++) {
                Map.Entry<IntWritable, Text> entry = map.pollFirstEntry();
                if (entry == null) {
                    break; // fewer distinct movies than requested
                }
                IntWritable count = entry.getKey();
                Text movie = entry.getValue();
                context.write(movie, count);
            }
        }
    }
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        BasicConfigurator.configure(); // quickly set up a default log4j environment
        Configuration conf = new Configuration();
        conf.setInt("suibian", Integer.parseInt(args[0]));
        Job job = Job.getInstance(conf);
        // conf.set("yarn.resourcemanager.hostname", "192.168.72.110");
        // conf.set("fs.defaultFS", "hdfs://192.168.72.110:9000/");
        job.setCombinerClass(RateHotCombiner.class);
        job.setJarByClass(RateHot.class);
        job.setMapperClass(RateHotMap.class);
        job.setReducerClass(RateHotReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // job.setInputFormatClass(TextInputFormat.class);
        // job.setOutputFormatClass(TextOutputFormat.class);
        // FileInputFormat.setInputPaths(job, new Path(args[1]));
        // FileOutputFormat.setOutputPath(job, new Path(args[2]));
        FileInputFormat.setInputPaths(job, new Path("D:/eclipse/wc/input/rating.json"));
        FileOutputFormat.setOutputPath(job, new Path("D:/eclipse/wc/output"));
        // waitForCompletion submits the job itself, so a separate job.submit() call is unnecessary
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
The Combiner class:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class RateHotCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            // sum the actual values instead of just counting elements: the framework may apply
            // the combiner more than once, so the incoming values can already be partial sums
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
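A combiner is only an optimization: Hadoop may run it zero, one, or several times, so the values arriving at the reducer (or at a later combiner pass) can already be partial sums larger than 1. That is why the loop above must add value.get() rather than count elements. A minimal plain-Java sketch of the arithmetic (class and variable names are mine, not from the original post):

import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of the combiner contract: the reducer adds up
// partial sums produced by the combiners, not the raw 1s from the mappers.
public class CombinerContractDemo {
    public static void main(String[] args) {
        // Two map tasks each emitted (movie, 1) for every rating they saw.
        List<Integer> mapper1 = Arrays.asList(1, 1, 1); // 3 ratings
        List<Integer> mapper2 = Arrays.asList(1, 1);    // 2 ratings

        // Each combiner collapses its mapper's output into one partial sum.
        int partial1 = mapper1.stream().mapToInt(Integer::intValue).sum(); // 3
        int partial2 = mapper2.stream().mapToInt(Integer::intValue).sum(); // 2

        // The reducer adds the partial sums: 3 + 2 = 5 total ratings.
        // Merely counting the two incoming values would wrongly give 2.
        System.out.println(partial1 + partial2);
    }
}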
Result: the job writes its output to D:/eclipse/wc/output; with the default TextOutputFormat each of the top movies ends up as a tab-separated movie/count line in part-r-00000.
Note the parameter passing in conf.setInt("suibian", Integer.parseInt(args[0])): the first program argument decides how many top movies the reducer writes out.
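If the job is launched without any program argument, Integer.parseInt(args[0]) fails before the job is even submitted. A small sketch of a guard inside main() (the topN variable is my own naming, not from the original code):

// Hypothetical guard: fall back to the reducer-side default of 3 when no argument is supplied.
int topN = (args.length > 0) ? Integer.parseInt(args[0]) : 3;
conf.setInt("suibian", topN);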
Reposted from: https://blog.csdn.net/TylerPY/article/details/100821633