You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

163 lines
5.7 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import time
from datetime import datetime, timezone, timedelta
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, write_client_options
import dateutil.parser
from dateutil import tz
import pandas as pd
import requests
import queue
import threading
def parseDate(date_string: str):
# 默认UTC时间转中国时间
return dateutil.parser.parse(date_string).astimezone(tz.gettz('ETC/GMT-8'))
class HistoryDBManage:
def __init__(self, database='dcs', table='history', host="http://localhost:8181", token="", org="dcs"):
token = self.getAPIToken() if not token else token
self.database = database
self.table = table
self.host = host
self.token = token
self.org = org
# 写入回调
def onSuccess(conf, data, exception=None):
now_ms = datetime.now(timezone(timedelta(hours=8)))
# 统一转为字符串
if isinstance(data, bytes):
data = data.decode(errors="ignore")
elif not isinstance(data, str):
data = str(data)
for line in data.split('\n'):
# print(line)
# m = re.search(r'enqueue_time_ms=([0-9]+)', line)
m = line.split(' ')[-1][:-3]
if m:
enqueue_time_ms = int(m)
# 转为datetime对象
# print(enqueue_time_ms / 1000)
enqueue_time_dt = datetime.fromtimestamp(enqueue_time_ms /1000000, tz=timezone(timedelta(hours=8)))
# delay = now_ms - enqueue_time_ms
print(f"写入延迟: {1} ms (enqueue_time={enqueue_time_dt}, now={now_ms})")
def onError(conf, data, exception=None):
print("InfluxDB写入失败")
def onRetry(conf, data, exception=None):
print("InfluxDB写入重试")
# 官方推荐批量写入配置
self.client = InfluxDBClient3(
host=host,
token=token,
org=org,
database=database,
write_client_options=write_client_options(
write_options=WriteOptions(batch_size=5000, flush_interval=2000), # flush_interval单位ms
# success_callback=onSuccess,
error_callback=onError,
retry_callback=onRetry
)
)
self.writeQueue = queue.Queue()
self._stopWriteThread = threading.Event()
self.writeThread = threading.Thread(target=self._writeWorker, daemon=True)
self.writeThread.start()
@classmethod
def getAPIToken(cls):
try:
with open('Static/InfluxDB.api', 'r', encoding='utf-8') as f:
token = f.read()
except Exception as e:
print(f"读取token文件失败: {e}")
token = ""
return token
def createDatabaseIfNotExists(self):
try:
sql = f'SELECT 1 FROM "{self.table}" LIMIT 1'
self.client.query(sql)
except Exception:
pass
def writeVarValue(self, varName, value):
# 入队时记录当前时间
enqueue_time = datetime.now(timezone(timedelta(hours=8)))
# print(enqueue_time)
self.writeQueue.put((varName, value, enqueue_time))
def _writeWorker(self):
while not self._stopWriteThread.is_set():
try:
varName, value, enqueue_time = self.writeQueue.get()
# print(self.writeQueue.qsize())
except queue.Empty:
# print(1111)
continue
# 用Point对象构建数据点时间用入队时间
point = Point(self.table).tag("varName", varName).field("value", float(value)).time(enqueue_time)
self.client.write(point)
def queryVarHistory(self, varName, startTime=None, endTime=None, limit=1000):
where = f'"varName" = \'{varName}\''
if startTime:
where += f" AND time >= '{startTime}'"
if endTime:
where += f" AND time <= '{endTime}'"
sql = f'SELECT time, value FROM "{self.table}" WHERE {where} ORDER BY time LIMIT {limit}'
try:
df = self.client.query(sql, mode="pandas")
import pandas as pd
if isinstance(df, pd.DataFrame):
data = df["value"].tolist() if "value" in df.columns else []
timeList = df["time"].tolist() if "time" in df.columns else []
else:
data, timeList = [], []
except Exception as e:
print(f"查询结果处理失败: {e}")
data, timeList = [], []
return data, timeList
@classmethod
def deleteTable(cls, table):
token = cls.getAPIToken()
host = 'http://localhost:8181'
url = f"{host.rstrip('/')}/api/v3/configure/table?db={'dcs'}&table={table}"
headers = {
'Authorization': f'Bearer {token}'
}
response = requests.delete(url, headers=headers)
if response.status_code != 200:
print(f"删除失败: {response.status_code} {response.text}")
else:
print(f"已删除表 {table} 的所有历史数据。")
def stopWriteThread(self):
self._stopWriteThread.set()
self.writeThread.join(timeout=1)
def close(self):
self.stopWriteThread()
self.client.close()
if __name__ == '__main__':
db = HistoryDBManage(
database="dcs",
table="p1",
host="http://localhost:8181",
token="apiv3_ynlNTgq_OX164srSzjYXetWZJGOpgokFJbp_JaToWYlzwIPAZboPxKt4ss6vD1_4jj90QOIDnRDodQSJ66m3_g",
org="dcs"
)
data, times = db.queryVarHistory("有源/无源4-20mA输入通道1")
print(data, times)
db.close()