# -*- coding:utf-8 -*- 
from common import faker_data
from common import global_var
import csv
import yaml
import time
import logging
from schedule import every, repeat, run_pending
from pathlib import Path
from datetime import datetime
from random import choice

"""
Notes:
1. Continuously creates, appends to, trims and deletes CSV files.
2. Output goes to the "data" directory; a per-day subdirectory is created inside it.
3. Every hour a new CSV file is generated.
4. Every second, N rows are appended to the current hour's file.
5. Every 10 minutes, the first N rows are removed from a random file of today.
6. Every 6 hours, one random file is deleted from the whole data tree.
"""


class PrintHandler(logging.Handler):
    """Logging handler that simply prints each formatted record to stdout."""

    def emit(self, record):
        msg = self.format(record)
        print(msg)


# Create and configure the root logger.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# Attach the print-based handler with a timestamped message format.
handler = PrintHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)

# Shared base path: location of this script file.
path = Path(__file__)


def upconfig():
    """Resolve the runtime parameters for the file-churn jobs.

    Parameters come from the GUI-populated global state when 'FromGUI' is
    set; otherwise they are read from etc/file_config.yml (script mode).

    Returns:
        list: [fname, data_dir, initRows, InsertRows, DeleteRows] where
            fname (str) is the CSV filename prefix,
            data_dir (Path) is the output directory,
            initRows (int) is the row count for a freshly created file,
            InsertRows (int) is the number of rows appended per tick,
            DeleteRows (int) is the number of rows trimmed per trim pass.
    """
    # Defaults: output under <project>/data with fixed batch sizes.
    data_dir = path.parent.parent.parent / "data"
    InsertRows = 200
    DeleteRows = 100

    # By default take the parameters from the shared global state.
    whorun = global_var.get_value('FromGUI')
    filedict = global_var.get_value('file_args')
    fname = filedict['filename']
    initRows = int(filedict['fileinit'])

    # Values entered in the GUI override the defaults when present.
    if filedict['outputpath']:
        data_dir = Path(filedict['outputpath'])
    if filedict['filecreating']:
        InsertRows = int(filedict['fileinsert'])
        DeleteRows = int(filedict['filedel'])

    # Script compatibility: no GUI, read everything from the config file.
    if not whorun:
        etc_dir = path.parent.parent.parent / "etc"
        config_file_patch = etc_dir / "file_config.yml"
        # Load the YAML configuration (safe_load: no arbitrary object construction).
        with open(config_file_patch, "r", encoding='utf-8') as fy:
            config = yaml.safe_load(fy)
        fname = config['filename']
        if 'DATADIR' in config and config['DATADIR']:
            data_dir = Path(config['DATADIR'])
        initRows = config['initRows']
        InsertRows = config['InsertRows']
        DeleteRows = config['DeleteRows']

    # Ensure the output directory exists; parents=True/exist_ok=True makes
    # this safe when intermediate directories are missing or on a race.
    if not data_dir.exists():
        data_dir.mkdir(parents=True, exist_ok=True)

    return [fname, data_dir, initRows, InsertRows, DeleteRows]


def file_info():
    """Build the metadata describing the current hour's output file.

    Returns:
        list: [now_day, now_hours, today_dir, filename, filepath] where
            now_day (str) is "YYYY-MM-DD",
            now_hours (str) is "YYYY-MM-DD-HH",
            today_dir (Path) is the per-day output directory,
            filename (str) is "<prefix>_<YYYY-MM-DD-HH>.csv",
            filepath (Path) is today_dir/filename.
    """
    varlist = upconfig()
    file_info_list = []
    now = datetime.now()
    now_day = now.strftime("%Y-%m-%d")
    now_hours = now.strftime("%Y-%m-%d-%H")
    today_dir = varlist[1] / now_day
    filename = varlist[0] + "_" + now_hours + ".csv"
    filepath = today_dir / filename
    file_info_list.append(now_day)
    file_info_list.append(now_hours)
    file_info_list.append(today_dir)
    file_info_list.append(filename)
    file_info_list.append(filepath)
    return file_info_list


# Create one CSV file per hour.
# @repeat(every(1).hours) -- MH: that schedule is relative to process start,
# not to wall-clock time, so creation is triggered from inserting() instead.
def new():
    """Create the current hour's CSV file, pre-filled with initRows rows."""
    varlist = upconfig()
    finfo = file_info()
    # Create the per-day directory if needed (race-safe, parents included).
    if not finfo[2].exists():
        finfo[2].mkdir(parents=True, exist_ok=True)
    logger.info("开始创建文件:{}".format(finfo[4]))
    faker_data.save_data_csv(finfo[4], lines=varlist[2])


# Append N rows every second.
@repeat(every(1).seconds)
def inserting():
    """Append InsertRows fake rows to the current hour's file.

    If the file for the current hour does not exist yet, it is created via
    new(), which makes a fresh file appear at each hour boundary.
    """
    varlist = upconfig()
    finfo = file_info()
    if finfo[4].exists():
        # Drop the header row produced by faker_data before appending.
        datas = faker_data.faker_data(lines=varlist[3])[1:]
        logger.info("正在向{0}中插入{1}条数据...".format(finfo[3], varlist[3]))
        with open(finfo[4], 'a+', encoding='utf-8', newline='') as file_csv:
            writer = csv.writer(file_csv, delimiter=',', quotechar='"',
                                quoting=csv.QUOTE_ALL)
            writer.writerows(datas)
    else:
        logger.info(str(finfo[4]) + " is not exists,wating")
        # Trigger creation manually so files roll over on the wall-clock hour.
        new()


# Trim rows every 10 minutes.
@repeat(every(10).minutes)
def deleting_data():
    """Remove the first DeleteRows rows from a random file of today.

    The newest file (last in glob order) is excluded so the file currently
    being appended to is never trimmed.
    """
    varlist = upconfig()
    finfo = file_info()
    # All CSV files in today's directory.
    files = list(finfo[2].glob('*.csv'))
    if len(files) > 1:
        file = choice(files[:-1])
        # Read the whole file and drop the first DeleteRows lines.
        with open(file, 'rb') as fr:
            data = fr.readlines()
            new_data = data[varlist[4]:]
            # Files that would keep fewer than 100 rows are left untouched.
            if len(new_data) > 100:
                with open(file, 'wb') as fw:
                    fw.writelines(new_data)
                logger.info("已经从文件:{0} 中删除前{1}条".format(file, varlist[4]))
    else:
        logger.info("文件总数小于1跳过本次删除,等待下一次。")


# Delete one file every 6 hours; keep at least 3 files.
@repeat(every(6).hours)
# @repeat(every(2).seconds)
def deleting_file():
    """Delete one random CSV file from the whole data tree.

    Runs only when more than 3 files exist; the newest file is excluded
    from the candidates so the active file is never removed.
    """
    varlist = upconfig()
    logger.info("deleting file ....")
    # Pick a random CSV anywhere under the data directory.
    files = list(varlist[1].rglob('*.csv'))
    if len(files) > 3:
        file = choice(files[:-1])
        file.unlink()
        logger.info("文件:{0} 已删除".format(file))
    else:
        logger.info("file num is less 3, not delete. wait next time.")


def main():
    """Run pending scheduled jobs until the global start_flag is cleared."""
    start_flag = True
    while start_flag:
        start_flag = global_var.get_value('start_flag')
        run_pending()
        time.sleep(0.3)
    logger.info("程序已经停止")


if __name__ == '__main__':
    main()