IT/Spring

Reactive Web Application은 언제 subscribe를 할까?

물통꿀꿀이 2020. 11. 27. 01:36

Webflux를 사용하다보면, 궁금함이 생긴다.

아래 Controller를 보면,

@RestController
class CustomerController {
    @Autowired
    private lateinit var customerService: CustomerService

    @GetMapping(value = ["/customer/{id}"])
    fun getCustomer(@PathVariable id: Int): Mono<Customer> {
        return customerService.getCustomer(id)
    }

getCustomer의 리턴 타입이 Mono이다. 즉, 리턴으로 Reactor를 반환하는데 여기서 궁금증이 생긴다.

도대체 누가? 언제? subscribe를 하는 것일까?

 

일반적으로 Mono를 사용하게 되면, 최종적으로 subscribe (또는 block 등등)을 chaining 하면서 데이터를 받아온다.

그런데 Spring Controller 부분은 이후 우리가 Reactor에 대해 추가로 코드를 작성할 여지가 없다.

하지만 Postman이나 웹 등을 이용해서 테스트 해보면, Customer의 값을 확인 할 수 있다. 

도대체 어디서??

 

구글링 중 아래의 글에서 대략 궁금증을 해결 할 수 있었다.

stackoverflow.com/questions/56487429/who-calls-subscribe-on-flux-or-mono-in-reactive-webapplication

 

who calls subscribe on Flux or Mono in reactive webapplication

I am looking at some examples of reactive web applications and i am seeing them like this @RequestMapping(value = "/{id}", method = RequestMethod.GET) @ResponseBody public Mono

stackoverflow.com

풀어보면, Spring Webflux 프레임워크 내의 클래스(Internal Class)가 Mono/Flux에 subscribe를 하여 결과값을 HTTP Packet에 넣어준다는 것이다. 그리고 실제 subscribe하는 코드는 아래와 같다.

public void service(ServletRequest request, ServletResponse response) throws ServletException, IOException {
	// Check for existing error attribute first
	if (DispatcherType.ASYNC.equals(request.getDispatcherType())) {
		Throwable ex = (Throwable) request.getAttribute(WRITE_ERROR_ATTRIBUTE_NAME);
		throw new ServletException("Failed to create response content", ex);
	}

	// Start async before Read/WriteListener registration
	AsyncContext asyncContext = request.startAsync();
	asyncContext.setTimeout(-1);

	ServletServerHttpRequest httpRequest;
	try {
		httpRequest = createRequest(((HttpServletRequest) request), asyncContext);
	}
	catch (URISyntaxException ex) {
		if (logger.isDebugEnabled()) {
			logger.debug("Failed to get request  URL: " + ex.getMessage());
		}
		((HttpServletResponse) response).setStatus(400);
		asyncContext.complete();
		return;
	}

	ServerHttpResponse httpResponse = createResponse(((HttpServletResponse) response), asyncContext, httpRequest);
	if (httpRequest.getMethod() == HttpMethod.HEAD) {
		httpResponse = new HttpHeadResponseDecorator(httpResponse);
	}
    
	AtomicBoolean isCompleted = new AtomicBoolean();
	HandlerResultAsyncListener listener = new HandlerResultAsyncListener(isCompleted, httpRequest);
	asyncContext.addListener(listener);

	HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext, isCompleted, httpRequest);
	this.httpHandler.handle(httpRequest, httpResponse).subscribe(subscriber);
}

docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.html#service-javax.servlet.ServletRequest-javax.servlet.ServletResponse-

위 문서처럼 ServletHttpHandlerAdapter 에 존재하는데 코드 상 마지막 라인을 보면 subscribe를 하는 것을 알 수 있다.

그렇기에 Controller 부분에서 Reactor의 반환 값을 가지면 자연스럽게 데이터만 클라이언트에게 전달되는 것이다.

(물론 Reactor에서 onComplete 혹은 onError를 통해 데이터를 얻을 때까지 대기가 필요하지만...)