Spark - Shuffling
Shuffling은 분산 시스템 환경에서 데이터가 움직이는 것(A Node -> B Node)을 의미한다.
간단한 예시 하나를 확인해보자.
case class CFFPurchase(customer: Int, destination: String, price: Double)
val purchasesRDD = sc.parallelize(List(CFFPurchase(100, "Geneva", 22.25),
| CFFPurchase(300, "Zurich", 42.10),
| CFFPurchase(100, "Fribourg", 12.40),
| CFFPurchase(200, "st. Gallen", 8.20),
| CFFPurchase(100, "Lucerne", 31.60),
| CFFPurchase(300, "Basel", 16.20)))
val purchasesPerMonth = purchasesRDD
.map(p => (p.customer, p.price))
.groupByKey()
.map(p => (p._1, (p._2.size, p._2.sum)))
.collect()
생성된 RDD를 바탕으로 메소드를 차례차례 적용할 때 발생하는 일들을 살펴보자. (Spark의 Node가 3개라 가정하고 진행한다.)
1) map(p => (p.customer, p.price))
RDD를 Pair RDD로 만드는 과정이다.
CFFPurchase(100, "Geneva", 22.25) CFFPurchase(100, "Lucerne", 31.60) |
CFFPurchase(100, "Fribourg", 12.40) CFFPurchase(200, "st. Gallen", 8.20) |
CFFPurchase(300, "Geneva", 22.25) CFFPurchase(300, "Basel", 16.20) |
(100, 22.25) (100, 31.60) |
(100, 12,40) (200, 8.20) |
(300, 22.25) (300, 16.20) |
적용한 결과는 위와 같다. 초기 각 Node에 2개씩 분포되어 있다고 가정한다면, map은 단순히 각 Node에 있는 값을 묶는 것이기 때문에 크게 달라지지 않는다.
2) groupByKey()
CFFPurchase(100, "Geneva", 22.25) CFFPurchase(100, "Lucerne", 31.60) | CFFPurchase(100, "Fribourg", 12.40) CFFPurchase(200, "st. Gallen", 8.20) | CFFPurchase(100, "Geneva", 22.25) CFFPurchase(300, "Basel", 16.20) |
(100, 22.25) (100, 31.60) | (100, 12,40) (200, 8.20) | (300, 22.25) (300, 16.20) |
(100, [22.25, 12.40, 31.60]) | (200, [8.20]) | (300, [22.25, 16.20]) |
위의 경우는 조금 다르다. groupByKey는 Pair RDD의 Key를 바탕으로 분류를 한다. 때문에 각 노드 값이 섞이게 된다.
즉, 그림처럼 데이터가 여러 군데 이동하게 된다. 바로 이 부분이 Spark에서 언급하는 Shuffling이다.
Shuffling은 보기와 같이 단순해보이지만 분산 환경에서는 큰 문제로 작용할 수 있다. 바로 Latency 문제이다. Shuffling 되는 데이터가 많을 수록, 결과의 응답시간은 점점 느려지기 때문이다. (성능이 나빠질 수 밖에 없다.)
그러므로 Spark에서는 Shuffling을 최소화하는게 가장 큰 일이 될 수 있다.
다시 위의 예제로 돌아가서 Shuffling을 줄일 수 있는 방법은 무엇이 있을까?
val purchasesPerMonth = purchasesRDD
.map(p => (p.customer, (1, p.price)))
.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
.collect()
바로 reduce 연산을 사용하면 된다.
CFFPurchase(100, "Geneva", 22.25) CFFPurchase(100, "Lucerne", 31.60) | CFFPurchase(100, "Fribourg", 12.40) CFFPurchase(200, "st. Gallen", 8.20) | CFFPurchase(100, "Geneva", 22.25) CFFPurchase(300, "Basel", 16.20) |
(100, (1, 22.25)) (100, (1, 31.60)) | (100, (1, 12,40)) (200, (1, 8.20)) | (300, (1, 22.25)) (300, (1, 16.20)) |
(100, (3, 66.25)) | (200, (2, 58.3)) | (300, (1, 8.2)) |
groupByKey의 결과와 조금 다른 것을 확인 할 수 있다.
특히, reduceByKey는 Shuffling이 안되는 것은 아니지만 같은 노드 상에서 한 번 처리한 이후 Shuffling을 하기 때문에 groupByKey와 차이가 크다.
정리해보면 Shuffling은 각 노드 사이에 데이터가 group과 같은 메소드로 인해 이동하는 것을 의미한다.
이동하는 데이터가 많을 수록 Spark에서는 성능에 큰 영향을 끼치기 때문에 항상 Shuffling이 적게 발생할 수 있도록 구성해야 한다.