RxJava源码解析(三)-背压

什么是背压(Backpressure)?

我们来看下RxJava自身对其的解释

When the dataflow runs through asynchronous steps, each step may perform different things with different speed. To avoid overwhelming such steps, which usually would manifest itself as increased memory usage due to temporary buffering or the need for skipping/dropping data, a so-called backpressure is applied, which is a form of flow control where the steps can express how many items are they ready to process. This allows constraining the memory usage of the dataflows in situations where there is generally no way for a step to know how many items the upstream will send to it.

来祭出google翻译,

当数据流通过异步步骤时,每个步骤可以以不同的速度执行不同的操作。
为了避免压倒这些步骤,这些步骤通常表现为由于临时缓冲或需要跳过/丢弃数据而增加的内存使用量,所以应用所谓的背压,这是流量控制的一种形式,其中步骤可以表达多少
物品准备好了。
这允许在通常无法知道上游将向其发送多少项的步骤的情况下约束数据流的存储器使用。

简而言之, 当异步情况下, 上流(被观察者)发送数据过快, 而下流(消费者)无法及时处理数据, 导致缓存内存增大, 最终导致OOM, 这个时候, 背压就是为了处理这种情况.

Observable / Observer

我们都知道RxJava2主要是增加了使用背压机制下调用的api, 而原来的Observable / Observer组合是不支持背压的, 那么我们先回顾下原来的Observable / Observer组合使用的订阅流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable
// 新建 ObservableCreate
.create(ObservableOnSubscribe<Int> {
var i = 0
while (true){
it.onNext(i ++)
}
})
// 新建 ObservableSubscribeOn
.subscribeOn(Schedulers.io())
// 新建 ObservableObserveOn
.observeOn(Schedulers.newThread())
// 返回 LambdaObserver
.subscribe{
Thread.sleep(5000)
Log.d("rxjava", it.toString())
}

Observable / Observer组合的订阅消费流程线程切换原理在去年都曾经写过, 这里就不再赘述, 我们主要看下, 上游的缓冲池(buffer)的做法, 相关代码在ObserveOnObserver#onNext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
...
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
...

queue = new SpscLinkedArrayQueue<T>(bufferSize);

downstream.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
...
}

public final class SpscLinkedArrayQueue<T> implements SimplePlainQueue<T> {
public SpscLinkedArrayQueue(final int bufferSize) {
int p2capacity = Pow2.roundToPowerOfTwo(Math.max(8, bufferSize));
int mask = p2capacity - 1;
AtomicReferenceArray<Object> buffer = new AtomicReferenceArray<Object>(p2capacity + 1);
producerBuffer = buffer;
producerMask = mask;
adjustLookAheadStep(p2capacity);
consumerBuffer = buffer;
consumerMask = mask;
producerLookAhead = mask - 1; // we know it's all empty to start with
soProducerIndex(0L);
}
@Override
public boolean offer(final T e) {
if (null == e) {
throw new NullPointerException("Null is not a valid element");
}
// local load of field to avoid repeated loads after volatile reads
final AtomicReferenceArray<Object> buffer = producerBuffer;
final long index = lpProducerIndex();
final int mask = producerMask;
final int offset = calcWrappedOffset(index, mask);
if (index < producerLookAhead) {
return writeToQueue(buffer, e, index, offset);
} else {
final int lookAheadStep = producerLookAheadStep;
// go around the buffer or resize if full (unless we hit max capacity)
int lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask);
if (null == lvElement(buffer, lookAheadElementOffset)) { // LoadLoad
producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room
return writeToQueue(buffer, e, index, offset);
} else if (null == lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full
return writeToQueue(buffer, e, index, offset);
} else {
resize(buffer, index, offset, e, mask); // add a buffer and link old to new
return true;
}
}
}
}

省略掉大部分不相关代码, 这里大致做的是, 当被观察者被订阅的时候, 会初始化一个SpscLinkedArrayQueue类对象, 他的内部实际维护了一个AtomicReferenceArray对象, 这个就是我们的缓冲队列, 他的初始默认容量是128, 当我们调用到onNext方法的时候, 会首先判断buffer是否已满, 若未满, 则进行插入; 若已满, 则会进行扩容处理, 在异步情况下, 我们的下游无法消费掉buffer内的缓存, 导致内存占用越来越大, 最终导致OOM

Flowable / Subscriber

好, 快速过完无背压机制下的订阅流程后, 我们来看下RxJava2是怎么通过Flowable / Subscriber来处理背压的

订阅响应流程

