자바와 스프링의 비동기 기술

해당 포스팅은 토비님의 토비의 봄 TV 8회 스프링 리액티브 프로그래밍 (4) 자바와 스프링의 비동기 기술 라이브 코딩을 보며 따라했던 실습 내용을 바탕으로 정리한 글입니다.

실습 코드들은 IntelliJ를 이용해 SpringBoot 2.1.3.RELEASE 버전 기반으로 프로젝트를 생성 후(web, lombok 포함) 진행했습니다.

자바의 비동기 기술

ExecutorService

**ExecutorService**는 쉽게 비동기로 작업을 실행할 수 있도록 도와주는 JDK(1.5부터)에서 제공하는 interface입니다. 일반적으로 ExecutorService는 작업 할당을 위한 스레드 풀과 API를 제공합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
public class FutureEx {
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();

es.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {}
log.info("Async");
});

log.info("Exit");
}
}

// 결과
20:35:53.892 [main] INFO com.example.study.FutureEx - Exit
20:35:55.888 [pool-1-thread-1] INFO com.example.study.FutureEx - Async

Future

Future는 자바 1.5에서 등장한 비동기 계산의 결과를 나타내는 Interface 입니다.

비동기적인 작업을 수행한다는 것은 현재 진행하고 있는 스레드가 아닌 별도의 스레드에서 작업을 수행하는 것을 말합니다. 같은 스레드에서 메서드를 호출할 때는 결과를 리턴 값을 받지만, 비동기적으로 작업을 수행할 때는 결과값을 전달받을 수 있는 무언가의 interface가 필요한데 Future가 그 역할을 합니다.

비동기 작업에서 결과를 반환하고 싶을 때는 runnable대신 callable interface를 이용하면 결과 값을 return 할 수 있습니다. 또한 예외가 발생했을 때 해당 예외를 비동기 코드를 처리하는 스레드 안에서 처리하지 않고 밖으로 던질 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class FutureEx {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();

Future<String> f = es.submit(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
});

log.info(f.get());
log.info("Exit");
}
}

// 결과
20:43:11.704 [pool-1-thread-1] INFO com.example.study.FutureEx - Async
20:43:11.706 [main] INFO com.example.study.FutureEx - Hello
20:43:11.706 [main] INFO com.example.study.FutureEx - Exit

Future를 통해서 비동기 결과의 값을 가져올 때는 get 메서드를 사용합니다. 그러나 get 메서드를 호출하게 되면 비동기 작업이 완료될 때까지 해당 스레드가 blocking됩니다.

Future는 비동기적인 연산 혹은 작업을 수행하고 그 결과를 갖고 있으며, 완료를 기다리고 계산 결과를 반환(get)하는 메소드와 그 외에도 해당 연산이 완료되었는지 확인하는(isDone) 메소드를 제공합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
public class FutureEx {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();

Future<String> f = es.submit(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
});

log.info(String.valueOf(f.isDone()));
Thread.sleep(2000);
log.info("Exit");
log.info(String.valueOf(f.isDone()));
log.info(f.get());
}
}

// 결과
00:26:00.501 [main] INFO com.example.study.FutureEx - false
00:26:02.502 [pool-1-thread-1] INFO com.example.study.FutureEx - Async
00:26:02.509 [main] INFO com.example.study.FutureEx - Exit
00:26:02.509 [main] INFO com.example.study.FutureEx - true
00:26:02.509 [main] INFO com.example.study.FutureEx - Hello

FutureTask

**FutureTask**는 비동기 작업을 생성합니다. 지금까지 위의 코드는 비동기 작업 생성과 실행을 동시에 했다면 FutureTask는 비동기 작업 생성과 실행을 분리하여 진행할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Slf4j
public class FutureEx {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();

FutureTask<String> f = new FutureTask<>(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
});

es.execute(f);

log.info(String.valueOf(f.isDone()));
Thread.sleep(2000);
log.info("Exit");
log.info(String.valueOf(f.isDone()));
log.info(f.get());
}
}

