Converting a String like "9000.0" to a Long in Java
In Java, converting a string to a long normally means parsing the number first and then assigning the result to a long variable. Keep in mind that `Long.parseLong()` only accepts pure integer strings, so passing "9000.0" to it directly will not work:
```java
String str = "9000.0";
try {
    long num = Long.parseLong(str);
    System.out.println(num);
} catch (NumberFormatException e) {
    // "9000.0" contains a decimal point, so parseLong() ends up here
    System.out.println("Not a valid long integer: " + str);
}
```
If the string represents an exact integer (no decimal point), the conversion succeeds. Because "9000.0" does contain a decimal point, `Long.parseLong()` throws a `NumberFormatException`.
To handle a string with a decimal part, parse it as a double first and then convert the result to a long:
```java
String str = "9000.0";
double doubleValue = Double.parseDouble(str);
long num = (long) Math.floor(doubleValue); // round the fractional part down
System.out.println(num); // prints: 9000
```
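If you would rather have the conversion fail for non-integral values instead of silently truncating them, a `BigDecimal`-based variant is one option; a minimal sketch:
```java
String str = "9000.0";
// longValueExact() returns 9000 here because the fractional part is zero;
// for a value such as "9000.5" it throws ArithmeticException instead of truncating.
long num = new java.math.BigDecimal(str).longValueExact();
System.out.println(num); // prints: 9000
```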
Related questions
In the IDEA client, create a project hdfsAPI, then create and edit the source file hdfsTest.java to implement the following: (1) check whether the /Test directory exists in HDFS, and create it if it does not; (2) upload a file to HDFS (named with your name in pinyin plus your student ID, e.g. zhangsan20250001.txt); (3) download the file uploaded in (2) to the local machine; (4) print the content of the file uploaded in (2) to the terminal; (5) show the path, read/write permissions, size, creation time, and other information of the file uploaded in (2); (6) append content to the file uploaded in (2); (7) rename the file uploaded in (2); (8) delete the file uploaded in (2).
Sure, I'll go through each requirement in turn.
First, you need Hadoop installed with the HDFS service running, and the Hadoop client dependencies and configuration set up in your IDEA project.
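Every snippet below first obtains a `FileSystem` handle. As a shared sketch, assuming a NameNode at `hdfs://localhost:9000` and a hypothetical HDFS user name (adjust both to your cluster):
```java
Configuration conf = new Configuration();
// If core-site.xml/hdfs-site.xml are not on the classpath, point the client at the NameNode explicitly.
conf.set("fs.defaultFS", "hdfs://localhost:9000");
// Optional: act as a specific HDFS user when running from the IDE; whether the system
// property is honored depends on the Hadoop version, so treat it as a hint.
System.setProperty("HADOOP_USER_NAME", "hadoop");
FileSystem fs = FileSystem.get(conf);
```
With that in place, each requirement is addressed below.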
1. Check whether the /Test directory exists in HDFS, and create it if it does not.
This can be implemented as follows:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class hdfsTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Uses fs.defaultFS from the configuration on the classpath
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/Test");
        if (fs.exists(path)) {
            System.out.println("Directory already exists!");
        } else {
            fs.mkdirs(path);
            System.out.println("Directory created successfully!");
        }
        fs.close();
    }
}
```
2. Upload a file to HDFS (named with your name in pinyin plus your student ID, e.g. zhangsan20250001.txt)
This can be implemented as follows:
```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class hdfsTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        String filename = "zhangsan20250001.txt";
        Path path = new Path("/Test/" + filename);
        // Read the local file from the classpath (e.g. place it under src/main/resources)
        InputStream in = hdfsTest.class.getClassLoader().getResourceAsStream(filename);
        if (in == null) {
            throw new IOException("Local file not found on the classpath: " + filename);
        }
        FSDataOutputStream out = fs.create(path);
        out.write(read(in));
        out.close(); // close the stream so the data is flushed to HDFS
        System.out.println("Upload succeeded!");
        fs.close();
    }

    public static byte[] read(InputStream in) throws IOException {
        byte[] buf = new byte[1024];
        int len;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((len = in.read(buf)) != -1) {
            out.write(buf, 0, len);
        }
        in.close();
        return out.toByteArray();
    }
}
```
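If the file to upload lives on the local file system rather than on the classpath, `FileSystem.copyFromLocalFile()` is a simpler alternative. Reusing `fs` and `path` from the block above (the local path is a made-up example):
```java
// Copy a local file straight into HDFS in one call.
fs.copyFromLocalFile(new Path("D:/data/zhangsan20250001.txt"), path);
```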
3. Download the file uploaded in (2) to the local machine
This can be implemented as follows:
```java
import java.io.FileOutputStream;
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class hdfsTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        String filename = "zhangsan20250001.txt";
        Path path = new Path("/Test/" + filename);
        InputStream in = fs.open(path);
        // Write to a file of the same name in the current working directory
        FileOutputStream out = new FileOutputStream(filename);
        byte[] buf = new byte[1024];
        int len;
        while ((len = in.read(buf)) != -1) {
            out.write(buf, 0, len);
        }
        in.close();
        out.close();
        fs.close();
        System.out.println("Download succeeded!");
    }
}
```
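Alternatively, `FileSystem.copyToLocalFile()` does the stream copying for you. Reusing `fs` and `path` from the block above (the local destination is a made-up example):
```java
// Download the HDFS file to a local path in one call.
fs.copyToLocalFile(path, new Path("D:/data/zhangsan20250001.txt"));
// On Windows clients, the four-argument overload with useRawLocalFileSystem=true
// may help if the local checksum file system causes problems (e.g. missing winutils):
// fs.copyToLocalFile(false, path, new Path("D:/data/zhangsan20250001.txt"), true);
```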
4. Print the content of the file uploaded in (2) to the terminal
This can be implemented as follows:
```java
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class hdfsTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        String filename = "zhangsan20250001.txt";
        Path path = new Path("/Test/" + filename);
        InputStream in = fs.open(path);
        byte[] buf = new byte[1024];
        int len;
        while ((len = in.read(buf)) != -1) {
            System.out.print(new String(buf, 0, len));
        }
        in.close();
        fs.close();
    }
}
```
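Hadoop's `org.apache.hadoop.io.IOUtils` can do the same copy in one call and avoids the manual buffer loop (which can also split multi-byte characters at buffer boundaries). Reusing `fs` and `path` from the block above:
```java
FSDataInputStream in = fs.open(path);
// Stream the HDFS file to the terminal with a 4 KB buffer; "false" means
// copyBytes() does not close the streams, so the input is closed explicitly.
IOUtils.copyBytes(in, System.out, 4096, false);
IOUtils.closeStream(in);
```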
5. Show the path, read/write permissions, size, creation time, and other information of the file uploaded in (2)
This can be implemented as follows:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;

public class hdfsTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        String filename = "zhangsan20250001.txt";
        Path path = new Path("/Test/" + filename);
        FileStatus status = fs.getFileStatus(path);
        System.out.println("Path: " + status.getPath());
        System.out.println("Permissions: " + status.getPermission());
        System.out.println("Size: " + status.getLen() + " bytes");
        // HDFS does not expose a separate creation time; the modification time is the closest equivalent.
        System.out.println("Modification time: " + formatTime(status.getModificationTime()));
        fs.close();
    }

    public static String formatTime(long time) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf.format(new Date(time));
    }
}
```
6. Append content to the file uploaded in (2)
This can be implemented as follows:
```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class hdfsTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        String filename = "zhangsan20250001.txt";
        Path path = new Path("/Test/" + filename);
        // Open the append stream once, copy the local file's content into it, then close it
        FSDataOutputStream out = fs.append(path);
        InputStream in = new FileInputStream(filename);
        IOUtils.copyBytes(in, out, 4096, false);
        in.close();
        out.close();
        fs.close();
        System.out.println("Append succeeded!");
    }
}
```
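On a single-node or pseudo-distributed cluster, `append()` can fail because the default pipeline-recovery policy expects additional DataNodes. If that happens, relaxing the client-side settings below before creating the `FileSystem` is a common workaround (standard HDFS keys; verify them against your Hadoop version):
```java
// Allow append to proceed even when no replacement DataNode is available.
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
```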
7. Rename the file uploaded in (2)
This can be implemented as follows:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;

public class hdfsTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        String oldName = "zhangsan20250001.txt";
        String newName = "lisi20250002.txt";
        Path oldPath = new Path("/Test/" + oldName);
        Path newPath = new Path("/Test/" + newName);
        // rename() returns false instead of throwing when it fails
        if (fs.rename(oldPath, newPath)) {
            System.out.println("Rename succeeded!");
        } else {
            System.out.println("Rename failed!");
        }
        fs.close();
    }
}
```
8. Delete the file uploaded in (2)
This can be implemented as follows:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;

public class hdfsTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        // Delete the renamed file from step 7; "true" enables recursive deletion (harmless for a single file)
        String filename = "lisi20250002.txt";
        Path path = new Path("/Test/" + filename);
        if (fs.delete(path, true)) {
            System.out.println("Delete succeeded!");
        } else {
            System.out.println("Delete failed!");
        }
        fs.close();
    }
}
```
That covers the Java code for each of the HDFS operations; I hope it helps.
Please write a Java demo in which Flink consumes data from Kafka and writes it into a Hive cluster.
Below is a simplified sketch in Java: Flink's DataStream API consumes messages from Kafka and writes them as rolling text files into the HDFS directory that backs a Hive table, rather than going through the Hive metastore APIs. It assumes Kafka at localhost:9092, HDFS at hdfs://localhost:9000, and an already existing external table default.test whose LOCATION points at that directory; adjust these to your environment and add error handling as needed.
```java
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FlinkKafkaHiveDemo {

    private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String KAFKA_TOPIC = "test";
    // HDFS directory backing the Hive table default.test. The table is assumed to exist already,
    // created for example in beeline with:
    //   CREATE EXTERNAL TABLE `default`.`test` (line STRING)
    //   LOCATION '/user/hive/warehouse/default.db/test';
    private static final String HIVE_TABLE_LOCATION =
            "hdfs://localhost:9000/user/hive/warehouse/default.db/test";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The file sink only commits (finalizes) part files on checkpoints,
        // so checkpointing must be enabled for the data to become visible to Hive.
        env.enableCheckpointing(60_000);

        // Kafka source
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer");
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>(KAFKA_TOPIC, new SimpleStringSchema(), kafkaProps);
        kafkaConsumer.setStartFromEarliest();

        DataStream<String> input = env.addSource(kafkaConsumer);

        // Transform each record as needed (pass-through here)
        DataStream<String> transformed = input.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value;
            }
        });

        // Write the records as rolling text files directly into the table's LOCATION directory.
        // BasePathBucketAssigner keeps the files in that directory instead of date/hour subfolders,
        // so Hive picks them up without extra settings for reading subdirectories.
        StreamingFileSink<String> hiveDirSink = StreamingFileSink
                .forRowFormat(new Path(HIVE_TABLE_LOCATION), new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(new BasePathBucketAssigner<String>())
                .build();
        transformed.addSink(hiveDirSink);

        env.execute("Flink Kafka Hive Demo");
    }
}
```