根据提示,在右侧编辑器补充代码,对数据按照一定规则进行清洗。
数据说明如下: a.txt
数据切分方式:,
数据所在位置:/user/test/input/a.txt
15733218050,15778423030,1542457633,1542457678,450000,530000
15733218050 | 15778423030 | 1542457633 | 1542457678 | 450000 | 530000 |
---|---|---|---|---|---|
呼叫者手机号 | 接受者手机号 | 开始时间戳(s) | 结束时间戳(s) | 呼叫者地址省份编码 | 接受者地址省份编码 |
MySQL 数据库:
用户名:root
密码:123123
数据库名:mydb
用户表:userphone
列名 | 类型 | 非空 | 是否自增 | 介绍 |
---|---|---|---|---|
id | int(11) | √ | √ | 用户ID |
phone | varchar(255) | | | 手机号 |
trueName | varchar(255) | | | 真实姓名 |
地址省份表:allregion
列名 | 类型 | 非空 | 是否自增 | 介绍 |
---|---|---|---|---|
id | int(11) | √ | √ | 用户ID |
CodeNum | varchar(255) | | | 编号 |
Address | varchar(255) | | | 地址 |
清洗规则:
- 处理数据中的时间戳(秒级),将其转化为形如 2017-06-21 07:01:58 的"年-月-日 时:分:秒"格式;
- 处理数据中的省份编码,与 MySQL 中 allregion 表的数据对应,将其转换成省份名称;
- 处理用户手机号,与 MySQL 中 userphone 表的数据对应,关联用户的真实姓名;
- 处理数据中的开始时间与结束时间,并计算通信时长(以秒为单位);
- 设置数据来源文件路径及清洗后的数据存储路径:数据来源路径为 /user/test/input/a.txt(HDFS);清洗后的数据存放于 /user/test/output(HDFS)。
数据清洗后如下:
邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市
邓二 | 张倩 | 13666666666 | 15151889601 | 2018-03-29 10:58:12 | 2018-03-29 10:58:42 | 30 | 黑龙江省 | 上海市 |
---|---|---|---|---|---|---|---|---|
用户名A | 用户名B | 用户A的手机号 | 用户B的手机号 | 开始时间 | 结束时间 | 通话时长(s) | 用户A地址省份 | 用户B地址省份 |
step/com/LogMR.java
-
package com;
-
import java.io.IOException;
-
import java.sql.Connection;
-
import java.sql.ResultSet;
-
import java.sql.SQLException;
-
import java.sql.Statement;
-
import java.text.SimpleDateFormat;
-
import java.util.ArrayList;
-
import java.util.HashMap;
-
import java.util.Iterator;
-
import java.util.List;
-
import java.util.Map;
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.FileSystem;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.LongWritable;
-
import org.apache.hadoop.io.NullWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
public
class
LogMR {
-
/********** begin **********/
-
static
class
MyMapper
extends
Mapper<LongWritable, Text, PhoneLog, NullWritable> {
-
Map<String, String> userMap =
new
HashMap<>();
-
Map<String, String> addressMap =
new
HashMap<>();
-
SimpleDateFormat
sdf
=
new
SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
-
PhoneLog
pl
=
new
PhoneLog();
-
Text
text
=
new
Text();
-
@Override
-
protected
void
setup
(Context context)
throws IOException, InterruptedException {
-
Connection
connection
= DBHelper.getConnection();
-
try {
-
Statement
statement
= connection.createStatement();
-
String
sql
=
"select * from userphone";
-
ResultSet
resultSet
= statement.executeQuery(sql);
-
while (resultSet.next()) {
-
String
phone
= resultSet.getString(
2);
-
String
trueName
= resultSet.getString(
3);
-
userMap.put(phone, trueName);
-
}
-
String
sql2
=
"select * from allregion";
-
ResultSet
resultSetA
= statement.executeQuery(sql2);
-
while (resultSetA.next()) {
-
String
phone
= resultSetA.getString(
2);
-
String
trueName
= resultSetA.getString(
3);
-
addressMap.put(phone, trueName);
-
}
-
}
catch (SQLException e) {
-
e.printStackTrace();
-
}
-
}
-
@Override
-
protected
void
map
(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
-
String
str
= value.toString();
-
String[] split = str.split(
",");
-
if (split.length ==
6) {
-
String
trueName1
= userMap.get(split[
0]);
-
String
trueName2
= userMap.get(split[
1]);
-
String
address1
= addressMap.get(split[
4]);
-
String
address2
= addressMap.get(split[
5]);
-
long
startTimestamp
= Long.parseLong(split[
2]);
-
String
startTime
= sdf.format(startTimestamp *
1000);
-
long
endTimestamp
= Long.parseLong(split[
3]);
-
String
endTime
= sdf.format(endTimestamp *
1000);
-
long
timeLen
= endTimestamp - startTimestamp;
-
pl.SetPhoneLog(trueName1, trueName2, split[
0], split[
1], startTime, endTime, timeLen, address1,
-
address2);
-
context.write(pl, NullWritable.get());
-
}
-
}
-
}
-
public
static
void
main
(String[] args)
throws Exception {
-
Configuration
conf
=
new
Configuration();
-
Job
job
= Job.getInstance(conf);
-
job.setJarByClass(LogMR.class);
-
job.setMapperClass(MyMapper.class);
-
job.setMapOutputKeyClass(PhoneLog.class);
-
job.setMapOutputValueClass(NullWritable.class);
-
job.setNumReduceTasks(
0);
-
Path
inPath
=
new
Path(
"/user/test/input/a.txt");
-
Path
out
=
new
Path(
"/user/test/output");
-
FileInputFormat.setInputPaths(job, inPath);
-
FileOutputFormat.setOutputPath(job, out);
-
job.waitForCompletion(
true);
-
}
-
/********** end **********/
-
}
step/com/DBHelper.java
-
package com;
-
import java.sql.Connection;
-
import java.sql.DriverManager;
-
import java.sql.SQLException;
-
/**
 * Small JDBC helper that hands out one lazily created, cached connection to
 * the {@code mydb} database.
 *
 * <p>The returned connection is shared: callers should not close it unless
 * they are done with the database for the lifetime of the JVM (a closed
 * connection is transparently replaced on the next call).
 *
 * <p>NOTE(review): the credentials are hard-coded for the exercise
 * environment; move them to configuration before using this anywhere else.
 */
