[번역] Java Reactor Pattern

해당 글은 아래의 Reactor Pattern Explained 시리즈를 번역하였습니다.

Part 1

서버가 동시에 요청(이벤트)을 받을 때, 보통 요청을 처리하기 위한 이벤트 리스너를 각 스레드마다 할당해 처리하곤 합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Server implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
} catch (IOException ex) { }
}
}

class Handler implements Runnable {
final Socket socket;
Handler(Socket s) { socket = s; }
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { }
}
private byte[] process(byte[] cmd) { }
}

요청마다 처리를 위해 이벤트 리스너를 별도의 스레드로 만들어 사용하는 것의 단점은 컨텍스트 전환(context switching)의 오버 헤드가 크다는 것입니다. 최악의 경우, 데이터를 자주 읽거나 쓰지 않는 이벤트 리스너를 처리하는 일부 스레드는 유용한 작업을 하지 않고 주기적으로 컨텍스트를 전환할 것입니다. 스케줄러가 이러한 스레드를 CPU에 디스패치(dispatch) 할 때마다 I/O 이벤트가 발생할 때까지 스레드는 차단되어 I/O 이벤트를 기다리는 데 소비되는 모든 시간이 낭비됩니다.

위의 코드에서 ss.accept()는 클라이언트가 연결될 때까지 서버 스레드를 차단하는 블로킹 호출입니다. 서버 스레드는 ss.accept() 호출이 반환 될 때까지 이벤트 리스너를 위한 새로운 스레드의 start() 메서드를 호출 할 수 없습니다. 불필요한 컨텍스트 전환으로 인한 CPU 시간 낭비를 줄이기 위해 non-blocking I/O 개념이 탄생했습니다.

**Reactor Pattern**은 이 문제를 해결하기 위한 이벤트 처리(event handling) 디자인 패턴입니다. 하나의 Reactor가 계속 이벤트를 찾고 이벤트가 발생(trigger)하면 해당 이벤트 처리기(event handler)에게 이를 알립니다.

자바는 non-blocking 시스템을 설계하는 데 사용할 수 있는 **표준 API (java.nio)**를 제공합니다. 클라이언트가 서버로 이름(데이터)을 보내면 서버는 Hello 메시지로 응답하는 간단한 예제를 통해 Reactor 패턴에 대해서 알아보겠습니다.

Reactor 패턴의 아키텍처에는 두 가지 중요한 참여자가 있습니다.

  1. Reactor
    : Reactor는 별도의 스레드에서 실행되며 발생한 I/O 이벤트는 dispatching되어 해당 이벤트 처리기로 보내 처리합니다.
  2. Handlers
    : Handler는 Reactor로부터 I/O 이벤트를 받아 실제 작업을 수행합니다.

java.nio 패키지를 이용해서 Reactor 패턴을 구현할 것이기 때문에 nio 패키지에 속한 몇가지 class에 대한 이해가 필요합니다.

  • Channels
    : 소켓을 통해 non-blocking read를 할 수 있도록 지원하는 connection.
  • Buffers
    : 채널에 의해 직접 read되거나 write될 수 있는 배열과 같은 객체.
  • Selectors
    : Selector 는 어느 channel set 이 IO event 를 가지고 있는지를 알려준다. Selector.select() 는 I/O 이벤트가 발생한 채널 set을 return한다. return할 channel이 없다면 계속 기다리게(block) 된다. 이 block된 것을 바로 return 시켜주는 것이 Selector.wakeup()이다.
    Selector.selectedKeys()는 Selection Key 를 return 해 준다. Reactor는 이 Selection Key를 보고 어떤 handler로 넘겨줄 지를 결정한다.
  • Selection Keys
    : Selector와 Channel간의 관계를 표현해주는 객체이다. Selector가 제공한 Selection Key를 이용해 Reactor는 채널에서 발생하는 I/O 이벤트로 수행할 작업을 선택할 수 있다. ServerSocketChannel에 selector를 등록하면 key를 준다. 이 key가 SelectionKey 이다.

