# -*- coding:utf-8 -*- import _load import DBConfig import jaydebeapi from schedule import every, repeat, run_pending """ 说明: 1. 用于数据库的持续增、删、改,目前支持MySQL和Oracle 2. DBConfig.py 为数据库连接配置,使用时需要修改连接信息 - 每周一建立一张表 - 每秒往测试表中插入N条数据 - 每5分钟删除N条数据 - 每1小时,修改一下数据 """ # 目前支持 MySQL和Oracle两种数据库,其他的数据库需要做适配 DBType = DBConfig.config['DBType'] #获取连接信息 def get_db_info(dbtype=DBType): db_config = {} if dbtype == "MySQL": db_config = DBConfig.mysql_config() elif dbtype == "Oracle": db_config = DBConfig.oracle_config() else: assert False ,"DBType is not support." return(db_config) #处理连接信息 db_config = get_db_info() OAuth = [str(db_config["db_user"]), str(db_config["db_password"])] if not db_config["db_user"] and not db_config["db_password"]: OAuth = None # conn to db by jaydebeapi conn = jaydebeapi.connect(db_config["driver_name"], db_config["jdbc_url"], OAuth, db_config["driver_jar_path"]) # conn.close() #执行sql def execute_sql(sql): with conn.cursor() as curs: curs.execute(sql) # 按周创建,这里不采用这种方式,通过插入来触发 #@repeat(every().monday) def create_table(): print("start create table...") #重新获取一下 db_config2 = get_db_info() sql_string = db_config2["create_sql_string"] print("create table ...." + sql_string) try: execute_sql(sql_string) except: print("table maybe exists,continue this create table task.") #每秒插入N条数据 @repeat(every(2).seconds) def insert_data(): print("insert data ....") #重新获取一下,确保每次的插入的数据都不一样 db_config2 = get_db_info() print("insert data ...."+db_config2["jdbc_url"]) insert_list = db_config2["insert_list"] #执行插入 try: for sql_string in insert_list: execute_sql(sql_string) # 出错可能是表不存在,这里触发一下建表 except: create_table() #每5分钟删除N条数据 @repeat(every(5).minutes) def dalete_data(): db_config2 = get_db_info() print("dalete data....") sql_string = db_config2["del_sql"] try: execute_sql(sql_string) except: print("table not exists,continue this delete task.") #每1小时,修改一下数据 @repeat(every(1).hours) def updata_data(): db_config2 = get_db_info() print("updata data ....") sql_string = db_config2["updata_sql"] try: execute_sql(sql_string) except: print("table not exists,continue this updata task.") if __name__ == '__main__': while True: run_pending()