Code Monkey home page Code Monkey logo

reactive-streams-spring-boot's Issues

Observer Pattern

Observer Pattern

관찰자 패턴은 관찰자라고 불리는 자손의 리스트를 가지고 있는 주체(Subject) 를 필요로한다. 주체는 일반적으로 자신의 메서드 중 하나(e.g. notify)를 호출해 관찰자에게 상태 변경을 알리게 된다.

GoF 에서의 옵저버 패턴 설명

  • 객체 사이에 일대 다의 의존 관계를 정의해두어, 어떤 객체의 상태가 변할 때 그 객체의 의존성을 가진 다른 객체들이 변화를 통지받고 자동으로 갱신될 수 있게 만든다.

  • 이 패턴은 이벤트 처리 기반 시스템에 필수적

  • e.g. MVC 패턴, 거의 모든 UI 라이브러리가 내부적으로 이 패턴 사용

Subject - Observer 는 1:N 관계를 이룰 수 있음. 

Subject 
   |- Observer
   |- Observer
   |- Observer
   |- Observer 

일반적인 옵저버 패턴은 Subjec와 Observer 2개의 인터페이스로 구성된다. Observer는 Subject에 등록되고 Subject로 부터 알림을 하게 되는데 Subject 스스로 이벤트를 발생시키거나 다른 구성요소에 의해 호출될 수 있다.

Observable 예제

Subject / Observer interface 명세

  • subject는 이벤트 브로드캐스트용 method, 와 구독 관리 (register, unregister) method를 포함한다.
public interface Subject<T> {
    void registerObserver(Observer<T> observer);
    void unregisterObserver(Observer<T> observer);
    void notifyObservers(T event);
}

Observer는 이벤트를 처리하는 observe method 만 존재한다.

public interface Observer<T> {
    void observe(T event);
}

Observer, Subject 모두 인터페이스에 기술된 것 이상에 대해서는 서로 알지 못한다.

단순하게 Subject interface 를 구현한 ConcreteSuject를 보자.

ConcreteSubject (주체)

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

public class ConcreteSubject implements Subject<String> {
    /**
     * multi-thread 안정성 유지를 위해 업데이트시마다 새 복사본을 생성하는 Set 구현체 사용
     * 복사 비용은 큼 -> 구독자 목록 변경은 거의 없음.
     */
    private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();

    @Override
    public void registerObserver(Observer<String> observer) {
        observers.add(observer); // register observer
    }

    @Override
    public void unregisterObserver(Observer<String> observer) {
        observers.remove(observer); // remove observer
    } 

    @Override
    public void notifyObservers(String event) {
        observers.forEach(observers -> observers.observe(event)); // notify observer
    }
}

그리고 Observer interface 를 구현한 ConcreteObserverA, B 클래스를 생성하자. 이들은 단순히 브로드캐스트 받은 이벤트 처리용 메소드인 observe(event) method를 구현한다.

public class ConcreteObserverA implements Observer<String> {
    @Override
    public void observe(String event) {
        System.out.println("Observer A:" + event);
    }
}

public class ConcreteObserverB implements Observer<String> {
    @Override
    public void observe(String event) {
        System.out.println("Observer B:" + event);
    }
}

단순히 observer 들을 등록 / 해지할 register, unregister method와, 모든 observer 에게 이벤트 브로드캐스트용 notifyObserver method를 포함한다.

테스트 코드는 아래와 같다.

    @Test
    public void observerHandleEventsFromSubject() {
        // given
        Subject<String> subject = new ConcreteSubject();
        Observer<String> observerA = new ConcreteObserverA();
        Observer<String> observerB = new ConcreteObserverB();

        // when
        subject.notifyObservers("no subscriber!");
        subject.registerObserver(observerA);
        subject.registerObserver(observerB);

        subject.notifyObservers("event !");

        subject.unregisterObserver(observerB);

        subject.notifyObservers("event for A");
        subject.unregisterObserver(observerA);

        subject.notifyObservers("no subscriber");
    }

