@Bean Lite Mode & inter-bean references

@Bean Methods in @Configuration Classes

일반적으로 @Bean 어노테이션은 @Configuration 어노테이션이 사용된 클래스 내의 메서드에 선언이 됩니다. 이 경우 @Bean 어노테이션을 사용하는 메서드는 같은 클래스의 다른 @Bean 메소드를 직접 호출하여 참조할 수 있습니다. 이렇게하면 bean 간의 참조(reference)가 강하게 만들어집니다.

아래 코드의 실행 후 로그를 통해 a() 메서드의 결과로 생성되는 A 클래스의 빈은 b() 와 c() 메서드에서 a() 메서드를 직접 호출해 참조가 되는 것을 확인할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@SpringBootApplication
public class TestApplication {

@Autowired
private ApplicationContext context;

public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(TestApplication.class, args);
}

@Configuration
public static class TestConfiguration {

@Bean
public A a() {
return new A();
}

@Bean
public B b() {
A a = a();
System.out.println(a);
return new B();
}

@Bean
public C c() {
A a = a();
System.out.println(a);
return new C();
}
}

public static class A {
}

public static class B {
}

public static class C {
}
}


// 샐행 결과
2019-04-30 19:41:04.837 INFO 24509 --- [ main] com.example.test.TestApplication : Starting TestApplication on AL01297960.local with PID 24509 (/Users/user/work/test/build/classes/java/main started by user in /Users/user/work/test)
2019-04-30 19:41:04.841 INFO 24509 --- [ main] com.example.test.TestApplication : No active profile set, falling back to default profiles: default
2019-04-30 19:41:06.229 INFO 24509 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2019-04-30 19:41:06.262 INFO 24509 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2019-04-30 19:41:06.262 INFO 24509 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.17]
2019-04-30 19:41:06.353 INFO 24509 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2019-04-30 19:41:06.353 INFO 24509 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1441 ms
com.example.test.TestApplication$A@42163c37
com.example.test.TestApplication$A@42163c37
2019-04-30 19:41:06.620 INFO 24509 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-04-30 19:41:06.846 INFO 24509 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-04-30 19:41:06.850 INFO 24509 --- [ main] com.example.test.TestApplication : Started TestApplication in 2.489 seconds (JVM running for 3.45)

이러한 관계를 **빈 간의 참조(inter-bean references)**라 부릅니다. 이러한 빈 간의 참조는 @Configuration 클래스의 @Bean이 cglib wrapper에 의해 래핑되기 때문에 동작하게 됩니다.(@Bean 메서드에 대한 호출을 가로채고 Bean 인스턴스를 컨텍스트에서 반환하게 됩니다.)

@Bean Lite Mode

처음 알게된 분도 계실 수 있을텐데요, @Bean 메소드는 @Configuration으로 주석을 달지 않은 클래스 내에서도 선언 될 수도 있습니다. 이런 경우, @Bean 메서드는 **lite mode**로 처리됩니다.

lite mode의 Bean 메서드는 스프링 컨테이너에 의해 일반 팩토리 메서드로 처리됩니다. 그렇기 때문에, lite mode에서는 빈 간의 참조가 지원되지 않습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@SpringBootApplication
public class TestApplication {

@Autowired
private ApplicationContext context;

public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(TestApplication.class, args);
}

@Configuration
public static class TestConfiguration {

public A a() {
return new A();
}

@Bean
public B b() {
A a = a();
System.out.println(a);
return new B();
}

@Bean
public C c() {
A a = a();
System.out.println(a);
return new C();
}
}

public static class A {
}

public static class B {
}

public static class C {
}
}


