Spark - Dataframe API (Part 2)
이전과 다른 예시를 가지고 Dataframe을 다뤄보려고 한다.
https://www.kaggle.com/timoboz/superbowl-history-1967-2020/data
먼저 데이터를 살펴보면,
Date,SB,Winner,Winner Pts,Loser,Loser Pts,MVP,Stadium,City,State Feb 2 2020,LIV (54),Kansas City Chiefs,31,San Francisco 49ers,20,Patrick Mahomes,Hard Rock Stadium,Miami Gardens,Florida Feb 3 2019,LIII (53),New England Patriots,13,Los Angeles Rams,3,Julian Edelman,Mercedes-Benz Stadium,Atlanta,Georgia Feb 4 2018,LII (52),Philadelphia Eagles,41,New England Patriots,33,Nick Foles,U.S. Bank Stadium,Minneapolis,Minnesota Feb 5 2017,LI (51),New England Patriots,34,Atlanta Falcons,28,Tom Brady,NRG Stadium,Houston,Texas Feb 7 2016,50,Denver Broncos,24,Carolina Panthers,10,Von Miller,Levi's Stadium,Santa Clara,California Feb 1 2015,XLIX (49),New England Patriots,28,Seattle Seahawks,24,Tom Brady,University of Phoenix Stadium,Glendale,Arizona |
CSV 파일로 대략 이렇게 되어 있다. 그리고 가장 상위에는 해당 값에 대한 칼럼명이 붙어있다.
val superbowlDF = mySpark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load("...")
superbowlDF.show()
csv의 option에 대한 내용은 https://github.com/databricks/spark-csv 을 참조하자.
간단히 예제에서 사용한 옵션만을 살펴보면,
- inferSchema : 각 컬럼의 타입을 추론해서 데이터 타입으로 사용한다.
- header : 데이터 레코드의 첫 라인을 헤더로 사용한다.
먼저, "Winner Pts가 30보다 크고 Loser Pts가 20보다 작은 레코드 중 Winner, MVP, Stadium만 뽑아서 Date로 정렬"한다고 할 때, 아래와 같이 표현될 수 있다.
superbowlDF
.select("Winner", "MVP", "Stadium")
.filter(($"Winner Pts" > 30) && ($"Loser Pts" < 20))
.orderBy("Date")
.show()
+-------------------+-----------------+--------------------+ | Winner| MVP| Stadium| +-------------------+-----------------+--------------------+ | Seattle Seahawks| Malcolm Smith| MetLife Stadium| | New Orleans Saints| Drew Brees| Sun Life Stadium| | Green Bay Packers| Bart Starr+| Orange Bowl| | Green Bay Packers| Bart Starr+| Memorial Coliseum| |Pittsburgh Steelers| Terry Bradshaw+| Rose Bowl| |San Francisco 49ers| Joe Montana+| Stanford Stadium| |Los Angeles Raiders| Marcus Allen+| Tampa Stadium| | Chicago Bears| Richard Dent+| Louisiana Superdome| |San Francisco 49ers| Joe Montana+| Louisiana Superdome| | Baltimore Ravens| Ray Lewis+|Raymond James Sta...| |Washington Redskins| Doug Williams| Jack Murphy Stadium| | Dallas Cowboys| Troy Aikman+| Rose Bowl| | Denver Broncos| John Elway+| Pro Player Stadium| | Oakland Raiders|Fred Biletnikoff+| Rose Bowl| +-------------------+-----------------+--------------------+ |
그럼 관련 결과를 위와 같이 Dataframe으로 얻을 수 있다.
같은 방식으로 "State를 그룹핑한 결과를 갯수 반환" 한다면 아래와 같다.
superbowlDF
.groupBy($"State")
.count()
.show()
+----------+-----+ | State|count| +----------+-----+ | Minnesota| 2| | Texas| 4| | Georgia| 3| | Michigan| 2| |New Jersey| 1| | Arizona| 3| | Louisiana| 10| | Florida| 16| | Indiana| 1| |California| 12| +----------+-----+ |
마찬가지로 위와 같은 결과를 얻을 수 있다.
관련 함수 예시는 https://sparkbyexamples.com/spark/spark-sql-aggregate-functions/ 에서 확인할 수도 있다.
번외로 Join은 아래와 같이 나타낼 수 있다.
val sc = mySpark.sparkContext
case class Abo(id: Int, v: (String, String))
case class Loc(id: Int, v: String)
val as = List(Abo(101, ("Ruetli", "AG")), Abo(102, ("Brelaz", "DemiTarif")),
Abo(103, ("Gress", "DemiTarifVisa")), Abo(104, ("Schatten", "DemiTarif")))
val abosDF = sc.parallelize(as).toDF()
val ls = List(Loc(101, "Bern"), Loc(101, "Thun"), Loc(102, "Lausanne"), Loc(102, "Geneve"),
Loc(102, "Nyon"), Loc(103, "Zurich"), Loc(103, "St-Gallen"), Loc(103, "Chur"))
val locationsDF = sc.parallelize(ls).toDF()
val trackedCustomersDF = abosDF.join(locationsDF, abosDF("id") === locationsDF("id")).show()
2개의 Dataframe에서 id를 사용하여 join 하는 예제이다. join 결과는 아래와 같다.
+---+--------------------+---+---------+ | id| v| id| v| +---+--------------------+---+---------+ |101| [Ruetli, AG]|101| Bern| |101| [Ruetli, AG]|101| Thun| |103|[Gress, DemiTarif...|103| Zurich| |103|[Gress, DemiTarif...|103|St-Gallen| |103|[Gress, DemiTarif...|103| Chur| |102| [Brelaz, DemiTarif]|102| Lausanne| |102| [Brelaz, DemiTarif]|102| Geneve| |102| [Brelaz, DemiTarif]|102| Nyon| +---+--------------------+---+---------+ |