public class TrafficReduce extends Reducer<Text, Text,NullWritable,Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { /********** Begin **********/ //相同订单只保留第一行 /********** End **********/ } }补全代码要求去重清洗:若有相同订单 id(orderid)只保留第一行,其他的清洗掉; 撤销理由(cancelreason)有大量的字符串 null,请将这些字符串用"未知"替代; 将数据集中的订单时间(ordertime)、订单撤销时间(canceltime)转换为 “yyyy-MM-dd HH:mm:ss”格式,同时只保留订单时间(ordertime)和订单撤销时间(canceltime)在 2019 年 03 月 07 日的数据,其他日期或者不匹配的数据清洗掉(字段名数据); 处理数据集中的行政区划代码(address),结合 MySQL 数据库 trafficdb 的表 t_address 中的 address_code 与 address_name 数据对应,将其转换成对应的地区名称,在行政区划代码(address)字段后插入,列名为 districtname,若数据中行政区划代码在数据库没有找到对应的行政区名称,则将行政区名设置为“未知”; 清洗完的数据存放至/root/files下,字符之间分割符为|。
时间: 2023-08-27 09:04:56 浏览: 85
现有student.txt和student-score.txt 将两个文件上传到hdfs上 使用Map/Reduce框架完成下面
public class TrafficReduce extends Reducer<Text, Text, NullWritable, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//相同订单只保留第一行
Text firstValue = null;
for(Text value:values){
if(firstValue==null){
firstValue = new Text(value);
}
}
if(firstValue!=null){
String[] fields = firstValue.toString().split("\\|");
String orderId = fields[0];
String cancelReason = fields[1];
String orderTime = fields[2];
String cancelTime = fields[3];
String addressCode = fields[4];
String districtName = "未知";
//处理撤销理由
if("null".equals(cancelReason)){
cancelReason = "未知";
}
//将时间转换成标准格式并筛选日期
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try{
Date orderDate = sdf.parse(orderTime);
if(orderDate.before(sdf.parse("2019-03-07 00:00:00"))||orderDate.after(sdf.parse("2019-03-07 23:59:59"))){
return;
}
orderTime = sdf.format(orderDate);
}catch(ParseException e){
return;
}
if(!"null".equals(cancelTime)){
try{
Date cancelDate = sdf.parse(cancelTime);
if(cancelDate.before(sdf.parse("2019-03-07 00:00:00"))||cancelDate.after(sdf.parse("2019-03-07 23:59:59"))){
cancelTime = "未知";
}else{
cancelTime = sdf.format(cancelDate);
}
}catch(ParseException e){
cancelTime = "未知";
}
}else{
cancelTime = "未知";
}
//处理行政区划代码
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try{
Class.forName("com.mysql.jdbc.Driver");
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/trafficdb","root","123456");
pstmt = conn.prepareStatement("select address_name from t_address where address_code=?");
pstmt.setString(1, addressCode);
rs = pstmt.executeQuery();
if(rs.next()){
districtName = rs.getString("address_name");
}
}catch(ClassNotFoundException|SQLException e){
e.printStackTrace();
}finally{
if(rs!=null){
try{
rs.close();
}catch(SQLException e){
e.printStackTrace();
}
}
if(pstmt!=null){
try{
pstmt.close();
}catch(SQLException e){
e.printStackTrace();
}
}
if(conn!=null){
try{
conn.close();
}catch(SQLException e){
e.printStackTrace();
}
}
}
String newRecord = orderId+"|"+cancelReason+"|"+orderTime+"|"+cancelTime+"|"+addressCode+"|"+districtName;
context.write(NullWritable.get(), new Text(newRecord));
}
}
}
阅读全文