Reactive Streams (3)

Reactive Streams란 non-blocking과 back pressure를 이용한 asynchronous 스트림 처리의 표준이다.” 라고 지난 글에서 이야기 했다.

이번에는 asynchronous(비동기 처리)에 대해서 이야기해보자.

먼저 아래 간단한 코드를 실행해보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Slf4j
public class SchedulerEx {
public static void main(String[] args) {
Publisher<Integer> pub = new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}

@Override
public void cancel() {

}
});
}
};

Subscriber<Integer> sub = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe");
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Integer integer) {
log.info("onNext: {}", integer);
}

@Override
public void onError(Throwable t) {
log.info("onError", t);
}

@Override
public void onComplete() {
log.info("onComplete");
}
};

pub.subscribe(sub);

log.info("Exit)";
}
}

- 실행결과
23:27:45.475 [main] INFO com.jongmin.reactive.practice.SchedulerEx - onSubscribe
23:27:45.480 [main] INFO com.jongmin.reactive.practice.SchedulerEx - request: 9223372036854775807
23:27:45.484 [main] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 1
23:27:45.484 [main] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 2
23:27:45.484 [main] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 3
23:27:45.484 [main] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 4
23:27:45.484 [main] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 5
23:27:45.484 [main] INFO com.jongmin.reactive.practice.SchedulerEx - onComplete
23:27:45.484 [main] INFO com.jongmin.reactive.practice.SchedulerEx - exit

subscriber를 publisher에 등록(subscribe)하고 처리(onSubscribe -> request -> next)가 끝나면 exit 로그를 마지막으로 종료된다. 이때 subscriber와 publisher의 진행은 모두 main 스레드에서 진행된다.
즉, subscriber를 등록 후 publisher가 데이터를 push하고 처리할 때까지 main 스레드를 붙잡고 있게 된다.
만약 publisher 혹은 subscriber의 처리가 지연된다면 main 스레드는 더욱 오래 사용해야 할 것이다.

결국 publisher에 subscriber를 등록하면 별도의 스레드에서 진행하고 main 스레드는 계속해서 다른 작업을 진행하기를 원하는 것이다.

publishOn

먼저 publisher가 main 스레드가 아닌 별도의 스레드에서 동작하도록 만들어보자.

1
2
3
4
5
6
7
8
9
10
private static Publisher<Integer> publishOn(Publisher<Integer> pub) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
Executors.newSingleThreadExecutor().execute(() -> {
pub.subscribe(sub);
});
}
};
}

파라미터로 전달받은 publisher의 subscribe 메서드를 별도의 스레드에서 실행하도록 하는 publisher를 새로 만들어서 반환한다. publishOn 메서드를 기존의 publisher에 적용해 실행하면 다음과 같이 실행결과를 확인할 수 있다.

1
2
3
4
5
6
7
8
9
10
- 실행결과
23:55:57.820 [main] INFO com.jongmin.reactive.practice.SchedulerEx - exit
23:55:57.820 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onSubscribe
23:55:57.824 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - request: 9223372036854775807
23:55:57.827 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 1
23:55:57.827 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 2
23:55:57.827 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 3
23:55:57.827 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 4
23:55:57.827 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 5
23:55:57.827 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onComplete

main 스레드는 바로 해제되어 이후의 일을 진행할 수 있게 되었고 onSubscribe 이후의 처리는 모두 별도의 스레드에서 진행된다.

그러나 아직 “빠른 프로듀서”와 “느린 컨슈머”의 문제가 남아있다. publisher가 데이터를 빠르게 생산하지만 subscriber의 onNext에서 데이터를 소비하는 작업에 시간이 오래 걸리는 경우인 것이다.
이때는 subscriber 또한 별도의 스레드에서 onNext 처리를 하도록 함으로써 해결할 수 있다.

subscribeOn

subscriber도 별도의 스레드에서 동작하도록 만들어보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static Subscriber<Integer> subscriberOn(Subscriber<Integer> sub) {
return new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Executors.newSingleThreadExecutor().execute(() -> sub.onSubscribe(s));
}

@Override
public void onNext(Integer i) {
Executors.newSingleThreadExecutor().execute(() -> sub.onNext(i));
}

