지난번 Reactive Streams API를 구현한 예제를 바탕으로 간단한 Operator를 만들어보자.
Operator라 함은 Stream의 map연산 처럼 Publisher가 제공하는 data를 가공할 수 있도록 하는 것이다.
먼저 간단한 Publisher와 Subscriber 코드이다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| @Slf4j public class PubSub { public static void main(String[] args) { Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1) .limit(5) .collect(Collectors.toList())); pub.subscribe(logSub()); } private static Publisher<Integer> iterPub(List<Integer> iter) { return new Publisher<Integer>() { @Override public void subscribe(Subscriber sub) { sub.onSubscribe(new Subscription() { @Override public void request(long n) { iter.forEach(i -> sub.onNext(i)); sub.onComplete(); }
@Override public void cancel() { } }); } }; }
private static Subscriber<Integer> logSub() { return new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { log.info("onSubscribe"); s.request(Long.MAX_VALUE); }
@Override public void onNext(Integer i) { log.info("onNext: {}", i); }
@Override public void onError(Throwable t) { log.info("onError", t); }
@Override public void onComplete() { log.info("onComplete"); } }; } }
- 실행결과 23:32:28.255 [main] INFO com.jongmin.reactive.practice.PubSub - onSubscribe 23:32:28.260 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 1 23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 2 23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 3 23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 4 23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 5 23:32:28.262 [main] INFO com.jongmin.reactive.practice.PubSub - onComplete
|
Operator
Publisher -> [Data1] -> Operator
-> [Data2] -> Subscriber
위와 같이 Data1을 Data2로 변환하는 Operator를 만들어보자.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| @Slf4j public class PubSub { public static void main(String[] args) { Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1) .limit(5) .collect(Collectors.toList())); Publisher<Integer> mapPub = mapPub(pub, s -> s * 10); mapPub.subscribe(logSub()); }
private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) { return new Publisher<Integer>() { @Override public void subscribe(Subscriber<? super Integer> sub) { pub.subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { sub.onSubscribe(s); }
@Override public void onNext(Integer i) { sub.onNext(f.apply(i)); }
@Override public void onError(Throwable t) { sub.onError(t); }
@Override public void onComplete() { sub.onComplete(); } }); } }; } private static Publisher<Integer> iterPub(List<Integer> iter) { ... ... ... }
private static Subscriber<Integer> logSub() { ... ... ... } }
- 실행결과 23:45:19.758 [main] INFO com.jongmin.reactive.practice.PubSub - onSubscribe 23:45:19.764 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 10 23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 20 23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 30 23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 40 23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onNext: 50 23:45:19.767 [main] INFO com.jongmin.reactive.practice.PubSub - onComplete
|
mapPub 메서드가 추가되었다. Data를 제공하는 Publisher와 가공에 사용할 Function을 받아 Operator(새로운 Publisher)를 반환한다.
실제 하는 일은 단순하다. 기존 Publisher와 Subscriber를 이어준다.
Operator가 기존 Publisher를 subscribe하고, 받게되는 Subscription을 기존 Subscriber에게 전달한다.
DelegateSub
Operator가 하는 일은 기존 Publisher와 Subscriber를 이어주면서, onNext 부분에서 전달받은 Function을 적용해주는 것 뿐이다.
onNext를 제외하고는 Operator 마다 코드가 반복될 수 있기 때문에 해당 부분을 DelegateSub으로 분리해보자.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class DelegateSub implements Subscriber<Integer> {
Subscriber sub;
public DelegateSub(Subscriber sub) { this.sub = sub; }
@Override public void onSubscribe(Subscription s) { sub.onSubscribe(s); }
@Override public void onNext(Integer i) { sub.onNext(i); }
@Override public void onError(Throwable t) { sub.onError(t); }
@Override public void onComplete() { sub.onComplete(); } }
|
DelegateSub을 사용해서 기존 코드를 다음과 같이 수정할 수 있다. 필요한 onNext 메서드만 오버라이딩해서 사용한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Slf4j public class PubSub2 { public static void main(String[] args) { Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1) .limit(5) .collect(Collectors.toList())); Publisher<Integer> mapPub = mapPub(pub, s -> s * 10); mapPub.subscribe(logSub()); }
private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) { return new Publisher<Integer>() { @Override public void subscribe(Subscriber<? super Integer> sub) { pub.subscribe(new DelegateSub(sub) { @Override public void onNext(Integer i) { sub.onNext(f.apply(i)); } }); } }; } private static Publisher<Integer> iterPub(List<Integer> iter) { ... ... ... }
private static Subscriber<Integer> logSub() { ... ... ... } }
|
SumPub
이번에는 Publisher로부터 전달받은 Data를 전부 더하는 sum operation을 만들어보자.
기존 Publisher와 Subscriber를 onNext로 이어주지 않고, onComplete이 호출되었을 때, sum 값을 onNext로 전달한 뒤 onComplete을 호출해 종료한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| @Slf4j public class PubSub2 { public static void main(String[] args) { Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1) .limit(5) .collect(Collectors.toList())); Publisher<Integer> sumPub = sumPub(pub); sumPub.subscribe(logSub()); }
private static Publisher<Integer> sumPub(Publisher<Integer> pub) { return new Publisher<Integer>() { @Override public void subscribe(Subscriber<? super Integer> sub) { pub.subscribe(new DelegateSub(sub) { int sum = 0;
@Override public void onNext(Integer i) { sum += i; }
@Override public void onComplete() { sub.onNext(sum); sub.onComplete(); } }); } }; } private static Publisher<Integer> iterPub(List<Integer> iter) { ... ... ... }
private static Subscriber<Integer> logSub() { ... ... ... } }
- 실행결과 00:30:48.643 [main] INFO com.jongmin.reactive.practice.PubSub2 - onSubscribe 00:30:48.648 [main] INFO com.jongmin.reactive.practice.PubSub2 - onNext: 15 00:30:48.650 [main] INFO com.jongmin.reactive.practice.PubSub2 - onComplete
|