
A Look at CompletableFuture's Features

산책散策 2019. 8. 23. 17:11

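Future, added in Java 5, has the following limitations.

1. It cannot be completed manually. Suppose you write a function that calls a remote API to fetch the most expensive product in an e-commerce catalog, run it on a separate thread, and return the result as a Future. If the remote API service goes down, there is no way to complete that Future by hand.

2. You cannot act on a Future's result without blocking. A Future does not notify you when it completes, so you have to block on get() until the result is available.

3. Multiple Futures cannot be chained. Sometimes you need to run a long computation and, when it finishes, pass its result to another long computation. Future cannot express that kind of asynchronous workflow.

4. Multiple Futures cannot be combined. Suppose you have 10 different Futures that you want to run in parallel and then run some function once all of them complete. Future cannot do that.

5. There is no exception handling. The Future API has no exception-handling construct.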
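The blocking described in item 2 can be sketched as follows. This is only an illustration: `fetchMostExpensiveProduct()` and its return value are hypothetical stand-ins for the remote product lookup, not code from the original tutorial.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PlainFutureDemo {
    // Hypothetical stand-in for the slow remote product lookup.
    static String fetchMostExpensiveProduct() {
        return "product_42";
    }

    static String blockingLookup() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = PlainFutureDemo::fetchMostExpensiveProduct;
            Future<String> future = executor.submit(task);
            // Future has no callback hook, so the caller has to wait here.
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(blockingLookup());
    }
}
```

The only ways to learn that the task finished are to call get() and wait, or to poll isDone().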

 

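Java 8's CompletableFuture implements the Future and CompletionStage interfaces and provides convenient methods for creating Futures and for chaining and combining several of them. It also supports exception handling that is easy to follow.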
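As a rough sketch of the exception handling mentioned above, exceptionally() can supply a fallback value when an earlier stage throws. The `ageGroup` method and its values are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallyDemo {
    // Hypothetical example: classify an age, falling back to "Unknown" on failure.
    static String ageGroup(int age) {
        return CompletableFuture.supplyAsync(() -> {
            if (age < 0) {
                throw new IllegalArgumentException("age cannot be negative");
            }
            return age >= 18 ? "Adult" : "Child";
        }).exceptionally(ex -> "Unknown")   // runs only if the stage above failed
          .join();
    }

    public static void main(String[] args) {
        System.out.println(ageGroup(30));   // Adult
        System.out.println(ageGroup(-1));   // Unknown
    }
}
```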

 

- complete() method

: Example of creating a CompletableFuture. complete() only sets the value and marks the future as completed; to read the result you still have to call get().

@Test
public void completableFutureTest() {
    CompletableFuture<String> future = new CompletableFuture<>();

    try {
        TimeUnit.SECONDS.sleep(2);
        future.complete("Complete Stated!");        // mark the future as completed
        System.out.println("Before get()");
        System.out.println("future_get = " + future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
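A small sketch of what manual completion looks like in practice, assuming a future that no task will ever complete on its own. The string values are arbitrary. complete() returns true only for the call that actually moved the future to the completed state, and completeExceptionally() is the failure counterpart.

```java
import java.util.concurrent.CompletableFuture;

class ManualCompletionDemo {
    static String completeTwice() {
        CompletableFuture<String> future = new CompletableFuture<>();
        boolean first = future.complete("fallback value");   // wins: future is now completed
        boolean second = future.complete("ignored");         // no effect: already completed
        return future.join() + "|" + first + "|" + second;
    }

    static boolean failManually() {
        CompletableFuture<String> future = new CompletableFuture<>();
        // Mark the future as failed by hand, e.g. when a remote service is down.
        future.completeExceptionally(new IllegalStateException("remote service is down"));
        return future.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(completeTwice());   // fallback value|true|false
        System.out.println(failManually());    // true
    }
}
```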

- runAsync() method

@Test
public void completableFutureRunAsyncTest() {
    log.info("start.");
    // Using Lambda Expression
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        // Simulate a long-running Job
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        log.info("I'll run in a separate thread than the main thread.");
    });

    try {
        future.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

- supplyAsync() method

@Test
public void completableFutureSupplyAsyncTest() {
    // Run a task specified by a Supplier object asynchronously
    CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Result of the asynchronous computation";
        }
    });

    // Block and get the result of the Future
    String result = null;
    try {
        result = future.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    log.info("get_result : " + result);
}
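When no executor is given, runAsync() and supplyAsync() normally run on ForkJoinPool.commonPool(). Both also have an overload that takes an Executor as the second argument. A minimal sketch, where the thread name "demo-worker" is an arbitrary choice for this illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorDemo {
    static String workerThreadName() {
        // Single-thread pool whose thread gets a recognizable (made-up) name.
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerThreadName());   // demo-worker
    }
}
```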

- thenApply() method

@Test
public void completableFutureThenApplyTest() {
    // Create a CompletableFuture
    CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Rajeev";
    });

    // Attach a callback to the Future using thenApply()
    CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
        return "Hello " + name;
    });

    // Block and get the result of the future.
    try {
        log.info(greetingFuture.get()); // Hello Rajeev
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

    CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Rajeev";
    }).thenApply(name -> {
        return "Hello " + name;
    }).thenApply(greeting -> {
        return greeting + ", Welcome to the CalliCoder Blog";
    });

    try {
        log.info(welcomeText.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

- thenAccept() method, thenRun() method

// Example model class for thenAccept()
@Data
public static class Product {
    int id;
    String name;

    public Product(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Example service class for thenAccept()
public static class ProductService {
    private ProductService() {};
    static Product getProductDetail(int productId) {
        Product product = new Product(productId, "name_"+productId);
        System.out.println("productId = " + productId);
        return product;
    }
}

@Test
public void completableFutureThenAcceptTest() {
    int productId = 123;
    // thenAccept() example
    CompletableFuture.supplyAsync(() -> {
        log.info("before call getProductDetail()");
        return ProductService.getProductDetail(productId);
    }).thenAccept(product -> {
        log.info("Got product detail from remote service " + product.getName());
    }).join();    // wait, otherwise the test may end before the callback runs

    int productId2 = 999;
    // thenRun() example
    CompletableFuture.supplyAsync(() -> {
        return ProductService.getProductDetail(productId2);
    }).thenRun(() -> {
        log.info("thenRun() executed");
    }).join();    // wait for the callback here as well
}
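The difference between the two callbacks can be sketched like this: thenAccept() receives the previous stage's result, thenRun() does not, and both return a CompletableFuture<Void> that can be joined. The `events` list and the string values are assumptions made for this illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class ThenAcceptThenRunDemo {
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();

        // thenAccept(): the Consumer sees the value produced upstream.
        CompletableFuture<Void> accepted = CompletableFuture
                .supplyAsync(() -> "name_123")
                .thenAccept(name -> events.add("accepted " + name));
        accepted.join();

        // thenRun(): the Runnable has no access to the upstream value.
        CompletableFuture<Void> ran = CompletableFuture
                .supplyAsync(() -> "name_999")
                .thenRun(() -> events.add("thenRun executed"));
        ran.join();

        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```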

- thenCompose() method

@Data
public static class User {
    String userId;
    String name;
    double score;
}

public static class UserService {
    private UserService() {};
    static User getUserDetails(String userId) {
        User user = new User();
        System.out.println("user = " + userId);
        return user;
    }
}

public static class CreditRatingService {
    static Double getCreditRating(User user) {
        double val = 1.2;    // plain double literal; 1.2f would widen to 1.2000000476837158
        return user.getScore() + val;
    }
}

CompletableFuture<User> getUserDetail(String userId) {
    return CompletableFuture.supplyAsync(() -> {
        return UserService.getUserDetails(userId);
    });
}

CompletableFuture<Double> getCreditRating(User user) {
    return CompletableFuture.supplyAsync(() -> {
        return CreditRatingService.getCreditRating(user);
    });
}

@Test
public void completableFutureThenComposeTest() {
    String userId = "testUser";
    CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId)
            .thenApply(user -> getCreditRating(user));
    log.info("here");
    try {
        log.info("result = " + result.get().get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

    CompletableFuture<Double> result2 = getUserDetail(userId)
            .thenCompose(user -> getCreditRating(user));
    try {
        System.out.println("result2 = " + result2.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
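thenCompose() chains a future that depends on the previous one. For two independent futures whose results should be merged, thenCombine() is the usual counterpart. A minimal sketch with made-up values (`unitPrice`, `quantity`):

```java
import java.util.concurrent.CompletableFuture;

class ThenCombineDemo {
    static int orderTotal() {
        // Two independent async computations (hypothetical values).
        CompletableFuture<Integer> unitPrice = CompletableFuture.supplyAsync(() -> 1200);
        CompletableFuture<Integer> quantity = CompletableFuture.supplyAsync(() -> 3);

        // The BiFunction runs once both futures have completed.
        return unitPrice.thenCombine(quantity, (price, count) -> price * count).join();
    }

    public static void main(String[] args) {
        System.out.println(orderTotal());   // 3600
    }
}
```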

- allOf() method

CompletableFuture<String> downloadWebPage(String pageLink) {
    return CompletableFuture.supplyAsync(() -> {
        // Code to download and return the web page's content
        return pageLink+"_content";
    });
}

@Test
public void completableFutureAllOfTest() {
    List<String> webPageLinks = Arrays.asList("http://www.naver.com", "http://toast.com", "http://godo.com");    // A sample list of web page links

    // Download contents of all the web pages asynchronously
    List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
            .map(webPageLink -> downloadWebPage(webPageLink))
            .collect(Collectors.toList());

    // Create a combined Future using allOf()
    CompletableFuture<Void> allFutures = CompletableFuture.allOf(
            pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
    );

    // When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
    CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
        return pageContentFutures.stream()
                .map(pageContentFuture -> pageContentFuture.join())
                .collect(Collectors.toList());
    });

    // Count the number of web pages having the "CompletableFuture" keyword.
    CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
        return pageContents.stream()
                .filter(pageContent -> pageContent.contains("CompletableFuture"))
                .count();
    });

    try {
        System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
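Because allOf() returns CompletableFuture<Void>, the "wait for all, then join each" step above tends to be repeated. One possible way to package it is a small generic helper. `allAsList` is a hypothetical name, not a JDK method, and the links "a" and "b" are placeholders.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class AllOfDemo {
    // Hypothetical helper: turns a list of futures into a future of a list.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // Every future is already complete here, so join() does not block.
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    static List<String> downloadAll() {
        List<CompletableFuture<String>> futures = Stream.of("a", "b")
                .map(link -> CompletableFuture.supplyAsync(() -> link + "_content"))
                .collect(Collectors.toList());
        return allAsList(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(downloadAll());   // [a_content, b_content]
    }
}
```

The results come back in the order of the input list, regardless of which future finished first.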

- anyOf() method

@Test
public void completableFutureAnyOfTest() {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of Future 1";
    });

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of Future 2";
    });

    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of Future 3";
    });

    CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

    try {
        System.out.println(anyOfFuture.get());                // Result of Future 2
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
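anyOf() returns CompletableFuture<Object>, so the caller has to cast the result back to the expected type. A deterministic sketch that avoids sleeps by using one future that never completes and one that is already complete (both are assumptions for this illustration):

```java
import java.util.concurrent.CompletableFuture;

class AnyOfDemo {
    static String firstCompleted() {
        CompletableFuture<String> pending = new CompletableFuture<>();   // never completed
        CompletableFuture<String> done = CompletableFuture.completedFuture("ready");

        // The combined future completes as soon as any input has completed.
        Object first = CompletableFuture.anyOf(pending, done).join();
        return (String) first;   // static type is Object, so a cast is needed
    }

    public static void main(String[] args) {
        System.out.println(firstCompleted());   // ready
    }
}
```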

 

Source

https://www.callicoder.com/java-8-completablefuture-tutorial/