import os
import time
import queue
import threading
from datetime import datetime, timezone, timedelta

import dateutil.parser
import pandas as pd
import requests
from dateutil import tz
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, write_client_options


def parseDate(date_string: str):
    # By default, convert a UTC timestamp string to China time (UTC+8).
    return dateutil.parser.parse(date_string).astimezone(tz.gettz('Asia/Shanghai'))
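
# parseDate usage sketch (the timestamp below is illustrative, not from the original module):
#     parseDate("2024-01-01T00:00:00Z")  ->  2024-01-01 08:00:00+08:00 (the same instant in UTC+8)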


class HistoryDBManage:

    def __init__(self, database='dcs', table='history', host="http://localhost:8181", token="", org="dcs"):
        token = self.getAPIToken() if not token else token
        self.database = database
        self.table = table
        self.host = host
        self.token = token
        self.org = org

        # Write callbacks
        def onSuccess(conf, data, exception=None):
            now_dt = datetime.now(timezone(timedelta(hours=8)))
            # Normalise the payload to a string (the batching writer may hand back bytes).
            if isinstance(data, bytes):
                data = data.decode(errors="ignore")
            elif not isinstance(data, str):
                data = str(data)
            for line in data.split('\n'):
                # The last token of a line-protocol record is its nanosecond timestamp;
                # dropping the last three digits leaves microseconds.
                m = line.split(' ')[-1][:-3]
                if m.isdigit():
                    enqueue_time_us = int(m)
                    # Convert to a datetime in UTC+8.
                    enqueue_time_dt = datetime.fromtimestamp(enqueue_time_us / 1000000, tz=timezone(timedelta(hours=8)))
                    delay_ms = (now_dt - enqueue_time_dt).total_seconds() * 1000
                    print(f"Write latency: {delay_ms:.0f} ms (enqueue_time={enqueue_time_dt}, now={now_dt})")

        def onError(conf, data, exception=None):
            print(f"InfluxDB write failed: {exception}")

        def onRetry(conf, data, exception=None):
            print(f"InfluxDB write retry: {exception}")

        # Batch write configuration recommended by the official client docs
        self.client = InfluxDBClient3(
            host=host,
            token=token,
            org=org,
            database=database,
            write_client_options=write_client_options(
                write_options=WriteOptions(batch_size=5000, flush_interval=2000),  # flush_interval is in ms
                # success_callback=onSuccess,
                error_callback=onError,
                retry_callback=onRetry
            )
        )

        self.writeQueue = queue.Queue()
        self._stopWriteThread = threading.Event()
        self.writeThread = threading.Thread(target=self._writeWorker, daemon=True)
        self.writeThread.start()
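
        # Note on the write path (descriptive, based on the options above, not new behaviour):
        # points passed to self.client.write() are buffered by the client and flushed when
        # 5000 points accumulate (batch_size) or roughly every 2 s (flush_interval=2000 ms),
        # whichever comes first, so individual samples are not persisted immediately.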

    @classmethod
    def getAPIToken(cls):
        try:
            with open('Static/InfluxDB.api', 'r', encoding='utf-8') as f:
                token = f.read().strip()  # strip a trailing newline, if any
        except Exception as e:
            print(f"Failed to read token file: {e}")
            token = ""
        return token

    def createDatabaseIfNotExists(self):
        try:
            # Probe the table with a lightweight query; any error (e.g. the table does not
            # exist yet) is deliberately ignored.
            sql = f'SELECT 1 FROM "{self.table}" LIMIT 1'
            self.client.query(sql)
        except Exception:
            pass

    def writeVarValue(self, varName, value):
        # Record the enqueue time (China time, UTC+8) when the sample is queued.
        enqueue_time = datetime.now(timezone(timedelta(hours=8)))
        self.writeQueue.put((varName, value, enqueue_time))
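
    # Usage sketch (variable name and value below are illustrative, not from the original module):
    #     db.writeVarValue("temp_sensor_1", 23.5)
    # The call only enqueues the sample; the background worker thread turns it into a Point
    # and hands it to the client's batching writer.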

    def _writeWorker(self):
        while not self._stopWriteThread.is_set():
            try:
                # Use a timeout so the loop re-checks the stop event even when the queue is idle.
                varName, value, enqueue_time = self.writeQueue.get(timeout=1)
            except queue.Empty:
                continue
            # Build the data point with a Point object, stamped with the enqueue time.
            point = Point(self.table).tag("varName", varName).field("value", float(value)).time(enqueue_time)
            self.client.write(point)

    def queryVarHistory(self, varName, startTime=None, endTime=None):
        where = f'"varName" = \'{varName}\''
        if startTime:
            where += f" AND time >= '{startTime}'"
        if endTime:
            where += f" AND time <= '{endTime}'"
        # Key point: use the tz() function to return the time column in China time.
        sql = f'SELECT tz(time, \'Asia/Shanghai\') AS time, value FROM "{self.table}" WHERE {where} ORDER BY time ASC LIMIT 100000'
        # print(f"Executing SQL query: {sql}")
        try:
            df = self.client.query(sql, mode="pandas")
            if isinstance(df, pd.DataFrame):
                data = df["value"].tolist() if "value" in df.columns else []
                timeList = df["time"].tolist() if "time" in df.columns else []
                print(f"Fetched {len(data)} data points")
            else:
                # print(f"Query result is not a DataFrame: {type(df)}")
                data, timeList = [], []
        except Exception as e:
            print(f"Failed to query history data: {e}")
            # Check whether the failure is caused by a missing Parquet file.
            if "parquet" in str(e).lower() and "not found" in str(e).lower():
                print(f"History data file for variable '{varName}' is corrupt or missing")
                print("Trying to query recent data...")
                # Fall back to the most recent 24 hours of data.
                try:
                    recent_sql = f'SELECT tz(time, \'Asia/Shanghai\') AS time, value FROM "{self.table}" WHERE {where} AND time >= now() - interval \'24 hours\' ORDER BY time DESC LIMIT 1000'
                    df = self.client.query(recent_sql, mode="pandas")
                    if isinstance(df, pd.DataFrame):
                        data = df["value"].tolist() if "value" in df.columns else []
                        timeList = df["time"].tolist() if "time" in df.columns else []
                        print(f"Fetched the most recent {len(data)} data points")
                    else:
                        data, timeList = [], []
                except Exception as e2:
                    print(f"Querying recent data also failed: {e2}")
                    data, timeList = [], []
            else:
                data, timeList = [], []
        return data, timeList
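
    # Query sketch (variable name and timestamp below are illustrative, not from the original module):
    #     data, times = db.queryVarHistory("temp_sensor_1", "2024-01-01T00:00:00Z")
    # issues SQL of the form
    #     SELECT tz(time, 'Asia/Shanghai') AS time, value FROM "history"
    #     WHERE "varName" = 'temp_sensor_1' AND time >= '2024-01-01T00:00:00Z'
    #     ORDER BY time ASC LIMIT 100000
    # and returns the value column and the (UTC+8) time column as two parallel lists.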

    def getAllVarNames(self):
        """Return the de-duplicated list of variable names stored in the table."""
        sql = f'SELECT DISTINCT("varName") FROM "{self.table}"'
        try:
            df = self.client.query(sql, mode="pandas")
            if isinstance(df, pd.DataFrame) and 'varName' in df.columns:
                return df['varName'].tolist()
            else:
                return []
        except Exception as e:
            print(f"Failed to fetch variable names: {e}")
            # If the error points at a missing Parquet file, the stored data may need cleanup.
            if "parquet" in str(e).lower() and "not found" in str(e).lower():
                print("Missing Parquet file detected; the InfluxDB data may need to be cleaned up")
                print("Suggested remedies:")
                print("1. Restart the InfluxDB service")
                print("2. Check disk space")
                print("3. Remove the corrupt data files")
                print("4. Rebuild the database index")
            return []

    @classmethod
    def deleteTable(cls, table):
        token = cls.getAPIToken()
        host = 'http://localhost:8181'
        # Current UTC time plus one minute, without microseconds, ending in 'Z'.
        now = datetime.now(timezone.utc).replace(microsecond=0)
        hardDeleteAt = (now + timedelta(minutes=1)).isoformat().replace('+00:00', 'Z')
        # Note: the database name is hard-coded to 'dcs' here.
        url = f"{host.rstrip('/')}/api/v3/configure/table?db=dcs&table={table}&hard_delete_at={hardDeleteAt}"
        headers = {
            'Authorization': f'Bearer {token}'
        }
        response = requests.delete(url, headers=headers)
        print(hardDeleteAt)
        if response.status_code != 200:
            print(f"Delete failed: {response.status_code} {response.text}")
        else:
            print(f"All history data in table {table} has been hard-deleted.")
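
    # Equivalent HTTP request sketch (the table name and timestamp are illustrative):
    #     DELETE http://localhost:8181/api/v3/configure/table?db=dcs&table=p1&hard_delete_at=2024-01-01T00:01:00Z
    #     Authorization: Bearer <token>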

    def stopWriteThread(self):
        self._stopWriteThread.set()
        self.writeThread.join(timeout=1)

    def close(self):
        self.stopWriteThread()
        self.client.close()


if __name__ == '__main__':
    db = HistoryDBManage(
        database="dcs",
        table="p1",
        host="http://localhost:8181",
        token="apiv3_ynlNTgq_OX164srSzjYXetWZJGOpgokFJbp_JaToWYlzwIPAZboPxKt4ss6vD1_4jj90QOIDnRDodQSJ66m3_g",
        org="dcs"
    )
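
    # Write sketch (commented out; the value is illustrative, the variable name is the one queried
    # below). Writes are batched, so a value written here may not be queryable immediately:
    #     db.writeVarValue("有源/无源4-20mA输入通道1", 12.5)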
    data, times = db.queryVarHistory("有源/无源4-20mA输入通道1")
    print(data, times)
    db.close()