Cold流和Hot流

Cold流不论订阅者在何时订阅该数据流,总是能收到数据流中产生的全部消息,所以Cold流是肯定保存了数据流中所有数据的。

Hot流则是在持续不断地产生消息,订阅者只能获取到在其订阅之后产生的消息。

构造Hot流

两种方式:将已有 Cold流转变为Hot流和使用Processor动态产生数据。

将已有Cold流转变为Hot流

只需要调用publish方法即可,只是要注意,添加非第一个Subscriber前,需要调用一下connect方法。如下例子:

public class ConvertCold2Hot {
    public static void main(String[] args) throws InterruptedException {
        ConnectableFlux<Long> flux = Flux.interval(Duration.ofSeconds(1))
                .take(10)
                .publish();
        flux.subscribe(aLong -> {
            System.out.println("subscriber1 ,value is " + aLong);
        });

        Thread.sleep(5000);
        //加入第二个Subscriber之前,需要connect一下
        flux.connect();
        flux.subscribe(aLong -> {
            System.out.println("subscriber2 ,value is " + aLong);
        });
        flux.blockLast();
    }
}

执行结果如下:注意subscriber2获取的值从5开始了,因为此时Hot流中的数据从5开始的。

使用Processor构造Hot流

使用Processor的publish方法即可构造出一个Hot Stream,调用同一个Processor实例的onNext方法即可为之前构造的Hot Stream提供数据。如下例子:

public class HotStreamByProcessor {
    public static void main(String[] args) throws InterruptedException {
        //使用Reactor提供的Processor工具类
        UnicastProcessor<String> hotSource = UnicastProcessor.create();
        //构造Hot Stream,同时配置为autoConnect,避免每加入一个Subscriber都需要调用一次connect方法
        Flux<String> hotFlux = hotSource
                .publish()
                .autoConnect();

        //异步为Hot Stream提供数据
        CompletableFuture future = CompletableFuture.runAsync(() -> {
            IntStream.range(0, 10).forEach(
                    value -> {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        //调用Processor的onNext即可以为Processor关联的Hot Stream提供数据
                        hotSource.onNext("value is " + value);
                    }
            );
        });

        hotFlux.subscribe(s -> System.out.println("subsciber1: " + s));
        Thread.sleep(500);
        hotFlux.subscribe(s -> System.out.println("subsciber2: " + s));
        //提供完数据之后,调用Processor的onComplete关闭Hot Stream
        future.thenRun(() -> hotSource.onComplete());
        future.join();
    }
}

执行结果如下:可以看到subscribe2的值从4开始。

results matching ""

    No results matching ""