Earth Guardian

You are not LATE!You are not EARLY!

0%

RxJava

介绍

ReactiveX

主要是响应式编程(Reactive Program),它集合了观察者模式,迭代器模式,函数式编程的优点,是一种面向集合数据流和变化传播的编程范式,是基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流
ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming
It is sometimes called “functional reactive programming” but this is a misnomer. ReactiveX may be functional, and it may be reactive, but “functional reactive programming” is a different animal. One main point of difference is that functional reactive programming operates on values that change continuously over time, while ReactiveX operates on discrete values that are emitted over time.

Java 8 新特性

Java 8 中引入了很多针对响应式的新特性,如 Lambda 表达式,函数式接口,MAPFilterStream 流式操作等等。其中 JDK8Stream 是一个受到函数式编程和多核时代影响而产生的东西。java.util.stream 包,实现了集合的流式操作,流式操作包括集合的过滤,排序,映射等功能。根据流的操作性,又可以分为串行流和并行流。根据操作返回的结果不同,流式操作又分为中间操作和最终操作。大大方便了我们对于集合的操作:

  • 最终操作:返回一特定类型的结果
  • 中间操作:返回流本身

流式操作最大的特点是以疏代堵,流处理方式可以处理无限的数据,只需要消耗时间。因此流式是对数据量开放的,适用于处理大数据

RxJava

RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库,是 JVM 的响应式扩展。最大的特点是异步,异步是通过一种扩展的观察者模式来实现的。使用了 Java 8 的新特性来实现 Reactive Extendsions
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
RxJava 2.x will target Reactive Streams APIs directly for Java 8+

仓库地址: https://github.com/ReactiveX/RxJava

依赖

app/build.gradle 中增加依赖:

1
2
compile 'io.reactivex.rxjava2:rxjava:2.*.*' 
compile 'io.reactivex.rxjava2:rxandroid:2.*.*'

* 换成具体的版本号
Sync Project 同步整个工程,自动下载 RxJava 相关依赖包

基本概念

Observable

涉及到三个概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)。Observable.subscribe(Observer) 实现订阅关系, Observable 提供数据流,Observer 响应事件并处理数据

  • 数据流
    Observable 表示一个数据流集合,数据是通过 Push 的方式发出,示例图:

0013-rxjava-Observable_legend.png

You can think of the Observable class as a “push” equivalent to Iterable, which is a “pull.”

  • 数据发射
    如果已经拿到 Observable 数据流,RxJava 会自动发射。如果新建 Observable 可以实现 ObservableOnSubscribe 通过代码控制手动发射

Observer

Observer 提供接收数据流的处理机制,包含如下几个回调:

  • void onSubscribe(@NonNull Disposable d);
    表示订阅成功后,能拿到 Disposable 实例,通过它可以解除订阅关系
  • void onNext(@NonNull T t);
    表示接收数据,所有发射过来的数据都在这接收并处理
  • void onError(@NonNull Throwable e);
    表示事件队列异常,在事件处理过程中出异常时,该方法会被触发,同时队列自动终止,不允许再有事件发出
  • void onComplete();
    表示事件队列完结,RxJava 规定:当不会再有新的事件发出时,需要触发该方法作为结束标志

注意:在一个正确运行的事件序列中,onCompletedonError 有且只有一个,也就是说二者必须唯一并且互斥,在响应的队列中只能调用一个

subscribe

理清了上面的概念后,将 ObservableObserver 订阅在一起,就实现了数据流的连通

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 第一步:新建 Observable 
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@io.reactivex.annotations.NonNull
ObservableEmitter<String> e) throws Exception {
// 1. 订阅成功后,Observable 开始发射数据
e.onNext("1111");
e.onNext("2222");
e.onNext("3333");
// 发射数据结束符
e.onComplete();
}
// 第二步:新建 Observer
// 第三步:订阅 subscrible
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@io.reactivex.annotations.NonNull Disposable d) {}

@Override
public void onNext(@io.reactivex.annotations.NonNull String s) {
// 2. Observer 开始接收数据并处理
Log.d("XMT", "s = " + s);
}

@Override
public void onError(@io.reactivex.annotations.NonNull Throwable e) {}

@Override
public void onComplete() {}
});

Scheduler 线程调度器

RxJava 的默认规则中,事件的发出和消费都是在同一个线程的,也就是在哪个线程调用 subscribe,就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的,Scheduler 就是来完成异步的

  • subscribeOn
    指定的就是发射事件的线程,即生产事件的线程
    多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略

RxJava 中的所有的数据准备,发射的回调都是在这个线程中执行的

  • observeOn
    指定的就是订阅者接收事件的线程,即事件消费的线程
    但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次

