Spring Boot内嵌Flink并与外部Flink集群通信
时间: 2023-12-06 12:36:04 浏览: 216
为了在Spring Boot应用程序中嵌入Flink并与外部Flink集群通信,可以使用Flink的REST API。以下是实现此目的的步骤:
1.在Spring Boot项目中添加以下依赖项:
```xml
<!--
  Flink client library dependency (artifactId flink-clients_2.11).
  NOTE(review): ${flink.version} is a Maven property that is not defined in this
  snippet; it is presumably declared under <properties> in the POM (confirm).
-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
```
2.在Spring Boot应用程序中创建一个Flink客户端:
```java
@Configuration
public class FlinkConfig {
@Value("${flink.rest-url}")
private String restUrl;
@Bean
public StreamExecutionEnvironment streamExecutionEnvironment() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, Time.seconds(10)));
env.getConfig().setGlobalJobParameters(getGlobalJobParameters());
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend((StateBackend) new FsStateBackend("file:///tmp/checkpoints"));
return env;
}
@Bean
public FlinkRestClient flinkRestClient() {
return new FlinkRestClient(restUrl);
}
@Bean
public Configuration getGlobalJobParameters() {
final Configuration conf = new Configuration();
conf.setString("rest.url", restUrl);
return conf;
}
}
```
3.在应用程序中使用Flink客户端与外部Flink集群通信:
```java
/**
 * REST controller mounted under {@code /flink} that forwards each request to the injected
 * {@code FlinkRestClient} and returns the client's result unchanged.
 *
 * <p>Path variables are bound by explicit name so that request mapping does not depend on
 * method parameter names being retained in the compiled class files.
 */
@RestController
@RequestMapping("/flink")
public class FlinkController {

    /** Client used for every call made by this controller; injected by Spring. */
    @Autowired
    private FlinkRestClient flinkRestClient;

    /**
     * Handles {@code GET /flink/jobs}.
     *
     * @return the result of {@code flinkRestClient.listJobs()}
     * @throws Exception if the client call fails
     */
    @GetMapping("/jobs")
    public List<JobStatusMessage> getJobs() throws Exception {
        return flinkRestClient.listJobs();
    }

    /**
     * Handles {@code POST /flink/jobs}.
     *
     * @param jobSubmitRequestBody the request body, passed to the client as-is
     * @return the result of {@code flinkRestClient.submitJob(...)}
     * @throws Exception if the client call fails
     */
    @PostMapping("/jobs")
    public JobSubmitResponseBody submitJob(@RequestBody JobSubmitRequestBody jobSubmitRequestBody)
            throws Exception {
        return flinkRestClient.submitJob(jobSubmitRequestBody);
    }

    /**
     * Handles {@code GET /flink/jobs/{jobId}}.
     *
     * @param jobId the {@code jobId} path segment, passed to the client as-is
     * @return the result of {@code flinkRestClient.getJobDetails(jobId)}
     * @throws Exception if the client call fails
     */
    @GetMapping("/jobs/{jobId}")
    public JobDetailsInfo getJobDetails(@PathVariable("jobId") String jobId) throws Exception {
        return flinkRestClient.getJobDetails(jobId);
    }

    /**
     * Handles {@code GET /flink/jobs/{jobId}/exceptions}.
     *
     * @param jobId the {@code jobId} path segment, passed to the client as-is
     * @return the result of {@code flinkRestClient.getJobExceptions(jobId)}
     * @throws Exception if the client call fails
     */
    @GetMapping("/jobs/{jobId}/exceptions")
    public List<ExceptionInfo> getJobExceptions(@PathVariable("jobId") String jobId) throws Exception {
        return flinkRestClient.getJobExceptions(jobId);
    }
}
```
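在上面的代码中,我们使用Flink REST API获取Flink作业的列表,提交Flink作业,获取Flink作业的详细信息以及获取Flink作业的异常信息。需要注意的是,示例中的FlinkRestClient并不是flink-clients依赖自带的类,需要自行实现(例如用RestTemplate或WebClient调用Flink的REST接口,或封装flink-clients提供的RestClusterClient)。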
阅读全文