Project Reactor学习--如何将阻塞方法包装为非阻塞方法
前面讲了Reactor基础功能如何使用,但是对实际应用没有多大价值,现在来讲一讲如何使用Reactor实现有意义的功能。
Project Reactor作为一个Reactive库,作用就是将阻塞的方法包装为非阻塞的方法,并且为其添加诸如背压等额外的功能。
为什么要非阻塞
当方法是阻塞时,调用方法的线程会被阻塞,从而时间浪费在方法等待上。如果线程中还有不依赖于这个阻塞方法调用结果的代码需要执行,那么也需要等待阻塞方法结束之后才能执行,这种情况在微服务架构中很常见。如果是在web容器如果tomcat中,阻塞方法还会阻塞IO线程,导致web容器无法处理新的请求,从而影响性能。
如果方法是非阻塞,则线程可以继续执行其他的操作,当方法执行完成之后,使用某种方式通知调用者。
现在假设有一个网站,用户登录首页,首页需要查询用户基本信息,网站的公告,用户的待处理task。其中查询用户基本信息和网站公告之间是没有任何依赖顺序的,而用户的代办则需要在用户基本信息查询完成之后,依赖于查询出的用户名进行查询(不要纠结于使用用户名查询,而不是用户id查询,举个例子而已)。
假设查询用户信息的方法getUserInfo耗时50ms,查询公告getNotices耗时50ms,查询用户代办getTodos耗时100ms。
那么在阻塞的场景下,时间线应该是这样的:
使用代码模拟:
public class Caller {
public static void main(String[] args) {
blockingCall();
}
private static void blockingCall() {
HomePageService homePageService = new HomePageService();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
String userInfo = homePageService.getUserInfo();
System.out.println(userInfo);
System.out.println(homePageService.getNotice());
System.out.println(homePageService.getTodos(userInfo));
stopWatch.stop();
System.out.println("call methods costs " + stopWatch.getTime() + " mills");
}
}
public class HomePageService {
public String getUserInfo() {
return EchoMethod.echoAfterTime("get user info", 50, TimeUnit.MILLISECONDS);
}
public String getNotice() {
return EchoMethod.echoAfterTime("get notices", 50, TimeUnit.MILLISECONDS);
}
public String getTodos(String userInfo) {
return EchoMethod.echoAfterTime("get todos", 100, TimeUnit.MILLISECONDS);
}
}
EchoMethod中使用TimeUtil.sleep进行阻塞模拟:
public class EchoMethod {
/**
* 模拟阻塞方法
*
* @param str
* @param delay
* @param timeUnit
* @return
*/
public static String echoAfterTime(String str, int delay, TimeUnit timeUnit) {
try {
timeUnit.sleep(delay);
} catch (InterruptedException e) {
e.printStackTrace();
}
return str;
}
}
运行结果如下:
如果将阻塞方法包装为非阻塞方法,时间线可以优化为如下:
使用Thread和Callback包装
如下HomePageServiceThreadsAndCallbackWrapper使用ThreadPool异步执行任务,并且在任务成功完成,异常终止以及最终完成时,调用相应的Callback函数。
public class HomePageServiceThreadsAndCallbackWrapper {
private final HomePageService homePageService;
private final ExecutorService threadPool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
public ThreadsAndCallbackWrapper(HomePageService homePageService) {
this.homePageService = homePageService;
Runtime.getRuntime().addShutdownHook(new Thread(() -> threadPool.shutdownNow()));
}
void getUserInfoAsync(Consumer<String> sucessCallback, Consumer<Throwable> errorCallback, Runnable finallyCallback) {
threadPool.submit(() -> {
try {
String userInfo = this.homePageService.getUserInfo();
sucessCallback.accept(userInfo);
} catch (Throwable ex) {
errorCallback.accept(ex);
} finally {
finallyCallback.run();
}
});
}
void getNoticeAsync(Consumer<String> sucessCallback, Consumer<Throwable> errorCallback, Runnable finallyCallback) {
threadPool.submit(() -> {
try {
String notice = this.homePageService.getNotice();
sucessCallback.accept(notice);
} catch (Throwable ex) {
errorCallback.accept(ex);
} finally {
finallyCallback.run();
}
});
}
void getTodos(String userInfo, Consumer<String> sucessCallback, Consumer<Throwable> errorCallback, Runnable finallyCallback) {
threadPool.submit(() -> {
try {
String todos = this.homePageService.getTodos(userInfo);
sucessCallback.accept(todos);
} catch (Throwable ex) {
errorCallback.accept(ex);
} finally {
finallyCallback.run();
}
});
}
}
调用方也需要进行相应修改,以适应异步特点。
private static void threadAndCallbackCall() {
//用于让调用者线程等待多个异步任务全部结束
CountDownLatch ct = new CountDownLatch(3);
HomePageService homePageService = new HomePageService();
HomePageServiceThreadsAndCallbackWrapper homePageServiceFutureWrapper
= new HomePageServiceThreadsAndCallbackWrapper(homePageService);
//统一的finallyCallback
Runnable finallyCallback = () -> {
ct.countDown();
};
StopWatch stopWatch = new StopWatch();
stopWatch.start();
//获取用户信息
homePageServiceFutureWrapper.getUserInfoAsync(
(userInfo) -> {
System.out.println(userInfo);
//由于获取todo依赖于用户信息,必须在此处调用
homePageServiceFutureWrapper.getTodos(userInfo,
(todos) -> {
System.out.println(todos);
}, System.err::println, finallyCallback);
}, System.err::println, finallyCallback
);
//获取notice
homePageServiceFutureWrapper.getNoticeAsync(System.out::println, System.err::println, finallyCallback);
//等待异步操作全部结束并统计耗时
ct.await();
stopWatch.stop();
System.out.println("thread and callbakc async call methods costs " + stopWatch.getTime() + " mills");
//退出JVM线程,触发HomePageServiceThreadsAndCallbackWrapper中线程池的shutdownHook
System.exit(0);
}
注意异步任务之间如果有依赖,则需要在回调中进行调用,如此处todo依赖于用户信息。
运行结果:可以看到异步调用的确比同步调用快不少。
使用Thread和Callback的缺点:需要在代码中显示对线程或者线程池进行操作;如果依赖链很长,则可能出现回调地狱。
使用CompletableFuture进行包装
CompletableFuture是Java8新增的异步任务工具类,对Future进行了扩展,解决了Future只能阻塞获取结果,Future之间无法链式依赖调用,Future没有异常处理接口的缺点。
CompletableFuture.supplyAsync()可以将阻塞方法包装为异步,同时thenCompose()则可以链式调用依赖的方法,即将上一个异步调用的结果推送给下一个异步调用作为输入,如果将thenRun放置到最后,则相当于finallyCallback,exceptionally()则可以设置异常处理器。
public class HomePageSerivceCompletableFutureWrapper {
private final HomePageService homePageService;
public HomePageSerivceCompletableFutureWrapper(HomePageService homePageService) {
this.homePageService = homePageService;
}
CompletableFuture<String> getUserInfoAsync() {
return CompletableFuture.supplyAsync(this.homePageService::getUserInfo);
}
CompletableFuture<String> getNoticeAsync() {
return CompletableFuture.supplyAsync(this.homePageService::getNotice);
}
CompletableFuture<String> getTodosAsync(String userInfo) {
return CompletableFuture.supplyAsync(() -> this.homePageService.getTodos(userInfo));
}
}
调用者:
private static void completableFutureCall() throws InterruptedException {
//用于让调用者线程等待多个异步任务全部结束
CountDownLatch ct = new CountDownLatch(2);
HomePageService homePageService = new HomePageService();
HomePageSerivceCompletableFutureWrapper homePageSerivceCompletableFutureWrapper =
new HomePageSerivceCompletableFutureWrapper(homePageService);
//统一的finallyCallback
Runnable finallyCallback = () -> {
ct.countDown();
};
StopWatch stopWatch = new StopWatch();
stopWatch.start();
homePageSerivceCompletableFutureWrapper
.getUserInfoAsync()
//依赖调用
.thenCompose(userInfo -> {
System.out.println(userInfo);
return homePageSerivceCompletableFutureWrapper.getTodosAsync(userInfo);
})
.thenAcceptAsync(System.out::println)
.thenRun(finallyCallback);
homePageSerivceCompletableFutureWrapper
.getNoticeAsync()
.thenAcceptAsync(System.out::println)
.thenRun(finallyCallback);
//等待异步操作全部结束并统计耗时
ct.await();
stopWatch.stop();
System.out.println("CompletableFuture async call methods costs " + stopWatch.getTime() + " mills");
}
可以明显看到代码简洁了不少。
运行结果:
使用Publisher进行包装
在Project Reactor中,异步产生数据的方法都扩展自接口Publisher,官方提供的实现有Flux和Mono。这个例子里面,由于只返回单个数据,所以更好的方式是使用Mono。
Mono.fromCallable可以包装有返回值的方法,配合subscribeOn可以包装为异步执行,而flatMap则可以将异步返回的值传递给依赖的方法中。
public class HomePageServicePublisherWrapper {
private final HomePageService homePageService;
//线程池
private Scheduler executor = Schedulers.elastic();
public HomePageServicePublisherWrapper(HomePageService homePageService) {
this.homePageService = homePageService;
}
public Mono<String> getUserInfoAsync() {
return Mono
.fromCallable(this.homePageService::getUserInfo)
.subscribeOn(this.executor);
}
public Mono<String> getNoticeAsync() {
return Mono
.fromCallable(this.homePageService::getNotice)
.subscribeOn(this.executor);
}
public Mono<String> getTodosAsync(String userInfo) {
return Mono
.fromCallable(() -> this.homePageService.getTodos(userInfo))
.subscribeOn(this.executor);
}
}
private static void publisherCall() throws InterruptedException {
//用于让调用者线程等待多个异步任务全部结束
CountDownLatch ct = new CountDownLatch(2);
//统一的finallyCallback
Runnable finallyCallback = () -> {
ct.countDown();
};
StopWatch stopWatch = new StopWatch();
HomePageService homePageService = new HomePageService();
HomePageServicePublisherWrapper homePageServicePublisherWrapper =
new HomePageServicePublisherWrapper(homePageService);
homePageServicePublisherWrapper
.getUserInfoAsync()
//由于初始化线程池很耗时,所以将stopWatch放置到此处
//真是系统中,线程池应该提前初始化,而不应该用于一次性的方法
.doOnSubscribe(subscription -> {
stopWatch.start();
})
//消费userInfo
.doOnNext(System.out::println)
//调用依赖于userInfo的getTodos
.flatMap((userInfo) -> homePageServicePublisherWrapper.getTodosAsync(userInfo))
//消费todos
.doOnNext(System.out::println)
.doFinally(s -> finallyCallback.run())
.subscribe();
homePageServicePublisherWrapper
.getNoticeAsync()
.doOnNext(System.out::println)
.doFinally((s) -> {
finallyCallback.run();
})
.subscribe();
ct.await();
stopWatch.stop();
System.out.println("Publisher async call methods costs " + stopWatch.getTime() + " mills");
}
运行结果:
将返回单个值的方法包装为异步的套路,大致如上。
下一节介绍下将返回多值的方法封装为异步的套路(其实之前的Demo里面已经有大致框架,但是使用的是简单数据),比如封装JDBC查询。