JOIN
- Join 실습을 하기 위한 DataFrame 생성하기 (Person, graduateProgram, sparkStatus)
person = spark.createDataFrame([
(0, "Bill Chambers", 0, [100]),
(1, "Matei Zaharia", 1, [500, 250, 100]),
(2, "Michael Armbrust", 1, [250, 100])])\
.toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")])\
.toDF("id", "status")
- Table의 Key값을 통해 Relation 확인 후 Join 연산 수행
joinExpress = person['graduate_program'] == graduateProgram['id']
joinType = 'inner'
// person 테이블과 graduateProgram 테이블을 Join
// 위에서 선언한 join 기준과 join 형식을 이용해 join 연산 수
// truncate : 데이터프레임의 열의 너비를 자동으로 줄일지에 대한 옵션
person.join(graduateProgram, joinExpress, joinType).show(truncate=False)
- union을 이용해 테이블 병합하기
// 새로운 데이터프레임을 만들고, union을 이용해 graduateProgram 테이블과 병합
graduateProgram2 = graduateProgram.union(spark.createDataFrame([
(0, 'Master', 'Duplicated Row', 'Duplicated School')
]))
graduateProgram2.createOrReplaceTempView('graduateProgram2')
// graduateProgram2 내용을 쿼리문으로 조회하기
spark.sql("""
select * from graduateProgram2 where degree == 'Masters'
""").show()
- expr을 이용해 join 조건을 작성해서 테이블 join 하기
from pyspark.sql.functions import expr
// person 테이블의 컬럼명 변경하고
// expr 표현식으로 join 조건 (특정 컬럼의 배열에 id가 포함되어 있는지) 작성 후
// 두 테이블을 join
person.withColumnRenamed('id', 'personID').join(sparkStatus,
expr('array_contains(spark_status, id)')).show()
데이터 프레임을 읽어오고 저장하기
읽어올 csv, json, parquet 파일은 미리 spark가 있는 경로로 이동해두었다.
hadoop 에서 분할해서 처리해야 하기 때문에 저장한 파일이 '폴더'로 생성되며, 폴더 내에는 분할된 'part~'와 같은 파일이 존재한다.
- CSV 파일을 읽어온 뒤 저장하기
// csv 파일 읽어오기
csvFile = spark.read.format('csv').option('header','true').option('inferSchema','true')\
.option('mode','FAILFAST').load('2015-summary.csv')
- “header” option : 데이터프레임의 열 이름이 첫 번째 줄의 값으로 설정할지 여부
- “inferSchema” option : PySpark가 CSV 파일의 각 열의 데이터 유형을 추론, 추론한 Schema를 데이터프레임의 Schema로 사용
- “mode”, “FAILFAST” : 잘못된 데이터나 오류가 발생하면 읽기 작업 중단
- 데이터 프레임을 tsv 파일로 저장하기
// 읽어온 데이터프레임을 tsv 파일로 저장하기
csvFile.write.format('csv').mode('overwrite').option('sep','\t')\
.save('my-file.tsv')
- mode(’overwrite’) : 기존에 동일한 파일 이름이 있다면 덮어 씀
- option(’sep’,’\t’) : tab으로 필드 구분
- tsv : tab separated ~
- csv : comma separated ~
- tsv 파일로 저장한 데이터 프레임 읽어오기
// tsv 파일로 저장한 데이터 프레임을 읽어오기
csvFile2 = spark.read.format('csv').option('header','true').option('inferSchema','true')\
.option('mode','FAILFAST').load('my-file.tsv').show()
- JSON 파일을 읽어온 뒤 저장하기
// json 파일 읽어오기
jsonFile = spark.read.format('json').option('inferSchema','true')\
.option('mode','FAILFAST').load('2015-summary.json').show()
- json 파일은 header option이 없다
- json 형식은 키-값의 dictionary 형태로 데이터가 정의되어 있어 열 헤더가 명시되지 않음
- CSV 형식의 데이터프레임을 JSON 형식으로 저장하기
csvFile.write.format('json').mode('overwrite').save('my-json-file.json')
- Parquet 형식의 데이터프레임을 만들고 저장하기
Parquet :
- 분석용 데이터를 저장하는 데 사용되는 파일 형식
- 압축/ 분할 저장/ 스키마 지원 / 서드 파티 지원/ 열 지향 저장
- 열 지향 저장 : 데이터를 열 단위로 저장, 쿼리를 읽을 때 열 단위로 읽어올 수 있도록, 분산 처리할 때 유용하다는 사실!
// Parquet 형식 데이터프레임 생성을 위해 새로운 세션 생성
from pyspark.sql import SparkSession
// SparkSession.builder -> 세션을 생성하기 위한 빌더 객체 생성
// appName('Create Parquet File') -> 생성된 세션에 애플리케이션의 이름을 설정
//getOrCreate() -> 이미 존재하는 SparkSession을 반환하거나, 존재하지 않을 경우 새로운 SparkSession을 생성
spark = SparkSession.builder.appName('Create Parquet File').getOrCreate()
// Parquet 형식으로 저장할 데이터를 구성하고 데이터프레임으로 만들기
data = [
('a',10),('b',20),('c',30)
]
df = spark.createDataFrame(data, ['name', 'age'])
// 데이터프레임을 Parquet 형식으로 저장하고 내용 확인
df.write.parquet('output_parquet')
spark.read.format('parquet').load('output_parquet').show()
- ORC 형식의 파일을 읽어오고 저장하기
ORC (Optimized Row Columnar)
- parquet과 더불어 대규모 데이터를 저장하고 처리하는 파일 형식
- Apache Hive에서 분산 처리할 때 주로 사용
// ORC 형식의 파일 읽어오기
orcFile = spark.read.format('orc').load('2010-summary.orc')
// 읽어온 ORC 파일을 저장하기
orcFile.write.format('orc').mode('overwrite').save('my-orc-file.orc')
'Data Engineering > Spark' 카테고리의 다른 글
[Linux/Spark] 데이터 분석을 위한 Scala에 대해 알아보자 (0) | 2024.08.22 |
---|---|
[Linux/Spark] 노트북 환경에서 Spark 사용하기 (Jupyterlab) (0) | 2024.08.22 |
[Linux/Spark] Apache Spark에 대해 알아보자 (0) | 2024.08.22 |
[Linux/Spark] PySpark와 MySQL을 연동해보자 (0) | 2024.08.22 |
[Linux/Spark] Apache Spark를 설치해보자 (0) | 2024.08.22 |