Search

[러닝스파크 5장] 스파크 SQL과 데이터 프레임: 외부 데이터 소스와 소통하기

p120 ~ 163

개요

스파크 SQL을 사용하여 다음을 수행하는 방법에 대해 살펴보자. 아파치 하이브 및 아파치 스파크 모두에 대해 사용자 정의 함수를 사용한다. JDBC 및 SQL 데이터베이스, PostgreSQL, MySQL, 태블로. 애저 코스모스 DB 및 MS SQL 서버 와 같은 외부 데이터 원본과 연결한다. 단순하거나 복잡한 유형, 고차 함수 그리고 일반적인 관계 연산자를 사용하여 작업한다.

스파크 SQL과 아파치 하이브

스파크 SQL은 관계형 처리와 스파크의 함수형 프로그래밍 API를 통합하는 아파치 스파크의 기본 구성 요소
스파크 SQL을 사용하면 스파크 프로그래머는 더 빠른 성능 및 관계형 프로그래밍(예: 선언적 쿼리 및 최적화된 스토리지)의 이점을 활용할 수 있을 뿐만 아니라 복잡한 분석 라이브러리(예: 머신러닝)를 호출 할 수 있다.

사용자 정의 함수

사용자 정의 함수 = user-defined function, UDF
스파크 SQL UDF 사용자만의 고유한 pyspark 또는 스칼라 UDF를 생성하는 이점은 사용자(또는 다른 사용자)도 스파크 SQL 안에서 이를 사용할 수 있다는 것
from pyspark.sql.types import LongType #큐브 함수 생성 def cubed(s): return s * s * s #UDF로 등록 spark.udf.register("cubed", cubed, LongType()) #임시 뷰 생성 spark.range(1, 9).createOrReplaceTempView("udf_test") spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show() >>> +---+--------+ | id|id_cubed| +---+--------+ | 1| 1| | 2| 8| | 3| 27| | 4| 64| | 5| 125| | 6| 216| | 7| 343| | 8| 512| +---+--------+
Python
복사
스파크 SQL에서 평가 순서 및 null 검사
스파크 SQL(SQL, 데이터 프레임 API 및 데이터세트 API 포함)은 하위 표현식의 평가 순서를 보장하지 않는다.
# s is NOT NULL 절이 strlen(s) 1절 이전에 실행된다는 것을 보장하지 않는다. spark.sql("SELECT s FROM test1 WHERE s IS NOT NULL AND strlen(s) > 1")
Python
복사
따라서 적절한 null 검사를 수행하려면 다음을 수행하는 것이 좋다. 1. UDF 자체가 null을 인식하도록 만들고 UDF 내부에서 null 검사를 수행한다. 2. IF 또는 CASE WHEN 식을 사용하여 null 검사를 수행하고 조건 분기에서 UDF를 호출한다.
pandas UDF, pySpark UDF 속도 향상 및 배포
1.
Pyspark UDF 사용과 관련하여 기존의 일반적인 문제 중 하나는 스칼라 UDF보다 성능이 느리다
a.
Pyspark UDF가 JVM과 파이썬 사이의 데이터 이동을 필요로 해서 비용이 많이 들었기 때문 ⇒ 판다스 UDF가 나온 이유
2.
판다스 UDF는 아파치 애로우(Arrow)를 사용하여 데이터를 전송하고, 판다스는 해당 데이터로 작업합니다. 판다스 UDF를 정의할 때는 pandas.udf 키워드를 데코레이터로 사용하거나, 함수 자체를 래핑할 수 있습니다. 이미 아파치 애로우 형식에 포함된 데이터를 사용하기 때문에, 더 이상 데이터를 직렬화하거나 피클해야 하는 번거로움이 없습니다. 작업을 행마다 개별 입력에 대해 수행하는 대신, 판다스 시리즈 또는 데이터프레임에서 작업을 수행하여 벡터화된 실행을 할 수 있습니다.
판다스 UDF
아파치 스파크 3.0에서 판다스 UDF는 Pandas.Series, pandas.DataFrame, Tuple 및 Iterator와 같은 파이썬 유형 힌트를 사용하여 판다스 UDF 유형을 추론합니다. 이전에는 각 판다스 UDF 유형을 수동으로 정의하고 지정해야 했지만, 현재 판다스 UDF에서는 시리즈와 시리즈, 시리즈 반복자와 시리즈 반복자, 다중 시리즈 반복자와 시리즈 반복자, 시리즈와 스칼라(단일 값)를 파이썬 유형 힌트로 지원합니다.
판다스 함수 API
판다스 함수 API를 사용하면 입력과 출력이 모두 판다스 인스턴스인 파이스파크 데이터프레임에 로컬 파이썬 함수를 직접 적용할 수 있습니다. 스파크 3.0의 경우 판다스 함수 API는 그룹화된 맵, 맵, 공동 그룹화된 맵을 지원합니다.
# 판다스 가져오기 import pandas as pd # Import various pyspark SQL functions including pandas_udf from pyspark.sql.functions import col, pandas_udf from pyspark.sql.types import LongType #큐브 함수 선언 def cubed(a: pd.Series) -> pd.Series: return a * a * a # 큐브 함수에 대한 판다스 UDF 생성 cubed_udf = pandas_udf(cubed, returnType=LongType()) #판다스 시리즈 생성 x = pd.Series([1, 2, 3]) # 로컬 판다스 데이터를 실행하는 pandas_udf에 대한 함수 print(cubed(x)) 0 1 1 8 2 27 dtype: int64 # 스파크 데이터 프레임 생성, ’spark’는 기존의 sparkSession과 같다. df = spark.range(1, 4) # 벡터화된 스파크 UDF를 함수로 실행 df.select("id", cubed_udf(col("id"))).show() +---+---------+ | id|cubed(id)| +---+---------+ | 1| 1| | 2| 8| | 3| 27| +---+---------+
Python
복사
로컬 함수와 달리 벡터화된 UDF를 사용할 때 스파크 작업이 실행됩니다. 이는 스파크 UI에서 pandas_udf 함수의 실행 단계를 더 명확하게 확인할 수 있습니다.
스파크 작업은 parallelize()로 시작하여 로컬 데이터(애로우 바이너리 배치)를 이그제큐터로 보내고, mapPartitionsQ를 호출하여 애로우 바이너리 배치를 스파크의 내부 데이터 형식으로 변환하여 스파크 작업자에게 배포합니다. WholeStageCodegen 단계가 여러 번 적용되어 CPU 효율성과 성능이 향상됩니다. 하지만 이 경우에는 ArrowEvalPython 단계에서 판다스 UDF가 실행됩니다.