또한 아래처럼 람다로 observer를 작성해도 된다.

    @Test
    public void subjectLeverageLambdas() {
        Subject<String> subject = new ConcreteSubject();
        subject.registerObserver(new Observer<String>() {
            @Override
            public void observe(String event) {
                System.out.println("A : " + event);
            }
        });
        subject.registerObserver(event -> System.out.println("B : " + event));
        subject.notifyObservers("event start for A & B");
    }

ConcreteSubject는 주 스레드에서 observer들에게 이벤트 브로드캐스팅을 하고 있다. 따라서 이벤트 처리 시간이 긴 Observer 존재시 처리 시간을 늘어나게 된다.

아래 예시는 A Observer 의 경우 이벤트 처리를 위해 3s 이상의 시간을 필요로하고, B Observer 의 경우 이벤트 처리 시간이 빠른 경우를 나타낸다.

    @Test
    public void subjectThreadTest() {
        Subject<String> subject = new ConcreteSubject();
        subject.registerObserver(event -> { // A observer 는 이벤트 처리시간이 길다. 
            try {
                Thread.sleep(3000);
            } catch (Exception e) {

            }

            System.out.println(Thread.currentThread().getName());
            System.out.println("A: " + event);
        });

        subject.registerObserver(event -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("B: " + event);
        });

        subject.notifyObservers("event!");

        try {
            Thread.sleep(3100);
        } catch (Exception e) {

        }
    }

이벤트 처리시간이 상당히 긴 Observer가 존재할 수 있으므로 Observer 가 많을 경우 추가적으로 스레드풀을 할당해야한다. (실제 구현시는 검증된 라이브러리를 사용하자)

병렬로 이벤트를 브로드캐스팅 하기 위한 ConcreteSubject 및 테스트 코드는 아래와 같다.

public class ConcreteParallelSubject implements Subject<String> {
    private final ExecutorService ex;

    public ConcreteParallelSubject(ExecutorService ex){
        this.ex = ex;
    }
    /**
     * multi-thread 안정성 유지를 위해 업데이트시마다 새 복사본을 생성하는 Set 구현체 사용
     * 복사 비용은 큼 -> 구독자 목록 변경은 거의 없음.
     */
    private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();


    @Override
    public void registerObserver(Observer<String> observer) {
        observers.add(observer);
    }

    @Override
    public void unregisterObserver(Observer<String> observer) {
        observers.remove(observer);
    }

    @Override
    public void notifyObservers(String event) {
        observers.forEach(observers -> {
            ex.submit(() -> observers.observe(event));
        });
    }
}
    @Test
    public void subjectParallelTest() throws InterruptedException {
        final ExecutorService ex = Executors.newFixedThreadPool(10);

        Subject<String> parallelSubject = new ConcreteParallelSubject(ex);
        parallelSubject.registerObserver(event -> { // 이벤트 처리 시간이 길다. 
            try {
                Thread.sleep(3000);
            } catch (Exception e) {

            }

            System.out.println(Thread.currentThread().getName()); // thread name 
            System.out.println("A : " + event); 
        });

        parallelSubject.registerObserver(event -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("B : " + event);
        });

        parallelSubject.notifyObservers("event!");

        ex.awaitTermination(3100, TimeUnit.MILLISECONDS); // 모든 이벤트가 종료될때까지 기다린다. 
    }

여기까지가 옵저버 패턴에 대한 설명과 간단한 구현이다.

Observer Pattern VS Publisher-Subscribe Pattern

옵저버 패턴과 Publisher-subscriber (발행 -구독)패턴은 약간 다르다. 발행-구독 패턴은 '이벤트 채널'(메시지 브로커 or 이벤트 버스) 이라는 간접적인 계층을 하나 더 포함한다. 구독자는 이벤트를 브로드캐스트하는 '이벤트 채널'을 알고 있지만 이벤트 게시자가 누구인지는 신경쓰지 않는다.

e.g. 토픽 기반 시스템(카프카)

성능 튜닝

구성

 

ParallelFlux에 관해

여기서 말하는 core는 physical cpu cores 이다. (core * 2)는 hyper threading 되어진 logical cpu cores를 말한다.

1. ParallelFlux

public ParallelFlux<Map<String, Object>> parallelAttributes(OneInfo info) {
    Mono<ComponentRequest> request = fetcher.createRequest(info);
    return Flux.from(request) // Flux<ComponentRequest>
               .flatMap(fetcher::receiveFlux) // Flux<ComponentReponse>
               .parallel(core * 2) // ParallelFlux<ComponentResponse>
               .runOn(Schedulers.parallel())
               .map(received -> handler.handleResponse(received, info)));
    }

core * 2개의 thread를 활용해 fetcher::receiveFlux() 가 리턴하는 Flux를 병렬로 처리하고자 하였다.

결론적으로 thread pool을 활용해 publishing 되어지는 각 ComponentResponse를 handleResponse에 넘겨 병렬 처리 하고, core를 잘 활용하며 reactive 하게 처리하는 것이 의도였다.

위와 같은 코드로 부하 테스트를 실행하니 아래와 같은 결과가 나왔다.

vm vuser TPS MTT (ms) 에러율
1 50 258 193.64 0
1 100 258.1 387.17 0
1 300 255.2 1172.4 0
1 500 251.5 1981.41 0

cpu usage는 약 30%대를 유지했다.

2. Mono

 
이 코드를 작성하기 전, 위 과정을 모두 Mono로 처리하는 코드를 작성 했었고 이는 아래와 같다.

public Mono<Map<String, Object>> attributes(OneInfo info) { 
    Mono<ComponentRequest> request = fetcher.createRequest(info);
    return Mono.from(request)
               .flatMap(fetcher::receiveMono) // Mono<List<ComponentResponse>>
               .map(received -> handler.handleResponse(received, info));
    }

모두 Mono로 처리하였고 사실상 reactive 하지 않다. netty를 사용하기에 적합한 코드이지도 않고 결국 thread 1개가 모두 작업을 처리하게 된다. 또한 handleResponse는 List를 받아 for문으로 이를 처리한다.

별 기대없이 Mono 처리에 대해 부하 테스트를 하였다.

vm vuser TPS MTT (ms) 에러율
1 50 296.6 168.36 0
1 100 394.8 252.92 0
1 300 526.5 565.14 0
1 500 659.3 754.74 0

결과가 생각지도 못한 방향으로 나왔다. 후자의 경우 최대 2배 더 좋은 결과를 보이고 있었다. 또한 cpu usage는 80 ~ 90%에 달했다.

 
음?

분명 이유가 있을것이다. 먼저 ParallelFlux.parallel() 부터 tracking 해보았다.

/**
 * Prepare this {@link Flux} by dividing data on a number of 'rails' matching the
 * provided {@code parallelism} parameter, in a round-robin fashion. Note that to
 * actually perform the work in parallel, you should call {@link ParallelFlux#runOn(Scheduler)}
 * afterward.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/parallel.svg" alt="">
 *
 * @param parallelism the number of parallel rails
 *
 * @return a new {@link ParallelFlux} instance
 */
public final ParallelFlux<T> parallel(int parallelism) {
	return parallel(parallelism, Queues.SMALL_BUFFER_SIZE);
}

parallel(int parallelism)은 publishing 되어지는 data를 round-robin으로 parallelism 개의 'rail'로 나눈다.

parallel(int parallelism)은 여러 rail을 가지는 ParallelFlux를 반환하며, 실제 parallel 하게 실행하려면 ParallelFlux.runOn(Scheduler)의 파라미터로 병렬 처리가 가능한 scheduler를 넘겨야한다.

 
그럼 ParallelFlux.runOn(Scheduler)를 살펴보자.

/**
 * Specifies where each 'rail' will observe its incoming values with no work-stealing
 * and default prefetch amount.
 * <p>
 * This operator uses the default prefetch size returned by {@code
 * Queues.SMALL_BUFFER_SIZE}.
 * ----- 중략 ------
 * @param scheduler the scheduler to use
 *
 * @return the new {@link ParallelFlux} instance
 */
public final ParallelFlux<T> runOn(Scheduler scheduler) {
	return runOn(scheduler, Queues.SMALL_BUFFER_SIZE);
}

This operator uses the default prefetch size returned by {@code Queues.SMALL_BUFFER_SIZE} 라고 한다. 한 레일당 Queues.SMALL_BUFFER_SIZE 만큼의 size를 가진다.

Queues.SMALL_BUFFER_SIZE는 아래와 같다.

/**
 * A small default of available slots in a given container, compromise between intensive pipelines, small
 * subscribers numbers and memory use.
 */
public static final int SMALL_BUFFER_SIZE = Math.max(16,
			Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));

reactor.bufferSize.small을 지정하지 않았으므로 각 rail당 16개의 size를 가질 수 있다.

사실 지금 tracking 하고자 하는 것은 각 rail의 size가 아니다.
 
코드를 다시 살펴보자

return Flux.from(request) // Flux<ComponentRequest>
           .flatMap(fetcher::receiveFlux) // Flux<ComponentReponse>
           .parallel(core * 2) // ParallelFlux<ComponentResponse>
           .runOn(Schedulers.parallel())
           .map(received -> handler.handleResponse(received, info)));

fetcher::receiverFlux로 부터 publishing 되어지는 data 들을 (core * 2)개의 rail로 나누고, Schedulers.parallel()에 의해 병렬로 실행된다. Schedulers.parallel()은 고정 개수의 thread pool을 제공하고 default 개수는 (core * 2)이다.

 
원인 발견!?

뭔가 아차 싶다..

publishing 되어지는 data를 (core * 2)개의 thread pool이 잘 나누어 처리하는 것이 아니라, 필수적으로 (core * 2)개의 rail을 생성하고 이를 thread pool이 나누어 실행하게 될 것이다.

(core * 2)개 이상의 data가 publishing 되는 경우, (core * 2)개의 rail이 생성 되는것은 어찌보면 자연스러울 수? 있다. 문제는 publishing 되어지는 data가 (core * 2)개 이하임에도 불구하고 rail이 모두 생성될 수 있다는 점이다.

실제 테스트를 해보았다.

return Flux.from(request) // Flux<ComponentRequest>
           .flatMap(fetcher::receiveFlux) // Flux<ComponentReponse>
           .parallel(core * 2) // ParallelFlux<ComponentResponse>
           .runOn(Schedulers.parallel())
           .log()
           .map(received -> handler.handleResponse(received, info))
           .doOnComplete(() -> log.info("complete"));

rail 관련 thread 확인을 위해 5번째 라인에 log(), 각 rail에 대한 처리가 완료 될 경우 "complete"를 로깅했다.

[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1                 : request(unbounded)
[     parallel-1] reactor.Parallel.RunOn.1                 : onNext(com.naver.media.one.model.ComponentResponse@517119d3)
[     parallel-2] reactor.Parallel.RunOn.1                 : onNext(com.naver.media.one.model.ComponentResponse@18b71d5)
[     parallel-3] reactor.Parallel.RunOn.1                 : onNext(com.naver.media.one.model.ComponentResponse@70ef9467)
[     parallel-2] reactor.Parallel.RunOn.1                 : onComplete()
[    parallel-10] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-6] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-9] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-7] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-5] reactor.Parallel.RunOn.1                 : onComplete()
[    parallel-10] com.naver.media.one.OneService           : complete
[     parallel-8] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-7] com.naver.media.one.OneService           : complete
[     parallel-1] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-4] reactor.Parallel.RunOn.1                 : onComplete()
[    parallel-12] reactor.Parallel.RunOn.1                 : onComplete()
[    parallel-11] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-1] com.naver.media.one.OneService           : complete
[     parallel-2] com.naver.media.one.OneService           : complete
[     parallel-4] com.naver.media.one.OneService           : complete
[     parallel-9] com.naver.media.one.OneService           : complete
[    parallel-11] com.naver.media.one.OneService           : complete
[     parallel-6] com.naver.media.one.OneService           : complete
[     parallel-5] com.naver.media.one.OneService           : complete
[     parallel-8] com.naver.media.one.OneService           : complete
[    parallel-12] com.naver.media.one.OneService           : complete
[     parallel-3] reactor.Parallel.RunOn.1                 : onComplete()
[     parallel-3] com.naver.media.one.OneService           : complete

