前言
最近组内大佬打算分享RxJava2的源码, 赶紧先预习一波, 防止技术分享会上有听没懂.大概个人准备了几天的时间, 打算先整理以下自己的源码阅读记录.RxJava2的源码解析系列打算分别从以下三面来阐述:
- 数据源的订阅和响应原理
- 线程切换的原理
- 背压的实现(Flowable)
基础使用的Demo
抛开线程切换和背压, 我们来写一个单纯的发送数据, 订阅响应的Demo,为了便于理解, 我们抛开链式调用来写1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33// 被观察者
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "subscribe");
emitter.onNext(123);
emitter.onComplete();
}
});
// 观察者
Observer<Integer> observer = new Observer<Integer>() {
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
public void onNext(Integer integer) {
Log.e(TAG, "onNext" + integer);
}
public void onError(Throwable e) {
Log.e(TAG, "onError");
}
public void onComplete() {
Log.e(TAG, "onComplete");
}
};
// 订阅
observable.subscribe(observer);
ObservableSource
我们首先来看当我们创建一个Observable
(被观察者)的时候, 实际上他做了什么1
2
3
4
5
6
7
8
9
10
11
12
13public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// npe校验
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
RxJavaPlugins.onAssembly()
这个方法主要是为了hook使用, 本篇暂且不表. 所以这里Observable.create()
返回的是一个ObervableCreate
对象.它继承于Observable
, 是ObservableSource
的实现类
observable.subscribe(observer)
我们主要看订阅的时候做了什么, 先上源码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public final void subscribe(Observer<? super T> observer) {
// npe校验
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// hook, 主要返回的就是我们的observer
observer = RxJavaPlugins.onSubscribe(this, observer);
// npe校验
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
可以看到这里实际执行的是subscribeActual(observer)
这个方法, 这里调用是个抽象接口, 我们在ObervableCreate
找具体的实现1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void subscribeActual(Observer<? super T> observer) {
// 包装数据发射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 订阅监听
observer.onSubscribe(parent);
try {
// 上游的执行
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
1 | emitter.onNext(123); |
从source.subscribe(parent);
我们就会走到以下我们自己写的数据发送事件.这里的emitter
通过源码我们可以看到是将observer
包装后的CreateEmitter
类对象, 我们在往里面看.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
public void dispose() {
DisposableHelper.dispose(this);
}
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
通过之前将observer
传入CreateEmitter
, 调用emitter.onNext
最终调用走到了observer.onNext
.
整体的流程非常的清晰. 下面我们看下, 如果中间有多重数据转换, 是什么样的流程
数据转换实现流程
以第一个基础demo为例, 我们改造下Observable
(被观察者), 将他进行一次数据转换, 并且做一次筛除.这个demo的意思就是发送123, 中间做+1处理, 然后筛选出大于122的数据发送给观察者.这个很容易理解.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24Observable<Integer> observable =
Observable
.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "subscribe");
emitter.onNext(123);
emitter.onComplete();
}
})
.map(new Function<Integer, Integer>() {
public Integer apply(Integer integer) throws Exception {
Log.e(TAG, "map");
return integer + 1;
}
})
.filter(new Predicate<Integer>() {
public boolean test(Integer integer) throws Exception {
Log.e(TAG, "filter");
return integer > 122;
}
});
我们依旧来看下map
操作符的源码1
2
3
4public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
是不是很眼熟? 忽略掉hook, 这里返回的是ObservableMap
对象.同样, filter
操作符返回的是一个ObservableFilter
1
2
3
4public final Observable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
不论是ObservableMap
还是ObservableFilter
他们都继承于AbstractObservableWithUpstream
抽象类, 它继承于Observable
, 带有上游的Observable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
// 上游Obervable
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
public final ObservableSource<T> source() {
return source;
}
}
这时候, 我们重新看下订阅的处理, 当我们执行observable.subscribe(observer)
的时候, observable
最终返回的是ObservableFilter
对象, 所以我们需要看这个类对象的subscribeActual(observer)
方法.他的代码很简洁, 实际就是将我们的observer
和filter
操作符的具体操作方法包装成一个FilterObserver
对象, 然后由上游ObservableMap
对象来subscribe(订阅)它.1
2
3
4
public void subscribeActual(Observer<? super T> s) {
source.subscribe(new FilterObserver<T>(s, predicate));
}
我们已经知道Observable.subscribe(observer)
方法实际调用的是对应实现类的subscribeActual(observer)
方法, 所以我们直接去看ObservableMap.subscribeActual(observer)
方法就可以了, 他的方法与FilterObserver
内的类似, 这时候是将前面传进来的FilterObserver
对象和我们map
操作符做的操作包装成一个MapObserver
对象, 交给上游.1
2
3
4
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
这时候我们的上游是ObservableCreate
对象,它的subscribeActual(observer)
方法上文有提到, 他将MapObserver
对象包装进CreateEmitter
对象, 这个时候, 才开始执行订阅动作, 然后我们走到CreateEmitter
的onNext()
方法,实际会执行到下游观察者的onNext
方法, 在这层, 我们的观察者是MapObserver
.它继承于BasicFuseableObserver
, 表示一个流程执行中间的观察者对象. 现在我们看MapObserver
的onNext
的执行, 这里我们主要关注主流程的执行逻辑, 忽略掉其他代码, 可以看到它最终调用的是actual.onNext(v)
, 首先将我们map
操作符的逻辑处理返回的数据赋值给v
, 这里的actual
指的是我们下游的observer
(观察者), 那么这个时候是我们的FilterObserver
对象, 将v
对象通过onNext
传递下去.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
然后我们看FilterObserver
的源码, 他的onNext
逻辑就是会执行我们传进去的Predicate
对象的test
方法, 如果符合筛选逻辑, 就会通过调用下游的onNext
将数据传下去, 这个时候的下游是我们new的Observer
, 这时候的执行,我们应该就清楚了.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {
final Predicate<? super T> filter;
FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {
super(actual);
this.filter = filter;
}
public void onNext(T t) {
if (sourceMode == NONE) {
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return;
}
if (b) {
actual.onNext(t);
}
} else {
actual.onNext(null);
}
}
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
public T poll() throws Exception {
for (;;) {
T v = qs.poll();
if (v == null || filter.test(v)) {
return v;
}
}
}
}
总结
订阅和数据的传输的原理就是如此. 我们用流程图来总结下上面的整个流程.
总的来说, 订阅的动作是层层递归上传到最开始的Observable
, 然后从最开始的Observable
将数据一层层往下传.
当然, 从装饰模式
来讲, 他这里的实际动作就是将Observable
做了层层装饰来传递订阅, 对设计模式有兴趣的同学可以看看相关的书籍, 对于理解这段代码有点睛之用