Code Monkey home page Code Monkey logo

reactive-streams-spring-boot's Introduction

REACTIVE STREAMS

아래는 축약 설명이고 챕터별 자세한 설명은 여기서 볼 수 있음

Duality

Iterable Observable
[Iterable] [Observable]
[Pull] [Push]
[값을 끌어온다는 의미] [값을 가져가라는 의미]
[iterator.next()] [notifyObservers(arg)]

Observer Pattern

Observer Pattern에 2가지 문제가 있다..

  1. Data의 '끝'이라는 개념이 없다.
  2. Error 처리.. Exception은 어떻게?

물론 많은 부분들이 있겠지만 위 2가지 요소를 더욱 보강한다는 전제가 Reactive Programming의 여러 기준 중 하나이다.

Reactive Streams - 표준

표준 Spec Document

그 외 Document

Publisher

https://github.com/reactive-streams/reactive-streams-jvm에 따르면

A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). In response to a call to Publisher.subscribe(Subscriber). 라고 명시되어있다.

즉 Publisher는 시퀀셜한 element들을 제공하는 Provider이고, Subscriber의 demand에 따라 publish 한다. 이는 Observable과 동등한 의미로 보면 된다. Observable의 Observable.addObserver(observer)와 같은 의미로 Publisher는 Publisher.subscribe(Subcriber)를 활용해 Receiver를 정한다.

기준 Observable Publisher
Provider O O
Receiver Observer Subscriber
Add Receiver Observable.addObserver(ob) Publisher.subscribe(sb)

Subscriber

Subscribe 는 다음과 같은 메소드를 정의한다.

  • void onSubscribe(Subscription var1);
  • void onNext(T var1);
  • void onError(Throwable var1);
  • void onComplete();

또한 Publisher가 Subscriber에게 전달하는 정보는 아래와 같은 protocol을 따른다.

onSubscribe onNext* (onError | onComplete)?

Subscriber는 onSubscribe(arg)를 통해 subscribe를 시작하고, onNext()를 통해 element를 수신한다. 여기서 onNext* 는 0 ~ N(무한대)까지 호출 가능하다는 의미이다. 마지막으로 (onError | onComplete)? 는 optional이다. 두가지 중 하나를 호출할 수 있고, 이 과정을 거치면 마치는 protocol이다.

Operator

일반적인 flow는 다음과 같다. Publisher -> Data -> Subscriber 그러나 Publisher -> [Data1] -> Operator -> [Data2] -> Operator2 -> [Data3] -> Subscriber 처럼 Operator를 활용해 Subcriber에 도달하는 Data를 컨트롤 할 수 있다. 쉽게 말해 Operator는 Data를 가공한다. JAVA8의 Stream 관련 메소드와 비슷한 의미를 가진다고 보면 된다.

자세한 사항은 소스에서 확인하면 된다. toby/operator/PubSub2.java에서 확인

아래에서 보게 될 Flux라는 Publisher에도 Operator를 활용한다.

REACTOR FLUX & MONO

Reactor는 JVM 기반을 위한 Non-Blocking 라이브러리이며, Reactive Streams의 구현체. 또한 유틸성 클래스로 Flux, Mono 라는 클래스 제공 하며 이는 위에서 설명한 Publiser 인터페이스를 구현.

이들의 차이는 시퀀스를 얼마나 전송하느냐에 따라 나뉜다.

  • Mono : 0 ~ 1개의 데이터 전달
  • Flux : 0 ~ N개의 데이터 전달
    public static void FluxTest() {
        // Flux는 Publisher 인터페이스 구현체
        Flux.<Integer> create(e -> {
            IntStream.range(1, 10).forEach(ele -> e.next(ele));
            e.complete();
        })
        .map(e -> e * 10) // map operator
        .reduce(0, (a, b) -> a + b) // reduce operator
        .subscribe(System.out::println);
        // subscriber는 System.out.println 기능만 수행
    }

FluxTest() 에서 Operator는 map, reduce 기능을 수행하고 있다. 이를 직접 Publisher, Subscriber로 구현하게 되면 아래와 같다.

    public void PubSubTest() {

        // 1 ~ 10 기본 publisher
        Publisher<Integer> publisher = new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> subscriber) {
                subscriber.onSubscribe(new Subscription() {
                    @Override
                    public void request(long l) {
                        IntStream.range(1, 10).forEach(ele -> subscriber.onNext(ele));
                        subscriber.onComplete();
                    }

                    @Override
                    public void cancel() {
                    }
                });
            }
        };

        // 로깅 subscriber
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                subscription.request(10);
            }
            @Override
            public void onNext(Integer i) {
                System.out.println(i);
            }
            @Override
            public void onError(Throwable throwable) { }
            @Override
            public void onComplete() { }
        };

        Publisher<Integer> mapPub = mapPublisher(publisher, e -> e * 10);
        Publisher<Integer> reducePub = reducePublisher(mapPub, 0, (a, b) -> a + b);
        reducePub.subscribe(subscriber);
    }

    // map Operator
    public Publisher<Integer> mapPublisher (Publisher<Integer> pub, Function<Integer, Integer> f) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                pub.subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        sub.onSubscribe(subscription);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        int next = f.apply(integer);
                        sub.onNext(next); // map
                    }

                    @Override
                    public void onError(Throwable throwable) { }

                    @Override
                    public void onComplete() {
                        sub.onComplete();
                    }
                });
            }
        };
    }

    // reduce Operator
    public Publisher<Integer> reducePublisher(Publisher<Integer> pub, int init, BiFunction<Integer, Integer, Integer> f) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> subscriber) {
                pub.subscribe(new Subscriber<Integer>() {
                    int result = init;
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        subscriber.onSubscribe(subscription);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        result = f.apply(result, integer);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                    }

                    @Override
                    public void onComplete() {
                        subscriber.onNext(result);
                        subscriber.onComplete();
                    }
                });
            }
        };
    }

