这五个设计模式代码有哪些

时间: 2023-07-10 13:37:18 浏览: 56
以下是 Apache Flink 中常用的五个设计模式的代码示例: 1. 管道和过滤器模式: ```java DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<String>("topic", new SimpleStringSchema(), properties)); stream.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { String[] words = s.split(" "); return new Tuple2<>(words[0], Integer.valueOf(words[1])); } }).keyBy(0) .sum(1) .print(); ``` 2. 观察者模式: ```java public class MyObserver implements Observer<Integer> { @Override public void update(Integer value) { // 处理接收到的数据 } } DataStream<Integer> stream = env.addSource(new FlinkKafkaConsumer<Integer>("topic", new SimpleIntegerSchema(), properties)); Observable<Integer> observable = Observable.from(stream); observable.addObserver(new MyObserver()); ``` 3. 责任链模式: ```java public abstract class AbstractHandler<T> { private AbstractHandler<T> nextHandler; public void setNextHandler(AbstractHandler<T> nextHandler) { this.nextHandler = nextHandler; } public void handle(T data) { if (canHandle(data)) { doHandle(data); } else if (nextHandler != null) { nextHandler.handle(data); } } protected abstract boolean canHandle(T data); protected abstract void doHandle(T data); } public class IntegerHandler extends AbstractHandler<Integer> { @Override protected boolean canHandle(Integer data) { return data % 2 == 0; } @Override protected void doHandle(Integer data) { // 处理偶数数据 } } public class StringHandler extends AbstractHandler<String> { @Override protected boolean canHandle(String data) { return data.startsWith("hello"); } @Override protected void doHandle(String data) { // 处理以 hello 开头的数据 } } AbstractHandler<Integer> integerHandler = new IntegerHandler(); AbstractHandler<String> stringHandler = new StringHandler(); integerHandler.setNextHandler(stringHandler); DataStream<Integer> integerStream = env.addSource(new FlinkKafkaConsumer<Integer>("topic1", new SimpleIntegerSchema(), properties)); DataStream<String> stringStream = env.addSource(new FlinkKafkaConsumer<String>("topic2", new SimpleStringSchema(), properties)); integerStream.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { integerHandler.handle(value); return value; } }); stringStream.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { stringHandler.handle(value); return value; } }); ``` 4. 策略模式: ```java public interface AggregationStrategy<T> { T aggregate(T value1, T value2); } public class SumAggregationStrategy implements AggregationStrategy<Integer> { @Override public Integer aggregate(Integer value1, Integer value2) { return value1 + value2; } } public class MaxAggregationStrategy implements AggregationStrategy<Integer> { @Override public Integer aggregate(Integer value1, Integer value2) { return Math.max(value1, value2); } } AggregationStrategy<Integer> strategy = new SumAggregationStrategy(); DataStream<Integer> stream = env.addSource(new FlinkKafkaConsumer<Integer>("topic", new SimpleIntegerSchema(), properties)); stream.keyBy(0) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .reduce(new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return strategy.aggregate(value1, value2); } }); ``` 5. 工厂模式: ```java public interface Source<T> { void open() throws Exception; void close() throws Exception; T read() throws Exception; } public class KafkaSource implements Source<String> { private KafkaConsumer<String, String> consumer; public KafkaSource(Properties properties) { this.consumer = new KafkaConsumer<>(properties); } @Override public void open() throws Exception { consumer.subscribe(Collections.singletonList("topic")); } @Override public void close() throws Exception { consumer.close(); } @Override public String read() throws Exception { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record : records) { return record.value(); } return null; } } public enum SourceType { KAFKA } public class SourceFactory { public static <T> Source<T> createSource(SourceType type, Properties properties) { switch (type) { case KAFKA: return (Source<T>) new KafkaSource(properties); default: throw new IllegalArgumentException("Unsupported source type: " + type); } } } Properties kafkaProperties = new Properties(); kafkaProperties.setProperty("bootstrap.servers", "localhost:9092"); Source<String> source = SourceFactory.createSource(SourceType.KAFKA, kafkaProperties); source.open(); while (true) { String data = source.read(); if (data != null) { // 处理数据 } } source.close(); ```

