public class MarketShareDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MarketShareDriver.class); job.setMapperClass(MarketShareMapper.class); job.setReducerClass(MarketShareReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); Path inPath = new Path("data/phone.txt"); FileInputFormat.setInputPaths(job,inPath); Path outPath = new Path("data/PhoneMarketShare"); FileSystem fs = outPath.getFileSystem(conf); if (fs.exists(outPath)){ fs.delete(outPath,true); } FileOutputFormat.setOutputPath(job,outPath); boolean b = job.waitForCompletion(true); // 设置数据库连接信息 String dbUrl = "jdbc:mysql://localhost:3306/phone"; String dbUsername = "root"; String dbPassword = "root"; DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", dbUrl, dbUsername, dbPassword); try (Connection connection = DriverManager.getConnection(dbUrl, dbUsername, dbPassword)) { String createTableSql = "CREATE TABLE IF NOT EXISTS operator_market_share(operator VARCHAR(255), market_share DOUBLE)"; PreparedStatement preparedStatement = connection.prepareStatement(createTableSql); preparedStatement.executeUpdate(); } System.exit(b ? 0 : 1); } } 在这段代码的基础上怎么把结果输入到mysql
时间: 2024-03-25 17:39:33 浏览: 65
要将MapReduce作业的结果输出到MySQL数据库中,需要在代码中进行一些更改和添加。以下是需要进行的更改和添加的步骤:
1. 添加MySQL的JDBC依赖项,可以从Maven中央仓库下载,并将其添加到项目的依赖项中。
2. 在主类中添加对应的import语句:
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
```
3. 在main函数中,在创建作业之后,添加以下代码:
```java
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", dbUrl, dbUsername, dbPassword);
```
这行代码将配置数据库连接信息。
4. 在Reducer类中,在reduce函数中添加以下代码:
```java
String insertTableSql = "INSERT INTO operator_market_share (operator, market_share) VALUES (?, ?)";
Connection connection = null;
PreparedStatement preparedStatement = null;
try {
connection = DriverManager.getConnection(dbUrl, dbUsername, dbPassword);
preparedStatement = connection.prepareStatement(insertTableSql);
while (values.iterator().hasNext()) {
String value = values.iterator().next().toString();
String[] parts = value.split("\\s+");
preparedStatement.setString(1, parts[0]);
preparedStatement.setDouble(2, Double.parseDouble(parts[1]));
preparedStatement.executeUpdate();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (preparedStatement != null) {
preparedStatement.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
```
这段代码将结果插入到MySQL数据库中。需要注意的是,这里使用了Java的JDBC API,需要将数据库连接信息传递给DriverManager.getConnection()方法,以获取数据库连接。
5. 在运行作业之前,需要确保MySQL数据库已经启动,并且已经创建了名为"phone"的数据库。如果数据库中已经存在名为"operator_market_share"的表,则将其删除,以便可以创建新的表。可以在main函数中添加以下代码:
```java
try (Connection connection = DriverManager.getConnection(dbUrl, dbUsername, dbPassword)) {
String dropTableSql = "DROP TABLE IF EXISTS operator_market_share";
PreparedStatement dropTableStatement = connection.prepareStatement(dropTableSql);
dropTableStatement.executeUpdate();
} catch (Exception e) {
e.printStackTrace();
}
```
这段代码将删除表,以确保可以创建新的表。
完整的代码如下所示:
阅读全文
相关推荐


















