1. Hadoop MapReduce Traffic Statistics: Requirements Analysis
We start from an access.log log file:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com
Requirement: read the access.log file.
Fields of interest: the 2nd field is the phone number; the 3rd field from the end is the upstream traffic; the 2nd field from the end is the downstream traffic.
Goal: for each phone number, compute the total upstream traffic, the total downstream traffic, and the overall total traffic.
Analysis:
- Define a custom composite output type, Access.java
  - fields: phone number, upstream traffic, downstream traffic, total traffic
- Group the records by phone number, then sum the upstream and downstream traffic per group
- Mapper: split out the phone number, upstream traffic, and downstream traffic
  - emit the phone number as the key and an Access instance as the value
- Reducer: receives input of the form (13390905421, <Access, Access>)
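Because some log lines carry a domain (and sometimes a category) while others do not, the number of fields varies per line; indexing from the end is what keeps the traffic fields stable. A minimal plain-Java sketch of that extraction logic (hypothetical class and method names, assuming tab-separated fields, no Hadoop dependency):

```java
import java.util.Arrays;

public class ExtractSketch {
    /** Extracts {phone, up, down} from one tab-separated log line. */
    static String[] extract(String line) {
        String[] f = line.split("\t");
        String phone = f[1];            // 2nd field: phone number
        String up = f[f.length - 3];    // 3rd from the end: upstream traffic
        String down = f[f.length - 2];  // 2nd from the end: downstream traffic
        return new String[]{phone, up, down};
    }

    public static void main(String[] args) {
        // one line with a domain field and one without: the offsets from the end still work
        String withDomain = String.join("\t", "1363157985066", "13726230503",
                "00-FD-07-A4-72-B8:CMCC", "120.196.100.82", "i02.c.aliimg.com",
                "24", "27", "2481", "24681", "200");
        String noDomain = String.join("\t", "1363157995052", "13826544101",
                "5C-0E-8B-C7-F1-E0:CMCC", "120.197.40.4", "4", "0", "264", "0", "200");
        System.out.println(Arrays.toString(extract(withDomain)));  // [13726230503, 2481, 24681]
        System.out.println(Arrays.toString(extract(noDomain)));    // [13826544101, 264, 0]
    }
}
```

Indexing from the front (e.g. `f[8]` for upstream) would silently pick the wrong column on shorter lines, which is why the analysis above anchors on the tail of the record.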
2. Hadoop MapReduce Traffic Statistics: Code Implementation
- To simplify the code, add Lombok
- Add the dependency under the `dependencies` node of pom.xml

<!-- add the Lombok dependency -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>${lombok.version}</version>
</dependency>

- Add the version under the `properties` node of pom.xml

<!-- define the Lombok version -->
<lombok.version>1.18.12</lombok.version>

- Add the repository under the `repositories` node of pom.xml

<!-- use the Maven central repository -->
<repository>
    <id>central</id>
    <name>Central Repository</name>
    <url>https://repo.maven.apache.org/maven2</url>
</repository>
- Code implementation
- Custom composite output type: Access.java

/**
 * @ClassName Access
 * @Description Custom output type; it must implement the Writable interface
 * @Author eastern
 * @Date 2020/4/30 4:36 PM
 * @Version 1.0
 **/
@Data
@NoArgsConstructor
public class Access implements Writable {
    /** Phone number */
    private String phone;
    /** Upstream traffic */
    private Long up;
    /** Downstream traffic */
    private Long down;
    /** Total traffic */
    private Long sum;

    /**
     * Custom constructor, for convenient creation of Access instances
     */
    public Access(String phone, Long up, Long down) {
        this.phone = phone;
        this.up = up;
        this.down = down;
        this.sum = up + down;
    }

    /**
     * Override toString for friendlier output
     */
    @Override
    public String toString() {
        return phone + "," + up + "," + down + "," + sum;
    }

    /**
     * Serialize the fields
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(sum);
    }

    /**
     * Deserialize the fields; the read order must match the write order exactly
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.up = in.readLong();
        this.down = in.readLong();
        this.sum = in.readLong();
    }
}
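The write/readFields contract is just a fixed-order byte stream. A plain java.io round trip (hypothetical class and method names, no Hadoop dependency) illustrates why readFields must mirror write field by field:

```java
import java.io.*;

public class WritableOrderSketch {
    /**
     * Writes phone/up/down/sum in the same order Access.write uses, then
     * reads them back in that identical order (as Access.readFields must)
     * and returns the "phone,up,down,sum" string Access.toString would emit.
     */
    static String roundTrip(String phone, long up, long down) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeUTF(phone);       // 1) phone
            out.writeLong(up);         // 2) up
            out.writeLong(down);       // 3) down
            out.writeLong(up + down);  // 4) sum

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
            // reading in any other order would misinterpret the raw bytes
            return in.readUTF() + "," + in.readLong() + "," + in.readLong() + "," + in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("13726230503", 2481L, 24681L));
        // prints 13726230503,2481,24681,27162
    }
}
```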
- Custom Mapper: AccessMapper.java

/**
 * @ClassName AccessMapper
 * @Description Custom Mapper
 * @Author eastern
 * @Date 2020/4/30 10:20 PM
 * @Version 1.0
 **/
public class AccessMapper extends Mapper<LongWritable, Text, Text, Access> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] lines = value.toString().split("\t");
        // extract the phone number
        String phone = lines[1];
        // extract the upstream traffic (3rd field from the end)
        long up = Long.parseLong(lines[lines.length - 3]);
        // extract the downstream traffic (2nd field from the end)
        long down = Long.parseLong(lines[lines.length - 2]);
        context.write(new Text(phone), new Access(phone, up, down));
    }
}
- Custom Reducer: AccessReducer.java

/**
 * @ClassName AccessReducer
 * @Description Custom Reducer
 * @Author eastern
 * @Date 2020/4/30 10:30 PM
 * @Version 1.0
 **/
public class AccessReducer extends Reducer<Text, Access, NullWritable, Access> {
    /**
     * @param key     the phone number
     * @param values  <Access, Access, ...>
     */
    @Override
    protected void reduce(Text key, Iterable<Access> values, Context context) throws IOException, InterruptedException {
        long ups = 0;
        long downs = 0;
        // sum the upstream and downstream traffic for this phone number
        for (Access access : values) {
            ups += access.getUp();
            downs += access.getDown();
        }
        context.write(NullWritable.get(), new Access(key.toString(), ups, downs));
    }
}
- Custom Driver: AccessLocalApp.java

/**
 * @ClassName AccessLocalApp
 * @Description Driver for the Access job
 * @Author eastern
 * @Date 2020/4/30 10:36 PM
 * @Version 1.0
 **/
public class AccessLocalApp {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(AccessLocalApp.class);

        job.setMapperClass(AccessMapper.class);
        job.setReducerClass(AccessReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Access.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Access.class);

        FileInputFormat.setInputPaths(job, new Path("/Users/xxx/IdeaProjects/bigdata/hadoop-mapreduce/src/main/resources/access.log"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/xxx/IdeaProjects/bigdata/hadoop-mapreduce/src/main/resources/access"));

        job.waitForCompletion(true);
    }
}
3. Hadoop MapReduce Traffic Statistics: Partitioner
- Source-code analysis
- Open the Partitioner source:

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Partitioner<KEY, VALUE> {
  /**
   * Get the partition number for a given key (hence record) given the total
   * number of partitions i.e. number of reduce-tasks for the job.
   *
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * @param key the key to be partioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}
- It is an abstract class
- The Partitioner decides which reduce task handles each record emitted by a map task
- Next, find the default partitioner implementation used during the shuffle.
- Searching the Job source for "partitioner" only turns up setPartitionerClass, which is not the answer we are after.
- Job also extends JobContextImpl; searching JobContextImpl for "partitioner" reveals getPartitionerClass, whose default implementation is HashPartitioner.class:

public Class<? extends Partitioner<?,?>> getPartitionerClass() throws ClassNotFoundException {
    return (Class<? extends Partitioner<?,?>>) conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
- Print it from the driver to check whether the default really is HashPartitioner:

System.out.println("PartitionerClass--->" + job.getPartitionerClass().toString());

- The console output confirms that it is indeed HashPartitioner
- Open the HashPartitioner source:

@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
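The arithmetic can be checked outside of Hadoop. A sketch (hypothetical class and method names) that mirrors HashPartitioner's formula, including the mask that keeps a negative hashCode from producing a negative partition number:

```java
public class HashPartitionSketch {
    /** Mirrors HashPartitioner: (hashCode & Integer.MAX_VALUE) % numReduceTasks. */
    static int partition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // the same key always lands in the same partition
        System.out.println(partition("13726230503", 3) == partition("13726230503", 3)); // true
        // the & Integer.MAX_VALUE mask clears the sign bit, so the result stays
        // in [0, numReduceTasks) even when hashCode() is negative
        String key = "polygenelubricants"; // a string whose hashCode() is Integer.MIN_VALUE
        System.out.println(key.hashCode() < 0);     // true
        System.out.println(partition(key, 3) >= 0); // true
    }
}
```

Without the mask, a plain `hashCode() % numReduceTasks` could return a negative value, which is not a valid partition index.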
- numReduceTasks: the number of reducers configured for the job; it determines how many output files the job produces
- numReduceTasks defaults to 1
- Source: Job --> JobContextImpl --> JobConf

public int getNumReduceTasks() {
    return getInt(JobContext.NUM_REDUCES, 1);
}
- key.hashCode() & Integer.MAX_VALUE: guarantees the partition number is non-negative
- Case requirement: group the statistics by phone-number prefix and write each group to a separate output file
13* ==> …
15* ==> …
other ==> …
- Code implementation
- Custom Partitioner: AccessPartitioner

/**
 * @ClassName AccessPartitioner
 * @Description Custom MapReduce partitioning rule
 * @Author eastern
 * @Date 2020/5/2 11:54 AM
 * @Version 1.0
 **/
public class AccessPartitioner extends Partitioner<Text, Access> {
    /**
     * @param phone the phone number
     */
    @Override
    public int getPartition(Text phone, Access access, int numPartitions) {
        if (phone.toString().startsWith("13")) {
            return 0;
        } else if (phone.toString().startsWith("15")) {
            return 1;
        } else {
            return 2;
        }
    }
}
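The branching above is ordinary string-prefix logic, so it can be sanity-checked without a cluster. A sketch with a hypothetical helper that mirrors AccessPartitioner's rule:

```java
public class PrefixPartitionSketch {
    /** Mirrors AccessPartitioner: 13* -> 0, 15* -> 1, everything else -> 2. */
    static int partitionFor(String phone) {
        if (phone.startsWith("13")) {
            return 0;
        } else if (phone.startsWith("15")) {
            return 1;
        }
        return 2;
    }

    public static void main(String[] args) {
        // phone numbers taken from the sample access.log above
        System.out.println(partitionFor("13726230503")); // 0
        System.out.println(partitionFor("15920133257")); // 1
        System.out.println(partitionFor("18211575961")); // 2
    }
}
```

Because the rule can return 0, 1, or 2, the driver must run with exactly three reduce tasks, as the next step configures.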
- Set the Partitioner and the number of reduce tasks in the Driver

// set the custom partitioning rule
job.setPartitionerClass(AccessPartitioner.class);
// set the number of reduce tasks; note: it must match the number of partitions the rule can return
job.setNumReduceTasks(3);
- Output: three result files, one per partition
Reprinted from: https://blog.csdn.net/u012365780/article/details/105893259