You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

185 lines
6.7 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import time
from datetime import datetime, timezone, timedelta
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, write_client_options
import dateutil.parser
from dateutil import tz
import pandas as pd
import requests
import queue
import threading
def parseDate(date_string: str):
# 默认UTC时间转中国时间
return dateutil.parser.parse(date_string).astimezone(tz.gettz('ETC/GMT-8'))
class HistoryDBManage:
def __init__(self, database='dcs', table='history', host="http://localhost:8181", token="", org="dcs"):
token = self.getAPIToken() if not token else token
self.database = database
self.table = table
self.host = host
self.token = token
self.org = org
# 写入回调
def onSuccess(conf, data, exception=None):
now_ms = datetime.now(timezone(timedelta(hours=8)))
# 统一转为字符串
if isinstance(data, bytes):
data = data.decode(errors="ignore")
elif not isinstance(data, str):
data = str(data)
for line in data.split('\n'):
# print(line)
# m = re.search(r'enqueue_time_ms=([0-9]+)', line)
m = line.split(' ')[-1][:-3]
if m:
enqueue_time_ms = int(m)
# 转为datetime对象
# print(enqueue_time_ms / 1000)
enqueue_time_dt = datetime.fromtimestamp(enqueue_time_ms /1000000, tz=timezone(timedelta(hours=8)))
# delay = now_ms - enqueue_time_ms
print(f"写入延迟: {1} ms (enqueue_time={enqueue_time_dt}, now={now_ms})")
def onError(conf, data, exception=None):
print("InfluxDB写入失败")
def onRetry(conf, data, exception=None):
print("InfluxDB写入重试")
# 官方推荐批量写入配置
self.client = InfluxDBClient3(
host=host,
token=token,
org=org,
database=database,
write_client_options=write_client_options(
write_options=WriteOptions(batch_size=5000, flush_interval=2000), # flush_interval单位ms
# success_callback=onSuccess,
error_callback=onError,
retry_callback=onRetry
)
)
self.writeQueue = queue.Queue()
self._stopWriteThread = threading.Event()
self.writeThread = threading.Thread(target=self._writeWorker, daemon=True)
self.writeThread.start()
@classmethod
def getAPIToken(cls):
try:
with open('Static/InfluxDB.api', 'r', encoding='utf-8') as f:
token = f.read()
except Exception as e:
print(f"读取token文件失败: {e}")
token = ""
return token
def createDatabaseIfNotExists(self):
try:
sql = f'SELECT 1 FROM "{self.table}" LIMIT 1'
self.client.query(sql)
except Exception:
pass
def writeVarValue(self, varName, value):
# 入队时记录中国时区时间
enqueue_time = datetime.now()
self.writeQueue.put((varName, value, enqueue_time))
def _writeWorker(self):
while not self._stopWriteThread.is_set():
try:
varName, value, enqueue_time = self.writeQueue.get()
# print(self.writeQueue.qsize())
except queue.Empty:
# print(1111)
continue
# 用Point对象构建数据点时间用入队时间
point = Point(self.table).tag("varName", varName).field("value", float(value)).time(enqueue_time)
self.client.write(point)
def queryVarHistory(self, varName, startTime=None, endTime=None):
where = f'"varName" = \'{varName}\''
if startTime:
where += f" AND time >= '{startTime}'"
if endTime:
where += f" AND time <= '{endTime}'"
# 关键用tz()函数将time字段转为中国时区
sql = f'SELECT tz(time, \'Asia/Shanghai\') AS time, value FROM "{self.table}" WHERE {where} ORDER BY time ASC LIMIT 100000'
# print(f"执行SQL查询: {sql}")
try:
df = self.client.query(sql, mode="pandas")
import pandas as pd
if isinstance(df, pd.DataFrame):
data = df["value"].tolist() if "value" in df.columns else []
timeList = df["time"].tolist() if "time" in df.columns else []
print(f"查询到 {len(data)} 个数据点")
else:
# print(f"查询结果不是DataFrame: {type(df)}")
data, timeList = [], []
except Exception as e:
print(f"查询结果处理失败: {e}")
data, timeList = [], []
return data, timeList
def getAllVarNames(self):
"""获取所有去重后的变量名列表"""
sql = f'SELECT DISTINCT("varName") FROM "{self.table}"'
try:
df = self.client.query(sql, mode="pandas")
import pandas as pd
if isinstance(df, pd.DataFrame) and 'varName' in df.columns:
return df['varName'].tolist()
else:
return []
except Exception as e:
print(f"获取变量名失败: {e}")
return []
@classmethod
def deleteTable(cls, table):
token = cls.getAPIToken()
host = 'http://localhost:8181'
import datetime
# 获取当前UTC时间+1分钟去除微秒Z结尾
now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc, microsecond=0)
hardDeleteAt = (now + datetime.timedelta(minutes=1)).isoformat().replace('+00:00', 'Z')
url = f"{host.rstrip('/')}/api/v3/configure/table?db={'dcs'}&table={table}&hard_delete_at={hardDeleteAt}"
headers = {
'Authorization': f'Bearer {token}'
}
response = requests.delete(url, headers=headers)
print(hardDeleteAt)
if response.status_code != 200:
print(f"删除失败: {response.status_code} {response.text}")
else:
print(f"已硬删除表 {table} 的所有历史数据。")
def stopWriteThread(self):
self._stopWriteThread.set()
self.writeThread.join(timeout=1)
def close(self):
self.stopWriteThread()
self.client.close()
if __name__ == '__main__':
db = HistoryDBManage(
database="dcs",
table="p1",
host="http://localhost:8181",
token="apiv3_ynlNTgq_OX164srSzjYXetWZJGOpgokFJbp_JaToWYlzwIPAZboPxKt4ss6vD1_4jj90QOIDnRDodQSJ66m3_g",
org="dcs"
)
data, times = db.queryVarHistory("有源/无源4-20mA输入通道1")
print(data, times)
db.close()