phantasmicmeans / reactive-streams-spring-boot Goto Github PK
View Code? Open in Web Editor NEWreactive programming
reactive programming
관찰자 패턴은 관찰자라고 불리는 자손의 리스트를 가지고 있는 주체(Subject) 를 필요로한다. 주체는 일반적으로 자신의 메서드 중 하나(e.g. notify)를 호출해 관찰자에게 상태 변경을 알리게 된다.
객체 사이에 일대 다의 의존 관계를 정의해두어, 어떤 객체의 상태가 변할 때 그 객체의 의존성을 가진 다른 객체들이 변화를 통지받고 자동으로 갱신될 수 있게 만든다.
이 패턴은 이벤트 처리 기반 시스템에 필수적
e.g. MVC 패턴, 거의 모든 UI 라이브러리가 내부적으로 이 패턴 사용
Subject - Observer 는 1:N 관계를 이룰 수 있음.
Subject
|- Observer
|- Observer
|- Observer
|- Observer
일반적인 옵저버 패턴은 Subjec와 Observer 2개의 인터페이스로 구성된다. Observer는 Subject에 등록되고 Subject로 부터 알림을 하게 되는데 Subject 스스로 이벤트를 발생시키거나 다른 구성요소에 의해 호출될 수 있다.
Subject / Observer interface 명세
public interface Subject<T> {
void registerObserver(Observer<T> observer);
void unregisterObserver(Observer<T> observer);
void notifyObservers(T event);
}
Observer는 이벤트를 처리하는 observe method 만 존재한다.
public interface Observer<T> {
void observe(T event);
}
Observer, Subject 모두 인터페이스에 기술된 것 이상에 대해서는 서로 알지 못한다.
단순하게 Subject interface 를 구현한 ConcreteSuject를 보자.
ConcreteSubject (주체)
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
public class ConcreteSubject implements Subject<String> {
/**
* multi-thread 안정성 유지를 위해 업데이트시마다 새 복사본을 생성하는 Set 구현체 사용
* 복사 비용은 큼 -> 구독자 목록 변경은 거의 없음.
*/
private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();
@Override
public void registerObserver(Observer<String> observer) {
observers.add(observer); // register observer
}
@Override
public void unregisterObserver(Observer<String> observer) {
observers.remove(observer); // remove observer
}
@Override
public void notifyObservers(String event) {
observers.forEach(observers -> observers.observe(event)); // notify observer
}
}
그리고 Observer interface 를 구현한 ConcreteObserverA, B 클래스를 생성하자. 이들은 단순히 브로드캐스트 받은 이벤트 처리용 메소드인 observe(event) method를 구현한다.
public class ConcreteObserverA implements Observer<String> {
@Override
public void observe(String event) {
System.out.println("Observer A:" + event);
}
}
public class ConcreteObserverB implements Observer<String> {
@Override
public void observe(String event) {
System.out.println("Observer B:" + event);
}
}
단순히 observer 들을 등록 / 해지할 register, unregister method와, 모든 observer 에게 이벤트 브로드캐스트용 notifyObserver method를 포함한다.
테스트 코드는 아래와 같다.
@Test
public void observerHandleEventsFromSubject() {
// given
Subject<String> subject = new ConcreteSubject();
Observer<String> observerA = new ConcreteObserverA();
Observer<String> observerB = new ConcreteObserverB();
// when
subject.notifyObservers("no subscriber!");
subject.registerObserver(observerA);
subject.registerObserver(observerB);
subject.notifyObservers("event !");
subject.unregisterObserver(observerB);
subject.notifyObservers("event for A");
subject.unregisterObserver(observerA);
subject.notifyObservers("no subscriber");
}
또한 아래처럼 람다로 observer를 작성해도 된다.
@Test
public void subjectLeverageLambdas() {
Subject<String> subject = new ConcreteSubject();
subject.registerObserver(new Observer<String>() {
@Override
public void observe(String event) {
System.out.println("A : " + event);
}
});
subject.registerObserver(event -> System.out.println("B : " + event));
subject.notifyObservers("event start for A & B");
}
ConcreteSubject는 주 스레드에서 observer들에게 이벤트 브로드캐스팅을 하고 있다. 따라서 이벤트 처리 시간이 긴 Observer 존재시 처리 시간을 늘어나게 된다.
아래 예시는 A Observer 의 경우 이벤트 처리를 위해 3s 이상의 시간을 필요로하고, B Observer 의 경우 이벤트 처리 시간이 빠른 경우를 나타낸다.
@Test
public void subjectThreadTest() {
Subject<String> subject = new ConcreteSubject();
subject.registerObserver(event -> { // A observer 는 이벤트 처리시간이 길다.
try {
Thread.sleep(3000);
} catch (Exception e) {
}
System.out.println(Thread.currentThread().getName());
System.out.println("A: " + event);
});
subject.registerObserver(event -> {
System.out.println(Thread.currentThread().getName());
System.out.println("B: " + event);
});
subject.notifyObservers("event!");
try {
Thread.sleep(3100);
} catch (Exception e) {
}
}
이벤트 처리시간이 상당히 긴 Observer가 존재할 수 있으므로 Observer 가 많을 경우 추가적으로 스레드풀을 할당해야한다. (실제 구현시는 검증된 라이브러리를 사용하자)
병렬로 이벤트를 브로드캐스팅 하기 위한 ConcreteSubject 및 테스트 코드는 아래와 같다.
public class ConcreteParallelSubject implements Subject<String> {
private final ExecutorService ex;
public ConcreteParallelSubject(ExecutorService ex){
this.ex = ex;
}
/**
* multi-thread 안정성 유지를 위해 업데이트시마다 새 복사본을 생성하는 Set 구현체 사용
* 복사 비용은 큼 -> 구독자 목록 변경은 거의 없음.
*/
private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();
@Override
public void registerObserver(Observer<String> observer) {
observers.add(observer);
}
@Override
public void unregisterObserver(Observer<String> observer) {
observers.remove(observer);
}
@Override
public void notifyObservers(String event) {
observers.forEach(observers -> {
ex.submit(() -> observers.observe(event));
});
}
}
@Test
public void subjectParallelTest() throws InterruptedException {
final ExecutorService ex = Executors.newFixedThreadPool(10);
Subject<String> parallelSubject = new ConcreteParallelSubject(ex);
parallelSubject.registerObserver(event -> { // 이벤트 처리 시간이 길다.
try {
Thread.sleep(3000);
} catch (Exception e) {
}
System.out.println(Thread.currentThread().getName()); // thread name
System.out.println("A : " + event);
});
parallelSubject.registerObserver(event -> {
System.out.println(Thread.currentThread().getName());
System.out.println("B : " + event);
});
parallelSubject.notifyObservers("event!");
ex.awaitTermination(3100, TimeUnit.MILLISECONDS); // 모든 이벤트가 종료될때까지 기다린다.
}
여기까지가 옵저버 패턴에 대한 설명과 간단한 구현이다.
옵저버 패턴과 Publisher-subscriber (발행 -구독)패턴은 약간 다르다. 발행-구독 패턴은 '이벤트 채널'(메시지 브로커 or 이벤트 버스) 이라는 간접적인 계층을 하나 더 포함한다. 구독자는 이벤트를 브로드캐스트하는 '이벤트 채널'을 알고 있지만 이벤트 게시자가 누구인지는 신경쓰지 않는다.
e.g. 토픽 기반 시스템(카프카)
여기서 말하는 core는 physical cpu cores 이다. (core * 2)는 hyper threading 되어진 logical cpu cores를 말한다.
public ParallelFlux<Map<String, Object>> parallelAttributes(OneInfo info) {
Mono<ComponentRequest> request = fetcher.createRequest(info);
return Flux.from(request) // Flux<ComponentRequest>
.flatMap(fetcher::receiveFlux) // Flux<ComponentReponse>
.parallel(core * 2) // ParallelFlux<ComponentResponse>
.runOn(Schedulers.parallel())
.map(received -> handler.handleResponse(received, info)));
}
core * 2개의 thread를 활용해 fetcher::receiveFlux() 가 리턴하는 Flux를 병렬로 처리하고자 하였다.
결론적으로 thread pool을 활용해 publishing 되어지는 각 ComponentResponse를 handleResponse에 넘겨 병렬 처리 하고, core를 잘 활용하며 reactive 하게 처리하는 것이 의도였다.
위와 같은 코드로 부하 테스트를 실행하니 아래와 같은 결과가 나왔다.
vm | vuser | TPS | MTT (ms) | 에러율 |
---|---|---|---|---|
1 | 50 | 258 | 193.64 | 0 |
1 | 100 | 258.1 | 387.17 | 0 |
1 | 300 | 255.2 | 1172.4 | 0 |
1 | 500 | 251.5 | 1981.41 | 0 |
cpu usage는 약 30%대를 유지했다.
이 코드를 작성하기 전, 위 과정을 모두 Mono로 처리하는 코드를 작성 했었고 이는 아래와 같다.
public Mono<Map<String, Object>> attributes(OneInfo info) {
Mono<ComponentRequest> request = fetcher.createRequest(info);
return Mono.from(request)
.flatMap(fetcher::receiveMono) // Mono<List<ComponentResponse>>
.map(received -> handler.handleResponse(received, info));
}
모두 Mono로 처리하였고 사실상 reactive 하지 않다. netty를 사용하기에 적합한 코드이지도 않고 결국 thread 1개가 모두 작업을 처리하게 된다. 또한 handleResponse는 List를 받아 for문으로 이를 처리한다.
별 기대없이 Mono 처리에 대해 부하 테스트를 하였다.
vm | vuser | TPS | MTT (ms) | 에러율 |
---|---|---|---|---|
1 | 50 | 296.6 | 168.36 | 0 |
1 | 100 | 394.8 | 252.92 | 0 |
1 | 300 | 526.5 | 565.14 | 0 |
1 | 500 | 659.3 | 754.74 | 0 |
결과가 생각지도 못한 방향으로 나왔다. 후자의 경우 최대 2배 더 좋은 결과를 보이고 있었다. 또한 cpu usage는 80 ~ 90%에 달했다.
분명 이유가 있을것이다. 먼저 ParallelFlux.parallel() 부터 tracking 해보았다.
/**
* Prepare this {@link Flux} by dividing data on a number of 'rails' matching the
* provided {@code parallelism} parameter, in a round-robin fashion. Note that to
* actually perform the work in parallel, you should call {@link ParallelFlux#runOn(Scheduler)}
* afterward.
*
* <p>
* <img class="marble" src="doc-files/marbles/parallel.svg" alt="">
*
* @param parallelism the number of parallel rails
*
* @return a new {@link ParallelFlux} instance
*/
public final ParallelFlux<T> parallel(int parallelism) {
return parallel(parallelism, Queues.SMALL_BUFFER_SIZE);
}
parallel(int parallelism)은 publishing 되어지는 data를 round-robin으로 parallelism 개의 'rail'로 나눈다.
parallel(int parallelism)은 여러 rail을 가지는 ParallelFlux를 반환하며, 실제 parallel 하게 실행하려면 ParallelFlux.runOn(Scheduler)의 파라미터로 병렬 처리가 가능한 scheduler를 넘겨야한다.
그럼 ParallelFlux.runOn(Scheduler)를 살펴보자.
/**
* Specifies where each 'rail' will observe its incoming values with no work-stealing
* and default prefetch amount.
* <p>
* This operator uses the default prefetch size returned by {@code
* Queues.SMALL_BUFFER_SIZE}.
* ----- 중략 ------
* @param scheduler the scheduler to use
*
* @return the new {@link ParallelFlux} instance
*/
public final ParallelFlux<T> runOn(Scheduler scheduler) {
return runOn(scheduler, Queues.SMALL_BUFFER_SIZE);
}
This operator uses the default prefetch size returned by {@code Queues.SMALL_BUFFER_SIZE} 라고 한다. 한 레일당 Queues.SMALL_BUFFER_SIZE 만큼의 size를 가진다.
Queues.SMALL_BUFFER_SIZE는 아래와 같다.
/**
* A small default of available slots in a given container, compromise between intensive pipelines, small
* subscribers numbers and memory use.
*/
public static final int SMALL_BUFFER_SIZE = Math.max(16,
Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));
reactor.bufferSize.small을 지정하지 않았으므로 각 rail당 16개의 size를 가질 수 있다.
사실 지금 tracking 하고자 하는 것은 각 rail의 size가 아니다.
코드를 다시 살펴보자
return Flux.from(request) // Flux<ComponentRequest>
.flatMap(fetcher::receiveFlux) // Flux<ComponentReponse>
.parallel(core * 2) // ParallelFlux<ComponentResponse>
.runOn(Schedulers.parallel())
.map(received -> handler.handleResponse(received, info)));
fetcher::receiverFlux로 부터 publishing 되어지는 data 들을 (core * 2)개의 rail로 나누고, Schedulers.parallel()에 의해 병렬로 실행된다. Schedulers.parallel()은 고정 개수의 thread pool을 제공하고 default 개수는 (core * 2)이다.
뭔가 아차 싶다..
publishing 되어지는 data를 (core * 2)개의 thread pool이 잘 나누어 처리하는 것이 아니라, 필수적으로 (core * 2)개의 rail을 생성하고 이를 thread pool이 나누어 실행하게 될 것이다.
(core * 2)개 이상의 data가 publishing 되는 경우, (core * 2)개의 rail이 생성 되는것은 어찌보면 자연스러울 수? 있다. 문제는 publishing 되어지는 data가 (core * 2)개 이하임에도 불구하고 rail이 모두 생성될 수 있다는 점이다.
실제 테스트를 해보았다.
return Flux.from(request) // Flux<ComponentRequest>
.flatMap(fetcher::receiveFlux) // Flux<ComponentReponse>
.parallel(core * 2) // ParallelFlux<ComponentResponse>
.runOn(Schedulers.parallel())
.log()
.map(received -> handler.handleResponse(received, info))
.doOnComplete(() -> log.info("complete"));
rail 관련 thread 확인을 위해 5번째 라인에 log(), 각 rail에 대한 처리가 완료 될 경우 "complete"를 로깅했다.
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : request(unbounded)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ctor-http-nio-2] reactor.Parallel.RunOn.1 : request(unbounded)
[ parallel-1] reactor.Parallel.RunOn.1 : onNext(com.naver.media.one.model.ComponentResponse@517119d3)
[ parallel-2] reactor.Parallel.RunOn.1 : onNext(com.naver.media.one.model.ComponentResponse@18b71d5)
[ parallel-3] reactor.Parallel.RunOn.1 : onNext(com.naver.media.one.model.ComponentResponse@70ef9467)
[ parallel-2] reactor.Parallel.RunOn.1 : onComplete()
[ parallel-10] reactor.Parallel.RunOn.1 : onComplete()
[ parallel-6] reactor.Parallel.RunOn.1 : onComplete()
[ parallel-9] reactor.Parallel.RunOn.1 : onComplete()
[ parallel-7] reactor.Parallel.RunOn.1 : onComplete()
[ parallel-5] reactor.Parallel.RunOn.1 : onComplete()
[ parallel-10] com.naver.media.one.OneService : complete
[ parallel-8] reactor.Parallel.RunOn.1 : onComplete()
[ parallel-7] com.naver.media.one.OneService : complete
[ parallel-1] reactor.Parallel.RunOn.1 : onComplete()
[ parallel-4] reactor.Parallel.RunOn.1 : onComplete()
[ parallel-12] reactor.Parallel.RunOn.1 : onComplete()
[ parallel-11] reactor.Parallel.RunOn.1 : onComplete()
[ parallel-1] com.naver.media.one.OneService : complete
[ parallel-2] com.naver.media.one.OneService : complete
[ parallel-4] com.naver.media.one.OneService : complete
[ parallel-9] com.naver.media.one.OneService : complete
[ parallel-11] com.naver.media.one.OneService : complete
[ parallel-6] com.naver.media.one.OneService : complete
[ parallel-5] com.naver.media.one.OneService : complete
[ parallel-8] com.naver.media.one.OneService : complete
[ parallel-12] com.naver.media.one.OneService : complete
[ parallel-3] reactor.Parallel.RunOn.1 : onComplete()
[ parallel-3] com.naver.media.one.OneService : complete
12개의 rail이 생성 되었고, 12개의 thread가 이를 처리하고 있었다. publishing 되어지는 data는 3개 뿐임에도 말이다. 실제로 onNext는 3번 호출된다.
parallel-1, 2, 3은 onNext를 통해 publishing 되어지는 data를 처리한다. 그러나 parallel-4 ~ 12는 subscribe 이후 rail 이 비어있으므로 곧바로 onComplete()를 호출하고 이후 "complete" 한다.
Empty rail을 위해 순간적으로 12개의 모든 core를 점유 할 수 있는 상황이고, 이 부분이 가장 큰 병목지점일 것이라 생각했다.
사실 해결법은 간단하다. fetcher::receiveFlux()의 size만큼 혹은 작은 단위로 쪼개어 rail을 생성하면 된다. 이를 통해 불필요한 rail 생성 및 그에 따른 core 점유 상황을 피할 수 있다.
현재는 receiveFlux() 로 부터 publishing 되어지는 data는 3개 고정이다. 그러므로 3개의 rail을 만든 뒤 (core * 2)개의 thread pool을 이용해 처리해보자.
이는 항상 3개의 rail을 만들고 이후 thread pool 내에서 처리 될 것이다.
public ParallelFlux<Map<String, Object>> parallelAttributes(OneInfo info) {
Mono<ComponentRequest> request = fetcher.createRequest(info);
return Flux.from(request)
.flatMap(fetcher::receiveFlux)
.parallel(3)
.runOn(Schedulers.parallel())
.map(received -> handler.handleResponse(received, info));
}
vm | vuser | TPS | MTT (ms) | 에러율 |
---|---|---|---|---|
1 | 500 | 251.5 | 1981.41 | 0 |
vm | vuser | TPS | MTT (ms) | 에러율 |
---|---|---|---|---|
1 | 500 | 326.8 | 1524.91 | 0 |
cpu usage는 40%대를 유지했다.
처음 보다 조금 높게 측정 되었으나.. cpu usage는 여전히 낮다. 또한 Mono 처리에는 미치지 못했다.
handleResponse()에 별다른 로직이 없고, publishing data 또한 적어 병렬 처리 이점이 현재는 드러나지 않는 것 같다.
이번에는 병렬 처리를 제외하고 subscribeOn 스케줄링을 통해 subscibing thread를 교체하는 전략을 구성했다.
Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.
Note that if you are using an eager or blocking create(Consumer, FluxSink.OverflowStrategy) as the source, it can lead to deadlocks due to requests piling up behind the emitter. In such case, you should call subscribeOn(scheduler, false) instead.
subscribeOn은 위와 같은 경우에 사용하길 권장한다.
Typically used for slow publisher e.g., blocking IO, fast consumer(s) scenarios.
flux.subscribeOn(Schedulers.single()).subscribe()
앞서 적용했던 ParallelFlux와의 공통점은 모두 thread pool 을 통해 subscribing을 진행 한다는 점이다.
ParallelFlux와 subscribeOn의 차이점은 다음과 같다. ParallelFlux는 각 rail에 대한 처리를 각기 다른 thread가 진행한다(thread pool).
이에 따라 각 thread가 rail 별 onNext/onError/onComplete에 대한 처리를 진행하는 모습을 위에서 확인할 수 있었다.
그러나 subscribeOn는 publishing thread와 subscribing thread를 분리할 뿐 어떠한 rail을 생성하진 않는다.
즉 ParallelFlux에서 onNext/onComplete는 parallel-1, 2, 3에서 각 1번씩 실행 되었지만, subscribeOn의 경우 onNext/onComplete대해 publishing thread가 아닌 다른 하나의 thread가 onNext/onComplete를 3번 실행할 것이다.
public Flux<Map<String, Object>> fluxAttributes(OneInfo info) {
Mono<ComponentRequest> request = fetcher.createRequest(info);
return Flux.from(request)
.flatMap(fetcher::receiveFlux)
.subscribeOn(Schedulers.parallel())
.map(received -> handler.handleResponse(received, info));
}
fetcher::receiveFlux()는 약 50ms의 I/O 작업을 수행하고 있고 그에 대한 결과를 Flux로 리턴한다. 이 Flux를 subcribing 하는 경우 onNext/onError/onComplete는 다른 thread에서 실행되어야 한다.
그럼 확인 해보자
[ctor-http-nio-2] reactor.Flux.Map.2 : onSubscribe(FluxMap.MapSubscriber)
[ctor-http-nio-2] reactor.Flux.Map.2 : request(unbounded)
[ctor-http-nio-4] reactor.Flux.Map.2 : onNext({ARTICLE={request={type=ARTICLE, .. 중략 .. })
[ctor-http-nio-4] reactor.Flux.Map.2 : onNext({OFFICE_INFO={request={type=OFFICE_INFO, .. 중략 .. })
[ctor-http-nio-4] reactor.Flux.Map.2 : onNext({OFFICE_HEADLINE={request={type=OFFICE_HEADLINE, .. 중략 .. })
[ctor-http-nio-4] reactor.Flux.Map.2 : onComplete()
예상 대로 onNext/onComplete는 reactor-http-nio-4 thread에서 분리실행 되는 모습을 확인했다.
성능 테스트 결과는 아래와 같고, ParallelFlux 처리보다 높은 성능을 볼 수 있었다.
vm | vuser | TPS | MTT (ms) | 에러율 |
---|---|---|---|---|
1 | 500 | 639.9 | 740.5 | 0 |
최종적으로 Scheduling 변경이 가장 높은 tps 및 mtt를 보였다.
vm | vuser | TPS | MTT (ms) | 에러율 | cpu usage |
---|---|---|---|---|---|
1 | 500 | 251.5 | 1981.41 | 0 | 약 30 ~ 40 % |
vm | vuser | TPS | MTT (ms) | 에러율 | cpu usage |
---|---|---|---|---|---|
1 | 500 | 326.8 | 1524.91 | 0 | 약 40 ~ 50 % |
vm | vuser | TPS | MTT (ms) | 에러율 | cpu usage |
---|---|---|---|---|---|
1 | 500 | 659.3 | 754.74 | 0 | 약 80 ~ 90% |
vm | vuser | TPS | MTT (ms) | 에러율 | cpu usage |
---|---|---|---|---|---|
1 | 500 | 639.9 | 740.5 | 0 | 약 80 ~ 90% |
Application
사내 Infra
개발 및 테스트, 배포까지 모든 과정을 거쳐 고민 했던 사항 및 정리
1. one-body
public enum OneBody {
/** ARTICLE **/
ARTICLE("ARTICLE", Arrays.asList(OneTail.ARTICLE, OneTail.OFFICE_INFO)),
ARTICLE_COMMENT("ARTICLE_COMMENT", Arrays.asList(OneTail.ARTICLE_COMMENT)),
/** OFFICE **/
OFFICE_INFO("OFFICE_INFO", Arrays.asList(OneTail.OFFICE_INFO)),
OFFICE_HEADLINE("OFFICE_HEADLINE", Arrays.asList(OneTail.OFFICE_HEADLINE)),
OFFICE_RANKING("OFFICE_RANKING", OneTail.getAllDependencies("OFFICE_RANKING"));
... 생략 ...
}
2. one-tail
public enum OneTail {
/** ARTICLE **/
ARTICLE("ARTICLE"),
ARTICLE_NEWS("ARTICLE_NEWS"),
ARTICLE_COMMENT("ARTICLE_COMMENT"),
/** OFFICE **/
OFFICE_INFO("OFFICE_INFO"),
OFFICE_HEADLINE("OFFICE_HEADLINE"),
OFFICE_RANKING_VIEW("OFFICE_RANKING_VIEW"),
OFFICE_RANKING_ALL_AGE10("OFFICE_RANKING_ALL_AGE10"),
OFFICE_RANKING_ALL_AGE20("OFFICE_RANKING_ALL_AGE20"),
OFFICE_RANKING_ALL_AGE30("OFFICE_RANKING_ALL_AGE30"),
OFFICE_RANKING_ALL_AGE40("OFFICE_RANKING_ALL_AGE40"),
OFFICE_RANKING_ALL_AGE50("OFFICE_RANKING_ALL_AGE50"),
OFFICE_RANKING_ALL_AGE60("OFFICE_RANKING_ALL_AGE60");
... 생략 ...
}
One-Service Flow는 간략하게 설명하면 다음과 같다.
이 과정을 최대한 리액티브하게 처리 하는것이 중요하다고 생각했음
설명
public Mono<Map<String, Object>> attributes(OneInfo info) {
Mono<ComponentRequest> request = fetcher.createRequest(info);
return Mono.from(request)
.flatMap(fetcher::receiveMono) // Mono<List<ComponentResponse>>
.map(received -> handler.handleResponse(received, info)); // Mono<Map<String, Object>>
}
예상했던 단점
테스트 결과
vm | vuser | TPS | MTT (ms) | 에러율 | cpu usage |
---|---|---|---|---|---|
1 | 500 | 659.3 | 754.74 | 0 | 약 80 ~ 90% |
설명
Flux<ComponentResponse>
public ParallelFlux<Map<String, Object>> parallelAttributes(OneInfo info) {
Mono<ComponentRequest> request = fetcher.createRequest(info);
return Flux.from(request) // Flux<ComponentRequest>
.flatMap(fetcher::receiveFlux) // Flux<ComponentReponse>
.parallel(3) // ParallelFlux<ComponentResponse>
.runOn(Schedulers.parallel()) // fixed thread pool에서 parallel 실행
.map(received -> handler.handleResponse(received, info)); // ParallelFlux<Map<String, Object>>
}
예상했던 단점
테스트 결과
vm | vuser | TPS | MTT (ms) | 에러율 | cpu usage |
---|---|---|---|---|---|
1 | 500 | 326.8 | 1524.91 | 0 | 약 40 ~ 50 % |
Flux<ComponentResponse>
public Flux<Map<String, Object>> fluxAttributes(OneInfo info) {
Mono<ComponentRequest> request = fetcher.createRequest(info);
return Flux.from(request) // Flux<ComponentRequest>
.flatMap(fetcher::receiveFlux) // Flux<ComponentReponse>
.subscribeOn(Schedulers.parallel())
.map(received -> handler.handleResponse(received, info));
// 1 thread(in thread-pool) subscribe
}
예상했던 단점
테스트 결과
vm | vuser | TPS | MTT (ms) | 에러율 | cpu usage |
---|---|---|---|---|---|
1 | 500 | 639.9 | 740.5 | 0 | 약 80 ~ 90% |
3개의 시도중에는 가장 이상적? 구조
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.