ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Part3] Java8 In action - Chapter11 - 1
    Java8 In Action 2022. 8. 9. 13:53
    반응형

    해당 내용은 Java8 In Action 책을 요약 및 정리한 내용입니다.

    좀 더 자세히 알고 싶으신 분들은 책을 사서 읽어보심을 추천드립니다.!

    11.1 Future

    Main

    package Part3.Chapter11.Chapter11_11_1;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.stream.IntStream;
    
    /*
     * 11.1 Future
     * 
     * 자바 5부터는 미래의 어느 시점에 결과를 얻는 모델에 활용할 수 있도록 Future 인터페이스를 제공하고 있다.
     * 비동기 계산을 모델링하는데 Future를 이용할 수 있으며, Future는 계산이 끝났을 때 결과에 접근할 수 있는 레퍼런스를 제공한다.
     * 시간이 걸릴수 있는 작업을 Future 내부로 설정하면 호출자 스레드가 결과를 기달리는 동안 다른 유용한 작업을 수행할 수 있다.
     * Futhre는 저수준의 스레드에 비해 직관적으로 이해하기 쉽다는 장점이 있다.
     * Future를 이용하려면 시간이 오래 걸리는 작업을 Callable 객체 내부로 감싼 다음에 ExecutorService에 제출해야 한다. 
     */
    public class Main_11_1 {
    
        public static void main(String[] args) {
            /*
             * ExecutorService에서 제공하는 스레드가 시간이 오래 걸리는 작업을 처리하는 동안 우리 스레드로 다른 작업을 동시에 실행할 수 있다.
             * 다른 작업을 처리하다가 시간이 오래 걸리는 작업의 결과가 필요한 시점이 되었을 때 Future의 get 메서드로 결과를 가져올 수 있다.
             * get 메서드를 호출했을 때 이미 계산이 완료되어 결과가 준비되었다면 즉시 결과를 반환하지만 결과가 준비되지 않았다면
             * 작업이 완료될 때까지 우리 스레드를 블록시킨다.
             */
            System.out.println(new examFuture().futureExecute());
    
            /*
             * 11.1.1 Future 제한
             * 
             * 여러 Future의 결과가 있을 때 이들의 의존성은 표현하기가 어렵다.
             * 즉 '오래 걸리는 A라는 계산이 끝나면 그 결과를 다른 오래 걸리는 계산 B로 전달하시오. 그리고 B의 결과가 나오면 다른 질의의 결과와
             * B의 결과를 조합하시오'와 같은 요구사항을 쉽게 구현할 수 있어야 한다.
             * 
             * 따라서 다음과 같은 선언형 기능이 필요하다.
             * - 두 개의 비동기 계산 결과를 하나로 합친다. 두 가지 결과 계산은 서로 독립적일 수 있으며 또는 두 번째 결과가 첫 번째 결과에
             * 의존하는 상황일 수 있다.
             * 
             * - Future 집합이 실행하는 모든 태스크의 완료를 기다린다.
             * 
             * - Future 집합에서 가장 빨리 완료되는 태스크를 기달렸다가 결과를 얻는다.
             * (여러 태스크가 다양한 방식으로 같은 결과를 구하는 상황.)
             * 
             * - 프로그램적으로 Future를 완료시킨다.(비동기 동작에 수동으로 결과 제공.)
             * 
             * - Future 완료 동작에 반응한다.
             * (결과를 기다리면서 블록되지 않고 결과가 준비되었다는 알림을 받은 다음에 Future의 결과로 원하는 추가 동작을 수행할 수 있음.)
             * 
             * 지금까지 설명한 기능을 선언형으로 이용할 수 있도록 자바 8에서 새로 제공하는 CompletableFuture 클래스(Future 인터페이스 구현체)
             * 가 있다. Stream과 CompletableFuture는 비슷한 패턴이다. 즉 람다 표현식과 파이프라이닝을 활용한다.
             * 따라서 Future와 CompletableFuture의 관계를 Collection과 Stream의 관계에 비유할 수 있다. 
             */
    
            /*
             * 11.1.2 CompletbleFuture로 비동기 애플리케이션 만들기
             * 
             * 어떤 제품이나 서비스를 이용해야 하는 상황의 가정에서 예산을 줄일 수 있도록 여러 온라인상점 중 가장 저렴한 가격을 제시하는 상점을
             * 찾는 애플리케이션을 만든다.
             * 애플리케이션을 만드는 동안 다음과 같은 기술을 배운다.
             * 
             * 첫째 고객에게 비동기 API를 제공하는 방법을 배운다.
             * 
             * 둘째 동기 API를 사용해야 할 때 코드를 비블록으로 만드는 방법을 배운다.
             * 두 개의 비동기 동작을 파이프라인으로 만드는 방법과 두 개의 동작 결과를 하나의 비동기 계산으로 합치는 방법을 배운다.
             * 예를 들어 온라인상점에서 우리가 사려는 물건에 대응하는 할인 코드를 반환한다고 가정하자. 우리는 다른 원격 할인 서비스에
             * 접근해서 할인 코드에 해당하는 할인율을 찾아야한다. 그래야 원래 가격에 할인율을 적용해서 최종 결과를 계산할 수 있다.
             * 
             * 셋째 비동기 동작의 완료에 대응하는 방법을 배운다.
             * 모든 상점에서 가격 정보를 얻을 때까지 기다리는 것이 아니라 각 상점에서 가격 정보를 얻을 때마다 즉시 최저가격을 찾는
             * 애플리케이션을 갱신하는 방법을 설명한다.(그렇지 않으면 서버 다운 등 문제가 발생했을 때 사용자에게 검은 화면만 보여주게 된다.)
             */
    
            /*
             * 동기 API와 비동기 API
             * 
             * 동기 API
             * 전통적인 동기 API는 메서드를 호출한 다음에 메서드의 계산을 완료될 때까지 기다린 다음 메서드가 반환되면 호출자는 반환된 값으로
             * 계속 다른 동작을 수행한다.
             * 호출자와 피호출자가 각각 다른 스레드에서 실행되는 상황이었더라도 호출자는 피호출자의 동작 완료를 기달린다.
             * 이처럼 동기 API를 사용하는 상황을 '블록 호출(blocking call)' 이라고 한다.
             * 
             * 비동기 API
             * 메서드가 즉시 반환되며 끝내지 못한 나머지 작업을 호출자 스레드와 동기적으로 실행될 수 있도록 다른 스레드에 할당한다.
             * 이와 같은 비동기 API를 사용하는 상황을 '비블록 호출(non-blocking call)'이라고 한다.
             * 다른 스레드에 할당된 나머지 계산 결과는 콜백 메서드를 호출해서 전달하거나 호출자가 '계산 결과가 끝날 때까지 기달림' 메서드를
             * 추가로 호출하면서 전달된다.
             * 주로 I/O 시스템 프로그래밍에서 이와 같은 방식으로 동작을 수행하며 즉, 계산 동작을 수행하는 동안 비동기적으로 디스크 접근을 수행한다.
             * 그리고 더 이상 수행할 동작이 없으면 디스크 블록이 메모리로 로딩될 때까지 기다린다. 
             */
        }
    }
    
    class examFuture {
    
        public Double futureExecute() {
            Double result = new Double(0);
    
            // 스레드 풀에 태스크를 제출하려면 ExecutorService를 생성.
            ExecutorService executor = Executors.newCachedThreadPool();
    
            // Callable을 ExecutorService로 제출.
            Future<Double> future = executor.submit(new Callable<Double>() {
                public Double call() {
                    // 시간이 오래 걸리는 작업은 다른 스레드에서 비동기적으로 실행.
                    return doSomeLongComputation();
                }
            });
    
            // 비동기 작업을 수행하는 동안 다른 작업을 실행.
            this.doSomeThingElse();
    
            try {
    
                /*
                 * 비동기 작업의 결과를 가져온다. 결과가 준비되어 있지 않으면 호출 스레드가 블록된다.
                 * 최대 1초까지만 기달린다.
                 * 
                 * get 메서드 호출 시 우리 스레드가 블록이 되기 때문에 오래 걸리는 작업이 영원히 끝나지 않으면 
                 * 작업이 끝나지 않는 문제가 생길 수 있으므로 get 메서드를 오버로드해서 우리 스레드가 대기할 최대 타임아웃 시간을
                 * 설정하는 것이 좋다.
                 */
                result = future.get(1, TimeUnit.SECONDS);
            } catch(ExecutionException ee) {
                System.out.println("계산 중 에러가 발생하였습니다.");
            } catch(InterruptedException ie) {
                System.out.println("현재 스레드에서 대기 중 인터럽트가 발생.");
            } catch(TimeoutException te) {
                System.out.println("Future가 완료되기 전에 타임아웃이 발생.");
            }
    
            return result;
        }
    
        private Double doSomeLongComputation() {
            return IntStream.rangeClosed(1, 100_000_000).asDoubleStream().sum();
        }
    
        private void doSomeThingElse() {
            System.out.println("doSomeThingElse method execute!!"); 
        }
    }

    11.2 비동기 API 구현

    Process

    Shop.java

    package Part3.Chapter11.Chapter11_11_2.App;
    
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;
    
    public class Shop {
        private final String shopName;
    
        public Shop() { 
            this.shopName = "";
        }
    
        public Shop(String shopName) {
            this.shopName = shopName;
        }
    
        public double getPrice(String product) {
            return this.calculatePrice(product);
        }
    
        public Future<Double> getPriceSupplyAsync(String product) {
            /*
             * supplyAsync 메서드는 Supplier를 인수로 받아서 CompletableFuture를 반환한다.
             * CompletableFutre는 Supplier를 실행해서 비동기적으로 결과를 생성한다.
             * ForkJoinPool의 Executor 중 하나가 Supplier를 실행할 것이다.
             * 
             * 하지만 두 번째 인수로 Executor를 받는 다른 오버로드 버전을 통해서 Executor를 지정할 수 있다.
             * 결국 모든 다른 CompletableFuture의 팩토리 메서드에 Executor를 선택적으로 전달할 수 있다.
             * 
             * 에러 및 예외 관리는 getPriceAsync 메서드의 try ~ catch와 같은 방법으로 관리 한다.
             */
            return CompletableFuture.supplyAsync(() -> calculatePrice(product));
        }
    
        public Future<Double> getPriceAsync(String product) {
            // 계산 결과를 포함할 CompletableFuture를 생성한다.
            CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    
            new Thread(() ->  {
                /*
                 * 예외가 발생하면 해당 스레드에만 영향을 미친다.
                 * 즉, 에러가 발생해도 가격 계산은 계속 진행되며 일의 순서가 꼬인다.
                 * 결과적으로 클라이언트는 get 메서드가 반환될 때까지 영원히 기다리게 될 수도 있다.
                 * 
                 * 영구 대기 문제는 get 메서드의 타임아웃값을 받는 get 메서드를 사용해서 해결할 수 있다.
                 * 그래야 문제가 발생했을 때 클라이언트가 영원히 블록되지 않고 TimeoutExcetion을 받을 수 있다.
                 */
                try {
                    // 다른 스레드에서 비동기적으로 계산을 수행한다.
                    double price = calculatePrice(product);
                    // 오랜 시간이 걸리는 계산이 완료되면 Future에 값을 설정한다.
                    futurePrice.complete(price);                
                } catch(Exception e) {
                    /*
                     * 에러가 발생 되면 왜 났는지 알 수가 없기 때문에 completeExceptionally 메서드를 이용해서
                     * CompletableFuture 내부에서 발생한 예외를 클라이언트로 전달해줘야 한다.
                     */
                    futurePrice.completeExceptionally(e);                
                }
            }).start();
    
            // 계산 결과가 완료되길 기다리지 않고 Future를 반환한다.
            return futurePrice;
        }
    
        private double calculatePrice(String product) {
            /*
             * 상점의 데이터베이스를 이용해서 가격 정보를 얻는 동시에 다른 외부 서비스에 접근해야 하지만
             * 예제에서는 실제 호출할 서비스까지 구현할 수 없으므로 임으로 1초를 지연 시킨다.
             */
            delay();
    
            return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
        }
    
        public static void delay() {
            try {
                Thread.sleep(1000L);
            } catch(InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    
        public String getShopName() {
            return this.shopName;
        }
    
        @Override
        public String toString() {
            return "{"
                + "shopName : " + this.shopName
            + "}";
        }
    
    
    }

    Main

    package Part3.Chapter11.Chapter11_11_2;
    
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    import Part3.Chapter11.Chapter11_11_2.App.Shop;
    
    /*
     * 11.2 비동기 API 구현
     */
    public class Main_11_2 {
    
        public static void doSomeThingElse() {
            System.out.println("doSomeThingElse method execute!!"); 
        }
    
        public static void main(String[] args) {
            Shop shop = new Shop("BestShop");
    
            long start = System.nanoTime();
            // 상점에 제품가격 정보 요청
            Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
            Future<Double> futureSupplyPrice = shop.getPriceSupplyAsync("my favorite product");
            System.out.println("Invocation returned after " 
                + ((System.nanoTime() - start) / 1_000_000) + " msecs");    
    
            doSomeThingElse();
    
            try {
                /*
                 * 가격 정보가 있으면 Future에서 가격정보를 읽고, 가격 정보가 없으면
                 * 가격 정보를 받을 때까지 블록한다.
                 */
                System.out.printf("Price is %.2f%n", futurePrice.get(1, TimeUnit.SECONDS));            
                System.out.printf("Supply Price is %.2f%n", futureSupplyPrice.get(1, TimeUnit.SECONDS));
            } catch(Exception e) {
                throw new RuntimeException(e);
            }
    
            System.out.println("Price returned after " 
                + ((System.nanoTime() - start) / 1_000_000) + " msecs");
        }
    
    }

    11.3 비블록 코드 만들기

    Process

    Shop.java

    package Part3.Chapter11.Chapter11_11_3.App;
    
    import java.util.Random;
    
    public class Shop {
        private final String shopName;
    
        public Shop() { 
            this.shopName = "";
        }
    
        public Shop(String shopName) {
            this.shopName = shopName;
        }
    
        public double getPrice(String product) {
            return this.calculatePrice(product);
        }
    
        private double calculatePrice(String product) {
            /*
             * 상점의 데이터베이스를 이용해서 가격 정보를 얻는 동시에 다른 외부 서비스에 접근해야 하지만
             * 예제에서는 실제 호출할 서비스까지 구현할 수 없으므로 임으로 1초를 지연 시킨다.
             */
            delay();
    
            return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
        }
    
        public static void delay() {
            try {
                Thread.sleep(1000L);
            } catch(InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    
        public String getShopName() {
            return this.shopName;
        }
    
        @Override
        public String toString() {
            return "{"
                + "shopName : " + this.shopName
            + "}";
        }
    
    
    }

    Main

    package Part3.Chapter11.Chapter11_11_3;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.stream.Collectors;
    
    import Part3.Chapter11.Chapter11_11_3.App.Shop;
    
    /*
     * 11.3 비블록 코드 만들기
     */
    public class Main_11_3 {
    
        /*
         * 동기 API를 이용해서 최저가격 검색 애플리케이션을 개발해야 한다.
         * 다음과 같이 상점 리스트가 있다고 가정하자.
         */
        final static List<Shop> shops = Arrays.asList(
            new Shop("BestPrice")
            , new Shop("LetsSaveBig")
            , new Shop("MyFavoriteShop")
            , new Shop("BuyItAll")        
        );
    
        /*
         * 상점 수보다 많은 스레드를 생성해봐야 사용할 가능성이 전혀 없으므로 상점 수보다 많은 스레드를 갖는건 낭비다.
         * 한 상점에 하나의 스레드가 할당될 수 있도록 즉, 가격 정보를 검색하려는 상점 수만큼 스레드를 갖도록 Executor를
         * 설정한다. 스레드 수가 너무 많으면 서버가 크래시될 수 있으므로 하나의 Executor에서 사용할 스레드의 최대 개수는
         * 100 이하로 설정하는 것이 좋다.
         */
        final static Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 2000), new ThreadFactory() {
    
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                /*
                 * 생성한 풀은 '데몬 스레드(daemon thread)'를 포함한다. 자바에서 일반 스레드가 실행 중이면
                 * 자바 프로그램은 종료되지 않는다. 따라서 어떤 이벤트를 한없이 기다리면서 종료되지 않는
                 * 일반 스레드가 있으면 문제가 될 수 있다.
                 * 반면 데몬 스레드는 자바 프로그램이 종료될 때 강제로 실행이 종료될 수 있다. 두 스레드 성능은 같다.
                 */
                t.setDaemon(true);
    
                return t;
            }
        });
    
        /*
         * 제품명을 입력하면 상점 이름과 제품가격 문자열 정보를 포함하는 List를 반환하는 메서드.
         */
        public static List<String> findPrices(String product) {
            return shops.stream()
                .map(shop -> String.format("%s price is %.2f", shop.getShopName(), shop.getPrice(product)) )
                .collect(Collectors.toList());
        }
    
        public static List<String> findParallelPrices(String product) {
            /*
             * 11.3.1 병렬 스트림으로 요청 병렬화하기  
             */
            return shops.parallelStream()
                .map(shop -> String.format("%s price is %.2f", shop.getShopName(), shop.getPrice(product)) )
                .collect(Collectors.toList());
        }
    
        public static List<String> findFuturePrices(String product) {
            /*
             * 11.3.2 CompletableFuture로 비동기 호출 구현하기
             */
            /*
             * CompletableFuture를 포함하는 리스트 List<CompletableFuture<String>>를 얻을 수 있다.
             * 리스트의 CompletableFuture는 각각 계산 결과가 끝난 상점의 이름 문자열을 포함한다.
             */
            List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(shop -> {
                    return CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", shop.getShopName(), shop.getPrice(product)) );
                })
                .collect(Collectors.toList());
    
            /*
             * 리스트의 모든 CompletableFuture에 join을 호출해서 모든 동작이 끝나기를 기다린다.
             * join 메서드는 Future 인터페이스의 get 메서드와 같은 의미를 갖는다.
             * 다만 join은 아무 예외도 발생시키지 않는다는 점이 다르다.
             * 따라서 try ~ catch로 감쌀 필요가 없다.
             */
            return priceFutures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    
            /*
             * 두 map 연산을 하나의 스트림 처리 파이프라인으로 처리하지 않고 두 개의 스트림 파이프라인으로 처리한 이유는
             * 스트림 연산은 게으른 특성이 있으므로 하나의 파이프라인으로 연산을 처리 했다면 모든 가격 정보 요청이
             * 동기적, 순차적으로 이루어지는 결과가 된다.
             * CompletableFuture로 각 상점의 정보를 요청할 때 기존 요청 작업이 완료되어야 join이 결과를 반환하면서
             * 다음 상점으로 정보를 요청할 수 있기 때문이다.
             */        
    
            /*        
                shops.stream()
                    .map(shop -> {
                        return CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", shop.getShopName(), shop.getPrice(product)) );
                    })
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
            */        
        }
    
        public static List<String> findFutureExecutorPrices(String product) {        
            List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(shop -> {
                    return CompletableFuture.supplyAsync(() -> {
                        return String.format("%s price is %.2f", shop.getShopName(), shop.getPrice(product));    
                    }, executor );
                })
                .collect(Collectors.toList());
    
            return priceFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        }
        public static void main(String[] args) {
            long start = System.nanoTime();
            /*
             * 네 개의 상점에서 가격을 검색하는 동안 각각 1초의 대기시간이 있으므로 전체 가격 검색 결과는 4초 정도 걸린다.
             */
            System.out.println(findPrices("myPhone27S"));
            System.out.println("findPrices Done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
    
            start = System.nanoTime();
            System.out.println(findParallelPrices("myPhone27S"));
            System.out.println("findParallelPrices Done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
    
            start = System.nanoTime();
            System.out.println(findFuturePrices("myPhone27S"));
            System.out.println("findFuturePrices Done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
    
            start = System.nanoTime();
            System.out.println(findFutureExecutorPrices("myPhone27S"));
            System.out.println("findFutureExecutorPrices Done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
    
            /*
             * 스트림 병렬화와 CompletableFuture 병렬화
             * 
             * - I/O가 포함되지 않은 계산 중심의 동작을 실행할 때는 스트림 인터페이스가 구현하기 간단하며 효율적일 수 있다.
             * (모든 스레드가 계산 작업을 수행하는 상황에서는 프로세서 코어 수 이상의 스레드를 가질 필요가 없다.)
             * 
             * - I/O를 기다리는 작업을 병렬로 실행할 때는 CompletableFuture가 더 많은 유연성을 제공하며 대기/계산(W/C)의
             * 비율에 적합한 스레드 수를 설정할 수 있다. 특히 스트림의 게으른 특성 때문에 스트림에서 I/O를 실제로 언제 처리할지
             * 예측하기 어려운 문제도 있다.
             */
        }
    }

    11.4 비동기 작업 파이프라인 만들기

    Process

    Discount.java

    package Part3.Chapter11.Chapter11_11_4.App;
    
    import java.util.Random;
    
    public class Discount {
        public enum Code {
            NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
    
            private final int percentage;
    
            Code(int percentage) {
                this.percentage = percentage;
            }
        }
    
        public static String applyDiscount(Quote quote) {
            // 기존 가격에 할인 코드를 적용한다.
            return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
        }
    
        private static double apply(double price, Code code) {
            // Discount 서비스의 응답 지연을 흉내낸다. 
            delay();
    
            return price * (100 - code.percentage) / 100;
        }
    
        private static void delay() {
            int randomDelay = 500 + new Random().nextInt(2000);
    
            try {
                Thread.sleep(randomDelay);
            } catch(InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    ExchangeService.java

    package Part3.Chapter11.Chapter11_11_4.App;
    
    import java.util.Random;
    
    public class ExchangeService {
        public enum Money {
            USD(100), EUR(200);
    
            private final int exchange;
    
            Money(int exchange) {
                this.exchange = exchange;
            }
        }
    
        public static double getRate(Money exchange) {
            delay();
            return exchange.exchange;
        }
    
        private static void delay() {
            int randomDelay = 500 + new Random().nextInt(2000);
    
            try {
                Thread.sleep(randomDelay);
            } catch(InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    Quote.java

    package Part3.Chapter11.Chapter11_11_4.App;
    
    /*
     * 11.4.1 할인 서비스 구현
     * 
     * 최저가격 검색 애플리케이션은 여러 상점에서 가격 정보를 얻어오고, 결과 문자열을 파싱하고,
     * 할인 서버에 질의를 보낼 준비가 되었다.
     * 할인 서버에서 할인율을 확인해서 최종 가격을 계산할 수 있다.
     * (할인 코드와 연계된 할인율은 언제든 바뀔 수 있으므로 매번 서버에서 정보를 얻어 와야 한다.)
     */
    public class Quote {
        private final String shopName;
        private final double price;
        private final Discount.Code discountCode;
    
        public Quote(String shopName, double price, Discount.Code discountCode) {
            this.shopName = shopName;
            this.price = price;
            this.discountCode = discountCode;
        }
    
        public static Quote parse(String s) {
            String[] split = s.split(":");
            String shopName = split[0];
            double price = Double.parseDouble(split[1]);
            Discount.Code discountCode = Discount.Code.valueOf(split[2]);
    
            return new Quote(shopName, price, discountCode);
        }
    
        public String getShopName() {
            return this.shopName;
        }
    
        public double getPrice() {
            return this.price;
        }
    
        public Discount.Code getDiscountCode() {
            return this.discountCode;
        }
    }

    Shop.java

    package Part3.Chapter11.Chapter11_11_4.App;
    
    import java.util.Random;
    
    public class Shop {
        private final String shopName;
    
        public Shop() { 
            this.shopName = "";
        }
    
        public Shop(String shopName) {
            this.shopName = shopName;
        }
    
        /*
         * ShopName:price:DiscountCode 형식의 문자열을 반환한다.
         */
        public String getPrice(String product) {
            double price = this.calculatePrice(product);
            Discount.Code code = Discount.Code.values()[new Random().nextInt(Discount.Code.values().length)];
    
            return String.format("%s:%.2f:%s", this.shopName, price, code);
        }
    
        private double calculatePrice(String product) {
            /*
             * 상점의 데이터베이스를 이용해서 가격 정보를 얻는 동시에 다른 외부 서비스에 접근해야 하지만
             * 예제에서는 실제 호출할 서비스까지 구현할 수 없으므로 임으로 1초를 지연 시킨다.
             */
            delay();
    
            return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
        }
    
        private static void delay() {
            int randomDelay = 500 + new Random().nextInt(2000);
    
            try {
                Thread.sleep(randomDelay);
            } catch(InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    
        public String getShopName() {
            return this.shopName;
        }
    
        @Override
        public String toString() {
            return "{"
                + "shopName : " + this.shopName
            + "}";
        }
    
    
    }

    Main

    package Part3.Chapter11.Chapter11_11_4;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    
    import Part3.Chapter11.Chapter11_11_4.App.Discount;
    import Part3.Chapter11.Chapter11_11_4.App.ExchangeService;
    import Part3.Chapter11.Chapter11_11_4.App.ExchangeService.Money;
    import Part3.Chapter11.Chapter11_11_4.App.Quote;
    import Part3.Chapter11.Chapter11_11_4.App.Shop;
    
    /*
     * 11.4 비동기 작업 파이프라인 만들기
     */
    public class Main_11_4 {
        final static List<Shop> shops = Arrays.asList(
            new Shop("BestPrice")
            , new Shop("LetsSaveBig")
            , new Shop("MyFavoriteShop")
            , new Shop("BuyItAll")        
        );
    
        final static Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 2000), new ThreadFactory() {
    
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                /*
                 * 생성한 풀은 '데몬 스레드(daemon thread)'를 포함한다. 자바에서 일반 스레드가 실행 중이면
                 * 자바 프로그램은 종료되지 않는다. 따라서 어떤 이벤트를 한없이 기다리면서 종료되지 않는
                 * 일반 스레드가 있으면 문제가 될 수 있다.
                 * 반면 데몬 스레드는 자바 프로그램이 종료될 때 강제로 실행이 종료될 수 있다. 두 스레드 성능은 같다.
                 */
                t.setDaemon(true);
    
                return t;
            }
        });
    
        /*
         * 11.4.2. 할인 서비스 사용
         * 
         * Discount는 원격 서비스이므로 1초 지연이 되어있다.
         */
        public static List<String> findPrices(String product) {
            /*
             * 세 개의 map 연산을 상점 스트림에 파이프라인으로 연결해서 결과를 얻었다.
             * 
             * 1. 각 상점을 요청한 제품의 가격과 할인 코드로 변환한다.
             * 2. 이들 문자열을 파싱해서 Quote 객체를 생성한다.
             * 3. 원격 Discount 서비스에 접근해서 최종 할인가격을 계산하고 가격에 대응하는 상점 이름을 포함하는 문자열을 반환한다.
             */
            return shops.stream()
                // 각 상점에서 할인전 가격 얻기.
                .map(shop -> shop.getPrice(product))
                // 상점에서 반환한 문자열을 Quote 객체에 변환.
                .map(Quote::parse)
                // Discount 서비스를 이용해서 각 Quote에 할인을 적용.        
                .map(Discount::applyDiscount)
                .collect(Collectors.toList());
        }
    
        public static List<String> findFutureExecutorPrices(String product) {
            List<CompletableFuture<String>> priceFuture = shops.stream()
                .map(shop ->CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
                /*
                 * 상점에서 반환한 문자열을 Quote 객체로 변환.
                 * 
                 * 파싱 동작에서는 원격 서비스나 I/O가 없으므로 원하는 즉시 지연 없이 동작을 수행한다.
                 * thenApply 메서드는 CompletableFuture가 끝날 떄까지 블록하지 않는다.
                 * 즉, CompletableFuture가 동작을 완전히 완료한 다음에 thenApply 메서드로 전달된 람다 표현식을
                 * 적용할 수 있다.
                 * 따라서 CompletableFuture<String>을 CompletableFuture<Quote>로 변환된다.
                 */
                .map(prevFuture -> prevFuture.thenApply(Quote::parse))
                /*
                 * 결과 Future를 다른 비동기 작업과 조합해서 할인 코드를 적용.
                 * 
                 * 상점에서 받은 할인전 가격에 원격 Discount 서비스에서 제공하는 할인율을 적용한다.
                 * 이번에는 원격 실행이 포함되므로 이전 두 변환과 다르며 동기적으로 작업을 수행해야 한다.
                 * 
                 * 자바 8의 CompletableFuture API는 두 비동기 연산을 파이프라인으로 만들 수 있도록
                 * thenCompose 메서드를 제공한다. 
                 * 이전 CompletableFuture에 thenCompose 메서드를 호출하고 Function에 넘겨주는 식으로
                 * CompletableFutur를 조합할 수 있다.
                 * Function은 이전 CompletableFuture 반환 결과를 인수로 받고 다음 CompletableFuture를 반환하는데
                 * 다음 CompletableFuture는 이전 CompletableFuture의 결과를 계산의 입력으로 사용한다. 
                 */
                .map(nextFuture -> nextFuture.thenCompose(quote -> {
                    return CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor);
                }))
                .collect(Collectors.toList());
    
            return priceFuture.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    
            /*
             * thenCompose 메서드로 Async로 끝나는 버전이 존재한다.
             * thenCompose 메서드는 이전 작업을 수행한 스레드와 같은 스레드에서 작업을 실행함을 의미하고
             * thenComposeAsync 메서드는 다른 스레드에서 실행되도록 스레드 풀로 작업을 제출한다.
             * 위 코드에서 두 번째 CompletableFuture의 결고는 첫 번째 CompletableFuture에 의존하므로
             * 두 CompletableFuture를 하나로 조합하든 thenComposeAsync 메서드를 사용하든 최종결과나
             * 개괄적인 실행시간에는 영향을 미치지 않는다.
             * 따라서 스레드 오버헤드가 적게 발생하면서 효율성이 좀 더 좋은 thenCompose 메서드를 사용했다.
             */
        }
    
        public static Double findFutureExecutorExchange(Shop shop, String product) {
            /*
             * 11.4.4. 독립 CompletableFuture와 비독립 CompletableFuture 합치기
             * 
             * 독립적으로 실행된 두 개의 CompletableFuture 결과를 합쳐야 하는 상황 있을수 있고
             * 첫 번째 CompletableFuture의 동작 완료와 관계없이 두 번째 CompletableFuture를 실행할 수 있어야 한다.
             * 
             * 이런 상황에서는 thenCombine 메서드를 사용한다. thenCompose와 마찬가지로 Async 버전이 존재한다.
             */
            CompletableFuture<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)
                .thenApply(Quote::parse)
                .thenCombine(CompletableFuture.supplyAsync(() -> ExchangeService.getRate(Money.EUR))
                    , (price, rate) -> price.getPrice() * rate);
    
            return Stream.of(futurePriceInUSD)
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
                .get(0);
        }
    
        public static Stream<CompletableFuture<String>> findPriceStream(String product) {
            return shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
                .map(prevFuture -> prevFuture.thenApply(Quote::parse))
                .map(nextFuture -> nextFuture.thenCompose(quote -> 
                    CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)) );
        }
    
        @SuppressWarnings("rawtypes")
        public static void main(String[] args) {
            long start = System.nanoTime();
    
            System.out.println(findPrices("myPhone27S"));
            System.out.println("findPrices Done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
    
            start = System.nanoTime();
            System.out.println(findFutureExecutorPrices("myPhone27S"));
            System.out.println("findFutureExecutorPrices Done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
    
            start = System.nanoTime();
            System.out.println(shops.get(0).getShopName() + " exchange : " +findFutureExecutorExchange(shops.get(0), "myPhone27S"));
            System.out.println("findFutureExecutorExchange Done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
    
            System.out.println("thenAccept ::::: ");
            long streamStart = System.nanoTime();
    
            /*
             * 11.5.1 최저가격 검색 애플리케이션 리팩토링
             * 
             * findPriceStream 메서드 내부에서 세 가지 map 연산을 적용하고 반환하는 스트림에 네 번째 map 연산을 적용하자.
             * 새로 추가한 연산은 단순하게 각 CompletableFuture에 동작을 등록한다.
             * CompletableFuture에 등록된 동작은 CompletableFuture의 계산이 끝나면 값을 소비한다. 
             */
            CompletableFuture[] futures = findPriceStream("myPhone")
                /*
                 * 자바 8의 CompletableFuture API는 thenAccept라는 메서드를 제공한다.
                 * thenAccept 메서드는 연산 결과를 소비하는 Consumer를 인수로 받는다.
                 * 우리 예제에서는 할인 서비스에서 반환하는 문자열이 값이다. 이 문자열은 상점 이름과 할인율을 적용한 제품의 가격을 포함한다.
                 * 
                 * thenAccept도 thenCompose, thenCombine 메서드와 마찬가지로 thenAcceptAsync라는 Async 버전이 존재한다.
                 * thenAcceptAsync 메서드는 CompletableFuture가 완료된 스레드가 아니라 새로운 스레드를 이용해서 Consumer를 실행한다.
                 * 불필요한 콘텍스트 변경은 피하는 동시에 CompletableFuture가 완료되는 즉시 응답하는 것이 좋으므로 thenAcceptAsync를 사용하지 않는다.
                 * (오히려 thenAcceptAsync를 사용하면 새로운 스레드를 이용할 수 있을 때까지 기다려야 하는 상황이 일어날 수 있다.)
                 * 
                 * thenAccept 메서드는 CompletableFuture가 생성한 결과를 어떻게 소비할지 미리 지정했기 때문에 CompletableFuture<Void>를 반환한다.
                 * 이제 CompletableFuture<Void>가 동작을 끝낼 때까지 딱히 할 수 있는 일이 없다.
                 */
                .map(f -> f.thenAccept(s -> 
                    System.out.println(s + " (done in " + ((System.nanoTime() - streamStart) / 1_000_000) + " msecs)")) )
                .toArray(size -> new CompletableFuture[size]);
    
            /*
             * 팩토리 메서드 allOf는 CompletableFuture 배열을 입력으로 받아 CompletableFuture<Void>를 반환한다.
             * 전달된 모든 CompletableFuture가 완료되어야 CompletableFuture<Void>가 완료된다.
             * 따라서 allOf 메서드가 반환하는 CompletableFuture에 join을 호출하면 원래 스트림의 모든 CompletableFuture의 실행 완료를
             * 기다릴 수 있다.
             * 
             * 이를 이용해서 최저가격 검색 애플리케이션은 '모든 상점이 결과를 반환했거나 타임아웃되었음' 같은 메시지를 사용자에게
             * 보여줌으로써 사용자는 추가로 가격 정보를 기다리지 않다도 된다는 사실을 보여줄 수 있다.
             * 
             * 반면 배열의 CompletableFuture 중 하나의 작업이 끝나길 기달리는 상황도 있을 수 있다.
             * (예를 들어 두 개의 환율 서버에 동시 접근했을 때 한 서버의 응답만 있어도 되는 경우)
             * 이때는 팩토리 메서드 anyOf를 사용한다.
             * anyOf 메서드는 CompletableFuture 배열을 입력으로 받아서 CompletableFuture<Object>을 반환한다.
             * CompletableFuture<Object>는 처음으로 완료한 CompletableFuture의 값으로 동작을 완료한다.
             */
            CompletableFuture.allOf(futures).join();
    
            System.out.println("All shops have now responded in " + ((System.nanoTime() - streamStart) / 1_000_000) + " msecs");
        }
    
    }

    요약

    • 한 개 이상의 원격 외부 서비스를 사용하는 긴 동작을 실행할 때는 비동기 방식으로 애플리케이션의 성능과 반응성을 향상 시킬 수 있다.
    • 우리 고객에게 비동기 API를 제공하는 것을 고려해야 한다. CompletableFuture의 기능을 이용하면 쉽게 비동기 API를 구현할 수 있다.
    • CompletableFuture를 이용할 대 비동기 태스크에서 발생한 에러를 관리하고 전달할 수 있다.
    • 동기 API를 CompletableFuture로 감싸서 비동기적으로 소비할 수 있다.
    • 서로 독립적인 비동기 동작이든 아니면 하나의 비동기 동작이 다른 비동기 동작의 결과를에 의존하는 상황이든 여러 비동기 동작을 조립하고 조합할 수 있다.
    • CompletableFuture에 콜백을 등록해서 Future가 동작을 끝내고 결과를 생산했을 때 어떤 코드를 실행하도록 지정할 수 있다.
    • CompletableFuture 리스트의 모든 값이 완료될 때까지 기다릴지 아니면 하나의 값만 완료되길 기다릴리 선택할 수 있다.
    반응형

    댓글

Designed by Tistory.