publicstaticvoidmain(String[] args)throws ExecutionException, InterruptedException { // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다. CompletableFuture .runAsync(() -> log.info("runAsync")) .thenRun(() -> log.info("thenRun")) .thenRun(() -> log.info("thenRun")); log.info("exit");
// 별도의 pool을 설정하지 않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다. ForkJoinPool.commonPool().shutdown(); ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS); } }
// 결과 23:43:15.841 [main] INFO com.example.study.CFuture - exit 23:43:15.841 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - runAsync 23:43:15.845 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenRun 23:43:15.845 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenRun
publicstaticvoidmain(String[] args)throws ExecutionException, InterruptedException { // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다. CompletableFuture .supplyAsync(() -> { log.info("supplyAsync"); return1; }) // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다. .thenApply(s -> { log.info("thenApply {}", s); return s + 1; }) // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다. .thenAccept(s -> log.info("thenAccept {}", s)); log.info("exit");
// 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다. ForkJoinPool.commonPool().shutdown(); ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS); } }
// 결과 23:50:00.650 [main] INFO com.example.study.CFuture - exit 23:50:00.650 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - supplyAsync 23:50:00.654 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenApply 1 23:50:00.656 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenAccept 2
publicstaticvoidmain(String[] args)throws ExecutionException, InterruptedException { // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다. CompletableFuture .supplyAsync(() -> { log.info("supplyAsync"); return1; }) // return이 CompletableFuture인 경우 thenCompose를 사용한다. .thenCompose(s -> { log.info("thenApply {}", s); return CompletableFuture.completedFuture(s + 1); }) // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다. .thenApply(s -> { log.info("thenApply {}", s); return s + 1; }) // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다. .thenAccept(s -> log.info("thenAccept {}", s)); log.info("exit");
// 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다. ForkJoinPool.commonPool().shutdown(); ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS); } }
// 결과 23:50:35.893 [main] INFO com.example.study.CFuture - exit 23:50:35.893 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - supplyAsync 23:50:35.897 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenApply 1 23:50:35.899 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenApply 2 23:50:35.899 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenAccept 3
publicstaticvoidmain(String[] args)throws ExecutionException, InterruptedException { // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다. CompletableFuture .supplyAsync(() -> { log.info("supplyAsync"); return1; }) // return이 CompletableFuture인 경우 thenCompose를 사용한다. .thenCompose(s -> { log.info("thenApply {}", s); if (1 == 1) thrownew RuntimeException(); return CompletableFuture.completedFuture(s + 1); }) // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다. .thenApply(s -> { log.info("thenApply {}", s); return s + 1; }) .exceptionally(e -> { log.info("exceptionally"); return -10; }) // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다. .thenAccept(s -> log.info("thenAccept {}", s)); log.info("exit");
// 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다. ForkJoinPool.commonPool().shutdown(); ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS); } }
// 결과 23:51:31.255 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - supplyAsync 23:51:31.257 [main] INFO com.example.study.CFuture - exit 23:51:31.259 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenApply 1 23:51:31.261 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - exceptionally 23:51:31.261 [ForkJoinPool.commonPool-worker-1] INFO com.example.study.CFuture - thenAccept -10
publicstaticvoidmain(String[] args)throws ExecutionException, InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10);
// Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다. CompletableFuture .supplyAsync(() -> { log.info("supplyAsync"); return1; }, es) // return이 CompletableFuture인 경우 thenCompose를 사용한다. .thenCompose(s -> { log.info("thenApply {}", s); return CompletableFuture.completedFuture(s + 1); }) // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다. .thenApply(s -> { log.info("thenApply {}", s); return s + 2; }) // 이 작업은 다른 스레드에서 처리를 하려고 할 때, thenApplyAsync를 사용한다. // 스레드의 사용을 더 효율적으로 하고 자원을 더 효율적으로 사용한다. // 현재 스레드 풀의 정책에 따라서 새로운 스레드를 할당하거나 대기중인 스레드를 사용한다. (스레드 풀 전략에 따라 다르다.) .thenApplyAsync(s -> { log.info("thenApply {}", s); return s + 3; }, es) .exceptionally(e -> { log.info("exceptionally"); return -10; }) // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다. .thenAcceptAsync(s -> log.info("thenAccept {}", s), es); log.info("exit");
// 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다. ForkJoinPool.commonPool().shutdown(); ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS); } }
// 결과 23:54:00.043 [pool-1-thread-1] INFO com.example.study.CFuture - supplyAsync 23:54:00.043 [main] INFO com.example.study.CFuture - exit 23:54:00.047 [pool-1-thread-1] INFO com.example.study.CFuture - thenApply 1 23:54:00.048 [pool-1-thread-1] INFO com.example.study.CFuture - thenApply 2 23:54:00.049 [pool-1-thread-2] INFO com.example.study.CFuture - thenApply 4 23:54:00.049 [pool-1-thread-3] INFO com.example.study.CFuture - thenAccept 7
ListenableFuture에서 CompletableFuture로 변환
Spring 4.0에 들어간 AsyncRestTemplate이 return하는 것은 CompletableFuture가 아닌 ListenableFuture입니다. Spring 4까지는 자바 6~8을 지원하기 때문에 CompletableFuture로 return을 만들지 못하고 계속 ListenableFuture를 유지했습니다. 따라서 ListenableFuture를 CompletableFuture로 만들어 체이닝하기 위해서는 유틸성 wrapper 메서드를 만들어 사용하면 됩니다.
@Bean public ThreadPoolTaskExecutor myThreadPool(){ ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor(); te.setCorePoolSize(1); te.setMaxPoolSize(1); te.initialize(); return te; }