小言_互联网的博客

第1关:MapReduce综合应用案例 — 电信数据清洗

570人阅读  评论(0)

根据提示,在右侧编辑器补充代码,对数据按照一定规则进行清洗。

数据说明如下: a.txt

数据切分方式:以英文逗号(,)分隔

数据所在位置:/user/test/input/a.txt

15733218050,15778423030,1542457633,1542457678,450000,530000

15733218050 15778423030 1542457633 1542457678 450000 530000
呼叫者手机号 接收者手机号 开始时间戳(s) 结束时间戳(s) 呼叫者地址省份编码 接收者地址省份编码

Mysql数据库:

用户名:root 密码:123123

数据库名:mydb

用户表:userphone

列名 类型 非空 是否自增 介绍
id int(11) 用户ID
phone varchar(255) 手机号
trueName varchar(255) 真实姓名

地址省份表:allregion

列名 类型 非空 是否自增 介绍
id int(11) 记录ID
CodeNum varchar(255) 省份编码
Address varchar(255) 地址

清洗规则:

  • 将数据中的秒级时间戳转换为“年-月-日 时:分:秒”格式,例如 2017-06-21 07:01:58;

  • 处理数据中的省份编码,结合mysql的表数据对应,将其转换成省份名称;

  • 处理用户手机号,与mysql的表数据对应,关联用户的真实姓名;

  • 处理数据中的开始时间与结束时间并计算通信时长(以秒为单位);

  • 设置数据来源文件路径及清洗后的数据存储路径: 数据来源路径为: /user/test/input/a.txt (HDFS); 清洗后的数据存放于:/user/test/output (HDFS)

数据清洗后如下:

邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市

邓二 张倩 13666666666 15151889601 2018-03-29 10:58:12 2018-03-29 10:58:42 30 黑龙江省 上海市
用户名A 用户名B 用户A的手机号 用户B的手机号 开始时间 结束时间 通话时长(秒) 用户A所在省份 用户B所在省份

