阅读 rxjava 源代码之 - map
上一篇文章 写了一个极其简化的 Rxjava Observable ,现在,我试图添加一个 map 操作符。
public <R> Observable<R> map(Func1<T, R> func1) {
  return new Observable<R>(subscriber -> this.subscribe(new Subscriber<T>() {
      @Override
      public void onCompleted() {
          subscriber.onCompleted();
      }
      @Override
      public void onError(Throwable e) {
          subscriber.onError(e);
      }
      @Override
       public void onNext(T t) {
           R r = null;
           try {
               r = func1.call(t);
           } catch (Throwable e) {
               unsubscribe();
               return;
           }
           subscriber.onNext(r);
       }
      @Override
      public void unsubscribe() {
          subscriber.unsubscribe();
      }
      @Override
      public boolean isUnsubscribed() {
          return subscriber.isUnsubscribed();
      }
  }));
}
Java 本身语言限制,导致代码臃肿。代码的核心部分就是
return new Observable<R>(subscriber -> this.subscribe(new Subscriber<T>() {
    @Override
     public void onNext(T t) {
           R r = null;
           try {
               r = func1.call(t);
           } catch (Throwable e) {
               unsubscribe();
               return;
           }
           subscriber.onNext(r);
       }
}
这里看到以下几点
- map接收两个参数 ,注,对于成员函数,第一个参数是- this。
- 第一个参数是 Observable<T> this,
- 第二个参数是 Func1<T,R> func1;- func1接收一个参数- T
- func1.call(t)返回一个- R
 
- map要返回一个- Observable<R>,那么就要在- OnSubscribe的时候,需要从- this里面得到一个个- T t,然后用- func1.call(t),然后转移给下一个- subscriber。
因为 Java8 lambda 关键字的引入,我们看到函数式编程中的 variable capture 的强大。
这是一个非常简化的 map 实现,还有很多问题。
- 还有非常多的操作符和 map很类似,这里有很多重复代码。
- backpressure 没有处理。
- unsubscribe还没有处理好。subscriber 链的关系没有处理。
- 异常也没有处理好。
- 没有保证 onComplete只被调用一次 。
这个简化的实现尽管有很多问题,但是可以帮助我们理解原有复杂完整的实现。Map 的核心结构是这样
- 本身含有一个 Subscriber对象,订阅上层的Observable
- 返回一个 Observable对象,提供给下层订阅。
- 这种方法组合了 Observable,构成了一个链条。
OnSubscribe<?> onSubscribe = subscriber /*传递进来的 subscriber 参数,给下层产生数据*/
     -> {
        /* this 是上层的 Observable,订阅上层 */
        this.subscribe(new Subscriber<T>() {
        @Override
        public void onNext(T t) {
           R r = null;
           try {
               r = func1.call(t);
           } catch (Throwable e) {
               unsubscribe();
               return;
           }
          /* 当上层产生数据的时候,经过转换,传递给下层*/
           subscriber.onNext(r);
       }
      };
Observable<R> ret = new Observable<R>(onSubscribe);
return ret;
}