Reactor pattern

Selector는 계속해서 I/O 이벤트가 발생하기를 대기합니다. Reactor가 Selector.select() 메소드를 호출하면 Selector는 등록된 채널에 대해서 발생한 이벤트 정보가 들어있는 SelectionKey Set을 반환합니다. (SelectionKey는 해당 채널과 Selector와의 관계에 대한 모든 정보를 갖고 있습니다. 또한 Handler에 대한 정보도 갖고 있습니다.)

Selector에 등록된 하나의 ServerSocketChannel이 있습니다. ServerSocketChannel은 클라이언트에서 들어오는 연결 요청으로부터 이벤트를 수신해야합니다. 클라이언트가 연결을 요청할 때, ServerSocketChhannel은 I/O 이벤트를 받아 클라이언트에 SocketChannel을 할당해야 합니다. SelectionKey0은 ServerSocketChannel을 가지고 무엇을 해야하는지에 대한 이벤트 정보를 갖고 있습니다. SocketChhannel을 만들기 위해서는 Reactor가 SelectionKey0의 이벤트를 Acceptor에 전달해 Acceptor가 클라이언트와의 연결 요청을 수락하고 SocketChannel을 만들도록 해야합니다.

Acceptor가 클라이언트1의 연결을 수락하면 클라이언트1에 대한 SocketChannel이 생성됩니다. 이 SocketChannel역시 Selector에 등록되고 해당 채널에서 이벤트가 발생하면 해당 이벤트에 대한 정보를 포함한 SelectionKey1을 반환합니다. 이 SelectionKey1을 이용해서 해당 채널로부터 데이터를 읽고 쓸 수 있습니다. 따라서 SelectionKey1은 읽기와 쓰기를 처리하는 Handler1 객체에 바인딩 됩니다.

이후로 Reactor가 Selector.selector()를 호출했을 때 반환된 SelectionKey Set에 SelectionKey1이 있으면 SocketChannel1이 이벤트와 함께 트리거됨을 의미합니다. 이제 SelectionKey1을 보면, Reactor는 Handler1이 SelectionKey1에 바인딩되어 있으므로 Handler1에 이벤트를 전달해야한다는 것을 알고 있습니다. 반환 된 SelectionKey Set에 SelectionKey0이 있으면 ServerSocketChannel이 다른 클라이언트에서 이벤트를 수신했으며 SelectionKey0을 보고 Reactor는 해당 이벤트를 다시 Acceptor에 전달해야 함을 알고 있습니다. 이벤트가 Acceptor에 전달되면 클라이언트2에 대해 SocketChannel2를 만들고 SelectionKey2로 Selector로 SocketChannel2를 등록합니다.

Selection Key table

따라서 이 시나리오에서는 3가지 유형의 이벤트에 관심이 있습니다.

  1. accept 해야하는 ServerSocketChannel에서 트리거되는 연결 요청 이벤트.
  2. 클라이언트로 부터 송신된 데이터를 수신할 수 있을 때, SocketChannel로부터 트리거 되는 이벤트.
  3. 서버에서 클라이언트로 데이터를 송신할 때, 송신할 수 있는 준비가 되면 SocketChannel로부터 트리거 되는 이벤트.

그럼 스레드 풀은 이 작업과 어떤 관련이 있을까요? non-blocking 아키텍처의 장점은 클라이언트의 모든 요청을 처리하는 동시에 단일 스레드에서 실행되도록 서버를 작성할 수 있다는 것입니다. 서버를 설계하는 데 동시성 개념을 적용하지 않으면 이벤트에 대한 반응성이 떨어집니다. 단일 스레드일 때는 reactor가 이벤트를 handler에 전달해 처리될 때 까지는 다른 이벤트에 응답할 수 없기 때문입니다. 왜냐하면 하나의 스레드를 사용하여 모든 이벤트를 처리하기 때문입니다.

