Data Engineering/Spark

[Linux/Spark] 노트북 환경에서 Spark 사용하기 (Jupyterlab)

seoraroong 2024. 8. 22. 00:00

Jupyterlab 사용하기

- Jupyter lab 설치하기

 

- bashrc 파일에 경로 추가하기

 

- 설치 및 환경 설정 후 pyspark를 실행하면 자동으로 브라우저가 열리며 Jupyterlab이 실행된다.

 

 

 

Jupyter lab 환경에서 PySpark를 이용해 데이터 다뤄보기

 

- 데이터를 읽어오고 쿼리화 하기

// 데이터 로드하기 
staticDataFrame = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("./bydata/by-day/*.csv")

 

// 임시 테이블 생성
staticDataFrame.createOrReplaceTempView("retail_data")
// 데이터 프레임의 스키마구조를 복사
staticSchema = staticDataFrame.schema

 

// 정적인 데이터를 쿼리화 
from pyspark.sql.functions import window, column, desc, col
staticDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")\
  .show(5)

 

 

- 스트리밍을 이용한 데이터 실시간 처리

// 실시간 처리를 위해서 스트리밍 기술을 사용
// 특정폴더의 데이터를 수집
streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
    .format("csv")\
    .option("header", "true")\
    .load("./bydata/by-day/*.csv")
// 수집한 데이터를 출력 - 테스트
purchaseByCustomerPerHour = streamingDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")
// 실시간 처리를 위해서 reading 하고 있는데이터를 메모리에 실시간으로 적재
purchaseByCustomerPerHour.writeStream\
    .format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()
// 메모리에 적재된 데이터를 읽어옴
spark.sql("""
  SELECT *
  FROM customer_purchases
  ORDER BY `sum(total_cost)` DESC
  """)\
  .show(5)

 

 

- PySpark의 라이브러리를 이용한 데이터 전처리

// 스파크를이용한 전처리 방법
// 결측치(NA )를 0으로 채움
// 피처를 발굴 InvoiceDate를 기준으로 요일을 full name으로 추출해서 day_of_week 컬럼을 만들고 저장
// 날짜와같은 연속형 데이터를 범주형 데이터로 새롭게 추출
from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
  .na.fill(0)\
  .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
  .coalesce(5)
  
// 스파크에서 지원하는 머신러닝을위한 훈련데이터와 검증데이터를 만들기
trainDataFrame = preppedDataFrame\
  .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
  .where("InvoiceDate >= '2011-07-01'")

 

// 범주형 변수를 숫자형으로 바꾸는 작업 - 높낮이나 크기가 존재하는 범주형 데이터 
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer().setInputCol("day_of_week").setOutputCol("day_of_week_index")

 

 

- one hot encoding (Label encoding)

데이터 사이의 연관성을 없애는 방법

사과, 딸기, 바나나 → 001, 010, 100

 

// 숫자형 데이터를 가지고 변경
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder().setInputCol('day_of_week_index').setOutputCol('day_of_week_encoded')
from pyspark.ml.feature import VectorAssembler
// 여러 개의 열을 하나의 작업으로 수행
vectorAssembler = VectorAssembler().setInputCols(["UnitPrice","day_of_week_encoded"]).setOutputCol("features")

 

 

- Pipeline을 만들어서 여러기능들을 하나로 묶어 실행

// 위에서 작성한 기능들을 하나로 연결
// MLOps : 각각의 기능들을 하나로 묶어서 하나의 파이프라인을 구축
// 파이프라인을 구축해서 각 단계별로 구성한 기능을 순차적으로 실행 또는 적용
from pyspark.ml import Pipeline
transformationPipeLine = Pipeline().setStages([indexer, encoder, vectorAssembler])
// 파이프라인 실행 -- fit
fittedPipeLine = transformationPipeLine.fit(trainDataFrame)
// 변환
trainsformedTraining = filtedPipeLine.transform(trainDataFrame)

trainsformedTraining.show()

 

 

- 데이터프레임을 생성하고 쿼리문을 이용해 다양하게 조회하기

// 새로운 Spark 세션 생성 
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()
// 데이터 프레임 생성
df = spark.createDataFrame([
    ("lee",32),("kim",20),("hong",30),("cho",50)
]
,["name","age"]
                          )
// 생성했던 데이터프레임에 컬럼 생성하기
from pyspark.sql.functions import col, when

// 기존 데이터
df1=df.withColumn(
    'life_state',
    when(col('age')<30, 'young')
    .when(col('age').between(20,30), 'middle')
    .otherwise("old")
)