스파크 SQL 셸, 비라인 및 태블로로 쿼리하기

아파치 스파크를 쿼리하는 방법에는 스파크 SQL 셸, Beeline CLI, Tableau, Power BI 등 다양한 리포팅 툴이 있다.
스파크 SQL 셸 사용하기
스파크 SQL 쿼리를 실행하는 쉬운 방법은 spark-sql CLI.
이 유틸리티는 하이브 메타스토어 서비스와 로컬 모드에서 통신하는 대신, 스파크 쓰리프트 서버(STS)인 Thrift JDBC/ODBC 서버와 통신합니다.
STS를 사용하면, JDBC/ODBC 클라이언트가 아파치 스파크에서 JDBC 또는 ODBC 프로토콜을 통해 SQL 쿼리를 실행할 수 있습니다.
./bin/spark-sql
Python
복사
테이블 만들기
새롭고 영구적인 스파크 SQL 테이블을 생성
spark-sql> CREATE TABLE people (name STRING, age INT);
Python
복사
테이블에 데이터 삽입하기 스파크 SQL 테이블에 데이터를 삽입할 수 있다.
INSERT INTO people SELECT name, age FROM ... spark-sql> INSERT INTO people VALUES ("Michael", NULL); Time taken: 1.696 seconds spark-sql> INSERT INTO people VALUES ("Andy", 30); Time taken: 0.744 seconds spark-sql> INSERT INTO people VALUES ("Samantha", 19); Time taken: 0.637 seconds
Python
복사
스파크 SQL 쿼리 실행하기
메타스토어에 있는 테이블 확인
spark-sql> SHOW TABLES; default people false
Python
복사
테이블에 있는 20세 미만의 사람들이 몇 명인지..!
spark-sql> SELECT * FROM people WHERE age < 20; Samantha 19 Time taken: 0.593 seconds. Fetched 1 row(s)
Python
복사
spark-sql> SELECT name FROM people WHERE age IS NULL; Michael Time taken: 0.112 seconds, Fetched 1 row(s)
Python
복사