// 결과
00:28:39.459 [main] INFO com.example.study.FutureEx - false
00:28:41.461 [pool-1-thread-1] INFO com.example.study.FutureEx - Async
00:28:41.467 [main] INFO com.example.study.FutureEx - Exit
00:28:41.467 [main] INFO com.example.study.FutureEx - true
00:28:41.467 [main] INFO com.example.study.FutureEx - Hello

비동기 작업의 결과를 가져오는 방법은 Future와 같은 결과를 다루는 handler를 이용하거나 callback을 이용하는 2가지 방법이 있습니다.
아래의 예시 코드는 FutureTask의 비동기 작업이 완료될 경우 호출되는 done() 메서드를 재정의하여 callback을 이용하는 방법입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
public class FutureEx {
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();

FutureTask<String> f = new FutureTask<String>(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
}) {
@Override
protected void done() {
super.done();
try {
log.info(get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
};

es.execute(f);
es.shutdown();

log.info("EXIT");
}
}

// 결과
01:03:04.153 [main] INFO com.example.study.FutureEx - EXIT
01:03:06.153 [pool-1-thread-1] INFO com.example.study.FutureEx - Async
01:03:06.153 [pool-1-thread-1] INFO com.example.study.FutureEx - Hello

위 예시 코드의 callback 관련 부분을 FutureTask를 상속받아 done() 메서드를 재정의함으로써, 비동기 코드와 그 결과를 갖고 작업을 수행하는 callback을 좀 더 가독성이 좋게 작성할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j
public class FutureEx {
interface SuccessCallback {
void onSuccess(String result);
}

public static class CallbackFutureTask extends FutureTask<String> {
SuccessCallback sc;

public CallbackFutureTask(Callable<String> callable, SuccessCallback sc) {
super(callable);
this.sc = Objects.requireNonNull(sc);
}

@Override
protected void done() {
super.done();
try {
this.sc.onSuccess(get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();

CallbackFutureTask f = new CallbackFutureTask(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
}, log::info);

es.execute(f);
es.shutdown();
}
}

// 결과
01:05:01.978 [main] INFO com.example.study.FutureEx - EXIT
01:05:03.977 [pool-1-thread-1] INFO com.example.study.FutureEx - Async
01:05:03.978 [pool-1-thread-1] INFO com.example.study.FutureEx - Hello

위 예시 코드에 SuccessCallback을 추가한 것처럼 ExceptionCallback을 추가하여 비동기 코드에서 예외가 발생할 경우, 해당 예외를 처리하는 callback도 추가할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Slf4j
public class FutureEx {
interface SuccessCallback {
void onSuccess(String result);
}

interface ExceptionCallback {
void onError(Throwable t);
}

public static class CallbackFutureTask extends FutureTask<String> {
SuccessCallback sc;
ExceptionCallback ec;

public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
super(callable);
this.sc = Objects.requireNonNull(sc);
this.ec = Objects.requireNonNull(ec);
}

@Override
protected void done() {
super.done();
try {
this.sc.onSuccess(get());
/*
InterruptedException은 예외긴 예외이지만, 현재 작업을 수행하지 말고 중단해라 라고 메시지를 보내는 용도이다.
따라서 현재 스레드에 interrupt를 체크하고 종료한다.
*/
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// 래핑된 에러를 빼내어 전달한다.
ec.onError(e.getCause());
}
}
}

public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();

CallbackFutureTask f = new CallbackFutureTask(() -> {
Thread.sleep(2000);
if (1 == 1) throw new RuntimeException("Async ERROR!!!");
log.info("Async");
return "Hello";
},
s -> log.info("Result: {}", s),
e -> log.info("Error: {}", e.getMessage()));

es.execute(f);
es.shutdown();

log.info("EXIT");
}
}

// 결과
01:11:53.460 [main] INFO com.example.study.FutureEx - EXIT
01:11:55.463 [pool-1-thread-1] INFO com.example.study.FutureEx - Error: Async ERROR!!!

스프링의 비동기 기술

@Async

Spring MVC 3.2 부터 Servlet 3.0 기반의 비동기 요청 처리가 가능해졌습니다. @Async 어노테이션을 추가해 해당 메서드를 비동기적으로 호출할 수 있습니다. 해당 메서드를 호출한 호출자(caller)는 즉시 리턴하고 메소드의 실제 실행은 Spring TaskExecutor에 의해서 실행됩니다. 비동기로 실행되는 메서드는 Future 형식의 값을 리턴하고, 호출자는 해당 Future의 get() 메서드를 호출하기 전에 다른 작업을 수행할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@Service
public static class MyService {
/*
내부적으로 AOP를 이용해 복잡한 로직이 실행된다.
비동기 작업은 return값으로 바로 결과를 줄 수 없다. (Future 혹은 Callback을 이용해야 한다.)
*/
@Async
public Future<String> hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(1000);
return new AsyncResult<>("Hello");
}
}

