介绍
ReactiveX
主要是响应式编程(Reactive Program
),它集合了观察者模式,迭代器模式,函数式编程的优点,是一种面向集合数据流和变化传播的编程范式,是基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming
It is sometimes called “functional reactive programming” but this is a misnomer. ReactiveX may be functional, and it may be reactive, but “functional reactive programming” is a different animal. One main point of difference is that functional reactive programming operates on values that change continuously over time, while ReactiveX operates on discrete values that are emitted over time.
Java 8
新特性
Java 8
中引入了很多针对响应式的新特性,如 Lambda
表达式,函数式接口,MAP
,Filter
,Stream
流式操作等等。其中 JDK8
的 Stream
是一个受到函数式编程和多核时代影响而产生的东西。java.util.stream
包,实现了集合的流式操作,流式操作包括集合的过滤,排序,映射等功能。根据流的操作性,又可以分为串行流和并行流。根据操作返回的结果不同,流式操作又分为中间操作和最终操作。大大方便了我们对于集合的操作:
- 最终操作:返回一特定类型的结果
- 中间操作:返回流本身
流式操作最大的特点是以疏代堵,流处理方式可以处理无限的数据,只需要消耗时间。因此流式是对数据量开放的,适用于处理大数据
RxJava
RxJava
是一个在 Java VM
上使用可观测的序列来组成异步的、基于事件的程序的库,是 JVM
的响应式扩展。最大的特点是异步,异步是通过一种扩展的观察者模式来实现的。使用了 Java 8
的新特性来实现 Reactive Extendsions
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
RxJava 2.x will target Reactive Streams APIs directly for Java 8+
仓库地址: https://github.com/ReactiveX/RxJava
依赖
app/build.gradle
中增加依赖:
1 | compile 'io.reactivex.rxjava2:rxjava:2.*.*' |
*
换成具体的版本号Sync Project
同步整个工程,自动下载 RxJava
相关依赖包
基本概念
Observable
涉及到三个概念:Observable
(可观察者,即被观察者)、 Observer
(观察者)、 subscribe
(订阅)。Observable.subscribe(Observer)
实现订阅关系, Observable
提供数据流,Observer
响应事件并处理数据
- 数据流
Observable
表示一个数据流集合,数据是通过Push
的方式发出,示例图:
You can think of the Observable class as a “push” equivalent to Iterable, which is a “pull.”
- 数据发射
如果已经拿到Observable
数据流,RxJava
会自动发射。如果新建Observable
可以实现ObservableOnSubscribe
通过代码控制手动发射
Observer
Observer
提供接收数据流的处理机制,包含如下几个回调:
void onSubscribe(@NonNull Disposable d);
表示订阅成功后,能拿到Disposable
实例,通过它可以解除订阅关系-
void onNext(@NonNull T t);
表示接收数据,所有发射过来的数据都在这接收并处理 void onError(@NonNull Throwable e);
表示事件队列异常,在事件处理过程中出异常时,该方法会被触发,同时队列自动终止,不允许再有事件发出void onComplete();
表示事件队列完结,RxJava
规定:当不会再有新的事件发出时,需要触发该方法作为结束标志
注意:在一个正确运行的事件序列中,onCompleted
和 onError
有且只有一个,也就是说二者必须唯一并且互斥,在响应的队列中只能调用一个
subscribe
理清了上面的概念后,将 Observable
和 Observer
订阅在一起,就实现了数据流的连通
1 | // 第一步:新建 Observable |
Scheduler
线程调度器
在 RxJava
的默认规则中,事件的发出和消费都是在同一个线程的,也就是在哪个线程调用 subscribe
,就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava
是至关重要的,Scheduler
就是来完成异步的
subscribeOn
指定的就是发射事件的线程,即生产事件的线程
多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用subscribeOn()
只有第一次的有效,其余的会被忽略
RxJava
中的所有的数据准备,发射的回调都是在这个线程中执行的
observeOn
指定的就是订阅者接收事件的线程,即事件消费的线程
但多次指定订阅者接收线程是可以的,也就是说每调用一次observerOn()
,下游的线程就会切换一次
小结:如果不指定 subscribeOn
和 observeOn
,事件就是同步执行的,也就是发射一个数据后同步消费一个数据,这样并没有体现 RxJava
的特点,所以实际中需要调用这两个接口指定具体的事件产生和消费的线程。示例 Log
:
1 | // 数据发射和消费都在同一个线程 3633 |
系统自带的几个常用线程:
Schedulers.computation()
专为计算所使用的线程,计算指的是CPU
密集型计算,即不会被I/O
等操作限制性能的操作,用于计算任务,例如图形的计算,事件循环等。使用的是固定的线程池,大小为CPU
核数,即默认线程数等于处理器的数量。不要把I/O
操作放该线程中,否则I/O
操作的等待时间会浪费CPU
Schedulers.newThread()
总是启用新线程,并在新线程执行操作Schedulers.io()
I/O
操作:读写文件、读写数据库、网络信息交互等所使用的线程, 行为模式和newThread
差不多,区别在于io
的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下更有效率。不要把计算工作放在io
中,可以避免创建不必要的线程Schedulers.single()
先开线程,同时后台线程都在同一个线程中Schedulers.from(Executor)
使用指定的Executor
作为调度器Schedulers.trampoline()
在当前线程排队执行,如果队列中已经存在其他任务AndroidSchedulers.mainThread()
RxAndroid
提供的线程, 在Android
主线程中运行,用于事件消费时更新UI
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
小结:只有 trampoline()
是在当前线程中执行,其他都是新开线程,可以通过如下代码调试是在哪个线程中:Log.d(TAG, "thread.name = " + Thread.currentThread().getName())
如下示例适用于多数的 『后台线程取数据,主线程显示』的程序策略:
1 | Observable.just(1, 2, 3, 4) |
Backpressure
背压
背压是指在异步场景中,生产者发送事件速度远快于消费者的处理速度带来的问题。如果生产者和消费者在一个线程的情况下,无论生产者的生产速度有多快,每生产一个事件都会通知消费者,等待消费者消费完毕,再生产下一个事件,所以在这种情况下,根本不存在背压问题。即同步情况下,Backpressure
问题不存在。异步场景中解决背压问题就是流速控制的一种策略,要求生产者降低事件发射速度的策略
1 | public enum BackpressureStrategy { |
BUFFER
模式默认队列大小为 128
Flowable/Subscriber
RxJava 2.*
中,Observeable
用于订阅 Observer
,是不支持背压的;而新引入的 Flowable
用于订阅 Subscriber
,支持背压。差别图:
Flowable/Subscriber
订阅示例,**Flowable
需要指定背压策略**:
1 | Flowable.create(new FlowableOnSubscribe<Integer>() { |
在选择 Observable
和 Flowable
时,官方给出了一些建议,在数据量不是很大的时候(小于 1000
)选择 Observable
其他模式
除了上面的 Observable/Observer
和 Flowable/Subscriber
模式外, RxJava
还提供如下几种模式:
- Single/SingleObserver
数据流只包含一个数据,只发射一个数据 - Completable/CompletableObserver
不发射数据(也就没有数据消费),只会回调onComplet
- Maybe/MaybeObserver
上面两个的综合体,提供了以上两种功能供选择,但是只有一个功能能生效,也就是二选一。如果发射数据,发射一个数据;如果不发射数据,只能响应onComplete
常见操作符
map
变换操作符,它的作用将类型装换,将上游类型转换为下游需要的类型
.map()
是一对一的类型转换:输入一个事件变量A
,输出一个事件变量B
flatMap
是一对多的类型转换(也就是事件拆分):将一个发送事件的上游Observable
变换为多个发送事件的Observables
。比如输入是一个事件List<T>
,转换后输出是list.count
个T
的事件流。flatMap
并不保证事件转换后的顺序,如果需要保证顺序则需要使用concatMap
zip
组合操作符,将多个 Observable
发送的事件组合到一起,然后发送这些组合事件,它按照严格的顺序应用这个函数。但只发射与发射数据项最少的那个Observable
一样多的数据,也就是事件数量和最少的那个保持一致
filter
过滤操作符,把只有满足条件的事件才能发送给消费者
concat
连接操作符,将多个 Observable
发送的事件连接到一起,顺序发送所有事件。即将并行数据串行发出
zip
是取最小那一组组合发送, concat
是将所有数据顺序连接发送
示例及分析
总结
常见操作及转换
1 | // 定义一个 T 的集合 |
问题
事件分解和组合
RxJava
有没有提供将一个事件分解成多个事件?有将多个事件过滤,减少,或者将多个事件组合等
思路:通过 flatmap
分解成数据流,使用 filter
过滤该事件流,distinct
去重,toList
重新组合
参考文档
- https://github.com/ReactiveX/RxJava/wiki
- http://reactivex.io
- http://reactivex.io/documentation/observable.html
- https://yq.aliyun.com/articles/63660
- http://gank.io/post/560e15be2dca930e00da1083
- http://www.jianshu.com/p/464fa025229e
- https://github.com/lzyzsd/Awesome-RxJava
- http://blog.csdn.net/maplejaw_/article/details/52381395
- https://zhuanlan.zhihu.com/p/24482660?refer=dreawer
- RxJava合集