小结:如果不指定 subscribeOnobserveOn,事件就是同步执行的,也就是发射一个数据后同步消费一个数据,这样并没有体现 RxJava 的特点,所以实际中需要调用这两个接口指定具体的事件产生和消费的线程。示例 Log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 数据发射和消费都在同一个线程 3633
14:55 3633-3633/D/: Observable, thread.name = main
// 发射一个数据
14:55 3633-3633/D/: Observable, emit data 111
// 消费一个数据
14:55 3633-3633/D/: Observer, thread.name = main, s = 111
14:55 3633-3633/D/: Observable, emit data 222
14:55 3633-3633/D/: Observer, thread.name = main, s = 222
14:55 3633-3633/D/: Observable, emit data 333
14:55 3633-3633/D/: Observer, thread.name = main, s = 333
// 数据发射完毕
14:55 3633-3633/D/: Observable, emit data Complete
// 消费事件回调
14:55 3633-3633/D/: Observer, onCompleted


// 数据发射和消费在不同线程,后台处理前台回调
// 后台处理线程 ID 5673,一次发射完所有数据
14:57 5536-5673/D/: Observable, thread.name = RxComputationThreadPool-2
14:57 5536-5673/D/: Observable, emit data 111
14:57 5536-5673/D/: Observable, emit data 222
14:57 5536-5673/D/: Observable, emit data 333
14:57 5536-5673/D/: Observable, emit data Complete
// 异步处理,主线程 ID 5536 逐个逐个事件来消费
14:57 5536-5536/D/: Observer, thread.name = main, s = 111
14:57 5536-5536/D/: Observer, thread.name = main, s = 222
14:57 5536-5536/D/: Observer, thread.name = main, s = 333
14:57 5536-5536/D/: Observer, onCompleted

系统自带的几个常用线程:

  • Schedulers.computation()
    专为计算所使用的线程,计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,用于计算任务,例如图形的计算,事件循环等。使用的是固定的线程池,大小为 CPU 核数,即默认线程数等于处理器的数量。不要把 I/O 操作放该线程中,否则 I/O 操作的等待时间会浪费 CPU
  • Schedulers.newThread()
    总是启用新线程,并在新线程执行操作
  • Schedulers.io()
    I/O 操作:读写文件、读写数据库、网络信息交互等所使用的线程, 行为模式和 newThread 差不多,区别在于 io 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下更有效率。不要把计算工作放在 io 中,可以避免创建不必要的线程
  • Schedulers.single()
    先开线程,同时后台线程都在同一个线程中
  • Schedulers.from(Executor)
    使用指定的 Executor 作为调度器
  • Schedulers.trampoline()
    在当前线程排队执行,如果队列中已经存在其他任务
  • AndroidSchedulers.mainThread()
    RxAndroid 提供的线程, 在 Android 主线程中运行,用于事件消费时更新 UI
    Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

小结:只有 trampoline() 是在当前线程中执行,其他都是新开线程,可以通过如下代码调试是在哪个线程中:Log.d(TAG, "thread.name = " + Thread.currentThread().getName())

如下示例适用于多数的 『后台线程取数据,主线程显示』的程序策略:

1
2
3
4
5
6
7
8
9
10
11
Observable.just(1, 2, 3, 4)
// 指定 subscribe() 发生在 computation 线程
.subscribeOn(Schedulers.computation())
// 指定 Subscriber 的回调发生在 Android 主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
...
}
});

Backpressure 背压

背压是指在异步场景中,生产者发送事件速度远快于消费者的处理速度带来的问题。如果生产者和消费者在一个线程的情况下,无论生产者的生产速度有多快,每生产一个事件都会通知消费者,等待消费者消费完毕,再生产下一个事件,所以在这种情况下,根本不存在背压问题。即同步情况下,Backpressure 问题不存在。异步场景中解决背压问题就是流速控制的一种策略,要求生产者降低事件发射速度的策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}

BUFFER 模式默认队列大小为 128

Flowable/Subscriber

RxJava 2.* 中,Observeable 用于订阅 Observer,是不支持背压的;而新引入的 Flowable 用于订阅 Subscriber,支持背压。差别图:

0013-rxjava-Flowable.png

Flowable/Subscriber 订阅示例,**Flowable 需要指定背压策略**:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@io.reactivex.annotations.NonNull
FlowableEmitter<Integer> e) throws Exception {
// 无限循环发射数据
for(int i = 0; ; i++){
e.onNext(i);
}
}
// 使用 BUFFER 策略
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
// 每次只请求一部分数据处理解决背压问题
// 充分体现流式处理思想,生产端可以无限发射大数据
// 数据的消费只是时间问题,时间足够长就可以处理足够多的数据
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Integer integer) {
// 处理数据
Log.d("XMT", "i = " + integer);
}

