spring@Async异步获取结果

spring@Async异步获取结果

涉及到两个问题:

  • 解决同类调用@Async方法无效问题。
  • 用completableFuture替换Future,解决Future.get()获取结果阻塞的问题。

这里只提供利用@Async注解的解决方案,因为使用了spring,更倾向于用spring的注解来解决问题.把线程池注册成bean交给spring管理,比自己每次生成销毁或者自己控制全局线程池要优雅的多.

另一种解决方案,因为线程池是通过@bean方式注册成bean,直接@autowied线程池,然后利用正常的线程池+countDownLatch也可以解决(未验证)。

以下是基于@async的解决方案:

代码不连续,这里只贴一下关键代码,以免非关键代码干扰思路,不懂的留言或者邮件我cheyantao@foxmail.com,看见了及时回你。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
.
.
.
//spring上下文用来获取当前类的代理
@Autowired private ApplicationContext applicationContext;
.
.
.
//就是一个普通的方法,无视之,不要被干扰视线
public Page<SourceVo> page(Pageable pageable, String selectId, String keyword) {
.
.
//需要countDownLatch阻塞等待结果。
CountDownLatch countDownLatch = new CountDownLatch(sourceDo.getContent().size());
for (Source source : sourceDo.getContent()) {
.
.
.
//这里通过applicationContext获取代理类,解决调用同类方法@Async不生效
SourceService serviceProxy = applicationContext.getBean(SourceService.class);
//在这里调用了@Aysnc的方法,看不懂建议瞅一眼completableFuture的相关知识。
serviceProxy
.requestDataByte(source, component)
.thenAccept(
s -> {
sourceVo.setDataByte(s);
content.add(sourceVo);
countDownLatch.countDown();
});
}
try {
countDownLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("获取数据量线程阻塞异常", e);
}
}
.
.
.
//@Async注解的方法,这里是异步获取的字符串
@Override
@Async("fetchExecutor")
public CompletableFuture<String> requestDataByte(Source source, Component component) {
String metricName = this.buildMetricNames(source);
if (StringUtils.isBlank(metricName)) {
//返回空字符串结果
return AsyncResult.forValue("").completable();
}
try {
.
.
.
//AsyncResult.forValue(字符串).completable()返回正确结果,不要被forValue()里的句子干扰实现了,就是一个 普通的业务逻辑方法
return AsyncResult.forValue(this.getValueFromMetric(response.getMetrics().get(0)))
.completable();
} catch (Exception ex) {
log.error("获取数据量失败,sourceId:{}", source.getId(), ex);
return AsyncResult.forValue("").completable();
}
}

OK ,以上.