rxjava 如何和传统回调函数结合

今天看到一个 Observable.fromEmitter 的函数,这里是这个函数的 javadoc

Provides an API (via a cold Observable) that bridges the reactive world with the callback-style, generally non-backpressured world. Example: You should call the AsyncEmitter's onNext, onError and onCompleted methods in a serialized fashion. The rest of its methods are thread-safe.

Observable.<Event>fromEmitter(emitter -> {
Callback listener = new Callback() {
@Override
public void onEvent(Event e) {
emitter.onNext(e);
if (e.isLast()) {
emitter.onCompleted();
}
}

@Override
public void onFailure(Exception e) {
emitter.onError(e);
}
};

AutoCloseable c = api.someMethod(listener);

emitter.setCancellation(c::close);

}, BackpressureMode.BUFFER);

这是一个实验性的功能,用于和传统回调模式的程序对接。为了理解这个机制,我写了一个完整的例子。

package org.wcy123.rxjava1;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

import org.junit.Test;

import lombok.extern.slf4j.Slf4j;
import rx.AsyncEmitter;
import rx.Observable;

@Slf4j
public class FromEmitterTest {

    @Test
    public void main1() throws Exception {
        final ExecutorService service = Executors.newCachedThreadPool();
        final CountDownLatch latch = new CountDownLatch(3 * 4);
        Observable.fromEmitter(
                 emitter -> IntStream.range(0, 3).boxed().forEach(
                        threadIndex -> service.submit(
                                () -> {
                                    for (int i = 0; i < 4; ++i) {
                                        emitter.onNext("thread + " + threadIndex
                                                + " i = " + i);
                                        Utils.sleep(1000);
                                        latch.countDown();
                                    }
                                    if (threadIndex == 2) {
                                        emitter.onCompleted();
                                    }
                                })),
                AsyncEmitter.BackpressureMode.BUFFER)
                .subscribe(s -> log.info("item {}", s));
        log.info("提前打印这里, subscribe 没有阻塞住");
        log.info("开始等待解锁");
        latch.await();
        log.info("解锁完毕");
    }
}

这个例子的执行结果是

02:12:24.244 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 提前打印这里, subscribe 没有阻塞住
02:12:24.244 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 0
02:12:24.250 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 开始等待解锁
02:12:24.251 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 1 i = 0
02:12:24.251 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 0
02:12:25.245 [pool-1-thread-2] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 1
02:12:25.255 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 1
02:12:26.248 [pool-1-thread-3] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 2
02:12:26.249 [pool-1-thread-3] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 1 i = 2
02:12:26.257 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 2
02:12:27.252 [pool-1-thread-2] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 3
02:12:27.258 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 3
02:12:28.264 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 解锁完毕

注意到 log.info("item") 是运行在三个不同的线程中。fromEmitter 的第一个参数是一个函数,即 f, 该函数的第一个参数是 emitter ,类型是 AsyncEmitter。fromEmitter 返回一个 Observable , 这个Obverable 被订阅的时候,就会运行函数 f 。f 运行时,创建了 3 个线程,每个线程里面,都会调用 emitter 来发布数据,emitter.onNext(...) ,一旦调用这个函数,会触发后面所有的 Observable 定义的行为,触发 s->log.info("iterm{}", s) 的执行。

注意到 thread 0 并没有机会打印出来最后一个 i = 3, 因为 thread 2 提前调用了 emitter.onComplet()

latch.await() 等待所有线程结束。