Jupyterlab 사용하기
- Jupyter lab 설치하기
- bashrc 파일에 경로 추가하기
- 설치 및 환경 설정 후 pyspark를 실행하면 자동으로 브라우저가 열리며 Jupyterlab이 실행된다.
Jupyter lab 환경에서 PySpark를 이용해 데이터 다뤄보기
- 데이터를 읽어오고 쿼리화 하기
// 데이터 로드하기
staticDataFrame = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("./bydata/by-day/*.csv")
// 임시 테이블 생성
staticDataFrame.createOrReplaceTempView("retail_data")
// 데이터 프레임의 스키마구조를 복사
staticSchema = staticDataFrame.schema
// 정적인 데이터를 쿼리화
from pyspark.sql.functions import window, column, desc, col
staticDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")\
.show(5)
- 스트리밍을 이용한 데이터 실시간 처리
// 실시간 처리를 위해서 스트리밍 기술을 사용
// 특정폴더의 데이터를 수집
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger", 1)\
.format("csv")\
.option("header", "true")\
.load("./bydata/by-day/*.csv")
// 수집한 데이터를 출력 - 테스트
purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")
// 실시간 처리를 위해서 reading 하고 있는데이터를 메모리에 실시간으로 적재
purchaseByCustomerPerHour.writeStream\
.format("memory")\
.queryName("customer_purchases")\
.outputMode("complete")\
.start()
// 메모리에 적재된 데이터를 읽어옴
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""")\
.show(5)
- PySpark의 라이브러리를 이용한 데이터 전처리
// 스파크를이용한 전처리 방법
// 결측치(NA )를 0으로 채움
// 피처를 발굴 InvoiceDate를 기준으로 요일을 full name으로 추출해서 day_of_week 컬럼을 만들고 저장
// 날짜와같은 연속형 데이터를 범주형 데이터로 새롭게 추출
from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
.na.fill(0)\
.withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
.coalesce(5)
// 스파크에서 지원하는 머신러닝을위한 훈련데이터와 검증데이터를 만들기
trainDataFrame = preppedDataFrame\
.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
.where("InvoiceDate >= '2011-07-01'")
// 범주형 변수를 숫자형으로 바꾸는 작업 - 높낮이나 크기가 존재하는 범주형 데이터
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer().setInputCol("day_of_week").setOutputCol("day_of_week_index")
- one hot encoding (Label encoding)
데이터 사이의 연관성을 없애는 방법
사과, 딸기, 바나나 → 001, 010, 100
// 숫자형 데이터를 가지고 변경
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder().setInputCol('day_of_week_index').setOutputCol('day_of_week_encoded')
from pyspark.ml.feature import VectorAssembler
// 여러 개의 열을 하나의 작업으로 수행
vectorAssembler = VectorAssembler().setInputCols(["UnitPrice","day_of_week_encoded"]).setOutputCol("features")
- Pipeline을 만들어서 여러기능들을 하나로 묶어 실행
// 위에서 작성한 기능들을 하나로 연결
// MLOps : 각각의 기능들을 하나로 묶어서 하나의 파이프라인을 구축
// 파이프라인을 구축해서 각 단계별로 구성한 기능을 순차적으로 실행 또는 적용
from pyspark.ml import Pipeline
transformationPipeLine = Pipeline().setStages([indexer, encoder, vectorAssembler])
// 파이프라인 실행 -- fit
fittedPipeLine = transformationPipeLine.fit(trainDataFrame)
// 변환
trainsformedTraining = filtedPipeLine.transform(trainDataFrame)
trainsformedTraining.show()
- 데이터프레임을 생성하고 쿼리문을 이용해 다양하게 조회하기
// 새로운 Spark 세션 생성
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()
// 데이터 프레임 생성
df = spark.createDataFrame([
("lee",32),("kim",20),("hong",30),("cho",50)
]
,["name","age"]
)
// 생성했던 데이터프레임에 컬럼 생성하기
from pyspark.sql.functions import col, when
// 기존 데이터
df1=df.withColumn(
'life_state',
when(col('age')<30, 'young')
.when(col('age').between(20,30), 'middle')
.otherwise("old")
)
'Data Engineering > Spark' 카테고리의 다른 글
[Linux/Spark] 데이터 분석을 위한 Scala에 대해 알아보자 (0) | 2024.08.22 |
---|---|
[Linux/Spark] Apache Spark에 대해 알아보자 (0) | 2024.08.22 |
[Linux/Spark] PySpark를 이용해 쿼리 조회하고 데이터 CRUD 해보자 (0) | 2024.08.22 |
[Linux/Spark] PySpark와 MySQL을 연동해보자 (0) | 2024.08.22 |
[Linux/Spark] Apache Spark를 설치해보자 (0) | 2024.08.22 |