Project Reactor学习--背压

Reactive Stream和Java 8 中Stream流的重要一个区别就是Reactive Stream支持背压(Back Pressure),

这也是Reactive Stream的主要卖点之一。

背压指的是当Subscriber请求的数据的访问超出它的处理能力时,Publisher限制数据发送速度的能力。

默认情况下,Subscriber会要求Publisher有多少数据推多少数据,能推多快就推多块。

本质上背压和TCP中的窗口限流机制比较类似,都是让消费者反馈请求数据的范围,生产者根据消费者的反馈提供一定量的数据来进行流控。

反馈请求数据范围的操作,可以在Subscriber每次完成数据的处理之后,让Subscriber自行反馈;也可以在Subscriber外部对Subscriber的消费情况进行监视,根据监视情况进行反馈。

如下例子:一个带线程池的Subscriber,当线程池的workQueue未满时,向Publisher请求数据,反之则等待一会再请求。

public class BackpressureDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        //可以观察到明显的限流
        Flux<Long> flux = Flux.interval(Duration.ofMillis(50))
                .take(50)
                .doOnComplete(() -> countDownLatch.countDown());
        flux.subscribe(new MyLimitedSubscriber(5));
        countDownLatch.await();

        //使用比count还大的limiter,相当于不限流
        System.out.println("use big limiter");
        Flux.interval(Duration.ofMillis(50))
                .take(50)
                .subscribe(new MyLimitedSubscriber(100));
    }
}
public class MyLimitedSubscriber<T> extends BaseSubscriber<T> {
    private long mills;
    private ThreadPoolExecutor threadPool;
    private int maxWaiting;
    private final Random random = new Random();

    public MyLimitedSubscriber(int maxWaiting) {
        this.maxWaiting = maxWaiting;
        this.threadPool = new ThreadPoolExecutor(
                1, 1, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(maxWaiting));
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        this.mills = System.currentTimeMillis();
        requestNextDatas();
    }

    @Override
    protected void hookOnComplete() {
        long now = System.currentTimeMillis();
        long time = now - this.mills;
        System.out.println("cost time:" + time / 1000 + " seconds");
        this.threadPool.shutdown();
    }

    @Override
    protected void hookOnNext(T value) {
        //提交任务
        this.threadPool.execute(new MyTask(value));
        //请求数据
        requestNextDatas();
    }


    private void requestNextDatas() {
        //计算请求的数据的范围
        int requestSize = this.maxWaiting - this.threadPool.getQueue().size();
        if (requestSize > 0) {
            System.out.println("Thread Pool can handle,request " + requestSize);
            request(requestSize);
            return;
        } else {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            requestNextDatas();
        }

    }

    class MyTask<T> implements Runnable {
        private T data;

        public MyTask(T data) {
            this.data = data;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(random.ints(100, 500).findFirst().getAsInt());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("data is " + data);
            //可以在处理完成数据之后,立刻进行请求,此时Subscriber肯定是能够可以可靠处理数据的
            //requestNextDatas()或者调用BaseSubscriber#request(1)
        }

    }
}

上面例子中Publisher在收到request之后,实际上是采用了默认的OverflowStrategy,即将数据缓存起来,当Subscriber有能力处理时,再推送过去。

背压策略

背压策略指的是当Subscriber无法及时request更多数据时,Publisher采取的措施。

可选的策略有buffer、error 、drop和latest,默认策略为buffer。

背压策略方法

可以通过onBackPressureBuffer、onBackPressureError、onBackPressureDrop、onBackPressureLatest选择不同策略。

onBackPressureBuffer

例子如下:

public class BackpressureOnBackpressureError {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(4);
        UnicastProcessor<String> hotSource = UnicastProcessor.create();
        Flux<String> hotFlux = hotSource
                .publish()
                .autoConnect()
                .onBackpressureError();

        CompletableFuture future = CompletableFuture.runAsync(() -> {
            IntStream.range(0, 50).parallel().forEach(
                    value -> {
                        threadPool.submit(() -> hotSource.onNext("value is " + value));
                    }
            );
        });
        System.out.println("future run");

