Reactive Streams (3)

Reactive Streams란 non-blocking과 back pressure를 이용한 asynchronous 스트림 처리의 표준이다.” 라고 지난 글에서 이야기 했다.

이번에는 asynchronous(비동기 처리)에 대해서 이야기해보자.

먼저 아래 간단한 코드를 실행해보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Slf4j
public class SchedulerEx {
public static void main(String[] args) {
Publisher<Integer> pub = new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}

@Override
public void cancel() {

}
});
}
};

Subscriber<Integer> sub = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe");
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Integer integer) {
log.info("onNext: {}", integer);
}

@Override
public void onError(Throwable t) {
log.info("onError", t);
}

@Override
public void onComplete() {
log.info("onComplete");
}
};

pub.subscribe(sub);

log.info("Exit)";
}
}

- 실행결과
23:27:45.475 [main] INFO com.jongmin.reactive.practice.SchedulerEx - onSubscribe
23:27:45.480 [main] INFO com.jongmin.reactive.practice.SchedulerEx - request: 9223372036854775807
23:27:45.484 [main] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 1
23:27:45.484 [main] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 2
23:27:45.484 [main] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 3
23:27:45.484 [main] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 4
23:27:45.484 [main] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 5
23:27:45.484 [main] INFO com.jongmin.reactive.practice.SchedulerEx - onComplete
23:27:45.484 [main] INFO com.jongmin.reactive.practice.SchedulerEx - exit

subscriber를 publisher에 등록(subscribe)하고 처리(onSubscribe -> request -> next)가 끝나면 exit 로그를 마지막으로 종료된다. 이때 subscriber와 publisher의 진행은 모두 main 스레드에서 진행된다.
즉, subscriber를 등록 후 publisher가 데이터를 push하고 처리할 때까지 main 스레드를 붙잡고 있게 된다.
만약 publisher 혹은 subscriber의 처리가 지연된다면 main 스레드는 더욱 오래 사용해야 할 것이다.

결국 publisher에 subscriber를 등록하면 별도의 스레드에서 진행하고 main 스레드는 계속해서 다른 작업을 진행하기를 원하는 것이다.

publishOn

먼저 publisher가 main 스레드가 아닌 별도의 스레드에서 동작하도록 만들어보자.

1
2
3
4
5
6
7
8
9
10
private static Publisher<Integer> publishOn(Publisher<Integer> pub) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
Executors.newSingleThreadExecutor().execute(() -> {
pub.subscribe(sub);
});
}
};
}

파라미터로 전달받은 publisher의 subscribe 메서드를 별도의 스레드에서 실행하도록 하는 publisher를 새로 만들어서 반환한다. publishOn 메서드를 기존의 publisher에 적용해 실행하면 다음과 같이 실행결과를 확인할 수 있다.

1
2
3
4
5
6
7
8
9
10
- 실행결과
23:55:57.820 [main] INFO com.jongmin.reactive.practice.SchedulerEx - exit
23:55:57.820 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onSubscribe
23:55:57.824 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - request: 9223372036854775807
23:55:57.827 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 1
23:55:57.827 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 2
23:55:57.827 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 3
23:55:57.827 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 4
23:55:57.827 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 5
23:55:57.827 [pool-1-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onComplete

main 스레드는 바로 해제되어 이후의 일을 진행할 수 있게 되었고 onSubscribe 이후의 처리는 모두 별도의 스레드에서 진행된다.

그러나 아직 “빠른 프로듀서”와 “느린 컨슈머”의 문제가 남아있다. publisher가 데이터를 빠르게 생산하지만 subscriber의 onNext에서 데이터를 소비하는 작업에 시간이 오래 걸리는 경우인 것이다.
이때는 subscriber 또한 별도의 스레드에서 onNext 처리를 하도록 함으로써 해결할 수 있다.

subscribeOn

subscriber도 별도의 스레드에서 동작하도록 만들어보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static Subscriber<Integer> subscriberOn(Subscriber<Integer> sub) {
return new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Executors.newSingleThreadExecutor().execute(() -> sub.onSubscribe(s));
}

@Override
public void onNext(Integer i) {
Executors.newSingleThreadExecutor().execute(() -> sub.onNext(i));
}

@Override
public void onError(Throwable t) {
Executors.newSingleThreadExecutor().execute(() -> sub.onError(t));
}

@Override
public void onComplete() {
Executors.newSingleThreadExecutor().execute(() -> sub.onComplete());
}
};
}

파라미터로 전달받은 subscriber의 onSubscribe, onNext, onError, onComplete 메서드를 별도의 스레드에서 실행하도록 하는 subscriber를 새로 만들어서 반환한다. subscriberOn 메서드를 기존의 subscriber에 적용해 실행하면 다음과 같이 실행결과를 확인할 수 있다.

1
2
3
4
5
6
7
8
9
10
- 실행결과
00:14:53.604 [main] INFO com.jongmin.reactive.practice.SchedulerEx - exit
00:14:53.604 [pool-2-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onSubscribe
00:14:53.607 [pool-2-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - request: 9223372036854775807
00:14:53.609 [pool-3-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 1
00:14:53.609 [pool-4-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 2
00:14:53.609 [pool-5-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 3
00:14:53.609 [pool-6-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 4
00:14:53.610 [pool-7-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onNext: 5
00:14:53.610 [pool-8-thread-1] INFO com.jongmin.reactive.practice.SchedulerEx - onComplete

이제는 main 스레드는 subscriber를 publisher에 등록(subscribe)까지만 하고 그 이후의 작업은 publisher와 subscriber 모두 별도의 스레드에서 동작하게 되었다.

전체 코드는 다음과 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@Slf4j
public class SchedulerEx {
public static void main(String[] args) {
Publisher<Integer> pub = new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
log.info("request: {}", n);
sub.onNext(1);
sub.onNext(2);
sub.onNext(3);
sub.onNext(4);
sub.onNext(5);
sub.onComplete();
}

@Override
public void cancel() {

}
});
}
};

Subscriber<Integer> sub = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe");
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Integer integer) {
log.info("onNext: {}", integer);
}

@Override
public void onError(Throwable t) {
log.info("onError", t);
}

@Override
public void onComplete() {
log.info("onComplete");
}
};

publishOn(pub).subscribe(subscriberOn(sub));

log.info("exit");
}

private static Publisher<Integer> publishOn(Publisher<Integer> pub) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
Executors.newSingleThreadExecutor().execute(() -> {
pub.subscribe(sub);
});
}
};
}

private static Subscriber<Integer> subscriberOn(Subscriber<Integer> sub) {
return new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Executors.newSingleThreadExecutor().execute(() -> sub.onSubscribe(s));
}

@Override
public void onNext(Integer i) {
Executors.newSingleThreadExecutor().execute(() -> sub.onNext(i));
}

@Override
public void onError(Throwable t) {
Executors.newSingleThreadExecutor().execute(() -> sub.onError(t));
}

@Override
public void onComplete() {
Executors.newSingleThreadExecutor().execute(() -> sub.onComplete());
}
};
}
}
Author

KimJongMin

Posted on

2019-11-10

Updated on

2021-03-22

Licensed under

댓글