通过下面这个实例来演示Kafka与Mysql的组合使用
假设有一个学生表student,编写python程序完成如下操作
1读取student表的数据内容,将其转为JSON格式,发送给Kafka
2从Kafka中获取JSON格式数据,打印出来
---------------------------------------------------->
在使用Python操作Mysql之前,需要安装第三方模块python-kafka(在windows命令窗口)
win+r--->输入cmd然后回车
会出现一个小黑窗
输入命令pip install kafka-python安装python-kafka模块
查看我们安装的模块的版本信息(出现kafka-python2.0.2表示我们安装模块成功)
一、先在Windows命令窗口连接上mysql
win+r---->输入cmd---->回车
会出现一个小黑窗,在小黑窗中输入mysql -u root -p然后回车输入密码
二,在school001数据库下创建student表
1.创建school001数据库: create database school001;
2.查看现有的数据库:show databases;
如果有school001表示我们创建库成功
3.使用school001数据库:use school001;
4.创建student表:create table student(sno varchar(10),sname varchar(20),ssex char(2),sage int(5));
5. 查看数据库中的表:show tables;
6.向表中插入两条数据
第一条: insert into student values("95001","John","M",23);
第二条: insert into student values("95002","Tom","M",23);
7.查看student表中的数据:
(到这里我们的student表就创建成功了!)
三、在python中创建producer.py文件读取student表的数据内容,将其转为JSON格式,发送给Kafka
# 运行前先在win上启动zookeap和kafka # 导入相关模块 from kafka import KafkaProducer import json # 连接kafka json.dumps(v).encode('utf-8')将json格式的数抠转挨为字节类型,然后使用ut了-8进行编码 producer = KafkaProducer(bootstrap_servers = 'localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8')) # 定义一个json格式的数第,json格式以键值对形式保存数掂,每个键值对之间使用逗号隔开 data = { 'sno':'95001', 'sname':'John', 'ssex':'M', 'sage':19 } # 发送数据 producer.send('test001',data) # 关闭资源 producer.close()
四、在python中创建consumer.py文件从Kafka中获取JSON格式数据
# 运行前先在win上启动mysql # 导入消费模块 import json # 导入kafka的消费模块 from kafka import KafkaConsumer import json import pymysql.cursors # 连接kafka consumer = KafkaConsumer('test001',bootstrap_servers = 'localhost:9092',group_id=None,auto_offset_reset='earliest') # 对获取的数据进行解析 for msg in consumer: # 转换为字符串类型 msg1 = str(msg.value,encoding=('utf-8')) # 将字符串的数据加载为字典 dict = json.loads(msg1) # 连接数据库 connect = pymysql.Connect( host='localhost', port=3306, user='root', passwd='123456', db='school001', charset='utf8' ) # 获取操作数抠库的对象<游标> cursor = connect.cursor() #将数抠织存到mysqL(插入数掷) # 定义sql语句 sql = "select * from student;" # 将数掐作为参敏传速给sqL,保存到hrgsql cursor.execute(sql) # 提交 connect.commit() for row in cursor.fetchall(): print("sno:%s\tsname:%s\tssex:%s\tsage:%d" % row) print("共查询出", cursor.rowcount, '条数据') connect.close()
五、运行
1.先在windows命令窗口开启 Zookeeper和Kafka
开启 Zookeeper和Kafka可以参考:(14条消息) Kafka的安装和使用(Windows中)_瑾寰的博客-CSDN博客https://blog.csdn.net/qq_68383591/article/details/130314335?spm=1001.2014.3001.55012.先运行producer.py再运行consumer.py
(在consumer.py中可以看到student表中的两条数据表示我们成功了!)