Data Engineering/Spark

[Linux/Spark] PySpark를 이용해 쿼리 조회하고 데이터 CRUD 해보자

seoraroong 2024. 8. 22. 00:00

JOIN

 

- Join 실습을 하기 위한 DataFrame 생성하기 (Person, graduateProgram, sparkStatus)

person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
  .toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")])\
  .toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")])\
  .toDF("id", "status")

 

- Table의 Key값을 통해 Relation 확인 후 Join 연산 수행

joinExpress = person['graduate_program'] == graduateProgram['id']
joinType = 'inner'
// person 테이블과 graduateProgram 테이블을 Join
// 위에서 선언한 join 기준과 join 형식을 이용해 join 연산 수
// truncate : 데이터프레임의 열의 너비를 자동으로 줄일지에 대한 옵션
person.join(graduateProgram, joinExpress, joinType).show(truncate=False)

 

- union을 이용해 테이블 병합하기

// 새로운 데이터프레임을 만들고, union을 이용해 graduateProgram 테이블과 병합
graduateProgram2 = graduateProgram.union(spark.createDataFrame([
    (0, 'Master', 'Duplicated Row', 'Duplicated School')
]))
graduateProgram2.createOrReplaceTempView('graduateProgram2')
// graduateProgram2 내용을 쿼리문으로 조회하기
spark.sql("""
    select * from graduateProgram2 where degree == 'Masters'
""").show()

 

- expr을 이용해 join 조건을 작성해서 테이블 join 하기

from pyspark.sql.functions import expr

// person 테이블의 컬럼명 변경하고
// expr 표현식으로 join 조건 (특정 컬럼의 배열에 id가 포함되어 있는지) 작성 후 
// 두 테이블을 join
person.withColumnRenamed('id', 'personID').join(sparkStatus,
                                               expr('array_contains(spark_status, id)')).show()

 

 

데이터 프레임을 읽어오고 저장하기

읽어올 csv, json, parquet 파일은 미리 spark가 있는 경로로 이동해두었다.

 

hadoop 에서 분할해서 처리해야 하기 때문에 저장한 파일이 '폴더'로 생성되며, 폴더 내에는 분할된 'part~'와 같은 파일이 존재한다.

 

- CSV 파일을 읽어온 뒤 저장하기

// csv 파일 읽어오기
csvFile = spark.read.format('csv').option('header','true').option('inferSchema','true')\
.option('mode','FAILFAST').load('2015-summary.csv')
  • “header” option : 데이터프레임의 열 이름이 첫 번째 줄의 값으로 설정할지 여부
  • “inferSchema” option : PySpark가 CSV 파일의 각 열의 데이터 유형을 추론, 추론한 Schema를 데이터프레임의 Schema로 사용
  • “mode”, “FAILFAST” : 잘못된 데이터나 오류가 발생하면 읽기 작업 중단

 

- 데이터 프레임을 tsv 파일로 저장하기

// 읽어온 데이터프레임을 tsv 파일로 저장하기 
csvFile.write.format('csv').mode('overwrite').option('sep','\t')\
.save('my-file.tsv')
  • mode(’overwrite’) : 기존에 동일한 파일 이름이 있다면 덮어 씀
  • option(’sep’,’\t’) : tab으로 필드 구분
    • tsv : tab separated ~
    • csv : comma separated ~

 

- tsv 파일로 저장한 데이터 프레임 읽어오기

// tsv 파일로 저장한 데이터 프레임을 읽어오기
csvFile2 = spark.read.format('csv').option('header','true').option('inferSchema','true')\
.option('mode','FAILFAST').load('my-file.tsv').show()

 

- JSON 파일을 읽어온 뒤 저장하기

// json 파일 읽어오기
jsonFile = spark.read.format('json').option('inferSchema','true')\
.option('mode','FAILFAST').load('2015-summary.json').show()
  • json 파일은 header option이 없다
    • json 형식은 키-값의 dictionary 형태로 데이터가 정의되어 있어 열 헤더가 명시되지 않음

 

- CSV 형식의 데이터프레임을 JSON 형식으로 저장하기

csvFile.write.format('json').mode('overwrite').save('my-json-file.json')

 

 

- Parquet 형식의 데이터프레임을 만들고 저장하기

Parquet :

  • 분석용 데이터를 저장하는 데 사용되는 파일 형식
  • 압축/ 분할 저장/ 스키마 지원 / 서드 파티 지원/ 열 지향 저장
  • 열 지향 저장 : 데이터를 열 단위로 저장, 쿼리를 읽을 때 열 단위로 읽어올 수 있도록, 분산 처리할 때 유용하다는 사실!
// Parquet 형식 데이터프레임 생성을 위해 새로운 세션 생성
from pyspark.sql import SparkSession
// SparkSession.builder -> 세션을 생성하기 위한 빌더 객체 생성
// appName('Create Parquet File') -> 생성된 세션에 애플리케이션의 이름을 설정
//getOrCreate() -> 이미 존재하는 SparkSession을 반환하거나, 존재하지 않을 경우 새로운 SparkSession을 생성
spark = SparkSession.builder.appName('Create Parquet File').getOrCreate()
// Parquet 형식으로 저장할 데이터를 구성하고 데이터프레임으로 만들기 
data = [
        ('a',10),('b',20),('c',30)
]
df = spark.createDataFrame(data, ['name', 'age'])
// 데이터프레임을 Parquet 형식으로 저장하고 내용 확인
df.write.parquet('output_parquet')
spark.read.format('parquet').load('output_parquet').show()

 

 

- ORC 형식의 파일을 읽어오고 저장하기

ORC (Optimized Row Columnar)

  • parquet과 더불어 대규모 데이터를 저장하고 처리하는 파일 형식
  • Apache Hive에서 분산 처리할 때 주로 사용
// ORC 형식의 파일 읽어오기
orcFile = spark.read.format('orc').load('2010-summary.orc')
// 읽어온 ORC 파일을 저장하기
orcFile.write.format('orc').mode('overwrite').save('my-orc-file.orc')