@Override
public void onError(Throwable t) {
Executors.newSingleThreadExecutor().execute(() -> sub.onError(t));
}

@Override
public void onComplete() {
Executors.newSingleThreadExecutor().execute(() -> sub.onComplete());
}
};
}

파라미터로 전달받은 subscriber의 onSubscribe, onNext, onError, onComplete 메서드를 별도의 스레드에서 실행하도록 하는 subscriber를 새로 만들어서 반환한다. subscriberOn 메서드를 기존의 subscriber에 적용해 실행하면 다음과 같이 실행결과를 확인할 수 있다.

1
2
3
4
5
6
7
8
9
10
- 실행결과
00:14:53.604 [main] INFO com.jongmin.reactive.practice.SchedulerEx - exit
00:14:53.604 [pool-2-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onSubscribe
00:14:53.607 [pool-2-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - request: 9223372036854775807
00:14:53.609 [pool-3-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 1
00:14:53.609 [pool-4-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 2
00:14:53.609 [pool-5-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 3
00:14:53.609 [pool-6-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 4
00:14:53.610 [pool-7-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 5
00:14:53.610 [pool-8-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onComplete

이제는 main 스레드는 subscriber를 publisher에 등록(subscribe)까지만 하고 그 이후의 작업은 publisher와 subscriber 모두 별도의 스레드에서 동작하게 되었다.

전체 코드는 다음과 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@Slf4j
public class SchedulerEx {
public static void main(String[] args) {
Publisher<Integer> pub = new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
log.info("request: {}", n);
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}

@Override
public void cancel() {

}
});
}
};

Subscriber<Integer> sub = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe");
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Integer integer) {
log.info("onNext: {}", integer);
}

@Override
public void onError(Throwable t) {
log.info("onError", t);
}

@Override
public void onComplete() {
log.info("onComplete");
}
};

publishOn(pub).subscribe(subscriberOn(sub));

log.info("exit");
}

private static Publisher<Integer> publishOn(Publisher<Integer> pub) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
Executors.newSingleThreadExecutor().execute(() -> {
pub.subscribe(sub);
});
}
};
}

private static Subscriber<Integer> subscriberOn(Subscriber<Integer> sub) {
return new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Executors.newSingleThreadExecutor().execute(() -> sub.onSubscribe(s));
}

@Override
public void onNext(Integer i) {
Executors.newSingleThreadExecutor().execute(() -> sub.onNext(i));
}

@Override
public void onError(Throwable t) {
Executors.newSingleThreadExecutor().execute(() -> sub.onError(t));
}

@Override
public void onComplete() {
Executors.newSingleThreadExecutor().execute(() -> sub.onComplete());
}
};
}
}

Reactive Streams (2)

지난번 Reactive Streams API를 구현한 예제를 바탕으로 간단한 Operator를 만들어보자.

Operator라 함은 Stream의 map연산 처럼 Publisher가 제공하는 data를 가공할 수 있도록 하는 것이다.

먼저 간단한 Publisher와 Subscriber 코드이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Slf4j
public class PubSub {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1)
.limit(5)
.collect(Collectors.toList()));
pub.subscribe(logSub());
}

private static Publisher<Integer> iterPub(List<Integer> iter) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
iter.forEach(i -> sub.onNext(i));
sub.onComplete();
}

@Override
public void cancel() {
}
});
}
};
}

private static Subscriber<Integer> logSub() {
return new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe");
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}

@Override
public void onError(Throwable t) {
log.info("onError", t);
}

@Override
public void onComplete() {
log.info("onComplete");
}
};
}
}

- 실행결과
23:32:28.255 [main] INFO com.jongmin.reactive.practice.PubSub - onSubscribe
23:32:28.260 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 1
23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 2
23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 3
23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 4
23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 5
23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onComplete

Operator

Publisher -> [Data1] -> Operator -> [Data2] -> Subscriber

위와 같이 Data1을 Data2로 변환하는 Operator를 만들어보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Slf4j
public class PubSub {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1)
.limit(5)
.collect(Collectors.toList()));
Publisher<Integer> mapPub = mapPub(pub, s -> s * 10);
// iterrPub -> [Data1] -> mapPub -> [Data2] -> logSub
mapPub.subscribe(logSub());
}

