Data Engineering/Kafka

[Linux/Kafka] Spark Streaming을 이용해 MySQL에 데이터를 실시간 적재해보자

seoraroong 2024. 8. 22. 17:00

Topic 1 Producer, Consumer 생성하기

 

-- 필요한 라이브러리 로드

from kafka import KafkaProducer
import numpy as np
from time import time, sleep
import os
import json

 

- 평균이 0이고 표준 편차가 1인 정규 분포에서 하나의 샘플을 추출하여 temp 리스트에 저장하기

temp = [np.random.normal(loc=0.0, scale=1.0, size=None)]

 

- 정규 분포에서 무작위로 데이터를 생성하여 Kafka 클러스터로 전송하는 Producer 생성

producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer = lambda v: json.dumps(v).encode('utf-8'))
count=0
while True:
    producer.send('topic1',value=np.random.normal())
    print('sending data to kafka')
    sleep(.5)
    count+=1
    if count % 10 == 0:
        print('topic1 producer....')

 

- temp 리스트에 있는 모든 데이터를 'random_data'라는 주제로 Kafka 클러스터로 보내는 작업을 수행

for data in temp:
    producer.send('random_data', value = temp)
producer.flush()

 

- Kafka 클러스터에서 'random_data' 토픽을 구독하는 KafkaConsumer 생성

from kafka import KafkaConsumer
consumer = KafkaConsumer('random_data', value_deserializer = lambda v: json.loads(v.decode('utf-8')),
                        auto_offset_reset = 'earliest',
                        consumer_timeout_ms = 1000*5, max_poll_interval_ms=1000*10, group_id='g1')

 

 

Topic 2 Producer 생성하기 (data.txt 파일 이용)

data.txt 파일은 Kafka 실습용으로 구한 임의의 파일이다.

 

- 필요한 라이브러리 로드

from kafka import KafkaProducer
import numpy as np
from time import time, sleep
import os
import json
import random

 

- 'data.txt' 파일을 열어서 각 줄의 내용을 읽고, 각 줄을 공백 없이 분리하여 리스트로 저장

with open('data.txt', 'r', encoding='utf-8') as f:
    words= f.read().splitlines()

 

- KafkaProducer를 사용하여 Kafka 클러스터로 메시지를 전송하는 Producer 생성

producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer = lambda v: json.dumps(v).encode('utf-8'))
count=0
while True:
    index = random.randint(0, len(words)-1)
    producer.send('topic2', value=words[index])
    print('sending data to kafka')
    sleep(1)
    count+=1
    if count % 10 == 0:
        print('topic2 producer....')

 

 

Kafka와 Spark Streaming 연동하기

📌 연동하지 않은 상태에서 코드를 실행하면 다음과 같은 오류가 발생한다

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

 

📌 Terminal에서 spark 경로로 이동 → 다음 명령어 입력 후 실행되는 Jupyter Lab에서 코드 실행

./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1

 

topic1, topic2를 생성하는 Producer는 kafka/pysource 위치에서 pyspark를 실행해서 만들었다.

동일한 위치에서 Spark streaming 처리를 하려고 했더니 위의 오류가 발생했다.

 

Spark streaming 처리를 할 때, 생성한 Producer에서 계속해서 topic이 생성되는 상태여야 한다.

(속도가 느려져서 producer 실행을 중지하고 streaming 처리를 시도했더니 오류가 발생했다..)

 

 

- 필요한 라이브러리 로드

from pyspark.sql import SparkSession, streaming
from pyspark.sql.functions import col, expr

 

- Spark session 생성

spark = SparkSession.builder.appName('kafka_spark').getOrCreate()

 

- foreachBatch 함수 생성, 스트리밍 데이터의 각 배치를 처리 후 MySQL 데이터베이스에 저장

def foreach_batch_function(batch_df, batch_id):
    url = "jdbc:mysql://localhost:3306/epldatabase?useSSL=false"
    driver='com.mysql.jdbc.Driver'
    user='root'
    password='password'
    tablename = 'tbl_kafka_spark'
    mode = 'append'
    props = {'driver':driver,'user':user,'password':password}
    params = {
        'url': url,
        'table' :  tablename,
        'properties' : props,
        'mode' : mode
    }
    batch_df.write.jdbc(**params)

 

- Terminal에서 MySQL 접속 후 테이블 생성

USE 사용할_데이터베이스_이름;
CREATE TABLE 생성할_테이블_이름 (
	topic VARCHAR(50),
	value VARCHAR(50));

 

- Apache Spark를 사용하여 Kafka 토픽에서 스트리밍 데이터를 읽어와서 처리

kafka_options = {
    'kafka.bootstrap.servers':'localhost:9092',
    'subscribe':'topic1,topic2'
}
df = spark.readStream.format('kafka')\
                    .options(**kafka_options).load()

rawDF = df.selectExpr('topic','CAST(value as STRING)')
query = rawDF.writeStream\
        .format('console')\
        .trigger(processingTime='5 seconds')\
        .foreachBatch(foreach_batch_function)\
        .start()
query.awaitTermination()

→ 코드 실행하면 위와 같은 진행 상황 로그가 보인다.

 

- 생성한 MySQL 테이블에 데이터가 실시간으로 적재되는지 확인