写一个 Flink 读取 MySQL 数据并写入文件的代码
时间: 2023-03-15 08:53:11 浏览: 138
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import java.sql.Types;

/**
 * Flink batch job that reads the {@code user_name} and {@code age} columns of a
 * MySQL table through JDBC and writes them to a local file, one
 * comma-separated line per row.
 *
 * <p>Connection settings, table name and output path are hard-coded below and
 * must be adapted to the target environment.
 */
public class FlinkReadMySQLWriteFile {

    /**
     * Builds and runs the batch job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        // Create the Flink batch execution context.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // MySQL connection parameters.
        String driverName = "com.mysql.jdbc.Driver";
        String dbURL = "jdbc:mysql://localhost:3306/test";
        String userName = "root";
        String password = "root";
        String sourceTableName = "user";

        // The JDBC source needs Flink type information for every selected
        // column (java.sql.Types constants are not accepted here):
        // user_name -> String, age -> Integer.
        RowTypeInfo rowTypeInfo = new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.INT_TYPE_INFO);

        JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(driverName)
                .setDBUrl(dbURL)
                .setQuery(String.format("select user_name,age from %s", sourceTableName))
                .setUsername(userName)
                .setPassword(password)
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        // Read the rows from MySQL.
        DataSet<Row> sourceDataSet = env.createInput(inputFormat);

        // Write the rows to the output file as "user_name,age" lines.
        String outputPath = "file:///Users/xxx/Desktop/data.csv";
        sourceDataSet
                .map(row -> row.getField(0) + "," + row.getField(1))
                // Explicit result type so Flink does not have to infer it from the lambda.
                .returns(String.class)
                // Default write mode: the job fails rather than overwrite an existing file.
                .writeAsText(outputPath)
                // A single sink task produces one file instead of a directory of part files.
                .setParallelism(1);

        env.execute("FlinkReadMySQLWriteFile");
    }
}
阅读全文