위의 아키텍처에 동시성을 추가해서 시스템의 응답 속도를 향상시킬 수 있습니다. Reactor가 이벤트를 Handler에 전달하고 새로운 Thread에서 Handler를 이용해 이벤트를 처리하면 Reacgtor는 계속해서 다른 이벤트에 응답할 수 있습니다. 또한 스레드 풀을 이용하면 시스템의 스레드 수를 제한하면서 더 효율적으로 사용할 수 있을 것 입니다.

Part 2

간단한 Reactor 패턴의 예시를 살펴보겠습니다. 클라이언트는 서버로 이름을 넣은 메시지를 전송하고 서버는 클라이언트에 Hello 메시지로 응답합니다. 스레드 풀은 Part3에서 살펴보겠습니다.

클라이언트는 java.nio를 사용하여 Socket을 생성하지 않고 java.net.Socket을 사용합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class Client {
String hostIp;
int hostPort;

public Client(String hostIp, int hostPort) {
this.hostIp = hostIp;
this.hostPort = hostPort;
}

public void runClient() throws IOException {
Socket clientSocket = null;
PrintWriter out = null;
BufferedReader in = null;

try {
clientSocket = new Socket(hostIp, hostPort);
out = new PrintWriter(clientSocket.getOutputStream(), true);
in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
} catch (UnknownHostException e) {
System.err.println("Unknown host: " + hostIp);
System.exit(1);
} catch (IOException e) {
System.err.println("Couldn't connect to: " + hostIp);
System.exit(1);
}

BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));
String userInput;

System.out.println("Client connected to host : " + hostIp + " port: " + hostPort);
System.out.println("Type (\"Bye\" to quit)");
System.out.println("Tell what your name is to the Server.....");

while ((userInput = stdIn.readLine()) != null) {

out.println(userInput);

// Break when client says Bye.
if (userInput.equalsIgnoreCase("Bye"))
break;

System.out.println("Server says: " + in.readLine());
}

out.close();
in.close();
stdIn.close();
clientSocket.close();
}

public static void main(String[] args) throws IOException {

Client client = new Client("127.0.0.1", 9900);
client.runClient();
}
}

서버는 Reactor 패턴을 이용해 구현합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class Reactor implements Runnable {

final Selector selector;
final ServerSocketChannel serverSocketChannel;
final boolean isWithThreadPool;

Reactor(int port, boolean isWithThreadPool) throws IOException {

this.isWithThreadPool = isWithThreadPool;
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectionKey0.attach(new Acceptor());
}


public void run() {
System.out.println("Server listening to port: " + serverSocketChannel.socket().getLocalPort());
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}

void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null) {
r.run();
}
}

class Acceptor implements Runnable {
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
if (isWithThreadPool)
new HandlerWithThreadPool(selector, socketChannel);
else
new Handler(selector, socketChannel);
}
System.out.println("Connection Accepted by Reactor");
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}

Reactor는 Runnable을 구현하고 있으며, run() 메서드에서는 while 루프를 돌며 selector.select()를 호출하여 처리할 수 있는 이벤트 정보가 담긴 SelectionKey Set을 가져옵니다. SelectionKey에 바인드 되어있는 Handler를 가져와 dispatch합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class Handler implements Runnable {

final SocketChannel socketChannel;
final SelectionKey selectionKey;
ByteBuffer input = ByteBuffer.allocate(1024);
static final int READING = 0, SENDING = 1;
int state = READING;
String clientName = "";

Handler(Selector selector, SocketChannel c) throws IOException {
socketChannel = c;
c.configureBlocking(false);
selectionKey = socketChannel.register(selector, 0);
selectionKey.attach(this);
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}


public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}

void read() throws IOException {
int readCount = socketChannel.read(input);
if (readCount > 0) {
readProcess(readCount);
}
state = SENDING;
// Interested in writing
selectionKey.interestOps(SelectionKey.OP_WRITE);
}