public class DBHelper {

    /********** begin **********/

    private static final String driver = "com.mysql.jdbc.Driver";

    private static final String url =
            "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";

    // Database user name.
    private static final String username = "root";

    // Database password: set when the database was installed, so it differs per installation.
    private static final String password = "123123";

    // Cached database connection; null until the first successful connect.
    private static Connection conn = null;

    static {
        try {
            Class.forName(driver);
        } catch (ClassNotFoundException ex) {
            // Not fatal here: a driver may still be registered by other means.
            // If none is, getConnection() reports the failure.
            System.err.println("JDBC driver class not found: " + driver);
        }
    }

    private DBHelper() {
        // Static utility class; not meant to be instantiated.
    }

    /**
     * Returns the cached connection, opening a new one on first use or when
     * the cached one has been closed.
     *
     * @return an open connection, never {@code null}
     * @throws IllegalStateException if the connection cannot be
     *         established; the underlying {@link SQLException} is the cause
     */
    public static Connection getConnection() {
        try {
            if (conn == null || conn.isClosed()) {
                conn = DriverManager.getConnection(url, username, password);
            }
        } catch (SQLException e) {
            conn = null;
            throw new IllegalStateException("Unable to open database connection to " + url, e);
        }
        return conn;
    }

    /********** end **********/

}
step/com/PhoneLog.java
-
package com;
-
import java.io.DataInput;
-
import java.io.DataOutput;
-
import java.io.IOException;
-
import org.apache.hadoop.io.Writable;
-
import org.apache.hadoop.io.WritableComparable;
-
public
class
PhoneLog
implements
WritableComparable<
PhoneLog> {
-
private String userA;
-
private String userB;
-
private String userA_Phone;
-
private String userB_Phone;
-
private String startTime;
-
private String endTime;
-
private
Long timeLen;
-
private String userA_Address;
-
private String userB_Address;
-
public PhoneLog() {
-
}
-
public void SetPhoneLog(String userA, String userB, String userA_Phone, String userB_Phone, String startTime,
-
String endTime,
Long timeLen, String userA_Address, String userB_Address) {
-
this.userA = userA;
-
this.userB = userB;
-
this.userA_Phone = userA_Phone;
-
this.userB_Phone = userB_Phone;
-
this.startTime = startTime;
-
this.endTime = endTime;
-
this.timeLen = timeLen;
-
this.userA_Address = userA_Address;
-
this.userB_Address = userB_Address;
-
}
-
public String getUserA_Phone() {
-
return userA_Phone;
-
}
-
public void setUserA_Phone(String userA_Phone) {
-
this.userA_Phone = userA_Phone;
-
}
-
public String getUserB_Phone() {
-
return userB_Phone;
-
}
-
public void setUserB_Phone(String userB_Phone) {
-
this.userB_Phone = userB_Phone;
-
}
-
public String getUserA() {
-
return userA;
-
}
-
public void setUserA(String userA) {
-
this.userA = userA;
-
}
-
public String getUserB() {
-
return userB;
-
}
-
public void setUserB(String userB) {
-
this.userB = userB;
-
}
-
public String getStartTime() {
-
return startTime;
-
}
-
public void setStartTime(String startTime) {
-
this.startTime = startTime;
-
}
-
public String getEndTime() {
-
return endTime;
-
}
-
public void setEndTime(String endTime) {
-
this.endTime = endTime;
-
}
-
public
Long getTimeLen() {
-
return timeLen;
-
}
-
public void setTimeLen(
Long timeLen) {
-
this.timeLen = timeLen;
-
}
-
public String getUserA_Address() {
-
return userA_Address;
-
}
-
public void setUserA_Address(String userA_Address) {
-
this.userA_Address = userA_Address;
-
}
-
public String getUserB_Address() {
-
return userB_Address;
-
}
-
public void setUserB_Address(String userB_Address) {
-
this.userB_Address = userB_Address;
-
}
-
@Override
-
public void write(DataOutput
out) throws IOException {
-
out.writeUTF(userA);
-
out.writeUTF(userB);
-
out.writeUTF(userA_Phone);
-
out.writeUTF(userB_Phone);
-
out.writeUTF(startTime);
-
out.writeUTF(endTime);
-
out.writeLong(timeLen);
-
out.writeUTF(userA_Address);
-
out.writeUTF(userB_Address);
-
}
-
@Override
-
public void readFields(DataInput
in) throws IOException {
-
userA =
in.readUTF();
-
userB =
in.readUTF();
-
userA_Phone =
in.readUTF();
-
userB_Phone =
in.readUTF();
-
startTime =
in.readUTF();
-
endTime =
in.readUTF();
-
timeLen =
in.readLong();
-
userA_Address =
in.readUTF();
-
userB_Address =
in.readUTF();
-
}
-
@Override
-
public String toString() {
-
return userA +
"," + userB +
"," + userA_Phone +
"," + userB_Phone +
"," + startTime +
"," + endTime +
","
-
+ timeLen +
"," + userA_Address +
"," + userB_Address;
-
}
-
@Override
-
public int compareTo(PhoneLog pl) {
-
if(
this.hashCode() == pl.hashCode()) {
-
return
0;
-
}
-
return -
1;
-
}
-
}
最后在命令行执行 start-all.sh 重启 Hadoop,然后完成评测。
转载:https://blog.csdn.net/qq_61604164/article/details/127868559
查看评论