相关推荐

最新推荐

recommend-type

Java设计模式:工厂模式——图文+代码示例(通俗易懂)

每一种设计模式都有它要解决的问题: 工厂模式最主要解决的问题就是创建者和调用者的耦合,那么代码层面其实就是取消对new的使用。 工厂模式有三种: 1. 简单工厂模式 2. 工厂方法模式 3. 抽象方法模式 先来看看,...
recommend-type

《Java设计模式》课程设计报告.docx

用myeclipse开发的java设计模式课程设计,课题内容为西瓜工厂,包含四个设计模式,单例模式、代理模式、建造者模式和抽象工厂模式,有较好的图形界面,文档内附有完整代码,适用于新手学习。
recommend-type

2020版23种Java设计模式-图解-附代码.pdf

2020最新详解学习资源,。JAVA版本,史上最优秀的23种设计模式教程,图解生动,配完整用例代码测试。
recommend-type

Java设计模式-图解-附代码

有感于设计模式在日常开发中的重要性,同时笔者也自觉对设计模式小有心得,故笔者*写二十三种设计模式的简单例子、 并整理二十三种设计模式的理论部分,综合汇总成这份Java设计模式(疯狂J*va联盟版),希望对大家...
recommend-type

用UML描述C++设计模式,且附带实现代码

C++设计模式 一、功能 二、结构图 ...//结构图 三、实现 和其他很多模式一样,学习设计模式的重点是学习每种模式的思想,而不应拘泥于它的某种具体结构图和实现。因为模式是灵活的,其实现可以是千变万化的,只是...
recommend-type

zigbee-cluster-library-specification

最新的zigbee-cluster-library-specification说明文档。
recommend-type

管理建模和仿真的文件

管理Boualem Benatallah引用此版本:布阿利姆·贝纳塔拉。管理建模和仿真。约瑟夫-傅立叶大学-格勒诺布尔第一大学,1996年。法语。NNT:电话:00345357HAL ID:电话:00345357https://theses.hal.science/tel-003453572008年12月9日提交HAL是一个多学科的开放存取档案馆,用于存放和传播科学研究论文,无论它们是否被公开。论文可以来自法国或国外的教学和研究机构,也可以来自公共或私人研究中心。L’archive ouverte pluridisciplinaire
recommend-type

MATLAB柱状图在信号处理中的应用:可视化信号特征和频谱分析

![matlab画柱状图](https://img-blog.csdnimg.cn/3f32348f1c9c4481a6f5931993732f97.png) # 1. MATLAB柱状图概述** MATLAB柱状图是一种图形化工具,用于可视化数据中不同类别或组的分布情况。它通过绘制垂直条形来表示每个类别或组中的数据值。柱状图在信号处理中广泛用于可视化信号特征和进行频谱分析。 柱状图的优点在于其简单易懂,能够直观地展示数据分布。在信号处理中,柱状图可以帮助工程师识别信号中的模式、趋势和异常情况,从而为信号分析和处理提供有价值的见解。 # 2. 柱状图在信号处理中的应用 柱状图在信号处理
recommend-type

用Spring boot和vue写一个登录注册界面

好的,这是一个比较常见的技术问题,我可以为您提供一些参考资料和思路。首先,您可以使用Spring Boot作为后端框架来处理用户认证和注册的逻辑,比如使用Spring Security实现用户登录认证。同时,您还需要设计相应的数据模型和数据库表结构来存储用户信息。在前端方面,您可以使用Vue.js作为框架来构建登录注册页面,使用Axios来发起API请求并和后端进行交互。当然,在实现过程中,还需要考虑一些具体细节,比如数据校验、安全性和用户体验等方面。希望这些信息能够帮助到您。
recommend-type

JSBSim Reference Manual

JSBSim参考手册,其中包含JSBSim简介,JSBSim配置文件xml的编写语法,编程手册以及一些应用实例等。其中有部分内容还没有写完,估计有生之年很难看到完整版了,但是内容还是很有参考价值的。