/**
* Processing of the read message. This only prints the message to stdOut.
*
* @param readCount
*/
synchronized void readProcess(int readCount) {
StringBuilder sb = new StringBuilder();
input.flip();
byte[] subStringBytes = new byte[readCount];
byte[] array = input.array();
System.arraycopy(array, 0, subStringBytes, 0, readCount);
// Assuming ASCII (bad assumption but simplifies the example)
sb.append(new String(subStringBytes));
input.clear();
clientName = sb.toString().trim();
}

void send() throws IOException {
System.out.println("Saying hello to " + clientName);
ByteBuffer output = ByteBuffer.wrap(("Hello " + clientName + "\n").getBytes());
socketChannel.write(output);
selectionKey.interestOps(SelectionKey.OP_READ);
state = READING;
}
}

Handler에는 READING, SENDING 2가지 상태가 있습니다. 채널은 한 번에 하나의 작업만 지원하기 때문에 동시에 처리할 수 없습니다. Handler가 SelectionKey에 어떻게 attach 되는지와 관심있는 연산이 OP_READ로 설정되는 부분에 유의해야합니다. Selector는 Read 이벤트가 발생할 때만 SelectionKey를 select해야 합니다. READ 프로세스가 완료되면 Handler는 상태를 SENDING으로 변경하고 관심 대상 연산을 OP_WRITE로 변경합니다. 이제 Selector는 채널이 데이터를 전송할 준비가 되었을 때 SelectionKey를 select 합니다. Write 이벤트가 Handler에 dispatch될 때, 상태가 SENDING이므로 Hello 메시지를 출력 버퍼에 씁니다. 전송이 완료되면 관심있는 작업을 OP_READ로 다시 변경하면서 Handler의 상태가 READING으로 변경됩니다.

결과적으로 서버는 단일 스레드에서 실행되지만 서버에 연결하는 클라이언트 수에 상관없이 응답합니다.

Part 3

이번 파트에서는 Handler의 스레드 풀에 대해서 설명합니다. HandlerWithThreadPool은 Handler 클래스의 확장 버전입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class HandlerWithThreadPool extends Handler {

static ExecutorService pool = Executors.newFixedThreadPool(2);
static final int PROCESSING = 2;

public HandlerWithThreadPool(Selector sel, SocketChannel c) throws IOException {
super(sel, c);
}

void read() throws IOException {
int readCount = socketChannel.read(input);
if (readCount > 0) {
state = PROCESSING;
pool.execute(new Processer(readCount));
}
// We are interested in writing back to the client soon after read processing is done.
selectionKey.interestOps(SelectionKey.OP_WRITE);
}

// Start processing in a new Processer Thread and Hand off to the reactor thread.
synchronized void processAndHandOff(int readCount) {
readProcess(readCount);
// Read processing done. Now the server is ready to send a message to the client.
state = SENDING;
}

class Processer implements Runnable {
int readCount;
Processer(int readCount) {
this.readCount = readCount;
}
public void run() {
processAndHandOff(readCount);
}
}
}

PROCESSING이 새로 도입되었으며 read() 메서드가 override 되었습니다. 이제 Read 이벤트가 Handler에 dispatch되면 데이터를 읽지만 상태를 SENDING으로 변경하지는 않습니다. 메시지를 처리하고 스레드 풀의 다른 스레드에서 실행하고 관련 작업을 OP_WRITE로 설정하는 Processer를 생성합니다. 이 시점에서 채널에 Write 준비가 되어 있더라도 Handler는 아직 PROCESSING 상태이기 때문에 write하지 않습니다.

참고

Reactor Pattern 에 대해 알아보자

Java NIO와 멀티플렉싱 기반의 다중 접속 서버

