IT/Spark

Spark - Partitioning

물통꿀꿀이 2020. 2. 4. 22:09

Spark에서 파티셔닝(Partitioning)은 RDD의 데이터를 여러개의 파티션에 나누는 것이다.


파티셔닝에 이해를 위해 간단한 예를 들어보자.

val purchasesPerCost = purchasesRdd
.map(p => (p.customerId, p.price))
.groupByKey()

map에서 Pair RDD가 만들어진 후, groupByKey에 의해 RDD는 shuffling 된다.

그런데 shuffling된 RDD는 어떤 노드에 전달될지 어떻게 결정하는 것일까?

바로, p = k.hashCode() % numPartitions에 의해 p를 호스팅하는 노드에 전달된다. (Hash Partitioning 기법을 사용할 때)


이렇듯 파티셔닝을 통해 각 데이터들이 어떤 노드에 저장될지 결정된다.

Spark에서 지원하는 파티셔닝 방법은 Hash, Range와 같이 2개로, 데이터가 같은 파티션에 묶여있다는 것은 같은 노드에 있다는 것을 보장한다.

(덧붙여 한 노드에는 한 개이상의 파티션이 있을 수 있다.)


간단히 Hash, Range 파티셔닝 방법 중 잘못된 선택을 했을 때 어떤 일이 발생하는지 알아보자.

Pair RDD 중에는 [1,2,3,4,5,6,7,8] 과 같은 데이터가 있을 때, Hash를 적용해보면 (파티션이 4개 있다고 가정)

Partition 1 : [1,2,3,4]

Partition 2 : [5,6,7]

Partition 3 : [8]

Partition 4 : 

과 같이 데이터가 나누어질 수 있다.

누가보다라도 데이터가 불균형하게 구성되어 있는 것을 확인 할 수 있다.


반면에, Range를 적용해보면

Partition 1 : [1,2]

Partition 2 : [3,4]

Partition 3 : [5,6]

Partition 4 : [7,8]

Hash 보다 균형적으로 데이터가 나누어진 것을 확인 할 수 있다.


그런데 사용자가 굳이 직접 파티셔닝 방법을 고르면서 할 필요가 없다. 그 이유는 이미 각 메소드에서 어떤 파티셔닝 방법을 써야 할지 알기 때문이다.

예를 들어 sortByKey는 Range, groupByKey는 Hash를 사용한다. (메소드 이름만 봐도 짐작할 수 있다.)

이 외에도 Parent RDD로부터 파티셔닝 기법을 상속받는다.


Spark에서 파티셔닝을 적용하는 메소드는 아래 표를 참조하면 된다.

cogroup 

foldByKey 

groupWith

combineByKey 

join 

partitionBy 

leftOuterJoin 

sort 

rightOuterJoin 

mapValues 

groupByKey

flatMapValues 

reduceByKey 

filter 


물론! 사용자가 임의로 파티셔닝을 설정할 수 있다.

val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
import org.apache.spark.HashPartitioner
val partitioned = pairs.partitionBy(new HashPartitioner(2))

partitionBy 메소드에 파티셔닝 객체를 넣으면 된다. (결과 => Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)


이처럼 파티셔닝은 RDD의 Shuffling을 줄여서 Spark의 작업 시간을 줄이는데 큰 영향을 끼친다. 

또한 파티셔닝을 설정하는것은 단순히 partitionBy 메소드만 적용하면 되기 때문에 만약 Spark의 작업 시간이 오래 걸린다면 파티셔닝을 따져보는 것도 좋은 방안이다.

(참고로 데이터를 join할 때, 데이터의 크기가 큰 곳에 하는 편이 성능상 이득이다.)