Notice
Recent Posts
Recent Comments
Link
«   2024/07   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30 31
Archives
Today
Total
관리 메뉴

개발자일기

Spring mvc NIO + webflux 본문

개발

Spring mvc NIO + webflux

ka0oll 2020. 3. 24. 00:54

ListenableFuture를 이용해서 NIO구현하기

AsyncRestTemplate를 Netty의 NioEventLoopGroup(NIO)를 이용하여 Api 호출

NIO사용시 API호출후 블로킹 않고 호출결과가 도달시 NIO를 통해 event를 전달받는 방식이다.

  • 내부적으론 비블록킹 callback 방식, callback이 NioEventLoopGroup thread로 호출됨

결과적으로는 request, reponse는 웹서버의 쓰레드가 처리

API의 호출은 네티의 NioEventLoopGroup (논블럭킹 방식)으로 네티 쓰레드 사용

@Slf4j
@RestController
@RequestMapping()
public class ListenableFutureController {

    AsyncRestTemplate nioAsyncRestTemplate = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

    @GetMapping("/none-block")
    public ListenableFuture<ResponseEntity<String>> noneBlock() {
        //톰캣 쓰레드
        log.info("requestThread : {}", Thread.currentThread().getName());
        ListenableFuture<ResponseEntity<String>> listenableFuture = nioAsyncRestTemplate.getForEntity("http://localhost:8081/sleep", String.class);
        listenableFuture.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
             @Override
             public void onFailure(Throwable ex) {

             }

             @Override
             public void onSuccess(ResponseEntity<String> result) {
                 // 네티 쓰레드이다. callback까지는 evnetLoop쓰레드가 하고 결과값 셋팅하고 그후로는 tomcat쓰레드가 진행한다.
                 log.info("onSuccessThread : {}", Thread.currentThread().getName());
             }
         });

        //실제 response는 tomcat Thread가 처리한다.
        return listenableFuture;
    }

    @GetMapping("/block")
    public String block() {
        new RestTemplate().getForEntity("http://localhost:8081/sleep", String.class);
        return "test";
    }


    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {

        int count = 100;
        //String url = "http://localhost:8080/block";
        String url = "http://localhost:8080/none-block";
        CyclicBarrier barrier = new CyclicBarrier(count+1);
        ExecutorService executor = Executors.newFixedThreadPool(100);
        IntStream.range(0, count).forEach(value -> executor.submit(() -> {
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

            long startTime = System.currentTimeMillis();
            RestTemplate restTemplate = new RestTemplate();
            restTemplate.getForEntity(url, String.class);

            long diff = System.currentTimeMillis() - startTime;
            System.out.println("latency : " +diff);
        }));

        barrier.await();

        executor.shutdown();
    }

}

WebFlux를 이용한 NIO


webflux 사용자 요청을 NIO을 이용한 event Loop로 작업 처리된다. 실제로는 1개가 아니지만 적은수의 쓰레드로 처리한다.

밑에 로그를 보면
controller의 method는 reactor netty의 쓰레드에서 호출되고 바로 반환한다.
반환된 Mono는 스프링이 구독한다.(NIO를 통해 이벤트를 전달받고 netty의 쓰레드로 클라이언트로 결과 전달한다)

webClient는 서버의 reactor netty를 같이 사용한다.(이벤트 루프 같이 사용)
webClient는 api 요청을 논블러킹 방식으로 호출하고 결과 callback 쓰레드는 webclient thread가 호출하게 된다.

    WebClient webClient = WebClient.create("http://localhost:8081");

    @GetMapping("/mono")
    public Mono<String> webflux(){
        //스프링 네티 이벤트 thread
        log.info("webflux() method call thread : {}", Thread.currentThread().getName());

        //실제 결과는 스프링이 구독 네티 이벤트 thread,
        return webClient.get().uri("/sleep").exchange().log()
            .flatMap(clientResponse -> {
                //webclient의 쓰레드
                log.info("flatMap() method call thread : {}", Thread.currentThread().getName());
                return clientResponse.bodyToMono(String.class);
            })
            .flatMap(s-> webClient.get().uri("/sleep").exchange())
            .flatMap(clientResponse -> {
                //webclient의 쓰레드
                log.info("flatMap() method call thread : {}", Thread.currentThread().getName());
                return clientResponse.bodyToMono(String.class);
            });
    }


    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {

        int count = 50;
        String url = "http://localhost:8080/flux";
        CyclicBarrier barrier = new CyclicBarrier(count+1);
        ExecutorService executor = Executors.newFixedThreadPool(count);
        IntStream.range(0, count).forEach(value -> executor.submit(() -> {
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

            long startTime = System.currentTimeMillis();
            RestTemplate restTemplate = new RestTemplate();
            String body = restTemplate.getForEntity(url, String.class).getBody();

            long diff = System.currentTimeMillis() - startTime;
            System.out.println("latency : " +diff);
        }));

        barrier.await();

        executor.shutdown();
    }

'개발' 카테고리의 다른 글

Effective JAVA Exception처리  (0) 2020.03.30
진정한 Rest Api란 무엇인가?  (0) 2020.03.28
Mockito를 이용한 Unit테스트 Mocking  (0) 2020.03.22
Spring의 핵심 및 기술  (0) 2020.03.21
도커를 이용한 테스트  (0) 2020.03.21
Comments