자바 NIO에 대한 소개와 NIO와 함께 도입된 자바에서 I/O 멀티플렉싱(multiplexing)을 구현한 selector에 대해 알아봅니다. I/O 멀티플렉싱(multiplexing)에 대한 개념에 대해 아직 잘 이해하지 못하고 있다면 먼저 <멀티플렉싱 기반의 다중 접속 서버로 가기까지> 포스팅을 읽어주세요.

Overview

자바 NIO (New IO)는 기존의 자바 IO API를 대체하기 위해 자바 1.4부터 도입이 되었습니다. 새롭게 변화된 부분에 대해서 간략히 요약해보면 다음과 같습니다.

  • Channels and Buffers
    기존 IO API에서는 byte streams character streams 사용했지만, NIO에서는 channels(채널)과 buffers(버퍼)를 사용합니다. 데이터는 항상 채널에서 버퍼로 읽히거나 버퍼에서 채널로 쓰여집니다.
  • Non-blocking IO
    자바 NIO에서는 non-blocking IO를 사용할 수 있습니다. 예를 들면, 하나의 스레드는 버퍼에 데이터를 읽도록 채널에 요청할 수 있습니다. 채널이 버퍼로 데이터를 읽는 동안 스레드는 다른 작업을 수행할 수 있습니다. 데이터가 채널에서 버퍼로 읽어지면, 스레드는 해당 버퍼를 이용한 processing(처리)를 계속 할 수 있습니다. 데이터를 채널에 쓰는 경우도 non-blocking이 가능합니다.
  • Selectors
    자바 NIO에는 “selectors” 개념을 포함하고 있습니다. selector는 여러개의 채널에서 이벤트(연결이 생성됨, 데이터가 도착함)를 모니터링할 수 있는 객체입니다. 그래서 하나의 스레드에서 여러 채널에 대해 모니터링이 가능합니다.

자바 NIO는 다음과 같은 핵심 컴포넌트로 구성되어있습니다.

  • Channels
  • Buffers
  • Selectors

실제로는 더 많은 클래스와 컴포넌트가 있지만 채널, 버퍼, 셀렉터가 API의 핵심을 구성합니다.

Channels

일반적으로 NIO의 모든 IO는 채널로 시작합니다. 채널 데이터를 버퍼로 읽을 수 있고, 버퍼에서 채널로 데이터를 쓸 수 있습니다.
channel and buffer

채널은 스트림(stream)과 유사하지만 몇가지 차이점이 있습니다.

  • 채널을 통해서는 읽고 쓸 수 있지만, 스트림은 일반적으로 단방향(읽기 혹은 쓰기)으로만 가능합니다.
  • 채널은 비동기적(asynchronously)으로 읽고 쓸 수 있습니다.
  • 채널은 항상 버퍼에서 부터 읽거나 버퍼로 씁니다.

채널에는 여러가지 타입이 있습니다. 다음은 자바 NIO에 기본적으로 구현되어 있는 목록입니다.

  • Channels
    • FileChannel
      : 파일에 데이터를 읽고 쓴다.
    • DatagramChannel
      : UDP를 이용해 네트워크를 통해 데이터를 읽고 쓴다.
    • SocketChannel
      : TCP를 이용해 네트워크를 통해 데이터를 읽고 쓴다.
    • ServerSocketChannel
      : 들어오는 TCP 연결을 수신(listening)할 수 있다. 들어오는 연결마다 SocketChannel이 만들어진다.

Buffers

NIO의 버퍼는 채널과 상호작용할 때 사용됩니다. 데이터는 채널에서 버퍼로 읽혀지거나, 버퍼에서 읽혀 채널로 쓰여집니다.

버퍼에는 여러가지 타입이 있습니다. 다음은 자바 NIO에 기본적으로 구현되어 있는 목록입니다.

  • Buffers
    • ByteBuffer
    • MappedByteBuffer
    • CharBuffer
    • ShortBuffer
    • IntBuffer
    • LongBuffer
    • FloatBuffer
    • DoubleBuffer