private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}

@Override
public void onNext(Integer i) {
sub.onNext(f.apply(i));
}

@Override
public void onError(Throwable t) {
sub.onError(t);
}

@Override
public void onComplete() {
sub.onComplete();
}
});
}
};
}

private static Publisher<Integer> iterPub(List<Integer> iter) {
...
...
...
}

private static Subscriber<Integer> logSub() {
...
...
...
}
}

- 실행결과
23:45:19.758 [main] INFO com.jongmin.reactive.practice.PubSub - onSubscribe
23:45:19.764 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 10
23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 20
23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 30
23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 40
23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 50
23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onComplete

mapPub 메서드가 추가되었다. Data를 제공하는 Publisher와 가공에 사용할 Function을 받아 Operator(새로운 Publisher)를 반환한다.

실제 하는 일은 단순하다. 기존 Publisher와 Subscriber를 이어준다.

Operator가 기존 Publisher를 subscribe하고, 받게되는 Subscription을 기존 Subscriber에게 전달한다.

DelegateSub

Operator가 하는 일은 기존 Publisher와 Subscriber를 이어주면서, onNext 부분에서 전달받은 Function을 적용해주는 것 뿐이다.

onNext를 제외하고는 Operator 마다 코드가 반복될 수 있기 때문에 해당 부분을 DelegateSub으로 분리해보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class DelegateSub implements Subscriber<Integer> {

Subscriber sub;

public DelegateSub(Subscriber sub) {
this.sub = sub;
}

@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}

@Override
public void onNext(Integer i) {
sub.onNext(i);
}

@Override
public void onError(Throwable t) {
sub.onError(t);
}

@Override
public void onComplete() {
sub.onComplete();
}
}

DelegateSub을 사용해서 기존 코드를 다음과 같이 수정할 수 있다. 필요한 onNext 메서드만 오버라이딩해서 사용한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
public class PubSub2 {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1)
.limit(5)
.collect(Collectors.toList()));
Publisher<Integer> mapPub = mapPub(pub, s -> s * 10);
mapPub.subscribe(logSub());
}

private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DelegateSub(sub) {
@Override
public void onNext(Integer i) {
sub.onNext(f.apply(i));
}
});
}
};
}

private static Publisher<Integer> iterPub(List<Integer> iter) {
...
...
...
}

private static Subscriber<Integer> logSub() {
...
...
...
}
}

SumPub

이번에는 Publisher로부터 전달받은 Data를 전부 더하는 sum operation을 만들어보자.

기존 Publisher와 Subscriber를 onNext로 이어주지 않고, onComplete이 호출되었을 때, sum 값을 onNext로 전달한 뒤 onComplete을 호출해 종료한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Slf4j
public class PubSub2 {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1)
.limit(5)
.collect(Collectors.toList()));
Publisher<Integer> sumPub = sumPub(pub);
sumPub.subscribe(logSub());
}

private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DelegateSub(sub) {
int sum = 0;

@Override
public void onNext(Integer i) {
sum += i;
}

@Override
public void onComplete() {
sub.onNext(sum);
sub.onComplete();
}
});
}
};
}

private static Publisher<Integer> iterPub(List<Integer> iter) {
...
...
...
}

private static Subscriber<Integer> logSub() {
...
...
...
}
}

- 실행결과
00:30:48.643 [main] INFO com.jongmin.reactive.practice.PubSub2 - onSubscribe
00:30:48.648 [main] INFO com.jongmin.reactive.practice.PubSub2 - onNext: 15
00:30:48.650 [main] INFO com.jongmin.reactive.practice.PubSub2 - onComplete

Reactive Streams (1)

Reactive Streams 란?

reactive-streams.org 에서는 Reactive Streams를 다음과 같이 정의하고 있다.

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.

Reactive Streams란 non-blocking과 back pressure를 이용한 asynchronous 스트림 처리의 표준이다.

중요한 키워드가 여러개 등장 했는데, 먼저 back pressure에 대해서 알아보자.

Back Pressure

