Spark - Partitioning
Spark에서 파티셔닝(Partitioning)은 RDD의 데이터를 여러개의 파티션에 나누는 것이다.
파티셔닝에 이해를 위해 간단한 예를 들어보자.
val purchasesPerCost = purchasesRdd
.map(p => (p.customerId, p.price))
.groupByKey()
map에서 Pair RDD가 만들어진 후, groupByKey에 의해 RDD는 shuffling 된다.
그런데 shuffling된 RDD는 어떤 노드에 전달될지 어떻게 결정하는 것일까?
바로, p = k.hashCode() % numPartitions에 의해 p를 호스팅하는 노드에 전달된다. (Hash Partitioning 기법을 사용할 때)
이렇듯 파티셔닝을 통해 각 데이터들이 어떤 노드에 저장될지 결정된다.
Spark에서 지원하는 파티셔닝 방법은 Hash, Range와 같이 2개로, 데이터가 같은 파티션에 묶여있다는 것은 같은 노드에 있다는 것을 보장한다.
(덧붙여 한 노드에는 한 개이상의 파티션이 있을 수 있다.)
간단히 Hash, Range 파티셔닝 방법 중 잘못된 선택을 했을 때 어떤 일이 발생하는지 알아보자.
Pair RDD 중에는 [1,2,3,4,5,6,7,8] 과 같은 데이터가 있을 때, Hash를 적용해보면 (파티션이 4개 있다고 가정)
Partition 1 : [1,2,3,4] Partition 2 : [5,6,7] Partition 3 : [8] Partition 4 : |
과 같이 데이터가 나누어질 수 있다.
누가보다라도 데이터가 불균형하게 구성되어 있는 것을 확인 할 수 있다.
반면에, Range를 적용해보면
Partition 1 : [1,2] Partition 2 : [3,4] Partition 3 : [5,6] Partition 4 : [7,8] |
Hash 보다 균형적으로 데이터가 나누어진 것을 확인 할 수 있다.
그런데 사용자가 굳이 직접 파티셔닝 방법을 고르면서 할 필요가 없다. 그 이유는 이미 각 메소드에서 어떤 파티셔닝 방법을 써야 할지 알기 때문이다.
예를 들어 sortByKey는 Range, groupByKey는 Hash를 사용한다. (메소드 이름만 봐도 짐작할 수 있다.)
이 외에도 Parent RDD로부터 파티셔닝 기법을 상속받는다.
Spark에서 파티셔닝을 적용하는 메소드는 아래 표를 참조하면 된다.
cogroup |
foldByKey |
groupWith |
combineByKey |
join |
partitionBy |
leftOuterJoin |
sort |
rightOuterJoin |
mapValues |
groupByKey | flatMapValues |
reduceByKey | filter |
물론! 사용자가 임의로 파티셔닝을 설정할 수 있다.
val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
import org.apache.spark.HashPartitioner
val partitioned = pairs.partitionBy(new HashPartitioner(2))
partitionBy 메소드에 파티셔닝 객체를 넣으면 된다. (결과 => Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
이처럼 파티셔닝은 RDD의 Shuffling을 줄여서 Spark의 작업 시간을 줄이는데 큰 영향을 끼친다.
또한 파티셔닝을 설정하는것은 단순히 partitionBy 메소드만 적용하면 되기 때문에 만약 Spark의 작업 시간이 오래 걸린다면 파티셔닝을 따져보는 것도 좋은 방안이다.
(참고로 데이터를 join할 때, 데이터의 크기가 큰 곳에 하는 편이 성능상 이득이다.)