일반적으로 버퍼를 사용하여 데이터를 읽고 쓰는 것은 4단계 프로세스를 가집니다.

  1. 버퍼에 데이터 쓰기
  2. buffer.flip() 호출
  3. 버퍼에서 데이터 읽기
  4. buffer.clear() 혹은 buffer.compact() 호출

버퍼에 데이터를 쓸 때 버퍼는 쓰여진 데이터의 양을 기록합니다. 만약 데이터를 읽어야한다면 flip() 메서드를 호출해서 버퍼를 쓰기 모드에서 읽기 모드로 전환해야 합니다. 읽기 모드에서 버퍼를 사용하면 버퍼에 쓰여진 모든 데이터를 읽을 수 있습니다.
모든 데이터를 읽은 후에는 버퍼를 지우고 다시 쓸 준비를 해야합니다. clear() 혹은 compact()를 호출함으로써 전체 버퍼를 지울 수 있습니다. (clear() 메서드는 버퍼 전체를 지우고, compact() 메서드는 이미 읽은 데이터만 지웁니다.)

Selectors

셀렉터를 사용하면 하나의 스레드가 여러 채널을 처리(handle)할 수 있습니다.
selector
셀렉터는 사용을 위해 하나 이상의 채널을 셀렉터에 등록하고 select() 메서드를 호출해 등록 된 채널 중 이벤트 준비가 완료된 하나 이상의 채널이 생길 때까지 봉쇄(block)됩니다. 메서드가 반환(return)되면 스레드는 채널에 준비 완료된 이벤트를 처리할 수 있습니다. 즉, 하나의 스레드에서 여러 채널을 관리할 수 있으므로 여러 네트워크 연결을 관리할 수 있습니다. (SocketChannel, ServerSocketChannel)

Selector 생성

Selector.open() 메서드를 통해 셀렉터를 생성할 수 있습니다.

1
Selector selector = Selector.open();

Channels 등록

생성한 셀렉터에 채널을 등록하기 위해서는 다음과 같이 채널의 register() 메서드를 호출합니다.

1
2
3
channel.configureBlocking(false);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

셀렉터에 채널을 등록하기 위해서는 반드시 해당 채널이 non-blocking 모드로 변환되어야 합니다. (FileChannel은 non-blocking 모드로 변경이 불가능하기 때문에 셀렉터에 등록이 불가능합니다.)

register() 메서드의 두 번째 매개 변수는 셀렉터를 통해 채널에서 발생하는 이벤트 중 확인(알림)하고자 하는 이벤트의 집합을 의미합니다.
이벤트에는 4가지 종류가 있으며, 이 4가지 이벤트는 SelectionKey 상수로 표시됩니다.

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

둘 이상의 이벤트 상수는 다음과 같이 사용 가능합니다.

1
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE

SelectionKey

register() 메서드를 이용해 채널을 셀렉터에 등록하면 SelectionKey 객체가 반환됩니다. 이 SelectionKey 객체에는 몇 가지 속성들이 있습니다.

  • The interest set
  • The ready set
  • The Channel
  • The Selector
  • An attached object (optional)

interest Set

interest set은 셀렉터에 등록된 채널이 확인하고자 하는 이벤트 집합(세트)입니다. 다음과 같이 SelectionKey를 이용해 해당 interest set을 확인할 수 있습니다.

1
2
3
4
5
6
int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

Ready Set

ready set은 셀렉터에 등록된 채널에서 준비되어 처리(handle) 가능한 이벤트의 집합입니다.

1
int readySet = SelectionKey.readyOps();

위와 같이 interest Set과 동일한 방식으로 확인할 수도 있지만 아래와 같이 4가지 메소드를 이용해서 확인할 수도 있습니다.

1
2
3
4
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

Channel + Selector

SelectionKey를 이용해 채널과 셀렉터에 쉽게 접근할 수 있습니다.

1
2
3
Channel  channel  = selectionKey.channel();

