IT/Spark

Spark - Datasets

물통꿀꿀이 2020. 2. 12. 23:48

Datasets에 대해 알아보기전에 어떤 점 때문에 Datasets가 필요한지에 대해 알아보자.

case class Listing(street: String, zip: Int, price: Int)
val listingsDF = List(Listing("A", 0, 100), Listing("B", 1, 101), Listing("C", 2, 200)).toDF()

val averagePricesDF = listingsDF
.groupBy($"zip")
.avg("price")
.show()

+---+----------+

|zip|avg(price)|

+---+----------+

|  1|     101.0|

|  2|     200.0|

|  0|     100.0|

+---+----------+ 


위의 코드를 실행시키면 "zip"으로 그룹화된 값의 평균이 Dataframe 형태로 출력된다.

(averagePricesDF: org.apache.spark.sql.DataFrame = [zip: int, avg(price): double])


그런데 아래와 같이 Dataframe을 RDD에서 많이 보던 collect 메소드로 나타내면 Row 형태로 출력된다.

val averagePrices = listingsDF.collect()

(averagePrices: Array[org.apache.spark.sql.Row] = Array([A,0,100], [B,1,101], [C,2,200]))


여기서 알 수 있는 것은 Dataframe은 Row 타입을 지닌 배열이라는 것이다. 또한 Row 타입은 내부의 값에 대한 타입을 알지 못한다는 사실이다.

실제로 문서를 보면 type DataFrame = Dataset[Row] 로 되어 있다.

(https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package@DataFrame=org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])


즉, 이제까지 알고 있었던 Dataframe은 사실 Dataset 인 것이다. 뭉뚱그려서 설명했지만 Dataset의 특징을 보면 대략 감을 잡을 수 있다.

- Datasets는 타입이 있는 데이터 집합이다.

- Dataset은 Dataframe과 RDD를 합친 것이다.

- Dataset은 반구조, 구조 데이터를 사용한다.

(Dataframe 보다 타입 정보가 포함되있고, RDD 보다 최적화가 잘 되 있는 확장판 정도??)

val listingsDS = List(Listing("A", 0, 100), Listing("B", 1, 101), Listing("C", 2, 200))
.toDS()
.groupByKey(l => l.zip)
.agg(avg($"price").as[Double])

그렇기 때문에 이전 예제를 위와 같이 변경해보면, 

Dataset[Listing] 으로서 RDD 처럼 groupByKey를 사용할 수 있고 또한 agg와 같은 Dataframe 관련 메소드를 사용 할 수 있다.

결과적으로 Dataframe의 결과를 좀 더 깊이있게 분석이 가능해진다. (RDD의 메소드가 high-order 이기 때문에)


또 다른 예제로 Datasets 를 알아보자.

val keyValues = List((3, "Me"), (1, "Thi"), (2, "Se"), (3, "ssa"), (1, "sIsA"), (3, "ge:"), (3, "-)"), (2, "cre"), (2, "t"))
sc.parallelize(keyValues).reduceByKey(_+_)

val keyValues = List((3, "Me"), (1, "Thi"), (2, "Se"), (3, "ssa"), (1, "sIsA"), (3, "ge:"), (3, "-)"), (2, "cre"), (2, "t"))
val keyValuesDS = keyValues.toDS()

keyValuesDS.groupByKey(p => p._1)
.mapGroups((k, vs) => (k, vs.foldLeft("")((acc, p) => acc + p._2)))

RDD에서만 Transfomation을 사용 할 수 있는 것 같지만, Dataset에서도 얼마든지 사용가능하다. 또한 아래처럼 RDD 인것 같지만서도 Dataset의 결과를 지니고 있다.

+---+----------+

| _1|        _2|

+---+----------+

|  1|   ThisIsA|

|  3|Message:-)|

|  2|    Secret|