hadoop集群实现圆周率串行和并行计算的完整java代码
时间: 2024-03-23 15:41:42 浏览: 191
下面是基于Hadoop分布式计算框架实现串行和并行计算圆周率的完整Java代码。
1. 串行版本
```java
import java.util.Random;
/**
 * Serial Monte Carlo estimation of pi: sample n uniform random points in the
 * unit square and count how many fall inside the quarter circle of radius 1.
 * The inside/total ratio approximates pi/4.
 *
 * Usage: java PiSerial &lt;numPoints&gt;
 */
public class PiSerial {

    public static void main(String[] args) {
        // Validate input up front instead of dying with an
        // ArrayIndexOutOfBoundsException or printing "Pi: NaN" for n = 0.
        if (args.length < 1) {
            System.err.println("Usage: java PiSerial <numPoints>");
            System.exit(1);
        }
        int n = Integer.parseInt(args[0]);
        if (n <= 0) {
            System.err.println("numPoints must be positive, got: " + n);
            System.exit(1);
        }
        double pi = estimatePi(n, new Random());
        System.out.println("Pi: " + pi);
    }

    /**
     * Estimates pi by sampling {@code n} uniform points in the unit square.
     *
     * @param n      number of random points to sample; must be positive
     * @param random randomness source, injectable so tests can fix the seed
     * @return 4.0 * (points inside the quarter circle) / n
     */
    static double estimatePi(int n, Random random) {
        int count = 0;
        for (int i = 0; i < n; ++i) {
            double x = random.nextDouble();
            double y = random.nextDouble();
            // (x, y) lies inside the quarter circle when x^2 + y^2 <= 1.
            if (x * x + y * y <= 1) {
                count += 1;
            }
        }
        return 4.0 * count / n;
    }
}
```
2. 并行版本
```java
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class PiParallel {
public static class PiMapper extends Mapper<Object, Text, NullWritable, DoubleWritable> {
private final static DoubleWritable one = new DoubleWritable(1);
private final static NullWritable nullWritable = NullWritable.get();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
int n = Integer.parseInt(value.toString());
Random random = new Random();
int count = 0;
for (int i = 0; i < n; ++i) {
double x = random.nextDouble();
double y = random.nextDouble();
if (x * x + y * y <= 1) {
count += 1;
}
}
double pi = 4.0 * count / n;
context.write(nullWritable, new DoubleWritable(pi));
}
}
public static class PiReducer extends Reducer<NullWritable, DoubleWritable, NullWritable, DoubleWritable> {
private final static NullWritable nullWritable = NullWritable.get();
public void reduce(NullWritable key, Iterable<DoubleWritable> values, Context context)
throws IOException, InterruptedException {
double sum = 0;
int count = 0;
for (DoubleWritable val : values) {
sum += val.get();
count += 1;
}
double pi = sum / count;
context.write(nullWritable, new DoubleWritable(pi));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "pi");
job.setJarByClass(PiParallel.class);
job.setMapperClass(PiMapper.class);
job.setReducerClass(PiReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(DoubleWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
TextOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
```
以上代码片段可能需要根据实际情况进行修改和调整。需要注意的是,在并行版本中,输入文件的每行表示该次 map 调用要采样的总点数,而不是单个点的坐标;另外,Reducer 对各行的估计值做的是简单平均,只有当每行指定的点数相同时,最终结果才是无偏的。