阅读 rxjava 源代码
介绍
事件驱动和异步调用是两种慢慢被大家接收的编程范式了。rxjava 库利用观察者模式,把事件驱动和异步调用程序组合在一起。
基于异步调用和事件驱动的程序,经常陷入回调陷阱,导致程序的可读性下降,写出来的程序像意大利面条(callback hell, callback spaghetti)。参考 http://callbackhell.com。
本文从源代码级别上,介绍了 rxjava 中最重要的几个接口和类。
Obserable
final Observable<String> observable = Observable.empty();
Observable 只有一个成员变量 onSubscribe


onSubscribe 只有一个成员方法

Observer
class SimpleObserver implements Observer<String> {
     @Override
     public void onCompleted() {}
     @Override
     public void onError(Throwable e) {}
     @Override
     public void onNext(String s) {}
}
final Observer<String> observer = new SimpleObserver();

observer 没有任何成员变量。
subscribe
observable.subscribe(observer);

这一堆 subcribe 函数都落在了 Subscriber 参数的那个版本上了,返回值都是 Subscription
image::images/observable-11bb0.png[]
subscriber 其实就是 observer ,也是 subscription。

subscription 用来停止订阅 unsubscribe。
subscribe 比较矫情的一段代码,简化如下。
class Observable<T> {
  public final Subscription subscribe(Subscriber<? super T> subscriber) {
      onSubscribe.call(subscriber);
      return subscriber;
  }
}
- subscriber就是传进去的第一个参数- observable.subscribe(observer)。- subscriber也是一个- Observer, 因为- Subcriber继承- Observer。
- 这个参数 subscriber传递给对象onSubscribe的call方法,onSubscribe.call(subscriber)。
- subscriber作为一个- Subscription返回。 因为- Subscriber继承- Subscription。
由此可见 Subscriber 这个类才是 rxjava 的核心。subscriber 对象不停的在各个类之间流转。 各种各样不同 OnSubscribe 接口的实现,可以去产生数据源,然后调用 subscriber.onNext(data) 。
小结
一个超级简化版的 rxjava 可以是下面这个样子。
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
}
public class Observable<T> {
    final OnSubscribe<T> onSubscribe;
    public Observable(OnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }
    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        onSubscribe.call(subscriber);
        return subscriber;
    }
}
public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}
public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}
public abstract class Subscriber<T> implements Observer<T>, Subscription {
}
尽管 Subscriber 类所占的篇幅很小,他确实核心的一个类。
测试一下这段代码
    @Test
    public void main() throws Exception {
        final Observable<String> observable = new Observable<>(subscriber -> {
            subscriber.onNext("hello world");
            subscriber.onCompleted();
        });
        final Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("byebye");
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String s) {
                System.out.println("> " + s);
            }
            @Override
            public void unsubscribe() {
            }
            @Override
            public boolean isUnsubscribed() {
                return false;
            }
        };
        observable.subscribe(subscriber);
    }
    ```