rxjava flatmap 源码解析
时间: 2023-07-23 18:14:49 浏览: 242
RxJava中的flatMap操作符是一个非常常用的操作符,它可以将一个Observable发射的事件序列转换成多个Observables,然后将这些Observables发射的事件序列合并后再发射出去。
下面是flatMap操作符的源码解析:
```java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableFlatMap<>(this, mapper, false, Integer.MAX_VALUE, bufferSize()));
}
```
可以看到,flatMap操作符的实现是通过创建一个ObservableFlatMap对象来完成的。其中,mapper参数表示将原始Observable发射的事件转换成的新Observable,它是一个Function类型的参数,即接受一个T类型的参数并返回一个ObservableSource类型的结果。
ObservableFlatMap的构造函数如下所示:
```java
ObservableFlatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
this.source = source;
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
```
ObservableFlatMap的核心实现是在subscribeActual方法中完成的:
```java
@Override
public void subscribeActual(Observer<? super R> observer) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, observer, mapper)) {
return;
}
source.subscribe(new FlatMapObserver<>(observer, mapper, delayErrors, maxConcurrency, bufferSize));
}
```
在subscribeActual方法中,首先判断源Observable是否可以直接转换为ObservableScalarXMap,如果可以的话直接进行转换,否则创建一个FlatMapObserver对象并进行订阅。
FlatMapObserver是flatMap的核心实现类,它实现了Observer接口,并且在接收到源Observable发射的事件时,会先将事件转换成新的Observable,然后将新Observable的发射事件序列合并到一个新的Observable中,最后再将新的Observable发射出去。
```java
static final class FlatMapObserver<T, R> extends AtomicInteger implements Observer<T>, Disposable {
// ...
@Override
public void onNext(T t) {
ObservableSource<? extends R> o;
try {
o = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
upstream.dispose();
onError(e);
return;
}
if (cancelled) {
return;
}
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
if (wip == maxConcurrency) {
queue.offer(t);
return;
}
wip++;
}
}
o.subscribe(new InnerObserver(inner, delayErrors, this));
}
// ...
}
```
在FlatMapObserver的onNext方法中,首先调用mapper将源Observable发射的事件转换成新的Observable,并进行非空检查。然后判断当前的并发度是否达到了最大值,如果达到了最大值,就将源Observable发射的事件放到队列中。否则,就将并发度加1,并订阅新Observable。
InnerObserver是FlatMapObserver的内部类,它实现了Observer接口,并在接收到来自新Observable的发射事件序列时,将它们合并到一个新的Observable中,并将新的Observable发射出去。
```java
static final class InnerObserver<R> implements Observer<R> {
// ...
@Override
public void onNext(R t) {
if (done) {
return;
}
inner.onNext(t);
}
// ...
}
```
当所有的新Observable都完成后,FlatMapObserver会调用onComplete方法通知观察者。如果发生了异常,FlatMapObserver会调用onError方法通知观察者。同时,FlatMapObserver还实现了Disposable接口,可以通过dispose方法取消订阅。
综上所述,flatMap操作符的实现是比较复杂的,它通过创建ObservableFlatMap对象,并在subscribeActual方法中创建FlatMapObserver对象来完成转换操作。在FlatMapObserver中,它还需要实现对新Observable的订阅以及将新Observable发射的事件合并到一个新的Observable中。
阅读全文