정리하자면 FluxTest()와 PubSubTest() 는 동일한 job을 실행한다. Flux를 사용하여 코드를 줄일 수 있는 이유는 Publisher 인터페이스를 구현해놓은 클래스이기 때문이다.

Scheduler

publisher와 subscriber의 동작이 같은 스레드에서 일어날 경우, 병목을 맞이한다. 수행시간이 긴 api를 호출하는 경우가 이에 해당한다. 심각한 경우 웹 서버의 모든 스레드가 점유되는 경우가 있을 수 있다.

보통의 경우 스레드 풀 혹은 running thread를 생성하여 비동기로 이를 처리하곤 하는데, 이러한 역할을 하는 것이 스케줄러이다.

subscribeOn

위와 같은 경우 io에 오랜 시간이 소요된다. 이러한 경우 subscribeOn이라는 스케줄러를 살펴보자

subscriberOn의 flow와 description을 확인하면 된다. 이렇게 정의되어 있다. Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios.

    flux.subscribeOn(Schedulers.single()).subscribe() 

subscribeOn 스케줄러는 subcriber를 별도의 레드에서 실행한다.

publishOn

publishOn은 위와 조금 다르다.

flux.publishOn(Schedulers.sigle()).subscribe()

Typically used for fast publisher, slow consumer(s) scenario. 라고 명시되어 있다. publishing 되는 data의 속도는 빠르나 이를 처리하는 subscriber 의 속도가 느린 경우 (ex. db save 등)는 publishOn을 활용하면 된다.

data를 받아서 처리하는 쪽을 별개의 스레드에서 실행한다. 이 말은 onNext, onError, onComplete 등의 역할이 별도의 스레드에서 실행된다는 말이다.

코드를 보면 subscribeOn, publishOn에 대해 더 잘 이해할 수 있다.

        Publisher<Integer> pub = sub -> {
            sub.onSubscribe(new Subscription() {
                @Override
                public void request(long l) {
                    log.debug("request()");
                    sub.onNext(1);
                    sub.onNext(2);
                    sub.onNext(3);
                    sub.onNext(4);
                    sub.onNext(5 );
                    sub.onComplete();

                }

                @Override
                public void cancel() {

                }
            });
        }; 

        pub.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                log.info("onSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                log.debug("onNext: {}", integer);
            }

            @Override
            public void onError(Throwable throwable) {
                log.debug(throwable.getMessage());
            }

            @Override
            public void onComplete() {
                log.debug("onComplete");
            }
        });

        System.out.println("EXIT");
    }

기본적인 publisher, subscriber 위와 같다. publisher는 5개의 data를 제공하고, subcriber 는 이를 처리하는 간단한 로직이다. 먼저 subscribeOn 부터 보자.

        Publisher<Integer> subOnPub = sub -> {
            ExecutorService ex = Executors.newSingleThreadExecutor();
            ex.execute( () -> pub.subscribe(sub)); // 이 과정을 새로운 스레드로
        };

위는 Operator 개념으로 publisher를 두고 subcribeOn, publishOn의 역할을 맡기는 코드이다. 위 subOnPub 메소드를 보면, 새로운 싱글 스레드를 통해 subscriber가 실행된다. subscriber는 pub.subscribe(subscriber)가 아닌 위에서 정의한 Operator를 통해 subOnPub.subscribe(subscriber)를 실행하는 형태이다. 결국 subscriber는 subOnPub에 의해 새로운 스레드에서 실행되고, 결론적으로 subscribe를 할 때에 별도의 스레드에서 실행하게 한다.

publishOn은 data의 제공 속도에 비해 subscriber에서의 data 처리가 느린 경우 활용한다고 하였다. 예를 들어 onNext, onError 등의 개별 data 처리가 느릴 경우 아래처럼 동작시킨다.

        Publisher<Integer> pubOnPub = sub -> {
            // subOnPub와 같이 subscriber 자체가 별도의 스레드에서 동작 하는 것이 아니라,
            // 개별 data가 들어오는 부분 - (onNext, onError, onComplete등)을 별개의 스레드에서 실행한다.
            pub.subscribe(new Subscriber<Integer>() {
                ExecutorService es = Executors.newSingleThreadExecutor();
                @Override
                public void onSubscribe(Subscription subscription) {
                    sub.onSubscribe(subscription);
                }

                @Override
                public void onNext(Integer integer) {
                    es.execute(() -> sub.onNext(integer));
                }

                @Override
                public void onError(Throwable throwable) {
                    es.execute(() -> sub.onError(throwable));
                }

                @Override
                public void onComplete() {
                    es.execute(() -> sub.onComplete());
                }
            });
        };

data를 처리하는 onNext, onError, onComplete의 경우, 새로운 스레드에서 실행할 수 있다. 이는 결국 publishing 할 때에 별도의 스레드에서 실행하게 하는것이다.

FLUX

Publisher의 구현체중 하나로서, 0 ~ N개의 데이터를 전송한다. 기존 Publisher와 동일하게 각 전달마다 onNext()를 발생시킨다. Flux는 추상클래스로 정의되어 있다.

