行则将至

人生在勤,不索何获

0%

RxJava 和 RxAndroid

原文地址 www.jianshu.com

RxJava 介绍

首先要说明的一点,RxAndroid 和 RxJava 是差不多的东西,只不过 RxAndroid 针对 Android 平台做了一点调整。

那么 RxJava 是什么?在其 github 上是这样讲的:一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。这么讲可能还有点绕口,简单的讲实际上最重要的就是异步两字,RxJava 可以简单的实现异步操作,并且不管逻辑多么复杂,它始终能够保持简洁性。

通常在 Android 中,非 UI 线程是不能更新 UI 界面的,而一些耗时的操作我们又不能放在 UI 线程,否则会导致界面卡顿。这种情况下,我们就需要切换线程来实现,即 Handler 和 AsyncTask 来实现,但是这两种都有个缺陷,代码非常多,非常杂,可读性非常差。所以,RxJava 出现了,它能够两行代码就实现线程切换,非常的简单,使用起来就会让人感觉很爽,再也不用为异步操作写如此繁重的代码了。

RxJava 基本用法

RxJava 最核心的两个东西是 - Observables被观察者 ,事件源) - Observer/Subscriber观察者 ) - 还有将他们联系在一起的操作 subscribe订阅 ) 当被观察者发生变化时观察者能即使做出相应,就好像我们的按钮事件一样:

1
2
3
4
5
6
button.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {

}
});

在这里 button 就是被观察者,OnClickListener 就是观察者,setOnClickListener 这个方法就相当于订阅操作 ,当 button 被按下时,OnClickListener 监听到变化,调用 OnClick 做出反应,RxJava 实现的就是类似这样的一个过程。

注意这里的观察者有两种 Observer,Subscriber,这两个其实是差不多的,Subscriber 是对 Observer 的一种扩展,内部==增加了 OnStart 方法==,在事件未发送之前订阅,用于做一些准备工作,并且还有 ==unsubscribe() 用于取消订阅==。
让我们来看一下 ObServer 的内部实现:

1
2
3
4
5
6
7
8
9
public interface Observer<T> {

void onCompleted();

void onError(Throwable e);

void onNext(T t);

}

可以看到 ObServer 本身是一个接口,内部有 onNext(T t) 方法:观测到所检测的被观察者有变化时做出相应反应。onCompleted() 方法:RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。onCompleted():事件队列发生异常,要调用的方法。我们在定义一个观察者的时候,需要实现这些方法,来完成事件队列。

观察者有了,那么被观察者 Observables 怎么创建呢,RxJava 提供了一系列操作符供我们调用,其中就有很多创建型操作符,举个例子, 创建一个 Observables,发出 hello world 字符串给观察者:

1
2
3
4
5
6
7
8
9
Observable<String> myObservable = Observable.create(  
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, world!");
sub.onCompleted();
}
}
);

既然有了 Observables,那我们就可以根据这个 Observables 创建一个观察者了,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observer<String> TestObserver=new Observer<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
Log.i(TAG,s);
}
}

这样我们就可以愉快的订阅了:

1
myObservable.subscribe(TestObserver);

这样,一个简单的 RxJava 订阅流程就完成了。这里可能很多人就有疑问了,关键的异步呢,体现在哪了?其实这个例子可能不是很明显,因为被观察者并不是一个耗时线程,不能很直观的体现异步。如果 myObservable 这是一个异步任务,比如网络请求,那么我们订阅之后,TestObserver 会一直监听 myObservable 是否有返回,如果有,那么就做出响应,本质是一样的。

RxJava 的操作符

RxJava 一个强大的地方在于它的异步,另外一个强大的地方就在于它提供了强大的操作符支持。这里说明一下几个常用的操作符:

ceate 操作符

1
2
3
4
5
6
7
8
9
Observable<String> myObservable = Observable.create(  
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, world!");
sub.onCompleted();
}
}
);

ceate 操作符创建一个被观察者,在 call 方法里持有一个观察者 Subscriber 参数,当这个 Observable 被订阅时,执行观察者相应的方法。

just 操作符

1
Observable<String> myObservable = Observable.just("Hello, world!");

