1. 필요한 라이브러리 설치
- kafka-python :$pip install kafka-python
- pymysql :$pip install pymysql
2. Jookeeper, Kafka 실행
- Jookeeper 실행 :$ /tools/zookeeper/bin/zkServer.sh start
- Kafka 실행 :$ /tools/kafka/bin/kafka-server-start.sh -daemon /tools/kafka/config/server.properties
3. Producer
- Producer는 sql구문을 Topic에 저장하는 역할을 한다
from kafka import KafkaProducer
from json import dumps
#producer 객체 생성
producer = KafkaProducer(acks=0, compression_type='gzip',
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x : dumps(x, default=str).encode('utf-8'))
sql = []
sql.append('''CREATE TABLE 테이블명 (
name varchar(11),
email varchar(255),
phone varchar(255))''')
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
sql.append("INSERT INTO 테이블명 VALUES('이름', '메일주소', '010-');")
#data를 topic에 저장
for data in sql:
producer.send('Topic 이름', value=data)
producer.flush()
4. Consumer
- Consumer는 Topic에 있는 Message를 불러오는 역할을 한다
- pymysql 라이브러리를 활용해 Message를 DB에 적용한다
import pymysql
from kafka import KafkaConsumer
from json import loads
#mysql에 접속
mysql_con = pymysql.connect(
host='localhost',
user='root',
password='mysql비밀번호',
db='DB이름',
charset='utf8'
)
#KafkaConsumer 객체 생성
consumer = KafkaConsumer('토픽이름', bootstrap_servers='localhost:9092',
enable_auto_commit=True, auto_offset_reset='earliest',
value_deserializer=lambda x : loads(x).encode().decode('utf-8'),
consumer_timeout_ms=1000)
#커서 가져오기
cursor = mysql_con.cursor()
#consumer에 있는 message
for message in consumer:
sql = message.value
cursor.execute(sql) #message DB에 적용
mysql_con.commit()
mysql_con.close()
5. 결과 확인
- Topic에 저장한 sql 구문이 적용된 것을 확인할 수 있다
'DKE > MySQL' 카테고리의 다른 글
MySQL (Workbench) 설치하기 / 2023.02.15 (0) | 2023.02.15 |
---|---|
[MySQL] 리눅스에 MySQL 설치하기 / 2023.01.27 (0) | 2023.01.27 |
[MySQL] ERROR 1698 (28000): Access denied for user 'root'@'localhost' 해결 방법 / 2023.01.24 (0) | 2023.01.24 |