# encoding=utf-8 from datetime import datetime from pathlib import Path import jaydebeapi class mssqlTest(object): def __init__(self): # 公共的 path = Path(__file__).parent # 数据库连接相关的参数 driver_name = "com.microsoft.sqlserver.jdbc.SQLServerDriver" jdbc_url = "jdbc:sqlserver://10.10.13.249:1433" OAuth = ["sa", "unary@2008"] driver_jar_path = path / "jdbc-driver" / "mssql-jdbc-9.2.0.jre8.jar" # 连接数据库 self.conn = jaydebeapi.connect(driver_name, jdbc_url, OAuth, str(driver_jar_path)) # 执行sql def execute_sql(self, sql): with self.conn.cursor() as curs: curs.execute(sql) # 创建库 def create_database(self): print("start create databases...") sql_string = "CREATE DATABASE UNATEST" try: self.execute_sql(sql_string) except: self.drop_database() print("start try create databases again") self.execute_sql(sql_string) print("CREATE DATABASE UNATEST sucess!") #创建表 def create_table(self): print("start create table...") sql_string = "create table UNATEST.dbo.TABLE_TEST(test char(15))" try: self.execute_sql(sql_string) except: self.drop_table() print("start try create table TABLE_TEST again") self.execute_sql(sql_string) print("CREATE create TABLE_TEST sucess!") # 插入数据 def insert_data_table(self): print("start insert into UNATEST.dbo.TABLE_TEST") # 插入当前的时间 actime_time = datetime.now().strftime('%Y%m%d%H%M%S') sql_string ="INSERT INTO UNATEST.dbo.TABLE_TEST (test) VALUES(N'"+ actime_time +"')" print("insert data is : " + actime_time) self.execute_sql(sql_string) # 删除表 def drop_table(self): print("start drop table TABLE_TEST...") sql_string = "DROP table UNATEST.dbo.TABLE_TEST" try: self.execute_sql(sql_string) except: print("TABLE_TEST already drop.") # 删除库 def drop_database(self): print("start drop databases...") sql_string = "DROP DATABASE UNATEST" try: self.execute_sql(sql_string) except: print("databases already drop.") # 获取插入的结果 def get_insert_res(self): sql_string = "select * from UNATEST.dbo.TABLE_TEST" with self.conn.cursor() as select: select.execute(sql_string) res = select.fetchall() print(res) # 将第一个结果返回 return res[0] # 检查表是否存在 def check_table_exists(self): sql_string = "select * from UNATEST.dbo.sysObjects WHERE name='TABLE_TEST'" with self.conn.cursor() as select: select.execute(sql_string) res = select.fetchall() # 根据结果数据量来判断表是否存在 if len(res) == 1: exists = True else: exists = False return exists # 关闭连接 def close_conn(self): self.conn.close()