请帮我用Java语言编写一个Flink应用程序,主要功能是通过一个支持并发的自定义source,周期性查询MySQL数据库中一张order表在最近一段时间内插入或更新的数据,之后计算每30分钟内的top10店铺以及销售总额,最后保存到一个名为sale-realtime-report的topic中。具体需求如下: 1.MySQL的order表包含以下字段:orderId,shopId,categoryId,productId,prices,units,counts,lastUpdateTime,其中lastUpdateTime会在表中新增或者更新数据时自动更新,因此可以作为自定义source周期性读取一段时间内数据的比较字段; 2.Flink的自定义source需要支持并发读取的能力,且可以将最近一次成功读取order表的时间保存到自定义state中,以便Flink应用失败重启时可以从state中恢复;“最近一段时间”的值可以自定义。并发读取时,可以自动根据应用设置的最近一段时间的值除以并发度,将lastUpdateTime平均切分为若干小的时间间隔; 3.每个店铺的销售总额totalFee的计算方式为:totalFee = prices * counts 。
时间: 2023-06-17 10:08:34 浏览: 139
好的,以下是一个Java语言编写的Flink应用程序示例,可供您参考:
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import java.sql.*;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
 * Flink job that periodically polls the MySQL {@code order} table for rows inserted or
 * updated recently, ranks shops by sales total over 30-minute event-time windows and
 * writes one top-10 report per window to Kafka.
 *
 * <p>Pipeline: {@link SaleSource} (parallel, checkpointed JDBC poller) → per-shop 30-minute
 * window ({@link SaleWindowFunction}) → global 30-minute window picking the ten shops with
 * the highest {@code totalFee} ({@link TopShopsFunction}) → Kafka.
 *
 * <p>Configuration keys read from the properties file passed as {@code --configFile}:
 * {@code parallelism}, {@code checkpointInterval}, {@code bootstrapServers},
 * {@code transactionTimeout}, {@code outputTopic}, {@code queryTimeInterval} (minutes),
 * {@code db.driver}, {@code db.url}, {@code db.username}, {@code db.password}.
 */
public class SaleRealtimeReport {

    /** Topic used when {@code outputTopic} is not configured. */
    private static final String DEFAULT_OUTPUT_TOPIC = "sale-realtime-report";

    /** Length of the reporting window in minutes. */
    private static final int WINDOW_MINUTES = 30;