비라인 작업

하이브QL 쿼리를 실행하기 위한 유틸리티인 비라인 커맨드라인 툴에 익숙해져보자. 비라인은 SQLLine CLI를 기반으로 하는 JDBC 클라이언트입니다. 이 동일한 유틸리티를 사용하여 스파크 스리프트 서버에 대해 스파크 SQL 쿼리를 실행할 수 있습니다. 현재 구현된 스리프트 JDBC/ODBC 서버는 하이브 1.2.1의 하이브서버에 해당. 스파크 또는 하이브 1.2.1에서 제공되는 아래 비라인 스크립트를 사용하여 JDBC 서버를 테스트합니다.
쓰리프트 서버 시작하기  스파크 쓰리프트 JDBC/ODBC 서버를 시작하려면 $SPARK_HOME 폴더에서 다음 명령을 실행
./sbin/start-thriftserver.sh
Python
복사
비라인을 통해 쓰리프트 서버에 연결하기 
비라인을 사용하여 쓰리프트 JDBC/ODBC 서버를 테스트
./bin/beeline
Python
복사
비라인을 구성하여 로컬 쓰리프트 서버에 연결
!connect jdbc:hive2://localhost:10000
Python
복사
비라인으로 스파크 SQL 쿼리 실행하기
0: jdbc:hive?://localhost:10000> SHOW tables; +------------+------------+--------------+ | namespace | tableName | isTemporary | +------------+------------+--------------+ | default | people | false | +------------+------------+--------------+ 1 row selected (0.421 seconds) 0: jdbc:hive2://localhost:10000> SELECT * FROM people; +-----------+-------+ | name | age | +-----------+-------+ | Andy | 30 | | Samantha | 19 | | Michael | NULL | +-----------+-------+ 3 rows selected (0.981 seconds)
Python
복사
쓰리프트 서버 중지하기
./sbin/stop-thriftserver.sh
Python
복사

태블로로 작업하기

스파크 SQL에 선호하는 BI 도구를 쓰리프트 JDBC/ODBC 서버를 통해 연결할 수 있습니다.
로컬 아파치 스파크 인스턴스에 Tableau Desktop(버전 2019.2)을 연결하는 방법은 다음과 같습니다.
1.
More...>Spark SQL을 선택하여 스파크 SQL에 연결합니다.
2.
서버: localhost 포트: 10000 (기본값) 유형: SparkThriftServer (기본값) 인증: 사용자 이름 사용자 이름: 로그인(예: user@learningspark.org) SSL 필요: 선택하지 않음
하지만 위의 sparkSQL 커넥터는 태블로 유료 계정에 지원이 된다.

외부 데이터 소스

JDBC 및 SQL 데이터베이스부터 시작하여 스파크 SQL을 사용하여 외부 데이터 소스에 연결하는 방법

JDBC SQL 데이터베이스 