public static void main(String[] args) {
// try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정
try (ConfigurableApplicationContext c = SpringApplication.run(StudyApplication.class, args)) {
}
}

@Autowired
MyService myService;

// 모든 빈이 다 준비된 후 실행됨 (현재는 일종의 컨트롤러라고 생각)
@Bean
ApplicationRunner run() {
return args -> {
log.info("run()");
Future<String> res = myService.hello();
log.info("exit: {}", res.isDone());
log.info("result: {}", res.get());
};
}
}


// 결과
2019-04-04 23:29:31.960 INFO 41618 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2019-04-04 23:29:31.960 INFO 41618 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 928 ms
2019-04-04 23:29:32.161 INFO 41618 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-04-04 23:29:32.337 INFO 41618 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-04-04 23:29:32.341 INFO 41618 --- [ main] com.example.study.StudyApplication : Started StudyApplication in 1.631 seconds (JVM running for 2.101)
2019-04-04 23:29:32.343 INFO 41618 --- [ main] com.example.study.StudyApplication : run()
2019-04-04 23:29:32.346 INFO 41618 --- [ main] com.example.study.StudyApplication : exit: false
2019-04-04 23:29:32.350 INFO 41618 --- [ task-1] com.example.study.StudyApplication : hello()
2019-04-04 23:29:33.351 INFO 41618 --- [ main] com.example.study.StudyApplication : result: Hello
2019-04-04 23:29:33.354 INFO 41618 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'

ListenableFuture

스프링 4.0 부터 제공하는 Future 인터페이스를 확장한 ListenableFuture를 이용하면 비동기 처리의 결과 값을 사용할 수 있는 callback을 추가할 수 있습니다.
@Async 어노테이션을 사용하는 메서드에서 스프링 4.1 부터 제공하는 ListenableFuture 인터페이스를 구현한 AsyncResult를 반환하면 됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@Service
public static class MyService {
@Async
public ListenableFuture<String> hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(1000);
return new AsyncResult<>("Hello");
}
}

public static void main(String[] args) {
// try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정
try (ConfigurableApplicationContext c = SpringApplication.run(StudyApplication.class, args)) {
}
}

@Autowired
MyService myService;

@Bean
ApplicationRunner run() {
return args -> {
log.info("run()");
ListenableFuture<String> f = myService.hello();
f.addCallback(s -> log.info(s), e-> log.info(e.getMessage()));
log.info("exit");

Thread.sleep(2000);
};
}
}


// 결과
2019-04-04 23:42:46.348 INFO 44559 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2019-04-04 23:42:46.348 INFO 44559 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 959 ms
2019-04-04 23:42:46.557 INFO 44559 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-04-04 23:42:46.736 INFO 44559 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-04-04 23:42:46.740 INFO 44559 --- [ main] com.example.study.StudyApplication : Started StudyApplication in 1.779 seconds (JVM running for 2.306)
2019-04-04 23:42:46.742 INFO 44559 --- [ main] com.example.study.StudyApplication : run()
2019-04-04 23:42:46.748 INFO 44559 --- [ main] com.example.study.StudyApplication : exit
2019-04-04 23:42:46.751 INFO 44559 --- [ task-1] com.example.study.StudyApplication : hello()
2019-04-04 23:42:47.752 INFO 44559 --- [ task-1] com.example.study.StudyApplication : Hello
2019-04-04 23:42:48.757 INFO 44559 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'