public abstract class Flux<T> implements Publisher<T> 

Flux는 Reactive Streams에서 정의한 Publisher의 구현체로서 0-N개의 데이터를 발행할 수 있다. 하나의 데이터를 전달할 때 마다 onNext() 이벤트를 발생한다. Flux 내의 모든 데이터 처리가 완료되면, onComplete() 이벤트가 발생하며, 전달 과정에 문제가 생기면 onError() 가 발생한다.

MONO

Mono 또한 Publisher 인터페이스를 구현하는 구현체인데, Flux와의 차이점은 데이터 처리 개수이다. Flux는 0 ~ N개의 데이터를 처리하지만, Mono는 0 ~ 1개의 데이터를 처리한다. onNext(), onComplete(), onError()는 동일하다.

Flux 및 Mono 생성

먼저 Flux를 생성하는 가장 간단한 방법은 제공되는 팩토리 메소드를 활용하는 것이다.

  • just()
  • range()
  • fromArray(), fromIterable(), fromStream()
  • empty()
Flux<String> flux = Flux.just("A", "B"...);
Flux<String> flux = Flux.range(0, 5);
Flux<String> flux = Flux.fromArray(new String[] {"A", "B"...});
Flux<String> flux = Flux.fromIterable(...);
Flux<String> flux = Flux.fromStream(..);
Flux<String> flux = Flux.empty();

상세는 테스트 코드 확인 하면 됨.

reactive-streams-spring-boot's People

Contributors

phantasmicmeans avatar

Stargazers

 avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

reactive-streams-spring-boot's Issues

Observer Pattern

Observer Pattern

관찰자 패턴은 관찰자라고 불리는 자손의 리스트를 가지고 있는 주체(Subject) 를 필요로한다. 주체는 일반적으로 자신의 메서드 중 하나(e.g. notify)를 호출해 관찰자에게 상태 변경을 알리게 된다.

GoF 에서의 옵저버 패턴 설명

  • 객체 사이에 일대 다의 의존 관계를 정의해두어, 어떤 객체의 상태가 변할 때 그 객체의 의존성을 가진 다른 객체들이 변화를 통지받고 자동으로 갱신될 수 있게 만든다.

  • 이 패턴은 이벤트 처리 기반 시스템에 필수적

  • e.g. MVC 패턴, 거의 모든 UI 라이브러리가 내부적으로 이 패턴 사용

Subject - Observer 는 1:N 관계를 이룰 수 있음. 

Subject 
   |- Observer
   |- Observer
   |- Observer
   |- Observer 

일반적인 옵저버 패턴은 Subjec와 Observer 2개의 인터페이스로 구성된다. Observer는 Subject에 등록되고 Subject로 부터 알림을 하게 되는데 Subject 스스로 이벤트를 발생시키거나 다른 구성요소에 의해 호출될 수 있다.

Observable 예제

Subject / Observer interface 명세

  • subject는 이벤트 브로드캐스트용 method, 와 구독 관리 (register, unregister) method를 포함한다.
public interface Subject<T> {
    void registerObserver(Observer<T> observer);
    void unregisterObserver(Observer<T> observer);
    void notifyObservers(T event);
}

Observer는 이벤트를 처리하는 observe method 만 존재한다.

public interface Observer<T> {
    void observe(T event);
}

Observer, Subject 모두 인터페이스에 기술된 것 이상에 대해서는 서로 알지 못한다.

단순하게 Subject interface 를 구현한 ConcreteSuject를 보자.

ConcreteSubject (주체)

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

public class ConcreteSubject implements Subject<String> {
    /**
     * multi-thread 안정성 유지를 위해 업데이트시마다 새 복사본을 생성하는 Set 구현체 사용
     * 복사 비용은 큼 -> 구독자 목록 변경은 거의 없음.
     */
    private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();

    @Override
    public void registerObserver(Observer<String> observer) {
        observers.add(observer); // register observer
    }

    @Override
    public void unregisterObserver(Observer<String> observer) {
        observers.remove(observer); // remove observer
    } 

    @Override
    public void notifyObservers(String event) {
        observers.forEach(observers -> observers.observe(event)); // notify observer
    }
}

그리고 Observer interface 를 구현한 ConcreteObserverA, B 클래스를 생성하자. 이들은 단순히 브로드캐스트 받은 이벤트 처리용 메소드인 observe(event) method를 구현한다.

public class ConcreteObserverA implements Observer<String> {
    @Override
    public void observe(String event) {
        System.out.println("Observer A:" + event);
    }
}

public class ConcreteObserverB implements Observer<String> {
    @Override
    public void observe(String event) {
        System.out.println("Observer B:" + event);
    }
}

단순히 observer 들을 등록 / 해지할 register, unregister method와, 모든 observer 에게 이벤트 브로드캐스트용 notifyObserver method를 포함한다.

테스트 코드는 아래와 같다.

    @Test
    public void observerHandleEventsFromSubject() {
        // given
        Subject<String> subject = new ConcreteSubject();
        Observer<String> observerA = new ConcreteObserverA();
        Observer<String> observerB = new ConcreteObserverB();

        // when
        subject.notifyObservers("no subscriber!");
        subject.registerObserver(observerA);
        subject.registerObserver(observerB);

        subject.notifyObservers("event !");

        subject.unregisterObserver(observerB);

        subject.notifyObservers("event for A");
        subject.unregisterObserver(observerA);

        subject.notifyObservers("no subscriber");
    }

또한 아래처럼 람다로 observer를 작성해도 된다.

    @Test
    public void subjectLeverageLambdas() {
        Subject<String> subject = new ConcreteSubject();
        subject.registerObserver(new Observer<String>() {
            @Override
            public void observe(String event) {
                System.out.println("A : " + event);
            }
        });
        subject.registerObserver(event -> System.out.println("B : " + event));
        subject.notifyObservers("event start for A & B");
    }

ConcreteSubject는 주 스레드에서 observer들에게 이벤트 브로드캐스팅을 하고 있다. 따라서 이벤트 처리 시간이 긴 Observer 존재시 처리 시간을 늘어나게 된다.

아래 예시는 A Observer 의 경우 이벤트 처리를 위해 3s 이상의 시간을 필요로하고, B Observer 의 경우 이벤트 처리 시간이 빠른 경우를 나타낸다.

    @Test
    public void subjectThreadTest() {
        Subject<String> subject = new ConcreteSubject();
        subject.registerObserver(event -> { // A observer 는 이벤트 처리시간이 길다. 
            try {
                Thread.sleep(3000);
            } catch (Exception e) {

            }

            System.out.println(Thread.currentThread().getName());
            System.out.println("A: " + event);
        });

        subject.registerObserver(event -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("B: " + event);
        });

        subject.notifyObservers("event!");

        try {
            Thread.sleep(3100);
        } catch (Exception e) {

        }
    }

이벤트 처리시간이 상당히 긴 Observer가 존재할 수 있으므로 Observer 가 많을 경우 추가적으로 스레드풀을 할당해야한다. (실제 구현시는 검증된 라이브러리를 사용하자)

병렬로 이벤트를 브로드캐스팅 하기 위한 ConcreteSubject 및 테스트 코드는 아래와 같다.

public class ConcreteParallelSubject implements Subject<String> {
    private final ExecutorService ex;

    public ConcreteParallelSubject(ExecutorService ex){
        this.ex = ex;
    }
    /**
     * multi-thread 안정성 유지를 위해 업데이트시마다 새 복사본을 생성하는 Set 구현체 사용
     * 복사 비용은 큼 -> 구독자 목록 변경은 거의 없음.
     */
    private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();


    @Override
    public void registerObserver(Observer<String> observer) {
        observers.add(observer);
    }

    @Override
    public void unregisterObserver(Observer<String> observer) {
        observers.remove(observer);
    }

    @Override
    public void notifyObservers(String event) {
        observers.forEach(observers -> {
            ex.submit(() -> observers.observe(event));
        });
    }
}
    @Test
    public void subjectParallelTest() throws InterruptedException {
        final ExecutorService ex = Executors.newFixedThreadPool(10);

        Subject<String> parallelSubject = new ConcreteParallelSubject(ex);
        parallelSubject.registerObserver(event -> { // 이벤트 처리 시간이 길다. 
            try {
                Thread.sleep(3000);
            } catch (Exception e) {

            }

            System.out.println(Thread.currentThread().getName()); // thread name 
            System.out.println("A : " + event); 
        });

        parallelSubject.registerObserver(event -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("B : " + event);
        });

        parallelSubject.notifyObservers("event!");

        ex.awaitTermination(3100, TimeUnit.MILLISECONDS); // 모든 이벤트가 종료될때까지 기다린다. 
    }

여기까지가 옵저버 패턴에 대한 설명과 간단한 구현이다.

Observer Pattern VS Publisher-Subscribe Pattern

옵저버 패턴과 Publisher-subscriber (발행 -구독)패턴은 약간 다르다. 발행-구독 패턴은 '이벤트 채널'(메시지 브로커 or 이벤트 버스) 이라는 간접적인 계층을 하나 더 포함한다. 구독자는 이벤트를 브로드캐스트하는 '이벤트 채널'을 알고 있지만 이벤트 게시자가 누구인지는 신경쓰지 않는다.

e.g. 토픽 기반 시스템(카프카)

성능 튜닝

구성

 

ParallelFlux에 관해

여기서 말하는 core는 physical cpu cores 이다. (core * 2)는 hyper threading 되어진 logical cpu cores를 말한다.

1. ParallelFlux

public ParallelFlux<Map<String, Object>> parallelAttributes(OneInfo info) {
    Mono<ComponentRequest> request = fetcher.createRequest(info);
    return Flux.from(request) // Flux<ComponentRequest>
               .flatMap(fetcher::receiveFlux) // Flux<ComponentReponse>
               .parallel(core * 2) // ParallelFlux<ComponentResponse>
               .runOn(Schedulers.parallel())
               .map(received -> handler.handleResponse(received, info)));
    }

core * 2개의 thread를 활용해 fetcher::receiveFlux() 가 리턴하는 Flux를 병렬로 처리하고자 하였다.

결론적으로 thread pool을 활용해 publishing 되어지는 각 ComponentResponse를 handleResponse에 넘겨 병렬 처리 하고, core를 잘 활용하며 reactive 하게 처리하는 것이 의도였다.

위와 같은 코드로 부하 테스트를 실행하니 아래와 같은 결과가 나왔다.

vm vuser TPS MTT (ms) 에러율
1 50 258 193.64 0
1 100 258.1 387.17 0
1 300 255.2 1172.4 0
1 500 251.5 1981.41 0

cpu usage는 약 30%대를 유지했다.

2. Mono

 
이 코드를 작성하기 전, 위 과정을 모두 Mono로 처리하는 코드를 작성 했었고 이는 아래와 같다.

public Mono<Map<String, Object>> attributes(OneInfo info) { 
    Mono<ComponentRequest> request = fetcher.createRequest(info);
    return Mono.from(request)
               .flatMap(fetcher::receiveMono) // Mono<List<ComponentResponse>>
               .map(received -> handler.handleResponse(received, info));
    }

모두 Mono로 처리하였고 사실상 reactive 하지 않다. netty를 사용하기에 적합한 코드이지도 않고 결국 thread 1개가 모두 작업을 처리하게 된다. 또한 handleResponse는 List를 받아 for문으로 이를 처리한다.

별 기대없이 Mono 처리에 대해 부하 테스트를 하였다.

vm vuser TPS MTT (ms) 에러율
1 50 296.6 168.36 0
1 100 394.8 252.92 0
1 300 526.5 565.14 0
1 500 659.3 754.74 0

결과가 생각지도 못한 방향으로 나왔다. 후자의 경우 최대 2배 더 좋은 결과를 보이고 있었다. 또한 cpu usage는 80 ~ 90%에 달했다.

 
음?

분명 이유가 있을것이다. 먼저 ParallelFlux.parallel() 부터 tracking 해보았다.

/**
 * Prepare this {@link Flux} by dividing data on a number of 'rails' matching the
 * provided {@code parallelism} parameter, in a round-robin fashion. Note that to
 * actually perform the work in parallel, you should call {@link ParallelFlux#runOn(Scheduler)}
 * afterward.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/parallel.svg" alt="">
 *
 * @param parallelism the number of parallel rails
 *
 * @return a new {@link ParallelFlux} instance
 */
public final ParallelFlux<T> parallel(int parallelism) {
	return parallel(parallelism, Queues.SMALL_BUFFER_SIZE);
}

parallel(int parallelism)은 publishing 되어지는 data를 round-robin으로 parallelism 개의 'rail'로 나눈다.

parallel(int parallelism)은 여러 rail을 가지는 ParallelFlux를 반환하며, 실제 parallel 하게 실행하려면 ParallelFlux.runOn(Scheduler)의 파라미터로 병렬 처리가 가능한 scheduler를 넘겨야한다.

 
그럼 ParallelFlux.runOn(Scheduler)를 살펴보자.

/**
 * Specifies where each 'rail' will observe its incoming values with no work-stealing
 * and default prefetch amount.
 * <p>
 * This operator uses the default prefetch size returned by {@code
 * Queues.SMALL_BUFFER_SIZE}.
 * ----- 중략 ------
 * @param scheduler the scheduler to use
 *
 * @return the new {@link ParallelFlux} instance
 */
public final ParallelFlux<T> runOn(Scheduler scheduler) {
	return runOn(scheduler, Queues.SMALL_BUFFER_SIZE);
}

This operator uses the default prefetch size returned by {@code Queues.SMALL_BUFFER_SIZE} 라고 한다. 한 레일당 Queues.SMALL_BUFFER_SIZE 만큼의 size를 가진다.

Queues.SMALL_BUFFER_SIZE는 아래와 같다.

/**
 * A small default of available slots in a given container, compromise between intensive pipelines, small
 * subscribers numbers and memory use.
 */
public static final int SMALL_BUFFER_SIZE = Math.max(16,
			Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));

reactor.bufferSize.small을 지정하지 않았으므로 각 rail당 16개의 size를 가질 수 있다.

사실 지금 tracking 하고자 하는 것은 각 rail의 size가 아니다.
 
코드를 다시 살펴보자

return Flux.from(request) // Flux<ComponentRequest>
           .flatMap(fetcher::receiveFlux) // Flux<ComponentReponse>
           .parallel(core * 2) // ParallelFlux<ComponentResponse>
           .runOn(Schedulers.parallel())
           .map(received -> handler.handleResponse(received, info)));

fetcher::receiverFlux로 부터 publishing 되어지는 data 들을 (core * 2)개의 rail로 나누고, Schedulers.parallel()에 의해 병렬로 실행된다. Schedulers.parallel()은 고정 개수의 thread pool을 제공하고 default 개수는 (core * 2)이다.

 
원인 발견!?

뭔가 아차 싶다..

publishing 되어지는 data를 (core * 2)개의 thread pool이 잘 나누어 처리하는 것이 아니라, 필수적으로 (core * 2)개의 rail을 생성하고 이를 thread pool이 나누어 실행하게 될 것이다.

(core * 2)개 이상의 data가 publishing 되는 경우, (core * 2)개의 rail이 생성 되는것은 어찌보면 자연스러울 수? 있다. 문제는 publishing 되어지는 data가 (core * 2)개 이하임에도 불구하고 rail이 모두 생성될 수 있다는 점이다.

실제 테스트를 해보았다.

return Flux.from(request) // Flux<ComponentRequest>
           .flatMap(fetcher::receiveFlux) // Flux<ComponentReponse>
           .parallel(core * 2) // ParallelFlux<ComponentResponse>
           .runOn(Schedulers.parallel())
           .log()
           .map(received -> handler.handleResponse(received, info))
           .doOnComplete(() -> log.info("complete"));

rail 관련 thread 확인을 위해 5번째 라인에 log(), 각 rail에 대한 처리가 완료 될 경우 "complete"를 로깅했다.

