해당 내용은 Java8 In Action 책을 요약 및 정리한 내용입니다.
좀 더 자세히 알고 싶으신 분들은 책을 사서 읽어보심을 추천드립니다.!
11.1 Future
Main
package Part3.Chapter11.Chapter11_11_1;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
/*
* 11.1 Future
*
* 자바 5부터는 미래의 어느 시점에 결과를 얻는 모델에 활용할 수 있도록 Future 인터페이스를 제공하고 있다.
* 비동기 계산을 모델링하는데 Future를 이용할 수 있으며, Future는 계산이 끝났을 때 결과에 접근할 수 있는 레퍼런스를 제공한다.
* 시간이 걸릴수 있는 작업을 Future 내부로 설정하면 호출자 스레드가 결과를 기달리는 동안 다른 유용한 작업을 수행할 수 있다.
* Futhre는 저수준의 스레드에 비해 직관적으로 이해하기 쉽다는 장점이 있다.
* Future를 이용하려면 시간이 오래 걸리는 작업을 Callable 객체 내부로 감싼 다음에 ExecutorService에 제출해야 한다.
*/
public class Main_11_1 {
public static void main(String[] args) {
/*
* ExecutorService에서 제공하는 스레드가 시간이 오래 걸리는 작업을 처리하는 동안 우리 스레드로 다른 작업을 동시에 실행할 수 있다.
* 다른 작업을 처리하다가 시간이 오래 걸리는 작업의 결과가 필요한 시점이 되었을 때 Future의 get 메서드로 결과를 가져올 수 있다.
* get 메서드를 호출했을 때 이미 계산이 완료되어 결과가 준비되었다면 즉시 결과를 반환하지만 결과가 준비되지 않았다면
* 작업이 완료될 때까지 우리 스레드를 블록시킨다.
*/
System.out.println(new examFuture().futureExecute());
/*
* 11.1.1 Future 제한
*
* 여러 Future의 결과가 있을 때 이들의 의존성은 표현하기가 어렵다.
* 즉 '오래 걸리는 A라는 계산이 끝나면 그 결과를 다른 오래 걸리는 계산 B로 전달하시오. 그리고 B의 결과가 나오면 다른 질의의 결과와
* B의 결과를 조합하시오'와 같은 요구사항을 쉽게 구현할 수 있어야 한다.
*
* 따라서 다음과 같은 선언형 기능이 필요하다.
* - 두 개의 비동기 계산 결과를 하나로 합친다. 두 가지 결과 계산은 서로 독립적일 수 있으며 또는 두 번째 결과가 첫 번째 결과에
* 의존하는 상황일 수 있다.
*
* - Future 집합이 실행하는 모든 태스크의 완료를 기다린다.
*
* - Future 집합에서 가장 빨리 완료되는 태스크를 기달렸다가 결과를 얻는다.
* (여러 태스크가 다양한 방식으로 같은 결과를 구하는 상황.)
*
* - 프로그램적으로 Future를 완료시킨다.(비동기 동작에 수동으로 결과 제공.)
*
* - Future 완료 동작에 반응한다.
* (결과를 기다리면서 블록되지 않고 결과가 준비되었다는 알림을 받은 다음에 Future의 결과로 원하는 추가 동작을 수행할 수 있음.)
*
* 지금까지 설명한 기능을 선언형으로 이용할 수 있도록 자바 8에서 새로 제공하는 CompletableFuture 클래스(Future 인터페이스 구현체)
* 가 있다. Stream과 CompletableFuture는 비슷한 패턴이다. 즉 람다 표현식과 파이프라이닝을 활용한다.
* 따라서 Future와 CompletableFuture의 관계를 Collection과 Stream의 관계에 비유할 수 있다.
*/
/*
* 11.1.2 CompletbleFuture로 비동기 애플리케이션 만들기
*
* 어떤 제품이나 서비스를 이용해야 하는 상황의 가정에서 예산을 줄일 수 있도록 여러 온라인상점 중 가장 저렴한 가격을 제시하는 상점을
* 찾는 애플리케이션을 만든다.
* 애플리케이션을 만드는 동안 다음과 같은 기술을 배운다.
*
* 첫째 고객에게 비동기 API를 제공하는 방법을 배운다.
*
* 둘째 동기 API를 사용해야 할 때 코드를 비블록으로 만드는 방법을 배운다.
* 두 개의 비동기 동작을 파이프라인으로 만드는 방법과 두 개의 동작 결과를 하나의 비동기 계산으로 합치는 방법을 배운다.
* 예를 들어 온라인상점에서 우리가 사려는 물건에 대응하는 할인 코드를 반환한다고 가정하자. 우리는 다른 원격 할인 서비스에
* 접근해서 할인 코드에 해당하는 할인율을 찾아야한다. 그래야 원래 가격에 할인율을 적용해서 최종 결과를 계산할 수 있다.
*
* 셋째 비동기 동작의 완료에 대응하는 방법을 배운다.
* 모든 상점에서 가격 정보를 얻을 때까지 기다리는 것이 아니라 각 상점에서 가격 정보를 얻을 때마다 즉시 최저가격을 찾는
* 애플리케이션을 갱신하는 방법을 설명한다.(그렇지 않으면 서버 다운 등 문제가 발생했을 때 사용자에게 검은 화면만 보여주게 된다.)
*/
/*
* 동기 API와 비동기 API
*
* 동기 API
* 전통적인 동기 API는 메서드를 호출한 다음에 메서드의 계산을 완료될 때까지 기다린 다음 메서드가 반환되면 호출자는 반환된 값으로
* 계속 다른 동작을 수행한다.
* 호출자와 피호출자가 각각 다른 스레드에서 실행되는 상황이었더라도 호출자는 피호출자의 동작 완료를 기달린다.
* 이처럼 동기 API를 사용하는 상황을 '블록 호출(blocking call)' 이라고 한다.
*
* 비동기 API
* 메서드가 즉시 반환되며 끝내지 못한 나머지 작업을 호출자 스레드와 동기적으로 실행될 수 있도록 다른 스레드에 할당한다.
* 이와 같은 비동기 API를 사용하는 상황을 '비블록 호출(non-blocking call)'이라고 한다.
* 다른 스레드에 할당된 나머지 계산 결과는 콜백 메서드를 호출해서 전달하거나 호출자가 '계산 결과가 끝날 때까지 기달림' 메서드를
* 추가로 호출하면서 전달된다.
* 주로 I/O 시스템 프로그래밍에서 이와 같은 방식으로 동작을 수행하며 즉, 계산 동작을 수행하는 동안 비동기적으로 디스크 접근을 수행한다.
* 그리고 더 이상 수행할 동작이 없으면 디스크 블록이 메모리로 로딩될 때까지 기다린다.
*/
}
}
class examFuture {
public Double futureExecute() {
Double result = new Double(0);
// 스레드 풀에 태스크를 제출하려면 ExecutorService를 생성.
ExecutorService executor = Executors.newCachedThreadPool();
// Callable을 ExecutorService로 제출.
Future<Double> future = executor.submit(new Callable<Double>() {
public Double call() {
// 시간이 오래 걸리는 작업은 다른 스레드에서 비동기적으로 실행.
return doSomeLongComputation();
}
});
// 비동기 작업을 수행하는 동안 다른 작업을 실행.
this.doSomeThingElse();
try {
/*
* 비동기 작업의 결과를 가져온다. 결과가 준비되어 있지 않으면 호출 스레드가 블록된다.
* 최대 1초까지만 기달린다.
*
* get 메서드 호출 시 우리 스레드가 블록이 되기 때문에 오래 걸리는 작업이 영원히 끝나지 않으면
* 작업이 끝나지 않는 문제가 생길 수 있으므로 get 메서드를 오버로드해서 우리 스레드가 대기할 최대 타임아웃 시간을
* 설정하는 것이 좋다.
*/
result = future.get(1, TimeUnit.SECONDS);
} catch(ExecutionException ee) {
System.out.println("계산 중 에러가 발생하였습니다.");
} catch(InterruptedException ie) {
System.out.println("현재 스레드에서 대기 중 인터럽트가 발생.");
} catch(TimeoutException te) {
System.out.println("Future가 완료되기 전에 타임아웃이 발생.");
}
return result;
}
private Double doSomeLongComputation() {
return IntStream.rangeClosed(1, 100_000_000).asDoubleStream().sum();
}
private void doSomeThingElse() {
System.out.println("doSomeThingElse method execute!!");
}
}
11.2 비동기 API 구현
Process
Shop.java
package Part3.Chapter11.Chapter11_11_2.App;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
public class Shop {
private final String shopName;
public Shop() {
this.shopName = "";
}
public Shop(String shopName) {
this.shopName = shopName;
}
public double getPrice(String product) {
return this.calculatePrice(product);
}
public Future<Double> getPriceSupplyAsync(String product) {
/*
* supplyAsync 메서드는 Supplier를 인수로 받아서 CompletableFuture를 반환한다.
* CompletableFutre는 Supplier를 실행해서 비동기적으로 결과를 생성한다.
* ForkJoinPool의 Executor 중 하나가 Supplier를 실행할 것이다.
*
* 하지만 두 번째 인수로 Executor를 받는 다른 오버로드 버전을 통해서 Executor를 지정할 수 있다.
* 결국 모든 다른 CompletableFuture의 팩토리 메서드에 Executor를 선택적으로 전달할 수 있다.
*
* 에러 및 예외 관리는 getPriceAsync 메서드의 try ~ catch와 같은 방법으로 관리 한다.
*/
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
public Future<Double> getPriceAsync(String product) {
// 계산 결과를 포함할 CompletableFuture를 생성한다.
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
/*
* 예외가 발생하면 해당 스레드에만 영향을 미친다.
* 즉, 에러가 발생해도 가격 계산은 계속 진행되며 일의 순서가 꼬인다.
* 결과적으로 클라이언트는 get 메서드가 반환될 때까지 영원히 기다리게 될 수도 있다.
*
* 영구 대기 문제는 get 메서드의 타임아웃값을 받는 get 메서드를 사용해서 해결할 수 있다.
* 그래야 문제가 발생했을 때 클라이언트가 영원히 블록되지 않고 TimeoutExcetion을 받을 수 있다.
*/
try {
// 다른 스레드에서 비동기적으로 계산을 수행한다.
double price = calculatePrice(product);
// 오랜 시간이 걸리는 계산이 완료되면 Future에 값을 설정한다.
futurePrice.complete(price);
} catch(Exception e) {
/*
* 에러가 발생 되면 왜 났는지 알 수가 없기 때문에 completeExceptionally 메서드를 이용해서
* CompletableFuture 내부에서 발생한 예외를 클라이언트로 전달해줘야 한다.
*/
futurePrice.completeExceptionally(e);
}
}).start();
// 계산 결과가 완료되길 기다리지 않고 Future를 반환한다.
return futurePrice;
}
private double calculatePrice(String product) {
/*
* 상점의 데이터베이스를 이용해서 가격 정보를 얻는 동시에 다른 외부 서비스에 접근해야 하지만
* 예제에서는 실제 호출할 서비스까지 구현할 수 없으므로 임으로 1초를 지연 시킨다.
*/
delay();
return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
}
public static void delay() {
try {
Thread.sleep(1000L);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
public String getShopName() {
return this.shopName;
}
@Override
public String toString() {
return "{"
+ "shopName : " + this.shopName
+ "}";
}
}
Main
package Part3.Chapter11.Chapter11_11_2;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import Part3.Chapter11.Chapter11_11_2.App.Shop;
/*
* 11.2 비동기 API 구현
*/
public class Main_11_2 {
public static void doSomeThingElse() {
System.out.println("doSomeThingElse method execute!!");
}
public static void main(String[] args) {
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
// 상점에 제품가격 정보 요청
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
Future<Double> futureSupplyPrice = shop.getPriceSupplyAsync("my favorite product");
System.out.println("Invocation returned after "
+ ((System.nanoTime() - start) / 1_000_000) + " msecs");
doSomeThingElse();
try {
/*
* 가격 정보가 있으면 Future에서 가격정보를 읽고, 가격 정보가 없으면
* 가격 정보를 받을 때까지 블록한다.
*/
System.out.printf("Price is %.2f%n", futurePrice.get(1, TimeUnit.SECONDS));
System.out.printf("Supply Price is %.2f%n", futureSupplyPrice.get(1, TimeUnit.SECONDS));
} catch(Exception e) {
throw new RuntimeException(e);
}
System.out.println("Price returned after "
+ ((System.nanoTime() - start) / 1_000_000) + " msecs");
}
}
11.3 비블록 코드 만들기
Process
Shop.java
package Part3.Chapter11.Chapter11_11_3.App;
import java.util.Random;
public class Shop {
private final String shopName;
public Shop() {
this.shopName = "";
}
public Shop(String shopName) {
this.shopName = shopName;
}
public double getPrice(String product) {
return this.calculatePrice(product);
}
private double calculatePrice(String product) {
/*
* 상점의 데이터베이스를 이용해서 가격 정보를 얻는 동시에 다른 외부 서비스에 접근해야 하지만
* 예제에서는 실제 호출할 서비스까지 구현할 수 없으므로 임으로 1초를 지연 시킨다.
*/
delay();
return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
}
public static void delay() {
try {
Thread.sleep(1000L);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
public String getShopName() {
return this.shopName;
}
@Override
public String toString() {
return "{"
+ "shopName : " + this.shopName
+ "}";
}
}
Main
package Part3.Chapter11.Chapter11_11_3;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import Part3.Chapter11.Chapter11_11_3.App.Shop;
/*
* 11.3 비블록 코드 만들기
*/
public class Main_11_3 {
/*
* 동기 API를 이용해서 최저가격 검색 애플리케이션을 개발해야 한다.
* 다음과 같이 상점 리스트가 있다고 가정하자.
*/
final static List<Shop> shops = Arrays.asList(
new Shop("BestPrice")
, new Shop("LetsSaveBig")
, new Shop("MyFavoriteShop")
, new Shop("BuyItAll")
);
/*
* 상점 수보다 많은 스레드를 생성해봐야 사용할 가능성이 전혀 없으므로 상점 수보다 많은 스레드를 갖는건 낭비다.
* 한 상점에 하나의 스레드가 할당될 수 있도록 즉, 가격 정보를 검색하려는 상점 수만큼 스레드를 갖도록 Executor를
* 설정한다. 스레드 수가 너무 많으면 서버가 크래시될 수 있으므로 하나의 Executor에서 사용할 스레드의 최대 개수는
* 100 이하로 설정하는 것이 좋다.
*/
final static Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
/*
* 생성한 풀은 '데몬 스레드(daemon thread)'를 포함한다. 자바에서 일반 스레드가 실행 중이면
* 자바 프로그램은 종료되지 않는다. 따라서 어떤 이벤트를 한없이 기다리면서 종료되지 않는
* 일반 스레드가 있으면 문제가 될 수 있다.
* 반면 데몬 스레드는 자바 프로그램이 종료될 때 강제로 실행이 종료될 수 있다. 두 스레드 성능은 같다.
*/
t.setDaemon(true);
return t;
}
});
/*
* 제품명을 입력하면 상점 이름과 제품가격 문자열 정보를 포함하는 List를 반환하는 메서드.
*/
public static List<String> findPrices(String product) {
return shops.stream()
.map(shop -> String.format("%s price is %.2f", shop.getShopName(), shop.getPrice(product)) )
.collect(Collectors.toList());
}
public static List<String> findParallelPrices(String product) {
/*
* 11.3.1 병렬 스트림으로 요청 병렬화하기
*/
return shops.parallelStream()
.map(shop -> String.format("%s price is %.2f", shop.getShopName(), shop.getPrice(product)) )
.collect(Collectors.toList());
}
public static List<String> findFuturePrices(String product) {
/*
* 11.3.2 CompletableFuture로 비동기 호출 구현하기
*/
/*
* CompletableFuture를 포함하는 리스트 List<CompletableFuture<String>>를 얻을 수 있다.
* 리스트의 CompletableFuture는 각각 계산 결과가 끝난 상점의 이름 문자열을 포함한다.
*/
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> {
return CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", shop.getShopName(), shop.getPrice(product)) );
})
.collect(Collectors.toList());
/*
* 리스트의 모든 CompletableFuture에 join을 호출해서 모든 동작이 끝나기를 기다린다.
* join 메서드는 Future 인터페이스의 get 메서드와 같은 의미를 갖는다.
* 다만 join은 아무 예외도 발생시키지 않는다는 점이 다르다.
* 따라서 try ~ catch로 감쌀 필요가 없다.
*/
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
/*
* 두 map 연산을 하나의 스트림 처리 파이프라인으로 처리하지 않고 두 개의 스트림 파이프라인으로 처리한 이유는
* 스트림 연산은 게으른 특성이 있으므로 하나의 파이프라인으로 연산을 처리 했다면 모든 가격 정보 요청이
* 동기적, 순차적으로 이루어지는 결과가 된다.
* CompletableFuture로 각 상점의 정보를 요청할 때 기존 요청 작업이 완료되어야 join이 결과를 반환하면서
* 다음 상점으로 정보를 요청할 수 있기 때문이다.
*/
/*
shops.stream()
.map(shop -> {
return CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", shop.getShopName(), shop.getPrice(product)) );
})
.map(CompletableFuture::join)
.collect(Collectors.toList());
*/
}
public static List<String> findFutureExecutorPrices(String product) {
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> {
return CompletableFuture.supplyAsync(() -> {
return String.format("%s price is %.2f", shop.getShopName(), shop.getPrice(product));
}, executor );
})
.collect(Collectors.toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
public static void main(String[] args) {
long start = System.nanoTime();
/*
* 네 개의 상점에서 가격을 검색하는 동안 각각 1초의 대기시간이 있으므로 전체 가격 검색 결과는 4초 정도 걸린다.
*/
System.out.println(findPrices("myPhone27S"));
System.out.println("findPrices Done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
start = System.nanoTime();
System.out.println(findParallelPrices("myPhone27S"));
System.out.println("findParallelPrices Done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
start = System.nanoTime();
System.out.println(findFuturePrices("myPhone27S"));
System.out.println("findFuturePrices Done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
start = System.nanoTime();
System.out.println(findFutureExecutorPrices("myPhone27S"));
System.out.println("findFutureExecutorPrices Done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
/*
* 스트림 병렬화와 CompletableFuture 병렬화
*
* - I/O가 포함되지 않은 계산 중심의 동작을 실행할 때는 스트림 인터페이스가 구현하기 간단하며 효율적일 수 있다.
* (모든 스레드가 계산 작업을 수행하는 상황에서는 프로세서 코어 수 이상의 스레드를 가질 필요가 없다.)
*
* - I/O를 기다리는 작업을 병렬로 실행할 때는 CompletableFuture가 더 많은 유연성을 제공하며 대기/계산(W/C)의
* 비율에 적합한 스레드 수를 설정할 수 있다. 특히 스트림의 게으른 특성 때문에 스트림에서 I/O를 실제로 언제 처리할지
* 예측하기 어려운 문제도 있다.
*/
}
}
11.4 비동기 작업 파이프라인 만들기
Process
Discount.java
package Part3.Chapter11.Chapter11_11_4.App;
import java.util.Random;
public class Discount {
public enum Code {
NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
private final int percentage;
Code(int percentage) {
this.percentage = percentage;
}
}
public static String applyDiscount(Quote quote) {
// 기존 가격에 할인 코드를 적용한다.
return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
}
private static double apply(double price, Code code) {
// Discount 서비스의 응답 지연을 흉내낸다.
delay();
return price * (100 - code.percentage) / 100;
}
private static void delay() {
int randomDelay = 500 + new Random().nextInt(2000);
try {
Thread.sleep(randomDelay);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
}
ExchangeService.java
package Part3.Chapter11.Chapter11_11_4.App;
import java.util.Random;
public class ExchangeService {
public enum Money {
USD(100), EUR(200);
private final int exchange;
Money(int exchange) {
this.exchange = exchange;
}
}
public static double getRate(Money exchange) {
delay();
return exchange.exchange;
}
private static void delay() {
int randomDelay = 500 + new Random().nextInt(2000);
try {
Thread.sleep(randomDelay);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Quote.java
package Part3.Chapter11.Chapter11_11_4.App;
/*
* 11.4.1 할인 서비스 구현
*
* 최저가격 검색 애플리케이션은 여러 상점에서 가격 정보를 얻어오고, 결과 문자열을 파싱하고,
* 할인 서버에 질의를 보낼 준비가 되었다.
* 할인 서버에서 할인율을 확인해서 최종 가격을 계산할 수 있다.
* (할인 코드와 연계된 할인율은 언제든 바뀔 수 있으므로 매번 서버에서 정보를 얻어 와야 한다.)
*/
public class Quote {
private final String shopName;
private final double price;
private final Discount.Code discountCode;
public Quote(String shopName, double price, Discount.Code discountCode) {
this.shopName = shopName;
this.price = price;
this.discountCode = discountCode;
}
public static Quote parse(String s) {
String[] split = s.split(":");
String shopName = split[0];
double price = Double.parseDouble(split[1]);
Discount.Code discountCode = Discount.Code.valueOf(split[2]);
return new Quote(shopName, price, discountCode);
}
public String getShopName() {
return this.shopName;
}
public double getPrice() {
return this.price;
}
public Discount.Code getDiscountCode() {
return this.discountCode;
}
}
Shop.java
package Part3.Chapter11.Chapter11_11_4.App;
import java.util.Random;
public class Shop {
private final String shopName;
public Shop() {
this.shopName = "";
}
public Shop(String shopName) {
this.shopName = shopName;
}
/*
* ShopName:price:DiscountCode 형식의 문자열을 반환한다.
*/
public String getPrice(String product) {
double price = this.calculatePrice(product);
Discount.Code code = Discount.Code.values()[new Random().nextInt(Discount.Code.values().length)];
return String.format("%s:%.2f:%s", this.shopName, price, code);
}
private double calculatePrice(String product) {
/*
* 상점의 데이터베이스를 이용해서 가격 정보를 얻는 동시에 다른 외부 서비스에 접근해야 하지만
* 예제에서는 실제 호출할 서비스까지 구현할 수 없으므로 임으로 1초를 지연 시킨다.
*/
delay();
return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
}
private static void delay() {
int randomDelay = 500 + new Random().nextInt(2000);
try {
Thread.sleep(randomDelay);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
public String getShopName() {
return this.shopName;
}
@Override
public String toString() {
return "{"
+ "shopName : " + this.shopName
+ "}";
}
}
Main
package Part3.Chapter11.Chapter11_11_4;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import Part3.Chapter11.Chapter11_11_4.App.Discount;
import Part3.Chapter11.Chapter11_11_4.App.ExchangeService;
import Part3.Chapter11.Chapter11_11_4.App.ExchangeService.Money;
import Part3.Chapter11.Chapter11_11_4.App.Quote;
import Part3.Chapter11.Chapter11_11_4.App.Shop;
/*
* 11.4 비동기 작업 파이프라인 만들기
*/
public class Main_11_4 {
final static List<Shop> shops = Arrays.asList(
new Shop("BestPrice")
, new Shop("LetsSaveBig")
, new Shop("MyFavoriteShop")
, new Shop("BuyItAll")
);
final static Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
/*
* 생성한 풀은 '데몬 스레드(daemon thread)'를 포함한다. 자바에서 일반 스레드가 실행 중이면
* 자바 프로그램은 종료되지 않는다. 따라서 어떤 이벤트를 한없이 기다리면서 종료되지 않는
* 일반 스레드가 있으면 문제가 될 수 있다.
* 반면 데몬 스레드는 자바 프로그램이 종료될 때 강제로 실행이 종료될 수 있다. 두 스레드 성능은 같다.
*/
t.setDaemon(true);
return t;
}
});
/*
* 11.4.2. 할인 서비스 사용
*
* Discount는 원격 서비스이므로 1초 지연이 되어있다.
*/
public static List<String> findPrices(String product) {
/*
* 세 개의 map 연산을 상점 스트림에 파이프라인으로 연결해서 결과를 얻었다.
*
* 1. 각 상점을 요청한 제품의 가격과 할인 코드로 변환한다.
* 2. 이들 문자열을 파싱해서 Quote 객체를 생성한다.
* 3. 원격 Discount 서비스에 접근해서 최종 할인가격을 계산하고 가격에 대응하는 상점 이름을 포함하는 문자열을 반환한다.
*/
return shops.stream()
// 각 상점에서 할인전 가격 얻기.
.map(shop -> shop.getPrice(product))
// 상점에서 반환한 문자열을 Quote 객체에 변환.
.map(Quote::parse)
// Discount 서비스를 이용해서 각 Quote에 할인을 적용.
.map(Discount::applyDiscount)
.collect(Collectors.toList());
}
public static List<String> findFutureExecutorPrices(String product) {
List<CompletableFuture<String>> priceFuture = shops.stream()
.map(shop ->CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
/*
* 상점에서 반환한 문자열을 Quote 객체로 변환.
*
* 파싱 동작에서는 원격 서비스나 I/O가 없으므로 원하는 즉시 지연 없이 동작을 수행한다.
* thenApply 메서드는 CompletableFuture가 끝날 떄까지 블록하지 않는다.
* 즉, CompletableFuture가 동작을 완전히 완료한 다음에 thenApply 메서드로 전달된 람다 표현식을
* 적용할 수 있다.
* 따라서 CompletableFuture<String>을 CompletableFuture<Quote>로 변환된다.
*/
.map(prevFuture -> prevFuture.thenApply(Quote::parse))
/*
* 결과 Future를 다른 비동기 작업과 조합해서 할인 코드를 적용.
*
* 상점에서 받은 할인전 가격에 원격 Discount 서비스에서 제공하는 할인율을 적용한다.
* 이번에는 원격 실행이 포함되므로 이전 두 변환과 다르며 동기적으로 작업을 수행해야 한다.
*
* 자바 8의 CompletableFuture API는 두 비동기 연산을 파이프라인으로 만들 수 있도록
* thenCompose 메서드를 제공한다.
* 이전 CompletableFuture에 thenCompose 메서드를 호출하고 Function에 넘겨주는 식으로
* CompletableFutur를 조합할 수 있다.
* Function은 이전 CompletableFuture 반환 결과를 인수로 받고 다음 CompletableFuture를 반환하는데
* 다음 CompletableFuture는 이전 CompletableFuture의 결과를 계산의 입력으로 사용한다.
*/
.map(nextFuture -> nextFuture.thenCompose(quote -> {
return CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor);
}))
.collect(Collectors.toList());
return priceFuture.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
/*
* thenCompose 메서드로 Async로 끝나는 버전이 존재한다.
* thenCompose 메서드는 이전 작업을 수행한 스레드와 같은 스레드에서 작업을 실행함을 의미하고
* thenComposeAsync 메서드는 다른 스레드에서 실행되도록 스레드 풀로 작업을 제출한다.
* 위 코드에서 두 번째 CompletableFuture의 결고는 첫 번째 CompletableFuture에 의존하므로
* 두 CompletableFuture를 하나로 조합하든 thenComposeAsync 메서드를 사용하든 최종결과나
* 개괄적인 실행시간에는 영향을 미치지 않는다.
* 따라서 스레드 오버헤드가 적게 발생하면서 효율성이 좀 더 좋은 thenCompose 메서드를 사용했다.
*/
}
public static Double findFutureExecutorExchange(Shop shop, String product) {
/*
* 11.4.4. 독립 CompletableFuture와 비독립 CompletableFuture 합치기
*
* 독립적으로 실행된 두 개의 CompletableFuture 결과를 합쳐야 하는 상황 있을수 있고
* 첫 번째 CompletableFuture의 동작 완료와 관계없이 두 번째 CompletableFuture를 실행할 수 있어야 한다.
*
* 이런 상황에서는 thenCombine 메서드를 사용한다. thenCompose와 마찬가지로 Async 버전이 존재한다.
*/
CompletableFuture<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)
.thenApply(Quote::parse)
.thenCombine(CompletableFuture.supplyAsync(() -> ExchangeService.getRate(Money.EUR))
, (price, rate) -> price.getPrice() * rate);
return Stream.of(futurePriceInUSD)
.map(CompletableFuture::join)
.collect(Collectors.toList())
.get(0);
}
public static Stream<CompletableFuture<String>> findPriceStream(String product) {
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
.map(prevFuture -> prevFuture.thenApply(Quote::parse))
.map(nextFuture -> nextFuture.thenCompose(quote ->
CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)) );
}
@SuppressWarnings("rawtypes")
public static void main(String[] args) {
long start = System.nanoTime();
System.out.println(findPrices("myPhone27S"));
System.out.println("findPrices Done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
start = System.nanoTime();
System.out.println(findFutureExecutorPrices("myPhone27S"));
System.out.println("findFutureExecutorPrices Done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
start = System.nanoTime();
System.out.println(shops.get(0).getShopName() + " exchange : " +findFutureExecutorExchange(shops.get(0), "myPhone27S"));
System.out.println("findFutureExecutorExchange Done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
System.out.println("thenAccept ::::: ");
long streamStart = System.nanoTime();
/*
* 11.5.1 최저가격 검색 애플리케이션 리팩토링
*
* findPriceStream 메서드 내부에서 세 가지 map 연산을 적용하고 반환하는 스트림에 네 번째 map 연산을 적용하자.
* 새로 추가한 연산은 단순하게 각 CompletableFuture에 동작을 등록한다.
* CompletableFuture에 등록된 동작은 CompletableFuture의 계산이 끝나면 값을 소비한다.
*/
CompletableFuture[] futures = findPriceStream("myPhone")
/*
* 자바 8의 CompletableFuture API는 thenAccept라는 메서드를 제공한다.
* thenAccept 메서드는 연산 결과를 소비하는 Consumer를 인수로 받는다.
* 우리 예제에서는 할인 서비스에서 반환하는 문자열이 값이다. 이 문자열은 상점 이름과 할인율을 적용한 제품의 가격을 포함한다.
*
* thenAccept도 thenCompose, thenCombine 메서드와 마찬가지로 thenAcceptAsync라는 Async 버전이 존재한다.
* thenAcceptAsync 메서드는 CompletableFuture가 완료된 스레드가 아니라 새로운 스레드를 이용해서 Consumer를 실행한다.
* 불필요한 콘텍스트 변경은 피하는 동시에 CompletableFuture가 완료되는 즉시 응답하는 것이 좋으므로 thenAcceptAsync를 사용하지 않는다.
* (오히려 thenAcceptAsync를 사용하면 새로운 스레드를 이용할 수 있을 때까지 기다려야 하는 상황이 일어날 수 있다.)
*
* thenAccept 메서드는 CompletableFuture가 생성한 결과를 어떻게 소비할지 미리 지정했기 때문에 CompletableFuture<Void>를 반환한다.
* 이제 CompletableFuture<Void>가 동작을 끝낼 때까지 딱히 할 수 있는 일이 없다.
*/
.map(f -> f.thenAccept(s ->
System.out.println(s + " (done in " + ((System.nanoTime() - streamStart) / 1_000_000) + " msecs)")) )
.toArray(size -> new CompletableFuture[size]);
/*
* 팩토리 메서드 allOf는 CompletableFuture 배열을 입력으로 받아 CompletableFuture<Void>를 반환한다.
* 전달된 모든 CompletableFuture가 완료되어야 CompletableFuture<Void>가 완료된다.
* 따라서 allOf 메서드가 반환하는 CompletableFuture에 join을 호출하면 원래 스트림의 모든 CompletableFuture의 실행 완료를
* 기다릴 수 있다.
*
* 이를 이용해서 최저가격 검색 애플리케이션은 '모든 상점이 결과를 반환했거나 타임아웃되었음' 같은 메시지를 사용자에게
* 보여줌으로써 사용자는 추가로 가격 정보를 기다리지 않다도 된다는 사실을 보여줄 수 있다.
*
* 반면 배열의 CompletableFuture 중 하나의 작업이 끝나길 기달리는 상황도 있을 수 있다.
* (예를 들어 두 개의 환율 서버에 동시 접근했을 때 한 서버의 응답만 있어도 되는 경우)
* 이때는 팩토리 메서드 anyOf를 사용한다.
* anyOf 메서드는 CompletableFuture 배열을 입력으로 받아서 CompletableFuture<Object>을 반환한다.
* CompletableFuture<Object>는 처음으로 완료한 CompletableFuture의 값으로 동작을 완료한다.
*/
CompletableFuture.allOf(futures).join();
System.out.println("All shops have now responded in " + ((System.nanoTime() - streamStart) / 1_000_000) + " msecs");
}
}
요약
- 한 개 이상의 원격 외부 서비스를 사용하는 긴 동작을 실행할 때는 비동기 방식으로 애플리케이션의 성능과 반응성을 향상 시킬 수 있다.
- 우리 고객에게 비동기 API를 제공하는 것을 고려해야 한다. CompletableFuture의 기능을 이용하면 쉽게 비동기 API를 구현할 수 있다.
- CompletableFuture를 이용할 대 비동기 태스크에서 발생한 에러를 관리하고 전달할 수 있다.
- 동기 API를 CompletableFuture로 감싸서 비동기적으로 소비할 수 있다.
- 서로 독립적인 비동기 동작이든 아니면 하나의 비동기 동작이 다른 비동기 동작의 결과를에 의존하는 상황이든 여러 비동기 동작을 조립하고 조합할 수 있다.
- CompletableFuture에 콜백을 등록해서 Future가 동작을 끝내고 결과를 생산했을 때 어떤 코드를 실행하도록 지정할 수 있다.
- CompletableFuture 리스트의 모든 값이 완료될 때까지 기다릴지 아니면 하나의 값만 완료되길 기다릴리 선택할 수 있다.