스파크 SQL은 다른 데이터베이스에서 데이터를 읽을 수 있는 데이터 소스 API를 포함하고 있으며, 데이터 프레임으로 결과를 반환할 때 이 API를 사용하여 쿼리를 단순화할 수 있습니다. 이를 통해 스파크 SQL은 성능 및 다른 데이터 소스와 조인할 수 있는 기능을 포함하여 모든 이점을 제공합니다.
시작하려면 JDBC 데이터 소스에 대한 JDBC 드라이버를 지정해야 하며 스파크 클래스 경로에 있어야 한다.
$SPARK_HOME 폴더에서 다음과 같은 명령을 실행
./bin/spark-shell --driver-class-path $database.jar —jars $database.jar
Python
복사
데이터 소스 API를 사용하여 원격 데이터베이스의 테이블을 데이터 프레임 또는 스파크 SQL 임시 뷰로 로드할 수 있다. 또한 사용자는 데이터 소스 옵션에서 JDBC 연결 속성을 지정할 수 있다.
파티셔닝의 중요성
스파크 SQL과 JDBC 외부 소스에서 많은 양의 데이터를 전송할 때는 데이터 소스를 분할하는 것이 중요합니다. 모든 데이터가 하나의 드라이버 연결을 통해 처리되면 추출 성능을 포화 상태로 만들고 성능을 크게 저하시키며 소스 시스템의 리소스를 포화 상태로 만들 수 있습니다.
JDBC 속성은 선택 사항이지만 대규모 작업의 경우 아래 표에 표시된 속성을 사용하는 것 권장
위의 파티셔닝 속성 작동 방식 예
•numPartitions:10 •lowerBound:1000 •upperBound:10000 ## 파티션 크기는 1.000이 되고 10개의 파티션이 생성된다. ## 이것은 다음 10개의 쿼리를 실행하는 것과 동일하다 •SELECT * FROM table WHERE partitioncolumn BETWEEN 1000 and 2000 •SELECT * FROM table WHERE partitioncolumn BETWEEN 2000 and 3000 •SELECT * FROM table WHERE partitioncolumn BETWEEN 9000 and 10000
YAML
복사
numPartitions의 좋은 시작점은 스파크 워커 수의 배수를 사용하는 것
처음에는 최소 및 최대 partitionColumn의 실제 값을 기준으로 lowerBound 및 upperBound를 기반으로 계산한다.
데이터 skew를 방지하기 위해 균일하게 분산될 수 있는 partitioncolumn을 선택해야 한다.

PostgreSQL

위의 링크에서 postgresql-42.2.6.jar 다운, 그 후 SPARK_HOME 경로에 추가
bin/spark-shell --jars ./postgresql-42.6.0.jar
Shell
복사
python에서 스파크 SQL 데이터 소스 API 및 JDBC를 사용하여 PostgreSQL 데이터베이스에서 로드하고 저장하는 방법
# postgre의 접속 정보와 import pyspark를 한다. import pyspark ip = "127.0.0.1" port = 5432 user = "아이디" passwd = "비밀번호" db = "데이터베이스" spark = pyspark.sql.SparkSession \ .builder \ .config("spark.driver.extraClassPath", r"/Users/yoohajun/spark-3.3.2-bin-hadoop2/postgresql-42.6.0.jar") \ .appName("App") \ .getOrCreate() sql = """ select * from schema.sample_table ( 스키마 이름 . 테이블 이름 ) limit 10 """ # 읽기 방법 1: 로드 함수를 사용하여 JDBC 소스로부터 데이터를 로드 postgreTable = spark.read.format("jdbc")\ .option("url","jdbc:postgresql://{0}:{1}/{2}".format( ip, port, db ) )\ .option("driver", "org.postgresql.Driver")\ .option("query", sql)\ .option("user", user)\ .option("password", passwd)\ .load() postgreTable.show() # Spark SQL를 작성한다 postgreTable.createOrReplaceTempView("postgreTable" ) spark.sql( """ select from postgreTable limit 5 """).show()
Python
복사

