IT/Spark

Spark - Shuffling

물통꿀꿀이 2020. 2. 3. 23:18

Shuffling은 분산 시스템 환경에서 데이터가 움직이는 것(A Node -> B Node)을 의미한다.


간단한 예시 하나를 확인해보자.

case class CFFPurchase(customer: Int, destination: String, price: Double)
val purchasesRDD = sc.parallelize(List(CFFPurchase(100, "Geneva", 22.25),
| CFFPurchase(300, "Zurich", 42.10),
| CFFPurchase(100, "Fribourg", 12.40),
| CFFPurchase(200, "st. Gallen", 8.20),
| CFFPurchase(100, "Lucerne", 31.60),
| CFFPurchase(300, "Basel", 16.20)))

val purchasesPerMonth = purchasesRDD
.map(p => (p.customer, p.price))
.groupByKey()
.map(p => (p._1, (p._2.size, p._2.sum)))
.collect()

생성된 RDD를 바탕으로 메소드를 차례차례 적용할 때 발생하는 일들을 살펴보자. (Spark의 Node가 3개라 가정하고 진행한다.)

1) map(p => (p.customer, p.price))

RDD를 Pair RDD로 만드는 과정이다. 

CFFPurchase(100, "Geneva", 22.25)

CFFPurchase(100, "Lucerne", 31.60) 

CFFPurchase(100, "Fribourg", 12.40)

CFFPurchase(200, "st. Gallen", 8.20)

CFFPurchase(300, "Geneva", 22.25)

CFFPurchase(300, "Basel", 16.20)

(100, 22.25)

(100, 31.60)

(100, 12,40)

(200, 8.20)

(300, 22.25)

(300, 16.20)


적용한 결과는 위와 같다. 초기 각 Node에 2개씩 분포되어 있다고 가정한다면, map은 단순히 각 Node에 있는 값을 묶는 것이기 때문에 크게 달라지지 않는다.


2) groupByKey()

CFFPurchase(100, "Geneva", 22.25)

CFFPurchase(100, "Lucerne", 31.60) 

CFFPurchase(100, "Fribourg", 12.40)

CFFPurchase(200, "st. Gallen", 8.20)

CFFPurchase(100, "Geneva", 22.25)

CFFPurchase(300, "Basel", 16.20)

(100, 22.25)

(100, 31.60)

(100, 12,40)

(200, 8.20)

(300, 22.25)

(300, 16.20)

(100, [22.25, 12.40, 31.60])

(200, [8.20])

(300, [22.25, 16.20]) 


위의 경우는 조금 다르다. groupByKey는 Pair RDD의 Key를 바탕으로 분류를 한다. 때문에 각 노드 값이 섞이게 된다.


즉, 그림처럼 데이터가 여러 군데 이동하게 된다. 바로 이 부분이 Spark에서 언급하는 Shuffling이다.

Shuffling은 보기와 같이 단순해보이지만 분산 환경에서는 큰 문제로 작용할 수 있다. 바로 Latency 문제이다. Shuffling 되는 데이터가 많을 수록, 결과의 응답시간은 점점 느려지기 때문이다. (성능이 나빠질 수 밖에 없다.)


그러므로 Spark에서는 Shuffling을 최소화하는게 가장 큰 일이 될 수 있다.

다시 위의 예제로 돌아가서 Shuffling을 줄일 수 있는 방법은 무엇이 있을까?

val purchasesPerMonth = purchasesRDD
.map(p => (p.customer, (1, p.price)))
.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
.collect()

바로 reduce 연산을 사용하면 된다.


CFFPurchase(100, "Geneva", 22.25)

CFFPurchase(100, "Lucerne", 31.60) 

CFFPurchase(100, "Fribourg", 12.40)

CFFPurchase(200, "st. Gallen", 8.20)

CFFPurchase(100, "Geneva", 22.25)

CFFPurchase(300, "Basel", 16.20)

(100, (1, 22.25))

(100, (1, 31.60))

(100, (1, 12,40))

(200, (1, 8.20))

(300, (1, 22.25))

(300, (1, 16.20))

(100, (3, 66.25))

(200, (2, 58.3))

(300, (1, 8.2)) 


groupByKey의 결과와 조금 다른 것을 확인 할 수 있다.

특히, reduceByKey는 Shuffling이 안되는 것은 아니지만 같은 노드 상에서 한 번 처리한 이후 Shuffling을 하기 때문에 groupByKey와 차이가 크다.


정리해보면 Shuffling은 각 노드 사이에 데이터가 group과 같은 메소드로 인해 이동하는 것을 의미한다.

이동하는 데이터가 많을 수록 Spark에서는 성능에 큰 영향을 끼치기 때문에 항상 Shuffling이 적게 발생할 수 있도록 구성해야 한다.