    /** Number of entries kept in each ranking. */
    private static final int TOP_N = 10;

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments; {@code --configFile <path>} is required
     * @throws Exception if the configuration cannot be loaded or the job fails
     */
    public static void main(String[] args) throws Exception {
        // Read the path of the configuration file from the command line, then load it.
        String configFile = ParameterTool.fromArgs(args).getRequired("configFile");
        ParameterTool params = ParameterTool.fromPropertiesFile(configFile);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism must be set on the environment; an ad-hoc "parallelism" Configuration
        // entry is not an option Flink reads.
        env.setParallelism(params.getInt("parallelism", env.getParallelism()));
        // Before Flink 1.12 the default is processing time, under which timestamps and
        // watermarks emitted by the source are dropped. Harmless (deprecated) afterwards.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Required both for restoring the source's read position and for the
        // EXACTLY_ONCE Kafka sink, which only commits transactions on checkpoints.
        env.enableCheckpointing(params.getLong("checkpointInterval", 60_000L));

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", params.getRequired("bootstrapServers"));
        // Properties rejects null values, so fall back to 15 minutes, the broker-side
        // default upper bound (transaction.max.timeout.ms).
        kafkaProps.setProperty("transaction.timeout.ms", params.get("transactionTimeout", "900000"));
        kafkaProps.setProperty("max.in.flight.requests.per.connection", "1");

        env.addSource(new SaleSource(params))
            .keyBy(sale -> sale.getShopId())
            .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_MINUTES)))
            .apply(new SaleWindowFunction())
            // Per-shop results carry the timestamp of the window that produced them, so an
            // identically sized global window collects all shops of the same period.
            .windowAll(TumblingEventTimeWindows.of(Time.minutes(WINDOW_MINUTES)))
            .apply(new TopShopsFunction())
            .addSink(new FlinkKafkaProducer<>(
                params.get("outputTopic", DEFAULT_OUTPUT_TOPIC),
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                kafkaProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("SaleRealtimeReport");
    }

    /**
     * Parallel, checkpointed source that polls the MySQL {@code order} table.
     *
     * <p>Time is cut into consecutive polling windows of {@code queryTimeInterval} minutes.
     * Once a polling window has completely elapsed, every subtask reads its own equal share
     * of it: {@code [start + i * interval / n, start + (i + 1) * interval / n)} for subtask
     * {@code i} of {@code n}; the last subtask also takes the rounding remainder. Slices are
     * half-open, so no row is read by two subtasks. The table is only read, never written.
     *
     * <p>Rows are emitted with {@code lastUpdateTime} as event timestamp, followed by a
     * watermark at the end of the polling window.
     *
     * <p>The start of the next unread polling window is kept in operator state. On restore
     * every subtask resumes from the smallest stored value, which also covers rescaling; if
     * the checkpoint was taken while subtasks were one window apart, the slower window is
     * read again by all subtasks (at-least-once in that case).
     *
     * <p>NOTE(review): {@code lastUpdateTime} is bound and read as {@link Timestamp}, which
     * assumes an auto-updating {@code TIMESTAMP}/{@code DATETIME} column and a JDBC session
     * time zone consistent with this JVM — confirm against the actual schema.
     */
    public static class SaleSource extends RichParallelSourceFunction<Sale> implements CheckpointedFunction {

        private static final long serialVersionUID = 1L;

        private static final String QUERY_SQL =
            "SELECT orderId, shopId, categoryId, productId, prices, units, counts, lastUpdateTime "
                + "FROM `order` "
                + "WHERE lastUpdateTime >= ? AND lastUpdateTime < ? "
                + "ORDER BY lastUpdateTime ASC";

        /** Upper bound for a single sleep so that {@link #cancel()} is noticed promptly. */
        private static final long MAX_SLEEP_MILLIS = 1_000L;

        private final ParameterTool params;
        /** Length of one polling window in milliseconds. */
        private final long intervalMillis;
        /**
         * Start of the first polling window for a fresh job. Fixed once on the client, so
         * that all subtasks slice exactly the same windows.
         */
        private final long initialReadTime;

        private transient Connection connection;
        private transient PreparedStatement queryStatement;
        private transient ListState<Long> lastReadTimeState;
        /** Start of the next polling window that has not been read yet (epoch millis). */
        private transient long lastReadTime;

        private volatile boolean running = true;

        /**
         * @param params job configuration; must contain a positive {@code queryTimeInterval}
         *               (minutes) and the {@code db.*} connection settings
         * @throws IllegalArgumentException if {@code queryTimeInterval} is not positive
         */
        public SaleSource(ParameterTool params) {
            this.params = params;
            int minutes = params.getInt("queryTimeInterval");
            if (minutes <= 0) {
                throw new IllegalArgumentException(
                    "queryTimeInterval must be a positive number of minutes, got " + minutes);
            }
            this.intervalMillis = TimeUnit.MINUTES.toMillis(minutes);
            this.initialReadTime = System.currentTimeMillis() - intervalMillis;
        }

        /** Restores the read position from operator state, or starts one interval in the past. */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            lastReadTimeState = context.getOperatorStateStore()
                .getUnionListState(new ListStateDescriptor<>("lastUpdateTime", Long.class));
            long restored = Long.MAX_VALUE;
            boolean found = false;
            if (context.isRestored()) {
                for (Long value : lastReadTimeState.get()) {
                    restored = Math.min(restored, value);
                    found = true;
                }
            }
            lastReadTime = found ? restored : initialReadTime;
        }

        /** Stores the start of the next unread polling window. */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            lastReadTimeState.clear();
            lastReadTimeState.add(lastReadTime);
        }

        /** Loads the JDBC driver, opens the connection and prepares the range query. */
        @Override
        public void open(Configuration parameters) throws Exception {
            Class.forName(params.getRequired("db.driver"));
            connection = DriverManager.getConnection(
                params.getRequired("db.url"), params.get("db.username"), params.get("db.password"));
            queryStatement = connection.prepareStatement(QUERY_SQL);
        }

        /**
         * Polls one slice per elapsed polling window until cancelled.
         *
         * @param ctx context used to emit records and watermarks
         * @throws Exception if a query fails or the thread is interrupted while waiting
         */
        @Override
        public void run(SourceContext<Sale> ctx) throws Exception {
            final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
            final int subtask = getRuntimeContext().getIndexOfThisSubtask();
            final long sliceMillis = intervalMillis / parallelism;

            while (running) {
                final long windowStart = lastReadTime;
                final long windowEnd = windowStart + intervalMillis;

                // Only read a window once it lies completely in the past.
                final long waitMillis = windowEnd - System.currentTimeMillis();
                if (waitMillis > 0) {
                    Thread.sleep(Math.min(waitMillis, MAX_SLEEP_MILLIS));
                    continue;
                }

                final long sliceStart = windowStart + sliceMillis * subtask;
                final long sliceEnd = subtask == parallelism - 1 ? windowEnd : sliceStart + sliceMillis;
                final List<Sale> batch = querySlice(sliceStart, sliceEnd);

                // Emit and advance atomically with respect to checkpoints, so a snapshot
                // never contains a position whose rows were only partially emitted.
                synchronized (ctx.getCheckpointLock()) {
                    for (Sale sale : batch) {
                        ctx.collectWithTimestamp(sale, sale.getLastUpdateTime());
                    }
                    // Every later row of this subtask has a timestamp >= windowEnd.
                    ctx.emitWatermark(new Watermark(windowEnd - 1));
                    lastReadTime = windowEnd;
                }
            }
        }

        /** Reads all rows with {@code from <= lastUpdateTime < to} (epoch millis). */
        private List<Sale> querySlice(long from, long to) throws SQLException {
            queryStatement.setTimestamp(1, new Timestamp(from));
            queryStatement.setTimestamp(2, new Timestamp(to));
            List<Sale> sales = new ArrayList<>();
            try (ResultSet rows = queryStatement.executeQuery()) {
                while (rows.next()) {
                    sales.add(new Sale(
                        rows.getInt("orderId"),
                        rows.getInt("shopId"),
                        rows.getInt("categoryId"),
                        rows.getInt("productId"),
                        rows.getDouble("prices"),
                        rows.getString("units"),
                        rows.getInt("counts"),
                        rows.getTimestamp("lastUpdateTime").getTime()));
                }
            }
            return sales;
        }

        /** Asks {@link #run} to stop; resources are released in {@link #close()}. */
        @Override
        public void cancel() {
            running = false;
        }

        /** Closes the statement and the connection once the source has stopped. */
        @Override
        public void close() throws Exception {
            try {
                if (queryStatement != null) {
                    queryStatement.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    /**
     * Window function computing, for one shop and one window, the sales total
     * ({@code sum(prices * counts)}) and the shop's ten largest orders.
     */
    public static class SaleWindowFunction implements WindowFunction<Sale, SaleWindowResult, Integer, TimeWindow> {

        private static final long serialVersionUID = 1L;

        @Override
        public void apply(Integer shopId, TimeWindow window, Iterable<Sale> sales, Collector<SaleWindowResult> out) throws Exception {
            double totalFee = 0.0;
            List<Sale> saleList = new ArrayList<>();
            for (Sale sale : sales) {
                totalFee += sale.getPrices() * sale.getCounts();
                saleList.add(sale);
            }
            saleList.sort((s1, s2) -> Double.compare(s2.getPrices() * s2.getCounts(), s1.getPrices() * s1.getCounts()));
            // Copy instead of keeping the subList view: the result is shipped to the next
            // operator and must not drag the full backing list along.
            List<Sale> top10Sales = new ArrayList<>(saleList.subList(0, Math.min(TOP_N, saleList.size())));
            out.collect(new SaleWindowResult(shopId, totalFee, top10Sales));
        }
    }

    /**
     * Global window function that ranks the per-shop results of one window by
     * {@code totalFee} (descending, ties broken by shop id) and emits a single text report
     * containing the best ten shops.
     */
    public static class TopShopsFunction implements AllWindowFunction<SaleWindowResult, String, TimeWindow> {

        private static final long serialVersionUID = 1L;

        private final SaleMapFunction formatter = new SaleMapFunction();

        @Override
        public void apply(TimeWindow window, Iterable<SaleWindowResult> shopResults, Collector<String> out) throws Exception {
            List<SaleWindowResult> ranked = new ArrayList<>();
            for (SaleWindowResult result : shopResults) {
                ranked.add(result);
            }
            if (ranked.isEmpty()) {
                return;
            }
            ranked.sort((a, b) -> {
                int byFee = Double.compare(b.getTotalFee(), a.getTotalFee());
                return byFee != 0 ? byFee : Integer.compare(a.getShopId(), b.getShopId());
            });

            int limit = Math.min(TOP_N, ranked.size());
            StringBuilder report = new StringBuilder();
            report.append("Window [").append(Instant.ofEpochMilli(window.getStart()))
                .append(", ").append(Instant.ofEpochMilli(window.getEnd()))
                .append(") top ").append(limit).append(" shops by totalFee:\n");
            for (int rank = 0; rank < limit; rank++) {
                report.append('#').append(rank + 1).append(' ').append(formatter.map(ranked.get(rank)));
            }
            out.collect(report.toString());
        }
    }

    /**
     * Map function rendering one shop's window result as multi-line text.
     */
    public static class SaleMapFunction implements MapFunction<SaleWindowResult, String> {

        private static final long serialVersionUID = 1L;

        @Override
        public String map(SaleWindowResult saleWindowResult) throws Exception {
            StringBuilder sb = new StringBuilder();
            sb.append("Shop ").append(saleWindowResult.getShopId()).append(":\n");
            sb.append(" TotalFee = ").append(saleWindowResult.getTotalFee()).append("\n");
            sb.append(" Top10Sales = [\n");
            for (Sale sale : saleWindowResult.getTop10Sales()) {
                sb.append(" {productId=").append(sale.getProductId());
                sb.append(", prices=").append(sale.getPrices());
                sb.append(", units=").append(sale.getUnits());
                sb.append(", counts=").append(sale.getCounts()).append("}\n");
            }
            sb.append(" ]\n");
            return sb.toString();
        }
    }
}
/**
 * One row of the {@code order} table as read by the source.
 *
 * <p>Plain value holder: all fields are assigned once in the constructor and exposed
 * through getters only.
 */
class Sale {

    private final int orderId;
    private final int shopId;
    private final int categoryId;
    private final int productId;
    private final double prices;
    private final String units;
    private final int counts;
    private final long lastUpdateTime;

    /**
     * Creates an order record; every argument is stored as given, without validation.
     */
    public Sale(int orderId, int shopId, int categoryId, int productId, double prices, String units, int counts, long lastUpdateTime) {
        // Identifiers.
        this.orderId = orderId;
        this.shopId = shopId;
        this.categoryId = categoryId;
        this.productId = productId;
        // Pricing.
        this.prices = prices;
        this.units = units;
        this.counts = counts;
        // Change tracking.
        this.lastUpdateTime = lastUpdateTime;
    }

    /** @return the order id */
    public int getOrderId() {
        return this.orderId;
    }

    /** @return the id of the shop the order belongs to */
    public int getShopId() {
        return this.shopId;
    }

    /** @return the category id */
    public int getCategoryId() {
        return this.categoryId;
    }

    /** @return the product id */
    public int getProductId() {
        return this.productId;
    }

    /** @return the value of the {@code prices} column */
    public double getPrices() {
        return this.prices;
    }

    /** @return the value of the {@code units} column */
    public String getUnits() {
        return this.units;
    }

    /** @return the value of the {@code counts} column */
    public int getCounts() {
        return this.counts;
    }

    /** @return the value of the {@code lastUpdateTime} column */
    public long getLastUpdateTime() {
        return this.lastUpdateTime;
    }
}
/**
 * Aggregation result for one shop: its id, its sales total and a list of its top sales.
 *
 * <p>Plain value holder; the list passed to the constructor is stored and returned as is,
 * without copying.
 */
class SaleWindowResult {

    private final int shopId;
    private final double totalFee;
    private final List<Sale> top10Sales;

    /**
     * Creates a result; every argument is stored as given, without validation.
     */
    public SaleWindowResult(int shopId, double totalFee, List<Sale> top10Sales) {
        this.shopId = shopId;
        this.totalFee = totalFee;
        this.top10Sales = top10Sales;
    }

    /** @return the shop id */
    public int getShopId() {
        return this.shopId;
    }

    /** @return the sales total of the shop */
    public double getTotalFee() {
        return this.totalFee;
    }

    /** @return the list of top sales supplied at construction time */
    public List<Sale> getTop10Sales() {
        return this.top10Sales;
    }
}
```
在上述代码中,我们首先从命令行参数中获取配置文件路径,然后加载配置文件。在配置文件中,我们可以设置Flink的并行度、Kafka的配置、MySQL的配置以及查询时间间隔等参数。然后,我们创建Flink的执行环境,并将自定义的source添加到执行环境中。自定义source会定期查询MySQL数据库中的order表,并将查询到的数据发送到后续的计算和输出中。同时,自定义source还支持并发读取和状态保存的功能。最后,我们使用Flink的窗口函数计算每30分钟内的top10店铺以及销售总额,并将结果保存到Kafka中。
注意:上述示例代码仅供参考,实际应用中可能需要根据具体的业务需求进行修改。同时,需要根据实际情况进行参数配置和性能优化。
阅读全文