在许多软件编程任务中,你或多或少期待你的指令将会按照你已经写好的顺序,依次增量执行和完成。但在ReactiveX,很多指令可以通过“观察者”并行执行,其结果将以任意顺序被捕获。你定义了一种“可观察的形式“的检索和转换数据机制而不是调用方法,然后订阅观察者给它,每当之前定义好的机制已经准备好了,这些机制就会触发常设的哨兵去捕获并反馈结果。
这种方法的优点是,当你有一大堆的任务是不相互依赖,你就可以同时执行他们,而不是等待每一个来启动下一个前完成,这样你的整个任务包只需要花最长的任务时间。
有很多属于来描述异步编程和设计模型。本文将使用下列术语:一个观察者(observer)订阅可观察到的(Observable)。可观察到的(Observable)通过调用观察者的方法来发射项目或通知给它的所有观察者(observer)。
观察者有些时候也被称作是订阅者,观看者,响应者。因此这样的模式通常就叫做响应模式。
在很多存在UI操作的地方,UI上的操作不应该等待耗时执行程序的完成而阻塞。在一般编程模式下,都会采用异步线程+回调的方式完成这样的交互操作。不过当回调层次越来越多的时候,那代码可维护性将变得很麻烦。因此ReactiveX最出色的地方就是将多个操作过程按照自定义顺序组合完成最终结果,在每次一的操作中只需要关心业务逻辑本身的执行即可。
这些话描述起来比较生硬,一些简单的使用介绍可以见如下站点:
本文主要针对实现过程做梳理和剖析,因此基础部分不在做过多阐述。
OK,接下来讲讲ReactiveX中几个比较重要的概念
在ReactiveX中,一个观察者observer订阅到一个可观察的对象Observable。无论是某一个还是多个Observable执行,这些观察者都会做出响应。这样的模式有利于并发操作,因为这样不需要去等待Observable去广播,但是它创建了一个观察者形式的哨兵,此哨兵在今后的任何时间里随时准备做适当的响应,Observable也会做出这样的响应。
下面这这张图很好的说明了什么是Observables和observers,以及他们之间的转换关系
关于Observers的创建
下面是采用了伪代码来展示Observers的实现过程:
- 同步方式:
- 调用一个方法
- 用一个变量存储方法返回值
- 使用这个变量作为一个新的值做其他事情
例如:1
2
3// 写一个回调方法,并且指定到 `returnVal`
returnVal = someMethod(itsParameters);
// 使用returnVal做新的事情
- 异步方式:
- 定义一个方法,此方法是做一些事情并带有来之于异步调用的返回值;这个方法也是observer的一部分
- 定义异步调用自身作为一个Observable
- 通过订阅的方式连接observer到Observable(这个过程也是初始化Observable的actions)
- 执行你的业务;每当调用返回,observer的方法将会操作它自身返回值,这里的返回值是通过Observable广播
例如:1
2
3
4
5
6
7
8// 定义但是不执行, 订阅者的onNext方法
// (这个例子中observer只有一个onNext方法)
def myOnNext = { it -> do something useful with it };
// 定义但是不执行的Observable
def myObservable = someObservable(itsParameters);
// 发起订阅并执行
myObservable.subscribe(myOnNext);
// 继续执行相应的业务逻辑
onNext, onCompleted, and onError
订阅方法就是展示了observer如何连接到Observable。oberver实现了下列方法的一些子集:
- onNext
每当Observable广播数据时将会调用该方法。这个方法将会被作为Observable的一个广播项目参数被发送 - onError
Observable调用此方法表示它内部已经发生异常数据或者发生一些其他错误。这样停止观察,并且也不会做将来的调用onNext或者onCompleted。该onError方法作为它的参数来指示了错误的原因。 - onCompleted
Observable在已经调用了onNext方法作为最后的时间,如果没有遇到任何错误,那么该方法将会被调用
通过Observable的定义,它可能调用onNext零次或者很多次,并且接下来的调用可能是onCompleted或者onError方法,但是不是同时调用,这都是最终才会被调用。在调用过程中,onNext通常称作任务的执行,而onCompleted或者onError被称作任务的结果通知
下面是一个subscribe调用例子:1
2
3
4
5
6def myOnNext = { item -> /* 任务执行 */ };
def myError = { throwable -> /* 失败时的响应 */ };
def myComplete = { /* 成功后的响应 */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// 继续执行相应的业务逻辑
更多相关信息也可以参考
Unsubscribing取消订阅
在ReactiveX实现中,有一个特殊的observer接口是Subscriber,这个接口中有一个unsubscribe方法。当你调用此方法,表示订阅者不在对当前任何被订阅的Observables。如果没有其他observer,那么当前的Observables就会选择停止对新数据的广播。
退订结果将会通过应用于哪些之前观察者订阅了的Observable的操作连来联级返回。这个操作将会导致整个连接链上的每一个环节都停止发送动作。这个过程虽然不能保证立即发生,但是,在没有观察者仍然观察这些回调数据的时候,Observable是有可能试图去发送或者广播数据的。
Observables的“冷”与“热”
Observable具体在什么时候发送他的数据队列?这依赖于Observable。一个“热”的Observable可能随着它的创建就会立即发送回调数据,哪些之后订阅到Observable的任何observer也可以立即发起对观察队列的监听。另一方面,一个“冷”Observable就会等待,直到一个观察者observer订阅它之前开始发送动作,所以这样就能保证观察者从一开始就能看到整个序列。
Composition
Observables和observers只是ReactiveX的一个开始。通过对标准的观察者模式的稍微扩展,更好的去处理了事件序列,而不是单个回调。
真正的核心就是“无扩展”,操作符允许你去转换,合并,操作以及同发送序列被Observables一起发送。也就是说操作符和操作结果都是可发送,可传递的。
ReactiveX的操作符允许你以声明的方式一起构成异步序列,同时还保持着回调函数的高效率, 但没有嵌套的回调处理程序通常与异步系统相关的问题。
这里罗列一下Observable中定义的一些主要功能点:
- Observable的创建
Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, and Timer - Observable发送项目的转换
Buffer, FlatMap, GroupBy, Map, Scan, and Window - Observable过滤
Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, and TakeLast - Observable合并
And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, and Zip - 错误处理操作符
Catch and Retry - 实用工具操作符
Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, and Using - 条件和布尔运算符
All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, and TakeWhile - 数学和聚集操作符
Average, Concat, Count, Max, Min, Reduce, and Sum - 转换操作符
To - 可连接到Observable的操作符
Connect, Publish, RefCount, and Replay
下面来详细说一下ReactiveX的操作符知识
ReactiveX有一些列的操作集合,但是在不同的语言上表现都是大相径庭的。在一些特殊的语言可能还会有特定的定义操作符。
链式操作符
大多数操作符操作一个Observable并且返回一个Observable。这样允许开发人员以一个链式的方式一个接一个的执行操作符。在可修改的链式中每一个操作结果Observable都是来之于上一个操作,这里的操作也就是定义的operator。
这里有一些类似于构造器Builder模式,该模式描述了一个含有一系方法的特定类通过操作方法来操作具有相同功能的类的每一项。这个模式也允许你以类似的方式去链式操作方法。在Builder模式中,操作方法出现的顺序在链式中可能不是那么重要,但是在Observable中的操作符顺序就很重要。
Observable操作符链不会依赖于原来的Observable去操作原始的链,但他们会反过来操作,每一个在Observable上的正在操作的operator都是上一个操作立即产生的。
我们也可以自己选择自定义操作符,具体如何实现可以参考Implementing Your Own Operators。
上面在Observable中罗列过简单的功能点,下面罗列一下按照类别划分的操作符以及各自的功能:
创建Observables
创建新的Observables的操作符
- Create——通过调用observer方法编程从头创建一个Observable
- Defer——不立即创建Observable,直到observer触发订阅动作。此方法为每一个observer创建一个新的Observable
- Empty/Never/Throw——为非常精确和有限的行为创建Observables
- From——将其他对象或数据结构转换成一个Observable
- Interval——创建一个具有发出一个整数序列间隔为一个特定的时间间隔的Observable
- Just——把一个对象或一组对象转换成一个Observable,同时该Observable发送这样的对象
- Range——创建一个Observable,发送一系列连续的整数
- Repeat——创建一个Observable,发送一个特定的项目或项目重复序列
- Start——创建一个Observable,发送一个函数的返回值
- Timer——创建一个Observable,在一个给定的一段时间延迟后发送一个对象或者项目
转换Observables
转换被一个Observable发送的项目的操作符
- Buffer——定期收集从Observable中发出的数据到集合中,并且发送这些集合而不是发送一次
- FlatMap——将一个Observable发送的数据或者项目转换到Observables中,然后把这些数据压缩成一个单个的Observable
- GroupBy——拆分一个Observable成多个Observable组,并且每个组发送的数据会租床成一个不同的发送数据组,当然这些发送数据时来至于原始的Observable。这些分组都是通过划分key来实现
- Map——转换一个Observable发送的每个数据或者项目映射到一个函数上
- Scan——应用一个函数给一个Observable发送出来的每一想数据,并且是按照顺序发送每个连续值
- Window——定期细分条目从一个Observable到Observable的windows,并且发送结果是这些windows而不是一次发送原始的数据或者项目
过滤Observables
过滤被Observable发送的项目的操作符
- Debounce——如果Observable在一个特定时间间隔过去后没有发送其他数据或者项目,那么它只发送一个数据或者项目
- Distinct——该Observable不可以发送重复的数据
- ElementAt——只发送被Observable发送的某一个元素
- Filter——一个Observable只发送通过来特定测试描述语的匹配项
- First——只发出第一项,或第一项符合条件的项
- IgnoreElements——不发送任何数据,但是必须反馈它的中断通知
- Last——只发送最后一项
- Sample——发出Observables周期时间间隔内最新的项
- Skip——跳过发送前几项
- SkipLast——跳过发送后几项
- Take——仅仅发送前几项
- TakeLast——仅仅发送后几项
合并Observables
将多个Observables合并成单个的Observable的操作符
- And/Then/When——通过Pattern和Plan媒介将两个或者多个Observables发送的数据或项目合并成集合
- CombineLatest——当某一项数据由两个Observables发送时,通过一个特殊的函数来合并每一个Observable发送的项,并且最终发送数据是该函数的结果
- Join——合并两个Observables发送的结果数据。其中两个Observable的结果遵循如下规则:每当一个Observable在定义的数据窗口中发送一个数据都是依据另外一个Observable发送的数据。
- Merge——通过合并多个Observables发送的结果数据将多个Observables合并成一个
- StartWith——在Observable源开始发送数据项目之前发送一个指定的项目序列
- Switch——转换一个Observable,并且发送Observables到一个单个Observable,这个单个的Observable发送的项目就是转换之前的Observables最近发送的项目
- Zip——通过特定的函数合并多个Observable的结果,并且对于每个组合都发出单独的项目数据,这些数据就是之前定义的合并函数
错误处理操作符
错误处理操作符主要用于帮助来之于一个Observable里的错误通知的恢复功能
- Catch——从OnError方法通知中恢复持续的没有错误的序列
- Retry——如果一个源Observable发送一个onError通知,重新订阅给它,希望它将没有错误的执行完成
实用工具操作符
一个实用的操作符工具箱
- Delay——按照一个特定量及时的将Observable发送的结果数据向前推移
- Do——注册一个事件去监听Observable生命周期
- Materialize/Dematerialize——代表发送出来的项目数据或者通知,或相反过程
- ObserveOn——指定一个observer将会观察这个Observable的调度
- Serialize——强制一个Observable去做序列化调用
- Subscribe——操作可观测的排放和通知
- SubscribeOn——指定一个Observable在被订阅的时候应该使用的调度
- TimeInterval——转换一个Observable的发送项目到另一个项目,在这些发送项之间,此项目具有指示这些发送的时间开销功能
- Timeout——镜像源Observable,但如果某段时间过后没有任何通知发出将会发出一个错误通知
- Timestamp——给一个Observable发送的每一个项目附加一个时间戳
- Using——创建一个一次性的资源,这个资源就像Observable一样有相同的寿命
条件和布尔运算操作符
评估一个或者多个Observables或者被Observables发送的项目的操作符
- All——确定发出的所有项目满足某些标准
- Amb——给定两个或两个以上的Observable来源,从只有第一个可见发出一个项目发送所有的项目数据
- Contains——决定是否Observable发出一个特定的项
- DefaultIfEmpty——发送项从Observable源,或者如果Observable源没有任何发送内容,那么将会发送一个默认的项
- SequenceEqual——确定两个Observables发出相同的序列条目
- SkipUntil——丢弃Observable发出的项,直到第二个Observable发出一项
- SkipWhile——丢弃Observable发出的项,直到指定的条件变成了false
- TakeUntil——在第二个Observable发送一项或者终止之后,丢弃Observable发出的项
- TakeWhile——在指定的条件变成了false之后,丢弃Observable发出的项
数学和聚集操作符
- 操作一个被Observable发送出来的一整个项目序列操作符
- Average——计算一个Observable发送所有结果的平均值,并且发送这个值
- Concat——发送两个或两个以上Observables没有交叉的值
- Count——计算Observable源发出的项目数据数量,只发出这个值
- Max——确定,发送最大值项
- Min——确定,发送最小值项
- Reduce——应用一个函数给一个Observable发送的项,并且发送该函数的结果
- Sum——计算Observable发送的所有数据的求和,并且发送这个求和结果
转换操作符
- To——将一个Observable转换到另一个对象或数据结构
可连接到Observable的操作符
指定Observables有更多精确控制订阅动态的操作符
- Connect——定义一个可连接的Observable发送项目数据给它的订阅者
- Publish——把一个普通的Observable转化为一个可连接的Observable(向下转换)
- RefCount——把一个可连接的Observable转化成一个看起来就行一个普通的Observable(向上转换)
- Replay——确保所有的Observables能看到所有发送的相同的项目数据序列,及时是在Observable已经开始发送后才订阅的
由于Single是Observable的一个衍生变体,因此这里就不再做介绍。有兴趣的同学可以查看ReactiveX–Single文档
一个Subject是一种桥梁或者也可以叫做代理,一个Subject在ReactiveX的实现中既是一个observer也是一个Observable。因为它本身是一个observer,它能订阅到一个或者多个Observables中,同时它也是一个Observable,他通过重新发送项目数据,能遍历它所有的observers,同时,它也能发送新的项目数据。
因为一个Subject订阅到一个Observable时,这将会触发Observable开始发送他的项目数据(当然这里的操作必须是定义Observable为“冷的”)。
这里还有一些其他介绍可以参考:
- To Use or Not to Use Subject from Dave Sexton’s blog
- Introduction to Rx: Subject
- 101 Rx Samples: ISubject
and ISubject - Advanced RxJava: Subject by Dávid Karnok
- Using Subjects by Dennis Stoyanov
各种不同的Subject类型
这里有四种不同类型的Subject来满足于特定的使用场景。 注意:下面的示例图中每一条带有向右箭头横线都是一个单向过程,蓝色的subscribe()方法表示每一次订阅触发执行函数。每一次的订阅触发即图中蓝色箭头。
AsyncSubject
只有在源Observable完成之后,一个AsyncSubject将会发送由源Observable发送的最后一个值。(如果源Observable并没有发送任何值,那么AsyncSubject在完成的时候也不会发送任何值)
AsyncSubject将会发送相同的最终值给接下来的observers。但是,如果源Observable因为错误而中断,AsyncSubject并不会发送任何值,但是会传递来之于源Observable的错误通知。
通俗的来讲,异步的Subject在每次触发subscribe()方法发送项目的时候,只有在源Observable结束后才会发送源发送的结果。
更详细介绍参考
BehaviorSubject
当一个observer订阅到一个BehaviorSubject上时,通过发送当源Observable发送的最近项目数据,这个observer将会被触发执行。并且它会继续发送源Observable后续发送的项。
但是,如果源Observable发生错误而中断,BehaviorSubject将不会发送任何数据给随后的observers。不过,来之于源Observable的错误通知任然会传递。
更详细介绍参考
PublishSubject
PublishSubject的主要职责就是将源Observable发送的所有数据发送给随后一个已经订阅了的observer。
值得注意的就是,一个PublishSubject可能在创建的时候立即发送项目数据,不过在Subject的创建和observer订阅到这个Subject的这段时间中,一个或者多个发送项目数据可能存在丢失的风险。如果你要确保传送所有的源Observable发送项,你可以使用Observable的Create方式来构建,以便你能手动重新构建“冷的”Observable行为。或者你也可以使用ReplaySubject。
如果源Observable发生错误而中断,PublishSubject将不会发送任何项给接下来的observer。不过,来之于源Observable的错误通知任然会传递。
ReplaySubject
ReplaySubject发送过去源Observable发送的所有项目数据给任意的observer,不管observer在什么时候订阅。
一旦replay缓冲项逐渐增长超过了一个固定值后,ReplaySubject将会丢弃旧的项。或者给已经发送的数据指定一个有效时间,在失效过后就会扔掉。
如果你使用ReplaySubject作为一个observer,必须确保不会再多线程中调用onNext方法,因为这可能导致乱序调用,这是违反了Observable定义规则的。并且会创建一个有歧义的Subject去replay。
更详细介绍参考
如果你想在多线程中使用Observable的联级操作链,你可以在特殊的Schedulers上去制定这些操作链去操作。
在ReactiveX中Observable操作符将Scheduler作为一个变量,这些操作在一个特定的Scheduler上做一些操作或者所有的工作。
默认情况下,你应用操作链到Observable上做一些事情的时候,这将会通知它的observers,在同一个线程中它的Subscribe方法将会被调起。根据Observable应有的操作,定义一个不同的Scheduler,SubscribeOn操作就会改变这些行为。ObserveOn操作指定一个不同的Scheduler,这个Scheduler主要用于Observable去发送通知给它自身的observers上。
如下图所示,SubscribeOn操作指派哪一个Observable线程将会开始操作,操作链中的所有操作都可以被调起。另一方面,在操作出现的地方,ObserveOn会影响下面的Observable将使用的线程。对于这样的原因,在Observable操作链中,你可以在多个点多次调用ObserveOn方法,这样来确保多线程上的这些操作的执行。
结束语
由于这篇文章大部分来至ReactiveX原始文档,加上一些的个人理解形成。有一些翻译或者个人理解会有一定的偏差,再后续会继续修正。如果阅读到这篇文章的同学发现有不妥当的地方,还请回复指出,谢谢。
另外,后面有时间打算针对Rxjava做一下源码上的分析。