Reactive Streams (2)

지난번 Reactive Streams API를 구현한 예제를 바탕으로 간단한 Operator를 만들어보자.

Operator라 함은 Stream의 map연산 처럼 Publisher가 제공하는 data를 가공할 수 있도록 하는 것이다.

먼저 간단한 Publisher와 Subscriber 코드이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Slf4j
public class PubSub {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1)
.limit(5)
.collect(Collectors.toList()));
pub.subscribe(logSub());
}

private static Publisher<Integer> iterPub(List<Integer> iter) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
iter.forEach(i -> sub.onNext(i));
sub.onComplete();
}

@Override
public void cancel() {
}
});
}
};
}

private static Subscriber<Integer> logSub() {
return new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe");
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Integer i) {
log.info("onNext: {}", i);
}

@Override
public void onError(Throwable t) {
log.info("onError", t);
}

@Override
public void onComplete() {
log.info("onComplete");
}
};
}
}

- 실행결과
23:32:28.255 [main] INFO com.jongmin.reactive.practice.PubSub - onSubscribe
23:32:28.260 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 1
23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 2
23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 3
23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 4
23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 5
23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onComplete

Operator

Publisher -> [Data1] -> Operator -> [Data2] -> Subscriber

위와 같이 Data1을 Data2로 변환하는 Operator를 만들어보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Slf4j
public class PubSub {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1)
.limit(5)
.collect(Collectors.toList()));
Publisher<Integer> mapPub = mapPub(pub, s -> s * 10);
// iterrPub -> [Data1] -> mapPub -> [Data2] -> logSub
mapPub.subscribe(logSub());
}

private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}

@Override
public void onNext(Integer i) {
sub.onNext(f.apply(i));
}

@Override
public void onError(Throwable t) {
sub.onError(t);
}

@Override
public void onComplete() {
sub.onComplete();
}
});
}
};
}

private static Publisher<Integer> iterPub(List<Integer> iter) {
...
...
...
}

private static Subscriber<Integer> logSub() {
...
...
...
}
}

- 실행결과
23:45:19.758 [main] INFO com.jongmin.reactive.practice.PubSub - onSubscribe
23:45:19.764 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 10
23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 20
23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 30
23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 40
23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 50
23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onComplete

mapPub 메서드가 추가되었다. Data를 제공하는 Publisher와 가공에 사용할 Function을 받아 Operator(새로운 Publisher)를 반환한다.

실제 하는 일은 단순하다. 기존 Publisher와 Subscriber를 이어준다.

Operator가 기존 Publisher를 subscribe하고, 받게되는 Subscription을 기존 Subscriber에게 전달한다.

DelegateSub

Operator가 하는 일은 기존 Publisher와 Subscriber를 이어주면서, onNext 부분에서 전달받은 Function을 적용해주는 것 뿐이다.

onNext를 제외하고는 Operator 마다 코드가 반복될 수 있기 때문에 해당 부분을 DelegateSub으로 분리해보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class DelegateSub implements Subscriber<Integer> {

Subscriber sub;

public DelegateSub(Subscriber sub) {
this.sub = sub;
}

@Override
public void onSubscribe(Subscription s) {
sub.onSubscribe(s);
}

@Override
public void onNext(Integer i) {
sub.onNext(i);
}

@Override
public void onError(Throwable t) {
sub.onError(t);
}

@Override
public void onComplete() {
sub.onComplete();
}
}

DelegateSub을 사용해서 기존 코드를 다음과 같이 수정할 수 있다. 필요한 onNext 메서드만 오버라이딩해서 사용한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
public class PubSub2 {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1)
.limit(5)
.collect(Collectors.toList()));
Publisher<Integer> mapPub = mapPub(pub, s -> s * 10);
mapPub.subscribe(logSub());
}

private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DelegateSub(sub) {
@Override
public void onNext(Integer i) {
sub.onNext(f.apply(i));
}
});
}
};
}

private static Publisher<Integer> iterPub(List<Integer> iter) {
...
...
...
}

private static Subscriber<Integer> logSub() {
...
...
...
}
}

SumPub

이번에는 Publisher로부터 전달받은 Data를 전부 더하는 sum operation을 만들어보자.

기존 Publisher와 Subscriber를 onNext로 이어주지 않고, onComplete이 호출되었을 때, sum 값을 onNext로 전달한 뒤 onComplete을 호출해 종료한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Slf4j
public class PubSub2 {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1)
.limit(5)
.collect(Collectors.toList()));
Publisher<Integer> sumPub = sumPub(pub);
sumPub.subscribe(logSub());
}

private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DelegateSub(sub) {
int sum = 0;

@Override
public void onNext(Integer i) {
sum += i;
}

@Override
public void onComplete() {
sub.onNext(sum);
sub.onComplete();
}
});
}
};
}

private static Publisher<Integer> iterPub(List<Integer> iter) {
...
...
...
}

private static Subscriber<Integer> logSub() {
...
...
...
}
}

- 실행결과
00:30:48.643 [main] INFO com.jongmin.reactive.practice.PubSub2 - onSubscribe
00:30:48.648 [main] INFO com.jongmin.reactive.practice.PubSub2 - onNext: 15
00:30:48.650 [main] INFO com.jongmin.reactive.practice.PubSub2 - onComplete
Author

KimJongMin

Posted on

2019-11-07

Updated on

2021-03-22

Licensed under

댓글