You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

214 lines
8.5 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import time
from datetime import datetime, timezone, timedelta
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, write_client_options
import dateutil.parser
from dateutil import tz
import pandas as pd
import requests
import queue
import threading
def parseDate(date_string: str):
    """Parse *date_string* and return it as an aware datetime at UTC+8 (China time).

    Strings that carry no UTC offset are interpreted as UTC. Strings with an
    explicit offset are converted from that offset.

    Args:
        date_string: Any date/time string accepted by ``dateutil.parser.parse``.

    Returns:
        A timezone-aware ``datetime`` whose ``utcoffset()`` is +8 hours.
    """
    parsed = dateutil.parser.parse(date_string)
    if parsed.tzinfo is None:
        # Without this, astimezone() would treat a naive value as the host's
        # local time instead of the intended UTC default.
        parsed = parsed.replace(tzinfo=timezone.utc)
    # Fixed offset instead of tz.gettz('ETC/GMT-8'): that spelling is not the
    # IANA zone name ('Etc/GMT-8') and does not reliably resolve to a tz file.
    return parsed.astimezone(timezone(timedelta(hours=8)))
class HistoryDBManage:
    """Store and query DCS variable history in an InfluxDB 3 table.

    Writes are non-blocking: ``writeVarValue`` only enqueues the sample, and a
    daemon thread hands it to the batching InfluxDB client.
    """

    # Seconds the writer thread waits on an empty queue before re-checking the
    # stop flag; bounds how long stopWriteThread() has to wait for it.
    _WRITE_POLL_INTERVAL = 0.2

    def __init__(self, database='dcs', table='history', host="http://localhost:8181", token="", org="dcs"):
        """Create the InfluxDB client and start the background writer thread.

        Args:
            database: Database name passed to the client.
            table: Table (measurement) that points are written to and read from.
            host: InfluxDB server URL.
            token: API token; when empty it is read via ``getAPIToken()``.
            org: Organisation passed to the client.
        """
        token = self.getAPIToken() if not token else token
        self.database = database
        self.table = table
        self.host = host
        self.token = token
        self.org = org

        # Batch-write callbacks: report the exception so failures are diagnosable.
        def onError(conf, data, exception=None):
            print(f"InfluxDB写入失败: {exception}")

        def onRetry(conf, data, exception=None):
            print(f"InfluxDB写入重试: {exception}")

        self.client = InfluxDBClient3(
            host=host,
            token=token,
            org=org,
            database=database,
            write_client_options=write_client_options(
                # Batched writes; flush_interval is in milliseconds.
                write_options=WriteOptions(batch_size=5000, flush_interval=2000),
                error_callback=onError,
                retry_callback=onRetry,
            ),
        )
        self.writeQueue = queue.Queue()
        self._stopWriteThread = threading.Event()
        self.writeThread = threading.Thread(target=self._writeWorker, daemon=True)
        self.writeThread.start()

    @classmethod
    def getAPIToken(cls, path='Static/InfluxDB.api'):
        """Read the API token from *path* (relative to the working directory).

        Surrounding whitespace, such as a trailing newline, is stripped. Otherwise
        it would end up inside the ``Authorization: Bearer`` header.

        Returns:
            The token, or ``""`` when the file cannot be read or decoded.
        """
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return f.read().strip()
        except (OSError, ValueError) as e:
            print(f"读取token文件失败: {e}")
            return ""

    @staticmethod
    def _sqlLiteral(value):
        """Render ``str(value)`` as a single-quoted SQL string literal (embedded ' doubled)."""
        return "'" + str(value).replace("'", "''") + "'"

    @staticmethod
    def _sqlIdentifier(name):
        """Render ``str(name)`` as a double-quoted SQL identifier (embedded " doubled)."""
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _isMissingParquetError(exc):
        """Return True when *exc*'s message mentions a parquet file that was not found."""
        message = str(exc).lower()
        return "parquet" in message and "not found" in message

    @staticmethod
    def _frameToLists(df):
        """Split a query result into ``(values, times)``.

        Returns empty lists for a missing column or a non-DataFrame result.
        """
        if not isinstance(df, pd.DataFrame):
            return [], []
        data = df["value"].tolist() if "value" in df.columns else []
        timeList = df["time"].tolist() if "time" in df.columns else []
        return data, timeList

    def createDatabaseIfNotExists(self):
        """Probe the configured table with ``SELECT 1 ... LIMIT 1``.

        Despite its name, this method creates nothing; it only runs the probe
        query. NOTE(review): presumably relies on the server creating the
        database/table on first write — confirm.

        Returns:
            True if the probe query succeeded, False if it raised.
        """
        sql = f'SELECT 1 FROM {self._sqlIdentifier(self.table)} LIMIT 1'
        try:
            self.client.query(sql)
        except Exception:
            # Best effort by design: a missing table is expected before the first write.
            return False
        return True

    def writeVarValue(self, varName, value):
        """Queue one sample of *varName* for asynchronous writing.

        The timestamp is taken now, at enqueue time. It is timezone-aware because
        the client serialises naive datetimes as UTC. A naive local
        ``datetime.now()`` would therefore shift every point by the host's UTC
        offset.
        """
        self.writeQueue.put((varName, value, datetime.now(timezone.utc)))

    def _writeWorker(self):
        """Background loop that moves queued samples into the batching client.

        The loop runs until the stop flag is set AND the queue is empty, so
        samples queued before ``close()`` are still written. A sample that cannot
        be converted or written is reported and skipped instead of killing the
        thread.
        """
        while True:
            try:
                varName, value, enqueueTime = self.writeQueue.get(timeout=self._WRITE_POLL_INTERVAL)
            except queue.Empty:
                if self._stopWriteThread.is_set():
                    return
                continue
            try:
                point = (
                    Point(self.table)
                    .tag("varName", varName)
                    .field("value", float(value))
                    .time(enqueueTime)
                )
                self.client.write(point)
            except Exception as e:
                print(f"写入数据点失败 {varName}={value!r}: {e}")

    def queryVarHistory(self, varName, startTime=None, endTime=None):
        """Return the stored history of *varName* in ascending time order.

        Args:
            varName: Value of the ``varName`` tag to select.
            startTime: Optional inclusive lower bound; ``str(startTime)`` is used
                as an SQL string literal.
            endTime: Optional inclusive upper bound, formatted the same way.

        Returns:
            ``(values, times)``: two parallel lists. Times are converted to
            Asia/Shanghai by the SQL ``tz()`` function. If the query reports a
            missing parquet file, only the most recent 24 hours (at most 1000
            rows) are returned. Any other failure yields ``([], [])``.
        """
        conditions = [f'"varName" = {self._sqlLiteral(varName)}']
        if startTime:
            conditions.append(f"time >= {self._sqlLiteral(startTime)}")
        if endTime:
            conditions.append(f"time <= {self._sqlLiteral(endTime)}")
        where = " AND ".join(conditions)
        select = (
            f"SELECT tz(time, 'Asia/Shanghai') AS time, value "
            f"FROM {self._sqlIdentifier(self.table)} WHERE {where}"
        )
        try:
            return self._frameToLists(self.client.query(f"{select} ORDER BY time ASC LIMIT 100000", mode="pandas"))
        except Exception as e:
            print(f"查询历史数据失败: {e}")
            if not self._isMissingParquetError(e):
                return [], []

        print(f"变量 '{varName}' 的历史数据文件损坏或丢失")
        print("尝试查询最近的数据...")
        recentSql = f"{select} AND time >= now() - interval '24 hours' ORDER BY time DESC LIMIT 1000"
        try:
            data, timeList = self._frameToLists(self.client.query(recentSql, mode="pandas"))
        except Exception as e2:
            print(f"查询最近数据也失败: {e2}")
            return [], []
        print(f"查询到最近 {len(data)} 个数据点")
        # DESC selects the newest rows; reverse so both paths return ascending order.
        data.reverse()
        timeList.reverse()
        return data, timeList

    def getAllVarNames(self):
        """Return the distinct ``varName`` tag values in the table ([] on failure)."""
        sql = f'SELECT DISTINCT("varName") FROM {self._sqlIdentifier(self.table)}'
        try:
            df = self.client.query(sql, mode="pandas")
        except Exception as e:
            print(f"获取变量名失败: {e}")
            if self._isMissingParquetError(e):
                print("检测到Parquet文件缺失可能需要清理InfluxDB数据")
                print("建议解决方案:")
                print("1. 重启InfluxDB服务")
                print("2. 检查磁盘空间")
                print("3. 清理损坏的数据文件")
                print("4. 重建数据库索引")
            return []
        if isinstance(df, pd.DataFrame) and 'varName' in df.columns:
            return df['varName'].tolist()
        return []

    @classmethod
    def deleteTable(cls, table, database='dcs', host='http://localhost:8181'):
        """Hard-delete *table* from *database* via ``DELETE /api/v3/configure/table``.

        The hard-delete time is one minute from now: UTC, whole seconds, with a
        'Z' suffix. Query parameters go through ``requests`` so that table names
        with non-ASCII or reserved characters are URL-encoded. A 30 s timeout
        prevents the request from hanging indefinitely.
        """
        token = cls.getAPIToken()
        nowUtc = datetime.now(timezone.utc).replace(microsecond=0)
        hardDeleteAt = (nowUtc + timedelta(minutes=1)).isoformat().replace('+00:00', 'Z')
        url = f"{host.rstrip('/')}/api/v3/configure/table"
        params = {'db': database, 'table': table, 'hard_delete_at': hardDeleteAt}
        headers = {'Authorization': f'Bearer {token}'}
        response = requests.delete(url, params=params, headers=headers, timeout=30)
        if not response.ok:
            print(f"删除失败: {response.status_code} {response.text}")
        else:
            print(f"已硬删除表 {table} 的所有历史数据。")

    def stopWriteThread(self, timeout=1):
        """Signal the writer thread to finish and wait up to *timeout* seconds.

        Before exiting, the worker writes any samples that are still queued.
        """
        self._stopWriteThread.set()
        self.writeThread.join(timeout=timeout)

    def close(self):
        """Stop the writer thread, then close the InfluxDB client."""
        self.stopWriteThread()
        self.client.close()
# if __name__ == '__main__':
#     db = HistoryDBManage(
#         database="dcs",
#         table="p1",
#         host="http://localhost:8181",
#         token="",  # empty -> token is read from Static/InfluxDB.api; never commit real tokens here
#         org="dcs"
#     )
#     data, times = db.queryVarHistory("有源/无源4-20mA输入通道1")
#     print(data, times)
#     db.close()