해당 내용은 Java8 In Action 책을 요약 및 정리한 내용입니다.
좀 더 자세히 알고 싶으신 분들은 책을 사서 읽어보심을 추천드립니다.!
7.1 병렬 스트림
Main
package Part2.Chapter7.Chapter7_7_1;
import java.util.function.Function;
import java.util.stream.LongStream;
import java.util.stream.Stream;
/*
* 7.1 병렬 스트림
*
* 컬렉션에서 parallelStream을 호출하면 병렬 스트림(parallel stream)이 생성된다.
* 병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다.
* 따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.
*/
public class Main_7_1 {
public static long meansureSumPerf(Function<Long, Long> adder, long n) {
long fastest = Long.MAX_VALUE;
for(int i = 0; i < 10; i++) {
long start = System.nanoTime();
long sum = adder.apply(n);
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Result : " + sum);
if(duration < fastest) {
fastest = duration;
}
}
return fastest;
}
public static void main(String[] args) {
/*
* 7.1.1 순차 스트림을 병렬 스트림으로 변환하기
*/
System.out.println("7.1.1 순차 스트림을 병렬 스트림으로 변환하기 - 전통 자바 : " + ParallelStream.iterativeSum(100));
System.out.println("7.1.1 순차 스트림을 병렬 스트림으로 변환하기 - 순차 : " + ParallelStream.sequentialSum(100));
System.out.println("7.1.1 순차 스트림을 병렬 스트림으로 변환하기 - parallel : " + ParallelStream.parallelSum(100));
/*
* 병렬 스트림에서 사용하는 스레드 풀 설정
*
* 스트림의 parallel 메서드에서 병렬로 작업을 수행 하는 스레드 생성, 생성 갯수, 그리고 그 과정을 어떻게 커스텀마이징할 수 있는지
* 궁금할 것이다.
*
* 병렬 스트림은 내부적으로 ForkJoinPool을 사용한다
* 기본적으로 ForkJoinPool은 프로세스 수, 즉 Runtime.getRuntime().availableProcessors()가 반환하는 값에 상응하는 스레드를 갖는다.
*
* System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", 12);
*
* 위 코드는 전역 설정 코드이므로 이후의 모든 병렬 스트림 연산에 영향을 준다.
* 현재는 하나의 병렬 스트림에 사용할 수 있는 특정한 값을 지정할 수 없다.
* 일반적으로 기기의 프로세서 수와 같으므로 특별한 이유가 없다면 FokJoinPool의 기본값을 그대로 사용하는것을 권장한다.
*/
/*
* 7.1.2 스트림 성능 측정
*
* 병렬화를 이용하면 순차나 반복 형식에 비해 성능이 더 좋아질 것이라 추측할 것이다.
* 성능을 최적화의 세 가지 황금 규칙인 첫째도 측정, 둘째도 측정, 셋째도 측정에 따라 성능을 측정해 보자.
*/
/*
* 고전적인 for 루프를 사용한 반복 버전이 생각보다 빠르다는 점도 고려해야 한다.
* for 루프는 저수준으로 작동하며 기본값을 박싱하거나 언박싱할 필요가 없으므로 수행 속도가 빠르다.
*/
System.out.println("7.1.2 스트림 성능 측정 - 전통 자바 : " + meansureSumPerf(ParallelStream::iterativeSum, 10_000_000));
System.out.println("7.1.2 스트림 성능 측정 - 순차 : " + meansureSumPerf(ParallelStream::sequentialSum, 10_000_000));
/*
* 병렬 버전이 순차 버전보다 늦게 동작하는 이유는
* iterate가 박싱된 객체를 생성하므로 이를 다시 언박싱하는 과정이 필요하다.
* iterate가 병렬로 실행될 수 있도록 독립적인 청크로 분할하기가 어렵다.
*
* iterate는 본질적으로 순차적이라 청크로 분할하기가 어렵다.
* 이와 같은 상황 때문에 리듀싱 연산이 수행되지 않는다.
* 리듀싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않았으므로 스트림을 병렬로 처리할 수 있도록
* 청크로 분할할 수 없다.
* 스트림이 병렬로 처리되도록 지시했고 각각의 합계가 다른 스레드에서 수행되었지만 결국 순차처리 방식과
* 크게 다른 점이 없으므로 스레드를 할당하는 오버헤드만 증가하게 된다.
*/
System.out.println("7.1.2 스트림 성능 측정 - parallel : " + meansureSumPerf(ParallelStream::parallelSum, 10_000_000));
/*
* 더 특화된 메서드 이용
*
* iterate가 LongStream.rangeClosed 비해 다음과 같은 장점을 제공한다.
* LongStream.rangeClosed는 기본형 long을 직접 사용하므로 박싱과 언방식 오버헤드가 사라진다.
* LongStream.rangeClosed는 쉽게 청크로 분할할 수 있는 숫자 범위를 생상한다.
* 예를 들어 1-20 범위의 숫자를 각각 1-5, 6-10, 11-15, 16-20 범위의 숫자로 분할할 수 있다.
*
* 기존 iterate 팩토리 메서드로 생성한 순차 버전에 비해 이 예제의 순차 스트림 처리 속도가 더 빠르다.
* 특화되지 않은 스트림을 처리할 때는 오토박싱, 언박싱 등의 오버헤드를 수반하기 때문이다.
* 상황에 따라서 어떤 알고리즘을 병렬화하는 것보다 적절한 자료구조를 선택하는 것이 더 중요하다.
*/
System.out.println("더 특화된 메서드 이용 - 순차 : " + meansureSumPerf(ParallelStream::rangeSum, 10_000_000));
System.out.println("더 특화된 메서드 이용 - parallel : " + meansureSumPerf(ParallelStream::newParallelSum, 10_000_000));
/*
* 7.1.3 병렬 스트림의 올바른 사용법
*/
System.out.println("7.1.3 병렬 스트림의 올바른 사용법 - 잘못된 예 : " + meansureSumPerf(ParallelStream::sideEffectParallelSum, 10_000_000));
/*
* 7.1.4 병렬 스트림 효과적으로 사용하기
*
* '천 개 이상의 요소가 있을 대만 병렬 스트림을 사용하라'와 같이 양을 기준으로 병렬 스트림 사용을 결정ㅎ하는 것은 적절하지 않다.
* 그래도 어떤 상황에서 병렬 스트림을 사용할 것인지 약간의 수량적 힌트를 정하는 것이 도움이 될 때도 있다.
*
* - 확신이 서지 않는다면 직접 측정하다.
* 병렬 스트림의 수행 과정은 투명하지 않을 때가 많다.
* 따라서 순차 스트림과 병렬 스트림 중 어떤 것이 좋을지 모르겠다면 적절한 벤치마크로 직접 성능을 축정하는 것이 바람직하다.
*
* - 박싱을 주의하라.
* 자동 박싱과 언방식은 성능을 크게 저하시킬 수 있는 요소다. 자바 8은 박싱 동작을 피할 수 있도록 기본형 특화 스트림
* (IntStream, LongStream, DoubleStream)을 제공한다. 따라서 되도록이면 기본형 특화 스트림을 사용하는 것이 좋다.
*
* - 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다
* limit나 findFirst처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치러야 한다.
* 예를 들어 findAny는 요소의 순서와 상관없이 연산하므로 findFirst보다 성능이 좋다. 정렬된 스트림에 unordered를
* 호출하면 비정렬된 스트림을 얻을 수 있다. 스트림에 N개 요소가 있을 때 요소의 순서가 상관없다면(예를 들어 소스가 리스트라면)
* 비정렬된 스트림에 limit를 호출하는 것이 효율적이다.
*
* - 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라.
* 처리해야 할 요소 수가 N이고 하나의 요소를 처리하는 데 드는 비용을 Q라 하면 전체 스트림 파이프라인 처리 비용은 N*Q로 예상할 수 있다.
* Q가 높아진다는 것은 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있음을 의미한다.
*
* - 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다.
* 소량의 데이터를 처리하는 상황에서는 병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못하기 때문이다.
*
* - 스트림을 구성하는 자료구조가 적절한지 확인하라.
* 예를 들어 ArrayList를 LinkedList보다 효율적으로 분할할 수 있다. LinkedList를 분할하려면 모든 요소를 탐색해야 하지만
* ArrayList는 요소를 탐색하지 않고도 리스트를 분할할 수 있기 때문이다.
* 또한 range 택토리 메서드로 만든 기본형 스트림도 쉽게 분해할 수 있다.(Spliterator를 구현해서 분해 과정을 완벽하게 제어할 수 있다.)
*
* - 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
* 예를 들어 SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적으로 병렬 처리 할 수 있다.
* 반면 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 효과적으로 스트림을 병렬 처리할 수 있을지 알 수 없게 된다.
*
* - 최종 연산의 병합 과정(예를 들면 Collector의 combiner 메서드) 비용을 살펴보라.
* 병합 과정의 비용이 비싸다면 병렬 스트림의 얻은 성능의 이익이 서브스트림의 부분결과를 합치는 과정에서 상쇄할 수 있다.
*/
/*
* 스트림 소스의 분해성
* 소스 : ArrayList
* 분해성 : 훌륭함
*
* 소스 : LinkedList
* 분해성 : 나쁨
*
* 소스 : IntStream.range
* 분해성 : 훌륭함
*
* 소스 : Stream.iterate
* 분해성 : 나쁜
*
* 소스 : HashSet
* 분해성 : 젛음
*
* 소스 : TreeSet
* 분해성 : 좋음
*/
}
static class ParallelStream {
public static long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.reduce(0L, Long::sum);
}
public static long iterativeSum(long n) {
long result = 0;
for(long i = 1L; i <= n; i++) {
result += i;
}
return result;
}
public static long parallelSum(long n) {
/*
* 순차 스트림에 parallel 메서드를 호출하면 기존의 함수형 리듀싱 연산(숫자 합계 계산)이 병렬로 처리된다.
* 스트림이 여러 청크로 분할이 되고 리듀싱 연산을 여러 청크에 병렬로 수행할 수 있다.
* 마지막으로 리듀싱 연산으로 생성된 부분결과를 다시 리듀싱 연산으로 합쳐서 전체 스트림의 리듀싱 결과를 도출한다.
*
* 사실 순차 스트림에 parallel을 호출해도 스트림 자체에는 아무 변화도 일어나지 않는다.
* 내부적으로는 parallel을 호출하면 이후 연산이 병렬로 수행해야 함을 의미하는 불린 플래그가 설정된다.
* 반대로 sequential로 병렬 스트림을 순차 스트림으로 바꿀 수 있다.
* 이 두 메서드를 이용하면 어떤 연산은 병렬로 어떤 연산은 순차로 실행할지 제어할 수 있다.
*
* stream.parallel()
* .filter(...)
* .sequential()
* .map(...)
* .parallel()
* .reduce();
*
* parallel과 sequential 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다.
* 위 예제에서는 파이프라인의 마지막 호출은 parallel이므로 파이프라인은 전체적으로 병렬로 실행된다.
*/
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}
public static long rangeSum(long n) {
return LongStream.rangeClosed(1, n).reduce(0L, Long::sum);
}
public static long newParallelSum(long n) {
return LongStream.rangeClosed(1, n)
.parallel()
.reduce(0L, Long::sum);
}
public static long sideEffectParallelSum(long n) {
/*
* 아래 코드는 본질적으로 순차 실행할 수 있도록 구현되어 있기 때문에 병렬로 실행하면 참사가 발생된다.
* 특히 total을 접근할 때마다(다수의 스레드에서 동시에 데이터에 접근하는) 데이터 레이스 문제가 발생된다.
* 동기화로 문제를 해결하면 결국 병렬화라는 특성이 없어져 버린다.
*/
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
return accumulator.total;
}
}
}
class Accumulator {
/*
* 다수의 쓰레드가 하나의 total 변수를 참조 하려 하기 때문에 데이터 레이스 문제 발생 및
* 여러 스레드가 동시에 add 메서드를 실행하기 때문에 올바른 결과값이 나오질 않는다.
*/
public long total = 0;
/*
* 얼핏 보면 아토믹 연산(atomic operation) 같지만 total += value 아토믹 연산이 아니다.
* 결국 여러 스레드에서 공유하는 객체의 상태를 바꾸는 forEach 블록 내부에서 add 메서드를 호출하면서
* 이 같은 문제가 발생된다.
*/
public void add(long value) { total += value; }
}
7.2 포크조인 프레임워크
Process
ForJoinSumCalculator.java
package Part2.Chapter7.Chapter7_7_2.forkjoin;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
// RecursiveTask를 상속받아 포크/조인 프레임워크에서 사용할 태스크를 생성한다.
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
// 더할 숫자 배열
private final long[] numbers;
// 이 서브태스크에서 처리할 배열의 초기 위치와 최종 위치
private final int start;
private final int end;
// 이 값 이하의 서브테스크는 더이상 분할할 수 없다.
public static final long THRESHOLD = 10_000;
// 메인 태스크를 생성할 때 사용할 공개 생성자
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
// 메인 태스크의 서브태스크를 재귀적으로 만들 때 사용할 비공개 생성자
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
// RecursiveTask의 추상 메서드 오버라이드
@Override
protected Long compute() {
// 이 태스크에서 더할 배열의 길이
int length = this.end - this.start;
// 기준값과 같거나 작으면 순차적으로 결과를 계산한다.
if(length <= THRESHOLD) {
return this.computeSequentially();
}
// 배열의 첫 번째 절반을 더하도록 서브태스크를 생성한다.
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(this.numbers, this.start, this.start + length / 2);
// FokJoinPool의 다른 스레드로 새로 생성한 태스크를 비동기로 실행한다.
leftTask.fork();
// 배열의 나머지 절반을 더하도록 서브태스크를 생성한다.
ForkJoinSumCalculator rigthTask = new ForkJoinSumCalculator(this.numbers, this.start + length / 2, this.end);
// 두 번째 서브태스크를 동기 실행한다. 이때 추가로 분할이 일어날 수 있다.
Long rightResult = rigthTask.compute();
// 첫 번째 서브태스크의 결과를 읽거나 아직 결과가 없다면 기다린다.
Long leftResult = leftTask.join();
// 두 서브태스크의 결과를 조합한 값이 이 태스크의 결과다.
return leftResult + rightResult;
}
// 더 분할할 수 없을 때 서브 태스크의 결과를 계산하는 단순 알고리즘
private long computeSequentially() {
long sum = 0;
/*
* n까지의 자연수 덧셈 작업을 병렬로 수행하는 방법을 더 직관적으로 보여준다.
*/
for(int i = this.start; i < this.end; i++) {
sum += this.numbers[i];
}
return sum;
}
public static long forkJoinSum(long n) {
/*
* LongStream으로 n까지의 자연수를 포함하는 배열을 생성했다.
* 생성된 배열을 ForkJoinSumCalculator의 생성자로 전달해서 ForkJoinTask를 만들었다.
* 마지막으로 생성한 태스크를 새로운 ForkJoinPool의 invoke 메서드로 전달했다.
* ForkJoinPool에서 실행되는 마지막 invoke 메서드의 반환값은 ForkJoinSumCalculator에서
* 정의한 태스크의 결과가 된다.
*/
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
/*
* 일반적으로 애플리케이션에서는 둘 이상의 ForkJoinPool을 사용하지 않는다. 즉, 소프트웨어의 필요한 곳에서
* 언제든 가져다 쓸 수 있도록 FokJoinPool을 한 번만 인스턴스화해서 정적 필드에 싱글턴으로 저장한다.
*
* ForkJoinPool을 만들면서 인수가 없는 디폴트 생성자를 이용했는데 이는 jvm에서 이용할 수 있는 모든 프로세스가
* 자유롭게 풀에 접근할 수 있음을 의미한다.
*
* 더 정확하게 Runtime.availableProcessors의 반환값으로 풀에 사용할 스레드 수를 결정한다.
* availableProcessors, 즉 '사용할 수 있는 프로세서' 이름과는 달리 실제 프로세서 외에 하이퍼스레딩과 관련된
* 가상 프로세서도 개수에 포함된다.
*/
return new ForkJoinPool().invoke(task);
}
}
Main
package Part2.Chapter7.Chapter7_7_2;
import java.util.function.Function;
import Part2.Chapter7.Chapter7_7_2.forkjoin.ForkJoinSumCalculator;
/*
* 7.2 포크/조인 프레임워크
*
* 포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를
* 합쳐서 전체 결과를 만들도록 설계되었다.
* 포크/조인 프레임워크에서는 서브태스크를 스레드 풀(FokJoinPool)의 작업자 스레드에 분산 할당하는 ExcutorService 인터페이스를 구현한다.
*/
public class Main_7_2 {
public static long meansureSumPerf(Function<Long, Long> adder, long n) {
long fastest = Long.MAX_VALUE;
for(int i = 0; i < 10; i++) {
long start = System.nanoTime();
long sum = adder.apply(n);
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Result : " + sum);
if(duration < fastest) {
fastest = duration;
}
}
return fastest;
}
public static void main(String[] args) {
/*
* 7.2.1 RecursiveTask 활용
*
* 스레드 풀을 이용하려면 RecursiveTask<R>의 서브클래스를 만들어야 한다.
* 여기서 R은 병렬화된 태스크가 생성하는 결과 형식 또는 결과가 없을 때(결과가 없더라도 다른 비지역 구조를 바꿀 수 있다.)
* RecursiveAction 형식이다.
*
* compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다.
* 따라서 대부분의 compute 메서드 구현은 다음과 같은 의사코드 형식을 유지한다.
*
* if(태스크가 충분히 작거나 더 이상 분할할 수 없으면) {
* 순차적으로 태스크 계산
* } else {
* 태스크를 두 서브태스크로 분할.
* 태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함.
* 모든 서브태스크의 연산이 완료될 때까지 기달림.
* 각 서브태스크의 결과를 합침.
* }
*
* 해당 알고리즘은 분할 후 정복(divide-and-conquer)알고리즘의 병렬화 버전이다.
*
* ForkJoinSumCalculator를 ForkJoinPool로 전달하면 풀의 스레드가 ForkJoinSumCalculator의 compute 메서드를 실행하면서 작업을 수행한다.
*
* 1. compute 메서드는 병렬로 실행할 수 있을만큼 태스크의 크기가 충분히 작아졌는지 확인하며
* 2. 아직 태스크의 크기가 크다고 판단되면 숫자 배열을 반으로 분할해서 두 개의 새로운 ForkJoinSumCalculator로 할당한다.
* 3. 그러면 다시 새로운 ForkJoinPool이 새로 생성된 ForkJoinSumCalculator를 실행한다.
* 4. 결국 이 과정이 재귀적으로 반복되면서 주어진 조건(예제에서는 덧샘을 수행할 항목이 만 개 이하여야 함)을 만족할 때까지 태스크를 분할한다.
*
* 이제 각 서브태스크는 순차적으로 처리되며 포킹 프로세스로 만들어진 이진트리의 태스크를 루트에서 역순으로 방문한다.
* 즉, 각 서브태스크의 부분결과를 합쳐서 태스크의 최종 결과를 계산한다.
*/
System.out.println("7.2 포크/조인 프레임워크 : " + meansureSumPerf(ForkJoinSumCalculator::forkJoinSum, 10_000_000));
/*
* 7.2.2 포크/조인 프레임워크를 제대로 사용하는 방법
*
* 다음은 포크/조인 프레임워크를 효과적으로 사용하는 방법이다.
*
* - join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킨다.
* 따라서 두 서브태스크가 모두 시작된 다음에 join을 호출해야 한다.
* 그렇지 않으면 각각의 서브태스크가 다른 태스크가 끝나길 기달리는 일이 발생하며 원래 순차 알고리즘보다
* 느리고 복잡한 프로그램이 될수 있다.
*
* - RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용하지 말아야 한다.
* 대신 compute나 fork 메서드를 직접 호출할 수 있다. 순차 코드에서 병렬 계산을 시작할 때만 invoke를 사용한다.
*
* - 서브태스크에 fork 메서드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다.
* 왼쪽 작업과 오른쪽 작업 모두에 fork 메서드를 호출하는 것이 자연스러울 것 같지만 한쪽 작업에는 fork를
* 호출하는 것보다는 compute를 호출하는 것이 효율적이다.
* 그러면 두 서브태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에 불필요한 태스크를 할당하는
* 오버헤드를 피할 수 있다.
*
* - 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅이 어렵다.
* 보통 IDE로 디버깅할 때 스택 트레이스(stack trace)로 문제가 일어난 과정을 쉽게 확인할 수 있는데,
* 포크/조인 프레임워크에서는 fork라 불리는 다른 스레드에서 compute를 호출하므로 스택 트레이스가 도움이
* 되지 않는다.
*
* - 멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차처리보다 무조건 빠를 거라는 생각은 버려야 한다.
* 병렬 처리로 성능을 개선하려면 태스크를 여러 독립적인 서브태스크로 분할할 수 있어야 한다.
* 각 서브태스크의 실행시간은 새로운 서브태스크를 포킹하는데 드는 시간보다 길어야 한다.
* 예)
* I/O를 한 서브태스크에 할당. 다른 서브태스크에서는 계산을 실행.
* 즉 I/O와 계산을 병렬로 실행할 수 있다.
* 순차버전과 병렬버전의 성능을 비교할 때는 다른 요소도 고려해야 한다.
* 다른 자바 코드와 마찬가지로 jit 컴파일러에 의해 최적화되려면 몇 차례의 '준비 과정(warmed up)' 또는 실행과정을
* 거쳐야 한다.
* 따라서 성능을 측정할 때는 하니스(meansureSumPerf 메서드와 같은)를 통해 여러 번 프로그램을 실행한 결과를 측정해야한다.
* 또한 컴파일러 최적화는 병렬버전보다는 순차버전에 집중될 수 있다는 사실도 기억하자
* (예를 들어 순차버전에서는 죽은 코드를 분석해서 사용되지 않는 계산은 아예 삭제하는 등의 최적화를 달성하기 쉽다.)
*/
/*
* 7.2.3 작업 훔치기
*
* 기기의 코어 수보다 서브태스크가 많이 생성 되는게 자원만 낭비 하는것 처럼 보일수 있다.
* 실제로 각각의 태스크가 CPU로 할당되는 상황이라면 어차피 서브태스크로 분할한다고 해서 성능이 좋아지지는 않을것이다.
* 하지만 실제로 코어 개수와 관계없이 적절한 크기로 분할된 많은 태스크를 포킹하는 것이 바람직하다.
*
* 이론적으로 코어 개수만큼 병렬화된 태스크로 작업부하를 분할하면 모든 CPU 코어에 태스크를 실행할 것이고 크기가 같은 각각의
* 태스크는 같은 시간에 종료될 것이라고 생각할 수 있다.
*
* 하지만 복잡한 로직인 경우에는 각각의 서브태스크의 작업완료 시간이 크게 달라질 수 있다.
* 이는 분할 기법이 효율적이지 않을수도 있고 예기치 않게 디스크 접근 속도 저하 및 외부 서비스와 협력 과정 중 지연이 생길 수 있기 때문이다.
*
* 포크/조인 프레임워크에서는 "작업 훔치기(work stealing)"라는 기법으로 이 문제를 해결한다.
* 작업 훔치기 기법은 ForkJoinPool의 모든 스레드를 거의 공정하게 분할한다.
* 각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트(doubly linked list)를 참조하면서 작업이 끝날 때마다 큐의 헤드에서
* 다른 태스크를 가져와서 작업을 처리한다. 이때 한 스레드는 다른 스레드보다 자신에게 할당된 태스크를 더 빨리 처리할 수 있다.
* 즉, 다른 스레드는 바쁘게 일하고 있는데 한 스레드는 할일이 다 떨어진 상황이다. 이때 할일이 없어진 스레드는 유휴 상태로 바뀌는 것이 아니라
* 다른 스레드 큐의 꼬리(tail)에서 작업을 훔쳐온다. 모든 태스크가 작업을 끝낼 때까지, 즉 모든 큐가 빌 때까지 이 과정을 반복한다.
* 따라서 태스크의 크기를 작게 나누어야 작업자 스레드 간의 작업부하를 비슷한 수준으로 유지할 수 있다.
*
*/
}
}
7.3 Spliterator
Process
WordCounter.java
package Part2.Chapter7.Chapter7_7_3.wordCounter;
public class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
/*
* WordCounter의 상태를 어떻게 바꿀 것인지, 또는 엄밀히 WordCounter는 불변 클래스 이므로 새로운 WordCounter 클래스를
* 어떤 형태로 생성할 것인지 정의한다.
* 스트림을 탐색하면서 새로운 문자를 찾을 때마다 accmulate 메서드를 호출한다.
*
* 반복 알고리즘처럼 accumulate 메서드는 문자열의 문자를 하나씩 탐색한다.
*/
public WordCounter accumulate(Character c) {
if(Character.isWhitespace(c)) {
return lastSpace ? this : new WordCounter(this.counter, true);
} else {
// 공백 문자를 만나면 지금까지 탐색한 문자를 단어로 간주(공백 문자는 제외) 단어 개수를 증가.
return lastSpace ? new WordCounter(this.counter + 1, false) : this;
}
}
/*
* 두 WordCounter의 counter 값을 더한다.
*/
public WordCounter combine(WordCounter wordCounter) {
// counter 값만 더할 것이므로 마지막 공백은 신경 쓰지 않는다.
return new WordCounter(this.counter + wordCounter.counter, wordCounter.lastSpace);
}
public int getCounter() {
return this.counter;
}
}
WordCounterSpliterator.java
package Part2.Chapter7.Chapter7_7_3.wordCounterSpliterator;
import java.util.Spliterator;
import java.util.function.Consumer;
public class WordCounterSpliterator implements Spliterator<Character>{
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string) {
this.string = string;
}
/*
* 문자열에서 현재 인덱스에 해당하는 문자를 Consumer에 제공한 다음에 인덱스를 증가시킨다.
*
* 인수로 전달된 Consumer는 스트림을 탐색하면서 적용해야 하는 함수 집합이 작업을 처리할 수 있도록
* 소비한 문자를 전달하는 자바 내부 클래스다.
*
* 예제에서는 스트림을 탐색하면서 하나의 리듀싱 함수, 즉 WordCounter의 accumulate 메서드만 적용한다.
* 해당 메서드는 새로운 커서 위치가 전체 문자열 길이보다 작으면 참을 반환하며 이는 반복 탐색해야 할
* 문자가 남아있음을 알린다.
*/
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
// 현재 문자를 소비한다.
action.accept(string.charAt(this.currentChar++));
// 소비할 문자가 남았다면 true를 반환한다.
return this.currentChar < string.length();
}
/*
* 반복될 자료구조를 분할하는 로직을 포함한다.
*
* 분할 동작을 중단할 한계를 설정해야 한다.
* 분할 과정에서 남은 문자 수가 한계값(예제에서는 10) 이하면 null을 반환. 즉 분할을 중지하도록 지시한다.
*
* 반대로 분할이 필요한 상황에는 파싱해야 할 문자열 청크의 중간 위치를 기준으로 분할하도록 지시한다.
* 이때 단어 중간을 분할하지 않도록 빈 문자가 나올 때까지 분할 위치를 이동시킨다.
* 분할할 위치를 찾았다면 새로운 Spliterator를 만든다.
* 새로운 Spliterator는 현재 위치(currentChar)부터 분할된 위치까지의 문자를 탐색한다.
*/
@Override
public Spliterator<Character> trySplit() {
int currentSize = this.string.length() - this.currentChar;
// 파싱할 문자열을 순차 처리할 수 있을 만큼 충분히 작아졌음을 알리는 null을 반환한다.
if(currentSize < 10) {
return null;
}
// 파싱할 문자열의 중간을 분할 위치로 설정한다.
for(int splitPos = currentSize / 2 + this.currentChar; splitPos < this.string.length(); splitPos++) {
// 다음 공백이 나올 때까지 분할 위치를 뒤로 이동 시킨다.
if(Character.isWhitespace(this.string.charAt(splitPos))) {
// 처음부터 분할 위치까지 문자열을 파싱할 새로운 WordCounterSpliterator를 생성한다.
Spliterator<Character> spliterator = new WordCounterSpliterator(this.string.substring(this.currentChar, splitPos));
// 이 WordCounterSpliterator의 시작 위치를 분할 위치로 설정한다.
this.currentChar = splitPos;
return spliterator;
}
}
return null;
}
/*
* 탐색해야 할 요소의 개수는 Spliterator가 파싱할 문자열 전체 길이(string.length())와 현재 반복 중인 위치(currentChar)의 차다.
*/
@Override
public long estimateSize() {
return this.string.length() - this.currentChar;
}
/*
* Spliterator.ORDERED : 문자열의 문자 등장 순서가 유의미함.
* Spliterator.SIZED : estimateSize 메서드의 반환값이 정확함.
* Spliterator.SUBSIZED : trySplit으로 생성된 Spliterator도 정확한 크기를 가짐.
* Spliterator.NONNULL : 문자열에는 null 문자가 존재 하지 않음.
* Spliterator.IMMUTABLE : 문자열 자체가 불변 클래스이므로 문자열을 파싱하면서 속성이 추가되지 않음.
*
* 등의 특성임을 알려준다.
*/
@Override
public int characteristics() {
return Spliterator.ORDERED + Spliterator.SIZED + Spliterator.SUBSIZED + Spliterator.NONNULL + Spliterator.IMMUTABLE;
}
}
Main
package Part2.Chapter7.Chapter7_7_3;
import java.util.Spliterator;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import Part2.Chapter7.Chapter7_7_3.wordCounter.WordCounter;
import Part2.Chapter7.Chapter7_7_3.wordCounterSpliterator.WordCounterSpliterator;
/*
* 7.3 Spliterator
*
* Spliterator는 '분할할 수 있는 반복자(splitable iterator)'라는 의미이다.
* Iterator처럼 Spliterator는 소스의 요소 탐색 기능을 제공한다는 점은 같지만 Spliterator는 병렬 작업에 특화되어 있다.
* 자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공한다.
* 컬렉션은 spliterator라는 메서드를 제공하는 Spliterator 인터페이스를 구현한다.
*
* public Interface Spliterator<T> {
* boolean tryAdvance(Consumer<? super T> action);
* Spliterator<T> trySplit();
* long estimateSize();
* int characteristics();
* }
*
* T : Spliterator에서 탐색하는 요소의 형식
* tryAdvance : Spliterator 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 참을 반환. (일반적인 Iterator 동작과 같다.)
* trySplit : Spliterator의 일부 요소(자신이 반환한 요소)를 분할해서 두 번째 Spliterator를 생성하는 메서드.
* estimateSize : 탐색해야 할 요소 수 정보를 제공. 특히 탐색해야 할 요소 수가 정확하지 않더라도 제공된 값을 이용해서 더 쉽고 공평하게
* Spliterator를 분할할 수 있다.
*
* 7.3.1 분할 과정
*
* 스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어난다.
* 1단계 : 첫 번째 Spliterator에 trySplit을 호출하면서 두 번째 Spliterator가 생성된다.
* 2단계 : 두 개의 spliterator에 trySplit를 다시 호출하면서 네 개의 Spliterator가 생성된다.
* 이처럼 trySplit의 결과가 null이 될 때까지 이 과정을 반복한다.
* 이 분할 과정은 characteristics 메서드로 정의하는 Spliterator의 특성에 영향을 받는다.
*
* Spliterator 특성
*
* characteristics는 추상 메서드이며, Characteristics 메서드는 Spliterator 자체의 특성 집합을 포함하는 int를 반환한다.
*
* 특성 : ORDERED
* 의미 : 리스트처럼 요소에 정해진 순서가 있으므로 Spliterator는 요소를 탐색하고 분할할 때 이 순서에 유의해야 한다.
*
* 특성 : DISTINCT
* 의미 : x, y 두 요소를 방문했을 때 x.equals(y)는 항상 false를 반환한다.
*
* 특성 : SORTED
* 의미 : 탐색된 요소는 미리 정의된 정렬 순서를 따른다.
*
* 특성 : SIZED
* 의미 : 크기가 알려진 소스(예를 들면 Set)로 Spliterator를 생성했으므로 estimatedSize()는 정확한 값을 반환한다.
*
* 특성 : NONNULL
* 의미 : 탐색하는 모든 요소는 null이 아니다.
*
* 특성 : IMMUTABLE
* 의미 : 이 Spliterator의 소스는 불변이다. 즉, 요소를 탐색하는 동안 요소를 추가, 삭제, 고칠 수 없다.
*
* 특성 : CONCURRENT
* 의미 : 동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다.
*
* 특성 : SUBSIZED
* 의미 : 이 Spliterator 그리고 분할되는 모든 Spliterator는 SIZED 특성을 갖는다.
*/
public class Main_7_3 {
public static int countWordsIteratively(String s) {
int counter = 0;
boolean lastSpace = true;
for(char c : s.toCharArray()) {
if(Character.isWhitespace(c)) {
lastSpace = true;
} else {
if(lastSpace) {
counter++;
}
lastSpace = false;
}
}
return counter;
}
public static int countWords(Stream<Character> stream) {
WordCounter wordCounter = stream.reduce(new WordCounter(0, true)
, WordCounter::accumulate
, WordCounter::combine);
return wordCounter.getCounter();
}
public static void main(String[] args) {
final String SENTENCE = "Nel mezzo del cammin di nostra vita "
+ "mi ritrovai in una selva oscura"
+ " ch la dritta via era smrrita ";
/*
* 7.3.2 커스텀 Spliterator 구현하기
*/
System.out.println("countWordsIteratively Found " + countWordsIteratively(SENTENCE) + " words");
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
System.out.println("countWords Found " + countWords(stream) + " words");
/*
* 병렬 실행 시 잘못된 결과가 나온다.
* 이는 원래 문자열을 임의의 위치에서 둘로 나누다보니 예상치 못하게 하나의 단어를 둘로 계산하는 상황이 발생할 수 있다.
* 즉, 순차 스트림을 병렬 스트림으로 바꿀 때 스트림 분할 위치에 따라 잘못된 결과가 나온 것이다.
*/
stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
System.out.println("countWords(parallel) Found " + countWords(stream.parallel()) + " words");
/*
* 커스텀 Spliterator을 이용한 올바른 병렬 실행.
*/
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
// StreamSupport.stream 팩토리 메서드로 전달한 두 번째 불린 인수는 병렬 스트림 생성 여부이다.
stream = StreamSupport.stream(spliterator, true);
System.out.println("countWords(spliterator parallel) Found " + countWords(stream) + " words");
}
}
요약
- 내부 반복을 이용하면 명시적으로 다른 스레드를 사용하지 않고도 스트림을 병렬로 처리할 수 있다.
- 간단하게 스트림을 병렬로 처리할 수 있지만 항상 병렬 처리가 빠른 것은 아니다. 병렬 소프트웨어 동작 방법과 성능은 직관적이지 않을 때가 많으므로 병렬 처리를 사용했을 때 성능을 직접 측정해봐야 한다.
- 병렬 스트림으로 데이터 집합을 병렬 실행할 때 특히 처리해야 할 데이터가 아주 많거나 각 요소를 처리하는 데 오랜 시간이 거릴 때 성능을 높일 수 있다.
- 가능하면 기본형 특화 스트림을 사용하는 등 올바른 자료구조 선택이 어떤 연산을 병렬로 처리하는 것보다 성능적으로 더 큰 영향을 미칠 수 있다.
- 포크/조인 프레임워크에서 병렬화할 수 있는 태스크를 작은 태스크로 분할한 다음에 분할된 태스크를 각각의 스레드로 실행하며 서브태스크 각각의 결과를 합쳐서 최종 결과를 생산한다.
- Spliterator는 탐색하려는 데이터를 포함하는 스트림을 어떻게 병렬화 할 것인지 정의한다.