Back Pressure는 Reactive Streams에서 가장 중요한 요소라고 할 수 있다. Back Pressure가 등장하게 된 배경을 이해하기 위해서 먼저 옵저버 패턴을 이해하고 옵저버 패턴이 갖고 있는 문제점을 인식할 수 있어야한다.

Observable & Observer

옵저버 패턴(observer pattern) 은 객체의 상태 변화를 관찰하는 관찰자들, 즉 옵저버들의 목록을 객체에 등록하여 상태 변화가 있을 때마다 메서드 등을 통해 객체가 직접 목록의 각 옵저버에게 통지하도록 하는 디자인 패턴이다. 주로 분산 이벤트 핸들링 시스템을 구현하는 데 사용된다. 발행/구독 모델로 알려져 있기도 하다.

옵저버 패턴 - 위키백과

예를 들면, 안드로이드에서 Button이 클릭되었을 때 실행할 함수를 onclicklistener에 추가하는데 이와 같이 이벤트 핸들링 처리를 위해 사용되는 패턴이다. 이 패턴에는 Observable과 Observer가 등장한다.

  • Osbservable: 등록된 Observer들을 관리하며, 새로운 데이터(이벤트)가 들어오면 등록된 Observer에게 데이터를 전달한다. 데이터를 생성해서 전달하기 때문에 Publisher(발행)라고 부른다.
  • Observer: Observable로 부터 데이터(이벤트)를 받을 수 있다. 데이터를 전달 받기 때문에 Subscriber(구독)라고 부른다.

Java는 이미 JDK 1.0 부터 옵저버 패턴을 쉽게 구현할 수 있는 인터페이스를 제공하고 있다. 아래의 코드는 JDK 1.0에 포함된 Observable과 Observer 인터페이스를 사용해 만든 간단한 예시 코드이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j
public class Ob {
// Source -> Event/Data -> Observer
static class IntObservable extends Observable implements Runnable {

@Override
public void run() {
for (int i = 1; i <= 10; i++) {
setChanged();
notifyObservers(i); // push
}
}
}

public static void main(String[] args) {
Observer ob = new Observer() {
@Override
public void update(Observable o, Object arg) {
log.info("{}", arg);
}
};

IntObservable io = new IntObservable();
io.addObserver(ob);

ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(io);

log.info("EXIT");
es.shutdown();
}
}

- 실행결과
01:47:30.715 [main] INFO com.jongmin.reactive.practice.Ob - EXIT
01:47:30.715 [pool-1-thread-1] INFO com.jongmin.reactive.practice.Ob - 1
01:47:30.719 [pool-1-thread-1] INFO com.jongmin.reactive.practice.Ob - 2
01:47:30.719 [pool-1-thread-1] INFO com.jongmin.reactive.practice.Ob - 3
01:47:30.719 [pool-1-thread-1] INFO com.jongmin.reactive.practice.Ob - 4
01:47:30.719 [pool-1-thread-1] INFO com.jongmin.reactive.practice.Ob - 5
01:47:30.719 [pool-1-thread-1] INFO com.jongmin.reactive.practice.Ob - 6
01:47:30.719 [pool-1-thread-1] INFO com.jongmin.reactive.practice.Ob - 7
01:47:30.719 [pool-1-thread-1] INFO com.jongmin.reactive.practice.Ob - 8
01:47:30.719 [pool-1-thread-1] INFO com.jongmin.reactive.practice.Ob - 9
01:47:30.719 [pool-1-thread-1] INFO com.jongmin.reactive.practice.Ob - 10

문제점

옵저버 패턴에서는 Publisher(Observable)이 Subscriber(Observer)에게 데이터(이벤트)를 Push(notifyObservers)하는 방식으로 전달한다. 이때, Publisher는 Subscriber의 상태에 상관없이 데이터를 전달하는데만 집중한다.

만약, Subscriber는 1초에 10개의 데이터를 처리할 수 있는데 Publisher가 1초에 20개의 데이터를 전달(Push)한다면 어떤 문제가 발생할까? 다음과 같은 문제가 발생할 수 있다.

  • Subscriber에 별도의 queue(버퍼)를 두고 처리하지 않고 대기중인 데이터를 저장할 수 있다.
  • 하지만, queue의 사용 가능한 공간도 전부 금방 소모될 것이다.
  • queue의 크기를 넘어가게 되면 데이터는 소실될 것이다.
  • queue의 크기를 너무 크게 생성하면 OOM(Out Of Memory) 문제가 발생할 수 있다.

