这五个设计模式代码有哪些
时间: 2023-07-10 09:37:18 浏览: 108
以下是 Apache Flink 中常用的五个设计模式的代码示例:
1. 管道和过滤器模式:
```java
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<String>("topic", new SimpleStringSchema(), properties));
stream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
String[] words = s.split(" ");
return new Tuple2<>(words[0], Integer.valueOf(words[1]));
}
}).keyBy(0)
.sum(1)
.print();
```
2. 观察者模式:
```java
public class MyObserver implements Observer<Integer> {
@Override
public void update(Integer value) {
// 处理接收到的数据
}
}
DataStream<Integer> stream = env.addSource(new FlinkKafkaConsumer<Integer>("topic", new SimpleIntegerSchema(), properties));
Observable<Integer> observable = Observable.from(stream);
observable.addObserver(new MyObserver());
```
3. 责任链模式:
```java
public abstract class AbstractHandler<T> {
private AbstractHandler<T> nextHandler;
public void setNextHandler(AbstractHandler<T> nextHandler) {
this.nextHandler = nextHandler;
}
public void handle(T data) {
if (canHandle(data)) {
doHandle(data);
} else if (nextHandler != null) {
nextHandler.handle(data);
}
}
protected abstract boolean canHandle(T data);
protected abstract void doHandle(T data);
}
public class IntegerHandler extends AbstractHandler<Integer> {
@Override
protected boolean canHandle(Integer data) {
return data % 2 == 0;
}
@Override
protected void doHandle(Integer data) {
// 处理偶数数据
}
}
public class StringHandler extends AbstractHandler<String> {
@Override
protected boolean canHandle(String data) {
return data.startsWith("hello");
}
@Override
protected void doHandle(String data) {
// 处理以 hello 开头的数据
}
}
AbstractHandler<Integer> integerHandler = new IntegerHandler();
AbstractHandler<String> stringHandler = new StringHandler();
integerHandler.setNextHandler(stringHandler);
DataStream<Integer> integerStream = env.addSource(new FlinkKafkaConsumer<Integer>("topic1", new SimpleIntegerSchema(), properties));
DataStream<String> stringStream = env.addSource(new FlinkKafkaConsumer<String>("topic2", new SimpleStringSchema(), properties));
integerStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
integerHandler.handle(value);
return value;
}
});
stringStream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
stringHandler.handle(value);
return value;
}
});
```
4. 策略模式:
```java
public interface AggregationStrategy<T> {
T aggregate(T value1, T value2);
}
public class SumAggregationStrategy implements AggregationStrategy<Integer> {
@Override
public Integer aggregate(Integer value1, Integer value2) {
return value1 + value2;
}
}
public class MaxAggregationStrategy implements AggregationStrategy<Integer> {
@Override
public Integer aggregate(Integer value1, Integer value2) {
return Math.max(value1, value2);
}
}
AggregationStrategy<Integer> strategy = new SumAggregationStrategy();
DataStream<Integer> stream = env.addSource(new FlinkKafkaConsumer<Integer>("topic", new SimpleIntegerSchema(), properties));
stream.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
return strategy.aggregate(value1, value2);
}
});
```
5. 工厂模式:
```java
public interface Source<T> {
void open() throws Exception;
void close() throws Exception;
T read() throws Exception;
}
public class KafkaSource implements Source<String> {
private KafkaConsumer<String, String> consumer;
public KafkaSource(Properties properties) {
this.consumer = new KafkaConsumer<>(properties);
}
@Override
public void open() throws Exception {
consumer.subscribe(Collections.singletonList("topic"));
}
@Override
public void close() throws Exception {
consumer.close();
}
@Override
public String read() throws Exception {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
return record.value();
}
return null;
}
}
public enum SourceType {
KAFKA
}
public class SourceFactory {
public static <T> Source<T> createSource(SourceType type, Properties properties) {
switch (type) {
case KAFKA:
return (Source<T>) new KafkaSource(properties);
default:
throw new IllegalArgumentException("Unsupported source type: " + type);
}
}
}
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
Source<String> source = SourceFactory.createSource(SourceType.KAFKA, kafkaProperties);
source.open();
while (true) {
String data = source.read();
if (data != null) {
// 处理数据
}
}
source.close();
```
阅读全文