[SPRING] Spring Reactive Programming
## Reactive Programming
리액티브 프로그래밍은 데이터 스트림과 변경 사항 전파를 중심으로하는 비동기 프로그래밍 패러다임
## Java 비동기
1. Callback 방식
- callback 지옥에 빠질수 있음.
2. Future 방식
- Future를 외부에서 완료시킬 수 없다. (단, 취소하거나, get()에 타임아웃을 설정할 수는 있다.)
- callback 지옥은 피할 수 있으나 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
- Future에서는 return되는 결과값을 가지고 무언가를 하려면 get()이후에만 가능하다.
- 여러 Future 조합을 사용할 수 없다. (ex. Event 정보를 가져온 다음 Event에 참석하는 회원 목록을 가져오기)
- 예외 처리용 API를 제공하지 않는다
3. CompletableFuture
- 지금 당장 필요하든 필요하지 않든 스레드 풀에서 실행됨.
4. LinstenableFuture 등등..
## Reactive Stream
- Reactive Stream은 리액티브 프로그래밍 라이브러리의 표준 사양이며 리액티브 프로그래밍에 대한 인터페이스를 제공
- RxJava, Reactor
- 예시 코드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
public class ReactiveStreamTest {
public static class PublisherImpl implements Publisher<Integer> {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
Queue<Integer> queue = new LinkedList<>();
IntStream.range(0, 10).forEach(queue::add);
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
System.out.println("request:" + n);
for (int i=0; i<=n; i++){
if(queue.isEmpty()) {
subscriber.onComplete();
return;
}
subscriber.onNext(queue.poll());
}
}
@Override
public void cancel() {
System.out.println("publish cancel");
}
});
}
}
public static class SubscriberImpl implements Subscriber<Integer> {
private Subscription subscription;
private long requestSize = 2;
private List<Integer> buffer = new ArrayList<>();
@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(requestSize);
}
@Override
public void onNext(Integer integer) {
System.out.println(" onNext - " + integer);
buffer.add(integer);
if(buffer.size() == requestSize) {
buffer.clear(); //flush
subscription.request(requestSize);
}
}
@Override
public void onError(Throwable t) {
System.out.println("error:" + t.getMessage());
}
@Override
public void onComplete() {
System.out.println("subscribe complete");
}
}
public static void main(String[] args) {
Publisher<Integer> publisher = new PublisherImpl();
publisher.subscribe(new SubscriberImpl());
}
}
|
cs |
- Back pressure
Publisher 에서 발행하고, Subscriber에서 구독할 때, Publisher 에서 데이터를 Subscriber 로 Push 하는 방식이 아니라, Pull 방식으로 Subscriber 가 Publisher 로 처리할 수 있는 양의 크기만큼 데이터를 요청 함으로써 Subscriber의 장애를 방지즉, 다이나믹 풀 방식의 데이터 요청을 통해서 Subscriber가 수용할 수 있는 만큼 데이터를 요청하는 방식
## Spring MVC
- Thread-per-request model
- Servlet 3.0, Servlet 3.1 에서 추가된 비동기, 논블로킹에 대한 기술
## Spring WebFlux
- 구조
- event-loop
- The event loop runs continuously in a single thread, although we can have as many event loops as the number of available cores
- The event loop processes the events from an event queue sequentially and returns immediately after registering the callback with the platform
- The platform can trigger the completion of an operation like a database call or an external service invocation
- The event loop can trigger the callback on the operation completion notification and send back the result to the original caller
성능 테스트
1. ngrinder (local)
2. Apple M1 Pro (local)
- WebFlux
- MVC
## 결론 및 내 생각
참조
https://hyunsoori.tistory.com/17
https://umbum.dev/1080
https://okky.kr/article/668901
https://www.baeldung.com/spring-webflux-concurrency
http://www.kyobobook.co.kr/product/detailViewKor.laf?mallGb=KOR&ejkGb=KOR&barcode=9791158391591
https://www.youtube.com/watch?v=8fenTR3KOJo&list=PLv-xDnFD-nnmof-yoZQN8Fs2kVljIuFyC&index=11