阅读 rxjava 源代码
介绍
事件驱动和异步调用是两种慢慢被大家接收的编程范式了。rxjava 库利用观察者模式,把事件驱动和异步调用程序组合在一起。
基于异步调用和事件驱动的程序,经常陷入回调陷阱,导致程序的可读性下降,写出来的程序像意大利面条(callback hell, callback spaghetti)。参考 http://callbackhell.com。
本文从源代码级别上,介绍了 rxjava 中最重要的几个接口和类。
Obserable
final Observable<String> observable = Observable.empty();
Observable 只有一个成员变量 onSubscribe


onSubscribe 只有一个成员方法

Observer
class SimpleObserver implements Observer<String> {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String s) {}
}
final Observer<String> observer = new SimpleObserver();

observer 没有任何成员变量。
subscribe
observable.subscribe(observer);

这一堆 subcribe 函数都落在了 Subscriber 参数的那个版本上了,返回值都是 Subscription
image::images/observable-11bb0.png[]
subscriber 其实就是 observer ,也是 subscription。

subscription 用来停止订阅 unsubscribe。
subscribe 比较矫情的一段代码,简化如下。
class Observable<T> {
public final Subscription subscribe(Subscriber<? super T> subscriber) {
onSubscribe.call(subscriber);
return subscriber;
}
}
subscriber就是传进去的第一个参数observable.subscribe(observer)。subscriber也是一个Observer, 因为Subcriber继承Observer。- 这个参数
subscriber传递给对象onSubscribe的call方法,onSubscribe.call(subscriber)。 subscriber作为一个Subscription返回。 因为Subscriber继承Subscription。
由此可见 Subscriber 这个类才是 rxjava 的核心。subscriber 对象不停的在各个类之间流转。 各种各样不同 OnSubscribe 接口的实现,可以去产生数据源,然后调用 subscriber.onNext(data) 。
小结
一个超级简化版的 rxjava 可以是下面这个样子。
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
}
public class Observable<T> {
final OnSubscribe<T> onSubscribe;
public Observable(OnSubscribe<T> onSubscribe) {
this.onSubscribe = onSubscribe;
}
public final Subscription subscribe(Subscriber<? super T> subscriber) {
onSubscribe.call(subscriber);
return subscriber;
}
}
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}
public abstract class Subscriber<T> implements Observer<T>, Subscription {
}
尽管 Subscriber 类所占的篇幅很小,他确实核心的一个类。
测试一下这段代码
@Test
public void main() throws Exception {
final Observable<String> observable = new Observable<>(subscriber -> {
subscriber.onNext("hello world");
subscriber.onCompleted();
});
final Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("byebye");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("> " + s);
}
@Override
public void unsubscribe() {
}
@Override
public boolean isUnsubscribed() {
return false;
}
};
observable.subscribe(subscriber);
}
```