ThreadPoolTaskExecutor

@Async 어노테이션을 사용해 해당 메서드를 비동기적으로 호출할 경우 ThreadPool을 명시적으로 선언하지 않으면, 기본적으로 SimpleAsyncTaskExecutor를 사용합니다. SimpleAsyncTaskExecutor는 각 비동기 호출마다 계속 새로운 스레드를 만들어 사용하기 때문에 비효율적입니다. 이 경우 ThreadPoolTaskExecutor를 직접 만들어 사용하는게 효율적입니다.

ThreadPoolTaskExecutor는 CorePool, QueueCapacity, MaxPoolSize를 직접 설정할 수 있습니다. 각 값에 대한 설명은 코드에 추가했습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@Service
public static class MyService {
/*
기본적으로 SimpleAsyncTaskExecutor를 사용한다. 스레드를 계속 새로 만들어 사용하기 때문에 비효율적이다.
*/
@Async
// @Async("tp") ThreadPool이 여러개일 경우 직접 지정 가능하다.
public ListenableFuture<String> hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(1000);
return new AsyncResult<>("Hello");
}
}

@Bean
ThreadPoolTaskExecutor tp() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
// 1) 스레드 풀을 해당 개수까지 기본적으로 생성함. 처음 요청이 들어올 때 poll size만큼 생성한다.
te.setCorePoolSize(10);
// 2) 지금 당장은 Core 스레드를 모두 사용중일때, 큐에 만들어 대기시킨다.
te.setQueueCapacity(50);
// 3) 대기하는 작업이 큐에 꽉 찰 경우, 풀을 해당 개수까지 더 생성한다.
te.setMaxPoolSize(100);
te.setThreadNamePrefix("myThread");
return te;
}

public static void main(String[] args) {
// try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정
try (ConfigurableApplicationContext c = SpringApplication.run(StudyApplication.class, args)) {
}
}

@Autowired
MyService myService;

// 모든 빈이 다 준비된 후 실행됨 (현재는 일종의 컨트롤러라고 생각)
@Bean
ApplicationRunner run() {
return args -> {
log.info("run()");
ListenableFuture<String> f = myService.hello();
f.addCallback(s -> log.info(s), e-> log.info(e.getMessage()));
log.info("exit");

Thread.sleep(2000);
};
}
}


// 결과
2019-04-05 00:03:11.304 INFO 47863 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2019-04-05 00:03:11.304 INFO 47863 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1061 ms
2019-04-05 00:03:11.367 INFO 47863 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'tp'
2019-04-05 00:03:11.677 INFO 47863 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-04-05 00:03:11.680 INFO 47863 --- [ main] com.example.study.StudyApplication : Started StudyApplication in 1.751 seconds (JVM running for 2.208)
2019-04-05 00:03:11.681 INFO 47863 --- [ main] com.example.study.StudyApplication : run()
2019-04-05 00:03:11.686 INFO 47863 --- [ main] com.example.study.StudyApplication : exit
2019-04-05 00:03:11.687 INFO 47863 --- [ myThread1] com.example.study.StudyApplication : hello()
2019-04-05 00:03:12.691 INFO 47863 --- [ myThread1] com.example.study.StudyApplication : Hello

Servlet Async

@Async 어노테이션을 설명할 때 말했던 것처럼, Spring MVC 3.2 부터 Servlet 3.0 기반의 비동기 요청 처리가 가능해졌습니다. 기존 Controller 메서드를 Callable로 변경함으로써 비동기로 만들 수 있습니다.
Controller 메서드를 비동기로 변경해도 해당 처리가 서블릿 스레드가 아닌 다른 스레드에서 발생한다는 점을 제외하면 기존 Controller 메서드의 동작 방식과는 큰 차이가 없습니다.
(참고 : Spring MVC 3.2 Preview: Making a Controller Method Asynchronous)

