Reactor学习--Flux高级静态工厂方法
这里所谓的高级指的是静态工厂方法的输入一般为其他Publisher实例,和之前简单的静态工厂方法的输入相比高级一些。
这些静态工厂方法又可以细分为如下几类:
- 构造一个周期性产生递增Long序列的Flux。
- 接收一个Publisher,将其包装为Flux流。
- 接收一个Supplier,延迟构造Publisher。
- 接收多个已有Flux,将其组合为一个Flux。
- create方法,将已有的异步事件流,包装为Flux流
interval方法
interval方法周期性生成从0开始的的Long。周期从delay之后启动,每隔period时间返回一个加1后的Long。
注意,interval方法返回的Flux运行在另外的线程中,main线程需要休眠或者阻塞之后才能看到周期性的输出。如下:
public class Interval {
public static void main(String[] args) throws InterruptedException {
Flux.interval(Duration.ofSeconds(1), Duration.ofSeconds(1)).subscribe(System.out::println);
Thread.sleep(5000);
}
}
运行之后输出如下:
from(Publisher<? extends T> source)方法
将已有的Publisher包装为一个Flux流。如下例子将已有的Flux和Mono包装为Flux。
public class FromPublisher {
public static void main(String[] args) {
Publisher<Integer> fluxPublisher = Flux.just(1, 2, 3);
Publisher<Integer> monoPublisher = Mono.just(0);
System.out.println("Flux from flux");
Flux.from(fluxPublisher).subscribe(System.out::println);
System.out.println("Flux from mono");
Flux.from(monoPublisher).subscribe(System.out::println);
}
}
defer方法
defer构造出的Flux流,每次调用subscribe方法时,都会调用Supplier获取Publisher实例作为输入。
如果Supplier每次返回的实例不同,则可以构造出和subscribe次数相关的Flux源数据流。
如果每次都返回相同的实例,则和from(Publisher<? extends T> source)效果一样。
如下例子构造了一个和subscribe次数相关的Flux。
public class Defer {
public static void main(String[] args) {
AtomicInteger subscribeTime = new AtomicInteger(1);
//实现这一的效果,返回的数据流为1~5乘以当前subscribe的次数
Supplier<? extends Publisher<Integer>> supplier = () -> {
Integer[] array = {1, 2, 3, 4, 5};
int currentTime = subscribeTime.getAndIncrement();
for (int i = 0; i < array.length; i++) {
array[i] *= currentTime;
}
return Flux.fromArray(array);
};
Flux<Integer> deferedFlux = Flux.defer(supplier);
subscribe(deferedFlux, subscribeTime);
subscribe(deferedFlux, subscribeTime);
subscribe(deferedFlux, subscribeTime);
}
private static void subscribe(Flux<Integer> deferedFlux, AtomicInteger subscribeTime) {
System.out.println("Subscribe time is "+subscribeTime.get());
deferedFlux.subscribe(System.out::println);
}
}
输出如下:
concat方法和merge方法
concat 及其重载方法接收 多个Publisher拼接为一个Flux返回,返回元素时首先返回接收到的第一个Publisher流中的元素,直到第一个Publisher流结束之后,才开始返回第二个Publisher流中的元素,依次类推... 如果发生异常,Flux流会立刻异常终止。
public class Concat {
public static void main(String[] args) {
Flux<Integer> source1 = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> source2 = Flux.just(6, 7, 8, 9, 10);
Flux<Integer> concated = Flux.concat(source1, source2);
concated.subscribe(new MySubscriber("concated"));
}
}
有些场景不希望前面流中的异常影响后面的流,可以使用concatDelayError方法。
concatDelayError 和 concat的方法功能相同,唯一不同在于异常处理。concatDelayError会等待所有的流处理完成之后,再将异常传播下去。
public class ConcatDelayError {
public static void main(String[] args) {
Flux<Integer> sourceWithErrorNumFormat = Flux.just("1", "2", "3", "4", "Five").map(
str -> Integer.parseInt(str)
);
Flux<Integer> source = Flux.just("5", "6", "7", "8", "9").map(
str -> Integer.parseInt(str)
);
Flux<Integer> concated = Flux.concatDelayError(sourceWithErrorNumFormat, source);
concated.subscribe(new MySubscriber("concatDelayError"));
}
}
运行结果如下,可以看到第一个流中的异常数据类型造成的NumberFormatException异常没有影响后的流,而是最后才传播出去。
concat方法中流只能依次执行,即使后面的流先产生了数据也是如此。如果场景要求尽快返回数据,而无论流的排序,则可以使用merger方法。
merge和concat方法类似,只是不会依次返回每个Publisher流中数据,而是哪个Publisher中先有数据生成,就立刻返回。如果发生异常,会立刻抛出异常并终止。这里使用interval构造两个周期流。subscribe之后,一个等待1秒后启动,一个等待2秒后启动。
public class Merge {
public static void main(String[] args) throws InterruptedException {
Flux<Long> flux1 = Flux.interval(Duration.ofSeconds(1), Duration.ofSeconds(1));
Flux<Long> flux2 = Flux.interval(Duration.ofSeconds(2), Duration.ofSeconds(1));
Flux<Long> mergedFlux = Flux.merge(flux1, flux2);
mergedFlux.subscribe(System.out::println);
Thread.sleep(5000);
}
}
输出如下:可以看到,即使flux1还未complete,flux2就开始从0周期性进行输出了。
还有一类场景,即要尽快返回数据,又要考虑流的顺序,即同时有数据生成时,优先输出排在前面的流,此时可以使用mergeSequential方法。
create方法
create方法,将已有的异步事件流,包装为Flux流。
MyEventProcessor是一个异步产生事件的组件,MyEventListener则是监听事件的组件。通过create方法,在事件产生时调用FluxSink的next或者complete方法,即可为Flux产生数据或者结束Flux。
public class FluxBridge {
private static MyEventProcessor myEventProcessor = new ScheduledSingleListenerEventProcessor();
public static void main(String[] args) {
Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onEvents(List<String> chunk) {
for (String s : chunk) {
if ("end".equalsIgnoreCase(s)) {
sink.complete();
myEventProcessor.shutdown();
} else {
sink.next(s);
}
}
}
public void processComplete() {
sink.complete();
}
});
}).log().subscribe(System.out::println);
myEventProcessor.fireEvents("abc", "efg", "123", "456", "end");
System.out.println("main thread exit");
}
}
public class ScheduledSingleListenerEventProcessor implements MyEventProcessor {
private MyEventListener<String> eventListener;
private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
@Override
public void register(MyEventListener<String> eventListener) {
this.eventListener = eventListener;
}
@Override
public void fireEvents(String... values) {
//每个半秒发送一个事件
executor.schedule(() -> eventListener.onEvents(Arrays.asList(values)),
500, TimeUnit.MILLISECONDS);
}
@Override
public void processComplete() {
executor.schedule(() -> eventListener.processComplete(),
500, TimeUnit.MILLISECONDS);
}
@Override
public void shutdown() {
this.executor.shutdownNow();
}
}
输出如下: