没有合适的资源?快使用搜索试试~ 我知道了~
首页python3实现从kafka获取数据,并解析为json格式,写入到mysql中
资源详情
资源评论
资源推荐
python3实现从实现从kafka获取数据获取数据,并解析为并解析为json格式格式,写入到写入到mysql中中
今天小编就为大家分享一篇python3实现从kafka获取数据,并解析为json格式,写入到mysql中,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
项目需求:将kafka解析来的日志获取到数据库的变更记录,按照订单的级别和订单明细级别写入数据库,一条订单的所有信息包括各种维度信息均保存在一条json中,写入mysql5.7中。
配置信息:配置信息:
[Global]
kafka_server=xxxxxxxxxxx:9092
kafka_topic=mes
consumer_group=test100
passwd = tracking
port = 3306
host = xxxxxxxxxx
user = track
schema = track
dd_socket =
dd_host = xxxxxxxxxxxx
dd_port = 3306
dd_user = xxxxxxxxx
dd_passwd = xxxxxxxx
代码又长又丑,半吊子,只完成了面向过程的编程,没做到对象,将就看,有问题可以联系我
代码:
#encoding=utf-8
import datetime
import configparser
import re
import pymysql
from vertica_python import connect
import vertica_python
import json
from confluent_kafka import Consumer, KafkaError
import csv
import logging
import os
import time
import signal
import sys
#写日志
logging.basicConfig(filename=os.path.join(os.getcwd(), 'log_tracking.txt'), level=logging.WARN, filemode='a',format='%(asctime)s - %(levelname)s: %(message)s')
def writeErrorLog(errSrc, errType, errMsg):
try:
v_log_file = 'err_tracking.log';
v_file = open(v_log_file, 'a')
v_file.write(datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d %H:%M:%S") + " - " + errSrc + " - " + errType +" : " + errMsg + '')
v_file.flush()
except Exception as data:
v_err_file = open('err_tracking.log', 'a')
v_err_file.write(str(data) + '')
v_err_file.write(datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d %H:%M:%S") + " - " + errSrc + " - " + errType + " : " + errMsg + '')
v_err_file.flush()
v_err_file.close()
finally:
v_file.close()
class RH_Consumer:
#读取配置文件的配置信息,并初始化一些类需要的变量
def __init__(self):
self.config = configparser.ConfigParser()
self.config.read('config.ini')
self.host = self.config.get('Global', 'host')
self.user = self.config.get('Global', 'user')
self.passwd = self.config.get('Global', 'passwd')
self.schema = self.config.get('Global', 'schema')
self.port = int(self.config.get('Global', 'port'))
self.kafka_server = self.config.get('Global', 'kafka_server')
self.kafka_topic = self.config.get('Global', 'kafka_topic')
self.consumer_group = self.config.get('Global', 'consumer_group')
self.dd_host = self.config.get('Global', 'dd_host')
self.dd_user = self.config.get('Global', 'dd_user')
self.dd_passwd = self.config.get('Global', 'dd_passwd')
self.dd_port = int(self.config.get('Global', 'dd_port'))
self.dd_socket = self.config.get('Global', 'dd_socket')
self.operation_time = datetime.datetime.now()
self.stop_flag = 0
self.src_table_name = []
self.__init_db()
self.__init_mes_db()
self._get_all_src_table()
#连接写入目标数据库
def __init_db(self):
try:
self.conn_info = {'host': self.host,'port': self.port,'user': self.user,'password': self.passwd,'db': 'tracking'}
self.mysql_db = pymysql.connect(**self.conn_info, charset="utf8" )
self.mysql_cur = self.mysql_db.cursor()
except Exception as data:
writeErrorLog('__init_db', 'Error', str(data))
#连接生产数据库,用于获取相关维度信息
def __init_mes_db(self):
try:
self.mes_mysql_db = pymysql.connect(host=self.dd_host, user=self.dd_user, passwd=self.dd_passwd,port=self.dd_port, unix_socket=self.dd_socket, charset="utf8")
self.mes_mysql_cur = self.mes_mysql_db.cursor()
except Exception as data:
writeErrorLog('__init_db', 'Error', str(data))
#关闭数据库
def _release_db(self):
self.mysql_cur.close()
self.mysql_db.close()
self.mes_mysql_cur.close()
self.mes_mysql_db.close()
#获取所有的配置表信息(需要获取的表)
def _get_all_src_table(self):
try:
# 获取table的信息
select_src_table_names = "select distinct src_table_name from tracking.tracking_table_mapping_rule"
self.mysql_cur.execute(select_src_table_names)
rows = self.mysql_cur.fetchall()
for item in rows:
self.src_table_name.append(item[0])
return self.src_table_name
except Exception as data:
writeErrorLog('_get_all_src_table', 'Error', str(data))
logging.error('_get_all_src_table: ' + str(data))
#获取src表的目标表信息
def _get_tgt_table_name(self,table_name,table_schema):
try:
# 获取table的信息(table_name是schema|tablename)
select_tgt_table_names = "select distinct tgt_table_name from tracking.tracking_table_mapping_rule where src_table_name = '%s' and src_table_schema = '%s'" %(table_name,table_schema)
self.mysql_cur.execute(select_tgt_table_names)
rows = self.mysql_cur.fetchall()
tgt_table_names=[]
weixin_38529239
- 粉丝: 4
- 资源: 927
上传资源 快速赚钱
- 我的内容管理 收起
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
会员权益专享
最新资源
- RTL8188FU-Linux-v5.7.4.2-36687.20200602.tar(20765).gz
- c++校园超市商品信息管理系统课程设计说明书(含源代码) (2).pdf
- 建筑供配电系统相关课件.pptx
- 企业管理规章制度及管理模式.doc
- vb打开摄像头.doc
- 云计算-可信计算中认证协议改进方案.pdf
- [详细完整版]单片机编程4.ppt
- c语言常用算法.pdf
- c++经典程序代码大全.pdf
- 单片机数字时钟资料.doc
- 11项目管理前沿1.0.pptx
- 基于ssm的“魅力”繁峙宣传网站的设计与实现论文.doc
- 智慧交通综合解决方案.pptx
- 建筑防潮设计-PowerPointPresentati.pptx
- SPC统计过程控制程序.pptx
- SPC统计方法基础知识.pptx
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
评论0