Servlet 3.0 & 3.1

  • Servlet 3.0: 비동기 서블릿
    • HTTP connection은 이미 논블록킹 IO
    • 서블릿 요청 읽기, 응답 쓰기는 블록킹
    • 비동기 작업 시작 즉시 서블릿 스레드 반납
    • 비동기 작업이 완료되면 서블릿 스레드 재할당
    • 비동기 서블릿 컨텍스트 이용 (AsyncContext)
  • Servlet 3.1: 논블록킹 IO
    • 논블록킹 서블릿 요청, 응답 처리
    • Callback

      스레드가 블록되는 상황은 CPU와 메모리 자원을 많이 소모합니다. 컨텍스트 스위칭이 일어나기 때문입니다. 기본적으로 스레드가 블로킹되면 wating 상태로 변경되면서 컨텍스트 스위칭이 일어나고 추후 I/O 작업이 끝나 running 상태로 변경되면서 다시 컨텍스트 스위칭이 일어나 총 2번의 컨텍스트 스위칭이 일어납니다.
      Java InputStream과 OutputStream은 블록킹 방식이다. RequestHttpServletRequest, RequestHttpServletResponse는 InputSream과 OutputStream을 사용하기 때문에 서블릿은 기본적으로 블로킹 IO 방식이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {
@GetMapping("/callable")
public Callable<String> callable() {
log.info("callable");
return () -> {
log.info("async");
Thread.sleep(2000);
return "hello";
};
}
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}


// 결과
2019-04-06 01:12:41.761 INFO 69216 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2019-04-06 01:12:41.762 INFO 69216 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1206 ms
2019-04-06 01:12:41.993 INFO 69216 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-04-06 01:12:42.182 INFO 69216 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-04-06 01:12:42.186 INFO 69216 --- [ main] com.example.study.StudyApplication : Started StudyApplication in 2.073 seconds (JVM running for 2.807)
2019-04-06 01:12:44.161 INFO 69216 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2019-04-06 01:12:44.162 INFO 69216 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2019-04-06 01:12:44.169 INFO 69216 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 7 ms
2019-04-06 01:12:44.190 INFO 69216 --- [nio-8080-exec-1] com.example.study.StudyApplication : callable
2019-04-06 01:12:44.198 INFO 69216 --- [ task-1] com.example.study.StudyApplication : async

실제로 비동기 서블릿은 아래의 그림처럼 동작합니다.
비동기 서블릿 구조

Client (For Load Test)

지금부터는 Spring에서 Sync Servlet을 이용할 때와 Async Servlet을 이용했을 때의 차이점을 알아보기 위해 테스트를 할 수 있도록, 먼저 여러 Request를 동시에 생성하는 Client를 작성해봅니다.
Spring에서 제공하는 RestTemplate을 이용해 100개의 Request를 동시에 호출합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
public class LoadTest {
private static AtomicInteger counter = new AtomicInteger(0);

public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(100);

RestTemplate rt = new RestTemplate();
String url = "http://localhost:8080/callable";

StopWatch main = new StopWatch();
main.start();

for (int i = 0; i < 100; i++) {
es.execute(() -> {
int idx = counter.addAndGet(1);
log.info("Thread {}", idx);

StopWatch sw = new StopWatch();
sw.start();

rt.getForObject(url, String.class);

sw.stop();
log.info("Elapsed: {} -> {}", idx, sw.getTotalTimeSeconds());
});
}

es.shutdown();
// 지정된 시간이 타임아웃 걸리기 전이라면 대기작업이 진행될 때까지 기다린다.
// (100초안에 작업이 끝날때까지 기다리거나, 100초가 초과되면 종료)
es.awaitTermination(100, TimeUnit.SECONDS);
main.stop();
log.info("Total: {}", main.getTotalTimeSeconds());
}
}

