ObservableZipIterable 源码分析
时间: 2023-08-05 19:07:47 浏览: 41
ObservableZipIterable 是 RxJava 中的一个类,用于将多个 Observable 转换成单个 Observable ,并将它们的元素打包成一个元素。下面是该类的源码分析:
```
final class ObservableZipIterable<T, U, V> extends Observable<V> {
final Iterable<? extends ObservableSource<? extends T>> sources;
final Function<? super Object[], ? extends V> zipper;
ObservableZipIterable(Iterable<? extends ObservableSource<? extends T>> sources,
Function<? super Object[], ? extends V> zipper) {
this.sources = sources;
this.zipper = zipper;
}
@Override
public void subscribeActual(Observer<? super V> s) {
@SuppressWarnings("unchecked")
Iterator<? extends ObservableSource<? extends T>> it = sources.iterator();
// 检查源 Observable 是否为空
if (!it.hasNext()) {
EmptyDisposable.error(new NoSuchElementException(), s);
return;
}
// 创建一个动态数组,用于缓存每个源 Observable 的元素
// 这里使用了可变长数组 ArrayList,因为不知道每个源 Observable 会产生多少个元素
// 使用 ArrayList 可以动态地添加元素
int n = 0;
ObservableSource<? extends T>[] sources = new ObservableSource[8];
try {
while (it.hasNext()) {
ObservableSource<? extends T> p = it.next();
if (p == null) {
EmptyDisposable.error(new NullPointerException("One of the sources is null"), s);
return;
}
if (n == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[n + (n >> 2)];
System.arraycopy(sources, 0, b, 0, n);
sources = b;
}
sources[n++] = p;
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, s);
return;
}
// 创建一个数组,用于缓存每个源 Observable 的 Observer
// 与上面的 sources 数组一样,这里也使用了 ArrayList,因为不知道每个源 Observable 会产生多少个元素
// 使用 ArrayList 可以动态地添加元素
ObservableZip<T>[] zip = new ObservableZip[n];
for (int i = 0; i < n; i++) {
zip[i] = new ObservableZip<T>(sources[i]);
}
// 创建一个 ZipCoordinator 对象,用于协调多个 Observable 的订阅和元素的打包
ZipCoordinator<T, V> coordinator = new ZipCoordinator<T, V>(s, zipper, n, zip);
// 订阅每个源 Observable
coordinator.subscribe(zip);
// 执行协调器的 run 方法,开始打包元素
coordinator.run();
}
}
```
从上面的源码中可以看出,ObservableZipIterable 类实际上是一个 Observable 的子类,它的 subscribeActual 方法用于订阅源 Observable,并将它们的元素打包成一个元素。
在 subscribeActual 方法中,首先使用 sources.iterator() 获取源 Observable 的迭代器,然后依次遍历每个源 Observable,并把它们的元素缓存在一个动态数组中。如果在遍历过程中遇到了 null 值或者源 Observable 为空,则会发送 onError 事件,并终止订阅。
接着,根据每个源 Observable 的个数创建一个 ObservableZip 数组,用于缓存每个源 Observable 的 Observer。然后创建一个 ZipCoordinator 对象,用于协调多个 Observable 的订阅和元素的打包。
最后,订阅每个源 Observable,并执行协调器的 run 方法,开始打包元素。在 run 方法中,协调器会等待每个源 Observable 发送元素,并将它们打包成一个元素,然后发送给订阅者。如果有任意一个源 Observable 发出了 onError 事件,则会直接发送 onError 事件,并终止订阅。