编写Flink程序,计算时间窗内温度超过30度的传感器数据,输入源来自Socket文本流,时间语义使用Processtime,结果保存到数据库中。传感器数据包括传感器id,时间戳和温度值。
时间: 2023-11-28 21:48:21 浏览: 91
首先,我们需要定义一个Sensor类来表示传感器数据:
```java
public class Sensor {
private String id;
private long timestamp;
private double temperature;
public Sensor(String id, long timestamp, double temperature) {
this.id = id;
this.timestamp = timestamp;
this.temperature = temperature;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public double getTemperature() {
return temperature;
}
public void setTemperature(double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "Sensor{" +
"id='" + id + '\'' +
", timestamp=" + new Date(timestamp) +
", temperature=" + temperature +
'}';
}
}
```
然后,我们可以编写Flink程序:
```java
public class TemperatureJob {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 定义输入流
DataStream<String> input = env.socketTextStream("localhost", 9999);
// 解析输入流并转换为Sensor对象
DataStream<Sensor> sensors = input.map(line -> {
String[] fields = line.split(",");
String id = fields[0];
long timestamp = Long.parseLong(fields[1]);
double temperature = Double.parseDouble(fields[2]);
return new Sensor(id, timestamp, temperature);
});
// 计算时间窗口内温度超过30度的传感器数据
DataStream<String> result = sensors
.filter(sensor -> sensor.getTemperature() > 30)
.map(sensor -> sensor.toString())
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce((s1, s2) -> s1 + "\n" + s2);
// 输出结果到控制台并保存到数据库
result.print();
result.addSink(new JdbcSink<>( // 需要引入 flink-connector-jdbc 依赖
"INSERT INTO temperature (sensor_id, timestamp, temperature) VALUES (?, ?, ?)",
new JdbcStatementBuilder<Sensor>() {
@Override
public void accept(PreparedStatement preparedStatement, Sensor sensor) throws SQLException {
preparedStatement.setString(1, sensor.getId());
preparedStatement.setTimestamp(2, new Timestamp(sensor.getTimestamp()));
preparedStatement.setDouble(3, sensor.getTemperature());
}
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/test")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("password")
.build()));
// 启动执行环境
env.execute("Temperature Job");
}
}
```
上述程序中,我们首先创建了一个执行环境,并设置时间语义为ProcessingTime。然后,我们创建了一个输入流,并将其解析为Sensor对象。接下来,我们使用filter算子过滤出温度超过30度的传感器数据,并使用windowAll算子将其划分到10秒的时间窗口内。最后,我们使用reduce算子将每个时间窗口内的传感器数据拼接为一个字符串,并将其输出到控制台并保存到数据库。需要注意的是,保存到数据库需要引入flink-connector-jdbc依赖。
最后,我们可以使用nc命令启动一个Socket服务器,向该程序发送数据:
```
$ nc -lk 9999
sensor1,1631643170000,25.0
sensor1,1631643180000,28.0
sensor1,1631643190000,32.0
sensor1,1631643200000,35.0
sensor2,1631643170000,26.0
sensor2,1631643180000,29.0
sensor2,1631643190000,33.0
sensor2,1631643200000,36.0
```
程序输出结果如下:
```
Sensor{id='sensor1', timestamp=Fri Sep 17 19:46:30 CST 2021, temperature=32.0}
Sensor{id='sensor1', timestamp=Fri Sep 17 19:46:40 CST 2021, temperature=35.0}
Sensor{id='sensor2', timestamp=Fri Sep 17 19:46:30 CST 2021, temperature=33.0}
Sensor{id='sensor2', timestamp=Fri Sep 17 19:46:40 CST 2021, temperature=36.0}
```
阅读全文