IT/Spark

Spark - Dataframe API (Part 1)

물통꿀꿀이 2020. 2. 10. 00:06

이전에도 잠깐 알아보았지만, RDD 메소드에 비해 어떤 부분이 더 좋은지 살펴보면

1) RDD 메소드는 functional API에 비해 Dataframe은 declarative API 이다. 즉, SQL*과 같이 사용하기 편하다. 

*SQL 또한 declarative API 이다.

2) 자동으로 최적화가 된다. (Dataframe을 잘 사용 할 수 있도록 지속해서 업데이트가 된다.)

3) 타입이 없다*.

* Dataframe이 타입이 완전이 없는 것이 아니라, Dataframe의 내부 row의 타입이 존재하지 않는다.

(Spark SQL에서 지원하는 데이터 타입 => https://spark.apache.org/docs/latest/sql-reference.html)


간단한 예시를 바탕으로 API를 살펴보자.

(https://raw.githubusercontent.com/databricks/learning-spark/master/files/testweet.json 샘플을 사용하려고 한다.)

val peopleDF = spark.read.json("...")
peopleDF.show()

기본적으로 위와 같이 JSON 파일을 읽어 Dataframe 형태로 만든다. 그리고 생성된 Dataframe을 바탕으로 Dataframe API를 사용하면 된다.

그런데!! SQL를 호출하는 방식이 2 가지이다.

1) Dataframe API

peopleDF.select("source").show()

2) Spark API

peopleDF.createOrReplaceTempView("people")
spark.sql("select source from people").show()

관련해서 https://stackoverflow.com/questions/45430816/writing-sql-vs-using-dataframe-apis-in-spark-sql 글에 자세히 나와있지만, 요약해보면

- Spark API는 SQL 에러를 컴파일 시점에 잡을 수 없다.

- 최적화가 Dataframe API가 더 잘되어 있다. (즉, 성능이 더 좋다.)

때문에 spark.sql 보다는 Dataframe에서 제공해주는 API를 사용하여 SQL를 호출 하는 것이 더 낫다


돌아와서 예시를 살펴보면 아래와 같은 구조로 되어 있다.

"createdAt": "Nov 4, 2014 4:56:59 PM",
"id": 529799371026485248,
"text": "Adventures With Coffee, Code, and Writing.",
"source": "\u003ca href\u003d\"http://twitter.com\" rel\u003d\"nofollow\"\u003eTwitter Web Client\u003c/a\u003e",
"isTruncated": false,

...


먼저, printschema() 는 파일이 가진 구조를 보여준다. (위의 예제과 겹치는 부분은 빨간색으로 칠하였다.)

root

 |-- contributorsIDs: array (nullable = true)

 |    |-- element: string (containsNull = true)

 |-- createdAt: string (nullable = true)

 |-- currentUserRetweetId: long (nullable = true)

 |-- hashtagEntities: array (nullable = true)

 |    |-- element: string (containsNull = true)

 |-- id: long (nullable = true)

 |-- inReplyToStatusId: long (nullable = true)

 |-- inReplyToUserId: long (nullable = true)

 |-- isFavorited: boolean (nullable = true)

 |-- isPossiblySensitive: boolean (nullable = true) 


Dataframe의 Transformation으로 select, agg, groupBy, join 등등이 있다. 해당 메소드의 결과 값은 Dataframe이며 RDD와 비슷하게 Lazy하게 동작한다.

그런데 Dataframe은 재미있게도 메소드의 내부 즉, 메소드의 파라미터를 사용하는 방식은 3가지가 존재한다.

(그 이유는 입력받은 데이터를 Dataframe으로 구성할 때 Column에 대해 전처리를 하기 때문이다.)

1. $ 사용

peopleDF
.select("user")
.filter($"user.friendsCount" > 10)
.show()

2. dataframe 사용

peopleDF
.select("user")
.filter(peopleDF("user.friendsCount") > 10)
.show()

3. SQL 사용

peopleDF
.select("user.friendsCount")
.filter("user.friendsCount > 10")
.show()

세가지 중 어떤 것을 사용해도 무방하다. 그냥 "Dataframe에서는 위와 같이 사용 할 수도 있다" 정도이다. (개인적으로는 1,3번이 사용하기 편해보인다는..)


그럼 다음 편에서..


Ref

http://bigdatums.net/2016/02/12/how-to-extract-nested-json-data-in-spark/