RxJava 基础扫盲

前几天浏览了一下稀土 App,发现有个挺不错的新闻 App 实战实例。正好最近想学习一下完整项目的源码(特别是后台代码,就是各种框架等等)。然后想起以前很多大牛都谈起过 RxJava、Retrofit、Gson 等等框架,所以现在决定来学习学习这些常用的框架


RxJava 到底是什么

一个词概括:异步。说到底它就是一个实现异步操作的库。那什么是异步操作,我之前在Android 消息传递机制这篇博文中讲过 Android 是单线程模型。进程启动时,也就是 App 启动后默认就只有一个线程运行,而该线程就是我们说的主线程,也叫 UI 线程。顾名思义,UI 线程就是用来更新界面显示,而如果运行当中具有耗时操作如网络请求,数据库读写,文件下载等这些耗时操作都需要在其他线程当中去完成,完成之后再更新在 UI 界面中显示出来。这就是异步加载,而 Android 中我们有现成的方式去完成这一操作,如 AsyncTask 、Handler。那为什么还要用 RxJava?

RxJava 好在哪里

还是用一个词概括:简洁。异步操作很关键的一点是要注意程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的 AsyncTask 和 Handler ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。

假设有这样一个需求:界面上有一个自定义的视图 ImageCollectorView ,它的作用是显示多张图片,并能使用addImage(Bitmap)方法来任意增加显示的图片。现在需要程序将一个给出的目录数组folders中每个目录下的.png图片都加载出来并显示在 ImageCollectorView 中。需要注意的是,由于读取图片的这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。常用的实现方式有多种,我这里贴出其中一种:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
new Thread(){
@Override
public void run(){
super.run();
for(File folder:folders){
File[] files = folder.listFiles();
for(File file:files){
if(file.getName().endsWith(".png")){
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable(){
@Override
public void run(){
mImageCollcetorView.addImage(bitmap);
}
});
}
}
}
}
}.start();

而如果使用 RxJava,就可以写成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable.from(folders)
.flatMap(new Func1<File,Observable<File>>(){
@Override
public Observable<File> call(File file){
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File,Boolean>(){
@Override
public Boolean call(File file){
return file.getName().endsWith(".png");
}
})
.map(new Func1<File,Bitmap>(){
@Override
public Bitmap call(File file){
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>(){
@Override
public void call(Bitmap bitmap){
mImageCollectorView.addImage(bitmap);
}
});

那看到这里有人可能说了这代码敲得更多了,哪来的简洁。注意,我们这里说的简洁指的是逻辑上的简洁,不是单纯的代码量少。观察一下你会发现, RxJava 的这个实现,是一条从上到下的链式调用,没有任何嵌套,这在逻辑的简洁性上是具有优势的。当需求变得复杂时,这种优势将更加明显。

RxJava 原理与使用简析

原理

RxJava 的异步实现是通过一种通用概念下的观察者模式来实现的。那么我们了解一下设计模式当中的观察者模式。

观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。举个例子,新闻里喜闻乐见的警察抓小偷,警察需要在小偷伸手作案的时候实施抓捕。在这个例子里,警察是观察者,小偷是被观察者,警察需要时刻盯着小偷的一举一动,才能保证不会漏过任何瞬间。程序的观察者模式和这种真正的『观察』略有不同,观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。 Android 开发中一个比较典型的例子是点击监听器 OnClickListener 。对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过setOnClickListener()方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener 。采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。

RxJava 作为一个工具库,使用的就是通用形式的观察者模式。它有四个基本概念:Observable(被观察者)、Observer(观察者)、subscribe(订阅)、事件。Observable 与 Observer 通过 subscribe 订阅这一动作联系在一起,从而使 Observable 可以在需要的时候发出事件来通知Observer。RxJava 的事件回调方法除了普通事件 onNext() 之外,还定义了两个特殊的事件 onCompleted() 和 onError()。这些方法具体内容如下:

  • onCompleted()——事件队列完结。RxJava 不仅把每个事件单独处理,还会把他们看做一个队列。RxJava 规定如果队列处理完毕,需要回调此方法作为标志
  • onError()——事件队列异常。在事件处理过程中出现异常时,此方法会被回调,同时事件停止发出

在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是, onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

使用

1.创建 Observer

Observer 即观察者,它决定事件触发的时候将有怎样的行为。RxJava 中的 Observer 接口的实现方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observer<String> observer = new Observer<String>(){
@Override
public void onNext(String s){
Log.d(TAG,"Item:"+s);
}

@Override
public void onCompleted(){
Log.d(TAG,"Completed!");
}

@Override
public void onError(Throwable e){
Log.d(TAG,"Error!");
}
}

其实 RxJava 中还有一个实现了 Observer 接口的抽象类 Subscriber。这两个类基本使用方式一样,而且在subscribe()时 Observer 对象先被转换成 Subscriber 对象再使用。但他们的区别对于使用者来说主要有两点:

  • onStart()——这是 Subscriber 增加的方法。它会在subscribe()刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), 该方法就不适用了,因为它总是在subscribe()所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用doOnSubscribe()方法。
  • unsubscribe()——这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用isUnsubscribed()先判断一下状态。 unsubscribe()这个方法很重要,因为在subscribe()之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如onPause()、onStop()等方法中)调用unsubscribe()来解除引用关系,以避免内存泄露的发生。
2.创建 Observable

Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。它有三种方法来创建一个 Observable,并为它定义事件触发规则

  • create()
1
2
3
4
5
6
7
8
9
Observable observable = Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber){
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});