ceate 操作符的代码可以用just操作符代替,just 操作符的功能就是将一个对象转化为 Observable。

from 操作符

1
Observable<String> myObservable = Observable.from("Hello"," world!");

既然有了将单一对象转化为 Observable 的操作符,那么必须要有将多个对象转化为 Observable 的操作符,那就是 from,from 接收一个对象数组,然后逐一发射给观察者。

flatMap 操作符

1
2
3
query("王").flatMap(list -> Observable.from(list)) 
.subscribe(student ->Log.i(TAG,student.getName());
);

上面的例子用 flatMap 操作符,就可以变得很简洁,flatMap 操作符的功能是接收一个接收一个 Observable 的输出作为输入,同时输出另外一个 Observable,通常是接收一个 list,然后逐一发送 list 的元素。比如这边的 Student 数组,变成了逐一发送 student 的 Observable。

Map 操作符

1
2
3
4
query("王").flatMap(list -> Observable.from(list)) 
.Map(student->return student.getGrade())
.subscribe(grade->Log.i(TAG,grade+"");
);

现在我们只想输出每个学生的成绩,我们就需要 Map 操作符,它的功能是接收一种类型的 Observable,转化为另外一种 Observable,比如这边的 Student 类型转化为了 Int 型的 Observable。

filter 操作符

1
2
3
4
5
query("王").flatMap(list -> Observable.from(list)) 
.Map(student->return student.getGrade())
.filter(grade->grade>80)
.subscribe(grade->Log.i(TAG,grade+"");
);

顾名思义 filter 操作符就是过滤用的,相当于加个判断条件,比如这边的就是加上分数大于 80 的条件.

take 操作符

1
2
3
4
5
6
query("王").flatMap(list -> Observable.from(list)) 
.Map(student->return student.getGrade())
.filter(grade->grade>80)
.take(5)
.subscribe(grade->Log.i(TAG,grade+"");
);

take 操作符的功能是限定个数,比如这边的功能就是限定我最多需要 5 个成绩。

doOnNext 操作符

1
2
3
4
5
6
7
query("王").flatMap(list -> Observable.from(list)) 
.Map(student->return student.getGrade())
.filter(grade->grade>80)
.take(5)
.doOnNext(grade->save(grade))
.subscribe(grade->Log.i(TAG,grade+"");
);

doOnNext() 允许我们在每次输出一个元素之前做一些额外的事情,比如这里的我们用来保存成绩。

subscribeOn/observeOn 操作符

1
2
3
4
5
6
7
8
9
query("王").flatMap(list -> Observable.from(list)) 
.Map(student->return student.getGrade())
.filter(grade->grade>80)
.take(5)
.doOnNext(grade->save(grade))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(grade->Log.i(TAG,grade+"");
);

这两个操作符一般都是成对出现的,他们的功能就是切换线程。subscribeOn 是指定被观察者的线程,observeOn 是指定观察者的线程。 比如这个例子中前面的订阅的工作在 IO 线程做,后面的打印功能在主线程做。

小结

怎么样,看起来我好像做了很多事情,又有判断数据,又有保存数据,又有选取数据,关键还有线程切换,然而,我实际上就写了那么一点代码,看起来是不是酷!这就是 RxJava 的魅力所在。

RxAndroid

一开始说了,RxAndroid 其实跟 RxJava 是差不多的,但是总归还是有一点变化的。比如 Android 上会有生命周期的问题,可能会导致内存泄漏:Observable 持有 Context 导致的内存泄露。在这个问题上,我们的解决方法是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
private Subscription mTestSubscription= Subscriptions.empty();

public void test(){
mTestSubscription=myObservable.subscribe(TestObserver);
}
@Override
public void onDestroy() {
super.onDestroy();
if (mTestSubscription != null && !mTestSubscription.isUnsubscribed()) {
mTestSubscription.unsubscribe();
}

}

就是在订阅的时候,用一个 Subscription 来保存它,然后在退出这个 Activity 的时候取消订阅
另外还有一些专门为 Android 设计的 RxView, 比如以下防抖动的 View:

1
2
3
RxView.clicks(btn_click)
.throttleFirst(3, TimeUnit.SECONDS)
.subscribe();