step/com/LogMR.java


  
  1. package com;
  2. import java.io.IOException;
  3. import java.sql.Connection;
  4. import java.sql.ResultSet;
  5. import java.sql.SQLException;
  6. import java.sql.Statement;
  7. import java.text.SimpleDateFormat;
  8. import java.util.ArrayList;
  9. import java.util.HashMap;
  10. import java.util.Iterator;
  11. import java.util.List;
  12. import java.util.Map;
  13. import org.apache.hadoop.conf.Configuration;
  14. import org.apache.hadoop.fs.FileSystem;
  15. import org.apache.hadoop.fs.Path;
  16. import org.apache.hadoop.io.LongWritable;
  17. import org.apache.hadoop.io.NullWritable;
  18. import org.apache.hadoop.io.Text;
  19. import org.apache.hadoop.mapreduce.Job;
  20. import org.apache.hadoop.mapreduce.Mapper;
  21. import org.apache.hadoop.mapreduce.Reducer;
  22. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  23. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  24. public class LogMR {
  25. /********** begin **********/
  26. static class MyMapper extends Mapper<LongWritable, Text, PhoneLog, NullWritable> {
  27. Map<String, String> userMap = new HashMap<>();
  28. Map<String, String> addressMap = new HashMap<>();
  29. SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss");
  30. PhoneLog pl = new PhoneLog();
  31. Text text = new Text();
  32. @Override
  33. protected void setup (Context context) throws IOException, InterruptedException {
  34. Connection connection = DBHelper.getConnection();
  35. try {
  36. Statement statement = connection.createStatement();
  37. String sql = "select * from userphone";
  38. ResultSet resultSet = statement.executeQuery(sql);
  39. while (resultSet.next()) {
  40. String phone = resultSet.getString( 2);
  41. String trueName = resultSet.getString( 3);
  42. userMap.put(phone, trueName);
  43. }
  44. String sql2 = "select * from allregion";
  45. ResultSet resultSetA = statement.executeQuery(sql2);
  46. while (resultSetA.next()) {
  47. String phone = resultSetA.getString( 2);
  48. String trueName = resultSetA.getString( 3);
  49. addressMap.put(phone, trueName);
  50. }
  51. } catch (SQLException e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. @Override
  56. protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  57. String str = value.toString();
  58. String[] split = str.split( ",");
  59. if (split.length == 6) {
  60. String trueName1 = userMap.get(split[ 0]);
  61. String trueName2 = userMap.get(split[ 1]);
  62. String address1 = addressMap.get(split[ 4]);
  63. String address2 = addressMap.get(split[ 5]);
  64. long startTimestamp = Long.parseLong(split[ 2]);
  65. String startTime = sdf.format(startTimestamp * 1000);
  66. long endTimestamp = Long.parseLong(split[ 3]);
  67. String endTime = sdf.format(endTimestamp * 1000);
  68. long timeLen = endTimestamp - startTimestamp;
  69. pl.SetPhoneLog(trueName1, trueName2, split[ 0], split[ 1], startTime, endTime, timeLen, address1,
  70. address2);
  71. context.write(pl, NullWritable.get());
  72. }
  73. }
  74. }
  75. public static void main (String[] args) throws Exception {
  76. Configuration conf = new Configuration();
  77. Job job = Job.getInstance(conf);
  78. job.setJarByClass(LogMR.class);
  79. job.setMapperClass(MyMapper.class);
  80. job.setMapOutputKeyClass(PhoneLog.class);
  81. job.setMapOutputValueClass(NullWritable.class);
  82. job.setNumReduceTasks( 0);
  83. Path inPath = new Path( "/user/test/input/a.txt");
  84. Path out = new Path( "/user/test/output");
  85. FileInputFormat.setInputPaths(job, inPath);
  86. FileOutputFormat.setOutputPath(job, out);
  87. job.waitForCompletion( true);
  88. }
  89. /********** end **********/
  90. }

step/com/DBHelper.java


  
  1. package com;
  2. import java.sql.Connection;
  3. import java.sql.DriverManager;
  4. import java.sql.SQLException;
  /** Provides the JDBC connection to the exercise's MySQL database {@code mydb}. */
  public class DBHelper {
    /********** begin **********/
    /** Legacy MySQL Connector/J driver class name, registered eagerly in the static block. */
    private static final String DRIVER = "com.mysql.jdbc.Driver";
    /** JDBC URL of database {@code mydb}; requests UTF-8 character encoding. */
    private static final String URL =
        "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
    /** Database user name. */
    private static final String USERNAME = "root";
    /** Database password: chosen when MySQL was installed, so it differs between environments. */
    private static final String PASSWORD = "123123";
    /** Lazily opened connection, shared by every caller in this JVM. */
    private static Connection conn = null;

    static {
      try {
        Class.forName(DRIVER);
      } catch (ClassNotFoundException ex) {
        // Best effort only: JDBC 4+ drivers self-register through ServiceLoader. If no driver
        // matches the URL, getConnection() reports it with a proper exception.
        System.err.println("MySQL JDBC driver class not found: " + DRIVER + " (" + ex + ")");
      }
    }

    /**
     * Returns the shared connection, opening it on first use or reopening it after it was closed.
     *
     * @return an open connection to {@code mydb}; never {@code null}
     * @throws IllegalStateException if the connection cannot be established (cause preserved)
     */
    public static Connection getConnection() {
      try {
        if (conn == null || conn.isClosed()) {
          conn = DriverManager.getConnection(URL, USERNAME, PASSWORD);
        }
      } catch (SQLException e) {
        throw new IllegalStateException("Unable to connect to MySQL at " + URL, e);
      }
      return conn;
    }
    /********** end **********/
  }

step/com/PhoneLog.java


  
  1. package com;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import org.apache.hadoop.io.Writable;
  6. import org.apache.hadoop.io.WritableComparable;
  7. public class PhoneLog implements WritableComparable< PhoneLog> {
  8. private String userA;
  9. private String userB;
  10. private String userA_Phone;
  11. private String userB_Phone;
  12. private String startTime;
  13. private String endTime;
  14. private Long timeLen;
  15. private String userA_Address;
  16. private String userB_Address;
  17. public PhoneLog() {
  18. }
  19. public void SetPhoneLog(String userA, String userB, String userA_Phone, String userB_Phone, String startTime,
  20. String endTime, Long timeLen, String userA_Address, String userB_Address) {
  21. this.userA = userA;
  22. this.userB = userB;
  23. this.userA_Phone = userA_Phone;
  24. this.userB_Phone = userB_Phone;
  25. this.startTime = startTime;
  26. this.endTime = endTime;
  27. this.timeLen = timeLen;
  28. this.userA_Address = userA_Address;
  29. this.userB_Address = userB_Address;
  30. }
  31. public String getUserA_Phone() {
  32. return userA_Phone;
  33. }
  34. public void setUserA_Phone(String userA_Phone) {
  35. this.userA_Phone = userA_Phone;
  36. }
  37. public String getUserB_Phone() {
  38. return userB_Phone;
  39. }
  40. public void setUserB_Phone(String userB_Phone) {
  41. this.userB_Phone = userB_Phone;
  42. }
  43. public String getUserA() {
  44. return userA;
  45. }
  46. public void setUserA(String userA) {
  47. this.userA = userA;
  48. }
  49. public String getUserB() {
  50. return userB;
  51. }
  52. public void setUserB(String userB) {
  53. this.userB = userB;
  54. }
  55. public String getStartTime() {
  56. return startTime;
  57. }
  58. public void setStartTime(String startTime) {
  59. this.startTime = startTime;
  60. }
  61. public String getEndTime() {
  62. return endTime;
  63. }
  64. public void setEndTime(String endTime) {
  65. this.endTime = endTime;
  66. }
  67. public Long getTimeLen() {
  68. return timeLen;
  69. }
  70. public void setTimeLen( Long timeLen) {
  71. this.timeLen = timeLen;
  72. }
  73. public String getUserA_Address() {
  74. return userA_Address;
  75. }
  76. public void setUserA_Address(String userA_Address) {
  77. this.userA_Address = userA_Address;
  78. }
  79. public String getUserB_Address() {
  80. return userB_Address;
  81. }
  82. public void setUserB_Address(String userB_Address) {
  83. this.userB_Address = userB_Address;
  84. }
  85. @Override
  86. public void write(DataOutput out) throws IOException {
  87. out.writeUTF(userA);
  88. out.writeUTF(userB);
  89. out.writeUTF(userA_Phone);
  90. out.writeUTF(userB_Phone);
  91. out.writeUTF(startTime);
  92. out.writeUTF(endTime);
  93. out.writeLong(timeLen);
  94. out.writeUTF(userA_Address);
  95. out.writeUTF(userB_Address);
  96. }
  97. @Override
  98. public void readFields(DataInput in) throws IOException {
  99. userA = in.readUTF();
  100. userB = in.readUTF();
  101. userA_Phone = in.readUTF();
  102. userB_Phone = in.readUTF();
  103. startTime = in.readUTF();
  104. endTime = in.readUTF();
  105. timeLen = in.readLong();
  106. userA_Address = in.readUTF();
  107. userB_Address = in.readUTF();
  108. }
  109. @Override
  110. public String toString() {
  111. return userA + "," + userB + "," + userA_Phone + "," + userB_Phone + "," + startTime + "," + endTime + ","
  112. + timeLen + "," + userA_Address + "," + userB_Address;
  113. }
  114. @Override
  115. public int compareTo(PhoneLog pl) {
  116. if( this.hashCode() == pl.hashCode()) {
  117. return 0;
  118. }
  119. return - 1;
  120. }
  121. }

最后重启 Hadoop(执行 start-all.sh),然后完成评测。


转载:https://blog.csdn.net/qq_61604164/article/details/127868559
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场