ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Part2] Java8 In action - Chapter7 - 1
    Java8 In Action 2022. 8. 2. 08:58
    반응형

    해당 내용은 Java8 In Action 책을 요약 및 정리한 내용입니다.

    좀 더 자세히 알고 싶으신 분들은 책을 사서 읽어보심을 추천드립니다.!

    7.1 병렬 스트림

    Main

    package Part2.Chapter7.Chapter7_7_1;
    
    import java.util.function.Function;
    import java.util.stream.LongStream;
    import java.util.stream.Stream;
    
    /*
     * 7.1 병렬 스트림
     * 
     * 컬렉션에서 parallelStream을 호출하면 병렬 스트림(parallel stream)이 생성된다.
     * 병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다.
     * 따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.
     */
    public class Main_7_1 {
    
        public static long meansureSumPerf(Function<Long, Long> adder, long n) {
            long fastest = Long.MAX_VALUE;
    
            for(int i = 0; i < 10; i++) {
                long start = System.nanoTime();
                long sum = adder.apply(n);
                long duration = (System.nanoTime() - start) / 1_000_000;
    
                System.out.println("Result : " + sum);
    
                if(duration < fastest) {
                    fastest = duration;
                }            
            }
    
            return fastest;
        }
    
        public static void main(String[] args) {
            /*
             * 7.1.1 순차 스트림을 병렬 스트림으로 변환하기
             */
            System.out.println("7.1.1 순차 스트림을 병렬 스트림으로 변환하기 - 전통 자바 : " + ParallelStream.iterativeSum(100));
            System.out.println("7.1.1 순차 스트림을 병렬 스트림으로 변환하기 - 순차 : " + ParallelStream.sequentialSum(100));        
            System.out.println("7.1.1 순차 스트림을 병렬 스트림으로 변환하기 - parallel : " + ParallelStream.parallelSum(100));
    
            /*
             * 병렬 스트림에서 사용하는 스레드 풀 설정
             * 
             * 스트림의 parallel 메서드에서 병렬로 작업을 수행 하는 스레드 생성, 생성 갯수, 그리고 그 과정을 어떻게 커스텀마이징할 수 있는지
             * 궁금할 것이다.
             * 
             * 병렬 스트림은 내부적으로 ForkJoinPool을 사용한다
             * 기본적으로 ForkJoinPool은 프로세스 수, 즉 Runtime.getRuntime().availableProcessors()가 반환하는 값에 상응하는 스레드를 갖는다.
             * 
             * System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", 12);
             * 
             * 위 코드는 전역 설정 코드이므로 이후의 모든 병렬 스트림 연산에 영향을 준다.
             * 현재는 하나의 병렬 스트림에 사용할 수 있는 특정한 값을 지정할 수 없다.
             * 일반적으로 기기의 프로세서 수와 같으므로 특별한 이유가 없다면 FokJoinPool의 기본값을 그대로 사용하는것을 권장한다.
             */
    
            /*
             * 7.1.2 스트림 성능 측정
             * 
             * 병렬화를 이용하면 순차나 반복 형식에 비해 성능이 더 좋아질 것이라 추측할 것이다.
             * 성능을 최적화의 세 가지 황금 규칙인 첫째도 측정, 둘째도 측정, 셋째도 측정에 따라 성능을 측정해 보자.
             */
    
            /*
             * 고전적인 for 루프를 사용한 반복 버전이 생각보다 빠르다는 점도 고려해야 한다.
             * for 루프는 저수준으로 작동하며 기본값을 박싱하거나 언박싱할 필요가 없으므로 수행 속도가 빠르다.
             */
            System.out.println("7.1.2 스트림 성능 측정 - 전통 자바 : " + meansureSumPerf(ParallelStream::iterativeSum, 10_000_000));
            System.out.println("7.1.2 스트림 성능 측정 - 순차 : " + meansureSumPerf(ParallelStream::sequentialSum, 10_000_000));
    
            /*
             * 병렬 버전이 순차 버전보다 늦게 동작하는 이유는
             *         iterate가 박싱된 객체를 생성하므로 이를 다시 언박싱하는 과정이 필요하다.
             *         iterate가 병렬로 실행될 수 있도록 독립적인 청크로 분할하기가 어렵다.
             * 
             * iterate는 본질적으로 순차적이라 청크로 분할하기가 어렵다.
             * 이와 같은 상황 때문에 리듀싱 연산이 수행되지 않는다.
             * 리듀싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않았으므로 스트림을 병렬로 처리할 수 있도록
             * 청크로 분할할 수 없다.
             * 스트림이 병렬로 처리되도록 지시했고 각각의 합계가 다른 스레드에서 수행되었지만 결국 순차처리 방식과
             * 크게 다른 점이 없으므로 스레드를 할당하는 오버헤드만 증가하게 된다. 
             */
            System.out.println("7.1.2 스트림 성능 측정 - parallel : " + meansureSumPerf(ParallelStream::parallelSum, 10_000_000));
    
            /*
             * 더 특화된 메서드 이용
             * 
             * iterate가 LongStream.rangeClosed 비해 다음과 같은 장점을 제공한다.
             *         LongStream.rangeClosed는 기본형 long을 직접 사용하므로 박싱과 언방식 오버헤드가 사라진다.
             *         LongStream.rangeClosed는 쉽게 청크로 분할할 수 있는 숫자 범위를 생상한다.
             *        예를 들어 1-20 범위의 숫자를 각각 1-5, 6-10, 11-15, 16-20 범위의 숫자로 분할할 수 있다.
             *
             * 기존 iterate 팩토리 메서드로 생성한 순차 버전에 비해 이 예제의 순차 스트림 처리 속도가 더 빠르다.
             * 특화되지 않은 스트림을 처리할 때는 오토박싱, 언박싱 등의 오버헤드를 수반하기 때문이다.
             * 상황에 따라서 어떤 알고리즘을 병렬화하는 것보다 적절한 자료구조를 선택하는 것이 더 중요하다.
             */
            System.out.println("더 특화된 메서드 이용 - 순차 : " + meansureSumPerf(ParallelStream::rangeSum, 10_000_000));
            System.out.println("더 특화된 메서드 이용 - parallel : " + meansureSumPerf(ParallelStream::newParallelSum, 10_000_000));
    
            /*
             * 7.1.3 병렬 스트림의 올바른 사용법
             */
            System.out.println("7.1.3 병렬 스트림의 올바른 사용법 - 잘못된 예 : " + meansureSumPerf(ParallelStream::sideEffectParallelSum, 10_000_000));
    
            /*
             * 7.1.4 병렬 스트림 효과적으로 사용하기
             * 
             * '천 개 이상의 요소가 있을 대만 병렬 스트림을 사용하라'와 같이 양을 기준으로 병렬 스트림 사용을 결정ㅎ하는 것은 적절하지 않다.
             * 그래도 어떤 상황에서 병렬 스트림을 사용할 것인지 약간의 수량적 힌트를 정하는 것이 도움이 될 때도 있다.
             * 
             * - 확신이 서지 않는다면 직접 측정하다. 
             * 병렬 스트림의 수행 과정은 투명하지 않을 때가 많다.
             * 따라서 순차 스트림과 병렬 스트림 중 어떤 것이 좋을지 모르겠다면 적절한 벤치마크로 직접 성능을 축정하는 것이 바람직하다.
             * 
             * - 박싱을 주의하라.
             * 자동 박싱과 언방식은 성능을 크게 저하시킬 수 있는 요소다. 자바 8은 박싱 동작을 피할 수 있도록 기본형 특화 스트림
             * (IntStream, LongStream, DoubleStream)을 제공한다. 따라서 되도록이면 기본형 특화 스트림을 사용하는 것이 좋다.
             * 
             * - 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다
             * limit나 findFirst처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치러야 한다.
             * 예를 들어 findAny는 요소의 순서와 상관없이 연산하므로 findFirst보다 성능이 좋다. 정렬된 스트림에 unordered를
             * 호출하면 비정렬된 스트림을 얻을 수 있다. 스트림에 N개 요소가 있을 때 요소의 순서가 상관없다면(예를 들어 소스가 리스트라면)
             * 비정렬된 스트림에 limit를 호출하는 것이 효율적이다.
             * 
             * - 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라.
             * 처리해야 할 요소 수가  N이고 하나의 요소를 처리하는 데 드는 비용을 Q라 하면 전체 스트림 파이프라인 처리 비용은 N*Q로 예상할 수 있다.
             * Q가 높아진다는 것은 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있음을 의미한다.
             * 
             * - 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다.
             * 소량의 데이터를 처리하는 상황에서는 병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못하기 때문이다.
             * 
             * - 스트림을 구성하는 자료구조가 적절한지 확인하라.
             * 예를 들어 ArrayList를 LinkedList보다 효율적으로 분할할 수 있다. LinkedList를 분할하려면 모든 요소를 탐색해야 하지만
             * ArrayList는 요소를 탐색하지 않고도 리스트를 분할할 수 있기 때문이다.
             * 또한 range 택토리 메서드로 만든 기본형 스트림도 쉽게 분해할 수 있다.(Spliterator를 구현해서 분해 과정을 완벽하게 제어할 수 있다.)
             * 
             * - 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
             * 예를 들어 SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적으로 병렬 처리 할 수 있다.
             * 반면 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 효과적으로 스트림을 병렬 처리할 수 있을지 알 수 없게 된다.
             * 
             * - 최종 연산의 병합 과정(예를 들면 Collector의 combiner 메서드) 비용을 살펴보라.
             * 병합 과정의 비용이 비싸다면 병렬 스트림의 얻은 성능의 이익이 서브스트림의 부분결과를 합치는 과정에서 상쇄할 수 있다.
             */
    
            /*
             * 스트림 소스의 분해성 
             * 소스 : ArrayList
             * 분해성 : 훌륭함
             * 
             * 소스 : LinkedList
             * 분해성 : 나쁨
             * 
             * 소스 : IntStream.range
             * 분해성 : 훌륭함
             * 
             * 소스 : Stream.iterate
             * 분해성 : 나쁜
             * 
             * 소스 : HashSet
             * 분해성 : 젛음
             * 
             * 소스 : TreeSet
             * 분해성 : 좋음
             */
    
        }
    
        static class ParallelStream {
            public static long sequentialSum(long n) {
                return Stream.iterate(1L, i -> i + 1)
                    .limit(n)
                    .reduce(0L, Long::sum);
            }
    
            public static long iterativeSum(long n) {
                long result = 0;
    
                for(long i = 1L; i <= n; i++) {
                    result += i;
                }
    
                return result;
            }
    
            public static long parallelSum(long n) {
                /*
                 * 순차 스트림에 parallel 메서드를 호출하면 기존의 함수형 리듀싱 연산(숫자 합계 계산)이 병렬로 처리된다.
                 * 스트림이 여러 청크로 분할이 되고 리듀싱 연산을 여러 청크에 병렬로 수행할 수 있다.
                 * 마지막으로 리듀싱 연산으로 생성된 부분결과를 다시 리듀싱 연산으로 합쳐서 전체 스트림의 리듀싱 결과를 도출한다.
                 * 
                 * 사실 순차 스트림에 parallel을 호출해도 스트림 자체에는 아무 변화도 일어나지 않는다.
                 * 내부적으로는 parallel을 호출하면 이후 연산이 병렬로 수행해야 함을 의미하는 불린 플래그가 설정된다.
                 * 반대로 sequential로 병렬 스트림을 순차 스트림으로 바꿀 수 있다.
                 * 이 두 메서드를 이용하면 어떤 연산은 병렬로 어떤 연산은 순차로 실행할지 제어할 수 있다.
                 * 
                 * stream.parallel()
                 *         .filter(...)
                 *         .sequential()
                 *         .map(...)
                 *         .parallel()
                 *         .reduce();
                 * 
                 * parallel과 sequential 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다.
                 * 위 예제에서는 파이프라인의 마지막 호출은 parallel이므로 파이프라인은 전체적으로 병렬로 실행된다.
                 */
                return Stream.iterate(1L, i -> i + 1)
                    .limit(n)
                    .parallel()
                    .reduce(0L, Long::sum);
            }
    
            public static long rangeSum(long n) {
                return LongStream.rangeClosed(1, n).reduce(0L, Long::sum);
            }
    
            public static long newParallelSum(long n) {        
                return LongStream.rangeClosed(1, n)
                    .parallel()
                    .reduce(0L, Long::sum);
            }
    
            public static long sideEffectParallelSum(long n) {
                /*
                 * 아래 코드는 본질적으로 순차 실행할 수 있도록 구현되어 있기 때문에 병렬로 실행하면 참사가 발생된다.
                 * 특히 total을 접근할 때마다(다수의 스레드에서 동시에 데이터에 접근하는) 데이터 레이스 문제가 발생된다.
                 * 동기화로 문제를 해결하면 결국 병렬화라는 특성이 없어져 버린다.
                 */
                Accumulator accumulator = new Accumulator();
                LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
    
                return accumulator.total;
    
            }
        }
    }
    
    class Accumulator {
        /*
         * 다수의 쓰레드가 하나의 total 변수를 참조 하려 하기 때문에 데이터 레이스 문제 발생 및
         * 여러 스레드가 동시에 add 메서드를 실행하기 때문에 올바른 결과값이 나오질 않는다. 
         */
        public long total = 0;
    
        /*
         * 얼핏 보면 아토믹 연산(atomic operation) 같지만 total += value 아토믹 연산이 아니다.
         * 결국 여러 스레드에서 공유하는 객체의 상태를 바꾸는 forEach 블록 내부에서 add 메서드를 호출하면서
         * 이 같은 문제가 발생된다.
         */
        public void add(long value) { total += value; } 
    }

    7.2 포크조인 프레임워크

    Process

    ForJoinSumCalculator.java

    package Part2.Chapter7.Chapter7_7_2.forkjoin;
    
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    import java.util.stream.LongStream;
    
    // RecursiveTask를 상속받아 포크/조인 프레임워크에서 사용할 태스크를 생성한다.
    public class ForkJoinSumCalculator extends RecursiveTask<Long> {
    
        private static final long serialVersionUID = 1L;
    
        // 더할 숫자 배열
        private final long[] numbers;
    
        // 이 서브태스크에서 처리할 배열의 초기 위치와 최종 위치
        private final int start;
        private final int end;
    
        // 이 값 이하의 서브테스크는 더이상 분할할 수 없다.
        public static final long THRESHOLD = 10_000;
    
        // 메인 태스크를 생성할 때 사용할 공개 생성자
        public ForkJoinSumCalculator(long[] numbers) {
            this(numbers, 0, numbers.length);
        }
    
        // 메인 태스크의 서브태스크를 재귀적으로 만들 때 사용할 비공개 생성자    
        private ForkJoinSumCalculator(long[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }
    
        // RecursiveTask의 추상 메서드 오버라이드
        @Override
        protected Long compute() {
            // 이 태스크에서 더할 배열의 길이
            int length = this.end - this.start;
    
            // 기준값과 같거나 작으면 순차적으로 결과를 계산한다.
            if(length <= THRESHOLD) {
                return this.computeSequentially(); 
            }        
    
            // 배열의 첫 번째 절반을 더하도록 서브태스크를 생성한다.
            ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(this.numbers, this.start, this.start + length / 2);
            // FokJoinPool의 다른 스레드로 새로 생성한 태스크를 비동기로 실행한다.        
            leftTask.fork();
    
            // 배열의 나머지 절반을 더하도록 서브태스크를 생성한다.
            ForkJoinSumCalculator rigthTask = new ForkJoinSumCalculator(this.numbers, this.start + length / 2, this.end);
            // 두 번째 서브태스크를 동기 실행한다. 이때 추가로 분할이 일어날 수 있다.
            Long rightResult = rigthTask.compute();
            // 첫 번째 서브태스크의 결과를 읽거나 아직 결과가 없다면 기다린다.
            Long leftResult = leftTask.join();
    
            // 두 서브태스크의 결과를 조합한 값이 이 태스크의 결과다.
            return leftResult + rightResult;
        }
    
        // 더 분할할 수 없을 때 서브 태스크의 결과를 계산하는 단순 알고리즘
        private long computeSequentially() {
            long sum = 0;
    
            /*
             * n까지의 자연수 덧셈 작업을 병렬로 수행하는 방법을 더 직관적으로 보여준다.
             */
            for(int i = this.start; i < this.end; i++) {
                sum += this.numbers[i];
            }
    
            return sum;
        }
    
        public static long forkJoinSum(long n) {
            /*
             * LongStream으로 n까지의 자연수를 포함하는 배열을 생성했다.
             * 생성된 배열을 ForkJoinSumCalculator의 생성자로 전달해서 ForkJoinTask를 만들었다.
             * 마지막으로 생성한 태스크를 새로운 ForkJoinPool의 invoke 메서드로 전달했다.
             * ForkJoinPool에서 실행되는 마지막 invoke 메서드의 반환값은 ForkJoinSumCalculator에서 
             * 정의한 태스크의 결과가 된다.
             */        
            long[] numbers = LongStream.rangeClosed(1, n).toArray();
    
            ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
    
            /*
             * 일반적으로 애플리케이션에서는 둘 이상의 ForkJoinPool을 사용하지 않는다. 즉, 소프트웨어의 필요한 곳에서
             * 언제든 가져다 쓸 수 있도록 FokJoinPool을 한 번만 인스턴스화해서 정적 필드에 싱글턴으로 저장한다.
             * 
             * ForkJoinPool을 만들면서 인수가 없는 디폴트 생성자를 이용했는데 이는 jvm에서 이용할 수 있는 모든 프로세스가
             * 자유롭게 풀에 접근할 수 있음을 의미한다.
             * 
             * 더 정확하게 Runtime.availableProcessors의 반환값으로 풀에 사용할 스레드 수를 결정한다.
             * availableProcessors, 즉 '사용할 수 있는 프로세서' 이름과는 달리 실제 프로세서 외에 하이퍼스레딩과 관련된
             * 가상 프로세서도 개수에 포함된다.
             */
            return new ForkJoinPool().invoke(task);
        }
    }

    Main

    package Part2.Chapter7.Chapter7_7_2;
    
    import java.util.function.Function;
    
    import Part2.Chapter7.Chapter7_7_2.forkjoin.ForkJoinSumCalculator;
    
    /*
     * 7.2 포크/조인 프레임워크
     * 
     * 포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를
     * 합쳐서 전체 결과를 만들도록 설계되었다.
     * 포크/조인 프레임워크에서는 서브태스크를 스레드 풀(FokJoinPool)의 작업자 스레드에 분산 할당하는 ExcutorService 인터페이스를 구현한다.
     */
    public class Main_7_2 {
    
        public static long meansureSumPerf(Function<Long, Long> adder, long n) {
            long fastest = Long.MAX_VALUE;
    
            for(int i = 0; i < 10; i++) {
                long start = System.nanoTime();
                long sum = adder.apply(n);
                long duration = (System.nanoTime() - start) / 1_000_000;
    
                System.out.println("Result : " + sum);
    
                if(duration < fastest) {
                    fastest = duration;
                }            
            }
    
            return fastest;
        }
    
        public static void main(String[] args) {
            /*
             * 7.2.1 RecursiveTask 활용
             * 
             * 스레드 풀을 이용하려면 RecursiveTask<R>의 서브클래스를 만들어야 한다. 
             * 여기서 R은 병렬화된 태스크가 생성하는 결과 형식 또는 결과가 없을 때(결과가 없더라도 다른 비지역 구조를 바꿀 수 있다.) 
             * RecursiveAction 형식이다.
             * 
             * compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다.
             * 따라서 대부분의 compute 메서드 구현은 다음과 같은 의사코드 형식을 유지한다.
             * 
             * if(태스크가 충분히 작거나 더 이상 분할할 수 없으면) {
             *         순차적으로 태스크 계산
             * } else {
             *         태스크를 두 서브태스크로 분할.
             *         태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함.
             *         모든 서브태스크의 연산이 완료될 때까지 기달림.
             *         각 서브태스크의 결과를 합침.
             * }
             * 
             * 해당 알고리즘은 분할 후 정복(divide-and-conquer)알고리즘의 병렬화 버전이다.
             * 
             * ForkJoinSumCalculator를 ForkJoinPool로 전달하면 풀의 스레드가 ForkJoinSumCalculator의 compute 메서드를 실행하면서 작업을 수행한다.
             * 
             *         1. compute 메서드는 병렬로 실행할 수 있을만큼 태스크의 크기가 충분히 작아졌는지 확인하며
             *         2. 아직 태스크의 크기가 크다고 판단되면 숫자 배열을 반으로 분할해서 두 개의 새로운 ForkJoinSumCalculator로 할당한다.
             *         3. 그러면 다시 새로운 ForkJoinPool이 새로 생성된 ForkJoinSumCalculator를 실행한다.
             *         4. 결국 이 과정이 재귀적으로 반복되면서 주어진 조건(예제에서는 덧샘을 수행할 항목이 만 개 이하여야 함)을 만족할 때까지 태스크를 분할한다.
             * 
             * 이제 각 서브태스크는 순차적으로 처리되며 포킹 프로세스로 만들어진 이진트리의 태스크를 루트에서 역순으로 방문한다.
             * 즉, 각 서브태스크의 부분결과를 합쳐서 태스크의 최종 결과를 계산한다.
             */
            System.out.println("7.2 포크/조인 프레임워크 : " + meansureSumPerf(ForkJoinSumCalculator::forkJoinSum, 10_000_000));
    
            /*
             * 7.2.2 포크/조인 프레임워크를 제대로 사용하는 방법
             * 
             * 다음은 포크/조인 프레임워크를 효과적으로 사용하는 방법이다.
             * 
             * - join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킨다.
             * 따라서 두 서브태스크가 모두 시작된 다음에 join을 호출해야 한다.
             * 그렇지 않으면 각각의 서브태스크가 다른 태스크가 끝나길 기달리는 일이 발생하며 원래 순차 알고리즘보다
             * 느리고 복잡한 프로그램이 될수 있다. 
             * 
             * - RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용하지 말아야 한다.
             * 대신 compute나 fork 메서드를 직접 호출할 수 있다. 순차 코드에서 병렬 계산을 시작할 때만 invoke를 사용한다.
             * 
             * - 서브태스크에 fork 메서드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다.
             * 왼쪽 작업과 오른쪽 작업 모두에 fork 메서드를 호출하는 것이 자연스러울 것 같지만 한쪽 작업에는 fork를
             * 호출하는 것보다는 compute를 호출하는 것이 효율적이다.
             * 그러면 두 서브태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에 불필요한 태스크를 할당하는
             * 오버헤드를 피할 수 있다.
             * 
             *  - 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅이 어렵다.
             *  보통 IDE로 디버깅할 때 스택 트레이스(stack trace)로 문제가 일어난 과정을 쉽게 확인할 수 있는데,
             *  포크/조인 프레임워크에서는 fork라 불리는 다른 스레드에서 compute를 호출하므로 스택 트레이스가 도움이
             *  되지 않는다.
             *  
             *  - 멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차처리보다 무조건 빠를 거라는 생각은 버려야 한다.
             *  병렬 처리로 성능을 개선하려면 태스크를 여러 독립적인 서브태스크로 분할할 수 있어야 한다.
             *  각 서브태스크의 실행시간은 새로운 서브태스크를 포킹하는데 드는 시간보다 길어야 한다.
             *      예)
             *          I/O를 한 서브태스크에 할당. 다른 서브태스크에서는 계산을 실행.
             *          즉 I/O와 계산을 병렬로 실행할 수 있다.
             * 순차버전과 병렬버전의 성능을 비교할 때는 다른 요소도 고려해야 한다.
             * 다른 자바 코드와 마찬가지로 jit 컴파일러에 의해 최적화되려면 몇 차례의 '준비 과정(warmed up)' 또는 실행과정을
             * 거쳐야 한다.
             * 따라서 성능을 측정할 때는 하니스(meansureSumPerf 메서드와 같은)를 통해 여러 번 프로그램을 실행한 결과를 측정해야한다.
             * 또한 컴파일러 최적화는 병렬버전보다는 순차버전에 집중될 수 있다는 사실도 기억하자
             * (예를 들어 순차버전에서는 죽은 코드를 분석해서 사용되지 않는 계산은 아예 삭제하는 등의 최적화를 달성하기 쉽다.)
             */
    
            /*
             * 7.2.3 작업 훔치기
             * 
             * 기기의 코어 수보다 서브태스크가 많이 생성 되는게 자원만 낭비 하는것 처럼 보일수 있다.
             * 실제로 각각의 태스크가 CPU로 할당되는 상황이라면 어차피 서브태스크로 분할한다고 해서 성능이 좋아지지는 않을것이다.
             * 하지만 실제로 코어 개수와 관계없이 적절한 크기로 분할된 많은 태스크를 포킹하는 것이 바람직하다.
             * 
             * 이론적으로 코어 개수만큼 병렬화된 태스크로 작업부하를 분할하면 모든 CPU 코어에 태스크를 실행할 것이고 크기가 같은 각각의
             * 태스크는 같은 시간에 종료될 것이라고 생각할 수 있다.
             * 
             * 하지만 복잡한 로직인 경우에는 각각의 서브태스크의 작업완료 시간이 크게 달라질 수 있다.
             * 이는 분할 기법이 효율적이지 않을수도 있고 예기치 않게 디스크 접근 속도 저하 및 외부 서비스와 협력 과정 중 지연이 생길 수 있기 때문이다.
             * 
             * 포크/조인 프레임워크에서는 "작업 훔치기(work stealing)"라는 기법으로 이 문제를 해결한다.
             * 작업 훔치기 기법은 ForkJoinPool의 모든 스레드를 거의 공정하게 분할한다.
             * 각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트(doubly linked list)를 참조하면서 작업이 끝날 때마다 큐의 헤드에서
             * 다른 태스크를 가져와서 작업을 처리한다. 이때 한 스레드는 다른 스레드보다 자신에게 할당된 태스크를 더 빨리 처리할 수 있다.
             * 즉, 다른 스레드는 바쁘게 일하고 있는데 한 스레드는 할일이 다 떨어진 상황이다. 이때 할일이 없어진 스레드는 유휴 상태로 바뀌는 것이 아니라
             * 다른 스레드 큐의 꼬리(tail)에서 작업을 훔쳐온다. 모든 태스크가 작업을 끝낼 때까지, 즉 모든 큐가 빌 때까지 이 과정을 반복한다.
             * 따라서 태스크의 크기를 작게 나누어야 작업자 스레드 간의 작업부하를 비슷한 수준으로 유지할 수 있다.
             * 
             */
        }
    }

    7.3 Spliterator

    Process

    WordCounter.java

    package Part2.Chapter7.Chapter7_7_3.wordCounter;
    
    public class WordCounter {
        private final int counter;
        private final boolean lastSpace;
    
        public WordCounter(int counter, boolean lastSpace) {
            this.counter = counter;
            this.lastSpace = lastSpace;
        }
    
        /*
         * WordCounter의 상태를 어떻게 바꿀 것인지, 또는 엄밀히 WordCounter는 불변 클래스 이므로 새로운 WordCounter 클래스를
         * 어떤 형태로 생성할 것인지 정의한다.
         * 스트림을 탐색하면서 새로운 문자를 찾을 때마다 accmulate 메서드를 호출한다.
         * 
         * 반복 알고리즘처럼 accumulate 메서드는 문자열의 문자를 하나씩 탐색한다.
         */
        public WordCounter accumulate(Character c) {
            if(Character.isWhitespace(c)) {
                return lastSpace ? this : new WordCounter(this.counter, true);
            } else {
                // 공백 문자를 만나면 지금까지 탐색한 문자를 단어로 간주(공백 문자는 제외) 단어 개수를 증가.
                return lastSpace ? new WordCounter(this.counter + 1, false) : this;
            }
        }
    
        /*
         * 두 WordCounter의 counter 값을 더한다.
         */
        public WordCounter combine(WordCounter wordCounter) {
            // counter 값만 더할 것이므로 마지막 공백은 신경 쓰지 않는다.
            return new WordCounter(this.counter + wordCounter.counter, wordCounter.lastSpace);
        }
    
        public int getCounter() {
            return this.counter;
        }
    }

    WordCounterSpliterator.java

    package Part2.Chapter7.Chapter7_7_3.wordCounterSpliterator;
    
    import java.util.Spliterator;
    import java.util.function.Consumer;
    
    public class WordCounterSpliterator implements Spliterator<Character>{
        private final String string;
        private int currentChar = 0;
    
        public WordCounterSpliterator(String string) {
            this.string = string;
        }
    
        /*
         * 문자열에서 현재 인덱스에 해당하는 문자를 Consumer에 제공한 다음에 인덱스를 증가시킨다.
         * 
         * 인수로 전달된 Consumer는 스트림을 탐색하면서 적용해야 하는 함수 집합이 작업을 처리할 수 있도록
         * 소비한 문자를 전달하는 자바 내부 클래스다.
         * 
         * 예제에서는 스트림을 탐색하면서 하나의 리듀싱 함수, 즉 WordCounter의 accumulate 메서드만 적용한다.
         * 해당 메서드는 새로운 커서 위치가 전체 문자열 길이보다 작으면 참을 반환하며 이는 반복 탐색해야 할
         * 문자가 남아있음을 알린다.
         */
        @Override
        public boolean tryAdvance(Consumer<? super Character> action) {
            // 현재 문자를 소비한다.
            action.accept(string.charAt(this.currentChar++));
            // 소비할 문자가 남았다면 true를 반환한다.
            return this.currentChar < string.length();
        }
    
        /*
         * 반복될 자료구조를 분할하는 로직을 포함한다.
         * 
         * 분할 동작을 중단할 한계를 설정해야 한다.
         * 분할 과정에서 남은 문자 수가 한계값(예제에서는 10) 이하면 null을 반환. 즉 분할을 중지하도록 지시한다.
         * 
         * 반대로 분할이 필요한 상황에는 파싱해야 할 문자열 청크의 중간 위치를 기준으로 분할하도록 지시한다.
         * 이때 단어 중간을 분할하지 않도록 빈 문자가 나올 때까지 분할 위치를 이동시킨다.
         * 분할할 위치를 찾았다면 새로운 Spliterator를 만든다.
         * 새로운 Spliterator는 현재 위치(currentChar)부터 분할된 위치까지의 문자를 탐색한다.
         */
        @Override
        public Spliterator<Character> trySplit() {
            int currentSize = this.string.length() - this.currentChar;
    
            // 파싱할 문자열을 순차 처리할 수 있을 만큼 충분히 작아졌음을 알리는 null을 반환한다.
            if(currentSize < 10) {
                return null;
            }
    
            // 파싱할 문자열의 중간을 분할 위치로 설정한다.
            for(int splitPos = currentSize / 2 + this.currentChar; splitPos < this.string.length(); splitPos++) {
                // 다음 공백이 나올 때까지 분할 위치를 뒤로 이동 시킨다.
                if(Character.isWhitespace(this.string.charAt(splitPos))) {
                    // 처음부터 분할 위치까지 문자열을 파싱할 새로운 WordCounterSpliterator를 생성한다.
                    Spliterator<Character> spliterator = new WordCounterSpliterator(this.string.substring(this.currentChar, splitPos));
    
                    // 이 WordCounterSpliterator의 시작 위치를 분할 위치로 설정한다.
                    this.currentChar = splitPos;
    
                    return spliterator;
                }
            }
    
            return null;
        }
    
        /*
         * 탐색해야 할 요소의 개수는 Spliterator가 파싱할 문자열 전체 길이(string.length())와 현재 반복 중인 위치(currentChar)의 차다.
         */
        @Override
        public long estimateSize() {
            return this.string.length() - this.currentChar;
        }
    
        /*
         * Spliterator.ORDERED : 문자열의 문자 등장 순서가 유의미함.
         * Spliterator.SIZED : estimateSize 메서드의 반환값이 정확함.
         * Spliterator.SUBSIZED : trySplit으로 생성된 Spliterator도 정확한 크기를 가짐.
         * Spliterator.NONNULL : 문자열에는 null 문자가 존재 하지 않음.
         * Spliterator.IMMUTABLE : 문자열 자체가 불변 클래스이므로 문자열을 파싱하면서 속성이 추가되지 않음.
         * 
         * 등의 특성임을 알려준다.
         */
        @Override
        public int characteristics() {
            return Spliterator.ORDERED + Spliterator.SIZED + Spliterator.SUBSIZED + Spliterator.NONNULL + Spliterator.IMMUTABLE;
        }
    }

    Main

    package Part2.Chapter7.Chapter7_7_3;
    
    import java.util.Spliterator;
    import java.util.stream.IntStream;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;
    
    import Part2.Chapter7.Chapter7_7_3.wordCounter.WordCounter;
    import Part2.Chapter7.Chapter7_7_3.wordCounterSpliterator.WordCounterSpliterator;
    
    /*
     * 7.3 Spliterator
     * 
     * Spliterator는 '분할할 수 있는 반복자(splitable iterator)'라는 의미이다.
     * Iterator처럼 Spliterator는 소스의 요소 탐색 기능을 제공한다는 점은 같지만 Spliterator는 병렬 작업에 특화되어 있다.
     * 자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공한다.
     * 컬렉션은 spliterator라는 메서드를 제공하는 Spliterator 인터페이스를 구현한다.
     * 
     *  public Interface Spliterator<T> {
     *      boolean tryAdvance(Consumer<? super T> action);
     *      Spliterator<T> trySplit();
     *      long estimateSize();
     *      int characteristics();
     *  }
     *  
     * T : Spliterator에서 탐색하는 요소의 형식
     * tryAdvance : Spliterator 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 참을 반환. (일반적인 Iterator 동작과 같다.)
     * trySplit : Spliterator의 일부 요소(자신이 반환한 요소)를 분할해서 두 번째 Spliterator를 생성하는 메서드.
     * estimateSize : 탐색해야 할 요소 수 정보를 제공. 특히 탐색해야 할 요소 수가 정확하지 않더라도 제공된 값을 이용해서 더 쉽고 공평하게
     * Spliterator를 분할할 수 있다.  
     * 
     * 7.3.1 분할 과정
     * 
     * 스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어난다.
     * 1단계 : 첫 번째 Spliterator에 trySplit을 호출하면서 두 번째 Spliterator가 생성된다.
     * 2단계 : 두 개의 spliterator에 trySplit를 다시 호출하면서 네 개의 Spliterator가 생성된다.
     * 이처럼 trySplit의 결과가 null이 될 때까지 이 과정을 반복한다.
     * 이 분할 과정은 characteristics 메서드로 정의하는 Spliterator의 특성에 영향을 받는다.
     * 
     * Spliterator 특성
     * 
     * characteristics는 추상 메서드이며, Characteristics 메서드는 Spliterator 자체의 특성 집합을 포함하는 int를 반환한다.
     * 
     * 특성 : ORDERED 
     * 의미 : 리스트처럼 요소에 정해진 순서가 있으므로 Spliterator는 요소를 탐색하고 분할할 때 이 순서에 유의해야 한다.
     * 
     * 특성 : DISTINCT 
     * 의미 : x, y 두 요소를 방문했을 때 x.equals(y)는 항상 false를 반환한다.
     * 
     * 특성 : SORTED
     * 의미 : 탐색된 요소는 미리 정의된 정렬 순서를 따른다.
     * 
     * 특성 : SIZED
     * 의미 : 크기가 알려진 소스(예를 들면 Set)로 Spliterator를 생성했으므로 estimatedSize()는 정확한 값을 반환한다.
     * 
     * 특성 : NONNULL
     * 의미 : 탐색하는 모든 요소는 null이 아니다.
     * 
     * 특성 : IMMUTABLE
     * 의미 : 이 Spliterator의 소스는 불변이다. 즉, 요소를 탐색하는 동안 요소를 추가, 삭제, 고칠 수 없다.
     * 
     * 특성 : CONCURRENT
     * 의미 : 동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다.
     * 
     * 특성 : SUBSIZED 
     * 의미 : 이 Spliterator 그리고 분할되는 모든 Spliterator는 SIZED 특성을 갖는다.
     */
    public class Main_7_3 {
    
        public static int countWordsIteratively(String s) {
            int counter = 0;
            boolean lastSpace = true;
    
            for(char c : s.toCharArray()) {
                if(Character.isWhitespace(c)) {
                    lastSpace = true;
                } else {
                    if(lastSpace) {
                        counter++;
                    }
    
                    lastSpace = false;
                }
            }
    
            return counter;
        }
    
        public static int countWords(Stream<Character> stream) {
            WordCounter wordCounter = stream.reduce(new WordCounter(0, true)
                , WordCounter::accumulate
                , WordCounter::combine);
    
            return wordCounter.getCounter();
        }
    
        public static void main(String[] args) {
            final String SENTENCE = "Nel     mezzo   del   cammin     di    nostra     vita  "
                + "mi    ritrovai    in una    selva   oscura"
                + "  ch     la     dritta   via    era    smrrita   ";
    
            /*
             * 7.3.2 커스텀 Spliterator 구현하기
             */
            System.out.println("countWordsIteratively Found " + countWordsIteratively(SENTENCE) + " words");
    
            Stream<Character> stream = IntStream.range(0, SENTENCE.length())
                .mapToObj(SENTENCE::charAt);        
            System.out.println("countWords Found " + countWords(stream) + " words");
    
            /*
             * 병렬 실행 시 잘못된 결과가 나온다.
             * 이는 원래 문자열을 임의의 위치에서 둘로 나누다보니 예상치 못하게 하나의 단어를 둘로 계산하는 상황이 발생할 수 있다.
             * 즉, 순차 스트림을 병렬 스트림으로 바꿀 때 스트림 분할 위치에 따라 잘못된 결과가 나온 것이다.
             */
            stream = IntStream.range(0, SENTENCE.length())
                .mapToObj(SENTENCE::charAt);
            System.out.println("countWords(parallel) Found " + countWords(stream.parallel()) + " words");
    
            /*
             * 커스텀 Spliterator을 이용한 올바른 병렬 실행.
             */
            Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
            // StreamSupport.stream 팩토리 메서드로 전달한 두 번째 불린 인수는 병렬 스트림 생성 여부이다.
            stream = StreamSupport.stream(spliterator, true);
    
            System.out.println("countWords(spliterator parallel) Found " + countWords(stream) + " words");
    
        }
    
    }

    요약

    • 내부 반복을 이용하면 명시적으로 다른 스레드를 사용하지 않고도 스트림을 병렬로 처리할 수 있다.
    • 간단하게 스트림을 병렬로 처리할 수 있지만 항상 병렬 처리가 빠른 것은 아니다. 병렬 소프트웨어 동작 방법과 성능은 직관적이지 않을 때가 많으므로 병렬 처리를 사용했을 때 성능을 직접 측정해봐야 한다.
    • 병렬 스트림으로 데이터 집합을 병렬 실행할 때 특히 처리해야 할 데이터가 아주 많거나 각 요소를 처리하는 데 오랜 시간이 거릴 때 성능을 높일 수 있다.
    • 가능하면 기본형 특화 스트림을 사용하는 등 올바른 자료구조 선택이 어떤 연산을 병렬로 처리하는 것보다 성능적으로 더 큰 영향을 미칠 수 있다.
    • 포크/조인 프레임워크에서 병렬화할 수 있는 태스크를 작은 태스크로 분할한 다음에 분할된 태스크를 각각의 스레드로 실행하며 서브태스크 각각의 결과를 합쳐서 최종 결과를 생산한다.
    • Spliterator는 탐색하려는 데이터를 포함하는 스트림을 어떻게 병렬화 할 것인지 정의한다.
    반응형

    댓글

Designed by Tistory.