Java 5 에 추가되었던 Future 는 다음과 같은 제한 사항이 있다.
1. 수동(manually)으로 완료시킬수없다. 예를 들어 remote API 로 이커머스 상품의 가장 비싼 상품을 가져오는 기능을 작성하는데 스레드를 분리해서 해당 기능에서 Future 로 반환하고자 한다. 이때 리모트 API 서비스가 down 이 되면 Future 를 수동으로 완료시킬 수 없다.
2. Future 의 결과값을 블럭킹 없이 더 이상의 행동(action) 을 수행할 수 없다. Future 는 완료를 공지하지(notify) 않는다. 그래서 get() 메소드로 결과가 나올때까지 블럭(block)해야한다.
3. 멀티 Future 들이 서로 묶을(chain) 수 없다. 시간이 긴 연산을 실행해야하고 그 연산이 끝났을때 다른 시간이 긴 연산에 결과를 보낼 필요가 있다. Future 는 그러한 비동기 흐름(workflow) 을 만들수가 없다.
4. 멀티 Future 들을 서로 연결(combine)할 수 없다. 병렬로 실행하고 완료 후에 어떤 기능을 실행하고 싶은 10가지 다른 퓨즈를 가지고 있다고 가정해 보면 Future 할 수 없다.
5. 익셉션 처리가 없다. Future API 는 어떠한 익셉션 처리 구조를 가지고 있지 않다.
Java 8 의 ComptetableFuture 는 Future, CompleteStage 인터페이스를 구현하고, 생성과 여러 Future 들을 체이닝, 연결할 수 있는 편리한 메소드들을 제공한다. 또한 이해하기 쉬운 익셉션 처리를 지원한다.
- complete() 메소드
: CompletableFuture 생성 예제. complete() 메소드는 완료되었다는 표시만 할뿐 결과를 얻기 위해서는 get() 메소드를 호출해야 한다.
@Test
public void completableFutureTest() {
CompletableFuture<String> future = new CompletableFuture<>();
try {
TimeUnit.SECONDS.sleep(2);
future.complete("Complete Stated!"); // 완료상태 표시
System.out.println("Before get()");
System.out.println("future_get = " + future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
- runAsync() 메소드
@Test
public void completableFutureRunAsyncTest() {
log.info("start.");
// Using Lambda Expression
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// Simulate a long-running Job
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
log.info("I'll run in a separate thread than the main thread.");
});
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
- supplyAsync() 메소드
@Test
public void completableFutureSupplyAsyncTest() {
// Run a task specified by a Supplier object asynchronously
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of the asynchronous computation";
}
});
// Block and get the result of the Future
String result = null;
try {
result = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
log.info("get_result : " + result);
}
- thenApply() 메소드
@Test
public void completableFutureThenApplyTest() {
// Create a CompletableFuture
CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Rajeev";
});
// Attach a callback to the Future using thenApply()
CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
return "Hello " + name;
});
// Block and get the result of the future.
try {
log.info(greetingFuture.get()); // Hello Rajeev
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Rajeev";
}).thenApply(name -> {
return "Hello " + name;
}).thenApply(greeting -> {
return greeting + ", Welcome to the CalliCoder Blog";
});
try {
log.info(welcomeText.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
- thenAccept() 메소드, thenRun() 메소드
// thenAccept() 를 위한 model 예제 클래스
@Data
public static class Product {
int id;
String name;
public Product(int id, String name) {
this.id = id;
this.name = name;
}
}
// thenAccept() 를 위한 service 예제 클래스
public static class ProductService {
private ProductService() {};
static Product getProductDetail(int productId) {
Product product = new Product(productId, "name_"+productId);
System.out.println("productId = " + productId);
return product;
}
}
@Test
public void completableFutureThenAcceptTest() {
int productId = 123;
// thenAccept() example
CompletableFuture.supplyAsync(() -> {
log.info("before call getProductDetail()");
return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
log.info("Got product detail from remote service " + product.getName());
});
int productId2 = 999;
// thenRun() example
CompletableFuture.supplyAsync(() -> {
return ProductService.getProductDetail(productId2);
}).thenRun(() -> {
log.info("thenRun() executed");
});
}
- thenCompose() 메소드
@Data
public static class User {
String userId;
String name;
double score;
}
public static class UserService {
private UserService() {};
static User getUserDetails(String userId) {
User user = new User();
System.out.println("user = " + userId);
return user;
}
}
public static class CreditRatingService {
static Double getCreditRating(User user) {
double val = 1.2f;
return user.getScore() + val;
}
}
CompletableFuture<User> getUserDetail(String userId) {
return CompletableFuture.supplyAsync(() -> {
return UserService.getUserDetails(userId);
});
}
CompletableFuture<Double> getCreditRating(User user) {
return CompletableFuture.supplyAsync(() -> {
return CreditRatingService.getCreditRating(user);
});
}
@Test
public void completableFutureThenCompseTest() {
String userId = "testUser";
CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId)
.thenApply(user -> getCreditRating(user));
log.info("here");
try {
log.info("result = " + result.get().get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
CompletableFuture<Double> result2 = getUserDetail(userId)
.thenCompose(user -> getCreditRating(user));
try {
System.out.println("result2 = " + result2.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
- allOf() 메소드
CompletableFuture<String> downloadWebPage(String pageLink) {
return CompletableFuture.supplyAsync(() -> {
// Code to download and return the web page's content
return pageLink+"_content";
});
}
@Test
public void completableFutureAllOfTest() {
List<String> webPageLinks = Arrays.asList("http://www.naver.com", "http://toast.com", "http://godo.com"); // A list of 100 web page links
// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
.map(webPageLink -> downloadWebPage(webPageLink))
.collect(Collectors.toList());
// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);
// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
return pageContentFutures.stream()
.map(pageContentFuture -> pageContentFuture.join())
.collect(Collectors.toList());
});
// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
return pageContents.stream()
.filter(pageContent -> pageContent.contains("CompletableFuture"))
.count();
});
try {
System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
- anyOf() 메소드
@Test
public void completableFutureAnyOfTest() {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 3";
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);
try {
System.out.println(anyOfFuture.get()); // Result of Future 2
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
출처
https://www.callicoder.com/java-8-completablefuture-tutorial/
'Engineering > Java' 카테고리의 다른 글
log4j2 , logback 설정 (0) | 2020.06.29 |
---|---|
InteliJ 설치 후 Lombok 설정 (0) | 2019.11.06 |
java 8 stream API 실행시간 비교 (0) | 2019.08.22 |
MySQL insert all on update MyBatis 쿼리문 (0) | 2016.07.08 |
Thymeleaf 에서 자주 사용하는 예제 (3) | 2016.05.18 |