首先, 我们来跟着基础Demo来看下Flowable的订阅流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Flowable
// 创建 FlowableCreate 对象
.create(FlowableOnSubscribe<Int> {
for(i in 0..150){
it.onNext(i)
Log.d("send", i.toString())
}
}, BackpressureStrategy.LATEST)
// 创建 FlowableSubscribeOn 对象
.subscribeOn(Schedulers.io())
// 创建 FlowableObserveOn 对象
.observeOn(AndroidSchedulers.mainThread())
.subscribe (object : FlowableSubscriber<Int> {
lateinit var s: Subscription
var i = 1
override fun onComplete() {
Log.d("rxjava", "complete")
}

override fun onSubscribe(s: Subscription) {
this.s = s
s.request(130)
}

override fun onNext(t: Int?) {
Log.e("rxjava receive", "${i++} $t")
}

override fun onError(t: Throwable?) {
t?.printStackTrace()
}

})
Thread.sleep(5000)

  1. create操作符用来初始化一个FlowableCreate对象, 并返回

    1
    2
    3
    4
    5
    public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
    ObjectHelper.requireNonNull(source, "source is null");
    ObjectHelper.requireNonNull(mode, "mode is null");
    return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
    }
  2. subscribeOn操作符 将FlowableCreate进行包装, 以FlowableSubscribeOn对象返回

    1
    2
    3
    4
    5
    6
    7
    8
    public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return subscribeOn(scheduler, !(this instanceof FlowableCreate));
    }
    public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
    }
  3. observeOn操作符 将FlowableSubscribeOn进行包装, 并以FlowableObserveOn对象返回

    1
    2
    3
    4
    5
    6
    7
    8
    public final Flowable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
    }
    public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

注意这里传入的butterSize()方法, 他的详细代码如下

1
2
3
4
5
6
7
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
public static int bufferSize() {
return BUFFER_SIZE;
}

FlowableObserveOn类里, prefetch表示的是初始的缓存池大小, 可以看出, 如果不指定buffer大小, 那么我们默认大小就是128

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
...
public FlowableObserveOn(
Flowable<T> source,
Scheduler scheduler,
boolean delayError,
int prefetch) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
// 预约的缓存池初始化大小
this.prefetch = prefetch;
}
...
}

  1. subscribe操作符, 以当前Demo为例, 我们这里生成了一个LambdaSubscriber, 而后调用到的是我们上个操作符流程返回的Flowable#subscribe
    1
    2
    3
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
    return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
    }
1
2
3
4
5
6
7
8
9
10
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Subscription> onSubscribe) {
...

LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);

subscribe(ls);

return ls;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final void subscribe(FlowableSubscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
try {
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);

...

subscribeActual(z);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Subscription has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}

