본문 바로가기

DKE/MySQL

Kafka와 MySQL 연동하기 (with Python) / 2023.01.31

1. 필요한 라이브러리 설치

- kafka-python :$pip install kafka-python

- pymysql :$pip install pymysql

2. Jookeeper, Kafka 실행

- Jookeeper 실행 :$ /tools/zookeeper/bin/zkServer.sh start

- Kafka 실행 :$ /tools/kafka/bin/kafka-server-start.sh -daemon /tools/kafka/config/server.properties

실행 확인

3. Producer

- Producer는 sql구문을 Topic에 저장하는 역할을 한다

 

from kafka import KafkaProducer
from json import dumps

#producer 객체 생성
producer = KafkaProducer(acks=0, compression_type='gzip',
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x : dumps(x, default=str).encode('utf-8'))

sql = []
sql.append('''CREATE TABLE 테이블명 (
    name varchar(11),
    email varchar(255),
    phone varchar(255))''')
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")

#data를 topic에 저장
for data in sql:
    producer.send('Topic 이름', value=data)
    producer.flush()

 

4. Consumer

- Consumer는 Topic에 있는 Message를 불러오는 역할을 한다

- pymysql 라이브러리를 활용해 Message를 DB에 적용한다

 

import pymysql
from kafka import KafkaConsumer
from json import loads

#mysql에 접속
mysql_con = pymysql.connect(
    host='localhost',
    user='root',
    password='mysql비밀번호',
    db='DB이름',
    charset='utf8'
)

#KafkaConsumer 객체 생성
consumer = KafkaConsumer('토픽이름', bootstrap_servers='localhost:9092',
enable_auto_commit=True, auto_offset_reset='earliest',
value_deserializer=lambda x : loads(x).encode().decode('utf-8'),
consumer_timeout_ms=1000)

#커서 가져오기
cursor = mysql_con.cursor()

#consumer에 있는 message 
for message in consumer: 
    sql = message.value
    cursor.execute(sql) #message DB에 적용

mysql_con.commit()
mysql_con.close()

 

5. 결과 확인

- Topic에 저장한 sql 구문이 적용된 것을 확인할 수 있다

결과