Search

[러닝스파크 6장] 스파크 SQL과 데이터 세트

164 ~ 180 데이터세트를 이해하기 위해 내부를 자세히 살펴보자. 이는 자바와 스칼라에서 데이터세트를 어떻게 다루는지, 데이터세트 구조를 고수준 API의 일부로 스파크가 어떻게 처리하는지, 그리고 데이터세트 사용으로 인해 발생하는 비용에 대한 내용을 포함한다.

자바와 스칼라를 위한 단일 API

스파크에서 지원하는 언어 중에서 오직 스칼라와 자바만이 강력하게 형식화된 타입으로 지정
파이썬과 R은 형식화되지 않은 타입의 데이터 프레임 API를 지원
데이터세트는 데이터 프레임 API에서 익숙하게 사용되는 DSL 연산자나 함수형 프로그래밍을 사용 하여 병렬로 작동할 수 있는 도메인별 형식화된 객체

데이터세트를 위한 스칼라 케이스 클래스와 자바빈

스파크는 StringType, BinaryType, IntegerType, BooleanType 및 MapType 같은 내부적 데이터 타입을 가지고 있으며, 스파크 작업 중에 스칼라 및 자바의 언어별 데이터 타입 에 원활하게 매핑하는 데 사용
Dataset[T]를 생성하기 위해서, 여기서 T는 스칼라에서 형식화된 객체이기 때문에 객체를 정의하는 case class가 필요
{id: 1, first: "Jules", last: "Damji", url: "https://tinyurl.1", date: "1/4/2016", hits: 4535, campaigns: {"twitter", "Linkedln"} }, ...
Scala
복사
분산된 Dataset[Bloggers]를 생성하려면 먼저 스칼라 객체로 구성된 각 개별 필드를 정의하는 스칼라 케이스 클래스를 정의
case class Blogger( id: Int, first: String, last: String, url: String, date: String, hits: Int, campaigns: Array[String])
Scala
복사
val bloggers = "../data/bloggers.json" val bloggersDS = spark .read .format("json") .option("path", bloggers) .load() .as[Blogger]
Scala
복사
분산 데이터 컬렉션 결과의 각 행은 Blogger 유형으로 정의됨
자바에서 유형의 자바빈 클래스를 생성한 다음 인코더를 사용하여 Dataset<Blogger>를 생성
// 자바 예제 import org.apache.spark.sql.Encoders; import java.io.Serializable; public class Blogger implements Serializable { private int id; private String first; private String last; private String url; private String date; private int hits; private List<String> campaigns; // JavaBean getters and setters int getID() { return id; } void setID(int i) { id = i; } String getFirst() { return first; } void setFirst(String f) { first = f; } String getLast() { return last; } void setLast(String I) { last = I; } String getURL() { return url; } void setURL (String u) { url = u; } String getDate() { return date; } Void setDate(String d) { date = d; } int getHits() { return hits; } void setHits(int h) { hits = h; } List<String> getCampaigns() { return campaigns; } void setcampaigns(List<String> c) { campaigns = c; } } // 인코더 생성 Encoder<Blogger> BloggerEncoder = Encoders.bean(Blogger.class); String bloggers = "./bloggers.json"; Dataset<Blogger> bloggersDS = spark .read .format("json") .option("path", bloggers) .load() .as(BloggerEncoder);
Scala
복사
데이터 프레임과 달리, 데이터 세트 API를 사용하는 스칼라 및 자바에서 데이터세트를 만들려면 읽고 있는 행에 대한 모든 개별 칼럼 이름과 유형을 알아야 하기 때문에 사전에 먼저 고려해야 한다.
⇒ 미리 데이터 유형을 정의하고, 케이스 클래스 또는 자바 빈 클래스가 스키마와 일치해야 한다.
필드 이름이 입력한 데이터와 일치하는 경우 기존 스칼라 케이스 클래스 또는 자바빈 클래스를 사용할 수 있다
데이터세트 API로 작업하는 것은 데이터 프레임으로 작업하는 것만큼 쉽고 간결하며 선언적