Change Tomcat Thread Count

위의 비동기 서블릿 그림에서 볼 수 있듯이, Async Servlet은 클라이언트로부터 요청을 받은 후 실제 작업은 작업 스레드 풀에 위임하고 현재의 서블릿 스레드는 서블릿 스레드 풀에 반환 후, 다음 요청이 들어올 경우 사용할 수 있도록 합니다. 이에 반해, Sync Servlet은 요청을 받은 서블릿 스레드에서 실제 작업까지 전부 진행하기 때문에 요청에 대한 응답을 반환하기 전까지는 새로운 요청을 처리할 수 없는 상태입니다.

실제 이처럼 동작하는지 확인하기 위해서 application.properties 파일에서 다음과 같이 Tomcat의 스레드 개수를 1개로 설정합니다.

1
server.tomcat.max-threads=1

Sync vs Async

Sync

먼저 아래와 같이 Sync Servlet을 이용해 서버를 띄운 후 위의 Client 코드를 이용해 테스트를 진행합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {
@GetMapping("/callable")
public String callable() throws InterruptedException {
log.info("sync");
Thread.sleep(2000);
return "hello";
}
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}

해당 서버를 띄우고 Client(LoadTest) 코드를 사용해 테스트를 진행하면 결과는 다음과 같습니다. Tomcat의 스레드가 하나이며 Sync 방식으로 동작하기 때문에 한 번에 하나의 클라이언트 요청만 처리할 수 있습니다. 서버 로그를 확인하면 nio-8080-exec-1 라는 이름을 가진 한개의 스레드가 요청을 처리하고 있습니다.
동기 서블릿 테스트 결과

이번에는 JMC(Java Mission Control)를 이용해 실제 서버의 스레드 상황을 살펴보겠습니다.

JMC를 이용하기 위해서는 서버를 실행할 때 다음과 같은 JVM 옵션을 추가합니다.

1
2
3
4
5
6
-XX:+UnlockCommercialFeatures
-XX:+FlightRecorder
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Djava.rmi.server.hostname=localhost

JMC를 이용해 클라이언트 요청이 들어올 때, Thread 상태를 보면 다음과 같습니다. 동시에 100개의 클라이언트 요청이 들어왔지만, 스레드 수는 그대로 유지되고 있으며, 여러 스레드 목록 중에 nio-8080-exec-1 스레드가 존재하고 있는것을 확인할 수 있습니다.
동기 서블릿 테스트 결과 - 스레드

Async

이번에는 서버 코드를 아래와 같이 Async Servlet을 이용하도록 수정한 후 서버를 띄워 Client 코드를 이용해 테스트를 진행합니다. (작업 스레드 풀은 WebMvcConfigurer를 통해 설정해줍니다.)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {
@GetMapping("/callable")
public Callable<String> callable() {
return () -> {
log.info("async");
Thread.sleep(2000);
return "hello";
};
}
}

@Bean
WebMvcConfigurer configurer() {
return new WebMvcConfigurer() {
// 워커 스레드 풀 설정
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(100);
te.setQueueCapacity(50);
te.setMaxPoolSize(200);
te.setThreadNamePrefix("workThread");
te.initialize();
configurer.setTaskExecutor(te);
}
};
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}

Client(LoadTest) 코드를 사용해 테스트를 진행하면 결과는 다음과 같습니다. Tomcat의 스레드가 하나이지만 Async 방식으로 동작하기 때문에 해당 요청에 대한 실제 처리는 워커 스레드 풀에서 사용되고 있지 않은 스레드를 이용해 처리합니다. 서버 로그를 확인하면 nio-8080-exec-1 라는 이름을 가진 한개의 Tomcat 스레드와 workThreadX라는 이름을 가진 100개의 워커 스레드를 확인할 수 있습니다.
비동기 서블릿 테스트 결과

이번에도 역시 JMC(Java Mission Control)를 이용해 실제 서버의 스레드 상황을 살펴보겠습니다.