[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[     parallel-1] reactor.Parallel.RunOn.1                 : onNext(com.naver.media.one.model.ComponentResponse@517119d3)
[     parallel-2] reactor.Parallel.RunOn.1                 : onNext(com.naver.media.one.model.ComponentResponse@18b71d5)
[     parallel-3] reactor.Parallel.RunOn.1                 : onNext(com.naver.media.one.model.ComponentResponse@70ef9467)
[     parallel-2] reactor.Parallel.RunOn.1                 : onComplete()
[    parallel-10] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-6] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-9] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-7] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-5] reactor.Parallel.RunOn.1                 : onComplete()
[    parallel-10] com.naver.media.one.OneService           : complete
[     parallel-8] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-7] com.naver.media.one.OneService           : complete
[     parallel-1] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-4] reactor.Parallel.RunOn.1                 : onComplete()
[    parallel-12] reactor.Parallel.RunOn.1                 : onComplete()
[    parallel-11] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-1] com.naver.media.one.OneService           : complete
[     parallel-2] com.naver.media.one.OneService           : complete
[     parallel-4] com.naver.media.one.OneService           : complete
[     parallel-9] com.naver.media.one.OneService           : complete
[    parallel-11] com.naver.media.one.OneService           : complete
[     parallel-6] com.naver.media.one.OneService           : complete
[     parallel-5] com.naver.media.one.OneService           : complete
[     parallel-8] com.naver.media.one.OneService           : complete
[    parallel-12] com.naver.media.one.OneService           : complete
[     parallel-3] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-3] com.naver.media.one.OneService           : complete

12개의 rail이 생성 되었고, 12개의 thread가 이를 처리하고 있었다. publishing 되어지는 data는 3개 뿐임에도 말이다. 실제로 onNext는 3번 호출된다.

parallel-1, 2, 3은 onNext를 통해 publishing 되어지는 data를 처리한다. 그러나 parallel-4 ~ 12는 subscribe 이후 rail 이 비어있으므로 곧바로 onComplete()를 호출하고 이후 "complete" 한다.

Empty rail을 위해 순간적으로 12개의 모든 core를 점유 할 수 있는 상황이고, 이 부분이 가장 큰 병목지점일 것이라 생각했다.

 
해결

사실 해결법은 간단하다. fetcher::receiveFlux()의 size만큼 혹은 작은 단위로 쪼개어 rail을 생성하면 된다. 이를 통해 불필요한 rail 생성 및 그에 따른 core 점유 상황을 피할 수 있다.

현재는 receiveFlux() 로 부터 publishing 되어지는 data는 3개 고정이다. 그러므로 3개의 rail을 만든 뒤 (core * 2)개의 thread pool을 이용해 처리해보자.

이는 항상 3개의 rail을 만들고 이후 thread pool 내에서 처리 될 것이다.

    public ParallelFlux<Map<String, Object>> parallelAttributes(OneInfo info) {
        Mono<ComponentRequest> request = fetcher.createRequest(info);
        return Flux.from(request)
                   .flatMap(fetcher::receiveFlux)
                   .parallel(3)
                   .runOn(Schedulers.parallel())
                   .map(received -> handler.handleResponse(received, info));
    }

Before

vm vuser TPS MTT (ms) 에러율
1 500 251.5 1981.41 0

After

vm vuser TPS MTT (ms) 에러율
1 500 326.8 1524.91 0

cpu usage는 40%대를 유지했다.

처음 보다 조금 높게 측정 되었으나.. cpu usage는 여전히 낮다. 또한 Mono 처리에는 미치지 못했다.

scheduling 변경

handleResponse()에 별다른 로직이 없고, publishing data 또한 적어 병렬 처리 이점이 현재는 드러나지 않는 것 같다.

이번에는 병렬 처리를 제외하고 subscribeOn 스케줄링을 통해 subscibing thread를 교체하는 전략을 구성했다.

출처 - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#subscribeOn-reactor.core.scheduler.Scheduler-

Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.

Note that if you are using an eager or blocking create(Consumer, FluxSink.OverflowStrategy) as the source, it can lead to deadlocks due to requests piling up behind the emitter. In such case, you should call subscribeOn(scheduler, false) instead.

subscribeOn은 위와 같은 경우에 사용하길 권장한다.

Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios.

  flux.subscribeOn(Schedulers.single()).subscribe() 

앞서 적용했던 ParallelFlux와의 공통점은 모두 thread pool 을 통해 subscribing을 진행 한다는 점이다.

ParallelFlux와 subscribeOn의 차이점은 다음과 같다. ParallelFlux는 각 rail에 대한 처리를 각기 다른 thread가 진행한다(thread pool).

이에 따라 각 thread가 rail 별 onNext/onError/onComplete에 대한 처리를 진행하는 모습을 위에서 확인할 수 있었다.

그러나 subscribeOn는 publishing thread와 subscribing thread를 분리할 뿐 어떠한 rail을 생성하진 않는다.

즉 ParallelFlux에서 onNext/onComplete는 parallel-1, 2, 3에서 각 1번씩 실행 되었지만, subscribeOn의 경우 onNext/onComplete대해 publishing thread가 아닌 다른 하나의 thread가 onNext/onComplete를 3번 실행할 것이다.

public Flux<Map<String, Object>> fluxAttributes(OneInfo info) {
        Mono<ComponentRequest> request = fetcher.createRequest(info);
        return Flux.from(request)
                   .flatMap(fetcher::receiveFlux)
                   .subscribeOn(Schedulers.parallel())
                   .map(received -> handler.handleResponse(received, info));
    }

