"""History storage for DCS variables backed by InfluxDB 3.

Provides ``HistoryDBManage``, which queues variable samples and writes them
from a background thread, plus helpers to query the stored history.
"""

import os
import queue
import threading
import time
from datetime import datetime, timezone, timedelta

import dateutil.parser
import pandas as pd
import requests
from dateutil import tz
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, write_client_options

# Fixed UTC+8 offset used for every "China time" conversion in this module.
# A fixed offset avoids depending on a tz database lookup that may fail.
CHINA_TZ = timezone(timedelta(hours=8))


def _quoteLiteral(value) -> str:
    """Render *value* as a single-quoted SQL string literal, doubling quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def _quoteIdentifier(name) -> str:
    """Render *name* as a double-quoted SQL identifier, doubling quotes."""
    return '"' + str(name).replace('"', '""') + '"'


def parseDate(date_string: str) -> datetime:
    """Parse *date_string* and return it as an aware datetime in UTC+8.

    A string without any timezone information is interpreted as UTC before
    being converted, so the result does not depend on the host's local zone.
    """
    parsed = dateutil.parser.parse(date_string)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(CHINA_TZ)


class HistoryDBManage:
    """Queue-backed writer and reader for one InfluxDB 3 table.

    Samples handed to ``writeVarValue`` are placed on an in-memory queue and
    written by a daemon thread, so callers never block on the database.
    """

    # How long the worker waits on the queue before re-checking the stop flag.
    _QUEUE_POLL_SECONDS = 0.5

    def __init__(self, database='dcs', table='history', host="http://localhost:8181", token="", org="dcs"):
        """Create the client and start the background write thread.

        Args:
            database: Database name passed to the client.
            table: Table (measurement) that points are written to and read from.
            host: Base URL of the InfluxDB server.
            token: API token; when empty it is read via ``getAPIToken``.
            org: Organisation name passed to the client.
        """
        token = token or self.getAPIToken()
        self.database = database
        self.table = table
        self.host = host
        self.token = token
        self.org = org

        # Batching write configuration; flush_interval is in milliseconds.
        # _onWriteSuccess is deliberately not registered (it prints once per
        # written line); pass it as success_callback when diagnosing latency.
        self.client = InfluxDBClient3(
            host=host,
            token=token,
            org=org,
            database=database,
            write_client_options=write_client_options(
                write_options=WriteOptions(batch_size=5000, flush_interval=2000),
                error_callback=self._onWriteError,
                retry_callback=self._onWriteRetry,
            ),
        )

        self.writeQueue = queue.Queue()
        self._stopWriteThread = threading.Event()
        self.writeThread = threading.Thread(target=self._writeWorker, daemon=True)
        self.writeThread.start()

    @staticmethod
    def _onWriteSuccess(conf, data, exception=None):
        """Print the enqueue-to-write delay for every line in a written batch."""
        now = datetime.now(CHINA_TZ)
        # Normalise the payload to text.
        if isinstance(data, bytes):
            data = data.decode(errors="ignore")
        elif not isinstance(data, str):
            data = str(data)
        for line in data.split('\n'):
            # The last space-separated token is taken as the timestamp; three
            # digits are dropped and the rest is read as microseconds
            # (assumes nanosecond write precision -- TODO confirm).
            stamp = line.split(' ')[-1][:-3]
            if not stamp.isdigit():
                continue
            enqueued = datetime.fromtimestamp(int(stamp) / 1000000, tz=CHINA_TZ)
            delay_ms = (now - enqueued).total_seconds() * 1000
            print(f"写入延迟: {delay_ms:.0f} ms (enqueue_time={enqueued}, now={now})")

    @staticmethod
    def _onWriteError(conf, data, exception=None):
        """Report a failed batch write, including the underlying exception."""
        print(f"InfluxDB写入失败: {exception}")

    @staticmethod
    def _onWriteRetry(conf, data, exception=None):
        """Report that a batch write is being retried."""
        print(f"InfluxDB写入重试: {exception}")

    @classmethod
    def getAPIToken(cls):
        """Return the API token stored in ``Static/InfluxDB.api``.

        Surrounding whitespace (such as a trailing newline) is stripped so the
        value can be used directly in an HTTP header. Returns an empty string
        when the file cannot be read.
        """
        try:
            with open('Static/InfluxDB.api', 'r', encoding='utf-8') as f:
                return f.read().strip()
        except (OSError, UnicodeError) as e:
            print(f"读取token文件失败: {e}")
            return ""

    def createDatabaseIfNotExists(self):
        """Probe the table with a trivial query; failures are ignored.

        This is a best-effort check only: nothing is created here, and a
        failing probe (for example because the table does not exist yet) is
        deliberately not treated as an error.
        """
        sql = f'SELECT 1 FROM {_quoteIdentifier(self.table)} LIMIT 1'
        try:
            self.client.query(sql)
        except Exception:
            # Deliberate best-effort: the probe result is not used.
            pass

    def writeVarValue(self, varName, value):
        """Queue one sample of *varName* for writing.

        The sample is stamped with the current time as a timezone-aware
        datetime in UTC+8, so the stored instant is unambiguous.
        """
        enqueue_time = datetime.now(CHINA_TZ)
        self.writeQueue.put((varName, value, enqueue_time))

    def _writeWorker(self):
        """Drain the write queue until the stop event is set.

        The queue is polled with a timeout so the stop flag is re-checked
        regularly; a bad sample or a failing write is reported and skipped
        instead of terminating the thread.
        """
        while not self._stopWriteThread.is_set():
            try:
                varName, value, enqueue_time = self.writeQueue.get(timeout=self._QUEUE_POLL_SECONDS)
            except queue.Empty:
                continue

            try:
                number = float(value)
            except (TypeError, ValueError):
                print(f"变量 {varName} 的值无法转换为浮点数: {value!r}")
                continue

            # Build the point with the enqueue time as its timestamp.
            point = Point(self.table).tag("varName", varName).field("value", number).time(enqueue_time)
            try:
                self.client.write(point)
            except Exception as e:
                # Keep the worker alive; later samples may still succeed.
                print(f"InfluxDB写入失败: {e}")

    def queryVarHistory(self, varName, startTime=None, endTime=None):
        """Return the stored history of *varName* as ``(values, times)``.

        Args:
            varName: Variable name to filter on.
            startTime: Optional inclusive lower time bound.
            endTime: Optional inclusive upper time bound.

        Returns:
            Two lists of equal origin: the ``value`` column and the ``time``
            column (converted to Asia/Shanghai by the query). Both are empty
            when the query fails or does not return a DataFrame.
        """
        conditions = [f'"varName" = {_quoteLiteral(varName)}']
        if startTime:
            conditions.append(f"time >= {_quoteLiteral(startTime)}")
        if endTime:
            conditions.append(f"time <= {_quoteLiteral(endTime)}")
        where = " AND ".join(conditions)

        # tz() converts the time column to China time on the server side.
        sql = (
            f"SELECT tz(time, 'Asia/Shanghai') AS time, value "
            f"FROM {_quoteIdentifier(self.table)} WHERE {where} ORDER BY time ASC LIMIT 100000"
        )
        try:
            df = self.client.query(sql, mode="pandas")
            if not isinstance(df, pd.DataFrame):
                return [], []
            data = df["value"].tolist() if "value" in df.columns else []
            timeList = df["time"].tolist() if "time" in df.columns else []
            print(f"查询到 {len(data)} 个数据点")
            return data, timeList
        except Exception as e:
            print(f"查询结果处理失败: {e}")
            return [], []

    def getAllVarNames(self):
        """Return the distinct variable names stored in the table.

        Returns an empty list when the query fails or the result has no
        ``varName`` column.
        """
        sql = f'SELECT DISTINCT("varName") FROM {_quoteIdentifier(self.table)}'
        try:
            df = self.client.query(sql, mode="pandas")
            if isinstance(df, pd.DataFrame) and 'varName' in df.columns:
                return df['varName'].tolist()
            return []
        except Exception as e:
            print(f"获取变量名失败: {e}")
            return []

    @classmethod
    def deleteTable(cls, table, database='dcs', host='http://localhost:8181'):
        """Request a hard delete of *table*, scheduled one minute from now.

        Args:
            table: Name of the table to delete.
            database: Database that owns the table.
            host: Base URL of the InfluxDB server.

        Raises:
            requests.RequestException: If the HTTP request cannot be completed
                (including a timeout).
        """
        token = cls.getAPIToken()
        # Current UTC time plus one minute, without microseconds, 'Z'-suffixed.
        now = datetime.now(timezone.utc).replace(microsecond=0)
        hardDeleteAt = (now + timedelta(minutes=1)).isoformat().replace('+00:00', 'Z')
        url = f"{host.rstrip('/')}/api/v3/configure/table"
        # Passing params lets requests URL-encode the table name and timestamp.
        params = {'db': database, 'table': table, 'hard_delete_at': hardDeleteAt}
        headers = {'Authorization': f'Bearer {token}'}
        response = requests.delete(url, params=params, headers=headers, timeout=10)
        print(hardDeleteAt)
        if response.status_code != 200:
            print(f"删除失败: {response.status_code} {response.text}")
        else:
            print(f"已硬删除表 {table} 的所有历史数据。")

    def stopWriteThread(self):
        """Signal the write thread to stop and wait up to one second for it."""
        self._stopWriteThread.set()
        self.writeThread.join(timeout=1)

    def close(self):
        """Stop the write thread and close the database client."""
        self.stopWriteThread()
        self.client.close()


if __name__ == '__main__':
    # The API token is no longer embedded in source: it is taken from the
    # INFLUXDB_TOKEN environment variable, falling back to the token file.
    # NOTE(review): the token previously committed here should be rotated.
    db = HistoryDBManage(
        database="dcs",
        table="p1",
        host="http://localhost:8181",
        token=os.environ.get("INFLUXDB_TOKEN", ""),
        org="dcs",
    )
    try:
        data, times = db.queryVarHistory("有源/无源4-20mA输入通道1")
        print(data, times)
    finally:
        db.close()