Project Reactor学习--异常处理Operator
之前的文章中介绍了Reactor中Operator的分类和日志相关的Operator,现在介绍下异常处理相关的Operator。
对于异常处理,Reactor除了默认的立刻抛出的处理方式之外,还提供三类处理方式:简单记录日志、fallback方法以及retry重试。
如果对Spring Cloud中的Hystrix比较熟悉,可以发现这也是Hystrix处理异常的方式,所以使用Reactor,我们应该也可以实现类似于断路器的功能,后续我们可以试试。
这里继续介绍异常处理Operator。
静态fallback值(StaticFallbackValue)
通过onErrorReturn方法加工返回的Publisher,提供了静态fallback值,即Publisher异常时,返回一个编译时写死的值,这也StaticFallbackValue中Static的意思。如下例子:
public class StaticFallbackValue {
public static void main(String[] args) {
Flux<Integer> flux = Flux.just(0)
.map(i -> 1 / i)
//异常时返回0
.onErrorReturn(0);
//输出应该为0
flux.log().subscribe(System.out::println);
}
}
例子中我们还有log Operator记录每个step,根据预期Flux不会抛出异常,而是返回静态fallbakc值0,运行结果如下:完全符合我们的预期。
静态fallback条件值(StaticFallbackConditionValue)
在编译时写死fallbakc value并不灵活,所以Reactor提供了根据异常信息返回不同fallback value的功能。
onErrorReturn可以根据异常的信息,返回不同的值。如下:
public class StaticFallbackConditionValue {
public static void main(String[] args) {
//1. 根据异常类型进行判断
Flux<Integer> flux = Flux.just(0)
.map(i -> 1 / i)
//ArithmeticException异常时返回1
.onErrorReturn(NullPointerException.class, 0)
.onErrorReturn(ArithmeticException.class, 1);
//输出应该为1
flux.log().subscribe(System.out::println);
final String nullStr = null;
//just不允许对象为null
Flux<String> stringFlux = Flux.just("")
.map(str -> nullStr.toString())
//NullPointerException异常时返回字符串NullPointerException
.onErrorReturn(NullPointerException.class, "NullPointerException")
.onErrorReturn(ArithmeticException.class, "ArithmeticException");
//输出应该为NullPointerException
stringFlux.log().subscribe(System.out::println);
//2. 根据Predicate进行判断
AtomicInteger index = new AtomicInteger(0);
Flux.just(0, 1, 2, 3)
.map(i -> {
index.incrementAndGet();
return 1 / i;
})
.onErrorReturn(NullPointerException.class, 0)
.onErrorReturn(e -> index.get() < 2, 1)
//因为上一个onErrorReturn匹配了条件,所以异常传播被关闭,之后的
//onErrorReturn不会再被触发
.onErrorReturn(e -> index.get() < 1, 2)
//因为异常类型为NumberFormatException,此处应打印1
.log().subscribe(System.out::println);
}
}
可以多次调用onErrorReturn,最匹配的一个会处理异常,一旦异常被处理,异常传播则会结束,后面的onErrorReturn不会再接收到异常。
运行结果如下:
onErrorResume
onErrorReturn在发生异常时结束流,后面的数据也不会再被发送。 但是很多场景中,并不希望一个异常数据影响整个流,此时可以使用onErrorResume替代onErrorReturn。onErrorResumej接收一个Function<? super Throwable, ? extends Publisher<? extends T>> 对象。可以认为该对象是一个fallback method,接收异常信息,输出和流中数据的类型相同的值,使用这个返回值替代异常的数据值返回给Subscriber。 例子如下:
public class FallbackMethod {
private static Function<? super Throwable, ? extends Publisher<String>> fallback
= e -> Mono.just(e.getMessage());
public static void main(String[] args) {
//1. 默认方法
Flux<String> flux = Flux.just("0", "1", "2", "abc")
.map(i -> Integer.parseInt(i) + "")
.onErrorResume(e -> Mono.just("input string is not a number ," + e.getMessage()));
flux.log().subscribe(System.out::println);
//2. 根据异常类型选择返回方法
flux = Flux.just("0", "1", "2", "abc")
.map(i -> Integer.parseInt(i) + "")
.onErrorResume(ArithmeticException.class, e -> Mono.just("ArithmeticException:" + e.getMessage()))
.onErrorResume(NumberFormatException.class, e -> Mono.just("input string is not a number"))
//如果上面列出的异常类型都不满足,使用默认方法
.onErrorResume(e -> Mono.just(e.getMessage()));
// 因为异常类型为NumberFormatException,此处应该打印字符串input string is not a number
flux.log().subscribe(System.out::println);
//3. 根据Predicate选择返回方法
flux = Flux.just("0", "1", "2", "abc")
.map(i -> Integer.parseInt(i) + "")
.onErrorResume(e -> e.getMessage().equals("For input string: \"abc\""),
e -> Mono.just("exception data is abc"))
//onErrorResume可以和onErrorReturn混合使用
.onErrorReturn("SystemException");
//因为判断条件,此处应该打印exception data is abc
flux.log().subscribe(System.out::println);
}
}
运行结果如下:可以看到,Flux流并没有因为异常数据结束,而是使用fallback method的返回值返回给Subscriber了。
doOnError
有些场景下,只是想简单记录下日志,并不想提供异常时作为替代的返回值,也不想影响默认的异常传播机制,此时可以使用doOnError。如下:
public class DoOnError {
public static void main(String[] args) {
//1. 默认doOnError方法
Flux<String> flux = Flux.just("0", "1", "2", "abc","3")
.map(i -> Integer.parseInt(i) + "")
.doOnError(e -> e.printStackTrace())
.onErrorReturn("System exception");
flux.log().subscribe(System.out::println);
//2. 根据异常类型选择doError方法
flux = Flux.just("0", "1", "2", "abc","3")
.map(i -> Integer.parseInt(i) + "")
.doOnError(RuntimeException.class, e -> {
System.err.println("发生了RuntimeException");
e.printStackTrace();
})
.doOnError(NumberFormatException.class, e -> {
System.err.println("发生了NumberFormatException");
e.printStackTrace();
})
.onErrorReturn("System exception");
//因为异常类型为NumberFormatException,此处应打印字符串发生了NumberFormatException
//又因为doOnError不会阻止异常传播,所以onErrorReturn会执行,返回字符串System exception
flux.log().subscribe(System.out::println);
//3. 根据Predicate选择doError方法
// 注意doOnError不会阻止异常传播,所以onErrorReturn可以多次触发
flux = Flux.just("0", "1", "2", "abc","3")
.map(i -> Integer.parseInt(i) + "")
.doOnError(e -> e instanceof Throwable, e -> {
System.err.println("异常类型为Throwable");
})
.doOnError(e -> e instanceof Exception, e -> {
System.err.println("同时异常类型为Exception");
})
.doOnError(e -> e instanceof NumberFormatException, e -> {
System.err.println("并且异常类型为NumberFormatException");
})
.doOnError(e -> e instanceof Error, e -> {
System.err.println("异常类型为Error");
})
.onErrorReturn("System exception");
//因为异常类型为NumberFormatException,所以前面3个doOnError都会被调用
flux.log().subscribe(System.out::println);
}
}
重试
简单粗暴的异常处理方式,一次不成功就来两次,两次不成功就三次,以此类推。之前说过,每添加一个Operator,都是返回一个新的Publisher,此处也不例外,下面的例子可以清晰的证明。
public class Retying {
public static void main(String[] args) throws InterruptedException {
//默认异常retry
Flux<String> flux = Flux.just("0", "1", "2", "abc")
.map(i -> Integer.parseInt(i) + "")
.retry(2);
flux.subscribe(newSub());
//带条件判断的retry
System.out.println("-------------------------------------------------");
Thread.sleep(500);
flux = Flux.just("0", "1", "2", "abc")
.map(i -> Integer.parseInt(i) + "")
.retry(1, e -> e instanceof Exception);
flux.subscribe(newSub());
}
private static Subscriber<String> newSub() {
return new BaseSubscriber<String>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("start");
request(1);
}
@Override
protected void hookOnNext(String value) {
System.out.println("get value is " + Integer.parseInt(value));
request(1);
}
@Override
protected void hookOnComplete() {
System.out.println("Complete");
}
@Override
protected void hookOnError(Throwable throwable) {
System.err.println(throwable.getMessage());
}
};
}
}
运行后输出如下:可以看到每次retry时,Flux中的数据都重新又被Subscriber消费了一次。所以,如果需要使用retry异常机制,应该保证Subscriber在消费数据的方法是幂等的,否则可能出现数据重复消费的情况,从而导致系统和业务异常。
受检异常的处理
非受检异常会被Reactor传播,而受检异常必须被用户代码try catch,为了让受检异常被reactor的异常传播机制和异常处理机制支持,可以使用如下步骤处理:
- 使用 Exceptions.propagate将受检异常包装为非受检异常并重新抛出传播出去。
- onError、error回调等异常处理操作获取到异常之后,可以调用Exceptions.unwrap取得原受检的异常。
如下:
public class CheckedExceptionHandle {
public static void main(String[] args) {
Flux<String> flux = Flux.just("abc", "def", "exception", "ghi")
.map(s -> {
try {
return doSth(s);
} catch (FileNotFoundException e) {
// 包装并传播异常
throw Exceptions.propagate(e);
}
});
//abc、def正常打印,然后打印 参数异常
flux.subscribe(System.out::println,
e -> {
//获取原始受检异常
Throwable sourceEx = Exceptions.unwrap(e);
//判断异常类型并处理
if (sourceEx instanceof FileNotFoundException) {
System.err.println(((FileNotFoundException) sourceEx).getMessage());
} else {
System.err.println("Other exception");
}
});
}
public static String doSth(String str) throws FileNotFoundException {
if ("exception".equals(str)) {
throw new FileNotFoundException("参数异常");
} else {
return str.toUpperCase();
}
}
}
输出如下:可以看到受检异FileNotFoundException被封装后抛出,然后再onErrorCallback中捕获并转换为真实异常类型。完整代码:https://github.com/pkpk1234/learn-reactor