        hotFlux.subscribe(new BaseSubscriber<String>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1);
            }

            @Override
            protected void hookOnNext(String value) {
                System.out.println("get value " + value);
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                throwable.printStackTrace();
            }
        });
        Thread.sleep(500);
        System.out.println("shutdown");
        threadPool.shutdownNow();
    }
}

执行结果如下:

onBackPressureBuffer方法还可以指定缓存溢出策略,默认策略为BufferOverflowStrategy.ERROR效果即上面的例子。可选的策略还有DROP_OLDEST丢弃最旧元素,DROP_LATEST丢弃最新的元素。如下:

public class BackpressureOnBackpressureBuffer2 {

    public static void main(String[] args) {
        System.out.println("Drop Oldest");
        drop(BufferOverflowStrategy.DROP_OLDEST);
        System.out.println("Drop LASTED");
        drop(BufferOverflowStrategy.DROP_LATEST);
    }

    private static void drop(BufferOverflowStrategy bufferOverflowStrategy) {
        UnicastProcessor<String> hotSource = UnicastProcessor.create();
        //构建Flux,buffer大小为5,BufferOverflowStrategy策略为丢弃最久的元素
        Flux<String> hotFlux = getHotFlux(hotSource, 5, bufferOverflowStrategy);
        CompletableFuture future = produceData(hotSource);
        //构建Subscriber,初次请求20个元素
        BaseSubscriber<String> subscriber = createSubscriber(20);
        hotFlux.subscribe(subscriber);

        future.join();
        System.out.println("get rest elements from buffer");
        //再次获取10个元素,根据策略应返还最后的10个元素
        subscriber.request(10);
    }

    private static BaseSubscriber<String> createSubscriber(int initRequests) {
        return new BaseSubscriber<String>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(initRequests);
            }

            @Override
            protected void hookOnNext(String value) {
                System.out.println("get value " + value);
            }
        };
    }

    private static CompletableFuture produceData(UnicastProcessor<String> hotSource) {
        return CompletableFuture.runAsync(() -> {
            IntStream.range(0, 50).forEach(
                    value -> {
                        hotSource.onNext("value is " + value);
                    }
            );
        });
    }

    private static Flux<String> getHotFlux(UnicastProcessor hotSource, 
    int maxBufferSize, BufferOverflowStrategy strategy) {

        return hotSource
                .publish()
                .autoConnect()
                .onBackpressureBuffer(maxBufferSize, strategy);
    }
}

执行结果:注意get rest elements之后的值,Drop Oldest会保存最新的值,反正则是最久的值。

onBackPressureError

onBackPressureError直接抛出异常。

onBackPressureLatest

onBackPressureLates相当于onBackpressureBuffer(1, DROP_OLDEST) ,如下:

public class BackPressureOnBackpressureLatest {
    public static void main(String[] args) {
        onBackpressureLatest();
    }

    private static void onBackpressureLatest() {
        UnicastProcessor<String> hotSource = UnicastProcessor.create();
        Flux<String> hotFlux = getHotFlux(hotSource);
        CompletableFuture future = produceData(hotSource);
        //构建Subscriber,初次请求20个元素
        BaseSubscriber<String> subscriber = createSubscriber(20);
        hotFlux.subscribe(subscriber);

        future.join();
        System.out.println("get rest elements");
        //再次获取10个元素,根据策略应返还最后的10个元素
        subscriber.request(10);
    }

    private static BaseSubscriber<String> createSubscriber(int initRequests) {
        return new BaseSubscriber<String>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(initRequests);
            }

            @Override
            protected void hookOnNext(String value) {
                System.out.println("get value " + value);
            }
        };
    }

    private static CompletableFuture produceData(UnicastProcessor<String> hotSource) {
        return CompletableFuture.runAsync(() -> {
            IntStream.range(0, 50).forEach(
                    value -> {
                        hotSource.onNext("value is " + value);
                    }
            );
        });
    }

    private static Flux<String> getHotFlux(UnicastProcessor hotSource) {

        return hotSource
                .publish()
                .autoConnect()
                .onBackpressureLatest();
    }
}

运行结果:注意rest element为49,即buffer为1,并且只保存了最新的一个元素。

onBackpressureDrop

onBackpressureDrop会丢弃溢出的所有元素。

完整代码:https://github.com/pkpk1234/learn-reactor

results matching ""

    No results matching ""