Selector selector = selectionKey.selector();

Attaching Objects

SelectionKey에 객체를 첨부(attach)할 수 있습니다. 이 방법을 이용하면 채널에 추가 정보나 채널에서 사용하는 버퍼와 같은 객체들을 쉽게 첨부할 수 있습니다.

1
2
3
selectionKey.attach(theObject);

Object attachedObj = selectionKey.attachment();

selectionKey를 통해 직접 attach 하는 것 뿐만 아니라 셀렉터에 채널을 등록하면서 객체를 첨부할 수도 있습니다.

1
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

셀렉터를 이용해 채널 선택

셀렉터에 하나 이상의 채널을 등록한 후에는 select() 메소드를 호출할 수 있습니다. select() 메소드는 accept, connect, read, write 이벤트에 대해 준비(ready) 되어 있는 채널을 반환합니다. select() 메소드는 다음과 같이 3가지 방식으로 사용 가능합니다.

  • select()
    : 등록한 이벤트에 대해 하나 이상의 채널이 준비 될 때까지 봉쇄(block)됩니다. 몇개의 채널이 준비되었는지 준비된 채널의 수를 반환합니다. (마지막으로 select()를 호출한 이후 준비된 채널 수 입니다.)
  • select(long timeout)
    : 최대 timeout(ms) 동안만 봉쇄한다는 점을 제외하면 select()와 동일합니다.
  • selectNow()
    : select와 달리 봉쇄하지 않습니다. 준비된 채널이 있으면 즉시 반환됩니다.

selectedKeys()

select() 메서드를 통해 하나 이상의 준비된 채널이 발생하면, selectedKeys() 메서드를 사용해 준비된 채널의 집합을 반환 받습니다.

1
Set<SelectionKey> selectedKeys = selector.selectedKeys();

반환된 SelectionKey set을 반복해 준비된 채널에 접근할 수 있습니다. 채널의 이벤트 처리가 끝나면 keyIterator.remove()를 통해 키 세트에서 제거해야 합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {

SelectionKey key = keyIterator.next();

if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.

} else if (key.isConnectable()) {
// a connection was established with a remote server.

} else if (key.isReadable()) {
// a channel is ready for reading

} else if (key.isWritable()) {
// a channel is ready for writing
}

keyIterator.remove();
}

ServerSocketChannel

NIO ServerSocketChannel은 표준 자바 네트워킹의 ServerSocket과 마찬가지로 들어오는 TCP 연결을 수신 대기 할 수 있는 채널입니다.

1
2
3
4
5
6
7
8
9
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));

while(true) {
SocketChannel socketChannel = serverSocketChannel.accept();

// do something with socketChannel...
}

Non-blocking Mode

ServerSocketChannel은 non-blocking 모드로 설정이 가능합니다. non-blocking 모드에서는 accept() 메서드가 즉시 반환되므로 들어오는 연결이 없으면 null을 반환할 수 있습니다. 이에 따라 반환 된 ServerSocketChannel이 null인지 확인해야합니다.

1
2
3
4
5
6
7
8
9
10
11
12
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);

while(true){
SocketChannel socketChannel = serverSocketChannel.accept();

if (socketChannel != null) {
// do something with socketChannel...
}
}

SocketChannel

NIO SocketChannel은 TCP 네트워크 소켓에 연결된 채널입니다. 표준 자바 네트워킹의 Socket과 역할이 같습니다.

Non-blocking Mode

ServerSocketChannel과 마찬가지로 SocketChannel을 non-blocking 모드로 설정할 수 있습니다. non-blocking 모드에서는 connect(), read(), write()를 호출할 수 있습니다.

connect()

SocketChannel이 non-blocking 모드일 때, connect()를 호출하면 메서드가 연결이 설정되기 전에 반환될 수 있습니다. 연결이 설정되었는지 확인하기 위해서 finishConnect() 메서드를 이용할 수 있습니다.