데이터세트 작업

uid(사용자의 고유 ID), unname(임의 생성된 사용자 이름 문자열), usage[서버 또는 서비스 사용 시간(분)]의 세 가지 필드가 있는 스칼라 객체를 동적으로 생성
샘플 데이터 생성
// 스칼라 예제 import scala.util.Random._ // 데이터세트를 위한 케이스 클래스 case class Usage(uid: Int, uname: String, usage: Int) val r = new scala.util.Random(42) // 스칼라 Usage 클래스의 1000개 인스턴스 생성 // 데이터를 즉시 생성 val data = for (i <- 0 to 1000) yield (Usage(i, "user-" + r.alphanumeric.take(5).mkString("", "", "'"), r.nextInt(1000))) // Usage 형태의 데이터세트 생성 val dsUsage = spark.createDataset(data) dsUsage.show(10)
Scala
복사
자바에서는 명시적 인코더를 사용
// 자바 예제 import org.apache.spark.sql.Encoders; import org.apache.commons.lang3.RandomStringUtils; import java.io.Serializable; import java.util.Random; import java.util.ArrayList; import java.util.List; // 자바빈으로 자바 클래스 생성 public class Usage implements Serializable { int uid; // user id String uname; // username int usage; // usage public Usage() { } public Usage(int uid, String uname, int usage) { this.uid = uid; this.uname = uname; this.usage = usage; } // JavaBean getters and setters public int getUid() {return this.uid;} public void setUid(int uid) {this.uid = uid;} public String getUname() {return this.uname;} public void setUname(String uname) {this.uname = uname;} public int getUsage() {return this.usage;} public void setUsage(int usage) {this.usage = usage;} public Usage() {} public String toString() { return "uid: '" + this.uid + "' uname: '" + this.uname + "' usage: '" + this.usage "'"; } } // 명시적 인코더 생성 Encoder<Usage> usageEncoder = Encoders.bean(Usage.class); Random rand = new Random(); rand.setSeed(42); List<Usage> data = new ArrayList<Usage>(); // 자바 Usage 클래스의 1000개 인스턴스 생성 for (int i = 0; i < 1000; i++) { data.add( new Usage(i, "user" + RandomStringUtils.randomAlphanumeric(5), rand.nextInt(1000) ) ); } // Usage 형태의 데이터세트 생성 Dataset<Usage> dsUsage = spark.createDataset(data, usageEncoder);
Scala
복사
데이터세트인 dsUsage가 있으므로 이전 장에서 수행한 몇 가지 일반적인 변환을 수행

샘플 데이터 변환

데이터세트는 도메인별 객체의 강력하게 정형화된 컬렉션임을 기억
객체는 함수적인 또는 관계적인 연산을 사용하여 병렬로 변환
변환 : map(), reduce(),filter(), select(), aggregate()

고차 함수 및 함수형 프로그래밍

filter를 사용하여 사용량이 900분을 초과하는 dsUsage 데이터세트의 모든 사용자를 반환
import org.apache.spark.sql.functions._ // filter() 함수에 대한 인수로 함수 표현식을 사용하는 것 dsUsage.filter(d => d.usage > 900) .orderBy(desc("usage")) .show(5, false) // 함수를 정의하고 해당 함수를 filterO 함수의 인수로 제공 def filterWithUsage(u: Usage) = u.usage > 900 dsUsage.filter(filterWithUsage(_)).orderBy(desc("usage")).show(5)
Scala
복사

데이터세트 및 데이터 프레임을 위한 메모리 관리

스파크는 집중적인 인메모리 분산 빅데이터 엔진으로, 메모리를 효율적으로 사용하는 것은 실행 속 도에 큰 영향을 미친다. 스파크 릴리스 역사를 보면 스파크의 메모리 사용은 엄청나게 진화하였다

reference