CompletableFuture

해당 포스팅은 토비님의 토비의 봄 TV 11회 스프링 리액티브 프로그래밍 (7) CompletableFuture 라이브 코딩을 보며 따라했던 실습 내용을 바탕으로 정리한 글입니다.

실습 코드들은 IntelliJ를 이용해 SpringBoot 2.1.3.RELEASE 버전 기반으로 프로젝트를 생성 후(web, lombok 포함) 진행했습니다.

이번에는 자바8에 나온 CompletableFuture 라는 새로운 비동기 자바 프로그래밍 기술에 대해서 알아보고, 지난 3회 정도 동안 다루어 왔던 자바 서블릿, 스프링의 비동기 기술 발전의 내용을 자바 8을 기준으로 다시 재작성합니다.

CompletableFuture

먼저 간단한 코드를 통해서 CompletableFuture 사용법에 대해서 알아보겠습니다.

runAsync & thenRun

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
public class CFuture {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
CompletableFuture
.runAsync(() -> log.info("runAsync"))
.thenRun(() -> log.info("thenRun"))
.thenRun(() -> log.info("thenRun"));
log.info("exit");

// 별도의 pool을 설정하지 않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
}

// 결과
23:43:15.841 [main] INFO com.example.study.CFuture - exit
23:43:15.841 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - runAsync
23:43:15.845 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenRun
23:43:15.845 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenRun

supplyAsync, thenApply, thenAccept

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
public class CFuture {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
CompletableFuture
.supplyAsync(() -> {
log.info("supplyAsync");
return 1;
})
// 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
.thenApply(s -> {
log.info("thenApply {}", s);
return s + 1;
})
// 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
.thenAccept(s -> log.info("thenAccept {}", s));
log.info("exit");

// 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
}

// 결과
23:50:00.650 [main] INFO com.example.study.CFuture - exit
23:50:00.650 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - supplyAsync
23:50:00.654 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenApply 1
23:50:00.656 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenAccept 2

thenCompose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
public class CFuture {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
CompletableFuture
.supplyAsync(() -> {
log.info("supplyAsync");
return 1;
})
// return이 CompletableFuture인 경우 thenCompose를 사용한다.
.thenCompose(s -> {
log.info("thenApply {}", s);
return CompletableFuture.completedFuture(s + 1);
})
// 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
.thenApply(s -> {
log.info("thenApply {}", s);
return s + 1;
})
// 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
.thenAccept(s -> log.info("thenAccept {}", s));
log.info("exit");

// 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
}

// 결과
23:50:35.893 [main] INFO com.example.study.CFuture - exit
23:50:35.893 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - supplyAsync
23:50:35.897 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenApply 1
23:50:35.899 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenApply 2
23:50:35.899 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenAccept 3

exceptionally

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Slf4j
public class CFuture {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
CompletableFuture
.supplyAsync(() -> {
log.info("supplyAsync");
return 1;
})
// return이 CompletableFuture인 경우 thenCompose를 사용한다.
.thenCompose(s -> {
log.info("thenApply {}", s);
if (1 == 1) throw new RuntimeException();
return CompletableFuture.completedFuture(s + 1);
})
// 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
.thenApply(s -> {
log.info("thenApply {}", s);
return s + 1;
})
.exceptionally(e -> {
log.info("exceptionally");
return -10;
})
// 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
.thenAccept(s -> log.info("thenAccept {}", s));
log.info("exit");

// 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
}

// 결과
23:51:31.255 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - supplyAsync
23:51:31.257 [main] INFO com.example.study.CFuture - exit
23:51:31.259 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenApply 1
23:51:31.261 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - exceptionally
23:51:31.261 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenAccept -10

thenApplyAsync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Slf4j
public class CFuture {

public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(10);

// Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
CompletableFuture
.supplyAsync(() -> {
log.info("supplyAsync");
return 1;
}, es)
// return이 CompletableFuture인 경우 thenCompose를 사용한다.
.thenCompose(s -> {
log.info("thenApply {}", s);
return CompletableFuture.completedFuture(s + 1);
})
// 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
.thenApply(s -> {
log.info("thenApply {}", s);
return s + 2;
})
// 이 작업은 다른 스레드에서 처리를 하려고 할 때, thenApplyAsync를 사용한다.
// 스레드의 사용을 더 효율적으로 하고 자원을 더 효율적으로 사용한다.
// 현재 스레드 풀의 정책에 따라서 새로운 스레드를 할당하거나 대기중인 스레드를 사용한다. (스레드 풀 전략에 따라 다르다.)
.thenApplyAsync(s -> {
log.info("thenApply {}", s);
return s + 3;
}, es)
.exceptionally(e -> {
log.info("exceptionally");
return -10;
})
// 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
.thenAcceptAsync(s -> log.info("thenAccept {}", s), es);
log.info("exit");

// 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
}

// 결과
23:54:00.043 [pool-1-thread-1] INFO com.example.study.CFuture - supplyAsync
23:54:00.043 [main] INFO com.example.study.CFuture - exit
23:54:00.047 [pool-1-thread-1] INFO com.example.study.CFuture - thenApply 1
23:54:00.048 [pool-1-thread-1] INFO com.example.study.CFuture - thenApply 2
23:54:00.049 [pool-1-thread-2] INFO com.example.study.CFuture - thenApply 4
23:54:00.049 [pool-1-thread-3] INFO com.example.study.CFuture - thenAccept 7

ListenableFuture에서 CompletableFuture로 변환

Spring 4.0에 들어간 AsyncRestTemplate이 return하는 것은 CompletableFuture가 아닌 ListenableFuture입니다.
Spring 4까지는 자바 6~8을 지원하기 때문에 CompletableFuture로 return을 만들지 못하고 계속 ListenableFuture를 유지했습니다. 따라서 ListenableFuture를 CompletableFuture로 만들어 체이닝하기 위해서는 유틸성 wrapper 메서드를 만들어 사용하면 됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

@Autowired
MyService myService;

static final String URL1 = "http://localhost:8081/service?req={req}";
static final String URL2 = "http://localhost:8081/service2?req={req}";

@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();

toCF(rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx))
.thenCompose(s -> toCF(rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody())))
.thenCompose(s -> toCF(myService.work(s.getBody())))
.thenAccept(s -> dr.setResult(s))
.exceptionally(e -> {
dr.setErrorResult(e.getMessage());
return null;
});

// f1.addCallback(s -> {
// ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
// f2.addCallback(s2 -> {
// ListenableFuture<String> f3 = myService.work(s2.getBody());
// f3.addCallback(s3 -> {
// dr.setResult(s3);
// }, e -> {
// dr.setErrorResult(e.getMessage());
// });
// }, e -> {
// dr.setErrorResult(e.getMessage());
// });
// }, e -> {
// dr.setErrorResult(e.getMessage());
// });

return dr;
}

<T> CompletableFuture<T> toCF(ListenableFuture<T> lf) {
CompletableFuture<T> cf = new CompletableFuture<>();
lf.addCallback(s -> cf.complete(s), e -> cf.completeExceptionally(e));
return cf;
}
}

@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}

@Bean
public ThreadPoolTaskExecutor myThreadPool() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(1);
te.setMaxPoolSize(1);
te.initialize();
return te;
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}