1
2
3
4
5
6
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://naver.com", 80));

while(!socketChannel.finishConnect()) {
// wait, or do something else...
}

read()

non-blocking 모드일 때, read() 메서드는 데이터를 전혀 읽지 않고 반환 될 수 있습니다. 따라서 반환 된 결과(int)를 갖고 판단해야 합니다. 반환 된 결과(int)는 읽은 바이트 수를 나타냅니다.

멀티플렉싱 기반의 다중 접속 서버

지금까지 살펴본 Channel, Buffer, Selector를 이용해 간단한 echo 서버를 만들어 보았습니다.

EchoServer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class EchoServer {

private static final String EXIT = "EXIT";

public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("localhost", 3000));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(256);

while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {

SelectionKey key = iter.next();

if (key.isAcceptable()) {
register(selector, serverSocket);
}

if (key.isReadable()) {
answerWithEcho(buffer, key);
}
iter.remove();
}
}
}

private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
throws IOException {

SocketChannel client = (SocketChannel) key.channel();
client.read(buffer);
if (new String(buffer.array()).trim().equals(EXIT)) {
client.close();
System.out.println("Not accepting client messages anymore");
}

buffer.flip();
client.write(buffer);
buffer.clear();
}

private static void register(Selector selector, ServerSocketChannel serverSocket)
throws IOException {

SocketChannel client = serverSocket.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
System.out.println("new client connected...");
}
}

실행 결과

echo_server_with_select

Java NIO vs IO

Stream Oriented vs Buffer Oriented

스트림 지향의 IO는 스트림에서 한 번에 하나 이상의 바이트를 읽는 것을 의미합니다. 읽은 바이트를 이용해 유저가 데이터를 처리해야 하며 읽힌 바이트는 따로 캐시되지 않습니다. 또한 스트림의 데이터는 임의로 유저가 앞뒤로 이동할 수 없습니다. 스트림에서 읽은 데이터를 앞뒤로 이동해야 하는 경우는 먼저 스트림의 데이터를 읽어 버퍼에 캐시해야합니다.

버퍼 지향의 NIO 방식은 조금 다릅니다. 데이터는 나중에 처리되는 임의의 버퍼로 읽어집니다. 필요에 따라 버퍼에서 앞뒤로 이동할 수 있습니다. 이로 인해 버퍼를 이용한 처리 과정에서 좀 더 유연한 사용이 가능합니다. 그러나 버퍼를 이용해 완전히 처리하려면 필요한 모든 데이터가 버퍼에 들어있는지 확인해야합니다. 또한 버퍼에 더 많은 데이터를 읽을 때, 아직 처리하지 않은 버퍼의 데이터를 덮어 쓰지 않도록 주의해야합니다.

Blocking vs Non-blocking IO

자바 IO의 스트림을 이용하면 봉쇄(block)됩니다. 즉, 스레드가 read() 혹은 write()를 호출하면 읽은 데이터가 있거나 데이터가 완전히 쓰여질 때까지 해당 스레드가 차단되어 그 동안 스레드는 아무 것도 할 수 없습니다.

자바 NIO의 Non-blocking 모드는 스레드가 채널에서 데이터 읽기를 요청할 때, 현재 사용할 수 있는 데이터가 없는 경우 사용 가능한 데이터가 준비될 때까지 기다리지 않습니다. 때문에 해당 스레드는 봉쇄되지 않고 계속 진행될 수 있습니다. 쓰기 작업 또한 마찬가지 입니다. 스레드는 일부 데이터를 채널에 쓰도록 요청할 수 있지만 완전히 쓰여지기를 기다리지는 않습니다.

Selectos

자바 NIO의 셀렉터는 하나의 스레드에서 다중 입력 채널을 관리할 수 있습니다. 이 멀티플렉싱 메커니즘을 사용하면 단일 스레드에서 여러 채널의 입출력을 쉽게 관리할 수 있습니다.

참고