해결 방법

Observable과 Observer의 문제를 어떻게 해결할 수 있을까? Publisher가 Subscriber에게 데이터를 Push 하던 기존의 방식을 Subscriber가 Publisher에게 자신이 처리할 수 있는 만큼의 데이터를 Request하는 방식으로 해결할 수 있다. 필요한(처리할 수 있는) 만큼만 요청해서 Pull하는 것이다. 데이터 요청의 크기가 Subscriber에 의해서 결정되는 것이다. 이를 dynamic pull 방식이라 부르며, Back Pressure의 기본 원리이다.

Reactive Streams API

Reactive Streams는 표준화된 API이다. 2013년 netflix, pivotal, lightbend의 엔지니어들에 의해서 처음 시작되어, 2015 4월에 JVM에 대한 1.0.0 스펙이 릴리즈 되었다.
Java 9부터는 reactive streams이 java.util.concurrent의 패키지 아래 Flow라는 형태로 JDK에 포함되었다. 기존에 reactive streams가 가진 API와 스펙, pull방식을 사용하는 원칙을 그대로 수용하였다.

아래는 Reactive Streams API이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}

public interface Subscription {
public void request(long n);
public void cancel();
}

public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}

실제로 보면 굉장히 간단한 API들의 조합으로 이루어져 있다.

  • Publisher: Subscriber를 받아들이는 subscribe 메서드 하나만 갖는다.
  • Subscriber: 데이터를 받아 처리할 수 있는 onNext, 에러를 처리하는 onError, 모든 데이터를 받아 완료되었을 때는 onComplete, 그리고 Publisher로부터 Subscription을 전달 받는 onSubscribe 메서드로 이루어진다.
  • Subscription: n개의 데이터를 요청하는 request와 구독을 취소하는 cancel을 갖는다.

전체적인 흐름은 다음과 같다.
reactive streams

  1. Subscriber가 Publisher에게 구독을 요청한다.
  2. Publisher는 Subscriber의 onSubscribe 메서드를 통해 Subscription을 전달한다.
  3. Subscriber는 Publisher에게 직접 데이터를 요청하지 않고 Subscription을 통해 요청한다.
  4. Publisher는 Subscription을 통해 onNext에 데이터를 전달하고 완료되면 onComplete, 에러가 발생하면 onError에 전달한다.

Example

마지막으로 Reactive Streams API를 간단하게 구현해 테스트 해보자.

Reactive Streams API의 Interface는 간단해 보이지만 이를 구현한 구현체는 Reactive Streams Specification을 만족해야만 한다. 구현체가 Specification을 만족하는지는 Reactive Streams TCK(Technology Compatibility Kit)라는 도구를 이용해 검증할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Slf4j
public class PubSub {
public static void main(String[] args) {
Iterable<Integer> iter = Arrays.asList(1, 2, 3, 4, 5);

Publisher p = new Publisher() {
@Override
public void subscribe(Subscriber subscriber) {
Iterator<Integer> it = iter.iterator();

subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
while(n-- > 0) {
if (it.hasNext()) {
subscriber.onNext(it.next());
} else {
subscriber.onComplete();
break;
}
}
}

@Override
public void cancel() {
log.info("cancel");
}
});
}
};

Subscriber<Integer> s = new Subscriber<Integer>() {
Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
log.info("onSubscribe");
this.subscription = subscription;
this.subscription.request(1);
}

@Override
public void onNext(Integer item) {
log.info("onNext: {}", item);
this.subscription.request(1);
}

@Override
public void onError(Throwable t) {
log.info("onError");
}

@Override
public void onComplete() {
log.info("onComplete");
}
};

p.subscribe(s);
}
}

- 실행결과
00:19:08.655 [main] INFO com.jongmin.reactive.practice.PubSub - onSubscribe
00:19:08.660 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 1
00:19:08.662 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 2
00:19:08.663 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 3
00:19:08.663 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 4
00:19:08.663 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 5
00:19:08.663 [main] INFO com.jongmin.reactive.practice.PubSub - onComplete