jdk9 代替observable
时间: 2023-10-16 16:11:35 浏览: 41
在 JDK 9 中,Observable 类已被标记为过时,推荐使用 Flow API 中的 Publisher 接口和 Subscriber 接口代替。Flow API 是在 Java 9 中引入的响应式编程的一个标准化 API。
使用 Flow API 的好处是更加灵活,支持背压(backpressure)控制。背压是指在进行异步流处理时,当数据源发送数据速度过快,而消费者处理数据速度过慢时,需要对数据源进行流量控制,避免数据积压导致内存溢出等问题。Flow API 提供了一种标准的背压机制,允许消费者向数据源发出信号,告知数据源当前的处理能力,从而实现流量控制。
下面是使用 Flow API 的一个示例:
```java
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
public class FlowDemo {
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
Processor<String, String> processor = new Processor<String, String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("Received item: " + item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Processing completed.");
}
};
Subscriber<String> subscriber = new Subscriber<String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("Received item: " + item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Processing completed.");
}
};
publisher.subscribe(processor);
processor.subscribe(subscriber);
publisher.submit("Hello");
publisher.submit("World");
publisher.close();
Thread.sleep(1000);
}
}
```
该示例中,SubmissionPublisher 是一个实现了 Publisher 接口的类,用于发布数据。Processor 是一个实现了 Processor 接口的类,用于对发布的数据进行处理,然后再将处理后的数据传递给下一个 Subscriber。Subscriber 是一个实现了 Subscriber 接口的类,用于接收处理后的数据。在这个示例中,我们创建了一个 Publisher、一个 Processor 和一个 Subscriber,然后将它们连接起来,最后发布了两个数据项。在运行示例时,可以看到两个 Subscriber 都成功接收到了数据,并且能够正确地处理背压。
相关推荐
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)