提交文件代码
This commit is contained in:
parent
a1f389f255
commit
e0cbb1d5a3
|
@ -160,3 +160,5 @@ cython_debug/
|
||||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||||
#.idea/
|
#.idea/
|
||||||
|
|
||||||
|
/.vscode
|
||||||
|
/data/
|
||||||
|
|
180
bin/e2f.py
180
bin/e2f.py
|
@ -1,180 +0,0 @@
|
||||||
# -*- coding:utf-8 -*-
|
|
||||||
from __future__ import unicode_literals
|
|
||||||
import os
|
|
||||||
import io
|
|
||||||
import _load
|
|
||||||
import shutil
|
|
||||||
import time
|
|
||||||
import xlrd
|
|
||||||
|
|
||||||
from fnmatch import fnmatch
|
|
||||||
from jinja2 import Environment, PackageLoader
|
|
||||||
#处理py2的编码和字典顺序的问题
|
|
||||||
#from __future__ import unicode_literals --必须放在第一个
|
|
||||||
from collections import OrderedDict
|
|
||||||
|
|
||||||
|
|
||||||
class xl2fe:
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
HERE = os.path.dirname(__file__)
|
|
||||||
TOP = os.path.join(HERE, "..")
|
|
||||||
self.templates_dir = os.path.join(TOP, "templates")
|
|
||||||
self.output_dir = os.path.join(TOP, "output")
|
|
||||||
if os.path.exists(self.output_dir):
|
|
||||||
shutil.rmtree(self.output_dir)
|
|
||||||
time.sleep(2)
|
|
||||||
os.mkdir(self.output_dir)
|
|
||||||
self.excel_files_dir = os.path.join(TOP, "excel_files")
|
|
||||||
self.xlsx_sheets = {"封面": "00_home_index.feature",
|
|
||||||
"统计表": "01_statistics.feature",
|
|
||||||
"基础功能测试用例": "02_function.feature",
|
|
||||||
"异常测试": "03_abnormality_test.feature",
|
|
||||||
"性能测试": "04_performance.feature"
|
|
||||||
}
|
|
||||||
|
|
||||||
def read_xlsx(self):
|
|
||||||
"""
|
|
||||||
指定的sheet页,对内容进行处理
|
|
||||||
"""
|
|
||||||
#获取所有的数据
|
|
||||||
work_book = xlrd.open_workbook(self.xlsx_file_path)
|
|
||||||
#为了让py2字典的顺序与py3一致
|
|
||||||
all_data = OrderedDict()
|
|
||||||
for i, sheet_obj in enumerate(work_book.sheets()):
|
|
||||||
all_data[sheet_obj.name] = [sheet_obj.row_values(row)
|
|
||||||
for row in range(sheet_obj.nrows)]
|
|
||||||
#按sheet页处理
|
|
||||||
for sheet_name, self.feature_name in self.xlsx_sheets.items():
|
|
||||||
datas = all_data[sheet_name]
|
|
||||||
# 处理数据
|
|
||||||
self.context = OrderedDict()
|
|
||||||
self.context = xl2fe.get_feature_data(self, datas, sheet_name)
|
|
||||||
# 渲染模板
|
|
||||||
xl2fe.feature_tpl(self)
|
|
||||||
|
|
||||||
def get_feature_data(self, datas, sheet_name):
|
|
||||||
"""
|
|
||||||
根据sheet_name 处理数据
|
|
||||||
"""
|
|
||||||
context_temp = OrderedDict()
|
|
||||||
|
|
||||||
if sheet_name == "封面":
|
|
||||||
context_temp['project'] = self.project
|
|
||||||
context_temp['sheet_name'] = sheet_name
|
|
||||||
# 处理更新记录
|
|
||||||
Scenario_table = []
|
|
||||||
lines = datas[4:]
|
|
||||||
for line in lines:
|
|
||||||
cells = line
|
|
||||||
# 处理换行
|
|
||||||
for index, cell in enumerate(cells):
|
|
||||||
#因为py2的编码问题,不能判断str
|
|
||||||
#isinstance(cell,str)
|
|
||||||
if not isinstance(cell,(int,float)):
|
|
||||||
cells[index] = cell.replace('\n', '\\n')
|
|
||||||
Scenario_table.append(cells)
|
|
||||||
context_temp['Scenario_table'] = Scenario_table
|
|
||||||
|
|
||||||
elif sheet_name == "统计表":
|
|
||||||
context_temp['project'] = self.project
|
|
||||||
context_temp['sheet_name'] = sheet_name
|
|
||||||
|
|
||||||
elif sheet_name == "基础功能测试用例":
|
|
||||||
context_temp['project'] = self.project
|
|
||||||
context_temp['sheet_name'] = sheet_name
|
|
||||||
# 处理基础测试用例中的数据
|
|
||||||
Scenario_table = OrderedDict()
|
|
||||||
lines = datas[2:]
|
|
||||||
for line in lines:
|
|
||||||
cells = line[0:9]
|
|
||||||
#补全合并单元格的信息
|
|
||||||
#模块
|
|
||||||
if cells[0]:
|
|
||||||
model = cells[0]
|
|
||||||
else:
|
|
||||||
cells[0] = model
|
|
||||||
#子模块
|
|
||||||
if cells[1]:
|
|
||||||
sub_model = cells[1]
|
|
||||||
else:
|
|
||||||
cells[1] = sub_model
|
|
||||||
#处理编号
|
|
||||||
if '-ST-' in cells[2]:
|
|
||||||
cells[2] = 'NUM'
|
|
||||||
# 处理换行
|
|
||||||
for index, cell in enumerate(cells):
|
|
||||||
if not isinstance(cell,(int,float)):
|
|
||||||
cells[index] = cell.replace('\n', '\\n')
|
|
||||||
#以模块为单位存储
|
|
||||||
if model not in list(Scenario_table.keys()):
|
|
||||||
Scenario_table[model] = []
|
|
||||||
Scenario_table[model].append(cells)
|
|
||||||
context_temp['Scenario_table'] = Scenario_table
|
|
||||||
|
|
||||||
elif sheet_name == "异常测试":
|
|
||||||
context_temp['project'] = self.project
|
|
||||||
context_temp['sheet_name'] = sheet_name
|
|
||||||
# 处理更新记录
|
|
||||||
Scenario_table = []
|
|
||||||
lines = datas[4:]
|
|
||||||
for line in lines:
|
|
||||||
cells = line[0:8]
|
|
||||||
cells[0] = 'NUM'
|
|
||||||
# 处理换行
|
|
||||||
for index, cell in enumerate(cells):
|
|
||||||
if not isinstance(cell,(int,float)):
|
|
||||||
cells[index] = cell.replace('\n', '\\n')
|
|
||||||
Scenario_table.append(cells)
|
|
||||||
context_temp['Scenario_table'] = Scenario_table
|
|
||||||
|
|
||||||
elif sheet_name == "性能测试":
|
|
||||||
context_temp['project'] = self.project
|
|
||||||
context_temp['sheet_name'] = sheet_name
|
|
||||||
|
|
||||||
return context_temp
|
|
||||||
|
|
||||||
def feature_tpl(self):
|
|
||||||
"""
|
|
||||||
拿处理后的数据来渲染指定的模板
|
|
||||||
"""
|
|
||||||
# 读取模板
|
|
||||||
tpl = os.path.join(self.templates_dir, self.feature_name + ".j2")
|
|
||||||
tpl_data = io.open(tpl, encoding="utf-8").read()
|
|
||||||
# 渲染模板
|
|
||||||
env = Environment()
|
|
||||||
text = env.from_string(tpl_data).render(self.context)
|
|
||||||
# 保存文件
|
|
||||||
xl2fe.save_feature(self, text)
|
|
||||||
|
|
||||||
def save_feature(self, text):
|
|
||||||
"""
|
|
||||||
保存渲染好的模板为feature文件
|
|
||||||
"""
|
|
||||||
#为了解决windows换行符的问题转为二进制,主要是由于py2中open不支持newline参数
|
|
||||||
#py2没有bytes()函数
|
|
||||||
#text_bytes = bytes(text,'utf-8')
|
|
||||||
text_bytes = text.encode('utf-8')
|
|
||||||
feature_path = os.path.join(self.project_dir, self.feature_name)
|
|
||||||
# 写入文件
|
|
||||||
with open(feature_path, 'wb+') as fp:
|
|
||||||
fp.write(text_bytes)
|
|
||||||
|
|
||||||
def main(self):
|
|
||||||
xlsx_files = os.listdir(self.excel_files_dir)
|
|
||||||
for xlsx_file in xlsx_files:
|
|
||||||
# 排除掉非xlsx结尾的文件
|
|
||||||
if not fnmatch(xlsx_file, "*.xlsx"):
|
|
||||||
continue
|
|
||||||
self.project = xlsx_file.split('_')[0]
|
|
||||||
self.xlsx_file_path = os.path.join(self.excel_files_dir, xlsx_file)
|
|
||||||
# 按项目存放
|
|
||||||
self.project_dir = os.path.join(self.output_dir, self.project)
|
|
||||||
os.mkdir(self.project_dir)
|
|
||||||
|
|
||||||
xl2fe.read_xlsx(self)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
test_env = xl2fe()
|
|
||||||
test_env.main()
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
#encoding=utf-8
|
||||||
|
import csv
|
||||||
|
from faker import Faker
|
||||||
|
|
||||||
|
"""
|
||||||
|
生成尽可能真实的假数据
|
||||||
|
|
||||||
|
使用方法:
|
||||||
|
1.安装依赖包
|
||||||
|
pip install faker
|
||||||
|
2.执行脚本
|
||||||
|
python faker_data.py
|
||||||
|
3.当前目录,查看生成的数据
|
||||||
|
"""
|
||||||
|
|
||||||
|
#保存为csv文件
|
||||||
|
def save_data_csv(file_name,lines=50000):
|
||||||
|
#获取数据
|
||||||
|
datas = faker_data(lines)
|
||||||
|
#保存
|
||||||
|
with open(file_name,'w+',encoding='utf-8',newline='') as file_csv:
|
||||||
|
writer = csv.writer(file_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL)
|
||||||
|
writer.writerows(datas)
|
||||||
|
|
||||||
|
#生成数据
|
||||||
|
def faker_data(lines=10):
|
||||||
|
#指定数据的国家地区
|
||||||
|
f = Faker('zh-CN')
|
||||||
|
#定义一个列表,用来存放所有数据
|
||||||
|
datas = []
|
||||||
|
#标题
|
||||||
|
title = ["uuid","id","name","mobile","ssn","sex","email","job","address","actime_time"]
|
||||||
|
#title2 = ["唯一标识","编号","姓名","手机号","身份证号","性别","邮箱","职业","家庭地址","获取时间"]
|
||||||
|
#添加标题到列表中
|
||||||
|
datas.append(title)
|
||||||
|
#datas.append(title2)
|
||||||
|
#开始按照标题的顺序,生成5w条数据
|
||||||
|
for i in range(0,lines):
|
||||||
|
#定义一个列表,用来存一行数据
|
||||||
|
data = []
|
||||||
|
#uuid
|
||||||
|
data.append(f.uuid4())
|
||||||
|
#编号,001,不足3位的左边用0来补齐
|
||||||
|
data.append(str(i+1).rjust(3,'0'))
|
||||||
|
#姓名
|
||||||
|
data.append(f.name())
|
||||||
|
#手机号
|
||||||
|
data.append(f.phone_number())
|
||||||
|
#身份证
|
||||||
|
ssn = f.ssn()
|
||||||
|
data.append(ssn)
|
||||||
|
#性别,根据身份证的第17位来判断
|
||||||
|
ssn_sex = int(ssn[16:17])
|
||||||
|
#01:男,02:女
|
||||||
|
if ssn_sex % 2:
|
||||||
|
sex = "01"
|
||||||
|
else:
|
||||||
|
sex = "02"
|
||||||
|
data.append(sex)
|
||||||
|
#邮箱
|
||||||
|
data.append(f.email())
|
||||||
|
#职业
|
||||||
|
data.append(f.job())
|
||||||
|
#地址,让其更加复合中国的地址
|
||||||
|
address = f.address()[:-9] + str(f.pyint(min_value=0, max_value=999))+ "号"
|
||||||
|
data.append(address)
|
||||||
|
#获取时间,近3年的
|
||||||
|
actime_time = f.date_time_between(start_date="-3y", end_date="now")
|
||||||
|
data.append(actime_time)
|
||||||
|
#将这一行数据添加到datas中
|
||||||
|
datas.append(data)
|
||||||
|
#返回所有的数据
|
||||||
|
return datas
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
#文件名
|
||||||
|
file_name = 'test.csv'
|
||||||
|
save_data_csv(file_name)
|
||||||
|
|
|
@ -0,0 +1,118 @@
|
||||||
|
# -*- coding:utf-8 -*-
|
||||||
|
import faker_data
|
||||||
|
import shutil
|
||||||
|
import csv
|
||||||
|
|
||||||
|
from schedule import every, repeat, run_pending
|
||||||
|
from pathlib import Path
|
||||||
|
from datetime import datetime
|
||||||
|
from random import choice
|
||||||
|
|
||||||
|
"""
|
||||||
|
说明:
|
||||||
|
1. 用于文件的持续创建、写入、修改和创建
|
||||||
|
2. 输出目录为data,会在里面创建当天的子目录
|
||||||
|
3. 每1小时,生成一个10M大小左右的csv文件
|
||||||
|
4. 每2秒,向上面的文件中写入10条数据
|
||||||
|
5. 每10分钟,从当天的文件中随机找个1个文件,删除前100条数据
|
||||||
|
6. 每6小时,从所有的文件中随机删除1个文件
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
# 定义公共的部分
|
||||||
|
path = Path(__file__)
|
||||||
|
data_dir = path.parent.parent / "data"
|
||||||
|
# 初始化目录
|
||||||
|
if not data_dir.exists():
|
||||||
|
# shutil.rmtree(data_dir)
|
||||||
|
data_dir.mkdir()
|
||||||
|
|
||||||
|
# 定义文件的信息
|
||||||
|
|
||||||
|
|
||||||
|
def file_info():
|
||||||
|
file_info_list = []
|
||||||
|
now = datetime.now()
|
||||||
|
now_day = now.strftime("%Y-%m-%d")
|
||||||
|
now_hours = now.strftime("%Y-%m-%d-%H")
|
||||||
|
today_dir = data_dir/now_day
|
||||||
|
filename = "filetest_" + now_hours + ".csv"
|
||||||
|
filepath = today_dir/filename
|
||||||
|
|
||||||
|
file_info_list.append(now_day)
|
||||||
|
file_info_list.append(now_hours)
|
||||||
|
file_info_list.append(today_dir)
|
||||||
|
file_info_list.append(filename)
|
||||||
|
file_info_list.append(filepath)
|
||||||
|
|
||||||
|
return (file_info_list)
|
||||||
|
|
||||||
|
# 每小时生成一个10M大小的csv文件
|
||||||
|
# @repeat(every(1).hours) --是基于运行的时间,不是基于系统时间
|
||||||
|
|
||||||
|
|
||||||
|
def new():
|
||||||
|
print('new...')
|
||||||
|
# 创建文件夹
|
||||||
|
finfo = file_info()
|
||||||
|
if not finfo[2].exists():
|
||||||
|
finfo[2].mkdir()
|
||||||
|
# 创建文件
|
||||||
|
faker_data.save_data_csv(finfo[4], lines=10)
|
||||||
|
|
||||||
|
|
||||||
|
# 每2秒插入10条数据
|
||||||
|
@repeat(every(2).seconds)
|
||||||
|
def inserting():
|
||||||
|
finfo = file_info()
|
||||||
|
if finfo[4].exists():
|
||||||
|
print('insert...')
|
||||||
|
datas = faker_data.faker_data(lines=10)[1:]
|
||||||
|
with open(finfo[4], 'a+', encoding='utf-8', newline='') as file_csv:
|
||||||
|
writer = csv.writer(file_csv, delimiter=',',
|
||||||
|
quotechar='"', quoting=csv.QUOTE_ALL)
|
||||||
|
writer.writerows(datas)
|
||||||
|
else:
|
||||||
|
print(str(finfo[4])+" is not exists,wating")
|
||||||
|
# 手动去调一下,让其整点创建
|
||||||
|
new()
|
||||||
|
|
||||||
|
|
||||||
|
# 每隔10分钟删除100条数据
|
||||||
|
@repeat(every(10).minutes)
|
||||||
|
def deleting_data():
|
||||||
|
finfo = file_info()
|
||||||
|
# 获取所有文件
|
||||||
|
files = list(finfo[2].glob('*.csv'))
|
||||||
|
if len(files) > 1:
|
||||||
|
file = choice(files[:-1])
|
||||||
|
print(str(file) + "start delete data ....")
|
||||||
|
# 删除掉前100条数据
|
||||||
|
with open(file, 'rb') as fr:
|
||||||
|
data = fr.readlines()
|
||||||
|
new_data = data[100:]
|
||||||
|
# 少于100条的不删除
|
||||||
|
if len(new_data) > 100:
|
||||||
|
with open(file, 'wb') as fw:
|
||||||
|
fw.writelines(new_data)
|
||||||
|
else:
|
||||||
|
print("file number is less 1,wait next time.")
|
||||||
|
|
||||||
|
|
||||||
|
# 每隔6小时删除1个文件,低于3个不删除
|
||||||
|
@repeat(every(6).hours)
|
||||||
|
# @repeat(every(2).seconds)
|
||||||
|
def deleting_file():
|
||||||
|
print("deleting file ....")
|
||||||
|
# 从data目录中随机选一个
|
||||||
|
files = list(data_dir.rglob('*.csv'))
|
||||||
|
if len(files) > 3:
|
||||||
|
file = choice(files[:-1])
|
||||||
|
file.unlink()
|
||||||
|
else:
|
||||||
|
print("file num is less 3, not delete. wait next time.")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
while True:
|
||||||
|
run_pending()
|
|
@ -1,2 +1,3 @@
|
||||||
faker
|
faker==19.12.0
|
||||||
|
schedule==1.2.1
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,6 @@
|
||||||
|
|
||||||
|
set PROJECT_HOME=%~dp0
|
||||||
|
|
||||||
|
|
||||||
|
rem 确保vscode在path里面
|
||||||
|
start code %PROJECT_HOME%
|
6
tox.ini
6
tox.ini
|
@ -16,8 +16,8 @@ deps =
|
||||||
envdir = devenv
|
envdir = devenv
|
||||||
basepython = python3
|
basepython = python3
|
||||||
usedevelop = True
|
usedevelop = True
|
||||||
commands =
|
#commands =
|
||||||
{envpython} {toxinidir}/bin/e2f.py
|
# {envpython} {toxinidir}/bin/e2f.py
|
||||||
|
|
||||||
[testenv:devenv2]
|
[testenv:devenv2]
|
||||||
envdir = devenv2
|
envdir = devenv2
|
||||||
|
@ -31,7 +31,7 @@ deps =
|
||||||
-r{toxinidir}/requirements.txt
|
-r{toxinidir}/requirements.txt
|
||||||
|
|
||||||
commands =
|
commands =
|
||||||
{envpython} {toxinidir}/release.py {envdir} {envsitepackagesdir} {toxinidir}/build/excel2feature {toxinidir}
|
{envpython} {toxinidir}/release.py {envdir} {envsitepackagesdir} {toxinidir}/build/dataCreating {toxinidir}
|
||||||
|
|
||||||
|
|
||||||
[testenv:py27-release]
|
[testenv:py27-release]
|
||||||
|
|
Loading…
Reference in New Issue