Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1);
时间: 2024-04-06 21:29:04 浏览: 102
这是一个使用Hadoop MapReduce框架实现的Word Count程序的驱动程序代码。其中`job.setMapperClass(WordCountMapper.class)`指定了Map阶段使用的Mapper类,`job.setReducerClass(WordCountReducer.class)`指定了Reduce阶段使用的Reducer类,`FileInputFormat.addInputPath(job, new Path(args[0]))`指定输入路径,`FileOutputFormat.setOutputPath(job, new Path(args[1]))`指定输出路径。程序最后调用`job.waitForCompletion(true)`提交作业并等待作业完成,`System.exit(job.waitForCompletion(true) ? 0 : 1)`表示当作业完成时退出程序并返回0,否则返回1。
相关问题
public class TCAPP { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1.获取job Configuration conf=new Configuration(); Job job=Job.getInstance(conf); // 2.指定job使用的类
job.setJarByClass(TCAPP.class);
// 3.指定map和reduce使用的类
job.setMapperClass(TCMapper.class);
job.setReducerClass(TCReducer.class);
// 4.指定map输出的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5.指定reduce输出的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6.指定输入和输出路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 7.提交job
boolean result=job.waitForCompletion(true);
System.exit(result?0:1);
}
这是一个MapReduce程序的主函数,可以实现对文本数据进行词频统计。其中,指定了Map和Reduce类,指定了输入和输出路径,最后提交job并退出程序。
public class MarketShareDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MarketShareDriver.class); job.setMapperClass(MarketShareMapper.class); job.setReducerClass(MarketShareReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); Path inPath = new Path("data/phone.txt"); FileInputFormat.setInputPaths(job,inPath); Path outPath = new Path("data/PhoneMarketShare"); FileSystem fs = outPath.getFileSystem(conf); if (fs.exists(outPath)){ fs.delete(outPath,true); } FileOutputFormat.setOutputPath(job,outPath); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } } 这段代码怎么修改才能把数据也存到MySQL中
要将数据存储到MySQL中,需要在Mapper或Reducer中添加连接MySQL的代码,并将结果写入MySQL中。以下是一个简单的示例:
1. 添加MySQL连接信息
```java
// 定义MySQL连接信息
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://localhost:3306/test";
String username = "root";
String password = "password";
// 注册JDBC驱动程序
Class.forName(driver);
// 获取数据库连接
Connection conn = DriverManager.getConnection(url, username, password);
// 创建Statement对象
Statement stmt = conn.createStatement();
```
2. 在Mapper或Reducer中将结果写入MySQL中
```java
// 将结果写入MySQL中
while (values.hasNext()) {
// 获取一条记录
Text value = values.next();
String[] fields = value.toString().split(",");
// 插入数据
String sql = "INSERT INTO PhoneMarketShare (brand, marketShare) VALUES ('" + fields[0] + "', '" + fields[1] + "')";
stmt.executeUpdate(sql);
}
```
完整代码如下所示:
```java
public class MarketShareDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MarketShareDriver.class);
job.setMapperClass(MarketShareMapper.class);
job.setReducerClass(MarketShareReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Path inPath = new Path("data/phone.txt");
FileInputFormat.setInputPaths(job,inPath);
Path outPath = new Path("data/PhoneMarketShare");
FileSystem fs = outPath.getFileSystem(conf);
if (fs.exists(outPath)){
fs.delete(outPath,true);
}
FileOutputFormat.setOutputPath(job,outPath);
boolean b = job.waitForCompletion(true);
// 将结果写入MySQL中
if (b) {
// 定义MySQL连接信息
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://localhost:3306/test";
String username = "root";
String password = "password";
// 注册JDBC驱动程序
Class.forName(driver);
// 获取数据库连接
Connection conn = DriverManager.getConnection(url, username, password);
// 创建Statement对象
Statement stmt = conn.createStatement();
// 将结果写入MySQL中
Iterator<OutputFormat> it = job.getOutputFormatClass().newInstance().getRecordWriter(null, job, null, null).asInstanceOf[FileOutputFormat<Text, Text>].getRecordWriter(job).asInstanceOf[SequenceFileRecordWriter[Text, Text]].getValues().iterator()
while (it.hasNext()) {
// 获取一条记录
Text value = it.next();
String[] fields = value.toString().split(",");
// 插入数据
String sql = "INSERT INTO PhoneMarketShare (brand, marketShare) VALUES ('" + fields[0] + "', '" + fields[1] + "')";
stmt.executeUpdate(sql);
}
}
System.exit(b ? 0 : 1);
}
}
```
注意:以上代码仅作为示例,实际使用时需要根据实际情况进行修改和调整。
阅读全文