// 실행 결과
```java
2019-04-30 20:02:17.530 INFO 65524 --- [ main] com.example.test.TestApplication : Starting TestApplication on AL01297960.local with PID 65524 (/Users/user/work/test/build/classes/java/main started by user in /Users/user/work/test)
2019-04-30 20:02:17.534 INFO 65524 --- [ main] com.example.test.TestApplication : No active profile set, falling back to default profiles: default
2019-04-30 20:02:18.857 INFO 65524 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2019-04-30 20:02:18.891 INFO 65524 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2019-04-30 20:02:18.891 INFO 65524 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.17]
2019-04-30 20:02:18.973 INFO 65524 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2019-04-30 20:02:18.973 INFO 65524 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1368 ms
com.example.test.TestApplication$A@919d542c
com.example.test.TestApplication$A@414fc49c
2019-04-30 20:02:19.240 INFO 65524 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-04-30 20:02:19.471 INFO 65524 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-04-30 20:02:19.474 INFO 65524 --- [ main] com.example.test.TestApplication : Started TestApplication in 2.406 seconds (JVM running for 3.2)

AsyncRestTemplate의 콜백 헬과 중복 작업 문제

해당 포스팅은 토비님의 토비의 봄 TV 10회 스프링 리액티브 프로그래밍 (6) AsyncRestTemplate의 콜백 헬과 중복 작업 문제 라이브 코딩을 보며 따라했던 실습 내용을 바탕으로 정리한 글입니다.

실습 코드들은 IntelliJ를 이용해 SpringBoot 2.1.3.RELEASE 버전 기반으로 프로젝트를 생성 후(web, lombok 포함) 진행했습니다.

이번에는 포스팅에서는 지난번 ListenableFuture를 사용하면서 발생한 콜백헬을 어떻게 개선할지에 대해서 이야기합니다. ListenableFuture를 Wrapping 하는 Completion이라는 클래스를 만들어, chainable하게 사용할 수 있는 방식으로 코드를 만들어봅니다.
콜백헬의 문제로는 에러를 처리하는 코드가 중복이 된다는 것도 있는데, 이 부분도 해결해봅니다.

Completion 클래스 추가

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

@Autowired
MyService myService;

static final String URL1 = "http://localhost:8081/service?req={req}";
static final String URL2 = "http://localhost:8081/service2?req={req}";

@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();

Completion
.from(rt.getForEntity(URL1, String.class, "hello" + idx))
.andAccept(s -> dr.setResult(s.getBody()));

/*
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
f1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
f2.addCallback(s2 -> {
ListenableFuture<String> f3 = myService.work(s2.getBody());
f3.addCallback(s3 -> {
dr.setResult(s3);
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
*/

return dr;
}
}

public static class Completion {

Consumer<ResponseEntity<String>> con;

Completion next;

public Completion() {
}

public Completion(Consumer<ResponseEntity<String>> con) {
this.con = con;
}

public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
Completion c = new Completion();
lf.addCallback(s -> {
c.complete(s);
}, e -> {
c.error(e);
});
return c;
}

public void andAccept(Consumer<ResponseEntity<String>> con) {
Completion c = new Completion(con);
this.next = c;
}

void complete(ResponseEntity<String> s) {
if (next != null) next.run(s);
}

private void run(ResponseEntity<String> value) {
if (con != null) con.accept(value);
}

private void error(Throwable e) {
}
}

@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}

@Bean
public ThreadPoolTaskExecutor myThreadPool() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(1);
te.setMaxPoolSize(1);
te.initialize();
return te;
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}

andApply 메서드 추가

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

@Autowired
MyService myService;

static final String URL1 = "http://localhost:8081/service?req={req}";
static final String URL2 = "http://localhost:8081/service2?req={req}";

@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();

Completion
.from(rt.getForEntity(URL1, String.class, "hello" + idx))
.andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
.andAccept(s -> dr.setResult(s.getBody()));

/*
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
f1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
f2.addCallback(s2 -> {
ListenableFuture<String> f3 = myService.work(s2.getBody());
f3.addCallback(s3 -> {
dr.setResult(s3);
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
*/

return dr;
}
}

public static class Completion {

Consumer<ResponseEntity<String>> con;

Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;

Completion next;

public Completion() {
}

public Completion(Consumer<ResponseEntity<String>> con) {
this.con = con;
}

public Completion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
this.fn = fn;
}

public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
Completion c = new Completion();
lf.addCallback(s -> {
c.complete(s);
}, e -> {
c.error(e);
});
return c;
}

public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
Completion c = new Completion(fn);
this.next = c;
return c;
}

public void andAccept(Consumer<ResponseEntity<String>> con) {
Completion c = new Completion(con);
this.next = c;
}

void complete(ResponseEntity<String> s) {
if (next != null) next.run(s);
}

private void run(ResponseEntity<String> value) {
if (con != null) con.accept(value);
else if (fn != null) {
ListenableFuture<ResponseEntity<String>> lf = fn.apply(value);
lf.addCallback(s -> complete(s), e -> error(e));
}
}

private void error(Throwable e) {
}
}

@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}

@Bean
public ThreadPoolTaskExecutor myThreadPool() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(1);
te.setMaxPoolSize(1);
te.initialize();
return te;
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}

AcceptCompletion, AsyncCompletion 클래스 추가

Completion을 결과를 받아서 사용만 하고 끝나는 Accept 처리를 하는 Completion과, 결과를 받아서 또 다른 비동기 작업을 수행하고 그 결과를 반환하는 Apply 용 Completion으로 분리합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

@Autowired
MyService myService;

static final String URL1 = "http://localhost:8081/service?req={req}";
static final String URL2 = "http://localhost:8081/service2?req={req}";

@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();

Completion
.from(rt.getForEntity(URL1, String.class, "hello" + idx))
.andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
.andAccept(s -> dr.setResult(s.getBody()));

/*
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
f1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
f2.addCallback(s2 -> {
ListenableFuture<String> f3 = myService.work(s2.getBody());
f3.addCallback(s3 -> {
dr.setResult(s3);
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
*/

return dr;
}
}

public static class AcceptCompletion extends Completion {
Consumer<ResponseEntity<String>> con;

public AcceptCompletion(Consumer<ResponseEntity<String>> con) {
this.con = con;
}

@Override
public void run(ResponseEntity<String> value) {
con.accept(value);
}
}

public static class AsyncCompletion extends Completion {
Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;

public AsyncCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
this.fn = fn;
}

@Override
public void run(ResponseEntity<String> value) {
ListenableFuture<ResponseEntity<String>> lf = fn.apply(value);
lf.addCallback(s -> complete(s), e -> error(e));
}
}

public static class Completion {
Completion next;

public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
Completion c = new Completion();
lf.addCallback(s -> {
c.complete(s);
}, e -> {
c.error(e);
});
return c;
}

public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
Completion c = new AsyncCompletion(fn);
this.next = c;
return c;
}

public void andAccept(Consumer<ResponseEntity<String>> con) {
Completion c = new AcceptCompletion(con);
this.next = c;
}

public void complete(ResponseEntity<String> s) {
if (next != null) next.run(s);
}

public void run(ResponseEntity<String> value) {
}

public void error(Throwable e) {
}
}

@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}

@Bean
public ThreadPoolTaskExecutor myThreadPool() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(1);
te.setMaxPoolSize(1);
te.initialize();
return te;
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}

ErrorCompletion 클래스 추가

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

@Autowired
MyService myService;

static final String URL1 = "http://localhost:8081/service?req={req}";
static final String URL2 = "http://localhost:8081/service2?req={req}";

@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();

Completion
.from(rt.getForEntity(URL1, String.class, "hello" + idx))
.andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
.andError(e -> dr.setErrorResult(e))
.andAccept(s -> dr.setResult(s.getBody()));

/*
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
f1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
f2.addCallback(s2 -> {
ListenableFuture<String> f3 = myService.work(s2.getBody());
f3.addCallback(s3 -> {
dr.setResult(s3);
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
*/

return dr;
}
}

public static class AcceptCompletion extends Completion {
Consumer<ResponseEntity<String>> con;

public AcceptCompletion(Consumer<ResponseEntity<String>> con) {
this.con = con;
}

@Override
public void run(ResponseEntity<String> value) {
con.accept(value);
}
}

public static class ErrorCompletion extends Completion {
Consumer<Throwable> econ;

public ErrorCompletion(Consumer<Throwable> econ) {
this.econ = econ;
}

@Override
public void run(ResponseEntity<String> value) {
if (next != null) {
next.run(value);
}
}

@Override
public void error(Throwable e) {
econ.accept(e);
}
}

public static class AsyncCompletion extends Completion {
Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;

public AsyncCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
this.fn = fn;
}

@Override
public void run(ResponseEntity<String> value) {
ListenableFuture<ResponseEntity<String>> lf = fn.apply(value);
lf.addCallback(s -> complete(s), e -> error(e));
}
}

public static class Completion {
Completion next;

public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
Completion c = new Completion();
lf.addCallback(s -> {
c.complete(s);
}, e -> {
c.error(e);
});
return c;
}

public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
Completion c = new AsyncCompletion(fn);
this.next = c;
return c;
}

public Completion andError(Consumer<Throwable> econ) {
Completion c = new ErrorCompletion(econ);
this.next = c;
return c;
}

public void andAccept(Consumer<ResponseEntity<String>> con) {
Completion c = new AcceptCompletion(con);
this.next = c;
}

public void complete(ResponseEntity<String> s) {
if (next != null) next.run(s);
}

public void run(ResponseEntity<String> value) {
}

public void error(Throwable e) {
if (next != null) next.error(e);
}
}

@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}

@Bean
public ThreadPoolTaskExecutor myThreadPool() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(1);
te.setMaxPoolSize(1);
te.initialize();
return te;
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}

Generic 적용

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

@Autowired
MyService myService;

static final String URL1 = "http://localhost:8081/service?req={req}";
static final String URL2 = "http://localhost:8081/service2?req={req}";

@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();

Completion
.from(rt.getForEntity(URL1, String.class, "hello" + idx))
.andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
.andApply(s -> myService.work(s.getBody()))
.andError(e -> dr.setErrorResult(e.toString()))
.andAccept(s -> dr.setResult(s));

/*
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
f1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
f2.addCallback(s2 -> {
ListenableFuture<String> f3 = myService.work(s2.getBody());
f3.addCallback(s3 -> {
dr.setResult(s3);
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
*/

return dr;
}
}

public static class AcceptCompletion<S> extends Completion<S, Void> {
Consumer<S> con;

public AcceptCompletion(Consumer<S> con) {
this.con = con;
}

@Override
public void run(S value) {
con.accept(value);
}
}

public static class ErrorCompletion<T> extends Completion<T, T> {
Consumer<Throwable> econ;

public ErrorCompletion(Consumer<Throwable> econ) {
this.econ = econ;
}

@Override
public void run(T value) {
if (next != null) {
next.run(value);
}
}

@Override
public void error(Throwable e) {
econ.accept(e);
}
}

public static class AsyncCompletion<S, T> extends Completion<S, T> {
Function<S, ListenableFuture<T>> fn;

public AsyncCompletion(Function<S, ListenableFuture<T>> fn) {
this.fn = fn;
}

@Override
public void run(S value) {
ListenableFuture<T> lf = fn.apply(value);
lf.addCallback(s -> complete(s), e -> error(e));
}
}

// S는 넘어온 파라미터, T는 결과
public static class Completion<S, T> {
Completion next;

public static <S, T> Completion<S, T> from(ListenableFuture<T> lf) {
Completion<S, T> c = new Completion<>();
lf.addCallback(s -> {
c.complete(s);
}, e -> {
c.error(e);
});
return c;
}

public <V> Completion<T, V> andApply(Function<T, ListenableFuture<V>> fn) {
Completion<T, V> c = new AsyncCompletion<>(fn);
this.next = c;
return c;
}

public Completion<T, T> andError(Consumer<Throwable> econ) {
Completion<T, T> c = new ErrorCompletion<>(econ);
this.next = c;
return c;
}

public void andAccept(Consumer<T> con) {
Completion<T, Void> c = new AcceptCompletion<>(con);
this.next = c;
}

public void complete(T s) {
if (next != null) next.run(s);
}

public void run(S value) {
}

public void error(Throwable e) {
if (next != null) next.error(e);
}
}

@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}

@Bean
public ThreadPoolTaskExecutor myThreadPool() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(1);
te.setMaxPoolSize(1);
te.initialize();
return te;
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}

비동기 RestTemplate과 비동기 MVC/Serlvet

해당 포스팅은 토비님의 토비의 봄 TV 9회 스프링 리액티브 프로그래밍 (5) 비동기 RestTemplate과 비동기 MVC/Serlvet 라이브 코딩을 보며 따라했던 실습 내용을 바탕으로 정리한 글입니다.

실습 코드들은 IntelliJ를 이용해 SpringBoot 2.1.3.RELEASE 버전 기반으로 프로젝트를 생성 후(web, lombok 포함) 진행했습니다.

Thread Pool Hell

스프링의 비동기 기술 을 이용해 클라이언트로부터 요청을 받은 후 실제 작업은 작업 스레드 풀에 위임하고 현재의 서블릿 스레드는 서블릿 스레드 풀에 반환 후, 다음 요청이 들어올 경우 바로 사용할 수 있게 효율적으로 처리하도록 만들었습니다.
그러나 아직 문제가 있습니다.

아주 빠르게 무언가를 계산하고 해당 처리를 끝내는 경우라면 굳이 비동기 MVC(서블릿)를 사용하지 않아도 문제가 없지만, 하나의 요청에 대한 처리를 수행하면서 외부의 서비스들을 호출하는 작업이 많이 있는 경우, 문제는 단순히 비동기를 서블릿을 사용하는 것만으로 해결할 수 없는 경우가 많이 있습니다. (서블릿 요청은 바로 사용 가능하더라도 워커 스레드가 I/O 같은 작업으로 인해 블록되기 때문입니다.)

Thread Pool Hell이란 풀 안에 있는 스레드에 대한 사용 요청이 급격하게 증가해 추가적인 요청이 들어올 때, 사용 가능한 스레드 풀의 스레드가 없기 때문에 대기 상태에 빠져 요청에 대한 응답이 느려지게 되는 상태를 말합니다.
thread pool hell

최근 서비스들은 아래의 그럼처럼 하나의 요청을 처리함에 있어 다른 서버로의 요청(Network I/O)이 많아졌습니다. 조금전 설명한 것처럼 비동기 서블릿을 사용하더라도 하나의 요청을 처리하는 동안 하나의 작업(워커) 스레드는 그 시간동안 대기상태에 빠지게 되어 결국에는 스레드 풀의 가용성이 떨어지게 됩니다. 이번 포스팅에서는 해당 문제를 해결해가는 과정을 다루고 있습니다.

service oriented architecture

Upgrade Client (For Load Test)

지난 번에 작성했던 Client를 조금 수정하도록 합니다. 기존의 Client는 100개의 스레드를 순차적으로 만들면서 서버로의 Request를 만들었던 문제가 있었습니다. 이제는 100개의 스레드를 만들고 CyclicBarrier를 이용해 100개의 스레드에서 동시에 Request를 만들도록 변경해보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j
public class LoadTest {
static AtomicInteger counter = new AtomicInteger(0);

public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
ExecutorService es = Executors.newFixedThreadPool(100);

RestTemplate rt = new RestTemplate();
String url = "http://localhost:8080/rest?idx={idx}";

CyclicBarrier barrier = new CyclicBarrier(101);

for (int i = 0; i < 100; i++) {
// submit이 받는 callable은 return을 가질 수 있으며, exception도 던질 수 있다.
es.submit(() -> {
int idx = counter.addAndGet(1);
log.info("Thread {}", idx);
barrier.await();

StopWatch sw = new StopWatch();
sw.start();

String res = rt.getForObject(url, String.class, idx);

sw.stop();
log.info("idx: {}, Elapsed: {} -> res: {}", idx, sw.getTotalTimeSeconds(), res);
// IDE가 funtional interface가 callable임을 인식할 수 있도록 의미없는 return을 넣어준다.
return null;
});
}

// await을 만난 스레드가 101번째가 될 때, 모든 스레드들도 await에서 풀려나 이후 로직을 수행한다.
// 메인 스레드 1개, Executors.newFixedThreadPool로 생성한 스레드 100개
barrier.await();
StopWatch main = new StopWatch();
main.start();

es.shutdown();
// 지정된 시간이 타임아웃 걸리기 전이라면 대기작업이 진행될 때까지 기다린다.
// (100초안에 작업이 끝날때까지 기다리거나, 100초가 초과되면 종료)
es.awaitTermination(100, TimeUnit.SECONDS);
main.stop();
log.info("Total: {}", main.getTotalTimeSeconds());
}
}

외부 서비스 호출 테스트

클라이언트의 요청을 받아 외부 서비스를 호출하고 해당 결과를 이용해서 응답을 돌려주는 테스트를 진행합니다. 테스트를 진행하기 위해서는 2개의 스프링 애플리케이션이 필요합니다. 2개의 스프링 애플리케이션의 설정은 다음과 같습니다.

  • Main Application
    • port: 8080
    • tomcat-max-thread-count: 1
  • Remote Application
    • port: 8081
    • tomcat-max-thread-count: 1000

Main Application

먼저 하나의 스프링 애플리케이션에 컨트롤러를 하나 준비합니다. 이 컨트롤러는 클라이언트로부터 요청을 받아 해당 요청으로부터 받은 값을 이용해 다른 외부 서비스(http://localhost:8081/service?req={req})를 호출합니다.

결국 해당 서블릿은 클라이언트 요청을 처리하면서 외부 서비스로의 Networking I/O 작업을 수행하기 때문에 외부 서비스로부터의 요청에 대한 응답을 받기 전까지는 blocking 상태가 됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@SpringBootApplication
@Slf4j
public class MainApplication {

@RestController
public static class MainController {
RestTemplate rt = new RestTemplate();

@GetMapping("/rest")
public String rest(int idx) {
String res = rt.getForObject("http://localhost:8081/service?req={req}",
String.class, "hello" + idx);
return res;
}
}

public static void main(String[] args) {
SpringApplication.run(MainApplication.class, args);
}
}

application.properties 파일에서 다음과 같이 Tomcat의 스레드 개수를 1개로 설정합니다.

1
server.tomcat.max-threads=1

Remote Application

다른 하나의 스프링 애플리케이션을 생성하고 이전에 만들었던 스프링 애플리케이션의 컨트롤러 내부에서 만들었던 요청을 받아 처리할 수 있도록 컨트롤러 추가합니다. 8080 포트가 아닌 8081 포트를 사용하고 tomcat 스레드를 1000개로 설정합니다. RemoteApplication은 application.properties의 값을 사용하게 하지 않고 직접 프로퍼티를 설정해줍니다.

아래와 같이 설정하면 Intellij를 이용해서 하나의 프로젝트에서 2개의 스프링 애플리케이션을 실행할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SpringBootApplication
public class RemoteApplication {

@RestController
public static class RemoteController {
@GetMapping("/service")
public String service(String req) {
return req + "/service";
}
}

public static void main(String[] args) {
// 하나의 프로젝트에서 2개의 스프링 애플리케이션을 띄우기 위해 외부 서비스 역할을 하는 RemoteApplication은
// application.properties가 아닌 별도의 프로퍼티를 이용하도록 직접 설정한다.
System.setProperty("server.port", "8081");
System.setProperty("server.tomcat.max-threads", "1000");
SpringApplication.run(RemoteApplication.class, args);
}
}

결과 확인

MainApplication과 RemoteApplication을 각각 실행하고 Client를 이용한 테스트 결과는 다음과 같습니다.
100개의 클라이언트 요청을 처리하는데 0.4초 정도의 시간이 걸렸습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
...
...
01:54:27.539 [pool-1-thread-40] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
01:54:27.539 [pool-1-thread-40] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
01:54:27.539 [pool-1-thread-40] INFO com.example.study.LoadTest - idx: 40, Elapsed: 0.4 -> res: hello40/service
01:54:27.541 [pool-1-thread-77] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
01:54:27.541 [pool-1-thread-77] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
01:54:27.541 [pool-1-thread-77] INFO com.example.study.LoadTest - idx: 77, Elapsed: 0.401 -> res: hello77/service
01:54:27.543 [pool-1-thread-48] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
01:54:27.543 [pool-1-thread-48] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
01:54:27.543 [pool-1-thread-48] INFO com.example.study.LoadTest - idx: 48, Elapsed: 0.403 -> res: hello48/service
01:54:27.545 [pool-1-thread-8] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
01:54:27.545 [pool-1-thread-8] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
01:54:27.546 [pool-1-thread-8] INFO com.example.study.LoadTest - idx: 8, Elapsed: 0.407 -> res: hello8/service
01:54:27.548 [pool-1-thread-33] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
01:54:27.548 [pool-1-thread-33] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
01:54:27.548 [pool-1-thread-33] INFO com.example.study.LoadTest - idx: 33, Elapsed: 0.409 -> res: hello33/service
01:54:27.548 [main] INFO com.example.study.LoadTest - Total: 0.407

이번에는 RemoteApplication의 요청 처리 부분에 2초간 Thread sleep을 주고 다시 한 번 클라이언트를 이용해 테스트를 진행해봅니다.

1
2
3
4
5
6
7
8
9
10
// RemoteApplication

@RestController
public static class RemoteController {
@GetMapping("/service")
public String service(String req) throws InterruptedException {
Thread.sleep(2000);
return req + "/service";
}
}

Thread sleep을 추가하고 다시 테스트를 해보면 결과는 다음과 같습니다. 100개의 요청을 약 0.4초만에 모두 처리하던 이전과 달리 매 요청을 처리하는데 약 2초의 시간이 증가하고 있습니다. 결국 마지막 요청은 약 2 * 100 = 200초 후에서야 응답을 받을 수 있기 때문에 모든 요청에 대한 처리는 200초 정도 걸릴 것 입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
02:25:22.056 [pool-1-thread-32] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
02:25:22.058 [pool-1-thread-32] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
02:25:22.061 [pool-1-thread-32] INFO com.example.study.LoadTest - idx: 32, Elapsed: 2.233 -> res: hello32/service
02:25:24.060 [pool-1-thread-56] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
02:25:24.060 [pool-1-thread-56] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
02:25:24.060 [pool-1-thread-56] INFO com.example.study.LoadTest - idx: 56, Elapsed: 4.231 -> res: hello56/service
02:25:26.068 [pool-1-thread-93] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
02:25:26.068 [pool-1-thread-93] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
02:25:26.068 [pool-1-thread-93] INFO com.example.study.LoadTest - idx: 93, Elapsed: 6.238 -> res: hello93/service
02:25:28.077 [pool-1-thread-31] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
02:25:28.077 [pool-1-thread-31] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
02:25:28.077 [pool-1-thread-31] INFO com.example.study.LoadTest - idx: 31, Elapsed: 8.249 -> res: hello31/service
02:25:30.081 [pool-1-thread-20] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
02:25:30.082 [pool-1-thread-20] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
02:25:30.082 [pool-1-thread-20] INFO com.example.study.LoadTest - idx: 20, Elapsed: 10.254 -> res: hello20/service
02:25:32.089 [pool-1-thread-46] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
02:25:32.089 [pool-1-thread-46] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
02:25:32.089 [pool-1-thread-46] INFO com.example.study.LoadTest - idx: 46, Elapsed: 12.26 -> res: hello46/service
...
...
...

이런 결과가 나오게 된 이유는 클라이언트로부터의 요청을 받아 처리하는 Main Application의 tomcat thread가 1개이고, 1개의 서블릿 스레드를 이용해 클라이언트의 요청을 처리하는 과정에서 Remote Application에 대한 요청(Network I/O)에서 응답을 받기까지 약 2초간 스레드가 block되기 때문입니다.

AsyncRestTemplate

위의 문제는 MainApplication의 tomcat 스레드는 클라이언트의 요청을 처리하며 외부 서비스(RemoteApplication)로 요청(Network I/O)을 보낸 후, 응답이 올 때까지 대기하고 있는 상태라는 점입니다. 해당 시간동안 CPU는 아무 일을 처리하지 않기때문에 자원이 소모되고 있습니다.

이 문제를 해결하기 위해서는 API를 호출하는 작업을 비동기적으로 바꿔야합니다. tomcat 스레드는 요청에 대한 작업을 다 끝내기 전에 반환을 해서 바로 다음 요청을 처리하도록 사용합니다. 그리고 외부 서비스로부터 실제 결과를 받고 클라이언트의 요청에 응답을 보내기 위해서는 새로운 스레드를 할당 받아 사용합니다. (외부 서비스로부터 실제 결과를 받고 클라이언트에 응답을 보내기 위해서는 새로운 스레드를 할당 받아야 하지만, 외부 API를 호출하는 동안은 스레드(tomcat) 자원을 낭비하고 싶지 않다는 것이 목적이다.)

스프링 3.x 버전에서는 이 문제를 간단히 해결하기 어려웠지만 스프링 4 부터 제공하는 AsyncRestTemplate을 사용하면 이 문제를 쉽게 해결할 수 있습니다. AsyncRestTemplate은 비동기 클라이언트를 제공하는 클래스이며 ListenableFuture를 반환합니다. 스프링은 컨트롤러에서 ListenableFuture를 리턴하면 해당 스레드는 즉시 반납하고, 스프링 MVC가 자동으로 등록해준 콜백에 의해 결과가 처리됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@SpringBootApplication
@Slf4j
public class MainApplication {

@RestController
public static class MainController {
// asynchronous
AsyncRestTemplate rt = new AsyncRestTemplate();

@GetMapping("/rest")
public ListenableFuture<ResponseEntity<String>> rest(int idx) {
return rt.getForEntity("http://localhost:8081/service?req={req}",
String.class, "hello" + idx);
}
}

public static void main(String[] args) {
SpringApplication.run(MainApplication.class, args);
}
}

실행 결과를 살펴보면 100개의 요청을 동시에 처리하는데 약 2.6초의 시간이 걸렸습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
...
...
...
16:55:49.088 [pool-1-thread-4] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
16:55:49.089 [pool-1-thread-4] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
16:55:49.089 [pool-1-thread-4] INFO com.example.study.LoadTest - idx: 4, Elapsed: 2.658 -> res: hello4/service
16:55:49.090 [pool-1-thread-44] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
16:55:49.090 [pool-1-thread-44] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
16:55:49.090 [pool-1-thread-44] INFO com.example.study.LoadTest - idx: 44, Elapsed: 2.659 -> res: hello44/service
16:55:49.091 [pool-1-thread-93] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
16:55:49.091 [pool-1-thread-93] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
16:55:49.091 [pool-1-thread-93] INFO com.example.study.LoadTest - idx: 93, Elapsed: 2.658 -> res: hello93/service
16:55:49.095 [pool-1-thread-66] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
16:55:49.096 [pool-1-thread-66] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
16:55:49.096 [pool-1-thread-66] INFO com.example.study.LoadTest - idx: 66, Elapsed: 2.664 -> res: hello66/service
16:55:49.098 [pool-1-thread-16] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
16:55:49.098 [pool-1-thread-16] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
16:55:49.098 [pool-1-thread-16] INFO com.example.study.LoadTest - idx: 16, Elapsed: 2.667 -> res: hello16/service
16:55:49.101 [pool-1-thread-57] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
16:55:49.101 [pool-1-thread-57] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
16:55:49.101 [pool-1-thread-57] INFO com.example.study.LoadTest - idx: 57, Elapsed: 2.669 -> res: hello57/service
16:55:49.104 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
16:55:49.104 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
16:55:49.105 [pool-1-thread-2] INFO com.example.study.LoadTest - idx: 2, Elapsed: 2.674 -> res: hello2/service
16:55:49.105 [pool-1-thread-15] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
16:55:49.105 [pool-1-thread-15] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
16:55:49.105 [pool-1-thread-15] INFO com.example.study.LoadTest - idx: 15, Elapsed: 2.674 -> res: hello15/service
16:55:49.106 [main] INFO com.example.study.LoadTest - Total: 2.673

클라이언트의 요청이 들어 올 때, MainApplication의 스레드 상태를 살펴보면, tomcat 스레드는 그대로 1개 입니다.(http-nio-8080-exec-1) 그러나 비동기 작업을 처리하기 위해서 순간적으로 백그라운드에 100개의 스레드 새로 생성되는것을 확인할 수 있습니다.
async rest template result

Netty non-blocking I/O

지금까지 Tomcat의 스레드가 1개이지만 요청을 비동기적으로 처리함으로써 Tomcat의 스레드는 바로 반환이되어 다시 그 후의 요청에 Tomcat의 스레드를 이용해 요청을 받을 수 있었습니다. 그러나 결과적으로는 실제 비동기 요청을 처리하는 스레드는 요청의 수 만큼 계속 생성되는 것을 확인할 수 있었습니다.

이번에는 이렇게 비동기 요청을 처리하는 스레드의 수도 Netty의 non blocking I/O를 이용함으로써 비동기 요청을 처리하는 스레드도 줄여보고자 합니다. 그러면 결과적으로 tomcat의 스레드 1개, netty의 non blocking I/O를 이용하기위한 필요한 스레드의 수만큼만 생성되어 클라이언트의 요청을 모두 처리할 수 있을 것 입니다.

먼저 netty의 dependency를 build.gradle 혹은 pom.xml에 추가합니다. 저는 build.gradle에 의존성을 추가해 주었습니다.

1
2
3
4
5
dependencies {
...
implementation 'io.netty:netty-all:4.0.4.Final'
...
}

AsyncRestTemplate이 netty의 Netty4ClientHttpRequestFactory를 이용할 수 있도록 다음과 같이 설정합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@SpringBootApplication
@Slf4j
public class MainApplication {

@RestController
public static class MainController {
// asynchronous + netty non-blocking
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

@GetMapping("/rest")
public ListenableFuture<ResponseEntity<String>> rest(int idx) {
return rt.getForEntity("http://localhost:8081/service?req={req}",
String.class, "hello" + idx);
}
}

public static void main(String[] args) {
SpringApplication.run(MainApplication.class, args);
}
}

다시 서버를 띄우고 테스트를 해보면 클라이언트의 요청을 전부 처리하는데 걸린 시간은 약 2.7초로 이전과 큰 차이가 없습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
...
...
...
18:24:49.958 [pool-1-thread-65] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
18:24:49.958 [pool-1-thread-65] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
18:24:49.958 [pool-1-thread-65] INFO com.example.study.LoadTest - idx: 65, Elapsed: 2.744 -> res: hello65/service
18:24:49.964 [pool-1-thread-59] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
18:24:49.964 [pool-1-thread-59] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
18:24:49.964 [pool-1-thread-59] INFO com.example.study.LoadTest - idx: 59, Elapsed: 2.751 -> res: hello59/service
18:24:49.964 [pool-1-thread-14] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
18:24:49.965 [pool-1-thread-14] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
18:24:49.965 [pool-1-thread-14] INFO com.example.study.LoadTest - idx: 14, Elapsed: 2.752 -> res: hello14/service
18:24:49.968 [pool-1-thread-31] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
18:24:49.968 [pool-1-thread-31] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
18:24:49.968 [pool-1-thread-31] INFO com.example.study.LoadTest - idx: 31, Elapsed: 2.754 -> res: hello31/service
18:24:49.969 [pool-1-thread-63] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
18:24:49.969 [pool-1-thread-63] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
18:24:49.969 [pool-1-thread-63] INFO com.example.study.LoadTest - idx: 63, Elapsed: 2.755 -> res: hello63/service
18:24:49.969 [pool-1-thread-19] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
18:24:49.969 [pool-1-thread-19] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
18:24:49.969 [pool-1-thread-19] INFO com.example.study.LoadTest - idx: 19, Elapsed: 2.755 -> res: hello19/service
18:24:49.970 [pool-1-thread-62] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
18:24:49.970 [pool-1-thread-62] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
18:24:49.970 [pool-1-thread-62] INFO com.example.study.LoadTest - idx: 62, Elapsed: 2.756 -> res: hello62/service
18:24:49.970 [main] INFO com.example.study.LoadTest - Total: 2.755

스레드를 확인해보면 다음과 같이 tomcat 스레드 1개, netty가 non blocking I/O를 사용하는데 필요로 하는 몇개의 스레드가 추가된 것 말고는 스레드 수가 크게 증가하지 않은것을 확인할 수 있습니다.
async rest template result

DeferredResult

이전 포스팅에서 살펴 보았던 DeferredResult를 사용하면 AsyncRestTemplate을 사용하여 외부 서비스를 호출한 후, 그 결과를 다시 이용해 클라이언트의 요청에 응답하는 추가 로직 부분을 작성할 수 있습니다.

컨트롤러에서 DeferredResult 오브젝트를 반환하는 시점에는 바로 응답이 가지 않고, 추후 해당 DeferredResult 오브젝트에 값을 set(setResult, setErrorResult) 해줄 때, 클라이언트에게 응답이 가게 됩니다. 이를 이용하려면 ListenableFuture에 콜백을 추가해 해당 콜백 로직 안에서 결과를 이용해 DeferredResult 오브젝트의 set 메서드를 호출하면 됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@SpringBootApplication
@Slf4j
public class MainApplication {

@RestController
public static class MainController {
// asynchronous + netty non-blocking
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
// 오브젝트를 만들어서 컨트롤러에서 리턴하면 언제가 될지 모르지만 언제인가 DeferredResult에 값을 써주면
// 그 값을 응답으로 사용
DeferredResult<String> dr = new DeferredResult<>();

ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}",
String.class, "hello" + idx);
f1.addCallback(s -> {
dr.setResult(s.getBody() + "/work");
}, e -> {
dr.setErrorResult(e.getMessage());
});

return dr;
}

}

public static void main(String[] args) {
SpringApplication.run(MainApplication.class, args);
}
}

외부 서비스의 응답을 받아 “/work” 문자열이 추가되어 클라이언트에 전달된것을 확인할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
...
...
...
18:38:23.514 [pool-1-thread-33] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
18:38:23.514 [pool-1-thread-33] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
18:38:23.514 [pool-1-thread-33] INFO com.example.study.LoadTest - idx: 33, Elapsed: 2.345 -> res: hello33/service/work
18:38:23.515 [pool-1-thread-79] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
18:38:23.515 [pool-1-thread-79] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
18:38:23.515 [pool-1-thread-79] INFO com.example.study.LoadTest - idx: 79, Elapsed: 2.345 -> res: hello79/service/work
18:38:23.515 [pool-1-thread-80] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
18:38:23.515 [pool-1-thread-80] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
18:38:23.515 [pool-1-thread-80] INFO com.example.study.LoadTest - idx: 80, Elapsed: 2.345 -> res: hello80/service/work
18:38:23.516 [pool-1-thread-9] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
18:38:23.516 [pool-1-thread-9] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
18:38:23.516 [pool-1-thread-9] INFO com.example.study.LoadTest - idx: 9, Elapsed: 2.347 -> res: hello9/service/work
18:38:23.517 [pool-1-thread-60] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
18:38:23.517 [pool-1-thread-60] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
18:38:23.517 [pool-1-thread-60] INFO com.example.study.LoadTest - idx: 60, Elapsed: 2.347 -> res: hello60/service/work
18:38:23.517 [pool-1-thread-98] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
18:38:23.517 [pool-1-thread-98] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
18:38:23.518 [pool-1-thread-98] INFO com.example.study.LoadTest - idx: 98, Elapsed: 2.347 -> res: hello98/service/work
18:38:23.518 [main] INFO com.example.study.LoadTest - Total: 2.346

중첩된 Remote Service 사용

이번에는 외부 서비스를 하나 더 추가해보겠습니다. 외부 서비스의 요청에 대한 결과를 다시 다른 서비스를 호출하는 요청의 파라미터로 사용하면서 콜백의 구조가 복잡해지는 문제가 생기게 되었습니다. 이런 문제를 콜백 헬이라고 합니다.

다음번 포스팅에서는 콜백 헬을 해결할 수 있는 방법에 대해서 알아보도록 하겠습니다.

Remote Application

“/service2”를 추가합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@SpringBootApplication
public class RemoteApplication {

@RestController
public static class RemoteController {
@GetMapping("/service")
public String service(String req) throws InterruptedException {
Thread.sleep(2000);
return req + "/service1";
}

@GetMapping("/service2")
public String service2(String req) throws InterruptedException {
Thread.sleep(2000);
return req + "/service2";
}
}

public static void main(String[] args) {
// 하나의 프로젝트에서 2개의 스프링 애플리케이션을 띄우기 위해 외부 서비스 역할을 하는 RemoteApplication은
// application.properties가 아닌 별도의 프로퍼티를 이용하도록 직접 설정한다.
System.setProperty("server.port", "8081");
System.setProperty("server.tomcat.max-threads", "1000");
SpringApplication.run(RemoteApplication.class, args);
}
}

Main Application

“/service”를 호출한 결과를 이용해 “/service2”를 호출하도록 합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@SpringBootApplication
@Slf4j
public class MainApplication {

@RestController
public static class MainController {
// asynchronous + netty non-blocking
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
// 오브젝트를 만들어서 컨트롤러에서 리턴하면 언제가 될지 모르지만 언제인가 DeferredResult에 값을 써주면
// 그 값을 응답으로 사용
DeferredResult<String> dr = new DeferredResult<>();

ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
f1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
f2.addCallback(s2 -> {
dr.setResult(s2.getBody());
}, e -> {
dr.setErrorResult(e.getMessage());
});

}, e -> {
dr.setErrorResult(e.getMessage());
});

return dr;
}

}

public static void main(String[] args) {
SpringApplication.run(MainApplication.class, args);
}
}

결과 확인

100개의 클라이언트 요청을 처리하는데 약 4.3초의 시간이 걸렸습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
...
...
...
19:25:19.904 [pool-1-thread-17] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
19:25:19.904 [pool-1-thread-17] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
19:25:19.905 [pool-1-thread-17] INFO com.example.study.LoadTest - idx: 17, Elapsed: 4.338 -> res: hello17/service1/service2
19:25:19.905 [pool-1-thread-87] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
19:25:19.905 [pool-1-thread-87] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
19:25:19.905 [pool-1-thread-87] INFO com.example.study.LoadTest - idx: 87, Elapsed: 4.337 -> res: hello87/service1/service2
19:25:19.905 [pool-1-thread-14] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
19:25:19.906 [pool-1-thread-14] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
19:25:19.906 [pool-1-thread-14] INFO com.example.study.LoadTest - idx: 14, Elapsed: 4.339 -> res: hello14/service1/service2
19:25:19.906 [pool-1-thread-74] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
19:25:19.906 [pool-1-thread-74] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
19:25:19.906 [pool-1-thread-74] INFO com.example.study.LoadTest - idx: 74, Elapsed: 4.338 -> res: hello74/service1/service2
19:25:19.907 [pool-1-thread-60] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
19:25:19.907 [pool-1-thread-60] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
19:25:19.907 [pool-1-thread-60] INFO com.example.study.LoadTest - idx: 60, Elapsed: 4.339 -> res: hello60/service1/service2
19:25:19.907 [pool-1-thread-38] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
19:25:19.907 [pool-1-thread-38] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
19:25:19.907 [pool-1-thread-38] INFO com.example.study.LoadTest - idx: 38, Elapsed: 4.34 -> res: hello38/service1/service2
19:25:19.907 [pool-1-thread-78] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
19:25:19.908 [pool-1-thread-78] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
19:25:19.908 [pool-1-thread-78] INFO com.example.study.LoadTest - idx: 78, Elapsed: 4.34 -> res: hello78/service1/service2
19:25:19.908 [pool-1-thread-5] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
19:25:19.908 [pool-1-thread-5] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.lang.String] as "text/plain;charset=UTF-8"
19:25:19.909 [pool-1-thread-5] INFO com.example.study.LoadTest - idx: 5, Elapsed: 4.342 -> res: hello5/service1/service2
19:25:19.909 [main] INFO com.example.study.LoadTest - Total: 4.338

자바와 스프링의 비동기 기술

해당 포스팅은 토비님의 토비의 봄 TV 8회 스프링 리액티브 프로그래밍 (4) 자바와 스프링의 비동기 기술 라이브 코딩을 보며 따라했던 실습 내용을 바탕으로 정리한 글입니다.

실습 코드들은 IntelliJ를 이용해 SpringBoot 2.1.3.RELEASE 버전 기반으로 프로젝트를 생성 후(web, lombok 포함) 진행했습니다.

자바의 비동기 기술

ExecutorService

**ExecutorService**는 쉽게 비동기로 작업을 실행할 수 있도록 도와주는 JDK(1.5부터)에서 제공하는 interface입니다. 일반적으로 ExecutorService는 작업 할당을 위한 스레드 풀과 API를 제공합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
public class FutureEx {
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();

es.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {}
log.info("Async");
});

log.info("Exit");
}
}

// 결과
20:35:53.892 [main] INFO com.example.study.FutureEx - Exit
20:35:55.888 [pool-1-thread-1] INFO com.example.study.FutureEx - Async

Future

Future는 자바 1.5에서 등장한 비동기 계산의 결과를 나타내는 Interface 입니다.

비동기적인 작업을 수행한다는 것은 현재 진행하고 있는 스레드가 아닌 별도의 스레드에서 작업을 수행하는 것을 말합니다. 같은 스레드에서 메서드를 호출할 때는 결과를 리턴 값을 받지만, 비동기적으로 작업을 수행할 때는 결과값을 전달받을 수 있는 무언가의 interface가 필요한데 Future가 그 역할을 합니다.

비동기 작업에서 결과를 반환하고 싶을 때는 runnable대신 callable interface를 이용하면 결과 값을 return 할 수 있습니다. 또한 예외가 발생했을 때 해당 예외를 비동기 코드를 처리하는 스레드 안에서 처리하지 않고 밖으로 던질 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class FutureEx {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();

Future<String> f = es.submit(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
});

log.info(f.get());
log.info("Exit");
}
}

// 결과
20:43:11.704 [pool-1-thread-1] INFO com.example.study.FutureEx - Async
20:43:11.706 [main] INFO com.example.study.FutureEx - Hello
20:43:11.706 [main] INFO com.example.study.FutureEx - Exit

Future를 통해서 비동기 결과의 값을 가져올 때는 get 메서드를 사용합니다. 그러나 get 메서드를 호출하게 되면 비동기 작업이 완료될 때까지 해당 스레드가 blocking됩니다.

Future는 비동기적인 연산 혹은 작업을 수행하고 그 결과를 갖고 있으며, 완료를 기다리고 계산 결과를 반환(get)하는 메소드와 그 외에도 해당 연산이 완료되었는지 확인하는(isDone) 메소드를 제공합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
public class FutureEx {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();

Future<String> f = es.submit(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
});

log.info(String.valueOf(f.isDone()));
Thread.sleep(2000);
log.info("Exit");
log.info(String.valueOf(f.isDone()));
log.info(f.get());
}
}

// 결과
00:26:00.501 [main] INFO com.example.study.FutureEx - false
00:26:02.502 [pool-1-thread-1] INFO com.example.study.FutureEx - Async
00:26:02.509 [main] INFO com.example.study.FutureEx - Exit
00:26:02.509 [main] INFO com.example.study.FutureEx - true
00:26:02.509 [main] INFO com.example.study.FutureEx - Hello

FutureTask

**FutureTask**는 비동기 작업을 생성합니다. 지금까지 위의 코드는 비동기 작업 생성과 실행을 동시에 했다면 FutureTask는 비동기 작업 생성과 실행을 분리하여 진행할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Slf4j
public class FutureEx {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();

FutureTask<String> f = new FutureTask<>(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
});

es.execute(f);

log.info(String.valueOf(f.isDone()));
Thread.sleep(2000);
log.info("Exit");
log.info(String.valueOf(f.isDone()));
log.info(f.get());
}
}

// 결과
00:28:39.459 [main] INFO com.example.study.FutureEx - false
00:28:41.461 [pool-1-thread-1] INFO com.example.study.FutureEx - Async
00:28:41.467 [main] INFO com.example.study.FutureEx - Exit
00:28:41.467 [main] INFO com.example.study.FutureEx - true
00:28:41.467 [main] INFO com.example.study.FutureEx - Hello

비동기 작업의 결과를 가져오는 방법은 Future와 같은 결과를 다루는 handler를 이용하거나 callback을 이용하는 2가지 방법이 있습니다.
아래의 예시 코드는 FutureTask의 비동기 작업이 완료될 경우 호출되는 done() 메서드를 재정의하여 callback을 이용하는 방법입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
public class FutureEx {
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();

FutureTask<String> f = new FutureTask<String>(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
}) {
@Override
protected void done() {
super.done();
try {
log.info(get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
};

es.execute(f);
es.shutdown();

log.info("EXIT");
}
}

// 결과
01:03:04.153 [main] INFO com.example.study.FutureEx - EXIT
01:03:06.153 [pool-1-thread-1] INFO com.example.study.FutureEx - Async
01:03:06.153 [pool-1-thread-1] INFO com.example.study.FutureEx - Hello

위 예시 코드의 callback 관련 부분을 FutureTask를 상속받아 done() 메서드를 재정의함으로써, 비동기 코드와 그 결과를 갖고 작업을 수행하는 callback을 좀 더 가독성이 좋게 작성할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j
public class FutureEx {
interface SuccessCallback {
void onSuccess(String result);
}

public static class CallbackFutureTask extends FutureTask<String> {
SuccessCallback sc;

public CallbackFutureTask(Callable<String> callable, SuccessCallback sc) {
super(callable);
this.sc = Objects.requireNonNull(sc);
}

@Override
protected void done() {
super.done();
try {
this.sc.onSuccess(get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();

CallbackFutureTask f = new CallbackFutureTask(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
}, log::info);

es.execute(f);
es.shutdown();
}
}

// 결과
01:05:01.978 [main] INFO com.example.study.FutureEx - EXIT
01:05:03.977 [pool-1-thread-1] INFO com.example.study.FutureEx - Async
01:05:03.978 [pool-1-thread-1] INFO com.example.study.FutureEx - Hello

위 예시 코드에 SuccessCallback을 추가한 것처럼 ExceptionCallback을 추가하여 비동기 코드에서 예외가 발생할 경우, 해당 예외를 처리하는 callback도 추가할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Slf4j
public class FutureEx {
interface SuccessCallback {
void onSuccess(String result);
}

interface ExceptionCallback {
void onError(Throwable t);
}

public static class CallbackFutureTask extends FutureTask<String> {
SuccessCallback sc;
ExceptionCallback ec;

public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
super(callable);
this.sc = Objects.requireNonNull(sc);
this.ec = Objects.requireNonNull(ec);
}

@Override
protected void done() {
super.done();
try {
this.sc.onSuccess(get());
/*
InterruptedException은 예외긴 예외이지만, 현재 작업을 수행하지 말고 중단해라 라고 메시지를 보내는 용도이다.
따라서 현재 스레드에 interrupt를 체크하고 종료한다.
*/
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// 래핑된 에러를 빼내어 전달한다.
ec.onError(e.getCause());
}
}
}

public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();

CallbackFutureTask f = new CallbackFutureTask(() -> {
Thread.sleep(2000);
if (1 == 1) throw new RuntimeException("Async ERROR!!!");
log.info("Async");
return "Hello";
},
s -> log.info("Result: {}", s),
e -> log.info("Error: {}", e.getMessage()));

es.execute(f);
es.shutdown();

log.info("EXIT");
}
}

// 결과
01:11:53.460 [main] INFO com.example.study.FutureEx - EXIT
01:11:55.463 [pool-1-thread-1] INFO com.example.study.FutureEx - Error: Async ERROR!!!

스프링의 비동기 기술

@Async

Spring MVC 3.2 부터 Servlet 3.0 기반의 비동기 요청 처리가 가능해졌습니다. @Async 어노테이션을 추가해 해당 메서드를 비동기적으로 호출할 수 있습니다. 해당 메서드를 호출한 호출자(caller)는 즉시 리턴하고 메소드의 실제 실행은 Spring TaskExecutor에 의해서 실행됩니다. 비동기로 실행되는 메서드는 Future 형식의 값을 리턴하고, 호출자는 해당 Future의 get() 메서드를 호출하기 전에 다른 작업을 수행할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@Service
public static class MyService {
/*
내부적으로 AOP를 이용해 복잡한 로직이 실행된다.
비동기 작업은 return값으로 바로 결과를 줄 수 없다. (Future 혹은 Callback을 이용해야 한다.)
*/
@Async
public Future<String> hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(1000);
return new AsyncResult<>("Hello");
}
}

public static void main(String[] args) {
// try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정
try (ConfigurableApplicationContext c = SpringApplication.run(StudyApplication.class, args)) {
}
}

@Autowired
MyService myService;

// 모든 빈이 다 준비된 후 실행됨 (현재는 일종의 컨트롤러라고 생각)
@Bean
ApplicationRunner run() {
return args -> {
log.info("run()");
Future<String> res = myService.hello();
log.info("exit: {}", res.isDone());
log.info("result: {}", res.get());
};
}
}


// 결과
2019-04-04 23:29:31.960 INFO 41618 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2019-04-04 23:29:31.960 INFO 41618 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 928 ms
2019-04-04 23:29:32.161 INFO 41618 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-04-04 23:29:32.337 INFO 41618 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-04-04 23:29:32.341 INFO 41618 --- [ main] com.example.study.StudyApplication : Started StudyApplication in 1.631 seconds (JVM running for 2.101)
2019-04-04 23:29:32.343 INFO 41618 --- [ main] com.example.study.StudyApplication : run()
2019-04-04 23:29:32.346 INFO 41618 --- [ main] com.example.study.StudyApplication : exit: false
2019-04-04 23:29:32.350 INFO 41618 --- [ task-1] com.example.study.StudyApplication : hello()
2019-04-04 23:29:33.351 INFO 41618 --- [ main] com.example.study.StudyApplication : result: Hello
2019-04-04 23:29:33.354 INFO 41618 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'

ListenableFuture

스프링 4.0 부터 제공하는 Future 인터페이스를 확장한 ListenableFuture를 이용하면 비동기 처리의 결과 값을 사용할 수 있는 callback을 추가할 수 있습니다.
@Async 어노테이션을 사용하는 메서드에서 스프링 4.1 부터 제공하는 ListenableFuture 인터페이스를 구현한 AsyncResult를 반환하면 됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@Service
public static class MyService {
@Async
public ListenableFuture<String> hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(1000);
return new AsyncResult<>("Hello");
}
}

public static void main(String[] args) {
// try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정
try (ConfigurableApplicationContext c = SpringApplication.run(StudyApplication.class, args)) {
}
}

@Autowired
MyService myService;

@Bean
ApplicationRunner run() {
return args -> {
log.info("run()");
ListenableFuture<String> f = myService.hello();
f.addCallback(s -> log.info(s), e-> log.info(e.getMessage()));
log.info("exit");

Thread.sleep(2000);
};
}
}


// 결과
2019-04-04 23:42:46.348 INFO 44559 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2019-04-04 23:42:46.348 INFO 44559 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 959 ms
2019-04-04 23:42:46.557 INFO 44559 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-04-04 23:42:46.736 INFO 44559 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-04-04 23:42:46.740 INFO 44559 --- [ main] com.example.study.StudyApplication : Started StudyApplication in 1.779 seconds (JVM running for 2.306)
2019-04-04 23:42:46.742 INFO 44559 --- [ main] com.example.study.StudyApplication : run()
2019-04-04 23:42:46.748 INFO 44559 --- [ main] com.example.study.StudyApplication : exit
2019-04-04 23:42:46.751 INFO 44559 --- [ task-1] com.example.study.StudyApplication : hello()
2019-04-04 23:42:47.752 INFO 44559 --- [ task-1] com.example.study.StudyApplication : Hello
2019-04-04 23:42:48.757 INFO 44559 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'

ThreadPoolTaskExecutor

@Async 어노테이션을 사용해 해당 메서드를 비동기적으로 호출할 경우 ThreadPool을 명시적으로 선언하지 않으면, 기본적으로 SimpleAsyncTaskExecutor를 사용합니다. SimpleAsyncTaskExecutor는 각 비동기 호출마다 계속 새로운 스레드를 만들어 사용하기 때문에 비효율적입니다. 이 경우 ThreadPoolTaskExecutor를 직접 만들어 사용하는게 효율적입니다.

ThreadPoolTaskExecutor는 CorePool, QueueCapacity, MaxPoolSize를 직접 설정할 수 있습니다. 각 값에 대한 설명은 코드에 추가했습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@Service
public static class MyService {
/*
기본적으로 SimpleAsyncTaskExecutor를 사용한다. 스레드를 계속 새로 만들어 사용하기 때문에 비효율적이다.
*/
@Async
// @Async("tp") ThreadPool이 여러개일 경우 직접 지정 가능하다.
public ListenableFuture<String> hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(1000);
return new AsyncResult<>("Hello");
}
}

@Bean
ThreadPoolTaskExecutor tp() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
// 1) 스레드 풀을 해당 개수까지 기본적으로 생성함. 처음 요청이 들어올 때 poll size만큼 생성한다.
te.setCorePoolSize(10);
// 2) 지금 당장은 Core 스레드를 모두 사용중일때, 큐에 만들어 대기시킨다.
te.setQueueCapacity(50);
// 3) 대기하는 작업이 큐에 꽉 찰 경우, 풀을 해당 개수까지 더 생성한다.
te.setMaxPoolSize(100);
te.setThreadNamePrefix("myThread");
return te;
}

public static void main(String[] args) {
// try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정
try (ConfigurableApplicationContext c = SpringApplication.run(StudyApplication.class, args)) {
}
}

@Autowired
MyService myService;

// 모든 빈이 다 준비된 후 실행됨 (현재는 일종의 컨트롤러라고 생각)
@Bean
ApplicationRunner run() {
return args -> {
log.info("run()");
ListenableFuture<String> f = myService.hello();
f.addCallback(s -> log.info(s), e-> log.info(e.getMessage()));
log.info("exit");

Thread.sleep(2000);
};
}
}


// 결과
2019-04-05 00:03:11.304 INFO 47863 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2019-04-05 00:03:11.304 INFO 47863 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1061 ms
2019-04-05 00:03:11.367 INFO 47863 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'tp'
2019-04-05 00:03:11.677 INFO 47863 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-04-05 00:03:11.680 INFO 47863 --- [ main] com.example.study.StudyApplication : Started StudyApplication in 1.751 seconds (JVM running for 2.208)
2019-04-05 00:03:11.681 INFO 47863 --- [ main] com.example.study.StudyApplication : run()
2019-04-05 00:03:11.686 INFO 47863 --- [ main] com.example.study.StudyApplication : exit
2019-04-05 00:03:11.687 INFO 47863 --- [ myThread1] com.example.study.StudyApplication : hello()
2019-04-05 00:03:12.691 INFO 47863 --- [ myThread1] com.example.study.StudyApplication : Hello

Servlet Async

@Async 어노테이션을 설명할 때 말했던 것처럼, Spring MVC 3.2 부터 Servlet 3.0 기반의 비동기 요청 처리가 가능해졌습니다. 기존 Controller 메서드를 Callable로 변경함으로써 비동기로 만들 수 있습니다.
Controller 메서드를 비동기로 변경해도 해당 처리가 서블릿 스레드가 아닌 다른 스레드에서 발생한다는 점을 제외하면 기존 Controller 메서드의 동작 방식과는 큰 차이가 없습니다.
(참고 : Spring MVC 3.2 Preview: Making a Controller Method Asynchronous)

Servlet 3.0 & 3.1

  • Servlet 3.0: 비동기 서블릿
    • HTTP connection은 이미 논블록킹 IO
    • 서블릿 요청 읽기, 응답 쓰기는 블록킹
    • 비동기 작업 시작 즉시 서블릿 스레드 반납
    • 비동기 작업이 완료되면 서블릿 스레드 재할당
    • 비동기 서블릿 컨텍스트 이용 (AsyncContext)
  • Servlet 3.1: 논블록킹 IO
    • 논블록킹 서블릿 요청, 응답 처리
    • Callback

      스레드가 블록되는 상황은 CPU와 메모리 자원을 많이 소모합니다. 컨텍스트 스위칭이 일어나기 때문입니다. 기본적으로 스레드가 블로킹되면 wating 상태로 변경되면서 컨텍스트 스위칭이 일어나고 추후 I/O 작업이 끝나 running 상태로 변경되면서 다시 컨텍스트 스위칭이 일어나 총 2번의 컨텍스트 스위칭이 일어납니다.
      Java InputStream과 OutputStream은 블록킹 방식이다. RequestHttpServletRequest, RequestHttpServletResponse는 InputSream과 OutputStream을 사용하기 때문에 서블릿은 기본적으로 블로킹 IO 방식이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {
@GetMapping("/callable")
public Callable<String> callable() {
log.info("callable");
return () -> {
log.info("async");
Thread.sleep(2000);
return "hello";
};
}
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}


// 결과
2019-04-06 01:12:41.761 INFO 69216 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2019-04-06 01:12:41.762 INFO 69216 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1206 ms
2019-04-06 01:12:41.993 INFO 69216 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-04-06 01:12:42.182 INFO 69216 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-04-06 01:12:42.186 INFO 69216 --- [ main] com.example.study.StudyApplication : Started StudyApplication in 2.073 seconds (JVM running for 2.807)
2019-04-06 01:12:44.161 INFO 69216 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2019-04-06 01:12:44.162 INFO 69216 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2019-04-06 01:12:44.169 INFO 69216 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 7 ms
2019-04-06 01:12:44.190 INFO 69216 --- [nio-8080-exec-1] com.example.study.StudyApplication : callable
2019-04-06 01:12:44.198 INFO 69216 --- [ task-1] com.example.study.StudyApplication : async

실제로 비동기 서블릿은 아래의 그림처럼 동작합니다.
비동기 서블릿 구조

Client (For Load Test)

지금부터는 Spring에서 Sync Servlet을 이용할 때와 Async Servlet을 이용했을 때의 차이점을 알아보기 위해 테스트를 할 수 있도록, 먼저 여러 Request를 동시에 생성하는 Client를 작성해봅니다.
Spring에서 제공하는 RestTemplate을 이용해 100개의 Request를 동시에 호출합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
public class LoadTest {
private static AtomicInteger counter = new AtomicInteger(0);

public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(100);

RestTemplate rt = new RestTemplate();
String url = "http://localhost:8080/callable";

StopWatch main = new StopWatch();
main.start();

for (int i = 0; i < 100; i++) {
es.execute(() -> {
int idx = counter.addAndGet(1);
log.info("Thread {}", idx);

StopWatch sw = new StopWatch();
sw.start();

rt.getForObject(url, String.class);

sw.stop();
log.info("Elapsed: {} -> {}", idx, sw.getTotalTimeSeconds());
});
}

es.shutdown();
// 지정된 시간이 타임아웃 걸리기 전이라면 대기작업이 진행될 때까지 기다린다.
// (100초안에 작업이 끝날때까지 기다리거나, 100초가 초과되면 종료)
es.awaitTermination(100, TimeUnit.SECONDS);
main.stop();
log.info("Total: {}", main.getTotalTimeSeconds());
}
}

Change Tomcat Thread Count

위의 비동기 서블릿 그림에서 볼 수 있듯이, Async Servlet은 클라이언트로부터 요청을 받은 후 실제 작업은 작업 스레드 풀에 위임하고 현재의 서블릿 스레드는 서블릿 스레드 풀에 반환 후, 다음 요청이 들어올 경우 사용할 수 있도록 합니다. 이에 반해, Sync Servlet은 요청을 받은 서블릿 스레드에서 실제 작업까지 전부 진행하기 때문에 요청에 대한 응답을 반환하기 전까지는 새로운 요청을 처리할 수 없는 상태입니다.

실제 이처럼 동작하는지 확인하기 위해서 application.properties 파일에서 다음과 같이 Tomcat의 스레드 개수를 1개로 설정합니다.

1
server.tomcat.max-threads=1

Sync vs Async

Sync

먼저 아래와 같이 Sync Servlet을 이용해 서버를 띄운 후 위의 Client 코드를 이용해 테스트를 진행합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {
@GetMapping("/callable")
public String callable() throws InterruptedException {
log.info("sync");
Thread.sleep(2000);
return "hello";
}
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}

해당 서버를 띄우고 Client(LoadTest) 코드를 사용해 테스트를 진행하면 결과는 다음과 같습니다. Tomcat의 스레드가 하나이며 Sync 방식으로 동작하기 때문에 한 번에 하나의 클라이언트 요청만 처리할 수 있습니다. 서버 로그를 확인하면 nio-8080-exec-1 라는 이름을 가진 한개의 스레드가 요청을 처리하고 있습니다.
동기 서블릿 테스트 결과

이번에는 JMC(Java Mission Control)를 이용해 실제 서버의 스레드 상황을 살펴보겠습니다.

JMC를 이용하기 위해서는 서버를 실행할 때 다음과 같은 JVM 옵션을 추가합니다.

1
2
3
4
5
6
-XX:+UnlockCommercialFeatures
-XX:+FlightRecorder
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Djava.rmi.server.hostname=localhost

JMC를 이용해 클라이언트 요청이 들어올 때, Thread 상태를 보면 다음과 같습니다. 동시에 100개의 클라이언트 요청이 들어왔지만, 스레드 수는 그대로 유지되고 있으며, 여러 스레드 목록 중에 nio-8080-exec-1 스레드가 존재하고 있는것을 확인할 수 있습니다.
동기 서블릿 테스트 결과 - 스레드

Async

이번에는 서버 코드를 아래와 같이 Async Servlet을 이용하도록 수정한 후 서버를 띄워 Client 코드를 이용해 테스트를 진행합니다. (작업 스레드 풀은 WebMvcConfigurer를 통해 설정해줍니다.)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {
@GetMapping("/callable")
public Callable<String> callable() {
return () -> {
log.info("async");
Thread.sleep(2000);
return "hello";
};
}
}

@Bean
WebMvcConfigurer configurer() {
return new WebMvcConfigurer() {
// 워커 스레드 풀 설정
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(100);
te.setQueueCapacity(50);
te.setMaxPoolSize(200);
te.setThreadNamePrefix("workThread");
te.initialize();
configurer.setTaskExecutor(te);
}
};
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}

Client(LoadTest) 코드를 사용해 테스트를 진행하면 결과는 다음과 같습니다. Tomcat의 스레드가 하나이지만 Async 방식으로 동작하기 때문에 해당 요청에 대한 실제 처리는 워커 스레드 풀에서 사용되고 있지 않은 스레드를 이용해 처리합니다. 서버 로그를 확인하면 nio-8080-exec-1 라는 이름을 가진 한개의 Tomcat 스레드와 workThreadX라는 이름을 가진 100개의 워커 스레드를 확인할 수 있습니다.
비동기 서블릿 테스트 결과

이번에도 역시 JMC(Java Mission Control)를 이용해 실제 서버의 스레드 상황을 살펴보겠습니다.

nio-8080-exec-1 라는 이름을 가진 한개의 Tomcat 스레드와 workThreadX라는 이름을 가진 100개의 워커 스레드를 확인할 수 있습니다.
비동기 서블릿 테스트 결과 - 스레드

DeferredResult

DeferredResult는 Spring 3.2 부터 사용 가능합니다. 비동기 요청 처리를 위해 사용하는 Callable의 대안을 제공합니다. “지연된 결과”를 의미하며 외부의 이벤트 혹은 클라이언트 요청에 의해서 지연되어 있는 HTTP 요청에 대한 응답을 나중에 써줄 수 있는 기술입니다. 별도로 워커 스레드를 만들어 대기하지 않고도 처리가 가능합니다.

DeferredResult 구조

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {
Queue<DeferredResult<String>> results = new ConcurrentLinkedQueue<>();

@GetMapping("/dr")
public DeferredResult<String> dr() {
log.info("dr");
DeferredResult<String> dr = new DeferredResult<>();
results.add(dr);
return dr;
}

@GetMapping("/dr/count")
public String drCount() {
return String.valueOf(results.size());
}

@GetMapping("/dr/event")
public String drEvent(String msg) {
for (DeferredResult<String> dr : results) {
dr.setResult("Hello " + msg);
results.remove(dr);
}
return "OK";
}
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}

LoadTest 코드를 이용해 /dr로 100개의 요청을 보내고, 크롬에서 /dr/count로 DeferredResult가 담겨있는 큐의 사이즈를 확인해봅니다. 그리고 마지막으로 /dr/event로 큐에 담긴 DeferredResult 객체에 setResult로 결과를 반환합니다.
100개의 요청이 동시에 완료되는 것을 확인할 수 있습니다.
DeferredResult 결과

ResponseBodyEmitter

ResponseBodyEmitter는 Spring 4.2 부터 사용 가능합니다. 비동기 요청 처리의 결과로 하나 이상의 응답을 위해 사용되는 리턴 값 Type 입니다. DeferredResult가 하나의 결과를 생성해 요청을 처리했다면, ResponseBodyEmitter는 여러개의 결과를 만들어 요청을 처리할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

@RestController
public static class MyController {

@GetMapping("/emitter")
public ResponseBodyEmitter emitter() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();

Executors.newSingleThreadExecutor().submit(() -> {
try {
for (int i = 0; i < 50; i++) {
emitter.send("<p>Stream " + i + "</p>");
Thread.sleep(100);
}
} catch (Exception e) {
}
});

return emitter;
}
}

public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}

ResponseBodyEmitter 결과_1
ResponseBodyEmitter 결과_2

후기

우하한형제들 - 스프링 리액티브 프로그래밍 세미나를 다녀온 후 토비의 봄 TV 스프링 리액티브 프로그래밍 시리즈를 전부 보고있습니다.

토비님은 매 라이브 코딩마다 말씀하시길 단순히 코드를 보는 것과 실행 후 결과를 실제로 확인해보는 것은 또 다른 차이가 있을 수 있다고 말씀하십니다. 매우 공감합니다!

위의 내용들은 모두 라이브 코딩에 포함되어 있는 내용이지만 실제로 따라해보면서 해당 내용들을 정리하는 차원으로 작성해보았습니다. 따라하며 토비님이 라이브 코딩을 진행하셨을 때와 달라진 몇 가지를 수정한 부분도 있고, 서버의 스레드를 직접 확인해보고자 처음에는 VisualVM을 사용하려 했지만 계속 실패해 JMC을 이용해 진행했습니다.

라이브 코딩을 보며 자바와 스프링의 비동기 기술에 대해 개인적으로 궁금했던 부분들이 많이 해소되었습니다!! 앞으로 남은 내용들도 따라하며 정리해 보도록 하겠습니다.

[번역] Java Reactor Pattern

해당 글은 아래의 Reactor Pattern Explained 시리즈를 번역하였습니다.

Part 1

서버가 동시에 요청(이벤트)을 받을 때, 보통 요청을 처리하기 위한 이벤트 리스너를 각 스레드마다 할당해 처리하곤 합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Server implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
} catch (IOException ex) { }
}
}

class Handler implements Runnable {
final Socket socket;
Handler(Socket s) { socket = s; }
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { }
}
private byte[] process(byte[] cmd) { }
}

요청마다 처리를 위해 이벤트 리스너를 별도의 스레드로 만들어 사용하는 것의 단점은 컨텍스트 전환(context switching)의 오버 헤드가 크다는 것입니다. 최악의 경우, 데이터를 자주 읽거나 쓰지 않는 이벤트 리스너를 처리하는 일부 스레드는 유용한 작업을 하지 않고 주기적으로 컨텍스트를 전환할 것입니다. 스케줄러가 이러한 스레드를 CPU에 디스패치(dispatch) 할 때마다 I/O 이벤트가 발생할 때까지 스레드는 차단되어 I/O 이벤트를 기다리는 데 소비되는 모든 시간이 낭비됩니다.

위의 코드에서 ss.accept()는 클라이언트가 연결될 때까지 서버 스레드를 차단하는 블로킹 호출입니다. 서버 스레드는 ss.accept() 호출이 반환 될 때까지 이벤트 리스너를 위한 새로운 스레드의 start() 메서드를 호출 할 수 없습니다. 불필요한 컨텍스트 전환으로 인한 CPU 시간 낭비를 줄이기 위해 non-blocking I/O 개념이 탄생했습니다.

**Reactor Pattern**은 이 문제를 해결하기 위한 이벤트 처리(event handling) 디자인 패턴입니다. 하나의 Reactor가 계속 이벤트를 찾고 이벤트가 발생(trigger)하면 해당 이벤트 처리기(event handler)에게 이를 알립니다.

자바는 non-blocking 시스템을 설계하는 데 사용할 수 있는 **표준 API (java.nio)**를 제공합니다. 클라이언트가 서버로 이름(데이터)을 보내면 서버는 Hello 메시지로 응답하는 간단한 예제를 통해 Reactor 패턴에 대해서 알아보겠습니다.

Reactor 패턴의 아키텍처에는 두 가지 중요한 참여자가 있습니다.

  1. Reactor
    : Reactor는 별도의 스레드에서 실행되며 발생한 I/O 이벤트는 dispatching되어 해당 이벤트 처리기로 보내 처리합니다.
  2. Handlers
    : Handler는 Reactor로부터 I/O 이벤트를 받아 실제 작업을 수행합니다.

java.nio 패키지를 이용해서 Reactor 패턴을 구현할 것이기 때문에 nio 패키지에 속한 몇가지 class에 대한 이해가 필요합니다.

  • Channels
    : 소켓을 통해 non-blocking read를 할 수 있도록 지원하는 connection.
  • Buffers
    : 채널에 의해 직접 read되거나 write될 수 있는 배열과 같은 객체.
  • Selectors
    : Selector 는 어느 channel set 이 IO event 를 가지고 있는지를 알려준다. Selector.select() 는 I/O 이벤트가 발생한 채널 set을 return한다. return할 channel이 없다면 계속 기다리게(block) 된다. 이 block된 것을 바로 return 시켜주는 것이 Selector.wakeup()이다.
    Selector.selectedKeys()는 Selection Key 를 return 해 준다. Reactor는 이 Selection Key를 보고 어떤 handler로 넘겨줄 지를 결정한다.
  • Selection Keys
    : Selector와 Channel간의 관계를 표현해주는 객체이다. Selector가 제공한 Selection Key를 이용해 Reactor는 채널에서 발생하는 I/O 이벤트로 수행할 작업을 선택할 수 있다. ServerSocketChannel에 selector를 등록하면 key를 준다. 이 key가 SelectionKey 이다.

Reactor pattern

Selector는 계속해서 I/O 이벤트가 발생하기를 대기합니다. Reactor가 Selector.select() 메소드를 호출하면 Selector는 등록된 채널에 대해서 발생한 이벤트 정보가 들어있는 SelectionKey Set을 반환합니다. (SelectionKey는 해당 채널과 Selector와의 관계에 대한 모든 정보를 갖고 있습니다. 또한 Handler에 대한 정보도 갖고 있습니다.)

Selector에 등록된 하나의 ServerSocketChannel이 있습니다. ServerSocketChannel은 클라이언트에서 들어오는 연결 요청으로부터 이벤트를 수신해야합니다. 클라이언트가 연결을 요청할 때, ServerSocketChhannel은 I/O 이벤트를 받아 클라이언트에 SocketChannel을 할당해야 합니다. SelectionKey0은 ServerSocketChannel을 가지고 무엇을 해야하는지에 대한 이벤트 정보를 갖고 있습니다. SocketChhannel을 만들기 위해서는 Reactor가 SelectionKey0의 이벤트를 Acceptor에 전달해 Acceptor가 클라이언트와의 연결 요청을 수락하고 SocketChannel을 만들도록 해야합니다.

Acceptor가 클라이언트1의 연결을 수락하면 클라이언트1에 대한 SocketChannel이 생성됩니다. 이 SocketChannel역시 Selector에 등록되고 해당 채널에서 이벤트가 발생하면 해당 이벤트에 대한 정보를 포함한 SelectionKey1을 반환합니다. 이 SelectionKey1을 이용해서 해당 채널로부터 데이터를 읽고 쓸 수 있습니다. 따라서 SelectionKey1은 읽기와 쓰기를 처리하는 Handler1 객체에 바인딩 됩니다.

이후로 Reactor가 Selector.selector()를 호출했을 때 반환된 SelectionKey Set에 SelectionKey1이 있으면 SocketChannel1이 이벤트와 함께 트리거됨을 의미합니다. 이제 SelectionKey1을 보면, Reactor는 Handler1이 SelectionKey1에 바인딩되어 있으므로 Handler1에 이벤트를 전달해야한다는 것을 알고 있습니다. 반환 된 SelectionKey Set에 SelectionKey0이 있으면 ServerSocketChannel이 다른 클라이언트에서 이벤트를 수신했으며 SelectionKey0을 보고 Reactor는 해당 이벤트를 다시 Acceptor에 전달해야 함을 알고 있습니다. 이벤트가 Acceptor에 전달되면 클라이언트2에 대해 SocketChannel2를 만들고 SelectionKey2로 Selector로 SocketChannel2를 등록합니다.

Selection Key table

따라서 이 시나리오에서는 3가지 유형의 이벤트에 관심이 있습니다.

  1. accept 해야하는 ServerSocketChannel에서 트리거되는 연결 요청 이벤트.
  2. 클라이언트로 부터 송신된 데이터를 수신할 수 있을 때, SocketChannel로부터 트리거 되는 이벤트.
  3. 서버에서 클라이언트로 데이터를 송신할 때, 송신할 수 있는 준비가 되면 SocketChannel로부터 트리거 되는 이벤트.

그럼 스레드 풀은 이 작업과 어떤 관련이 있을까요? non-blocking 아키텍처의 장점은 클라이언트의 모든 요청을 처리하는 동시에 단일 스레드에서 실행되도록 서버를 작성할 수 있다는 것입니다. 서버를 설계하는 데 동시성 개념을 적용하지 않으면 이벤트에 대한 반응성이 떨어집니다. 단일 스레드일 때는 reactor가 이벤트를 handler에 전달해 처리될 때 까지는 다른 이벤트에 응답할 수 없기 때문입니다. 왜냐하면 하나의 스레드를 사용하여 모든 이벤트를 처리하기 때문입니다.

위의 아키텍처에 동시성을 추가해서 시스템의 응답 속도를 향상시킬 수 있습니다. Reactor가 이벤트를 Handler에 전달하고 새로운 Thread에서 Handler를 이용해 이벤트를 처리하면 Reacgtor는 계속해서 다른 이벤트에 응답할 수 있습니다. 또한 스레드 풀을 이용하면 시스템의 스레드 수를 제한하면서 더 효율적으로 사용할 수 있을 것 입니다.

Part 2

간단한 Reactor 패턴의 예시를 살펴보겠습니다. 클라이언트는 서버로 이름을 넣은 메시지를 전송하고 서버는 클라이언트에 Hello 메시지로 응답합니다. 스레드 풀은 Part3에서 살펴보겠습니다.

클라이언트는 java.nio를 사용하여 Socket을 생성하지 않고 java.net.Socket을 사용합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class Client {
String hostIp;
int hostPort;

public Client(String hostIp, int hostPort) {
this.hostIp = hostIp;
this.hostPort = hostPort;
}

public void runClient() throws IOException {
Socket clientSocket = null;
PrintWriter out = null;
BufferedReader in = null;

try {
clientSocket = new Socket(hostIp, hostPort);
out = new PrintWriter(clientSocket.getOutputStream(), true);
in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
} catch (UnknownHostException e) {
System.err.println("Unknown host: " + hostIp);
System.exit(1);
} catch (IOException e) {
System.err.println("Couldn't connect to: " + hostIp);
System.exit(1);
}

BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));
String userInput;

System.out.println("Client connected to host : " + hostIp + " port: " + hostPort);
System.out.println("Type (\"Bye\" to quit)");
System.out.println("Tell what your name is to the Server.....");

while ((userInput = stdIn.readLine()) != null) {

out.println(userInput);

// Break when client says Bye.
if (userInput.equalsIgnoreCase("Bye"))
break;

System.out.println("Server says: " + in.readLine());
}

out.close();
in.close();
stdIn.close();
clientSocket.close();
}

public static void main(String[] args) throws IOException {

Client client = new Client("127.0.0.1", 9900);
client.runClient();
}
}

서버는 Reactor 패턴을 이용해 구현합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class Reactor implements Runnable {

final Selector selector;
final ServerSocketChannel serverSocketChannel;
final boolean isWithThreadPool;

Reactor(int port, boolean isWithThreadPool) throws IOException {

this.isWithThreadPool = isWithThreadPool;
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectionKey0.attach(new Acceptor());
}


public void run() {
System.out.println("Server listening to port: " + serverSocketChannel.socket().getLocalPort());
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}

void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null) {
r.run();
}
}

class Acceptor implements Runnable {
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
if (isWithThreadPool)
new HandlerWithThreadPool(selector, socketChannel);
else
new Handler(selector, socketChannel);
}
System.out.println("Connection Accepted by Reactor");
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}

Reactor는 Runnable을 구현하고 있으며, run() 메서드에서는 while 루프를 돌며 selector.select()를 호출하여 처리할 수 있는 이벤트 정보가 담긴 SelectionKey Set을 가져옵니다. SelectionKey에 바인드 되어있는 Handler를 가져와 dispatch합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class Handler implements Runnable {

final SocketChannel socketChannel;
final SelectionKey selectionKey;
ByteBuffer input = ByteBuffer.allocate(1024);
static final int READING = 0, SENDING = 1;
int state = READING;
String clientName = "";

Handler(Selector selector, SocketChannel c) throws IOException {
socketChannel = c;
c.configureBlocking(false);
selectionKey = socketChannel.register(selector, 0);
selectionKey.attach(this);
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}


public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}

void read() throws IOException {
int readCount = socketChannel.read(input);
if (readCount > 0) {
readProcess(readCount);
}
state = SENDING;
// Interested in writing
selectionKey.interestOps(SelectionKey.OP_WRITE);
}

/**
* Processing of the read message. This only prints the message to stdOut.
*
* @param readCount
*/
synchronized void readProcess(int readCount) {
StringBuilder sb = new StringBuilder();
input.flip();
byte[] subStringBytes = new byte[readCount];
byte[] array = input.array();
System.arraycopy(array, 0, subStringBytes, 0, readCount);
// Assuming ASCII (bad assumption but simplifies the example)
sb.append(new String(subStringBytes));
input.clear();
clientName = sb.toString().trim();
}

void send() throws IOException {
System.out.println("Saying hello to " + clientName);
ByteBuffer output = ByteBuffer.wrap(("Hello " + clientName + "\n").getBytes());
socketChannel.write(output);
selectionKey.interestOps(SelectionKey.OP_READ);
state = READING;
}
}

Handler에는 READING, SENDING 2가지 상태가 있습니다. 채널은 한 번에 하나의 작업만 지원하기 때문에 동시에 처리할 수 없습니다. Handler가 SelectionKey에 어떻게 attach 되는지와 관심있는 연산이 OP_READ로 설정되는 부분에 유의해야합니다. Selector는 Read 이벤트가 발생할 때만 SelectionKey를 select해야 합니다. READ 프로세스가 완료되면 Handler는 상태를 SENDING으로 변경하고 관심 대상 연산을 OP_WRITE로 변경합니다. 이제 Selector는 채널이 데이터를 전송할 준비가 되었을 때 SelectionKey를 select 합니다. Write 이벤트가 Handler에 dispatch될 때, 상태가 SENDING이므로 Hello 메시지를 출력 버퍼에 씁니다. 전송이 완료되면 관심있는 작업을 OP_READ로 다시 변경하면서 Handler의 상태가 READING으로 변경됩니다.

결과적으로 서버는 단일 스레드에서 실행되지만 서버에 연결하는 클라이언트 수에 상관없이 응답합니다.

Part 3

이번 파트에서는 Handler의 스레드 풀에 대해서 설명합니다. HandlerWithThreadPool은 Handler 클래스의 확장 버전입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class HandlerWithThreadPool extends Handler {

static ExecutorService pool = Executors.newFixedThreadPool(2);
static final int PROCESSING = 2;

public HandlerWithThreadPool(Selector sel, SocketChannel c) throws IOException {
super(sel, c);
}

void read() throws IOException {
int readCount = socketChannel.read(input);
if (readCount > 0) {
state = PROCESSING;
pool.execute(new Processer(readCount));
}
// We are interested in writing back to the client soon after read processing is done.
selectionKey.interestOps(SelectionKey.OP_WRITE);
}

// Start processing in a new Processer Thread and Hand off to the reactor thread.
synchronized void processAndHandOff(int readCount) {
readProcess(readCount);
// Read processing done. Now the server is ready to send a message to the client.
state = SENDING;
}

class Processer implements Runnable {
int readCount;
Processer(int readCount) {
this.readCount = readCount;
}
public void run() {
processAndHandOff(readCount);
}
}
}

PROCESSING이 새로 도입되었으며 read() 메서드가 override 되었습니다. 이제 Read 이벤트가 Handler에 dispatch되면 데이터를 읽지만 상태를 SENDING으로 변경하지는 않습니다. 메시지를 처리하고 스레드 풀의 다른 스레드에서 실행하고 관련 작업을 OP_WRITE로 설정하는 Processer를 생성합니다. 이 시점에서 채널에 Write 준비가 되어 있더라도 Handler는 아직 PROCESSING 상태이기 때문에 write하지 않습니다.

참고

Reactor Pattern 에 대해 알아보자

Java NIO와 멀티플렉싱 기반의 다중 접속 서버

자바 NIO에 대한 소개와 NIO와 함께 도입된 자바에서 I/O 멀티플렉싱(multiplexing)을 구현한 selector에 대해 알아봅니다. I/O 멀티플렉싱(multiplexing)에 대한 개념에 대해 아직 잘 이해하지 못하고 있다면 먼저 <멀티플렉싱 기반의 다중 접속 서버로 가기까지> 포스팅을 읽어주세요.

Overview

자바 NIO (New IO)는 기존의 자바 IO API를 대체하기 위해 자바 1.4부터 도입이 되었습니다. 새롭게 변화된 부분에 대해서 간략히 요약해보면 다음과 같습니다.

  • Channels and Buffers
    기존 IO API에서는 byte streams character streams 사용했지만, NIO에서는 channels(채널)과 buffers(버퍼)를 사용합니다. 데이터는 항상 채널에서 버퍼로 읽히거나 버퍼에서 채널로 쓰여집니다.
  • Non-blocking IO
    자바 NIO에서는 non-blocking IO를 사용할 수 있습니다. 예를 들면, 하나의 스레드는 버퍼에 데이터를 읽도록 채널에 요청할 수 있습니다. 채널이 버퍼로 데이터를 읽는 동안 스레드는 다른 작업을 수행할 수 있습니다. 데이터가 채널에서 버퍼로 읽어지면, 스레드는 해당 버퍼를 이용한 processing(처리)를 계속 할 수 있습니다. 데이터를 채널에 쓰는 경우도 non-blocking이 가능합니다.
  • Selectors
    자바 NIO에는 “selectors” 개념을 포함하고 있습니다. selector는 여러개의 채널에서 이벤트(연결이 생성됨, 데이터가 도착함)를 모니터링할 수 있는 객체입니다. 그래서 하나의 스레드에서 여러 채널에 대해 모니터링이 가능합니다.

자바 NIO는 다음과 같은 핵심 컴포넌트로 구성되어있습니다.

  • Channels
  • Buffers
  • Selectors

실제로는 더 많은 클래스와 컴포넌트가 있지만 채널, 버퍼, 셀렉터가 API의 핵심을 구성합니다.

Channels

일반적으로 NIO의 모든 IO는 채널로 시작합니다. 채널 데이터를 버퍼로 읽을 수 있고, 버퍼에서 채널로 데이터를 쓸 수 있습니다.
channel and buffer

채널은 스트림(stream)과 유사하지만 몇가지 차이점이 있습니다.

  • 채널을 통해서는 읽고 쓸 수 있지만, 스트림은 일반적으로 단방향(읽기 혹은 쓰기)으로만 가능합니다.
  • 채널은 비동기적(asynchronously)으로 읽고 쓸 수 있습니다.
  • 채널은 항상 버퍼에서 부터 읽거나 버퍼로 씁니다.

채널에는 여러가지 타입이 있습니다. 다음은 자바 NIO에 기본적으로 구현되어 있는 목록입니다.

  • Channels
    • FileChannel
      : 파일에 데이터를 읽고 쓴다.
    • DatagramChannel
      : UDP를 이용해 네트워크를 통해 데이터를 읽고 쓴다.
    • SocketChannel
      : TCP를 이용해 네트워크를 통해 데이터를 읽고 쓴다.
    • ServerSocketChannel
      : 들어오는 TCP 연결을 수신(listening)할 수 있다. 들어오는 연결마다 SocketChannel이 만들어진다.

Buffers

NIO의 버퍼는 채널과 상호작용할 때 사용됩니다. 데이터는 채널에서 버퍼로 읽혀지거나, 버퍼에서 읽혀 채널로 쓰여집니다.

버퍼에는 여러가지 타입이 있습니다. 다음은 자바 NIO에 기본적으로 구현되어 있는 목록입니다.

  • Buffers
    • ByteBuffer
    • MappedByteBuffer
    • CharBuffer
    • ShortBuffer
    • IntBuffer
    • LongBuffer
    • FloatBuffer
    • DoubleBuffer

일반적으로 버퍼를 사용하여 데이터를 읽고 쓰는 것은 4단계 프로세스를 가집니다.

  1. 버퍼에 데이터 쓰기
  2. buffer.flip() 호출
  3. 버퍼에서 데이터 읽기
  4. buffer.clear() 혹은 buffer.compact() 호출

버퍼에 데이터를 쓸 때 버퍼는 쓰여진 데이터의 양을 기록합니다. 만약 데이터를 읽어야한다면 flip() 메서드를 호출해서 버퍼를 쓰기 모드에서 읽기 모드로 전환해야 합니다. 읽기 모드에서 버퍼를 사용하면 버퍼에 쓰여진 모든 데이터를 읽을 수 있습니다.
모든 데이터를 읽은 후에는 버퍼를 지우고 다시 쓸 준비를 해야합니다. clear() 혹은 compact()를 호출함으로써 전체 버퍼를 지울 수 있습니다. (clear() 메서드는 버퍼 전체를 지우고, compact() 메서드는 이미 읽은 데이터만 지웁니다.)

Selectors

셀렉터를 사용하면 하나의 스레드가 여러 채널을 처리(handle)할 수 있습니다.
selector
셀렉터는 사용을 위해 하나 이상의 채널을 셀렉터에 등록하고 select() 메서드를 호출해 등록 된 채널 중 이벤트 준비가 완료된 하나 이상의 채널이 생길 때까지 봉쇄(block)됩니다. 메서드가 반환(return)되면 스레드는 채널에 준비 완료된 이벤트를 처리할 수 있습니다. 즉, 하나의 스레드에서 여러 채널을 관리할 수 있으므로 여러 네트워크 연결을 관리할 수 있습니다. (SocketChannel, ServerSocketChannel)

Selector 생성

Selector.open() 메서드를 통해 셀렉터를 생성할 수 있습니다.

1
Selector selector = Selector.open();

Channels 등록

생성한 셀렉터에 채널을 등록하기 위해서는 다음과 같이 채널의 register() 메서드를 호출합니다.

1
2
3
channel.configureBlocking(false);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

셀렉터에 채널을 등록하기 위해서는 반드시 해당 채널이 non-blocking 모드로 변환되어야 합니다. (FileChannel은 non-blocking 모드로 변경이 불가능하기 때문에 셀렉터에 등록이 불가능합니다.)

register() 메서드의 두 번째 매개 변수는 셀렉터를 통해 채널에서 발생하는 이벤트 중 확인(알림)하고자 하는 이벤트의 집합을 의미합니다.
이벤트에는 4가지 종류가 있으며, 이 4가지 이벤트는 SelectionKey 상수로 표시됩니다.

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

둘 이상의 이벤트 상수는 다음과 같이 사용 가능합니다.

1
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE

SelectionKey

register() 메서드를 이용해 채널을 셀렉터에 등록하면 SelectionKey 객체가 반환됩니다. 이 SelectionKey 객체에는 몇 가지 속성들이 있습니다.

  • The interest set
  • The ready set
  • The Channel
  • The Selector
  • An attached object (optional)

interest Set

interest set은 셀렉터에 등록된 채널이 확인하고자 하는 이벤트 집합(세트)입니다. 다음과 같이 SelectionKey를 이용해 해당 interest set을 확인할 수 있습니다.

1
2
3
4
5
6
int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

Ready Set

ready set은 셀렉터에 등록된 채널에서 준비되어 처리(handle) 가능한 이벤트의 집합입니다.

1
int readySet = SelectionKey.readyOps();

위와 같이 interest Set과 동일한 방식으로 확인할 수도 있지만 아래와 같이 4가지 메소드를 이용해서 확인할 수도 있습니다.

1
2
3
4
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

Channel + Selector

SelectionKey를 이용해 채널과 셀렉터에 쉽게 접근할 수 있습니다.

1
2
3
Channel  channel  = selectionKey.channel();

Selector selector = selectionKey.selector();

Attaching Objects

SelectionKey에 객체를 첨부(attach)할 수 있습니다. 이 방법을 이용하면 채널에 추가 정보나 채널에서 사용하는 버퍼와 같은 객체들을 쉽게 첨부할 수 있습니다.

1
2
3
selectionKey.attach(theObject);

Object attachedObj = selectionKey.attachment();

selectionKey를 통해 직접 attach 하는 것 뿐만 아니라 셀렉터에 채널을 등록하면서 객체를 첨부할 수도 있습니다.

1
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

셀렉터를 이용해 채널 선택

셀렉터에 하나 이상의 채널을 등록한 후에는 select() 메소드를 호출할 수 있습니다. select() 메소드는 accept, connect, read, write 이벤트에 대해 준비(ready) 되어 있는 채널을 반환합니다. select() 메소드는 다음과 같이 3가지 방식으로 사용 가능합니다.

  • select()
    : 등록한 이벤트에 대해 하나 이상의 채널이 준비 될 때까지 봉쇄(block)됩니다. 몇개의 채널이 준비되었는지 준비된 채널의 수를 반환합니다. (마지막으로 select()를 호출한 이후 준비된 채널 수 입니다.)
  • select(long timeout)
    : 최대 timeout(ms) 동안만 봉쇄한다는 점을 제외하면 select()와 동일합니다.
  • selectNow()
    : select와 달리 봉쇄하지 않습니다. 준비된 채널이 있으면 즉시 반환됩니다.

selectedKeys()

select() 메서드를 통해 하나 이상의 준비된 채널이 발생하면, selectedKeys() 메서드를 사용해 준비된 채널의 집합을 반환 받습니다.

1
Set<SelectionKey> selectedKeys = selector.selectedKeys();

반환된 SelectionKey set을 반복해 준비된 채널에 접근할 수 있습니다. 채널의 이벤트 처리가 끝나면 keyIterator.remove()를 통해 키 세트에서 제거해야 합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {

SelectionKey key = keyIterator.next();

if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.

} else if (key.isConnectable()) {
// a connection was established with a remote server.

} else if (key.isReadable()) {
// a channel is ready for reading

} else if (key.isWritable()) {
// a channel is ready for writing
}

keyIterator.remove();
}

ServerSocketChannel

NIO ServerSocketChannel은 표준 자바 네트워킹의 ServerSocket과 마찬가지로 들어오는 TCP 연결을 수신 대기 할 수 있는 채널입니다.

1
2
3
4
5
6
7
8
9
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));

while(true) {
SocketChannel socketChannel = serverSocketChannel.accept();

// do something with socketChannel...
}

Non-blocking Mode

ServerSocketChannel은 non-blocking 모드로 설정이 가능합니다. non-blocking 모드에서는 accept() 메서드가 즉시 반환되므로 들어오는 연결이 없으면 null을 반환할 수 있습니다. 이에 따라 반환 된 ServerSocketChannel이 null인지 확인해야합니다.

1
2
3
4
5
6
7
8
9
10
11
12
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);

while(true){
SocketChannel socketChannel = serverSocketChannel.accept();

if (socketChannel != null) {
// do something with socketChannel...
}
}

SocketChannel

NIO SocketChannel은 TCP 네트워크 소켓에 연결된 채널입니다. 표준 자바 네트워킹의 Socket과 역할이 같습니다.

Non-blocking Mode

ServerSocketChannel과 마찬가지로 SocketChannel을 non-blocking 모드로 설정할 수 있습니다. non-blocking 모드에서는 connect(), read(), write()를 호출할 수 있습니다.

connect()

SocketChannel이 non-blocking 모드일 때, connect()를 호출하면 메서드가 연결이 설정되기 전에 반환될 수 있습니다. 연결이 설정되었는지 확인하기 위해서 finishConnect() 메서드를 이용할 수 있습니다.

1
2
3
4
5
6
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://naver.com", 80));

while(!socketChannel.finishConnect()) {
// wait, or do something else...
}

read()

non-blocking 모드일 때, read() 메서드는 데이터를 전혀 읽지 않고 반환 될 수 있습니다. 따라서 반환 된 결과(int)를 갖고 판단해야 합니다. 반환 된 결과(int)는 읽은 바이트 수를 나타냅니다.

멀티플렉싱 기반의 다중 접속 서버

지금까지 살펴본 Channel, Buffer, Selector를 이용해 간단한 echo 서버를 만들어 보았습니다.

EchoServer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class EchoServer {

private static final String EXIT = "EXIT";

public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("localhost", 3000));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(256);

while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {

SelectionKey key = iter.next();

if (key.isAcceptable()) {
register(selector, serverSocket);
}

if (key.isReadable()) {
answerWithEcho(buffer, key);
}
iter.remove();
}
}
}

private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
throws IOException {

SocketChannel client = (SocketChannel) key.channel();
client.read(buffer);
if (new String(buffer.array()).trim().equals(EXIT)) {
client.close();
System.out.println("Not accepting client messages anymore");
}

buffer.flip();
client.write(buffer);
buffer.clear();
}

private static void register(Selector selector, ServerSocketChannel serverSocket)
throws IOException {

SocketChannel client = serverSocket.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
System.out.println("new client connected...");
}
}

실행 결과

echo_server_with_select

Java NIO vs IO

Stream Oriented vs Buffer Oriented

스트림 지향의 IO는 스트림에서 한 번에 하나 이상의 바이트를 읽는 것을 의미합니다. 읽은 바이트를 이용해 유저가 데이터를 처리해야 하며 읽힌 바이트는 따로 캐시되지 않습니다. 또한 스트림의 데이터는 임의로 유저가 앞뒤로 이동할 수 없습니다. 스트림에서 읽은 데이터를 앞뒤로 이동해야 하는 경우는 먼저 스트림의 데이터를 읽어 버퍼에 캐시해야합니다.

버퍼 지향의 NIO 방식은 조금 다릅니다. 데이터는 나중에 처리되는 임의의 버퍼로 읽어집니다. 필요에 따라 버퍼에서 앞뒤로 이동할 수 있습니다. 이로 인해 버퍼를 이용한 처리 과정에서 좀 더 유연한 사용이 가능합니다. 그러나 버퍼를 이용해 완전히 처리하려면 필요한 모든 데이터가 버퍼에 들어있는지 확인해야합니다. 또한 버퍼에 더 많은 데이터를 읽을 때, 아직 처리하지 않은 버퍼의 데이터를 덮어 쓰지 않도록 주의해야합니다.

Blocking vs Non-blocking IO

자바 IO의 스트림을 이용하면 봉쇄(block)됩니다. 즉, 스레드가 read() 혹은 write()를 호출하면 읽은 데이터가 있거나 데이터가 완전히 쓰여질 때까지 해당 스레드가 차단되어 그 동안 스레드는 아무 것도 할 수 없습니다.

자바 NIO의 Non-blocking 모드는 스레드가 채널에서 데이터 읽기를 요청할 때, 현재 사용할 수 있는 데이터가 없는 경우 사용 가능한 데이터가 준비될 때까지 기다리지 않습니다. 때문에 해당 스레드는 봉쇄되지 않고 계속 진행될 수 있습니다. 쓰기 작업 또한 마찬가지 입니다. 스레드는 일부 데이터를 채널에 쓰도록 요청할 수 있지만 완전히 쓰여지기를 기다리지는 않습니다.

Selectos

자바 NIO의 셀렉터는 하나의 스레드에서 다중 입력 채널을 관리할 수 있습니다. 이 멀티플렉싱 메커니즘을 사용하면 단일 스레드에서 여러 채널의 입출력을 쉽게 관리할 수 있습니다.

참고

멀티플렉싱 기반의 다중 접속 서버로 가기까지

소켓이란?

소켓은 네트워크 상에서 서버와 클라이언트 두개의 프로그램이 특정 포트를 통해 양방향 통신이 가능하도록 만들어주는 추상화된 장치입니다. 메모리의 유저 공간에 존재하는 프로세스(서버, 클라이언트)는 커널 공간에 생성된 소켓을 통해 데이터를 송수신할 수 있습니다.
socket

소켓은 아래와 같이 지역(로컬) IP 주소, Port 번호와 상대방의 IP 주소와 Port 번호, 그리고 수신 버퍼송신 버퍼가 존재합니다. 서버와 클라이언트의 소켓이 서로 연결된 후, 데이터가 들어오면 수신 버퍼로 수신 데이터가 쓰이고, 반대로 데이터를 내 보낼 때는 송신 버퍼에 데이터가 쓰입니다.
socket

C언어로 간단한 서버 & 클라이언트 구현

C언어를 이용해 linux와 window에서 간단하게 소켓을 이용해 echo server와 client를 만들어 보겠습니다.
코드 한줄 한줄을 전부 해석하기 보다는 주석을 참고해 server와 client에서 어떤 순서로 소켓이 만들어지고 통신이 이루어지는지에 중점을 두어 보겠습니다.

linux

“Everything is a File”라는 말이 있습니다. linux에서는 소켓도 하나의 파일(File), 더 정확히는 파일 디스크립터(File descriptor)로 생성되어 관리됩니다. 그러므로 저 수준 파일 입출력 함수를 기반으로 소켓 기반의 데이터 송수신이 가능합니다.

파일 디스크립터(File descriptor)

  • 운영체제가 만든 파일을 구분하기 위한 일종의 숫자
  • 저 수준 파일 입출력 함수는 입출력을 목적으로 파일 디스크립터를 요구한다.
  • 저 수준 파일 입출력 함수에 소켓의 파일 디스크립터를 전달하면, 소켓을 대상으로 입출력을 진행한다.

echo_server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#define BUF_SIZE 1024
void error_handling(char *message);

int main(int argc, char *argv[]) {
// 파일 디스크립터를 위한 변수
int serv_sock, clnt_sock;
char message[BUF_SIZE];
int str_len, i;

struct sockaddr_in serv_adr;
struct sockaddr_in clnt_adr;
socklen_t clnt_adr_sz;

if (argc != 2) {
printf("Usage : %s <port>\n", argv[0]);
exit(1);
}

// 1. socket 하나를 생성한다.
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
if (serv_sock == -1)
error_handling("socket() error");

memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));

// 2. socket에 IP와 Port 번호를 할당한다.
if (bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1)
error_handling("bind() error");

// 3. server socket(listen socket)을 통해 클라이언트의 접속 요청을 대기한다.
// 5개의 수신 대기열(큐)을 생성한다.
if (listen(serv_sock, 5) == -1)
error_handling("listen() error");

clnt_adr_sz=sizeof(clnt_adr);

for (i=0; i<5; i++) {
// 4. 클라이언트 접속 요청을 수락한다. (클라이언트와 연결된 새로운 socket이 생성된다.)
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz);
if (clnt_sock == -1)
error_handling("accept() error");
else
printf("Connected client %d \n", i+1);

// 5. 클라이언트와 연결된 socket을 통해 데이터를 송수신한다.
while((str_len=read(clnt_sock, message, BUF_SIZE)) != 0)
write(clnt_sock, message, str_len);

close(clnt_sock);
}

close(serv_sock);
return 0;
}

void error_handling(char *message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}

echo_client.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#define BUF_SIZE 1024
void error_handling(char *message);

int main(int argc, char *argv[]) {
// 파일 디스크립터를 위한 변수
int sock;
char message[BUF_SIZE];
int str_len;
struct sockaddr_in serv_adr;

if (argc != 3) {
printf("Usage : %s <IP> <port>\n", argv[0]);
exit(1);
}

// 1. socket 하나를 생성한다.
sock = socket(PF_INET, SOCK_STREAM, 0);
if (sock == -1)
error_handling("socket() error");

memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = inet_addr(argv[1]);
serv_adr.sin_port = htons(atoi(argv[2]));

// 2. socket을 이용해 server의 server socket(listen socket)에 연결을 요청한다.
if (connect(sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1)
error_handling("connect() error!");
else
puts("Connected...........");

while(1) {
fputs("Input message(Q to quit): ", stdout);
fgets(message, BUF_SIZE, stdin);

if (!strcmp(message,"q\n") || !strcmp(message,"Q\n"))
break;

// 3. 연결된 socket을 통해 server로부터 데이터를 송수신한다.
write(sock, message, strlen(message));
str_len = read(sock, message, BUF_SIZE-1);
message[str_len] = 0;
printf("Message from server: %s", message);
}

close(sock);
return 0;
}

void error_handling(char *message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}

결과 확인

gcc로 컴파일 후 실행하면 결과는 다음과 같습니다.
linux_simple_socket_result

window

window는 linux와 달리 파일이 아닌 별도의 소켓 구조체가 존재합니다. 별도의 소켓 구조체를 이용한 함수를 기반으로 소켓 기반의 데이터 송수신이 가능합니다.
window 코드의 결과는 위의 linux 코드의 결과와 같으므로 생략합니다.

echo_server_win.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// window socket
#include <winsock2.h>

#define BUF_SIZE 1024
void ErrorHandling(char *message);

int main(int argc, char *argv[]) {
WSADATA wsaData;
SOCKET hServSock, hClntSock;
char message[BUF_SIZE];
int strLen, i;

SOCKADDR_IN servAdr, clntAdr;
int clntAdrSize;

if (argc != 2) {
printf("Usage : %s <port>\n", argv[0]);
exit(1);
}

if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
ErrorHandling("WSAStartup() error!");

// 1. socket 하나를 생성한다.
hServSock = socket(PF_INET, SOCK_STREAM, 0);
if (hServSock == INVALID_SOCKET)
ErrorHandling("socket() error");

memset(&servAdr, 0, sizeof(servAdr));
servAdr.sin_family = AF_INET;
servAdr.sin_addr.s_addr = htonl(INADDR_ANY);
servAdr.sin_port = htons(atoi(argv[1]));

// 2. 생성한 socket을 server socket(listen socket)으로 등록한다.
if (bind(hServSock, (SOCKADDR*)&servAdr, sizeof(servAdr)) == SOCKET_ERROR)
ErrorHandling("bind() error");

// 3. server socket을 통해 클라이언트의 접속 요청을 확인한다.
if (listen(hServSock, 5) == SOCKET_ERROR)
ErrorHandling("listen() error");

clntAdrSize=sizeof(clntAdr);

for (i=0; i<5; i++) {
// 4. 클라이언트 접속 요청 대기 및 허락 (클라이언트와 연결된 새로운 socket이 생성된다.)
hClntSock = accept(hServSock, (SOCKADDR*)&clntAdr, &clntAdrSize);
if (hClntSock == -1)
ErrorHandling("accept() error");
else
printf("Connected client %d \n", i+1);

// 5. 클라이언트와 연결된 socket을 통해 데이터를 송수신한다.
while((strLen=recv(hClntSock, message, BUF_SIZE, 0)) != 0)
send(hClntSock, message, strLen, 0);

closesocket(hClntSock);
}

closesocket(hServSock);
WSACleanup();
return 0;
}

void ErrorHandling(char *message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}

echo_client_win.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// window socket
#include <winsock2.h>

#define BUF_SIZE 1024
void ErrorHandling(char *message);

int main(int argc, char *argv[]) {
WSADATA wsaData;
SOCKET hSocket;
char message[BUF_SIZE];
int strLen;
SOCKADDR_IN servAdr;

if (argc != 3) {
printf("Usage : %s <IP> <port>\n", argv[0]);
exit(1);
}

if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
ErrorHandling("WSAStartup() error!");

// 1. socket 하나를 생성한다.
hSocket = socket(PF_INET, SOCK_STREAM, 0);
if (hSocket == INVALID_SOCKET)
ErrorHandling("socket() error");

memset(&servAdr, 0, sizeof(servAdr));
servAdr.sin_family = AF_INET;
servAdr.sin_addr.s_addr = inet_addr(argv[1]);
servAdr.sin_port = htons(atoi(argv[2]));

// 2. socket을 이용해 server의 server socket(listen socket)에 연결을 요청한다.
if (connect(hSocket, (SOCKADDR*)&servAdr, sizeof(servAdr)) == SOCKET_ERROR)
ErrorHandling("connect() error!");
else
puts("Connected...........");

while(1) {
fputs("Input message(Q to quit): ", stdout);
fgets(message, BUF_SIZE, stdin);

if (!strcmp(message,"q\n") || !strcmp(message,"Q\n"))
break;

// 3. 연결된 socket을 통해 server로부터 데이터를 송수신한다.
send(hSocket, message, strlen(message), 0);
strLen = recv(hSocket, message, BUF_SIZE-1, 0);
message[strLen] = 0;
printf("Message from server: %s", message);
}

closesocket(hSocket);
WSACleanup();
return 0;
}

void ErrorHandling(char *message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}

고찰

linux와 window의 서버 & 클라이언트 소켓 생성과 연결 과정은 다음과 같습니다.
server_client_socket

  • 서버
    • 클라이언트로부터의 연결요청도 일종의 데이터 전송입니다. 따라서 연결 요청을 받아들이기 위해서도 하나의 소켓이 필요하고, 이 소켓을 가리켜 서버소켓 또는 리스닝 소켓이라고 합니다. listen 함수의 호출은 소켓을 리스닝 소켓으로 만듭니다.
    • accept 함수의 결과로 서버소켓을 통해 클라이언트로부터의 연결요청을 받으면, 연결요청 정보를 참조하여 클라이언트 소켓과의 통신을 위한 별도의 소켓을 추가로 하나 더 생성합니다. 그리고 이렇게 생성된 소켓을 대상으로 데이터의 송수신이 진행됩니다.
  • 클라이언트
    • 소켓을 생성하고 연결 요청을 위해서 connect 함수를 호출하는 것이 전부입니다.
    • 서버의 listen 함수호출 이후에야(서버소켓이 준비된 이후) connect 함수 호출이 유효합니다.

문제점

linux_simple_socket_result
위 예제의 경우 반복적(Iterable)으로 accept 함수를 호출하면, 계속해서 클라이언트의 연결요청을 수락할 수 있습니다. 그러나, 동시에 둘 이상의 클라이언트에게 서비스를 제공할 수 있는 상태는 아닙니다. (처음 소켓 연결을 맺은 클라이언트가 종료하기 전까지는 다른 클라이언트의 연결은 listen 큐에 들어가 대기해야합니다.)

이 문제를 해결하기 위해 둘 이상의 클라이언트들이 동시에 접속해 서버로부터 서비스를 제공받을 수 있는 여러 다중 접속 서버의 구현 방법들에 대해 알아보겠습니다.

다중 접속 서버 구현 방법

  • 멀티프로세스 기반 서버 : 다수의 프로세스를 생성하는 방식으로 서비스를 제공한다.
  • 멀티스레드 기반 서버 : 클라이언트의 수만큼 스레드를 생성하는 방식으로 서비스를 제공한다.
  • 멀티플렉싱 기반 서버 : 입출력 대상을 묶어서 관리하는 방식으로 서비스를 제공한다.

멀티프로세스 기반의 다중 접속 서버

멀티프로세스 기반의 다중 접속 서버는 다수의 프로세스를 생성하는 방식으로 서비스를 제공합니다.
multi_process_server

  1. 부모 프로세스는 리스닝 소켓으로 accept 함수 호출을 통해서 연결요청을 수락합니다.
  2. 이때 얻게 되는 소켓의 파일 디스크립터(클라이언트와 연결된 연결 소켓)를 자식 프로세스를 생성해 넘겨줍니다.
  3. 자식 프로세스는 전달받은 파일 디스크립터를 바탕으로 서비스를 제공합니다.

핵심은 연결이 하나 생성될 때마다 프로세스를 생성해서 해당 클라이언트에 대해 서비스를 제공하는 것입니다.

echo_multi_process_server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#define BUF_SIZE 30
void error_handling(char *message);
void read_childproc(int sig);

int main(int argc, char *argv[]) {
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;

pid_t pid;
struct sigaction act;
socklen_t adr_sz;
int str_len, state;
char buf[BUF_SIZE];
if (argc != 2) {
printf("Usage : %s <port>\n", argv[0]);
exit(1);
}

act.sa_handler = read_childproc;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
state = sigaction(SIGCHLD, &act, 0);
// 1. socket 하나를 생성한다.
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));

// 2. socket에 IP와 Port 번호를 할당한다.
if (bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr)) == -1)
error_handling("bind() error");
// 3. 생성한 socket을 server socket(listen socket)으로 등록한다.
if (listen(serv_sock, 5) == -1)
error_handling("listen() error");

while(1) {
adr_sz = sizeof(clnt_adr);
// 4. 부모 프로세스는 리스닝 소켓으로 accept 함수 호출을 통해서 연결요청을 수락한다.
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
if (clnt_sock == -1)
continue;
else
puts("new client connected...");
// 5. 이때 얻게 되는 소켓의 파일 디스크립터(클라이언트와 연결된 연결 소켓)를 자식 프로세스를 생성해 넘겨준다.
pid = fork();
if (pid == -1) {
close(clnt_sock);
continue;
}
if (pid == 0) {
close(serv_sock);
// 6. 자식 프로세스는 전달받은 파일 디스크립터를 바탕으로 서비스를 제공한다.
while((str_len = read(clnt_sock, buf, BUF_SIZE)) != 0)
write(clnt_sock, buf, str_len);

close(clnt_sock);
puts("client disconnected...");
return 0;
}
else
close(clnt_sock);
}
close(serv_sock);
return 0;
}

void read_childproc(int sig) {
pid_t pid;
int status;
pid = waitpid(-1, &status, WNOHANG);
printf("removed proc id: %d \n", pid);
}
void error_handling(char *message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}

결과 확인

echo_result_multi_process
위에서 Iterable하게 구현했을때 발생했던 문제를 각 클라이언트 요청마다 별도의 프로세스를 생성함으로써 문제를 해결한 것을 확인할 수 있습니다.

고찰

  • 장점
    • 프로그램 흐름이 단순하기 때문에 이해하기 쉽습니다.
    • 안정적인 동작이 가능합니다. 운영체제에서 프로세스는 서로 독립된 실행 객체로 존재합니다. 서로 독립된 메모리 공간을 갖고 서로 다른 프로세스끼리 서로 영향을 미치지 않고 독립적으로 수행이 가능합니다.
  • 단점
    • 프로세스 복사에 따른 성능 문제가 있습니다.
    • 병렬 처리해야 하는 만큼의 프로세스를 생성해야 합니다.
    • fork에 의해 자식 프로세스가 생성될 경우, 부모 프로세스의 자원이 복사됩니다. (코드, 소켓을 포함한 모든 열린 파일들(파일 디스크립터)) 부모 프로세스로부터 accept되어 생성된 하나의 소켓에 대해 부모 프로세스와 자식 프로세스 모두에서 한 소켓에 대한 파일 디스크립터가 존재합니다. 따라서 두 파일 디스크립터를 모두 종료해야 해당 소켓을 제거할 수 있습니다.
    • 서로 다른 독립적인 메모리 공간을 갖기 때문에 프로세스간 정보 교환이 어렵다.

위의 단점들은 각 클라이언트의 요청마다 프로세스가 아닌 스레드를 생성함으로써 해결할 수 있습니다.
다음으로 멀티프로세스 기반의 다중 접속 서버의 단점을 개선할 수 있는 멀티스레드 기반의 다중 접속 서버에 대해 알아보겠습니다.

멀티스레드 기반의 다중 접속 서버

멀티스레드 기반의 다중 접속 서버는 다수의 스레드를 생성하는 방식으로 서비스를 제공합니다.
multi_thread_server

  1. 메인 스레드는 리스닝 소켓으로 accept 함수 호출을 통해서 연결요청을 수락합니다.
  2. 이때 얻게 되는 소켓의 파일 디스크립터(클라이언트와 연결된 연결 소켓)를 별도의 워커 스레드를 생성해 넘겨줍니다.
  3. 워커 스레드는 전달받은 파일 디스크립터를 바탕으로 서비스를 제공합니다.

핵심은 연결이 하나 생성될 때마다 프로세스가 아닌 스레드를 생성해서 해당 클라이언트에 대해 서비스를 제공하는 것입니다.

echo_multi_thread_server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUF_SIZE 30

void * handle_clnt(void * arg);
void error_handling(char * msg);

int main(int argc, char *argv[]) {
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;

pthread_t t_id;
socklen_t adr_sz;
int str_len, state;
char buf[BUF_SIZE];
if (argc != 2) {
printf("Usage : %s <port>\n", argv[0]);
exit(1);
}

// 1. socket 하나를 생성한다.
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));

// 2. socket에 IP와 Port 번호를 할당한다.
if (bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr)) == -1)
error_handling("bind() error");
// 3. 생성한 socket을 server socket(listen socket)으로 등록한다.
if (listen(serv_sock, 5) == -1)
error_handling("listen() error");

while(1) {
adr_sz = sizeof(clnt_adr);
// 4. 메인 스레드는 리스닝 소켓으로 accept 함수 호출을 통해서 연결요청을 수락한다.
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);

if (clnt_sock == -1)
continue;

puts("new client connected...");

// 5. 클라이언트와 연결된 소켓의 파일 디스크립터를 워커 스레드를 생성해 넘겨준다.
pthread_create(&t_id, NULL, handle_clnt, (void*)&clnt_sock);
pthread_detach(t_id);
}

close(serv_sock);
return 0;
}

void * handle_clnt(void * arg) {
int clnt_sock=*((int*)arg);
int str_len=0, i;
char buf[BUF_SIZE];

// 6. 워커 스레드는 전달받은 파일 디스크립터를 바탕으로 서비스를 제공한다.
while((str_len = read(clnt_sock, buf, BUF_SIZE)) != 0)
write(clnt_sock, buf, str_len);

close(clnt_sock);
return NULL;
}

void error_handling(char * msg) {
fputs(msg, stderr);
fputc('\n', stderr);
exit(1);
}

결과 확인

echo_result_multi_thread
처음 Iterable하게 구현했을때 발생했던 문제를 각 클라이언트 요청마다 별도의 스레드를 생성함으로써 문제를 해결했으며, 클라이언트의 요청마다 각 프로세스를 할당해서 해결한 방법보다 스레드를 생성해 할당함으로써 리소스 소모를 줄였습니다.

고찰

  • 장점
    • 프로세스 복사에 따른 비용보다 스레드 생성에 대한 비용이 적다.
    • 스레드간 서로 공유하는 메모리를 갖기 때문에, 스레드간 정보 교환이 쉽다.
  • 단점
    • 하나의 프로세스 내의 다수의 스레드가 존재하기 때문에 하나의 스레드에서 문제가 생긴다면 프로세스에 영향을 미쳐 나머지 다수의 스레드에도 영향을 끼칠 수 있다.

각 클라이언트 요청마다 별도의 스레드를 생성함으로써 프로세스를 생성하던 방법보다 리소스의 비용을 줄일 수 있었고, 스레드들이 서로 공유하는 메모리를 가질 수 있는 환경이 되었습니다.
그러나 I/O 멀티플렉싱(multiplexing) 기법을 사용한다면, 각 클라이언트 마다 별도의 스레드를 생성하는 것이 아닌 하나의 스레드에서 다수의 클라이언트에 연결된 소켓(파일 디스크립터)을 괸리하고 소켓에 이벤트(read/write)가 발생할 경우에만 별도의 스레드를 만들어 해당 이벤트를 처리하도록 구현할 수 있습니다.

멀티플렉싱 기반의 다중 접속 서버

입출력 다중화란 하나의 프로세스 혹은 스레드에서 입력과 출력을 모두 다룰 수 있는 기술을 말합니다. 커널(kernel)에서는 하나의 스레드가 여러 개의 소켓(파일)을 핸들링 할 수 있는 select, poll, epoll과 같은 시스템 콜(system call)을 제공하고 있습니다.

한개의 프로세스 혹은 스레드에서 한개의 클라이언트에 대한 입출력만 처리할 수 있었던 이유는 입출력 함수가 봉쇄(block)되었기 때문에, 입출력 데이터가 준비될때까지 무한정 봉쇄되어 여러 클라이언트의 입출력을 처리할 수 없었기 때문입니다.

그러나 I/O 멀티플렉싱 기법을 사용하면 입출력 다중화에서도 입출력 함수는 여전히 봉쇄로 작동하지만, 입출력 함수를 호출하기전에 어떤 파일에서 입출력이 준비가 되었는지 확인할 수가 있습니다.

봉쇄 (block)

봉쇄를 이해하기 위해 먼저 두가지 짚고 넘어가야할 사항이 있습니다.

  1. 애플리케이션에서 I/O 작업을 하는 경우, 스레드는 데이터 준비가 완료될 때까지 대기합니다. 예를 들어 소켓을 통해 read(recvfrom)를 수행하는 경우 데이터가 네트워크를 통해 도착하는 것을 기다립니다. 패킷이 네트워크를 통해 도착하면 커널 내의 버퍼에 복사됩니다. (처음에 커널 공간에 생성된 소켓의 구조에서 송신 버퍼와 수신 버퍼가 있는 것을 보았습니다.)
  2. 커널 내의 버퍼에 복사된 데이터를 애플리케이션에서 사용하기 위해서는 커널 버퍼(kernel space)에서 유저 버퍼(user space)로 복사 후 이용해야 합니다. 애플리케이션은 유버 모드에서 유저 버퍼에만 접근이 가능하기 때문입니다.

Blocking I/O Model
blocking_io_model
프로세스(스레드)는 하나의 소켓에 대해 recvfrom을 호출하고 데이터가 kernel space 도착해 user space의 프로세스 버퍼에 복사 될 때까지 시스템 호출이 반환되지 않습니다. 즉 recvfrom은 kernel space에 데이터가 도착하길 기다리는것 부터 시작됩니다. 프로세스는 recvfrom을 호출할 때부터 반환 할 때까지 전체 프로세스가 봉쇄됩니다.

I/O Multiplexing Model
multiplexing_io_model
멀티플렉싱 모델에서는 select 함수를 호출해, 여러개의 소켓들 중 recvfrom이 가능한 소켓이 생길 때까지 대기합니다. select의 결과로 recvfrom을 호출할 수 있는 소켓의 목록이 반환되면, 해당 소켓들에 대해 recvfrom을 호출합니다.

봉쇄 모델(Blocking I/O model)에서는 하나의 프로세스(스레드)에서 하나의 소켓(파일 디스크립터)에 대해 recvfrom을 호출해 데이터가 kernel space에 도착했는지 확인하고 현재 읽을 수 있는 데이터가 없다면 봉쇄되어 대기했다면, 멀티플렉싱 모델(I/O Multiplexing Model)에서는 하나 이상의 소켓(파일 디스크립터)이 준비 될 때까지 대기할 수 있습니다.

select

select 방식은 이벤트(입력|출력|에러) 별로 감시할 파일들을 fd_set 이라는 파일 상태 테이블(fd 비트 배열)에 등록하고, 등록된 파일(파일 디스크립터)에 어떠한 이벤트가 발생했을 경우 fd_set을 확인하는 방식으로 동작합니다.
select_model
예를 들어 위와 같이 6개의 파일을 다루어야 한다고 했을 때, 6개의 파일에 대해 입출력 데이터가 준비될 때까지 이벤트를 기다리는 파일 상태 테이블을 준비합니다. 그 후 6개의 파일 중 입출력이 준비된 파일에 대해서 이벤트가 발생하면 이벤트가 발생한 파일 디스크립터의 수를 반환합니다. 이후 이벤트가 준비된 파일에 대해 입출력을 수행하는데 이미 데이터가 준비된 파일에 대해 입출력을 수행하기 때문에 봉쇄가 발생하지 않을 것이라는게 보장됩니다.

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)

  • nfds: 검사 대상이 되는 파일 디스크립터의 수
  • readfs: 읽기 이벤트를 검사할 파일 디스크립터의 목록
  • writefds: 쓰기 이벤트를 검사할 파일 디스크립터의 목록
  • exceptfds: 예외 이벤트를 검사할 파일 디스크립터의 목록
  • timeout: 이벤트를 기다릴 시간 제한
  • 반환 값: 이벤트가 발생한 파일의 갯수

반환 값이 이벤트가 발생한 파일의 디스크립터 목록이 아닌 파일의 갯수임에 주의해야합니다.

echo_select_server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/select.h>

#define BUF_SIZE 100
void error_handling(char *buf);

int main(int argc, char *argv[]) {
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
struct timeval timeout;
// 파일 상태 테이블 선언
fd_set reads, cpy_reads;

socklen_t adr_sz;
int fd_max, str_len, fd_num, i;
char buf[BUF_SIZE];
if (argc != 2) {
printf("Usage : %s <port>\n", argv[0]);
exit(1);
}

serv_sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));

if (bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr)) == -1)
error_handling("bind() error");
if (listen(serv_sock, 5) == -1)
error_handling("listen() error");

FD_ZERO(&reads); // fd_set 테이블을 초기화한다.
FD_SET(serv_sock, &reads); // 서버 소켓(리스닝 소켓)의 이벤트 검사를 위해 fd_set 테이블에 추가한다.
fd_max = serv_sock;

while(1) {
cpy_reads = reads;
timeout.tv_sec = 5;
timeout.tv_usec = 5000;

// result
// -1: 오류 발생
// 0: 타임 아웃
// 1 이상 : 등록된 파일 디스크립터에 해당 이벤트가 발생하면 이벤트가 발생한 파일 디스크립터의 수를 반환한다.
if ((fd_num = select(fd_max+1, &cpy_reads, 0, 0, &timeout)) == -1)
break;

if (fd_num == 0)
continue;

for (i=0; i<fd_max+1; i++) {
if (FD_ISSET(i, &cpy_reads)) { // fd_set 테이블을 검사한다.
// 서버 소켓(리스닝 소켓)에 이벤트(연결 요청) 발생
if (i == serv_sock) { // connection request!
adr_sz = sizeof(clnt_adr);
clnt_sock= accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
FD_SET(clnt_sock, &reads); // fd_set 테이블에 클라이언트 소켓 디스크립터를 추가한다.
if (fd_max < clnt_sock)
fd_max = clnt_sock;
printf("connected client: %d \n", clnt_sock);
}
// 클라이언트와 연결된 소켓에 이벤트 발생
else { // read message!
str_len = read(i, buf, BUF_SIZE);
if (str_len == 0) { // close request!
FD_CLR(i, &reads); // fd_set 테이블에서 파일 디스크립터를 삭제한다.
close(i);
printf("closed client: %d \n", i);
} else {
write(i, buf, str_len); // echo!
}
}
}
}
}

close(serv_sock);
return 0;
}

void error_handling(char *buf) {
fputs(buf, stderr);
fputc('\n', stderr);
exit(1);
}

결과 확인

echo_result_select

고찰

  • 장점
    • 단일 프로세스(스레드)에서 여러 파일의 입출력 처리가 가능합니다.
    • 지원 하는 OS가 많아 이식성이 좋습니다. (POSIX 표준)
  • 단점
    • 커널에 의해서 완성되는 기능이 아닌, 순수하게 함수에 의해 완성되는 기능이다.
    • select 함수의 호출을 통해서 전달된 정보는 커널에 등록되지 않은 것이며, 그래서 select 함수를 호출할 때마다 매번 관련 정보를 전달해야 합니다.
    • select 함수의 호출 결과가 이벤트가 발생한 파일 디스크립터의 개수이기 때문에 어떤 파일 디스크립터에서 이벤트가 발생했는지 확인하기 위해서는 fd_set 테이블 전체를 검사해야 합니다. (속도가 느립니다)
    • 검사할 수 있는 fd 개수에 제한이 있습니다. (최대 1024개)
    • select 호출 때마다 데이터를 복사해야합니다. (select 함수를 호출한 후 이벤트를 처리할 때 fd_set 테이블 변경이 필요하기 때문에 미리 복사가 필요합니다)

POSIX란?

POSIX(Portable Operating System Interface)는 이식 가능 운영 체제 인터페이스의 약자로, 서로 다른 UNIX OS의 공통 API를 정리하여 이식성이 높은 유닉스 응용 프로그램을 개발하기 위한 목적으로 IEEE가 책정한 애플리케이션 인터페이스 규격입니다.

poll

poll도 select와 마찬가지로 멀티플렉싱을 구현하기 위한 방법입니다. poll이 여러 개의 파일을 다루는 방법은 select와 마찬가지로 fd(파일 디스크립터)의 이벤트를 기다리다가 이벤트가 발생하면, poll에서의 block이 해제되고, 다음 루틴에서 어떤 fd에 이벤트가 발생했는지 검사하는 방식을 사용합니다.

poll의 동작 원리는 select와 비슷하므로 생략합니다. 간단히 select와 비교해 차이점에 대해서만 알아보겠습니다.

  • 장점
    • select와 단일 프로세스(스레드)에서 여러 파일의 입출력 처리가 가능합니다.
    • select 방식처럼 표준 입력|출력|에러을 따로 감시할 필요가 없습니다.
    • select는 timeval이라는 구조체를 사용해 타임아웃 값을 세팅하지만, poll은 별다른 구조체 없이 타임아웃 기능을 지원합니다.
  • 단점
    • 일부 unix 시스템에서는 poll을 지원하지 않습니다.

epoll

epoll은 select 함수의 단점 극복을 위해 커널 레벨멀티플렉싱을 지원해줍니다. 커널에 관찰대상에 대한 정보를 한 번만 전달하고, 관찰대상의 범위, 또는 내용에 변경이 있을 때만 변경 사항을 알려줍니다. 리눅스에서는 epoll, 윈도우에서는 IOCP, 맥에서는 Kqueue가 이에 해당합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int epoll_create(int size); //size는 epoll_fd의 크기정보를 전달한다.
// 반환 값 : 실패 시 -1, 일반적으로 epoll_fd의 값을 리턴

int epoll_ctl(int epoll_fd, // epoll_fd
int operate_enum, // 어떤 변경을 할지 결정하는 enum값
int enroll_fd, // 등록할 fd
struct epoll_event* event // 관찰 대상의 관찰 이벤트 유형
);
// 반환 값 : 실패 시 -1, 성공시 0

int epoll_wait(int epoll_fd, // epoll_fd
struct epoll_event* event, // event 버퍼의 주소
int maxevents, // 버퍼에 들어갈 수 있는 구조체 최대 개수
int timeout // select의 timeout과 동일 단위는 1/1000
);
// 성공시 이벤트 발생한 파일 디스크립터 개수 반환, 실패시 -1 반환
  • epoll_create : epoll 파일 디스크립터 저장소 생성
  • epoll_ctl : 저장소에 파일 디스크립터 등록 및 삭제
  • epoll_wait : select 함수와 마찬가지로 파일 디스크립터의 변화를 대기한다.

epoll_create를 통해 생성된 epoll 인스턴스에 관찰대상을 저장 및 삭제하는 함수가 epoll_ctl이고, epoll 인스턴스에 등록된 파일 디스크립터를 대상으로 이벤트의 발생 유무를 확인하는 함수가 epoll_wait이다.

echo_epoll_server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
// 리눅스에서만 사용 가능
#include <sys/epoll.h>

#define BUF_SIZE 100
#define EPOLL_SIZE 50
void error_handling(char *buf);

int main(int argc, char *argv[]) {
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
socklen_t adr_sz;
int str_len, i;
char buf[BUF_SIZE];

struct epoll_event *ep_events;
struct epoll_event event;
int epfd, event_cnt;

if (argc != 2) {
printf("Usage : %s <port>\n", argv[0]);
exit(1);
}

serv_sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
if (bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr)) == -1)
error_handling("bind() error");
if (listen(serv_sock, 5) == -1)
error_handling("listen() error");

// 커널이 관리하는 epoll 인스턴스라 불리는 파일 디스크립터의 저장소 생성
// 성공 시 epoll 파일 디스크립터, 실패시 -1 반환
epfd = epoll_create(EPOLL_SIZE);
ep_events = malloc(sizeof(struct epoll_event)*EPOLL_SIZE);

event.events = EPOLLIN;
event.data.fd = serv_sock;
// 파일 디스크립터(serv_sock)를 epoll 인스턴스에 등록한다. (관찰대상의 관찰 이벤트 유형은 EPOLLIN)
epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);

while(1) {
// 성공 시 이벤트가 발생한 파일 디스크립터이ㅡ 수, 실패 시 -1 반환
// 두 번째 인자로 전달된 주소의 메모리 공간에 이벤트 발생한 파일 디스크립터에 대한 정보가 들어있다.
event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
if (event_cnt == -1) {
puts("epoll_wait() error");
break;
}

for (i=0; i<event_cnt; i++) {
if (ep_events[i].data.fd == serv_sock) {
adr_sz = sizeof(clnt_adr);
clnt_sock= accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
event.events = EPOLLIN;
event.data.fd = clnt_sock;
// 파일 디스크립터(clnt_sock)를 epoll 인스턴스에 등록한다. (관찰대상의 관찰 이벤트 유형은 EPOLLIN)
epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);
printf("connected client: %d \n", clnt_sock);
} else {
str_len = read(ep_events[i].data.fd, buf, BUF_SIZE);
if (str_len == 0) { // close request!
epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);
close(ep_events[i].data.fd);
printf("closed client: %d \n", ep_events[i].data.fd);
} else {
write(ep_events[i].data.fd, buf, str_len); // echo!
}
}
}
}

close(serv_sock);
close(epfd);
return 0;
}

void error_handling(char *buf) {
fputs(buf, stderr);
fputc('\n', stderr);
exit(1);
}

결과 확인

echo_result_epoll

고찰

  • 장점
    • 상태변화의 확인을 위한, 전체 파일 디스크립터를 대상으로 하는 반복문이 필요 없습니다.
    • select 함수에 대응하는 epoll_wait 함수호출 시, 커널에서 상태정보를 유지하기 때문에 관찰대상의 정보를 매번 전달할 필요가 없습니다.
  • 단점
    • 리눅스의 select 기반 서버를 윈도우의 select 기반 서버로 변경하는 것은 간단하나, 리눅스의 epoll 기반의 서버를 윈도우의 IOCP 기반으로 변경하는 것은 select를 이용하는 것보다 번거롭습니다.

참고

컴퓨터 구조와 I/O

Computer system

일반적인 컴퓨터 시스템은 CPU, 메모리, 외부 장치(ex. 하드디스크, 키보드, 모니터, 마우스)로 구성된다. CPU와 장치 컨트롤러들은 메모리 사이클을 두고 경쟁하며 동시에 실행된다. 경쟁상황에서 공유하는 메모리에 순차적으로 접근할 수 있도록 메모리 컨트롤러가 메모리에 대한 접근을 제어한다.

장치 컨트롤러: 컴퓨터 내의 각 하드웨어 장치에는 컨트롤러(Controller: 제어기)가 존재한다. 컨트롤러는 일종의 작은 CPU로서, 컴퓨터 전체에 CPU라는 중앙 처리 장치가 있듯이 컨트롤러는 각 하드웨어 장치마다 존재하면서 이들을 제어하는 작은 CPU라고 할 수 있다.
로컬 버퍼: 장치 컨트롤러에는 장치로부터 들어오고 나가는 데이터를 임시로 저장하기 위한 작은 메모리를 가지고 있다. 이를 로컬 버퍼(local buffer)라고 부른다.
디스크나 키보드 등에서 데이터를 읽어오는 경우, 우선 로컬 버퍼에 데이터가 임시로 저장된 후 메모리에 전달된다. 이 때, 장치에서 로컬 버퍼로 읽어오는 일은 컨트롤러가 담당한다.

Computer system

I/O (Input/Output)

컴퓨터는 목적을 달성하기 위해 CPU/메모리와 외부 장치간에 정보를 주고 받는다. 이를 I/O라고 말할 수 있다.
컴퓨터에서 연산을 한다는 것은 CPU가 무언가 일을 한다는 뜻이다. 입출력 장치들의 I/O 연산은 I/O 컨트롤러가 담당하고, 컴퓨터 내에서 수행되는 연산은 메인 CPU가 담당한다.
Computer system

CPU context

어느 시점에나 CPU의 모든 레지스터 값은 CPU 컨텍스트를 정의한다. CPU 컨텍스트라 하면 프로세스 혹은 스레드에서 사용되는 데이터의 집합으로, 진행중이던 작업을 중단하고 나중에 같은 지점에서 다시 계속할 수 있도록 필요한 내용들을 포함하고 있다.

Dual mode operation

사용자 프로세스에서 운영체제(OS)를 보하하기 위해 하드웨어는 사용자 모드(user mode)커널 모드(kernel mode)라는 두 가지 모드를 제공한다. 두 가지 모드 작동은 CPU가 실행할 수 있는 작업의 유형과 범위에 제한을 둔다. (운영 체제 커널이 사용자 응용 프로그램 프로세스보다 많은 권한을 갖고 있다.)
usermode, kernelmode

System call

사용자 프로그램은 시스템 호출(System call)을 사용해서 운영 체제에게 서비스를 요청한다. 시스템 호출은 특별한 시스템 호출 예외를 사용하여 구현되어 있으며, 시스템 호출은 트랩(trap)이라고도 불린다.

시스템 호출(System call)은 운영 체제의 커널이 제공하는 서비스에 대해, 응용 프로그램의 요청에 따라 커널에 접근하기 위한 인터페이스이다. 보통 C나 C++과 같은 고급 언어로 작성된 프로그램들은 직접 시스템 호출을 사용할 수 없기 때문에 API를 통해 시스템 호출에 접근하게 하는 방법이다.
하드웨어 제어를 하는 모든 권한을 커널에서 가지고 있기 때문에 파일 시스템 같은 경우 응용 프로그램에서는 직접 제어할 수 없다. 따라서 응용 프로그램에서 하드웨어의 데이터를 가져오거나 쓰려면 커널의 장치 드라이버와 연동되어 실행되어야 한다. 결국 응용 프로그램이 파일 시스템을 이용하려면 커널의 파일 시스템 드라이버로 넘어가 실행되어야 하므로 시스템 호출 방법을 사용한다.

system call

Exceptions and Interrupt

예외(Exception)인터럽트(Interrupt)는 즉각 실행되어야 하는 이벤트를 CPU에게 알리는데 사용된다. 인터럽트는 컨트롤러가 CPU에게 이벤트를 알리기 위해 사용한다.

CPU 옆에는 인터럽트 라인(interrupt line)이 있어서, CPU가 자신의 작업을 하던 중간에 인터럽트 라인에 신호가 들어오면 하던 일을 멈추고, 인터럽트와 관련된 일을 처리한다. 좀더 정확히 CPU는 명령 하나를 수행할 때마다 인터럽트가 발생했는지를 체크하고, 인터럽트가 발생했으면 다음 명령을 수행하기 전에 인터럽트 처리를 하게 되고 그렇지 않으면 다음 명령을 계속 수행하게 된다.

Exception and Interrupt handler

예외 혹은 인터럽트가 발생하면 사용자 모드에서 커널 모드로 실행이 전환된다. 예외 혹은 인터럽트가 처리된 후 다시 사용자 모드로 전환된다.
exception and interrupt handler

조금 더 상세하게는 다음과 같은 과정을 거친다.
interrupt handle detail

  1. 커널에 들어가는 동안 현재 실행중인 프로세스의 컨텍스트(CPU의 모든 레지스터의 값)가 먼저 메모리에 저장된다.
  2. 예외 / 인터럽트를 처리한다.
  3. 복원 및 재개할 프로세스를 선택한다.
  4. 선택한 프로세스의 컨텍스트를 복구한다.
  5. 선택한 프로세스의 실행을 재개한다.

Read string system call design

문자열(String)을 입력받는 System call을 디자인 해보자.

Input buffer & Memory safety

  • 사용자 프로세스가 커널에 저장된 임의의 데이터를 읽을 수 있는 것은 바람직하지 않다.
  • 사용자 프로세스가 입력 버퍼 외부에 데이터를 기록하여 커널을 손상시킬 수 있다. 이를 Buffer overflow라고 한다.
  • 메모리 보안을 위해 사용자 프로세스는 커널 데이터에 읽고 쓸 수 없지만, 커널은 사용자 공간에 데이터를 읽고 쓸 수 있다.

Buffer pointer

커널은 사용자 공간에서 입력 버퍼에 대한 주소(pointer)를 알아야 한다.

Buffer size

커널은 입력 버퍼의 크기를 알아야하고 버퍼가 꽉 찼을 때 수행 할 작업을 결정해야한다.

Read string system call example

string system call

(객체지향의 사실과 오해) 정리


객체지향의 사실과 오해 (역할, 책임, 협력 관점에서 본 객체지향)

협력하는 객체들의 공동체

  • 객체지향의 목표는 실세계를 모방하는 것이 아니다. 오히려 새로운 세계를 창조하는 것이다.
  • 객체를 스스로 생각하고 스스로 결정하는 현실 세계의 생명체에 비유하는 것은 상태와 행위를 ‘캡슐화’하는 소프트웨어 객체의 ‘자율성’을 설명하는 데 효과적이다. 현실 세계의 사람들이 암묵적인 약속과 명시적인 계약을 기반으로 목표를 달성해 나가는 과정은 ‘메시지’를 주고받으며 공동의 목표를 달성하기 위해 ‘협력’하는 객체들의 관계를 설명하는 데 적합하다.
  • 실세계의 모방이라는 객체지향의 개념은 훌륭한 프로그램을 설계하고 구현하는 실무적인 관점에서는 부적합하지만 객체지향이라는 용어에 담긴 기본 사상을 이해하고 학습하는 데는 매우 효과적이다.
  • 역할은 관련성 높은 책임의 집합이다. 객체의 역할은 사람의 역할과 유사하게 다음과 같은 특징을 지닌다.
    • 여러 객체가 동일한 역할을 수행할 수 있다.
    • 역할은 대체 가능성을 의미한다.
    • 각 객체는 책임을 수행하는 방법을 자율적으로 선택할 수 있다.
    • 하나의 객체가 동시에 여러 역할을 수행할 수 있다.
  • 객체를 상태와 행동을 함께 지닌 실체라고 정의한다. 이 말은 객체가 협력에 참여하기 위해 어떤 행동을 해야 한다면 그 행동을 하는 데 필요한 상태도 함께 지니고 있어야 한다는 것을 의미한다.
  • 과거의 전통적인 개발 방법은 데이터와 프로세스를 엄격하게 구분한다. 이에 반해 객체지향에서는 데이터와 프로세스를 객체라는 하나의 틀 안에 함께 묶어 놓음으로써 객체의 자율성을 보장한다. 이것이 전통적인 개발 방법과 객체지향을 구분 짓는 가장 핵심적인 차이다.
  • 객체지향의 세계에서는 오직 한 가지 의사소통 수단만이 존재한다. 이를 메시지라고 한다.
  • 객체지향이란?
    • 객체지향이란 시스템을 상호작용하는 자율적인 객체들의 공동체로 바라보고 객체를 이용해 시스템을 분할하는 방법이다.
    • 자율적인 객체란 상태와 행위를 함께 지니며 스스로 자기 자신을 책임지는 객체를 의미한다.
    • 객체는 시스템의 행위를 구현하기 위해 다른 객체와 협력한다. 각 객체는 협력 내에서 정해진 역할을 수행하며 역할은 관련된 책임의 집합이다.
    • 객체는 다른 객체와 협력하기 위해 메시지를 전송하고, 메시지를 수신한 객체는 메시지를 처리하는 데 적합한 메서드를 자율적으로 선택한다.

이상한 나라의 객체

  • 인간은 행동의 과정과 결과를 단순하게 기술하기 위해 상태라는 개념을 고안했다. 상태를 이용하면 과거에 얽매이지 않고 현재를 기반으로 객체의 행동 방식을 이해할 수 있다.
  • 객체와 객체 사이의 의미 있는 연결을 링크라고 한다. 객체와 객체 사이에는 링크가 존재해야만 요청(메시지)을 보내고 받을 수 있다.
  • 객체 간의 선으로 표현되는 링크와 달리 객체를 구성하는 단순한 값은 속성이라고 한다.
  • 객체는 자율적인 존재이다. 객체지향의 세계에서 객체는 다른 객체의 상태에 직접적으로 접근할 수도, 상태를 변경할 수도 없다. 자율적인 객체는 스스로 자신의 상태를 책임져야 한다.
  • 객체의 상태는 저절로 변경되지 않는다. 객체의 상태를 변경하는 것은 객체의 자발적인 행동뿐이다.
  • 객체는 협력에 참여하는 과정에서 자기 자신의 상태뿐만 아니라 다른 객체의 상태 변경을 유발할 수도 있다.
  • 객체의 행동으로 인해 발생하는 결과는 두 가지 관점에서 설명할 수 있다.
    • 객체 자신의 상태 변경
    • 행동 내에서 협력하는 다른 객체에 대한 메시지 전송
  • 현실 세계의 객체와 객체지향 세계의 객체 사이에는 중요한 차이점이 있다. 현실과 달리 객체지향의 세계에서 모든 객체는 자신의 상태를 스스로 관리하는 자율적인 존재다.
  • 객체는 상태를 캡슐 안에 감춰둔 채 외부로 노출하지 않는다. 객체가 외부에 노출하는 것은 행동뿐이며, 외부에서 객체에 접근할 수 있는 유일한 방법 역시 행동뿐이다.
  • 객체의 행동을 유발하는 것은 외부로부터의 전달된 메시지지만 객체의 상태를 변경할지 여부는 객체 스스로 결정한다. 사실 객체에게 메시지를 전달하는 외부의 객체는 메시지를 수신하는 객체의 상태가 변경된다는 사실조차 알지 못한다.
  • 상태를 외부에 노출시키지 않고 행동을 경계로 캡슐화하는 것은 결과적으로 객체의 자율성을 높인다.
  • 상태를 잘 정의된 행동 뒤로 캡슐화하는 것은 객체의 자율성을 높이고 협력을 단순하고 유연하게 만든다.
  • 상태를 먼저 결정하고 행동을 나중에 결정하는 방법은 설계에 나쁜 영향을 끼친다.
    • 상태를 먼저 결정할 경우 캡슐화가 저해된다.
    • 객체를 협력자가 아닌 고립된 섬으로 만든다.
    • 객체의 재사용성이 저하된다.
  • 객체지향 설계는 애플리케이션에 필요한 협력을 생각하고 협력에 참여하는 데 필요한 행동을 생각한 후 행동을 수행할 객체를 선택하는 방식으로 수행된다. 행동을 결정한 후에야 행동에 필요한 정보가 무엇인지를 고려하게 되며 이 과정에서 필요한 상태가 결정된다.
  • 객체지향 세계는 현실 세계의 단순한 모방이 아니다. 소프트웨어 안에 구현된 상품 객체는 실제 세계의 상품과는 전혀 다른 양상을 띤다.
    • 현실 속에서는 수동적인 존재가 소프트웨어 객체로 구현될 때는 능동적으로 변한다.
    • 소프트웨어 객체가 현실 객체의 부분적인 특징을 모방하는 것이 아니라 현실 객체가 가지지 못한 추가적인 능력을 보유하게 된다.
    • 현실 세계와 객체지향 세계 사이의 관계를 좀 더 정확하게 설명할 수 있는 단어는 은유다.
    • 현실 속의 객체의 의미 일부가 소프트웨어 객체로 전달되기 때문에 프로그램 내의 객체는 현실 속의 객체에 대한 은유다.

역할, 책임, 협력

  • 객체의 세계에서는 협력이라는 문맥이 객체의 행동 방식을 결정한다. 중요한 것은 개별 객체가 아니라 객체들 사이에 이뤄지는 협력이다.
  • 객체지향 개발에서 가장 중요한 능력은 책임을 능숙하게 소트웨어 객체에 할당하는 것이다. 책임을 어떻게 구현할 것인가 하는 문제는 객체와 책임이 제자리를 잡은 후에 고려해도 늦지 않다. 객체와 책임이 이리저리 부유하는 상황에서 성급하게 구현에 뛰어드는 것은 변경에 취약하고 다양한 협력에 참여할 수 없는 비자율적인 객체를 낳게 된다.
  • 객체의 책임은 크게 ‘하는 것’과 ‘아는 것’의 두 가지 범주로 분류된다.
    • 하는 것
      • 객체를 생성하거나 계산을 하는 등의 스스로 하는 것
      • 다른 객체의 행동을 시작시키는 것
      • 다른 객체의 활동을 제어하고 조절하는 것
    • 아는 것
      • 개인적인 정보에 관해 아는 것
      • 관련된 객체에 관해 아는 것
      • 자신이 유도하거나 계산할 수 있는 것에 관해 아는 것
  • 책임은 객체의 외부에 제공해 줄 수 있는 정보(아는 것의 측면)와 외부에 제공해 줄 수 있는 서비스(하는 것의 측면)의 목록이다. 따라서 책임은 객체의 공용 인터페이스를 구성한다.
  • 객체가 다른 객체에게 주어진 책임을 수행하도록 요청을 보내는 것을 메시지 전송이라고 한다. 따라서 두 객체 간의 협력은 메시지를 통해 이뤄진다.
  • 객체지향 설계는 협력에 참여하기 위해 어떤 객체가 어떤 책임을 수행해야 하고 어떤 객체로부터 메시지를 수신할 것인지를 결정하는 것으로부터 시작된다. 어떤 클래스가 필요하고 어떤 메서드를 포함해야 하는지를 결정하는 것은 책임과 메시지에 대한 대략적인 윤곽을 잡은 후에 시작해도 늦지 않다.
  • 역할은 협력 내에서 다른 객체로 대체할 수 있음을 나타내는 일종의 표식이다. 협력 안에서 역할은 “이 자리는 해당 역할을 수행할 수 있는 어떤 객체라도 대신할 수 있습니다”라고 말하는 것과 같다.
  • 역할을 대체하기 위해서는 각 역할이 수신할 수 있는 메시지를 동일한 방식으로 이해해야 한다.
  • 역할은 객체지향 설계의 단순성, 유연성, 재사용성을 뒷받침하는 핵심 개념이다.
  • 역할의 대체 가능성은 행위 호환성을 의미하고, 행위 호환성은 동일한 책임의 수행을 의미한다.

객체지향 설계 기법

  • 책임-주도 설계(Responsibility-Driven Design)
    • 객체지향 설계란 애플리케이션의 기능을 구현하기 위한 협력 관계를 고안하고, 협력에 필요한 역할과 책임을 식별한 후 이를 수행할 수 있는 적절한 객체를 식별해 나가는 과정이다. 객체지향 설계의 핵심은 올바른 책임을 올바른 객체에게 할당하는 것이다.
    • 책임-주도 설계에서는 시스템의 책임을 객체의 책임으로 변환하고, 각 객체가 책임을 수행하는 중에 필요한 정보나 서비스를 제공해줄 협력자를 찾아 해당 협력자에게 책임을 할당하는 순차적인 방식으로 객체들의 협력 공동체를 구축한다. 책임-주도 설계는 개별적인 객체의 상태가 아니라 객체의 책임과 상호작용에 집중한다.
  • 디자인 패턴(Design Pattern)
    • 디자인 패턴은 책임-주도 설계의 결과를 표현한다. 패턴은 모범이되는 설계다.
    • 패턴은 반복해서 일어나는 특정한 상황에서 어떤 설계가 왜 더 효과적인지에 대한 이유를 설명한다.
    • 특정 상황에 적용 가능한 디자인 패턴을 잘 알고 있다면 책임-주도 설계의 절차를 순차적으로 따르지 않고도 시스템 안에 구현할 객체들의 역할과 책임, 협력 관계를 빠르고 손 쉽게 포착할 수 있을 것이다.
  • 테스트-주도 개발(Test-Driven Development)
    • 테스트-주도 개발의 기본 흐름은 실패하는 테스트를 작성하고, 테스트를 통과하는 가장 간단한 코드를 작성한 후, 리팩터링을 통해 중복을 제거하는 것이다.
    • 테스트-주도 개발은 객체가 이미 존재한다고 가정하고 객체에게 어떤 메시지를 전송할 것인지에 관해 먼저 생각하라고 충고한다. 그러나 이 같은 종류의 충고는 역할, 책임, 협력의 관점에서 객체를 바라보지 않을 경우 무의미하다.
    • 테스트-주도 개발은 테스트를 작성하는 것이 아니라 책임을 수행할 객체 또는 클라이언트가 기대하는 객체의 역할이 메시지를 수신할 때 어떤 결과를 반환하고 그 과정에서 어떤 객체와 협력할 것인지에 대한 기대를 코드의 형태로 작성하는 것이다.
    • 테스트를 작성하기 위해 객체의 메서드를 호출하고 반환값을 검증하는 것은 순간적으로 객체가 수행해야 하는 책임에 관해 생각한 것이다. 테스트에 필요한 간접 입력 값을 제공하기 위해 스텁(stub)을 추가하거나 간접 출력 값을 검증하기 위해 목 객체(mock object)를 사용하는 것은 객체와 협력해야 하는 협력자에 관해 고민한 결과를 코드로 표현한 것이다.

책임과 메시지

  • 객체가 어떤 행동을 하는 유일한 이유는 다른 객체로부터 요청을 수신했기 때문이다. 요청을 처리하기 위해 객체가 수행하는 행동을 책임이라고 한다. 따라서 자율적인 객체란 스스로의 의지와 판단에 따라 각자 맡은 책임을 수행하는 객체를 의미한다.
  • 적절한 책임이 자율적인 객체를 낳고, 자율적인 객체들이 모여 유연하고 단순한 협력을 낳는다. 따라서 협력에 참여하는 객체가 얼마나 자율적인지가 전체 애플리케이션의 품질을 결정한다.
  • 추상적이고 포괄적인 책임은 협력을 좀 더 다양한 환경에서 재사용할 수 있도록 유연성이라는 축복을 내려준다. 그러나 책임은 협력에 참여하는 의도를 명확하게 설명할 수 있는 수준 안에서 추상적이어야 한다.
  • 객체가 다른 객체에게 접근할 수 있는 유일한 방법은 요청을 전송하는 것뿐이다. 그리고 이 요청을 메시지라고 부른다. 메시지는 객체로 하여금 자신의 책임, 즉 행동을 수행하게 만드는 유일한 방법이다.
  • 메시지를 처리할 수 있다는 것은 객체가 해당 메시지에 해당하는 행동을 수행해야 할 책임이 있다는 것을 의미한다. 따라서 메시지의 개념은 책임의 개념과 연결된다. 송신자는 메시지 전송을 통해서만 다른 객체의 책임을 요청할 수 있고, 수신자는 오직 메시지 수신을 통해서만 자신의 책임을 수행할 수 있다. 따라서 객체가 수신할 수 있는 메시지의 모양이 객체가 수행할 책임의 모양을 결정한다.
  • 객체가 유일하기 이해할 수 있는 의사소통 수단은 메시지 뿐이며 객체는 메시지를 처리하기 위한 방법을 자율적으로 선택할 수 있다. 외부의 객체는 메시지에 관해서만 볼 수 있고 객체 내부는 볼 수 없기 때문에 자연스럽게 객체의 외부와 내부가 분리된다.
  • 메시지를 수신한 객체가 실행 시간에 메서드를 선택할 수 있다는 사실은 다른 프로그래밍 언어와 객체지향 프로그래밍 언어를 구분 짓는 핵심적인 특징 중 하나다. 이것은 프로시저 호출에 대한 실행 코드를 컴파일 시간에 결정하는 절차적인 언어와 확연히 구분되는 특징이다.
  • 다형성이란 서로 다른 유형의 객체가 동일한 메시지에 대해 서로 다르게 반응하는 것을 의미한다. 서로 다른 타입에 속하는 객체들이 동일한 메시지를 수신할 경우 서로 다른 메서드를 이용해 메시지를 처리할 수 있는 메커니즘을 가리킨다.
  • 다형성은 역할, 책임, 협력과 깊은 관련이 있다. 서로 다른 객체들이 다형성을 만족시킨다는 것은 객체들이 동일한 책임을 공유한다는 것을 의미한다.
  • 다형성에서 중요한 것은 메시지 송신자의 관점이다. 메시지 수신자들이 동일한 오퍼레이션을 서로 다른 방식으로 처리하더라도 메시지 송신자의 관점에서 이 객체들은 동일한 책임을 수행하는 것이다. 즉, 다형성은 수신자의 종류를 캡슐화한다.
  • 다형성은 송신자와 수신자 간의 객체 타입에 대한 결합도를 메시지에 대한 결합도로 낮춤으로써 달성된다. 다형성을 사용하면 메시지를 이해할 수 있는 어떤 객체와도 협력할 수 있는 유연하고 확장 가능한 구조를 만들 수 있다. 객체지향 패러다임이 강력한 이유는 다형성을 이용해 협력을 유연하게 만들 수 있기 때문이다.
  • 객체지향의 기본 개념은 책임을 수행하는 자율적인 객체들의 협력을 통해 애플리케이션을 구축하는 것이다. 객체지향의 세계에서 객체들이 서로 협력하기 위해 사용할 수 있는 유일한 방법은 메시지를 전송하는 것이다.
  • 객치지향 애플리케이션의 중심 사상은 연쇄적으로 메시지를 전송하고 수신하는 객체들 사이의 협력 관계를 기반으로 사용자에게 유용한 기능을 제공하는 것이다.
  • 클래스 기반의 객체지향 언어를 사용하는 대부분의 사람들은 객체지향 애플리케이션을 클래스의 집합으로 생각한다. 프로그래머 입장에서 클래스는 실제로 볼 수 있고 수정할 수 있는 구체적인 존재다. 그러나 클래스는 단지 동적인 객체들의 특성과 행위를 정적인 텍스트로 표현하기 위해 사용할 수 있는 추상화 도구일 뿐이다. 중요한 것은 클래스가 아니라 객체다. 클래스를 정의하는 것이 먼저가 아니라 객체들의 속성과 행위를 식별하는 것이 먼저다. 클래스는 객체의 속성과 행위를 담는 틀일 뿐이다.
  • 객체지향 패러다임으로의 전환은 시스템을 정적인 클래스들의 집합이 아니라 메시지를 주고받는 동적인 객체들의 집합으로 바라보는 것에서 시작된다. 클래스에 담길 객체들의 공통적인 행위와 속성을 포착하기 위해서는 먼저 협력하는 객체들의 관점에서 시스템을 바라봐야 한다. 진정한 객체지향 패러다임으로의 도약은 개별적인 객체가 아니라 메시지를 주고받는 객체들 사이의 커뮤니케이션에 초점을 맞출 때 일어난다.
  • 훌륭한 객체지향 설계는 어떤 객체가 어떤 메시지를 전송할 수 있는가와 어떤 객체가 어떤 메시지를 이해할 수 있는가를 중심으로 객체 사이의 협력 관계를 구성하는 것이다. 이것은 개별 객체에 초점을 맞추는 관점과는 매우 다르다. 사실 협력이라는 문맥에서 벗어나 독립적인 객체에 관해 고민하는 것은 클래스에 초점을 맞추는 것과 별다른 차이가 없다.
  • 객체지향 설계의 중심에는 메시지가 위치한다. 객체가 메시지를 선택하는 것이 아니라 메시지가 객체를 선택하게 해야 한다. 메시지가 객체를 선택하게 만들려면 메시지를 중심으로 협력을 설계해야 한다.
  • 책임-주도 설계의 핵심은 어떤 행위가 필요한지를 먼저 결정한 후에 이 행위를 수행할 객체를 결정하는 것이다. 이 과정을 흔히 What/Who 사이클이라고 한다.
  • 결론적으로 협력이라는 문맥 안에서 필요한 메시지를 먼저 결정한 후에 메시지를 수신하기에 적합한 객체를 선택한다. 그리고 수신된 메시지가 객체의 책임을 결정한다. 이것은 객체를 고립된 상태로 놓고 어떤 책임이 적절한지를 결정하는 것과는 근본적으로 다른 접근 방법이다.
  • 협력이라는 문맥 안에서 객체의 책임을 결정하는 것은 메시지다. 책임이 먼저 오고 객체가 책임을 따른다. 결과적으로 시스템이 수행해야 하는 전체 행위는 협력하는 객체들의 책임으로 분배된다.
  • 객체가 자신이 수신할 메시지를 결정하게 하지 말고 메시지가 협력에 필요한 객체를 발견하게 해야 한다.

Mockito annotation을 사용하는 field 초기화 하기

Mockito JUnit rule

Mockito에서 제공하는 @Mock, @Spy, @InjectMocks과 같은 annotation을 사용하는 field를 초기화 하는 방법으로는 2가지가 제공되고 있었다.

  • JUnit test class에 @RunWith(MockitoJUnitRunner.class)를 추가하는 방법

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @RunWith(MockitoJUnitRunner.class)
    public class MockitoTest {

    @Mock
    private List list;

    @Test
    public void shouldDoSomething() {
    list.add(100);
    }
    }
  • MockitoAnnotations.initMocks(Object)을 @Before 메서드에서 실행하는 방법

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class MockitoTest {

    @Before
    public void setup() {
    MockitoAnnotations.initMocks(this);
    }

    @Mock
    private List list;

    @Test
    public void shouldDoSomething() {
    list.add(100);
    }
    }

그리고 추가로 Mockito 1.10.17 버전부터 제공하는 JUnit rule을 이용하는 방법이 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MockitoTest {

@Rule
public MockitoRule mockito = MockitoJUnit.rule();

@Mock
private List list;

@Test
public void shouldDoSomething() {
list.add(100);
}
}

그럼 위의 3가지 방법 중 어떤 방법을 사용하는 것이 좋을까? 먼저, 3가지 방식들을 비교하기전에 Test Runner, @RunWith, Rule에 대해서 간단하게 알아보자.

Test Runner

JUnit 프레임워크에서 테스트 클래스 내에 존재하는 각각의 테스트 메소드 실행을 담 당하고 있는 클래스를 Test Runner라고 한다. Test Runner는 테스트 클래스의 구조에 맞게 테스트 메소드들을 실행하고 결과를 표시하는 역할을 수행한다. 우리 눈에는 보이지 않지만, 테스트 케이스를 IDE에서 실행하면 내부적으로 는 JUnit의 BlockJUnit4ClassRunner라는 Test Runner 클래스가 실행되고, IDE는 그 결과를 해석해서 우리에게 보기 편한 화면으로 보여준다.

대부분의 Java 통합개발환경(IDE)은 JUnit 프레임워크를 내장 지원하고 있다. 그래서 종종 JUnit이 독립적인 프레임워크라기보다는 하나의 기능처럼 생각될 수 있다. 하지만 JUnit 프레임워크는 분명 독립적인 소프트웨어이고, 애초부터 그렇게 만들어 졌다. 그렇기 때문에 명령행 프롬프트에서 실행하거나 셸 스크립트 등을 이용해서 실행할 수도 있다.

@RunWith

@RunWith annotation은 JUnit에 내장된 기본 테스트 러너인 BlockJUnit4ClassRunner 대신에 @RunWith(클래스이름.class)를 이용해 JUnit Test 클래스를 실행하기 위한 Test Runner를 명시적으로 지정할 수 있다. 지정된 클래스를 이용해 테스트 클래스 내의 테스트 메소드들을 수행하도록 지정해주는 annotation이다. 일종의 JUnit 프레임워크의 확장지점이다. 이런 구조를 이용해서 많은 애플리케이션이나 프레임워크가 자신에게 필요한 Test Runner를 직접 만들어 자신만의 고유한 기능을 추가해 테스트를 수행하고 있다. 예를 들면, 스프링 프레임워크에서 제공하는 SpringJUnit4ClassRunner, SpringRunner같은 클래스는 이 확장 기능을 이용한 대표적인 사례 중 하나다.

Rule

JUnit 4.7 버전부터 추가된 기능으로 하나의 테스트 클래스 내에서 각 테스트 메소드의 동작 방식을 재정의하거나 추가하기 위해 사용하는 기능이다. 테스트 케이스 수행을 좀 더 세밀하게 조작할 수 있게 된다.

결론

MockitoJUnit의 rule을 사용하면 MockitoJUnitRunner와 똑같은 기능을 수행하면서, 다른 Test Runner를 사용할 수 있다. 사용하고 있는 Mockito의 버전이 1.10.17, JUnit 버전이 4.7 이상이라면 @RunWith가 아닌 MockitoJUnit의 rule을 사용해서 Mockito annotation을 사용하는 field를 초기화 하자

참고