nio-8080-exec-1 라는 이름을 가진 한개의 Tomcat 스레드와 workThreadX라는 이름을 가진 100개의 워커 스레드를 확인할 수 있습니다.
비동기 서블릿 테스트 결과 - 스레드

DeferredResult

DeferredResult는 Spring 3.2 부터 사용 가능합니다. 비동기 요청 처리를 위해 사용하는 Callable의 대안을 제공합니다. “지연된 결과”를 의미하며 외부의 이벤트 혹은 클라이언트 요청에 의해서 지연되어 있는 HTTP 요청에 대한 응답을 나중에 써줄 수 있는 기술입니다. 별도로 워커 스레드를 만들어 대기하지 않고도 처리가 가능합니다.

DeferredResult 구조

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {
Queue<DeferredResult<String>> results = new ConcurrentLinkedQueue<>();

@GetMapping("/dr")
public DeferredResult<String> dr() {
log.info("dr");
DeferredResult<String> dr = new DeferredResult<>();
results.add(dr);
return dr;
}

@GetMapping("/dr/count")
public String drCount() {
return String.valueOf(results.size());
}

@GetMapping("/dr/event")
public String drEvent(String msg) {
for (DeferredResult<String> dr : results) {
dr.setResult("Hello " + msg);
results.remove(dr);
}
return "OK";
}
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}

LoadTest 코드를 이용해 /dr로 100개의 요청을 보내고, 크롬에서 /dr/count로 DeferredResult가 담겨있는 큐의 사이즈를 확인해봅니다. 그리고 마지막으로 /dr/event로 큐에 담긴 DeferredResult 객체에 setResult로 결과를 반환합니다.
100개의 요청이 동시에 완료되는 것을 확인할 수 있습니다.
DeferredResult 결과

ResponseBodyEmitter

ResponseBodyEmitter는 Spring 4.2 부터 사용 가능합니다. 비동기 요청 처리의 결과로 하나 이상의 응답을 위해 사용되는 리턴 값 Type 입니다. DeferredResult가 하나의 결과를 생성해 요청을 처리했다면, ResponseBodyEmitter는 여러개의 결과를 만들어 요청을 처리할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {

@GetMapping("/emitter")
public ResponseBodyEmitter emitter() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();

Executors.newSingleThreadExecutor().submit(() -> {
try {
for (int i = 0; i < 50; i++) {
emitter.send("<p>Stream " + i + "</p>");
Thread.sleep(100);
}
} catch (Exception e) {
}
});

return emitter;
}
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}

ResponseBodyEmitter 결과_1
ResponseBodyEmitter 결과_2

후기

우하한형제들 - 스프링 리액티브 프로그래밍 세미나를 다녀온 후 토비의 봄 TV 스프링 리액티브 프로그래밍 시리즈를 전부 보고있습니다.

토비님은 매 라이브 코딩마다 말씀하시길 단순히 코드를 보는 것과 실행 후 결과를 실제로 확인해보는 것은 또 다른 차이가 있을 수 있다고 말씀하십니다. 매우 공감합니다!

위의 내용들은 모두 라이브 코딩에 포함되어 있는 내용이지만 실제로 따라해보면서 해당 내용들을 정리하는 차원으로 작성해보았습니다. 따라하며 토비님이 라이브 코딩을 진행하셨을 때와 달라진 몇 가지를 수정한 부분도 있고, 서버의 스레드를 직접 확인해보고자 처음에는 VisualVM을 사용하려 했지만 계속 실패해 JMC을 이용해 진행했습니다.

라이브 코딩을 보며 자바와 스프링의 비동기 기술에 대해 개인적으로 궁금했던 부분들이 많이 해소되었습니다!! 앞으로 남은 내용들도 따라하며 정리해 보도록 하겠습니다.

자바와 스프링의 비동기 기술

https://jongmin92.github.io/2019/03/31/Java/java-async-1/

Author

KimJongMin

Posted on

2019-03-31

Updated on

2021-03-22

Licensed under

댓글