最终走到Flowable#subscribeActual(Subscriber<? super T> s)方法, 它是个抽象方法, 由调用方FlowableObserveOn对象实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
...
@Override
public void subscribeActual(Subscriber<? super T> s) {
Worker worker = scheduler.createWorker();

if (s instanceof ConditionalSubscriber) {
source.subscribe(new ObserveOnConditionalSubscriber<T>(
(ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
} else {
source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
}
}
...
}

  1. 这里source对象是我们前面被包装的FlowableSubscribeOn对象, 是不是很熟悉?
    FlowableSubscribeOn类继承于Flowable, 而Flowable#subscribe(FlowableSubscriber<? super T> s)方法最终走到 Flowable#subscribeActual(Subscriber<? super T> s),上文已知这是个抽象方法, 最终执行到FlowableSubscribeOn#subscribeActual, 并将LambdaSubscriber包装成ObserveOnSubscriber作为参数传递进去

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
    ...
    @Override
    public void subscribeActual(final Subscriber<? super T> s) {
    Scheduler.Worker w = scheduler.createWorker();
    final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
    // s 为 ObserveOnSubscriber 对象
    s.onSubscribe(sos);

    w.schedule(sos);
    }
    ...
    }
    • 我们先来看下s.onSubscribe(sos);执行代码
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
      implements FlowableSubscriber<T> {
      ...
      @Override
      public void onSubscribe(Subscription s) {
      if (SubscriptionHelper.validate(this.upstream, s)) {
      this.upstream = s;

      if (s instanceof QueueSubscription) {
      @SuppressWarnings("unchecked")
      QueueSubscription<T> f = (QueueSubscription<T>) s;

      int m = f.requestFusion(ANY | BOUNDARY);

      if (m == SYNC) {
      sourceMode = SYNC;
      queue = f;
      done = true;

      downstream.onSubscribe(this);
      return;
      } else
      if (m == ASYNC) {
      sourceMode = ASYNC;
      queue = f;

      downstream.onSubscribe(this);

      s.request(prefetch);

      return;
      }
      }
      // 初始化缓存队列
      // prefetch 默认大小为128
      queue = new SpscArrayQueue<T>(prefetch);
      // 下流订阅执行
      downstream.onSubscribe(this);
      // 上流的subscription request 执行
      s.request(prefetch);
      }
      }
      ...
      }

可以看到, 在这个时候, 就会执行到消费者的onSubscribe方法, 然后会执行到SubscribeOnSubscriber#request

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
implements FlowableSubscriber<T>, Subscription, Runnable {
@Override
public void request(final long n) {
if (SubscriptionHelper.validate(n)) {
Subscription s = this.upstream.get();
if (s != null) {
requestUpstream(n, s);
} else {
BackpressureHelper.add(requested, n);
s = this.upstream.get();
if (s != null) {
long r = requested.getAndSet(0L);
if (r != 0L) {
requestUpstream(r, s);
}
}
}
}
}
}

SubscribeOnSubscriber实现了Subscription的接口, request方法主要用来处理内部缓存最大值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Subscription {
/**
* No events will be sent by a {@link Publisher} until demand is signaled via this method.
* <p>
* It can be called however often and whenever needed—but the outstanding cumulative demand must never exceed Long.MAX_VALUE.
* An outstanding cumulative demand of Long.MAX_VALUE may be treated by the {@link Publisher} as "effectively unbounded".
* <p>
* Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled.
* <p>
* A {@link Publisher} can send less than is requested if the stream ends but
* then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}.
*
* @param n the strictly positive number of elements to requests to the upstream {@link Publisher}
*/
public void request(long n);

public void cancel();
}

  • 这里, 和原来的线程切换原理相同, 根据scheduler.createWorker根据工厂模式的帮助, 创建新的Worker对象, 它的内部的工作原理都是通过线程池来执行对应的Runnable, 而我们包装后所得的SubscribeOnSubscriber对象正是继承于Runnable, 我们可以看他内部实现的run
    方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
    implements FlowableSubscriber<T>, Subscription, Runnable {
    ...
    SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean requestOn) {
    this.downstream = actual;
    this.worker = worker;
    this.source = source;
    this.upstream = new AtomicReference<Subscription>();
    this.requested = new AtomicLong();
    this.nonScheduledRequests = !requestOn;
    }

    @Override
    public void run() {
    lazySet(Thread.currentThread());
    Publisher<T> src = source;
    source = null;
    src.subscribe(this);
    }
    ...
    }

这里的source是在FlowableSubscribeOn初始化的时候, 通过构造函数传入的, 而我们知道, 他的内部是FlowableCreate

1
2
3
4
5
public FlowableSubscribeOn(Flowable<T> source, Scheduler scheduler, boolean nonScheduledRequests) {
super(source);
this.scheduler = scheduler;
this.nonScheduledRequests = nonScheduledRequests;
}

  • 根据前面的套路, 我们直接去看FlowableCreate下的subscribeActual实现方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    @Override
    public void subscribeActual(Subscriber<? super T> t) {
    BaseEmitter<T> emitter;

    switch (backpressure) {
    case MISSING: {
    emitter = new MissingEmitter<T>(t);
    break;
    }
    case ERROR: {
    emitter = new ErrorAsyncEmitter<T>(t);
    break;
    }
    case DROP: {
    emitter = new DropAsyncEmitter<T>(t);
    break;
    }
    case LATEST: {
    emitter = new LatestAsyncEmitter<T>(t);
    break;
    }
    default: {
    emitter = new BufferAsyncEmitter<T>(t, bufferSize());
    break;
    }
    }
    // SubscribeOnSubscriber.onSubscribe
    t.onSubscribe(emitter);
    try {
    // 执行被观察者发送事件
    source.subscribe(emitter);
    } catch (Throwable ex) {
    Exceptions.throwIfFatal(ex);
    emitter.onError(ex);
    }
    }

可以看到, 根据我们选择的背压策略, 新建不同类型继承于BaseEmitter类对象.

  • 而这时候, 首先执行的是SubscribeOnSubscriber#onSubscribe
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @Override
    public void onSubscribe(Subscription s) {
    if (SubscriptionHelper.setOnce(this.upstream, s)) {
    // 初始128
    long r = requested.getAndSet(0L);
    if (r != 0L) {
    requestUpstream(r, s);
    }
    }
    }
    // 切换线程到正确的目标线程上
    // 最终执行 Subscription#request(n)
    void requestUpstream(final long n, final Subscription s) {
    if (nonScheduledRequests || Thread.currentThread() == get()) {
    s.request(n);
    } else {
    worker.schedule(new Request(s, n));
    }
    }