MySQL

MySQL 데이터베이스에 연결하려면 MySQL JDBC jar를 다운로드한 후에 클래스 경로에 추가한다. 그런 다음 해당 jar를 지정하여 스파크 셸spark-shell 또는 pyspark을 시작한다.
아래는 pyspark 예제.

데이터 프레임 및 스파크 SQL의 고차 함수

복잡한 데이터 유형은 단순한 데이터 유형의 결합이기 때문에 직접 조작하고자 하는 생각이 들 수 있다.
일반적으로는 get_json_ object(), from_json(), to_json(), breakfast(), selectExpr()과 같은 유틸리티 함수
1.
분해 및 수집
중첩된 구조를 개별 행으로 분해하고 일부 함수를 적용한 다음 중첩된 구조를 다시 만드는 방법
아래 SQL문은 explode(values)를 사용하여 values 내의 각 요소에 대한 새로운 행(id 포함)을 만든다.
In SQL SELECT id, collect_list(value + 1) AS values FROM (SELECT id, EXPLODE(values) AS value FROM table) x GROUP BY id
SQL
복사
collect_list()는 중복된 개체 목록을 반환하지만 GROUP BY문에는 셔플 작업이 필요합니다. 따라서, 재수집된 배열의 순서는 원래 배열의 순서와 항상 일치하지는 않습니다. 값은 매우 넓거나 긴 배열의 여러 차원이 될 수 있으며, GROUP BY(셔플)를 수행하기 때문에 이 접근 방식은 매우 비쌀 수 있습니다.
2.
사용자 정의 함수 구축
동일한 작업(values의 각 요소에 1을 더함)
동일한 작업을 수행하기 위해 map()을 사용하여 각 요소(값)를 반복하고 더하기 작업을 수행하는 UDF를 생성할 수도 있다.
def addOne(values: Seq[Int]): Seq[Int] = { values.map(value => value + 1) } val plusOneInt = spark.udf.register("plusOneInt", addOne(_: Seq[Int]): Seq[Int])
Scala
복사
spark.sql("SELECT id, plusOneInt(values) AS values FROM table").show()
SQL
복사
위의 방법은 직렬화와 역직렬화 프로세스 자체는 비용이 많이 들지만, 정렬 문제가 없는 경우, explode()와 collect_list()보다 효과적일 수 있다.
collect_list()를 사용하면 메모리 부족 문제가 발생할 수 있으므로 UDF 사용을 고려해야 한다.

복잡한 데이터 유형을 위한 내장 함수

복잡한 데이터 유형에 대해 아파치 스파크 2.4 이상 버전에 포함된 내장 함수를 사용

고차 함수

익명 람다 함수를 인수로 사용하는 고차 함수
transform(values, value -> lambda expression)
SQL
복사
transform() 함수는 배열 values와 익명 함수(람다 표현식)를 입력으로 사용합니다.
이 함수는 각 요소에 익명 함수를 적용한 다음, 결과를 출력 배열에 할당하여 새로운 배열을 효율적으로 생성합니다(UDF 접근 방식과 유사하지만 더 효율적입니다).
from pyspark.sql.types import * # Define the schema for the DataFrame schema = StructType([StructField("celsius", ArrayType(IntegerType()))]) # Create a list of temperature arrays t_list = [ [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]] ] # Create a DataFrame from the list and schema t_c = spark.createDataFrame(t_list, schema) # Create a temporary view for the DataFrame t_c.createOrReplaceTempView("tC") # Show the DataFrame t_c.show()
Python
복사
transform 함수
transform(array<T>, function<T, U>): array<U>
Python
복사
transform() 함수는 입력 배열의 각 요소에 함수를 적용하여 배열을 생성한다(map() 함수와 유사).
spark.sql(""" SELECT celsius, transform(celsius, t -> (((t * 9) div 5) + 32)) AS fahrenheit FROM tC """).show()
Python
복사
filter 함수
filter(array<T>, functioncT, Boolean>): array<T>
Python
복사
입력한 배열의 요소 중 부울 함수가 참인 요소만으로 구성된 배열을 생성한다
spark.sql(""" SELECT celsius, filter(celsius, t -> t > 38) AS high FROM tC """).show()
Python
복사
exists 함수
exists(array<T>, function<T, V, Boolean〉): Boolean
Python
복사
입력한 배열의 요소 중 불린 함수를 만족시키는 것이 존재하면 exists 함수는 참을 반환한다.
spark.sql(""" SELECT celsius, exists(celsius, t -> t = 38) as threshold FROM tC """).show()
Python
복사
reduce 함수
reduce 함수는 function<B, T, B>를 사용하여 요소를 버퍼 B에 병합하고 최종 버퍼에 마무리 fuction<B, R>을 적용하여 배열의 요소를 단일값으로 줄인다.
# 온도의 평균을 계산하고 화씨로 변환 spark.sql(""" SELECT celsius, reduce( celsius, 0, (t, acc) -> t + acc, acc -> (acc div size(celsius) * 9 div 5) + 32 ) as avgFahrenheit FROM tC """).show()
Python
복사

