Topic 1 Producer, Consumer 생성하기
-- 필요한 라이브러리 로드
from kafka import KafkaProducer
import numpy as np
from time import time, sleep
import os
import json
- 평균이 0이고 표준 편차가 1인 정규 분포에서 하나의 샘플을 추출하여 temp 리스트에 저장하기
temp = [np.random.normal(loc=0.0, scale=1.0, size=None)]
- 정규 분포에서 무작위로 데이터를 생성하여 Kafka 클러스터로 전송하는 Producer 생성
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer = lambda v: json.dumps(v).encode('utf-8'))
count=0
while True:
producer.send('topic1',value=np.random.normal())
print('sending data to kafka')
sleep(.5)
count+=1
if count % 10 == 0:
print('topic1 producer....')
- temp 리스트에 있는 모든 데이터를 'random_data'라는 주제로 Kafka 클러스터로 보내는 작업을 수행
for data in temp:
producer.send('random_data', value = temp)
producer.flush()
- Kafka 클러스터에서 'random_data' 토픽을 구독하는 KafkaConsumer 생성
from kafka import KafkaConsumer
consumer = KafkaConsumer('random_data', value_deserializer = lambda v: json.loads(v.decode('utf-8')),
auto_offset_reset = 'earliest',
consumer_timeout_ms = 1000*5, max_poll_interval_ms=1000*10, group_id='g1')
Topic 2 Producer 생성하기 (data.txt 파일 이용)
data.txt 파일은 Kafka 실습용으로 구한 임의의 파일이다.
- 필요한 라이브러리 로드
from kafka import KafkaProducer
import numpy as np
from time import time, sleep
import os
import json
import random
- 'data.txt' 파일을 열어서 각 줄의 내용을 읽고, 각 줄을 공백 없이 분리하여 리스트로 저장
with open('data.txt', 'r', encoding='utf-8') as f:
words= f.read().splitlines()
- KafkaProducer를 사용하여 Kafka 클러스터로 메시지를 전송하는 Producer 생성
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer = lambda v: json.dumps(v).encode('utf-8'))
count=0
while True:
index = random.randint(0, len(words)-1)
producer.send('topic2', value=words[index])
print('sending data to kafka')
sleep(1)
count+=1
if count % 10 == 0:
print('topic2 producer....')
Kafka와 Spark Streaming 연동하기
📌 연동하지 않은 상태에서 코드를 실행하면 다음과 같은 오류가 발생한다
AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
📌 Terminal에서 spark 경로로 이동 → 다음 명령어 입력 후 실행되는 Jupyter Lab에서 코드 실행
./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1
topic1, topic2를 생성하는 Producer는 kafka/pysource 위치에서 pyspark를 실행해서 만들었다.
동일한 위치에서 Spark streaming 처리를 하려고 했더니 위의 오류가 발생했다.
Spark streaming 처리를 할 때, 생성한 Producer에서 계속해서 topic이 생성되는 상태여야 한다.
(속도가 느려져서 producer 실행을 중지하고 streaming 처리를 시도했더니 오류가 발생했다..)
- 필요한 라이브러리 로드
from pyspark.sql import SparkSession, streaming
from pyspark.sql.functions import col, expr
- Spark session 생성
spark = SparkSession.builder.appName('kafka_spark').getOrCreate()
- foreachBatch 함수 생성, 스트리밍 데이터의 각 배치를 처리 후 MySQL 데이터베이스에 저장
def foreach_batch_function(batch_df, batch_id):
url = "jdbc:mysql://localhost:3306/epldatabase?useSSL=false"
driver='com.mysql.jdbc.Driver'
user='root'
password='password'
tablename = 'tbl_kafka_spark'
mode = 'append'
props = {'driver':driver,'user':user,'password':password}
params = {
'url': url,
'table' : tablename,
'properties' : props,
'mode' : mode
}
batch_df.write.jdbc(**params)
- Terminal에서 MySQL 접속 후 테이블 생성
USE 사용할_데이터베이스_이름;
CREATE TABLE 생성할_테이블_이름 (
topic VARCHAR(50),
value VARCHAR(50));
- Apache Spark를 사용하여 Kafka 토픽에서 스트리밍 데이터를 읽어와서 처리
kafka_options = {
'kafka.bootstrap.servers':'localhost:9092',
'subscribe':'topic1,topic2'
}
df = spark.readStream.format('kafka')\
.options(**kafka_options).load()
rawDF = df.selectExpr('topic','CAST(value as STRING)')
query = rawDF.writeStream\
.format('console')\
.trigger(processingTime='5 seconds')\
.foreachBatch(foreach_batch_function)\
.start()
query.awaitTermination()
→ 코드 실행하면 위와 같은 진행 상황 로그가 보인다.
- 생성한 MySQL 테이블에 데이터가 실시간으로 적재되는지 확인
'Data Engineering > Kafka' 카테고리의 다른 글
[Linux/Kafka] Apache Kafka와 MySQL을 연동해보자 (0) | 2024.08.22 |
---|---|
[Linux/Kafka] Apache Kafka를 설치하고 Producer/Consumer/Topic을 생성해보자 (0) | 2024.08.22 |
[Linux/Kafka] Apache Kafka를 알아보자 (0) | 2024.08.22 |