토비의 봄 TV 5회 스프링 리액티브 프로그래밍(https://www.youtube.com/watch?v=8fenTR3KOJo) 에 나왔던 소스 코드를 정리해봅니다.
숫자 1 부터 9 까지 출력하도록 Iterable 인터페이스를 구현하는 방법을 사용한 예제.
public class Ob {
// Iterable<Integer> iter = new Iterable<Integer>() {
//
// @Override
// public Iterator<Integer> iterator() {
// return null;
// }
// };
public static void main(String[] args) {
Iterable<Integer> iter = () -> new Iterator<Integer>() {
int i = 0;
final static int MAX = 10;
@Override
public Integer next() {
return i++;
}
@Override
public boolean hasNext() {
return i < MAX;
}
};
for (Integer i : iter) { // for-each
System.out.println(i);
}
// Old style : for
for (Iterator<Integer> it = iter.iterator(); it.hasNext();) {
System.out.println(it);
}
iter.forEach((i) -> System.out.println(i)); // Java9 forEach
}
}
옵저버를 이용해서 숫자 0 부터 9까지 출력하는 예제. 위의 iterator 를 이용한 방법과 결과는 동일하지만 서로 다르게 작업했다.(duality 개념) Iterable 은 next() 를 통해 데이터를 가져오는(pull) 방식이고, observable 은 notifyObservs() 를 통해 데이터를 주는(push) 방식의 차이가 있다.
public class Ob2 {
// Iterable <-----> Observable : duality
// Pull Push
// DATA method() <-> method(DATA)
// Source -> Event/Data -> Observer
// observer 를 observable(source) 에 등록
// Observable ob;
static class IntObservable extends Observable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
setChanged();
notifyObservers(i);
}
}
}
public static void main(String[] args) {
Observer ob = new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println(Thread.currentThread().getName() + " " + arg);
}
};
IntObservable io = new IntObservable();
io.addObserver(ob);
// io.run();
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(io);
System.out.println(Thread.currentThread().getName() + " EXIT");
es.shutdown();
}
}
reactive 를 적용한 예제
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class PubSub {
public static void main(String[] args) throws InterruptedException {
// PUblisher <- Observable
// Subscriber <- Observer
Iterable<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);
ExecutorService es = Executors.newSingleThreadExecutor();
Publisher<Integer> p = new Publisher<Integer>() {
Iterator<Integer> it = itr.iterator();
@Override
public void subscribe(Subscriber<? super Integer> s) {
s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
es.execute(() -> {
try {
int i = 0;
while (i++ < n) {
if (it.hasNext())
s.onNext(it.next());
else {
s.onComplete();
break;
}
}
} catch (RuntimeException e) {
s.onError(e);
}
});
}
@Override
public void cancel() {
}
});
}
};
Subscriber<Integer> s = new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
System.out.println(Thread.currentThread().getName() + " onSubscribe");
// s.request(Long.MAX_VALUE);
// s.request(10);
this.subscription = s;
this.subscription.request(1);
}
@Override
public void onNext(Integer t) { // 옵저버 패턴의 update() 와 비슷
System.out.println(Thread.currentThread().getName() + " onNext " + t);
this.subscription.request(1);
}
@Override
public void onError(Throwable t) {
System.out.println("onError : " + t.getMessage());
}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + " onComplete");
}
};
p.subscribe(s); // subscriber 가 publisher 에게 subscribing 을 해야한다(구독해야 한다.)
// publisher 가 subscribe() 를 제공하고, subscriber 가 호출한다.
es.awaitTermination(10, TimeUnit.HOURS);
es.shutdown();
}
※ 위 예제는 Java 8 기준이라 rxjava-reactive-streams : 1.0.1 디펜던시 필요함.
'Thinking > Study' 카테고리의 다른 글
[study] python 기본 문법 (0) | 2020.10.21 |
---|---|
[내용 정리] 토비의 봄 TV 1회 Double dispatch example (0) | 2017.11.17 |
JAVA 8 에서 추가된 forEach 문 사용 예제 (0) | 2017.11.15 |
Apache ZooKeeper (0) | 2017.03.22 |
패턴 정리 (0) | 2015.08.24 |