create()方法的参数 OnSubscribe 相当于一个计划表,当 Observable 被订阅时 OnSubscribe 的call()方法会被自动调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者 Subscriber 将会被调用三次onNext()和一次onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

  • just()
1
2
3
4
5
6
Observable observable = Observable.just("Hello","Hi","Aloha");
//将会依次调用
//onNext("Hello");
//onNext("Hi");
//onNext("Aloha");
//onCompleted();
  • from(T[] t)

    1
    2
    3
    4
    5
    6
    7
    String[] words = {"Hello","Hi","Aloha"};
    Observable observable = Observable.from(words);
    //将会依次调用
    //onNext("Hello");
    //onNext("Hi");
    //onNext("Aloha");
    //onCompleted();
3.subscribe()订阅

创建了 Observable 和 Observer 之后,再用subscribe()方法将它们联结起来,整条链子就可以工作了。代码形式很简单:

1
2
3
observable.subscribe(observer);
//或者
observable.subscribe(subscriber);

我们再来关注一下subscribe()方法的内部实现

1
2
3
4
5
6
public Subscription subscribe(Subscriber subscriber){
......
subscriber.onStart();
onSubscriber.call(subscriber); //事件发送逻辑开始运行
return subscriber;
}
4.线程控制

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

在 RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

  • Schedulers.immediate()——直接在当前线程运行,相当于不指定线程。这是默认的情况。
  • Schedulers.newThread()——总是启用新线程,并在新线程执行操作。
  • Schedulers.io()——I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和newThread()差不多,区别在于该方法的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下该方法比newThread()更有效率。不要把计算工作放在该方法中,可以避免创建不必要的线程。
  • Schdeulers.computation()——计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在computation()中,否则 I/O 操作的等待时间会浪费 CPU。
  • AndroidSchedulers.mainThread()——它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用subscribeOn()和observeOn()两个方法来对线程进行控制了。 subscribeOn()指定subscribe()所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 observeOn() 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。举个例子

1
2
3
4
5
6
7
8
9
Observable.just(1,2,3,4)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action<Integer>(){
@Override
public void call(Integer number){
Log.d(TAG,"number:"+number);
}
})

上面这段代码中,由于subscribeOn(Schedulers.io())的指定,被创建的事件的内容 1、2、3、4 将会在 IO 线程发出;而由于observeOn(AndroidScheculers.mainThread())的指定,因此 subscriber 数字的打印将发生在主线程 。事实上,这种使用方式非常常见,它适用于多数的『后台线程取数据,主线程显示』的程序策略。

总结

对于 RxJava,我们应该记住两个关键字:异步、简洁。而 RxJava 还有一些比较重要的关键点需要理解,但本篇仅仅用于向初学者普及 RxJava 的一些简单原理和使用方法,更多内容大家可以查看给 Android 开发者的 RxJava 详解

本文作者:刘志宇

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 许可协议。转载请注明出处!

Donate comment here