public class MarketShareDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MarketShareDriver.class); job.setMapperClass(MarketShareMapper.class); job.setReducerClass(MarketShareReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); Path inPath = new Path("data/phone.txt"); FileInputFormat.setInputPaths(job,inPath); Path outPath = new Path("data/PhoneMarketShare"); FileSystem fs = outPath.getFileSystem(conf); if (fs.exists(outPath)){ fs.delete(outPath,true); } FileOutputFormat.setOutputPath(job,outPath); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } } 这段代码怎么修改才能把数据也存到MySQL中
时间: 2024-03-25 10:39:17 浏览: 67
hadoop面试题分解.pdf
要将数据存储到MySQL中,需要在Mapper或Reducer中添加连接MySQL的代码,并将结果写入MySQL中。以下是一个简单的示例:
1. 添加MySQL连接信息
```java
// 定义MySQL连接信息
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://localhost:3306/test";
String username = "root";
String password = "password";
// 注册JDBC驱动程序
Class.forName(driver);
// 获取数据库连接
Connection conn = DriverManager.getConnection(url, username, password);
// 创建Statement对象
Statement stmt = conn.createStatement();
```
2. 在Mapper或Reducer中将结果写入MySQL中
```java
// 将结果写入MySQL中
while (values.hasNext()) {
// 获取一条记录
Text value = values.next();
String[] fields = value.toString().split(",");
// 插入数据
String sql = "INSERT INTO PhoneMarketShare (brand, marketShare) VALUES ('" + fields[0] + "', '" + fields[1] + "')";
stmt.executeUpdate(sql);
}
```
完整代码如下所示:
```java
public class MarketShareDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MarketShareDriver.class);
job.setMapperClass(MarketShareMapper.class);
job.setReducerClass(MarketShareReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Path inPath = new Path("data/phone.txt");
FileInputFormat.setInputPaths(job,inPath);
Path outPath = new Path("data/PhoneMarketShare");
FileSystem fs = outPath.getFileSystem(conf);
if (fs.exists(outPath)){
fs.delete(outPath,true);
}
FileOutputFormat.setOutputPath(job,outPath);
boolean b = job.waitForCompletion(true);
// 将结果写入MySQL中
if (b) {
// 定义MySQL连接信息
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://localhost:3306/test";
String username = "root";
String password = "password";
// 注册JDBC驱动程序
Class.forName(driver);
// 获取数据库连接
Connection conn = DriverManager.getConnection(url, username, password);
// 创建Statement对象
Statement stmt = conn.createStatement();
// 将结果写入MySQL中
Iterator<OutputFormat> it = job.getOutputFormatClass().newInstance().getRecordWriter(null, job, null, null).asInstanceOf[FileOutputFormat<Text, Text>].getRecordWriter(job).asInstanceOf[SequenceFileRecordWriter[Text, Text]].getValues().iterator()
while (it.hasNext()) {
// 获取一条记录
Text value = it.next();
String[] fields = value.toString().split(",");
// 插入数据
String sql = "INSERT INTO PhoneMarketShare (brand, marketShare) VALUES ('" + fields[0] + "', '" + fields[1] + "')";
stmt.executeUpdate(sql);
}
}
System.exit(b ? 0 : 1);
}
}
```
注意:以上代码仅作为示例,实际使用时需要根据实际情况进行修改和调整。
阅读全文