Project Reactor学习--Publisher线程相关方法

Publisher号称是异步的事件流,Java里面单线程是没法异步的,同时号称底层是否使用并发对上层是透明的(即Publisher本身不会显式创建、操作线程的),所以Publisher具有若干切换代码执行上下文(其实就是线程)的方法。

Reactor使用reactor.core.scheduler.Scheduler对执行一个异步执行的操作进行抽象,底层使用ExecutorService或者ScheduledExecutorService执行这个异步操作。

Reactor提供了多种Scheduler实现,并提供了工厂类reactor.core.scheduler.Schedulers方便开发者使用。

Publisher则提供了publishOn和subscribeOn两个方法设置要使用的Scheduler。

上代码比较直观:

public class FluxPublishOn {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Flux.range(1, 20)
                //使用Schedulers.parallel()线程池执行之后的操作
                .publishOn(Schedulers.parallel())
                .doOnComplete(() -> countDownLatch.countDown())
                .subscribe(i -> {
                    System.out.println("Current Thread is "
                            + Thread.currentThread().getName() + ", value " + i);
                });
        //如果使用了Scheduler,则subscribe是异步的,主线程必须阻塞才行
        System.out.println(Thread.currentThread().getName() + "-Main thread blocking");
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "-Flow complete,Main thread run and finished");

    }
}

输出如下:可以看到subscribe方法执行的线程不在主线程中,所以主线程继续执行到System.out.println了。

publishOn方法和subscribeOn方法

这两个方法都可以将程序执行的线程切换到传入的Scheduler上。区别是publishOn会让之后的操作在Scheduler提供的线程中执行,subscribeOn会让之前的操作在Scheduler提供的线程中执行。

如下例子中,在publishOn之前执行了map操作,之后执行subscribe操作。

public class FluxPublishOnThreadSwitch {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Flux.range(1, 20)
                .map(i -> {
                    System.out.println("map in Thread " + Thread.currentThread().getName() +" value is " + i);
                    return ++i;
                })
                //使用Schedulers.parallel()线程池执行之后的操作
                .publishOn(Schedulers.parallel())
                .doOnComplete(() -> countDownLatch.countDown())
                .subscribe(i -> {
                    System.out.println("Current Thread is "
                            + Thread.currentThread().getName() + ", value " + i);
                });
        //如果使用了Scheduler,则subscribe是异步的,主线程必须阻塞才行
        System.out.println(Thread.currentThread().getName() + "-Main thread blocking");
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "-Flow complete,Main thread run and finished");
    }
}

输出如下:可以看map操作还是在主线程中执行的。而subscribe中的操作都是在parallel线程中执行的。

使用Scheduler将阻塞方法包装为异步

本质上就在另一个线程中执行阻塞方法,只不过Flux提供了Api,开发者不用关心线程的管理。

这样做的优点是可以让当前线程不用在阻塞方法上等待,而是继续去做其他事情,让线程充分工作。

如下例子对比了同步读取文件,和使用Scheduler包装同步方法为异步之后的性能。

public class PublisherWrapBlockingCall {
    public static void main(String[] args) throws InterruptedException {
        //提前构造出线程池
        Schedulers.elastic();

        String[] files = {
                "com/ljm/reactor/scheduler/PublisherWrapBlockingCall.class",
                "com/ljm/reactor/scheduler/FluxPublishOn.class",
                "com/ljm/reactor/scheduler/FluxPublishOnThreadSwitch.class"
        };
        //同步读取
        Instant start = Instant.now();
        for (String fileName : files) {
            blockingRead(fileName);
        }
        Instant end = Instant.now();
        System.out.println("\n\n>>>>>>>> blocking read cost mills:" + Duration.between(start, end).toMillis());

        //异步读取
        CountDownLatch latch = new CountDownLatch(3);
        start = Instant.now();

        for (String file : files) {
            Mono.fromRunnable(() -> blockingRead(file))
                    //让前面的操作运行在线程池中
                    .subscribeOn(Schedulers.elastic())
                    .doOnTerminate(() -> latch.countDown())
                    .subscribe();
        }
        latch.await();
        end = Instant.now();
        System.out.println("\n\n>>>>>>>>> async read cost mills:" + Duration.between(start, end).toMillis());
    }

    /**
     * 同步读取文件并打印
     * @param fileName
     */
    private static void blockingRead(String fileName) {

        InputStream in = PublisherWrapBlockingCall.class.getClassLoader().getResourceAsStream(fileName);
        try {
            int i = -1;
            while ((i = in.read()) != -1) {
                System.out.print(i);
            }
            Thread.sleep(1000);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
}

运行结果:可以看到包装为异步之后,性能提升不少。

results matching ""

    No results matching ""