任务描述 本关任务:向HDFS中上传任意文本文件。 相关知识 判断HDFS中文件是否存在 FileSystem fs = FileSystem.get(conf);//获取对象 fs.exists(new Path(path); //判断该路径的文件是否存在,是则返回true 文件拷贝 关键代码如下: /* fs.copyFromLocalFile 第一个参数表示是否删除源文件,第二个参数表示是否覆盖 */ fs.copyFromLocalFile(false, true, localPath, remotePath); 向HDFS文件追加数据 向HDFS文件中追加信息,关键代码如下: FileSystem fs = FileSystem.get(conf); Path remotePath = new Path(remoteFilePath); /* 创建一个文件读入流 */ FileInputStream in = new FileInputStream(localFilePath); /* 创建一个文件输出流,输出的内容将追加到文件末尾 */ FSDataOutputStream out = fs.append(remotePath); /* 读写文件内容 */ byte[] data = new byte[1024]; int read = -1; while ( (read = in.read(data)) > 0 ) { out.write(data, 0, read); } 编程要求 请在右侧start...end处填充代码实现相关功能,完成向HDFS中上传文本文件,如果指定的文件在HDFS中已经存在,由用户指定是追加到原有文件末尾还是覆盖原有的文件。
时间: 2024-02-06 11:04:45 浏览: 26
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
public class HDFSFileUploader {
private static final String HDFS_URL = "hdfs://localhost:9000";
private static final String HDFS_USER = "hadoop";
public static void main(String[] args) throws IOException {
// 配置Hadoop环境变量,需要在本地配置Hadoop的环境变量
System.setProperty("hadoop.home.dir", "/usr/local/hadoop");
// 创建Configuration对象
Configuration conf = new Configuration();
// 设置HDFS的URL和用户
conf.set("fs.defaultFS", HDFS_URL);
System.setProperty("HADOOP_USER_NAME", HDFS_USER);
// 获取FileSystem对象
FileSystem fs = FileSystem.get(conf);
// 本地文件路径
String localFilePath = "/path/to/local/file.txt";
// HDFS目标路径
String hdfsFilePath = "/path/to/hdfs/file.txt";
// 判断HDFS中文件是否已经存在
Path hdfsPath = new Path(hdfsFilePath);
boolean isExists = fs.exists(hdfsPath);
if (isExists) {
// 文件已经存在,让用户选择是追加还是覆盖
System.out.println("文件已经存在,您想要追加还是覆盖?(1:追加,2:覆盖)");
// 读取用户输入
int choice = System.in.read();
// 判断用户选择
if (choice == '1') {
// 追加文件内容
appendFile(fs, hdfsPath, localFilePath);
} else {
// 覆盖原有文件
uploadFile(fs, hdfsPath, localFilePath, true);
}
} else {
// 文件不存在,直接上传
uploadFile(fs, hdfsPath, localFilePath, false);
}
// 关闭FileSystem对象
fs.close();
}
/**
* 上传本地文件到HDFS
*
* @param fs FileSystem对象
* @param hdfsPath HDFS目标路径
* @param localFilePath 本地文件路径
* @param isOverwrite 是否覆盖原有文件
* @throws IOException
*/
private static void uploadFile(FileSystem fs, Path hdfsPath, String localFilePath, boolean isOverwrite) throws IOException {
// 创建本地文件对象
File localFile = new File(localFilePath);
// 创建文件输入流
FileInputStream in = new FileInputStream(localFile);
// 创建文件输出流,如果文件已经存在且需要覆盖,则覆盖原有文件
FSDataOutputStream out = fs.create(hdfsPath, isOverwrite);
// 读写文件内容
byte[] buffer = new byte[1024];
int bytesRead = -1;
while ((bytesRead = in.read(buffer)) > 0) {
out.write(buffer, 0, bytesRead);
}
// 关闭输入输出流
in.close();
out.close();
// 输出上传成功信息
System.out.println("文件上传成功!");
}
/**
* 追加本地文件内容到HDFS文件末尾
*
* @param fs FileSystem对象
* @param hdfsPath HDFS目标路径
* @param localFilePath 本地文件路径
* @throws IOException
*/
private static void appendFile(FileSystem fs, Path hdfsPath, String localFilePath) throws IOException {
// 创建文件输入流
FileInputStream in = new FileInputStream(localFilePath);
// 创建文件输出流,追加到文件末尾
FSDataOutputStream out = fs.append(hdfsPath);
// 读写文件内容
byte[] buffer = new byte[1024];
int bytesRead = -1;
while ((bytesRead = in.read(buffer)) > 0) {
out.write(buffer, 0, bytesRead);
}
// 关闭输入输出流
in.close();
out.close();
// 输出追加成功信息
System.out.println("文件追加成功!");
}
}
```
说明:
1. `HDFSFileUploader` 类中的 `HDFS_URL` 和 `HDFS_USER` 分别指定了HDFS的URL和用户。
2. `main` 方法中首先创建了一个 `Configuration` 对象,并设置了HDFS的URL和用户,然后获取了一个 `FileSystem` 对象。
3. 接着根据用户指定的本地文件路径和HDFS目标路径,判断HDFS中是否已经存在该文件。
4. 如果文件已经存在,则让用户选择是追加还是覆盖。如果选择追加,则调用 `appendFile` 方法将本地文件内容追加到HDFS文件末尾;如果选择覆盖,则调用 `uploadFile` 方法将本地文件覆盖原有文件。
5. 如果文件不存在,则直接调用 `uploadFile` 方法将本地文件上传到HDFS中。
6. `uploadFile` 方法中先创建了本地文件对象和文件输入流,然后根据 `isOverwrite` 参数创建了文件输出流,如果文件已经存在且需要覆盖,则覆盖原有文件。最后读写文件内容,关闭输入输出流,输出上传成功信息。
7. `appendFile` 方法中先创建文件输入流,然后创建文件输出流,将文件内容追加到文件末尾。最后关闭输入输出流,输出追加成功信息。