# -*- coding:utf-8 -*- from common import faker_data from common import global_var import csv import yaml import time import logging from schedule import every, repeat, run_pending from pathlib import Path from datetime import datetime from random import choice """ 说明: 1. 用于文件的持续创建、写入、修改和创建 2. 输出目录为data,会在里面创建当天的子目录 3. 每1小时,生成一个CSV文件 4. 每1秒,向上面的文件中写入N条数据 5. 每10分钟,从当天的文件中随机找个1个文件,删除前N条数据 6. 每6小时,从所有的文件中随机删除1个文件 """ class PrintHandler(logging.Handler): def emit(self, record): msg = self.format(record) print(msg) # 创建并配置logger logger = logging.getLogger() logger.setLevel(logging.DEBUG) # 创建并配置处理程序 handler = PrintHandler() handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(handler) # 定义公共的部分 path = Path(__file__) etc_dir = path.parent.parent.parent / "etc" config_file_patch = etc_dir / "file_config.yml" # 获取配置文件 with open(config_file_patch, "r", encoding='utf-8') as fy: config = yaml.safe_load(fy) # 默认为当前的目录 data_dir = path.parent.parent.parent / "data" if 'DATADIR' in config.keys() and config['DATADIR']: data_dir = Path(config['DATADIR']) # 初始化目录 if not data_dir.exists(): # shutil.rmtree(data_dir) data_dir.mkdir() # 定义文件的信息 def file_info(): file_info_list = [] now = datetime.now() now_day = now.strftime("%Y-%m-%d") now_hours = now.strftime("%Y-%m-%d-%H") today_dir = data_dir/now_day filename = "filetest_" + now_hours + ".csv" filepath = today_dir/filename file_info_list.append(now_day) file_info_list.append(now_hours) file_info_list.append(today_dir) file_info_list.append(filename) file_info_list.append(filepath) return (file_info_list) # 每小时生成一个csv文件 # @repeat(every(1).hours)--MH:这个是基于运行的时间,不是基于系统时间 def new(): logger.info('new...') # 创建文件夹 finfo = file_info() if not finfo[2].exists(): finfo[2].mkdir() # 创建文件 faker_data.save_data_csv(finfo[4], lines=100) # 每1秒插入N条数据 @repeat(every(1).seconds) def inserting(): finfo = file_info() if finfo[4].exists(): logger.info('insert...') datas = faker_data.faker_data(lines=config['InsertRows'])[1:] with open(finfo[4], 'a+', encoding='utf-8', newline='') as file_csv: writer = csv.writer(file_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL) writer.writerows(datas) else: logger.info(str(finfo[4])+" is not exists,wating") # 手动去调一下,让其整点创建 new() # 每隔10分钟删除100条数据 @repeat(every(10).minutes) def deleting_data(): finfo = file_info() # 获取所有文件 files = list(finfo[2].glob('*.csv')) if len(files) > 1: file = choice(files[:-1]) logger.info(str(file) + "start delete data ....") # 删除掉前N条数据 with open(file, 'rb') as fr: data = fr.readlines() new_data = data[config['DeleteRows']:] # 少于100条的不删除 if len(new_data) > 100: with open(file, 'wb') as fw: fw.writelines(new_data) else: logger.info("file number is less 1,wait next time.") # 每隔6小时删除1个文件,低于3个不删除 @repeat(every(6).hours) # @repeat(every(2).seconds) def deleting_file(): logger.info("deleting file ....") # 从data目录中随机选一个 files = list(data_dir.rglob('*.csv')) if len(files) > 3: file = choice(files[:-1]) file.unlink() else: logger.info("file num is less 3, not delete. wait next time.") def main(): start_flag = True while start_flag: start_flag = global_var.get_value('start_flag') run_pending() time.sleep(0.3) logger.info("程序已经停止") if __name__ == '__main__': main()