IT/Spark

Spark - Dataframe API (Part 2)

물통꿀꿀이 2020. 2. 11. 23:27

이전과 다른 예시를 가지고 Dataframe을 다뤄보려고 한다.

https://www.kaggle.com/timoboz/superbowl-history-1967-2020/data


먼저 데이터를 살펴보면,

Date,SB,Winner,Winner Pts,Loser,Loser Pts,MVP,Stadium,City,State

Feb 2 2020,LIV (54),Kansas City Chiefs,31,San Francisco 49ers,20,Patrick Mahomes,Hard Rock Stadium,Miami Gardens,Florida

Feb 3 2019,LIII (53),New England Patriots,13,Los Angeles Rams,3,Julian Edelman,Mercedes-Benz Stadium,Atlanta,Georgia

Feb 4 2018,LII (52),Philadelphia Eagles,41,New England Patriots,33,Nick Foles,U.S. Bank Stadium,Minneapolis,Minnesota

Feb 5 2017,LI (51),New England Patriots,34,Atlanta Falcons,28,Tom Brady,NRG Stadium,Houston,Texas

Feb 7 2016,50,Denver Broncos,24,Carolina Panthers,10,Von Miller,Levi's Stadium,Santa Clara,California

Feb 1 2015,XLIX (49),New England Patriots,28,Seattle Seahawks,24,Tom Brady,University of Phoenix Stadium,Glendale,Arizona 

CSV 파일로 대략 이렇게 되어 있다. 그리고 가장 상위에는 해당 값에 대한 칼럼명이 붙어있다.

val superbowlDF = mySpark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load("...")

superbowlDF.show()

csv의 option에 대한 내용은 https://github.com/databricks/spark-csv 을 참조하자.

간단히 예제에서 사용한 옵션만을 살펴보면,

- inferSchema : 각 컬럼의 타입을 추론해서 데이터 타입으로 사용한다.

- header : 데이터 레코드의 첫 라인을 헤더로 사용한다.


먼저, "Winner Pts가 30보다 크고 Loser Pts가 20보다 작은 레코드 중 Winner, MVP, Stadium만 뽑아서 Date로 정렬"한다고 할 때, 아래와 같이 표현될 수 있다.

superbowlDF
.select("Winner", "MVP", "Stadium")
.filter(($"Winner Pts" > 30) && ($"Loser Pts" < 20))
.orderBy("Date")
.show()

+-------------------+-----------------+--------------------+

|             Winner|              MVP|             Stadium|

+-------------------+-----------------+--------------------+

|   Seattle Seahawks|    Malcolm Smith|     MetLife Stadium|

| New Orleans Saints|       Drew Brees|    Sun Life Stadium|

|  Green Bay Packers|      Bart Starr+|         Orange Bowl|

|  Green Bay Packers|      Bart Starr+|   Memorial Coliseum|

|Pittsburgh Steelers|  Terry Bradshaw+|           Rose Bowl|

|San Francisco 49ers|     Joe Montana+|    Stanford Stadium|

|Los Angeles Raiders|    Marcus Allen+|       Tampa Stadium|

|      Chicago Bears|    Richard Dent+| Louisiana Superdome|

|San Francisco 49ers|     Joe Montana+| Louisiana Superdome|

|   Baltimore Ravens|       Ray Lewis+|Raymond James Sta...|

|Washington Redskins|    Doug Williams| Jack Murphy Stadium|

|     Dallas Cowboys|     Troy Aikman+|           Rose Bowl|

|     Denver Broncos|      John Elway+|  Pro Player Stadium|

|    Oakland Raiders|Fred Biletnikoff+|           Rose Bowl|

+-------------------+-----------------+--------------------+ 


그럼 관련 결과를 위와 같이 Dataframe으로 얻을 수 있다.


같은 방식으로 "State를 그룹핑한 결과를 갯수 반환" 한다면 아래와 같다.

superbowlDF
.groupBy($"State")
.count()
.show()

+----------+-----+

|     State|count|

+----------+-----+

| Minnesota|    2|

|     Texas|    4|

|   Georgia|    3|

|  Michigan|    2|

|New Jersey|    1|

|   Arizona|    3|

| Louisiana|   10|

|   Florida|   16|

|   Indiana|    1|

|California|   12|

+----------+-----+ 

마찬가지로 위와 같은 결과를 얻을 수 있다.

관련 함수 예시는 https://sparkbyexamples.com/spark/spark-sql-aggregate-functions/ 에서 확인할 수도 있다.


번외로 Join은 아래와 같이 나타낼 수 있다.

val sc = mySpark.sparkContext

case class Abo(id: Int, v: (String, String))

case class Loc(id: Int, v: String)

val as = List(Abo(101, ("Ruetli", "AG")), Abo(102, ("Brelaz", "DemiTarif")),
Abo(103, ("Gress", "DemiTarifVisa")), Abo(104, ("Schatten", "DemiTarif")))
val abosDF = sc.parallelize(as).toDF()

val ls = List(Loc(101, "Bern"), Loc(101, "Thun"), Loc(102, "Lausanne"), Loc(102, "Geneve"),
Loc(102, "Nyon"), Loc(103, "Zurich"), Loc(103, "St-Gallen"), Loc(103, "Chur"))
val locationsDF = sc.parallelize(ls).toDF()

val trackedCustomersDF = abosDF.join(locationsDF, abosDF("id") === locationsDF("id")).show()

2개의 Dataframe에서 id를 사용하여 join 하는 예제이다. join 결과는 아래와 같다.

+---+--------------------+---+---------+

| id|                   v| id|        v|

+---+--------------------+---+---------+

|101|        [Ruetli, AG]|101|     Bern|

|101|        [Ruetli, AG]|101|     Thun|

|103|[Gress, DemiTarif...|103|   Zurich|

|103|[Gress, DemiTarif...|103|St-Gallen|

|103|[Gress, DemiTarif...|103|     Chur|

|102| [Brelaz, DemiTarif]|102| Lausanne|

|102| [Brelaz, DemiTarif]|102|   Geneve|

|102| [Brelaz, DemiTarif]|102|     Nyon|

+---+--------------------+---+---------+