12개의 rail이 생성 되었고, 12개의 thread가 이를 처리하고 있었다. publishing 되어지는 data는 3개 뿐임에도 말이다. 실제로 onNext는 3번 호출된다.

parallel-1, 2, 3은 onNext를 통해 publishing 되어지는 data를 처리한다. 그러나 parallel-4 ~ 12는 subscribe 이후 rail 이 비어있으므로 곧바로 onComplete()를 호출하고 이후 "complete" 한다.

Empty rail을 위해 순간적으로 12개의 모든 core를 점유 할 수 있는 상황이고, 이 부분이 가장 큰 병목지점일 것이라 생각했다.

 
해결

사실 해결법은 간단하다. fetcher::receiveFlux()의 size만큼 혹은 작은 단위로 쪼개어 rail을 생성하면 된다. 이를 통해 불필요한 rail 생성 및 그에 따른 core 점유 상황을 피할 수 있다.

현재는 receiveFlux() 로 부터 publishing 되어지는 data는 3개 고정이다. 그러므로 3개의 rail을 만든 뒤 (core * 2)개의 thread pool을 이용해 처리해보자.

이는 항상 3개의 rail을 만들고 이후 thread pool 내에서 처리 될 것이다.

    public ParallelFlux<Map<String, Object>> parallelAttributes(OneInfo info) {
        Mono<ComponentRequest> request = fetcher.createRequest(info);
        return Flux.from(request)
                   .flatMap(fetcher::receiveFlux)
                   .parallel(3)
                   .runOn(Schedulers.parallel())
                   .map(received -> handler.handleResponse(received, info));
    }

Before

vm vuser TPS MTT (ms) 에러율
1 500 251.5 1981.41 0

After

vm vuser TPS MTT (ms) 에러율
1 500 326.8 1524.91 0

cpu usage는 40%대를 유지했다.

처음 보다 조금 높게 측정 되었으나.. cpu usage는 여전히 낮다. 또한 Mono 처리에는 미치지 못했다.

scheduling 변경

handleResponse()에 별다른 로직이 없고, publishing data 또한 적어 병렬 처리 이점이 현재는 드러나지 않는 것 같다.

이번에는 병렬 처리를 제외하고 subscribeOn 스케줄링을 통해 subscibing thread를 교체하는 전략을 구성했다.

출처 - https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#subscribeOn-reactor.core.scheduler.Scheduler-

Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.

Note that if you are using an eager or blocking create(Consumer, FluxSink.OverflowStrategy) as the source, it can lead to deadlocks due to requests piling up behind the emitter. In such case, you should call subscribeOn(scheduler, false) instead.

subscribeOn은 위와 같은 경우에 사용하길 권장한다.

Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios.

  flux.subscribeOn(Schedulers.single()).subscribe() 

앞서 적용했던 ParallelFlux와의 공통점은 모두 thread pool 을 통해 subscribing을 진행 한다는 점이다.

ParallelFlux와 subscribeOn의 차이점은 다음과 같다. ParallelFlux는 각 rail에 대한 처리를 각기 다른 thread가 진행한다(thread pool).

이에 따라 각 thread가 rail 별 onNext/onError/onComplete에 대한 처리를 진행하는 모습을 위에서 확인할 수 있었다.

