Kafka와 MySQL을 연동해 Kafka 토픽에 전송된 SQL 쿼리를 MySQL 데이터베이스에서 실행할 수 있다.
Kafka의 Producer가 SQL 쿼리를 생성해 Kafka 토픽에 발행하고, Kafka의 Consumer가 이 쿼리를 읽어 MySQL 데이터베이스에서 실행하는 구조이다.
Zookeeper, Kafka 서비스가 자동 실행되도록 배치 파일 만들기
Kafka와 MySQL 연동
Kafka와 MySQL 연결하기 위해 필요한 라이브러리 로드
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
import json
import pymysql
Producer 함수 생성
topic = 'kafka_mysql'
# create producer
def create_producer(topic, bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')):
producer = KafkaProducer(bootstrap_servers=bootstrap_servers, value_serializer=value_serializer)
// 수행할 query 만들고 topic에 실어서 보내기
sql = []
sql.append( '''
create table tbl_kafka_data(
name varchar(10),
email varchar(50),
phone varchar(50)
);'''
)
sql.append("insert into tbl_kafka_data values ('lee','lee@gmail.com','010-1111-2222');")
sql.append("insert into tbl_kafka_data values ('kim','kim@gmail.com','010-3333-4444');")
sql.append("insert into tbl_kafka_data values ('seo','seo@gmail.com','010-5555-6666');")
for data in sql:
producer.send(topic, value=data)
producer.flush()
Consumer 함수 생성
// create consumer
def create_consumer(topic, bootstrap_servers='localhost:9092', auto_offset_reset='earliest',
value_deserializer=lambda v: json.loads(v).encode('utf-8'), consumer_timeout_ms = 1000
):
consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers, auto_offset_reset=auto_offset_reset,
value_deserializer=value_deserializer, consumer_timeout_ms=consumer_timeout_ms)
// mysql connect를 위한 information
mysql_con = pymysql.connect(
host='localhost',
user='root',
password='password',
db='epldatabase',
charset='utf8'
)
// 데이터베이스의 쿼리를 실행하기 위한 커서
cursor = mysql_con.cursor()
for message in consumer:
sql = message.value
print(sql)
cursor.execute(sql) // message의 내용을 db에 전송
mysql_con.commit()
mysql_con.close()
Topic 생성
// create topic
def create_topics(topics, bootstrap_servers = 'localhost:9092'):
admin_client = KafkaAdminClient(bootstrap_servers = bootstrap_servers)
topic_list = [ NewTopic(name, num_partitions=1, replication_factor = 1) for name in topics]
admin_client.create_topics(new_topics=topic_list, validate_only=False)
create_producer(topic)
create_consumer(topic)
📌 consumer 생성 시 다음과 같은 오류가 발생했다
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Cell In[7], line 1
----> 1 create_consumer(topic)
Cell In[3], line 10, in create_consumer(topic, bootstrap_servers, auto_offset_reset, value_deserializer, consumer_timeout_ms)
5 consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers, auto_offset_reset=auto_offset_reset,
6 value_deserializer=value_deserializer, consumer_timeout_ms=consumer_timeout_ms)
9 # mysql connect를 위한 information
---> 10 mysql_con = pymysql.connect(
11 host='localhost',
12 user='root',
13 password='password',
14 db='epldatabase',
15 charset='utf8'
16 )
18 # 데이터베스의 쿼리를 실행하기 위한 커서
19 cursor = mysql_con.cursor()
File /usr/local/lib/python3.9/site-packages/pymysql/connections.py:358, in Connection.__init__(self, user, password, host, database, unix_socket, port, charset, collation, sql_mode, read_default_file, conv, use_unicode, client_flag, cursorclass, init_command, connect_timeout, read_default_group, autocommit, local_infile, max_allowed_packet, defer_connect, auth_plugin_map, read_timeout, write_timeout, bind_address, binary_prefix, program_name, server_public_key, ssl, ssl_ca, ssl_cert, ssl_disabled, ssl_key, ssl_verify_cert, ssl_verify_identity, compress, named_pipe, passwd, db)
356 self._sock = None
357 else:
--> 358 self.connect()
File /usr/local/lib/python3.9/site-packages/pymysql/connections.py:664, in Connection.connect(self, sock)
661 self._next_seq_id = 0
663 self._get_server_information()
--> 664 self._request_authentication()
666 # Send "SET NAMES" query on init for:
667 # - Ensure charaset (and collation) is set to the server.
668 # - collation_id in handshake packet may be ignored.
(...)
677 # - https://github.com/wagtail/wagtail/issues/9477
678 # - https://zenn.dev/methane/articles/2023-mysql-collation (Japanese)
679 self.set_character_set(self.charset, self.collation)
File /usr/local/lib/python3.9/site-packages/pymysql/connections.py:976, in Connection._request_authentication(self)
974 # https://dev.mysql.com/doc/internals/en/successful-authentication.html
975 if self._auth_plugin_name == "caching_sha2_password":
--> 976 auth_packet = _auth.caching_sha2_password_auth(self, auth_packet)
977 elif self._auth_plugin_name == "sha256_password":
978 auth_packet = _auth.sha256_password_auth(self, auth_packet)
File /usr/local/lib/python3.9/site-packages/pymysql/_auth.py:266, in caching_sha2_password_auth(conn, pkt)
263 if DEBUG:
264 print(conn.server_public_key.decode("ascii"))
--> 266 data = sha2_rsa_encrypt(conn.password, conn.salt, conn.server_public_key)
267 pkt = _roundtrip(conn, data)
File /usr/local/lib/python3.9/site-packages/pymysql/_auth.py:143, in sha2_rsa_encrypt(password, salt, public_key)
138 """Encrypt password with salt and public_key.
139
140 Used for sha256_password and caching_sha2_password.
141 """
142 if not _have_cryptography:
--> 143 raise RuntimeError(
144 "'cryptography' package is required for sha256_password or"
145 + " caching_sha2_password auth methods"
146 )
147 message = _xor_password(password + b"\0", salt)
148 rsa_key = serialization.load_pem_public_key(public_key, default_backend())
RuntimeError: 'cryptography' package is required for sha256_password or caching_sha2_password auth methods
→ 이 오류는 pymysql 패키지가 새로운 MySQL 인증 방법인 'caching_sha2_password'를 처리하기 위해 'cryptography' 패키지가 필요하다는 것을 나타낸다.
이 패키지가 없으면 pymysql이 MySQL 서버에 연결할 때 해당 인증 방법을 처리할 수 없어 오류가 발생한다.
문제를 해결하기 위해 'cryptography' 패키지 설치
!pip install cryptography
출력 결과가 바이너리 데이터로 보이지만 실제로는 문자열이다.
'b'는 바이너리 문자열을 나타내는 것으로, 실제 문자열 데이터는 이 'b' 뒤에 나오게 된다.
이 문자열들은 각각의 SQL 쿼리를 나타내며, 데이터베이스에 적용될 수 있다.
MySQL 접속해서 테이블 조회하기
mysql -u root -p
'Data Engineering > Kafka' 카테고리의 다른 글
[Linux/Kafka] Spark Streaming을 이용해 MySQL에 데이터를 실시간 적재해보자 (0) | 2024.08.22 |
---|---|
[Linux/Kafka] Apache Kafka를 설치하고 Producer/Consumer/Topic을 생성해보자 (0) | 2024.08.22 |
[Linux/Kafka] Apache Kafka를 알아보자 (0) | 2024.08.22 |