You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

302 lines
10 KiB
Python

4 months ago
import sqlite3
import json
import os # 添加os模块导入
from datetime import datetime
from utils import Globals
class DatabaseManager:
def __init__(self, dbPath="procedures.db"):
# 修改:使用跨平台的路径构造方式
project_dir = os.path.join(os.getcwd(), 'project', str(Globals.getValue('currentPro')))
db_dir = os.path.join(project_dir, 'db')
# 确保目录存在
os.makedirs(db_dir, exist_ok=True)
self.dbPath = os.path.join(db_dir, 'procedures.db')
print(f"数据库路径: {self.dbPath}")
self.conn = sqlite3.connect(self.dbPath)
self.cursor = self.conn.cursor()
self.createTables()
def createTables(self):
# 创建分类表
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS categories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL
)
""")
# 创建规程表
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS procedures (
id INTEGER PRIMARY KEY AUTOINCREMENT,
category_id INTEGER,
name TEXT NOT NULL,
number TEXT,
type TEXT,
content TEXT, -- 存储解析后的JSON数据
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(category_id) REFERENCES categories(id)
)
""")
# 创建规程执行历史表(每次执行记录一条)
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS procedure_execution_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
procedure_id INTEGER NOT NULL,
start_time TEXT NOT NULL,
end_time TEXT,
result BOOLEAN, -- 总体结果
report_path TEXT, -- 报告路径
step_results TEXT -- 新增存储所有步骤执行结果的JSON
)
""")
# 插入默认分类
self.cursor.execute("""
INSERT OR IGNORE INTO categories (name) VALUES
('默认分类'),
('紧急规程'),
('测试规程'),
('维护规程')
""")
# 确保procedures表有report_path列
try:
self.cursor.execute("ALTER TABLE procedures ADD COLUMN report_path TEXT DEFAULT ''")
except sqlite3.OperationalError:
pass # 列已存在则忽略错误
# 创建操作历史表
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS operation_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
execution_id INTEGER NOT NULL,
operation_type TEXT NOT NULL,
operation_detail TEXT,
operation_time TEXT NOT NULL,
FOREIGN KEY(execution_id) REFERENCES procedure_execution_history(id)
)
""")
self.conn.commit()
def getCategories(self):
self.cursor.execute("SELECT id, name FROM categories ORDER BY name")
return self.cursor.fetchall()
def addCategory(self, name):
try:
self.cursor.execute("INSERT INTO categories (name) VALUES (?)", (name,))
self.conn.commit()
return True
except sqlite3.IntegrityError:
return False
def deleteCategory(self, categoryId):
# 检查是否为默认分类
self.cursor.execute("SELECT name FROM categories WHERE id = ?", (categoryId,))
categoryName = self.cursor.fetchone()[0]
if categoryName == "默认分类":
return False # 默认分类不可删除
self.cursor.execute("SELECT id FROM categories WHERE name = '默认分类'")
defaultCatId = self.cursor.fetchone()[0]
self.cursor.execute("""
UPDATE procedures
SET category_id = ?
WHERE category_id = ?
""", (defaultCatId, categoryId))
self.cursor.execute("DELETE FROM categories WHERE id = ?", (categoryId,))
self.conn.commit()
return True
def addProcedure(self, categoryId, name, number, type, content, reportPath=""):
"""添加新规程到数据库"""
contentJson = json.dumps(content, ensure_ascii=False)
self.cursor.execute("""
INSERT INTO procedures (category_id, name, number, type, content, report_path)
VALUES (?, ?, ?, ?, ?, ?)
""", (categoryId, name, number, type, contentJson, reportPath))
self.conn.commit()
return self.cursor.lastrowid
def getProcedures(self, categoryId=None):
if categoryId:
self.cursor.execute("""
SELECT id, name, number, type, created_at
FROM procedures
WHERE category_id = ?
ORDER BY name
""", (categoryId,))
else:
self.cursor.execute("""
SELECT id, name, number, type, created_at
FROM procedures
ORDER BY name
""")
return self.cursor.fetchall()
def getProcedureContent(self, procedureId):
self.cursor.execute("SELECT content FROM procedures WHERE id = ?", (procedureId,))
result = self.cursor.fetchone()
if result:
return json.loads(result[0])
return None
def deleteProcedure(self, procedureId):
self.cursor.execute("DELETE FROM procedures WHERE id = ?", (procedureId,))
self.conn.commit()
return self.cursor.rowcount > 0
def insertProcedureExecution(self, procedureId, startTime, result=None, reportPath="", stepResults="[]"):
self.cursor.execute("""
INSERT INTO procedure_execution_history
(procedure_id, start_time, result, report_path, step_results)
VALUES (?, ?, ?, ?, ?)
""", (procedureId, startTime, result, reportPath, stepResults))
self.conn.commit()
return self.cursor.lastrowid
def updateStepResults(self, executionId, stepResults):
self.cursor.execute("""
UPDATE procedure_execution_history
SET step_results = ?
WHERE id = ?
""", (json.dumps(stepResults), executionId))
self.conn.commit()
return self.cursor.rowcount > 0
def getStepResults(self, executionId):
"""根据执行ID获取步骤结果 - 修正SQL语句"""
# 修正:使用正确的列名 execution_id -> id
self.cursor.execute("SELECT step_results FROM procedure_execution_history WHERE id=?", (executionId,))
row = self.cursor.fetchone()
if row:
return json.loads(row[0])
return None
def getExecutionHistory(self, filterText=None):
query = """
SELECT
peh.id,
p.name || '(' || p.number || ')' AS full_name, -- 拼接规程全名
p.type,
peh.start_time,
peh.report_path
FROM procedure_execution_history peh
JOIN procedures p ON peh.procedure_id = p.id
WHERE peh.report_path IS NOT NULL
"""
params = []
if filterText:
query += " AND (p.name LIKE ? OR p.number LIKE ? OR p.type LIKE ?)"
searchTerm = f"%{filterText}%"
params = [searchTerm, searchTerm, searchTerm]
self.cursor.execute(query, params)
return self.cursor.fetchall()
def updateExecutionReportPath(self, executionId, reportPath):
self.cursor.execute("""
UPDATE procedure_execution_history
SET report_path = ?
WHERE id = ?
""", (reportPath, executionId))
self.conn.commit()
return self.cursor.rowcount > 0
def deleteExecutionHistory(self, executionIds):
"""删除指定的执行历史记录及其相关操作历史 - 方法名已修正"""
if not executionIds:
return False
try:
# 转换为整数列表
ids = [int(id) for id in executionIds]
# 删除操作历史记录
self.cursor.executemany(
"DELETE FROM operation_history WHERE execution_id = ?",
[(id,) for id in ids]
)
# 删除执行历史记录
self.cursor.executemany(
"DELETE FROM procedure_execution_history WHERE id = ?",
[(id,) for id in ids]
)
self.conn.commit()
return True
except Exception as e:
print(f"删除执行历史失败: {str(e)}")
return False
def getExecutionDetails(self, executionId):
"""获取执行记录的完整详情"""
try:
# 获取执行记录基本信息
self.cursor.execute("""
SELECT
peh.id,
p.id AS procedure_id,
p.name AS procedure_name,
p.content AS procedure_content,
peh.step_results
FROM procedure_execution_history peh
JOIN procedures p ON peh.procedure_id = p.id
WHERE peh.id = ?
""", (executionId,))
row = self.cursor.fetchone()
if not row:
return None
# 解析数据
details = {
"execution_id": row[0],
"procedure_id": row[1],
"procedure_name": row[2],
"procedure_content": json.loads(row[3]),
"step_results": json.loads(row[4])
}
return details
except Exception as e:
print(f"获取执行详情失败: {str(e)}")
return None
def updateProcedureCategory(self, procedureId, newCategoryId):
"""更新规程的分类"""
try:
self.cursor.execute("""
UPDATE procedures
SET category_id = ?
WHERE id = ?
""", (newCategoryId, procedureId))
self.conn.commit()
return self.cursor.rowcount > 0
except Exception as e:
print(f"更新规程分类失败: {str(e)}")
return False
def saveOperationHistory(self, executionId, operationType, operationDetail, operationTime):
"""保存操作历史记录"""
self.cursor.execute("""
INSERT INTO operation_history
(execution_id, operation_type, operation_detail, operation_time)
VALUES (?, ?, ?, ?)
""", (executionId, operationType, operationDetail, operationTime))
self.conn.commit()
return self.cursor.lastrowid
def close(self):
self.conn.close()