@Override
public void onError(Throwable t) {}

@Override
public void onComplete() {}
});

在选择 ObservableFlowable 时,官方给出了一些建议,在数据量不是很大的时候(小于 1000)选择 Observable

其他模式

除了上面的 Observable/ObserverFlowable/Subscriber 模式外, RxJava 还提供如下几种模式:

  • Single/SingleObserver
    数据流只包含一个数据,只发射一个数据
  • Completable/CompletableObserver
    不发射数据(也就没有数据消费),只会回调 onComplet
  • Maybe/MaybeObserver
    上面两个的综合体,提供了以上两种功能供选择,但是只有一个功能能生效,也就是二选一。如果发射数据,发射一个数据;如果不发射数据,只能响应 onComplete

常见操作符

map

变换操作符,它的作用将类型装换,将上游类型转换为下游需要的类型

  • .map()
    是一对一的类型转换:输入一个事件变量 A,输出一个事件变量 B
  • flatMap
    是一对多的类型转换(也就是事件拆分):将一个发送事件的上游 Observable 变换为多个发送事件的 Observables。比如输入是一个事件 List<T>,转换后输出是list.countT 的事件流。flatMap 并不保证事件转换后的顺序,如果需要保证顺序则需要使用 concatMap

zip

组合操作符,将多个 Observable 发送的事件组合到一起,然后发送这些组合事件,它按照严格的顺序应用这个函数。但只发射与发射数据项最少的那个Observable 一样多的数据,也就是事件数量和最少的那个保持一致

0013-rxjava-rxjava_zip.png

filter

过滤操作符,把只有满足条件的事件才能发送给消费者

concat

连接操作符,将多个 Observable 发送的事件连接到一起,顺序发送所有事件。即将并行数据串行发出

0013-rxjava-rxjava_concat.png

zip 是取最小那一组组合发送, concat 是将所有数据顺序连接发送

示例及分析

总结

常见操作及转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 定义一个 T 的集合 
List<T> list = new ArrayList<T>();

// List<T> 转换为 Observable<List<T>>
// 这个集合只含有一个数据 List<T>
Observable<List<T>> listObservable = Observable.fromArray(lists);

// List<T> 转换为 Observable<T>
// 这个集合含有 list.count 个数据 T
Observable<T> observable = Observable.fromIterable<list>

// 获取指定位置的元素
// 这个集合只含有一个数据 T
Observable<T> tObservable = listObservable.flatMap(list -> Observable.just(list.get(0));

// blocking 同步执行并获取数据
// Observable<List<T>> (只包含一个元素的情况下) 转换为 List<T>
List<T> tList = listObservable.blockingFirst();

// Observable<T> (只包含一个元素的情况下) 转换为 T
T t = observable.blockingFirst();

// 获取 Observable<T> 的长度,即有多少个 T
Single<Long> count = observable.count();

// Observable<List<T>> 转换为 Observable<T>
// 通过flatmap转换
Observable<List<T>> funs = null;
funs.flatMap(new Function<List<T>, ObservableSource<T>>() {
@Override
public ObservableSource<T> apply(@NonNull List<T> funs) throws Exception {
return Observable.fromIterable(funs);
}
});

// Observable<T> 转换为 Observable<List<T>>
// 通过toList() 转换
Observable<T>.toList();

// *<List<T>> 转换为 Single<List<T>> 的方法
先转换成 *<T> 再通过 toList() 转换为 Single<List<T>>
Single.concat(remoteFuns, cachedFuns)
// concat 后为 Floweable<List<Fun>>
.flatMap(new Function<List<Fun>, Publisher<Fun>>() {
@Override
public Publisher<Fun> apply(@NonNull List<Fun> funs) throws Exception {
return Flowable.fromIterable(funs);
}
})
// 转换后为 Flowable<Fun>
.distinct()
// 再转换为 Single<List<Fun>>
.toList();

问题

事件分解和组合

RxJava 有没有提供将一个事件分解成多个事件?有将多个事件过滤,减少,或者将多个事件组合等
思路:通过 flatmap 分解成数据流,使用 filter 过滤该事件流,distinct 去重,toList 重新组合

参考文档

  1. https://github.com/ReactiveX/RxJava/wiki
  2. http://reactivex.io
  3. http://reactivex.io/documentation/observable.html
  4. https://yq.aliyun.com/articles/63660
  5. http://gank.io/post/560e15be2dca930e00da1083
  6. http://www.jianshu.com/p/464fa025229e
  7. https://github.com/lzyzsd/Awesome-RxJava
  8. http://blog.csdn.net/maplejaw_/article/details/52381395
  9. https://zhuanlan.zhihu.com/p/24482660?refer=dreawer
  10. RxJava合集