Serial and Parallel Pi Estimation on a Hadoop Cluster
Below is code for estimating pi on a Hadoop cluster, first serially and then in parallel. Both versions use the Monte Carlo method: draw random points in the unit square; the fraction that lands inside the quarter circle of radius 1 tends to π/4, so with m of n points inside, pi ≈ 4m/n.
1. Serial pi estimation
```java
import java.util.Random;

public class SerialPi {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        long n = 1000000; // number of samples
        long m = 0;       // points inside the quarter circle
        Random rand = new Random();
        for (long i = 0; i < n; i++) {
            double x = rand.nextDouble();
            double y = rand.nextDouble();
            if (x * x + y * y <= 1) { // is the point inside the quarter circle?
                m++;
            }
        }
        double pi = 4.0 * m / n; // estimate pi
        System.out.println("Pi = " + pi);
        long endTime = System.currentTimeMillis();
        System.out.println("Time: " + (endTime - startTime) + "ms");
    }
}
```
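To try the serial version locally (a minimal sketch, assuming a JDK on the PATH and the source saved as SerialPi.java):
```
javac SerialPi.java
java SerialPi
```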
2. Parallel pi estimation
First, install Hadoop on the cluster and prepare an input file with one line per simulation round; each map() call consumes one line and runs n samples, where n is passed through the job configuration (the line content itself is ignored).
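For example (a sketch; the file name and HDFS path are placeholders):
```
# eight lines -> eight rounds of n samples each
seq 1 8 > pi_input.txt
hdfs dfs -put pi_input.txt /user/hadoop/pi/input
```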
Then create a MapReduce job to estimate pi in parallel: each mapper emits the number of points it found inside the quarter circle, and a single reducer sums those counts and computes the final estimate.
Mapper:
```java
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ParallelPiMapper extends Mapper<LongWritable, Text, NullWritable, LongWritable> {
    private long n; // samples per input record

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        n = context.getConfiguration().getLong("n", 1000000);
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Random rand = new Random();
        long m = 0; // points inside the quarter circle
        for (long i = 0; i < n; i++) {
            double x = rand.nextDouble();
            double y = rand.nextDouble();
            if (x * x + y * y <= 1) { // is the point inside the quarter circle?
                m++;
            }
        }
        // Emit this round's in-circle count for the reducer to aggregate
        context.write(NullWritable.get(), new LongWritable(m));
    }
}
```
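Note that each map() call constructs Random with its default seed; distinct tasks will almost certainly get distinct seeds, but independence is not guaranteed. A hypothetical tweak (not in the original code) is to fold the task ID into the seed inside map():
```java
// Hypothetical: derive a per-task seed from the task attempt ID so
// concurrent mappers draw distinct pseudo-random streams.
int taskId = context.getTaskAttemptID().getTaskID().getId();
Random rand = new Random(System.nanoTime() ^ ((long) taskId << 32));
```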
Reducer:
```java
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class ParallelPiReducer extends Reducer<NullWritable, LongWritable, NullWritable, DoubleWritable> {
    @Override
    public void reduce(NullWritable key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long n = context.getConfiguration().getLong("n", 1000000);
        long m = 0;      // total points inside the quarter circle
        long rounds = 0; // number of map outputs received
        for (LongWritable value : values) {
            m += value.get();
            rounds++;
        }
        double pi = 4.0 * m / (n * rounds); // estimate pi over all samples
        context.write(NullWritable.get(), new DoubleWritable(pi));
    }
}
```
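How accurate is the result? Each sample is a Bernoulli trial with success probability π/4, so the standard error of the estimate shrinks as 1/√N, where N is the total number of samples across all mappers. A small standalone check (a worked calculation, not part of the original post):
```java
public class PiError {
    public static void main(String[] args) {
        double p = Math.PI / 4; // probability a sample lands inside the quarter circle
        long N = 1000000L;      // total number of samples
        // stderr(piHat) = 4 * sqrt(p * (1 - p) / N) for the estimator piHat = 4m/N
        double stderr = 4 * Math.sqrt(p * (1 - p) / N);
        System.out.println("expected error ~ " + stderr); // ~0.00164 for N = 1e6
    }
}
```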
Driver:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ParallelPiDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new ParallelPiDriver(), args);
        System.exit(exitCode);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: ParallelPiDriver <input> <output> <samplesPerRecord>");
            return 1;
        }
        Configuration conf = getConf();
        conf.setLong("n", Long.parseLong(args[2])); // samples per input record
        Job job = Job.getInstance(conf, "Parallel Pi");
        job.setJarByClass(ParallelPiDriver.class);
        job.setMapperClass(ParallelPiMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(ParallelPiReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setNumReduceTasks(1); // a single reducer aggregates all counts
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
```
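By default the job above lets TextInputFormat decide how many map tasks to run, which depends on HDFS block boundaries rather than line count. To guarantee one map task per input line, one option (an optional addition, not in the original job setup) is NLineInputFormat:
```java
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

// In run(), before submitting the job:
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setNumLinesPerSplit(job, 1); // one line per map task
```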
Finally, submit the job with:
```
hadoop jar ParallelPi.jar ParallelPiDriver <input> <output> <samplesPerRecord>
```
where `<input>` is the HDFS input path, `<output>` is the HDFS output directory (it must not already exist), and `<samplesPerRecord>` is the number of samples each map() call draws. The estimate is written to the job output directory.
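For example (paths are placeholders):
```
hadoop jar ParallelPi.jar ParallelPiDriver /user/hadoop/pi/input /user/hadoop/pi/output 1000000
hdfs dfs -cat /user/hadoop/pi/output/part-r-00000
```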