提交数据库相关的代码

This commit is contained in:
halliday2023 2024-02-25 17:02:46 +08:00
parent 214c5ec3c4
commit 8596551899
4 changed files with 119 additions and 12 deletions

17
bin/db/DBCommon.py Normal file
View File

@ -0,0 +1,17 @@
# -*- coding:utf-8 -*-
from importlib import import_module
def get_db_info(DBType):
db_dict = {}
moduleName = DBType + "Config"
# 合并处理
if DBType in ["MySQL5","MySQL8"]:
moduleName = "MySQLConfig"
elif DBType in ["PG","DM","Gbase8s"]:
moduleName = "PGConfig"
# 调用对应的模块
module = import_module(moduleName)
db_dict = module.db_config()
return db_dict

81
bin/db/MySQLConfig.py Normal file
View File

@ -0,0 +1,81 @@
# -*- coding:utf-8 -*-
from common import faker_data
from common import global_var
from datetime import datetime
# import yaml
from pathlib import Path
#定义公共的部分
path = Path(__file__)
etc_dir = path.parent.parent / "etc"
config_file_patch = etc_dir / "db_config.yml"
# # 获取配置文件
# with open(config_file_patch, "r", encoding='utf-8') as fy:
# config = yaml.safe_load(fy)
# # print(config)
def get_now():
week = str(datetime.now().isocalendar()[1])
return(week)
def db_config():
mysql_info_dict={}
mysql_info_dict['driver_name'] = "com.mysql.cj.jdbc.Driver"
mysql_info_dict['driver_jar'] = "mysql-connector-java-8.0.29.jar"
driver_jar_path = etc_dir / "driver" / mysql_info_dict['driver_jar']
mysql_info_dict['driver_jar_path'] = str(driver_jar_path)
#这里要连接到具体的库
mysql_info_dict['jdbc_url'] = "jdbc:mysql://" + config['IP'] + ":" + config['PROT'] + "/" + config['DATABASES']
mysql_info_dict['db_user'] = config['DBUSER']
mysql_info_dict['db_password'] = config['DBPASSWD']
#create table sql
week = get_now()
table_name = "mysql_test_table_w" + week
create_sql_string = "CREATE TABLE IF NOT EXISTS `"
create_sql_string += table_name
create_sql_string += "` ("
create_sql_string += "`uuid` varchar(50) DEFAULT NULL,"
create_sql_string += "`id` bigint DEFAULT NULL,"
create_sql_string += "`name` varchar(50) DEFAULT NULL,"
create_sql_string += "`mobile` varchar(50) DEFAULT NULL,"
create_sql_string += "`ssn` varchar(50) DEFAULT NULL,"
create_sql_string += "`sex` int DEFAULT NULL,"
create_sql_string += "`email` varchar(50) DEFAULT NULL,"
create_sql_string += "`job` varchar(50) DEFAULT NULL,"
create_sql_string += "`address` varchar(50) DEFAULT NULL,"
create_sql_string += "`actime_time` varchar(50) NULL DEFAULT NULL"
create_sql_string += ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;"
mysql_info_dict['create_sql_string'] = create_sql_string
# insert into sql
insert_list = []
data = faker_data.faker_data(lines=config['InsertRows'])[1:]
for a in data:
#拼接sql
insert_sql_string = "INSERT INTO " + table_name
insert_sql_string += "(uuid, id, name, mobile, ssn, sex, email, job, address, actime_time) VALUES("
str_data = ','.join("'{0}'".format(x) for x in a)
insert_sql_string += str_data + ");"
insert_list.append(insert_sql_string)
# 返回所有的插入语句
mysql_info_dict['insert_list'] = insert_list
# delete sql
del_sql = "delete FROM "+ table_name + " where 1=1 limit 100"
mysql_info_dict['del_sql'] = del_sql
# uptata sql
updata_sql = "UPDATE "+ table_name + " t SET t.job='unary_测试' WHERE t.name like '%' AND sex = 1 AND id = 1 "
mysql_info_dict['updata_sql'] = updata_sql
return(mysql_info_dict)

View File

@ -1,9 +1,8 @@
# -*- coding:utf-8 -*-
import _load
import DBConfig
import DBCommon
import logging
import jaydebeapi
from common import global_var
from schedule import every, repeat, run_pending
@ -18,18 +17,28 @@ from schedule import every, repeat, run_pending
- 每1小时修改一下数据
"""
# 目前支持 MySQL和Oracle两种数据库其他的数据库需要做适配
DBType = DBConfig.config['DBType']
# logging
class PrintHandler(logging.Handler):
def emit(self, record):
msg = self.format(record)
print(msg)
# 创建并配置logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# 创建并配置处理程序
handler = PrintHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
#获取连接信息
def get_db_info(dbtype=DBType):
def get_db_info():
db_config = {}
if dbtype == "MySQL":
db_config = DBConfig.mysql_config()
elif dbtype == "Oracle":
db_config = DBConfig.oracle_config()
else:
DBType = global_var.get_value('DBType')
db_config = DBCommon.get_db_info(DBType)
if not db_config:
assert False ,"DBType is not support."
return(db_config)