일반적인 데이터 프레임 및 스파크 SQL 작업

일반적인 관계형 연산에 집중
1.
결합과 조인
2.
윈도우
3.
수정
아래는 데이터 준비 과정
1.
공항 정보 데이터와 미국 비행 지연 데이터를 각각 가져와 데이터 프레임을 만든다.
2.
delay(지연)과 distance(거리) 칼럼을 STRING에서 INT로 변환한다.
3.
데모 예제에 집중하기 위해 작은 테이블 foo를 만든다. 이 테이블에는 시애틀(SEA)에서 출발하여 샌프란시스코(SFO)에 도착하는 3개 항공편의 정보만 포함된다.
from pyspark.sql.functions import expr tripdelaysFilePath = "./departuredelays.csv" airportsnaFilePath = "./airport-codes-na.txt" # 공항 데이터세트를 읽어옵니다. airportsna = (spark .read.format("csv") .options(header="true", inferSchema="true", sep="\t") .load(airportsnaFilePath)) airportsna.createOrReplaceTempView("airports_na") # 출발 지연 데이터세트를 읽어옵니다. departureDelays = (spark.read .format("csv") .options(header="true") .load(tripdelaysFilePath)) departureDelays = (departureDelays .withColumn("delay", expr("CAST(delay as INT) as delay")) .withColumn("distance", expr("CAST(distance as INT) as distance")) ) departureDelays.createOrReplaceTempView("departureDelays") # 임시 작은 테이블 생성 foo = (departureDelays .filter(expr(""" origin == 'SEA' AND destination == 'SFO' AND date LIKE '01010%' AND delay > 0""" ))) foo.createOrReplaceTempView("foo")
Python
복사

윈도우 함수

일반적으로 윈도우(입력 행의 범위) 행의 값을 사용해 다른 행의 형태로 값 집합을 반환합니다. 윈도우 함수를 사용하면 모든 입력 행에 대해 단일 값이 반환되면서 행 그룹에 대해 작업할 수 있습니다. 이 섹션에서는 dense_rank() 윈도우 함수를 사용하는 방법을 보여줍니다.
표 5-5에는 이외에도 많은 기능이 있습니다.
-- departureDelaysWindow 테이블을 삭제합니다. DROP TABLE IF EXISTS departureDelaysWindow; -- departureDelaysWindow 테이블을 생성합니다. CREATE TABLE departureDelaysWindow AS SELECT origin, destination, SUM(delay) AS TotalDelays FROM departureDelays WHERE origin IN ('SEA', 'SFO', 'JFK') AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL') GROUP BY origin, destination; -- 결과를 출력합니다. SELECT * FROM departureDelaysWindow;
SQL
복사