그러나 subscribeOn는 publishing thread와 subscribing thread를 분리할 뿐 어떠한 rail을 생성하진 않는다.

즉 ParallelFlux에서 onNext/onComplete는 parallel-1, 2, 3에서 각 1번씩 실행 되었지만, subscribeOn의 경우 onNext/onComplete대해 publishing thread가 아닌 다른 하나의 thread가 onNext/onComplete를 3번 실행할 것이다.

public Flux<Map<String, Object>> fluxAttributes(OneInfo info) {
        Mono<ComponentRequest> request = fetcher.createRequest(info);
        return Flux.from(request)
                   .flatMap(fetcher::receiveFlux)
                   .subscribeOn(Schedulers.parallel())
                   .map(received -> handler.handleResponse(received, info));
    }

fetcher::receiveFlux()는 약 50ms의 I/O 작업을 수행하고 있고 그에 대한 결과를 Flux로 리턴한다. 이 Flux를 subcribing 하는 경우 onNext/onError/onComplete는 다른 thread에서 실행되어야 한다.

그럼 확인 해보자

[ctor-http-nio-2] reactor.Flux.Map.2                       : onSubscribe(FluxMap.MapSubscriber)
[ctor-http-nio-2] reactor.Flux.Map.2                       : request(unbounded)
[ctor-http-nio-4] reactor.Flux.Map.2                       : onNext({ARTICLE={request={type=ARTICLE, .. 중략 .. })
[ctor-http-nio-4] reactor.Flux.Map.2                       : onNext({OFFICE_INFO={request={type=OFFICE_INFO, .. 중략 .. })
[ctor-http-nio-4] reactor.Flux.Map.2                       : onNext({OFFICE_HEADLINE={request={type=OFFICE_HEADLINE, .. 중략 .. })
[ctor-http-nio-4] reactor.Flux.Map.2                       : onComplete()

예상 대로 onNext/onComplete는 reactor-http-nio-4 thread에서 분리실행 되는 모습을 확인했다.

성능 테스트 결과는 아래와 같고, ParallelFlux 처리보다 높은 성능을 볼 수 있었다.

vm vuser TPS MTT (ms) 에러율
1 500 639.9 740.5 0

 

튜닝 결과

최종적으로 Scheduling 변경이 가장 높은 tps 및 mtt를 보였다.

ParallelFlux Before

vm vuser TPS MTT (ms) 에러율 cpu usage
1 500 251.5 1981.41 0 약 30 ~ 40 %

ParallelFlux After

vm vuser TPS MTT (ms) 에러율 cpu usage
1 500 326.8 1524.91 0 약 40 ~ 50 %

Mono

vm vuser TPS MTT (ms) 에러율 cpu usage
1 500 659.3 754.74 0 약 80 ~ 90%

Scheduling 변경

vm vuser TPS MTT (ms) 에러율 cpu usage
1 500 639.9 740.5 0 약 80 ~ 90%

결론

  • tomcat의 경우 Mono, Scheduling 변경 처리와 성능은 비슷하나 cpu usage는 100%에 달한다. 또한 netty와 비교해 thread usage 차이는 크다.
  • 현재는 처리할 data 및 로직이 적어 Mono처리 또한 높은 성능을 나타내는 것으로 보인다.
  • 그러나 data, 그리고 cpu 연산을 필요로 하는 로직이 늘어날 수록 Scheduling 변경 버전 혹은 ParallelFlux After 버전이 더 높은 효과를 나타낼 것으로 기대한다.

Development

기술 스택

Application

  • spring boot 2 / webflux / netty / mustache

사내 Infra

  • ncloud / ncc / pinpoint / ngrinder

성능 테스트

 

Development

개발 및 테스트, 배포까지 모든 과정을 거쳐 고민 했던 사항 및 정리

Component 구조

  • component 구조는 아래와 같고, OneBody : OneTail = 1 : N 관계이다.

1. one-body

public enum OneBody {

    /** ARTICLE **/
    ARTICLE("ARTICLE", Arrays.asList(OneTail.ARTICLE, OneTail.OFFICE_INFO)),
    ARTICLE_COMMENT("ARTICLE_COMMENT", Arrays.asList(OneTail.ARTICLE_COMMENT)),

    /** OFFICE **/
    OFFICE_INFO("OFFICE_INFO", Arrays.asList(OneTail.OFFICE_INFO)),
    OFFICE_HEADLINE("OFFICE_HEADLINE", Arrays.asList(OneTail.OFFICE_HEADLINE)),
    OFFICE_RANKING("OFFICE_RANKING", OneTail.getAllDependencies("OFFICE_RANKING"));
    ... 생략 ...
}

2. one-tail

public enum OneTail {
    /** ARTICLE **/
    ARTICLE("ARTICLE"),
    ARTICLE_NEWS("ARTICLE_NEWS"),
    ARTICLE_COMMENT("ARTICLE_COMMENT"),

    /** OFFICE **/
    OFFICE_INFO("OFFICE_INFO"),
    OFFICE_HEADLINE("OFFICE_HEADLINE"),
    OFFICE_RANKING_VIEW("OFFICE_RANKING_VIEW"),
    OFFICE_RANKING_ALL_AGE10("OFFICE_RANKING_ALL_AGE10"),
    OFFICE_RANKING_ALL_AGE20("OFFICE_RANKING_ALL_AGE20"),
    OFFICE_RANKING_ALL_AGE30("OFFICE_RANKING_ALL_AGE30"),
    OFFICE_RANKING_ALL_AGE40("OFFICE_RANKING_ALL_AGE40"),
    OFFICE_RANKING_ALL_AGE50("OFFICE_RANKING_ALL_AGE50"),
    OFFICE_RANKING_ALL_AGE60("OFFICE_RANKING_ALL_AGE60");
    ... 생략 ...
}

주로 했던 고민 및 궁금증

  • Tomcat vs Netty
  • 렌더링 이 주 기능인 application에서 netty가 역량을 발휘할 지?
  • Netty
    • default worker thread - cpu core * 2
    • netty의 효과를 보기위해 최대한 core 점유 상황을 만들지 않아야함
    • 적절한 스레드 교체 전략 및 리액티브 프로그래밍을 적용 해야함

 

How?

One-Service Flow는 간략하게 설명하면 다음과 같다.

  1. User Request 인입 -> service type에 따른 Fetcher Request 생성
  2. Fetcher Http Call -> Fetcher Response 수신
  3. Fetcher Response 내의 Component 별 가공
  4. Rendering

이 과정을 최대한 리액티브하게 처리 하는것이 중요하다고 생각했음

 

시도 1. Mono 처리

설명

  • 가장 간단한 시도, Fetcher Response는 List 로 받음.
  • handleResponse()는 for 문을 통해 component 별 가공 및 처리
    public Mono<Map<String, Object>> attributes(OneInfo info) {
        Mono<ComponentRequest> request = fetcher.createRequest(info); 
        return Mono.from(request)
                   .flatMap(fetcher::receiveMono) // Mono<List<ComponentResponse>>
                   .map(received -> handler.handleResponse(received, info)); // Mono<Map<String, Object>>
    }

예상했던 단점

  • Mono.from(request) 부터는 모두 동일 thread publishing & subscribing
  • fetcher::receiveMono()는 network I/O 작업, 지연시 thread 점유에 따른 병목
  • Fetcher Response 내의 component가 많아지고 처리가 복잡 할수록 thread 점유 시간 증대
    (handleResponse 처리 시간 증가)
  • thread per connection 구조에 어울린다고 생각했음

테스트 결과

vm vuser TPS MTT (ms) 에러율 cpu usage
1 500 659.3 754.74 0 약 80 ~ 90%
  • 음.. 의외로 잘나옴
  • api-mock은 50ms 로 지연이 크게 없음 / handleResponse() 는 처리 시간 소요가 작음
  • 성능 테스트 환경에 따른 결과일 것으로 추정

 

시도 2. ParallelFlux 처리

설명

  • Fetcher Response를 Flux Stream으로 수신 => Flux<ComponentResponse>
  • 3개의 rail을 생성 (reason: component 3개 고정.. ARTICLE, OFFICE_INFO, OFFICE_HEADLINE)
  • 각 rail parallel 처리 (각 component 가공 및 처리 할당, 1 rail = 1 component)
  • 나름 netty에서 활용하기 적합하다고 생각했음
    public ParallelFlux<Map<String, Object>> parallelAttributes(OneInfo info) {
        Mono<ComponentRequest> request = fetcher.createRequest(info);
        return Flux.from(request) // Flux<ComponentRequest>
                   .flatMap(fetcher::receiveFlux) // Flux<ComponentReponse>
                   .parallel(3) // ParallelFlux<ComponentResponse>
                   .runOn(Schedulers.parallel()) // fixed thread pool에서 parallel 실행 
                   .map(received -> handler.handleResponse(received, info)); // ParallelFlux<Map<String, Object>>
    }

예상했던 단점

  • 현재 처리 할 Component는 3개에 불과, 시간이 소요되는 작업도 아닌 상황
  • 따라서 parallel 처리가 효율적이지 않을 수 있음

테스트 결과

vm vuser TPS MTT (ms) 에러율 cpu usage
1 500 326.8 1524.91 0 약 40 ~ 50 %
  • tps, cpu usage가 그닥..
  • parallel 처리가 효율적이지 않음.
  • component 처리(handleResponse)에 대한 시간 소요가 거의 없어 single thread로 처리하는 것이 훨씬 높은 성능을 보이는 것으로 추정

 

시도 3. Flux 처리

  • Fetcher Response를 Flux Stream으로 수신 => Flux<ComponentResponse>
  • Component 처리 thread 분리 (subscribing thread 분리)
  • 즉, 각 component 가공 및 처리를 다른 thread-pool에서 실행
    public Flux<Map<String, Object>> fluxAttributes(OneInfo info) {
        Mono<ComponentRequest> request = fetcher.createRequest(info);
        return Flux.from(request) // Flux<ComponentRequest>
                   .flatMap(fetcher::receiveFlux)  // Flux<ComponentReponse>
                   .subscribeOn(Schedulers.parallel())
                   .map(received -> handler.handleResponse(received, info));  
                   // 1 thread(in thread-pool) subscribe
    }

예상했던 단점

  • Component 가공 및 처리가 길어지면 thread 점유 시간 증가
  • But, 현재 처리 할 Component가 적고 시간 소요가 없음
  • 따라서 예상 단점은 크게 없다고 판단

테스트 결과

vm vuser TPS MTT (ms) 에러율 cpu usage
1 500 639.9 740.5 0 약 80 ~ 90%

3개의 시도중에는 가장 이상적? 구조

 

Tomcat

  • netty -> tomcat으로 변경 (webflux는 그대로)
  • 시도 1, 3 기준, tps 는 비슷하나 cpu usage 100%
  • thread usage는 비교할 수 없을듯
  • 성능상의 큰 차이는 없었음
     

얻은 것

  • publisher, operator, subscriber 및 리액티브 프로그래밍에 대한 이해 및 숙련도

끝내며

  • fail over 전략 및 장애 대응에 많은 시간 투자?를 하는것이 굉장히 바람직해 보였다..
  • 너무 programming에 관한 고민한 것이 아닌가
  • 익숙해지기 시작한 새로운 기술에 대해 더 깊게 파보지 못한 아쉬움
  • thymeleaf reactive rendering은 결과가 어떨지 궁금하다..
  • 마지막으로 리액티브를 잘 활용하는 것은 쉽지 않은것 같다
     

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.