producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
This line sets the producer's message-body compression threshold to the largest possible integer. `setCompressMsgBodyOverHowmuch` defines the body size, in bytes, above which the producer compresses a message before sending it (the RocketMQ default is 4 KB). Setting it to `Integer.MAX_VALUE` therefore effectively disables compression: no message body can exceed that size, so nothing is ever compressed. Compression exists to reduce the amount of data transmitted over the network, which can improve throughput and lower cost, but compressing small bodies may not be worthwhile because the CPU overhead can outweigh the savings.
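For reference, this is how the setting is typically used on RocketMQ's `DefaultMQProducer`. The sketch below assumes a local name server and a placeholder producer group; the 4 KB value shown is RocketMQ's default threshold.

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class CompressionConfigExample {
    public static void main(String[] args) throws Exception {
        // Placeholder group name and name-server address
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
        producer.setNamesrvAddr("localhost:9876");

        // Compress message bodies larger than 4 KB (RocketMQ's default threshold)
        producer.setCompressMsgBodyOverHowmuch(4 * 1024);

        // Setting the threshold to Integer.MAX_VALUE would instead disable
        // compression, since no body can exceed that size:
        // producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);

        producer.start();
        // ... send messages ...
        producer.shutdown();
    }
}
```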
Related question
Please write a Flink application in Java. Its main job is to use a custom source that supports parallel reads to periodically query an order table in MySQL for rows inserted or updated within a recent time window, then compute the top-10 shops and total sales for every 30-minute window, and finally write the results to a topic named sale-realtime-report. Specific requirements:
1. The MySQL order table has the fields orderId, shopId, categoryId, productId, prices, units, counts, lastUpdateTime. lastUpdateTime is refreshed automatically whenever a row is inserted or updated, so the custom source can use it as the comparison column when periodically reading recent data.
2. The custom Flink source must support parallel reads, and must store the time of its most recent successful read of the order table in custom state, so that when the job fails and restarts it can resume from that state; the length of the lookback window is configurable. When reading in parallel, the lookback window should be divided by the parallelism, splitting lastUpdateTime evenly into smaller sub-intervals.
3. Each shop's total sales totalFee is computed as: totalFee = prices * counts.
Sure, here is an example Flink application written in Java for your reference:
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class SaleRealtimeReport {
public static void main(String[] args) throws Exception {
// Get the configuration file path from the command-line arguments
String configFile = ParameterTool.fromArgs(args).get("configFile");
// Load the configuration file
ParameterTool params = ParameterTool.fromPropertiesFile(configFile);
// Create the Flink execution environment and set the parallelism.
// Checkpointing is required both for the exactly-once Kafka sink and
// for persisting the source's operator state.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(params.getInt("parallelism"));
env.enableCheckpointing(60_000L);
// Configure the Kafka producer
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", params.get("bootstrapServers"));
kafkaProps.setProperty("transaction.timeout.ms", params.get("transactionTimeout"));
kafkaProps.setProperty("max.in.flight.requests.per.connection", "1");
// Custom source that reads data from the MySQL database
SaleSource saleSource = new SaleSource(params);
// Compute the top-10 shops and total sales for each 30-minute window, and write the results to Kafka
env.addSource(saleSource)
.keyBy(sale -> sale.getShopId())
.timeWindow(Time.minutes(30))
.apply(new SaleWindowFunction())
.map(new SaleMapFunction())
.addSink(new FlinkKafkaProducer<>(params.get("outputTopic"),
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
kafkaProps,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
env.execute("SaleRealtimeReport");
}
/**
 * Custom source that reads order rows from MySQL.
 * It implements CheckpointedFunction so the last successful read time is
 * stored in operator state and restored after a failure (keyed state is
 * not available inside a source function).
 */
public static class SaleSource extends RichSourceFunction<Sale> implements CheckpointedFunction {
private final ParameterTool params;
private transient Connection connection;
private transient PreparedStatement queryStatement;
private transient PreparedStatement updateStatement;
private transient ListState<Long> checkpointedLastUpdateTime;
private volatile boolean running = true;
private volatile long lastUpdateTime;
public SaleSource(ParameterTool params) {
this.params = params;
}
@Override
public void open(Configuration parameters) throws Exception {
// Load the MySQL driver
Class.forName(params.get("db.driver"));
// Open the database connection
connection = DriverManager.getConnection(params.get("db.url"),
params.get("db.username"), params.get("db.password"));
// Prepare the query statement
String querySql = "SELECT orderId, shopId, categoryId, productId, prices, units, counts, lastUpdateTime " +
"FROM `order` " +
"WHERE lastUpdateTime > ? " +
"ORDER BY lastUpdateTime DESC";
queryStatement = connection.prepareStatement(querySql);
// Prepare the update statement
String updateSql = "UPDATE `order` SET lastUpdateTime = ? WHERE orderId = ?";
updateStatement = connection.prepareStatement(updateSql);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// Restore the most recent successful read time from operator state;
// on a fresh start, begin queryTimeInterval minutes in the past
checkpointedLastUpdateTime = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>("lastUpdateTime", Long.class));
lastUpdateTime = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(params.getInt("queryTimeInterval"));
if (context.isRestored()) {
for (Long restored : checkpointedLastUpdateTime.get()) {
lastUpdateTime = restored;
}
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// Persist the current read position on every checkpoint
checkpointedLastUpdateTime.clear();
checkpointedLastUpdateTime.add(lastUpdateTime);
}
@Override
public void run(SourceContext<Sale> ctx) throws Exception {
while (running) {
// Split the query window evenly across the parallel subtasks
long currentTime = System.currentTimeMillis();
long timeInterval = TimeUnit.MINUTES.toMillis(params.getInt("queryTimeInterval"));
long startUpdateTime = lastUpdateTime + (currentTime - lastUpdateTime) / getRuntimeContext().getNumberOfParallelSubtasks() * getRuntimeContext().getIndexOfThisSubtask();
long endUpdateTime = startUpdateTime + timeInterval / getRuntimeContext().getNumberOfParallelSubtasks();
// Run the query
queryStatement.setLong(1, startUpdateTime);
ResultSet resultSet = queryStatement.executeQuery();
// Parse the rows and emit each one downstream
while (resultSet.next()) {
int orderId = resultSet.getInt("orderId");
int shopId = resultSet.getInt("shopId");
int categoryId = resultSet.getInt("categoryId");
int productId = resultSet.getInt("productId");
double prices = resultSet.getDouble("prices");
String units = resultSet.getString("units");
int counts = resultSet.getInt("counts");
long rowUpdateTime = resultSet.getLong("lastUpdateTime");
ctx.collect(new Sale(orderId, shopId, categoryId, productId, prices, units, counts, rowUpdateTime));
// Mark the row with the current poll time
updateStatement.setLong(1, currentTime);
updateStatement.setInt(2, orderId);
updateStatement.executeUpdate();
}
resultSet.close();
// Advance the read position; snapshotState() persists it on the next checkpoint
lastUpdateTime = endUpdateTime;
// Sleep until the next polling cycle
long sleepTime = endUpdateTime - System.currentTimeMillis();
if (sleepTime > 0) {
Thread.sleep(sleepTime);
}
}
}
@Override
public void cancel() {
running = false;
// Release the JDBC resources
try {
if (queryStatement != null) {
queryStatement.close();
}
if (updateStatement != null) {
updateStatement.close();
}
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
/**
 * Window function that computes each shop's total sales and top-10 sale records within every 30-minute window
 */
public static class SaleWindowFunction implements WindowFunction<Sale, SaleWindowResult, Integer, TimeWindow> {
@Override
public void apply(Integer shopId, TimeWindow window, Iterable<Sale> sales, Collector<SaleWindowResult> out) throws Exception {
double totalFee = 0.0;
List<Sale> saleList = new ArrayList<>();
for (Sale sale : sales) {
totalFee += sale.getPrices() * sale.getCounts();
saleList.add(sale);
}
saleList.sort((s1, s2) -> Double.compare(s2.getPrices() * s2.getCounts(), s1.getPrices() * s1.getCounts()));
List<Sale> top10Sales = saleList.size() > 10 ? saleList.subList(0, 10) : saleList;
out.collect(new SaleWindowResult(shopId, totalFee, top10Sales));
}
}
/**
 * MapFunction that converts the window result into a string
 */
public static class SaleMapFunction implements MapFunction<SaleWindowResult, String> {
@Override
public String map(SaleWindowResult saleWindowResult) throws Exception {
StringBuilder sb = new StringBuilder();
sb.append("Shop ").append(saleWindowResult.getShopId()).append(":\n");
sb.append(" TotalFee = ").append(saleWindowResult.getTotalFee()).append("\n");
sb.append(" Top10Sales = [\n");
for (Sale sale : saleWindowResult.getTop10Sales()) {
sb.append(" {productId=").append(sale.getProductId());
sb.append(", prices=").append(sale.getPrices());
sb.append(", units=").append(sale.getUnits());
sb.append(", counts=").append(sale.getCounts()).append("}\n");
}
sb.append(" ]\n");
return sb.toString();
}
}
}
/**
 * Order data class
 */
class Sale {
private int orderId;
private int shopId;
private int categoryId;
private int productId;
private double prices;
private String units;
private int counts;
private long lastUpdateTime;
public Sale(int orderId, int shopId, int categoryId, int productId, double prices, String units, int counts, long lastUpdateTime) {
this.orderId = orderId;
this.shopId = shopId;
this.categoryId = categoryId;
this.productId = productId;
this.prices = prices;
this.units = units;
this.counts = counts;
this.lastUpdateTime = lastUpdateTime;
}
public int getOrderId() {
return orderId;
}
public int getShopId() {
return shopId;
}
public int getCategoryId() {
return categoryId;
}
public int getProductId() {
return productId;
}
public double getPrices() {
return prices;
}
public String getUnits() {
return units;
}
public int getCounts() {
return counts;
}
public long getLastUpdateTime() {
return lastUpdateTime;
}
}
/**
 * Window result class
 */
class SaleWindowResult {
private int shopId;
private double totalFee;
private List<Sale> top10Sales;
public SaleWindowResult(int shopId, double totalFee, List<Sale> top10Sales) {
this.shopId = shopId;
this.totalFee = totalFee;
this.top10Sales = top10Sales;
}
public int getShopId() {
return shopId;
}
public double getTotalFee() {
return totalFee;
}
public List<Sale> getTop10Sales() {
return top10Sales;
}
}
```
In the code above, we first obtain the configuration file path from the command-line arguments and then load the configuration file. The configuration file holds the Flink parallelism, the Kafka settings, the MySQL settings, the query interval, and other parameters. We then create the Flink execution environment and add the custom source to it. The custom source periodically queries the order table in MySQL and feeds the resulting rows into the downstream computation and output; it also supports parallel reads and state persistence. Finally, a Flink window function computes the top-10 shops and total sales for every 30-minute window, and the results are written to Kafka.
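For completeness, here is a sample configuration file covering every key the program reads; the file name and all values are placeholders to adapt to your environment (note that Kafka's `transaction.timeout.ms` must not exceed the broker's `transaction.max.timeout.ms`, 15 minutes by default):

```properties
# saleReport.properties (hypothetical example)
parallelism=4
# Kafka
bootstrapServers=localhost:9092
transactionTimeout=900000
outputTopic=sale-realtime-report
# MySQL
db.driver=com.mysql.cj.jdbc.Driver
db.url=jdbc:mysql://localhost:3306/sales
db.username=root
db.password=secret
# Lookback window in minutes for the custom source
queryTimeInterval=30
```

The job would then be started with something like `flink run SaleRealtimeReport.jar --configFile /path/to/saleReport.properties`.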
Note: the example code above is for reference only. In practice it may need to be adapted to the specific business requirements, and parameters and performance should be tuned for the actual environment.
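For illustration, with hypothetical values a single record produced by SaleMapFunction and written to the sale-realtime-report topic would look like this:

```text
Shop 42:
 TotalFee = 1234.5
 Top10Sales = [
 {productId=7, prices=99.0, units=pcs, counts=3}
 {productId=12, prices=49.5, units=box, counts=2}
 ]
```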