这里的Subscription是前面BaseEmitter对象, 将其内部的缓存容量大小设为默认的128大小.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription {
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
onRequested();
}
}

void onRequested() {
// default is no-op
}
}

  • 到这里我们的订阅整个流程已经完成, 然后我们通过实现的被观察者的发送事件, 通过emitter来循环发送自增整型i

    • 我们来看下排除掉背压控制后的响应流程, 关于几个背压策略我们后文细讲, 这里我们需要知道MissingEmitter是不会处理背压, 所以我们从他的onNext看起
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      static final class MissingEmitter<T> extends BaseEmitter<T> {
      MissingEmitter(Subscriber<? super T> downstream) {
      super(downstream);
      }

      @Override
      public void onNext(T t) {
      if (isCancelled()) {
      return;
      }

      if (t != null) {
      downstream.onNext(t);
      } else {
      onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
      return;
      }
      ...
      }

      }

    直接执行下流的onNext方法, 而它的下流是SubscribeOnSubscriber

    1
    2
    3
    4
    5
    6
    7
    static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
    implements FlowableSubscriber<T>, Subscription, Runnable {
    @Override
    public void onNext(T t) {
    downstream.onNext(t);
    }
    }
  • SubscribeOnSubscriber的下流是ObserveOnSubscriber, 他的onNext方法在父类BaseObserveOnSubscriber
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    abstract static class BaseObserveOnSubscriber<T>
    extends BasicIntQueueSubscription<T>
    implements FlowableSubscriber<T>, Runnable {
    @Override
    public final void onNext(T t) {
    if (done) {
    return;
    }
    if (sourceMode == ASYNC) {
    trySchedule();
    return;
    }
    if (!queue.offer(t)) {
    upstream.cancel();

    error = new MissingBackpressureException("Queue is full?!");
    done = true;
    }
    trySchedule();
    }
    final void trySchedule() {
    if (getAndIncrement() != 0) {
    return;
    }
    worker.schedule(this);
    }

    @Override
    public final void run() {
    if (outputFused) {
    runBackfused();
    } else if (sourceMode == SYNC) {
    runSync();
    } else {
    runAsync();
    }
    }
    }

ObserveOnSubscriber内部维护的queueSpscArrayQueue, 当维护的队列满了以后, 直接返回false, 而不会再做扩容操作, 当检测到内部缓存队列已满的时候, 会设置done为true, 异常为MissingBackpressureException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public boolean offer(E e) {
if (null == e) {
throw new NullPointerException("Null is not a valid element");
}
// local load of field to avoid repeated loads after volatile reads
final int mask = this.mask;
final long index = producerIndex.get();
final int offset = calcElementOffset(index, mask);
if (index >= producerLookAhead) {
int step = lookAheadStep;
if (null == lvElement(calcElementOffset(index + step, mask))) { // LoadLoad
producerLookAhead = index + step;
} else if (null != lvElement(offset)) {
return false;
}
}
soElement(offset, e); // StoreStore
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
return true;
}

来看下他的异步处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Override
void runAsync() {
int missed = 1;

final Subscriber<? super T> a = downstream;
final SimpleQueue<T> q = queue;

long e = produced;

for (;;) {

long r = requested.get();

while (e != r) {
boolean d = done;
T v;

try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);

cancelled = true;
upstream.cancel();
q.clear();

a.onError(ex);
worker.dispose();
return;
}

boolean empty = v == null;

if (checkTerminated(d, empty, a)) {
return;
}

if (empty) {
break;
}

a.onNext(v);

e++;
if (e == limit) {
if (r != Long.MAX_VALUE) {
r = requested.addAndGet(-e);
}
upstream.request(e);
e = 0L;
}
}

if (e == r && checkTerminated(done, q.isEmpty(), a)) {
return;
}

