본문 바로가기

Thinking/Study

[내용 정리] 토비의 봄 TV 5회

728x90

토비의 봄 TV 5회 스프링 리액티브 프로그래밍(https://www.youtube.com/watch?v=8fenTR3KOJo) 에 나왔던 소스 코드를 정리해봅니다.


숫자 1 부터 9 까지 출력하도록 Iterable 인터페이스를 구현하는 방법을 사용한 예제.

public class Ob {

// Iterable<Integer> iter = new Iterable<Integer>() {

//

// @Override

// public Iterator<Integer> iterator() {

// return null;

// }

// };


public static void main(String[] args) {

Iterable<Integer> iter = () -> new Iterator<Integer>() {

int i = 0;

final static int MAX = 10;


@Override

public Integer next() {

return i++;

}


@Override

public boolean hasNext() {

return i < MAX;

}

};


for (Integer i : iter) { // for-each

System.out.println(i);

}


// Old style : for

for (Iterator<Integer> it = iter.iterator(); it.hasNext();) {

System.out.println(it);

}


iter.forEach((i) -> System.out.println(i)); // Java9 forEach

}

}


옵저버를 이용해서 숫자 0 부터 9까지 출력하는 예제. 위의 iterator 를 이용한 방법과 결과는 동일하지만 서로 다르게 작업했다.(duality 개념) Iterable 은 next() 를 통해 데이터를 가져오는(pull) 방식이고, observable 은 notifyObservs() 를 통해 데이터를 주는(push) 방식의 차이가 있다.

public class Ob2 {


// Iterable <-----> Observable : duality

// Pull                     Push

// DATA method() <-> method(DATA)


// Source -> Event/Data -> Observer

// observer 를 observable(source) 에 등록

// Observable ob;


static class IntObservable extends Observable implements Runnable {

@Override

public void run() {

for (int i = 0; i < 10; i++) {

setChanged();

notifyObservers(i);

}

}

}


public static void main(String[] args) {

Observer ob = new Observer() {

@Override

public void update(Observable o, Object arg) {

System.out.println(Thread.currentThread().getName() + " " + arg);

}

};


IntObservable io = new IntObservable();

io.addObserver(ob);

// io.run();


ExecutorService es = Executors.newSingleThreadExecutor();

es.execute(io);

System.out.println(Thread.currentThread().getName() + " EXIT");

es.shutdown();

}

}


reactive 를 적용한 예제

import org.reactivestreams.Publisher;

import org.reactivestreams.Subscriber;

import org.reactivestreams.Subscription;


public class PubSub {

public static void main(String[] args) throws InterruptedException {

// PUblisher <- Observable

// Subscriber <- Observer

Iterable<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);

ExecutorService es = Executors.newSingleThreadExecutor();


Publisher<Integer> p = new Publisher<Integer>() {

Iterator<Integer> it = itr.iterator();


@Override

public void subscribe(Subscriber<? super Integer> s) {

s.onSubscribe(new Subscription() {


@Override

public void request(long n) {

es.execute(() -> {

try {

int i = 0;

while (i++ < n) {

if (it.hasNext())

s.onNext(it.next());

else {

s.onComplete();

break;

}

}

} catch (RuntimeException e) {

s.onError(e);

}

});

}


@Override

public void cancel() {


}

});

}

};


Subscriber<Integer> s = new Subscriber<Integer>() {

Subscription subscription;


@Override

public void onSubscribe(Subscription s) {

System.out.println(Thread.currentThread().getName() + " onSubscribe");

// s.request(Long.MAX_VALUE);

// s.request(10);

this.subscription = s;

this.subscription.request(1);

}


@Override

public void onNext(Integer t) { // 옵저버 패턴의 update() 와 비슷

System.out.println(Thread.currentThread().getName() + " onNext " + t);

this.subscription.request(1);

}


@Override

public void onError(Throwable t) {

System.out.println("onError : " + t.getMessage());

}


@Override

public void onComplete() {

System.out.println(Thread.currentThread().getName() + " onComplete");

}

};


p.subscribe(s); // subscriber 가 publisher 에게 subscribing 을 해야한다(구독해야 한다.)

// publisher 가 subscribe() 를 제공하고, subscriber 가 호출한다.


es.awaitTermination(10, TimeUnit.HOURS);

es.shutdown();

}


※  위 예제는 Java 8 기준이라 rxjava-reactive-streams : 1.0.1 디펜던시 필요함.

'Thinking > Study' 카테고리의 다른 글

[study] python 기본 문법  (0) 2020.10.21
[내용 정리] 토비의 봄 TV 1회 Double dispatch example  (0) 2017.11.17
JAVA 8 에서 추가된 forEach 문 사용 예제  (0) 2017.11.15
Apache ZooKeeper  (0) 2017.03.22
패턴 정리  (0) 2015.08.24