fetcher::receiveFlux()는 약 50ms의 I/O 작업을 수행하고 있고 그에 대한 결과를 Flux로 리턴한다. 이 Flux를 subcribing 하는 경우 onNext/onError/onComplete는 다른 thread에서 실행되어야 한다.

그럼 확인 해보자

[ctor-http-nio-2] reactor.Flux.Map.2                       : onSubscribe(FluxMap.MapSubscriber)
[ctor-http-nio-2] reactor.Flux.Map.2                       : request(unbounded)
[ctor-http-nio-4] reactor.Flux.Map.2                       : onNext({ARTICLE={request={type=ARTICLE, .. 중략 .. })
[ctor-http-nio-4] reactor.Flux.Map.2                       : onNext({OFFICE_INFO={request={type=OFFICE_INFO, .. 중략 .. })
[ctor-http-nio-4] reactor.Flux.Map.2                       : onNext({OFFICE_HEADLINE={request={type=OFFICE_HEADLINE, .. 중략 .. })
[ctor-http-nio-4] reactor.Flux.Map.2                       : onComplete()

예상 대로 onNext/onComplete는 reactor-http-nio-4 thread에서 분리실행 되는 모습을 확인했다.

성능 테스트 결과는 아래와 같고, ParallelFlux 처리보다 높은 성능을 볼 수 있었다.

vm vuser TPS MTT (ms) 에러율
1 500 639.9 740.5 0

 

튜닝 결과

최종적으로 Scheduling 변경이 가장 높은 tps 및 mtt를 보였다.

ParallelFlux Before

vm vuser TPS MTT (ms) 에러율 cpu usage
1 500 251.5 1981.41 0 약 30 ~ 40 %

ParallelFlux After

vm vuser TPS MTT (ms) 에러율 cpu usage
1 500 326.8 1524.91 0 약 40 ~ 50 %

Mono

vm vuser TPS MTT (ms) 에러율 cpu usage
1 500 659.3 754.74 0 약 80 ~ 90%

Scheduling 변경

vm vuser TPS MTT (ms) 에러율 cpu usage
1 500 639.9 740.5 0 약 80 ~ 90%

결론

  • tomcat의 경우 Mono, Scheduling 변경 처리와 성능은 비슷하나 cpu usage는 100%에 달한다. 또한 netty와 비교해 thread usage 차이는 크다.
  • 현재는 처리할 data 및 로직이 적어 Mono처리 또한 높은 성능을 나타내는 것으로 보인다.
  • 그러나 data, 그리고 cpu 연산을 필요로 하는 로직이 늘어날 수록 Scheduling 변경 버전 혹은 ParallelFlux After 버전이 더 높은 효과를 나타낼 것으로 기대한다.

Development

기술 스택

Application

  • spring boot 2 / webflux / netty / mustache

사내 Infra

  • ncloud / ncc / pinpoint / ngrinder

성능 테스트

 

Development

개발 및 테스트, 배포까지 모든 과정을 거쳐 고민 했던 사항 및 정리

Component 구조

  • component 구조는 아래와 같고, OneBody : OneTail = 1 : N 관계이다.

1. one-body

public enum OneBody {

    /** ARTICLE **/
    ARTICLE("ARTICLE", Arrays.asList(OneTail.ARTICLE, OneTail.OFFICE_INFO)),
    ARTICLE_COMMENT("ARTICLE_COMMENT", Arrays.asList(OneTail.ARTICLE_COMMENT)),

    /** OFFICE **/
    OFFICE_INFO("OFFICE_INFO", Arrays.asList(OneTail.OFFICE_INFO)),
    OFFICE_HEADLINE("OFFICE_HEADLINE", Arrays.asList(OneTail.OFFICE_HEADLINE)),
    OFFICE_RANKING("OFFICE_RANKING", OneTail.getAllDependencies("OFFICE_RANKING"));
    ... 생략 ...
}

2. one-tail

public enum OneTail {
    /** ARTICLE **/
    ARTICLE("ARTICLE"),
    ARTICLE_NEWS("ARTICLE_NEWS"),
    ARTICLE_COMMENT("ARTICLE_COMMENT"),

    /** OFFICE **/
    OFFICE_INFO("OFFICE_INFO"),
    OFFICE_HEADLINE("OFFICE_HEADLINE"),
    OFFICE_RANKING_VIEW("OFFICE_RANKING_VIEW"),
    OFFICE_RANKING_ALL_AGE10("OFFICE_RANKING_ALL_AGE10"),
    OFFICE_RANKING_ALL_AGE20("OFFICE_RANKING_ALL_AGE20"),
    OFFICE_RANKING_ALL_AGE30("OFFICE_RANKING_ALL_AGE30"),
    OFFICE_RANKING_ALL_AGE40("OFFICE_RANKING_ALL_AGE40"),
    OFFICE_RANKING_ALL_AGE50("OFFICE_RANKING_ALL_AGE50"),
    OFFICE_RANKING_ALL_AGE60("OFFICE_RANKING_ALL_AGE60");
    ... 생략 ...
}

주로 했던 고민 및 궁금증

  • Tomcat vs Netty
  • 렌더링 이 주 기능인 application에서 netty가 역량을 발휘할 지?
  • Netty
    • default worker thread - cpu core * 2
    • netty의 효과를 보기위해 최대한 core 점유 상황을 만들지 않아야함
    • 적절한 스레드 교체 전략 및 리액티브 프로그래밍을 적용 해야함

 

How?

One-Service Flow는 간략하게 설명하면 다음과 같다.

  1. User Request 인입 -> service type에 따른 Fetcher Request 생성
  2. Fetcher Http Call -> Fetcher Response 수신
  3. Fetcher Response 내의 Component 별 가공
  4. Rendering

이 과정을 최대한 리액티브하게 처리 하는것이 중요하다고 생각했음

 

시도 1. Mono 처리

설명

  • 가장 간단한 시도, Fetcher Response는 List 로 받음.
  • handleResponse()는 for 문을 통해 component 별 가공 및 처리
    public Mono<Map<String, Object>> attributes(OneInfo info) {
        Mono<ComponentRequest> request = fetcher.createRequest(info); 
        return Mono.from(request)
                   .flatMap(fetcher::receiveMono) // Mono<List<ComponentResponse>>
                   .map(received -> handler.handleResponse(received, info)); // Mono<Map<String, Object>>
    }

예상했던 단점

  • Mono.from(request) 부터는 모두 동일 thread publishing & subscribing
  • fetcher::receiveMono()는 network I/O 작업, 지연시 thread 점유에 따른 병목
  • Fetcher Response 내의 component가 많아지고 처리가 복잡 할수록 thread 점유 시간 증대
    (handleResponse 처리 시간 증가)
  • thread per connection 구조에 어울린다고 생각했음

테스트 결과

vm vuser TPS MTT (ms) 에러율 cpu usage
1 500 659.3 754.74 0 약 80 ~ 90%
  • 음.. 의외로 잘나옴
  • api-mock은 50ms 로 지연이 크게 없음 / handleResponse() 는 처리 시간 소요가 작음
  • 성능 테스트 환경에 따른 결과일 것으로 추정

 

시도 2. ParallelFlux 처리

설명

  • Fetcher Response를 Flux Stream으로 수신 => Flux<ComponentResponse>
  • 3개의 rail을 생성 (reason: component 3개 고정.. ARTICLE, OFFICE_INFO, OFFICE_HEADLINE)
  • 각 rail parallel 처리 (각 component 가공 및 처리 할당, 1 rail = 1 component)
  • 나름 netty에서 활용하기 적합하다고 생각했음
    public ParallelFlux<Map<String, Object>> parallelAttributes(OneInfo info) {
        Mono<ComponentRequest> request = fetcher.createRequest(info);
        return Flux.from(request) // Flux<ComponentRequest>
                   .flatMap(fetcher::receiveFlux) // Flux<ComponentReponse>
                   .parallel(3) // ParallelFlux<ComponentResponse>
                   .runOn(Schedulers.parallel()) // fixed thread pool에서 parallel 실행 
                   .map(received -> handler.handleResponse(received, info)); // ParallelFlux<Map<String, Object>>
    }

예상했던 단점

  • 현재 처리 할 Component는 3개에 불과, 시간이 소요되는 작업도 아닌 상황
  • 따라서 parallel 처리가 효율적이지 않을 수 있음

테스트 결과

vm vuser TPS MTT (ms) 에러율 cpu usage
1 500 326.8 1524.91 0 약 40 ~ 50 %
  • tps, cpu usage가 그닥..
  • parallel 처리가 효율적이지 않음.
  • component 처리(handleResponse)에 대한 시간 소요가 거의 없어 single thread로 처리하는 것이 훨씬 높은 성능을 보이는 것으로 추정

 

시도 3. Flux 처리

  • Fetcher Response를 Flux Stream으로 수신 => Flux<ComponentResponse>
  • Component 처리 thread 분리 (subscribing thread 분리)
  • 즉, 각 component 가공 및 처리를 다른 thread-pool에서 실행
    public Flux<Map<String, Object>> fluxAttributes(OneInfo info) {
        Mono<ComponentRequest> request = fetcher.createRequest(info);
        return Flux.from(request) // Flux<ComponentRequest>
                   .flatMap(fetcher::receiveFlux)  // Flux<ComponentReponse>
                   .subscribeOn(Schedulers.parallel())
                   .map(received -> handler.handleResponse(received, info));  
                   // 1 thread(in thread-pool) subscribe
    }

예상했던 단점

  • Component 가공 및 처리가 길어지면 thread 점유 시간 증가
  • But, 현재 처리 할 Component가 적고 시간 소요가 없음
  • 따라서 예상 단점은 크게 없다고 판단

테스트 결과

vm vuser TPS MTT (ms) 에러율 cpu usage
1 500 639.9 740.5 0 약 80 ~ 90%

3개의 시도중에는 가장 이상적? 구조

 

Tomcat

  • netty -> tomcat으로 변경 (webflux는 그대로)
  • 시도 1, 3 기준, tps 는 비슷하나 cpu usage 100%
  • thread usage는 비교할 수 없을듯
  • 성능상의 큰 차이는 없었음
     

얻은 것

  • publisher, operator, subscriber 및 리액티브 프로그래밍에 대한 이해 및 숙련도

끝내며

  • fail over 전략 및 장애 대응에 많은 시간 투자?를 하는것이 굉장히 바람직해 보였다..
  • 너무 programming에 관한 고민한 것이 아닌가
  • 익숙해지기 시작한 새로운 기술에 대해 더 깊게 파보지 못한 아쉬움
  • thymeleaf reactive rendering은 결과가 어떨지 궁금하다..
  • 마지막으로 리액티브를 잘 활용하는 것은 쉽지 않은것 같다
     

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.