int w = get();
if (missed == w) {
produced = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) {
if (cancelled) {
clear();
return true;
}
if (d) {
if (delayError) {
if (empty) {
cancelled = true;
Throwable e = error;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
Throwable e = error;
if (e != null) {
cancelled = true;
clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
cancelled = true;
a.onComplete();
worker.dispose();
return true;
}
}
}

return false;
}

执行流程基本与ObservableObserveOn相同, 这里不再赘述.
可以注意到, 当done为true并且error不为空的时候, 会调用到下流的onError, 最终是抛出MissingBackpressureException异常
大概的流程图可参考下图流程图

背压策略

Emitter的类图如下emitterUML

BaseEmitter

我们首先看下BaseEmitter, 它是一个抽象类, 由于继承AtomicLong, 所以本身可维护下流缓存阈值, 而他内部维护的downsteam表示的是下流的Subscriber, 在本文的demo中, 它就是SubscribeOnSubscriber对象, 他的内部还实现了request(long n)方法, 这个方法主要是设置内部的缓存允许阈值, 详细代码我们再上文有带到.

NoOverflowBaseAsyncEmitter

NoOverflowBaseAsyncEmitter继承于BaseEmitter, 并覆写了onNext, 这里的代码也比较容易理解, 在缓存大小允许范围内发送数据, 则直接发送, 并计数-1, 否则调用onOverflow抽象方法, 可以看出onOverflow就是出现背压情况时候的实际处理策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}

...

if (get() != 0) {
downstream.onNext(t);
// 阈值-1
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
// 超出阈值情况处理
abstract void onOverflow();
}

ErrorAsyncEmitter

ErrorAsyncEmitter继承于NoOverflowBaseAsyncEmitter, 当出现背压情况的时候, 则直接调用onError, 抛出MissingBackpressureException异常

1
2
3
4
5
6
7
8
static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
...
@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}

}

DropAsyncEmitter

DropAsyncEmitter继承于NoOverflowBaseAsyncEmitter, 背压策略不做任务处理, 则当超出缓存允许大小的时候, 对应的发送数据就不会再调用到下流的响应事件, 相当于drop

1
2
3
4
5
6
7
8
9
static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {

...
@Override
void onOverflow() {
// nothing to do
}

}

LatestAsyncEmitter

它的内部维护了一个AtomicReferencequeue, 可以理解为其自身的缓存池, 而这个池只有一个内存大小, 它在调用到onNext方法的时候, 将发送的数据存入queue, 并调用drain, 我们主要看下drain方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
void drain() {
// 省略保证线程安全相关代码
...
final Subscriber<? super T> a = downstream;
final AtomicReference<T> q = queue;

for (;;) {
long r = get();
long e = 0L;

while (e != r) {
if (isCancelled()) {
q.lazySet(null);
return;
}

boolean d = done;

T o = q.getAndSet(null);

boolean empty = o == null;

if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}

if (empty) {
break;
}

a.onNext(o);

e++;
}

if (e == r) {
if (isCancelled()) {
q.lazySet(null);
return;
}

boolean d = done;

boolean empty = q.get() == null;

if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
}

if (e != 0) {
BackpressureHelper.produced(this, e);
}

...
}
}

我们可以看到, 它在内部缓存阈值允许范围内, 会遍历读取queue的数据, 并将其发送给下流,当然这里queue的缓存大小也只有一个, 所以才会出现消费的最后一个数据会是上流发送的最后一个数据

BufferAsyncEmitter

BufferAsyncEmitter内部维护了一个128大小的SpscLinkedArrayQueue缓存队列, SpscLinkedArrayQueue我们在ObserveOnObserver中见过, 他在存入数据的时候, 当达到队列最大长度的时候, 会进行扩容处理, 而通过BufferAsyncEmitter每次发送数据的时候, 首先会存入到queue中, 然后调用drain, 这里的代码与LatestAsyncEmitter也很相似, 区别只在于缓冲区的区别, 而当缓存区一再扩容的时候, 有可能出现OOM的现象, 他的逻辑其实与ObserveOnObserver内的消费逻辑是一样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
void drain() {
...
final Subscriber<? super T> a = downstream;
final SpscLinkedArrayQueue<T> q = queue;

for (;;) {
long r = get();
long e = 0L;

while (e != r) {
if (isCancelled()) {
q.clear();
return;
}

boolean d = done;

T o = q.poll();

boolean empty = o == null;

if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}

if (empty) {
break;
}

a.onNext(o);

e++;
}

if (e == r) {
if (isCancelled()) {
q.clear();
return;
}

boolean d = done;

boolean empty = q.isEmpty();

if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
}

if (e != 0) {
BackpressureHelper.produced(this, e);
}

...
}
}

背压总结

我们再回头看下, 几种背压策略的效果

策略 效果
MISSING 无任何背压策略执行, 除非调用onBackpressureXXX
ERROR 抛出异常
BUFFER 内部维护可扩容的缓存池, 效果与Observer一样, 可能导致OOM
DROP 如果下流无法跟上上流发射速度, 则会丢弃这块数据
LATEST 当下流无法跟上上流的发射速度的时候, 则只保存最近生产的数据