import os
import time
from datetime import datetime , timezone , timedelta
from influxdb_client_3 import InfluxDBClient3 , Point , WriteOptions , write_client_options
import dateutil . parser
from dateutil import tz
import pandas as pd
import requests
import queue
import threading
def parseDate(date_string: str):
    """Parse a date string and return it as an aware datetime in UTC+8.

    Args:
        date_string: Any timestamp text accepted by ``dateutil.parser.parse``.
            A value without timezone information is assumed to be UTC.

    Returns:
        A timezone-aware ``datetime`` expressed in China time (UTC+8).

    Raises:
        dateutil.parser.ParserError: If the text cannot be parsed.
    """
    parsed = dateutil.parser.parse(date_string)
    if parsed.tzinfo is None:
        # Without this, astimezone() would read a naive value as *host-local*
        # time, contradicting the "input defaults to UTC" contract.
        parsed = parsed.replace(tzinfo=timezone.utc)
    # POSIX-style "Etc/GMT-8" means UTC+8 (the sign is inverted by convention).
    # gettz() returns None when the zone database lacks the entry, which would
    # silently turn the conversion into "host-local time"; fall back to a
    # fixed +8h offset instead.
    china_tz = tz.gettz('Etc/GMT-8') or tz.tzoffset('UTC+8', 8 * 3600)
    return parsed.astimezone(china_tz)
class HistoryDBManage:
    """Store and read per-variable history in an InfluxDB 3 table.

    Writes are decoupled from callers: ``writeVarValue`` only enqueues a
    sample, and a daemon thread turns queued samples into points and hands
    them to the batching InfluxDB client. Reads go through SQL queries and
    return plain Python lists.

    Failures are reported with ``print`` and turned into empty results rather
    than raised, except for ``deleteTable`` whose HTTP errors propagate.
    """

    # Fixed UTC+8 offset used for enqueue timestamps and latency logging.
    _CHINA_TZ = timezone(timedelta(hours=8))
    # Seconds the writer thread blocks on the queue before it re-checks the
    # stop flag; must stay below the join timeout used by stopWriteThread().
    _QUEUE_POLL_SECONDS = 0.2

    def __init__(self, database='dcs', table='history', host="http://localhost:8181", token="", org="dcs"):
        """Create the client and start the background writer thread.

        Args:
            database: InfluxDB database name.
            table: Table (measurement) every point is written to / read from.
            host: Base URL of the InfluxDB server.
            token: API token; when empty it is read via ``getAPIToken()``.
            org: Organisation name passed to the client.
        """
        token = token or self.getAPIToken()
        self.database = database
        self.table = table
        self.host = host
        self.token = token
        self.org = org

        self.client = InfluxDBClient3(
            host=host,
            token=token,
            org=org,
            database=database,
            write_client_options=write_client_options(
                # flush_interval is expressed in milliseconds.
                write_options=WriteOptions(batch_size=5000, flush_interval=2000),
                # The success callback is intentionally not registered (it
                # prints one line per written point); pass
                # success_callback=self._onWriteSuccess to trace latency.
                error_callback=self._onWriteError,
                retry_callback=self._onWriteRetry,
            ),
        )

        self.writeQueue = queue.Queue()
        self._stopWriteThread = threading.Event()
        self.writeThread = threading.Thread(target=self._writeWorker, daemon=True)
        self.writeThread.start()

    @staticmethod
    def _onWriteSuccess(conf, data, exception=None):
        """Print the enqueue-to-flush delay of every line in a written batch.

        Assumes each line ends with a nanosecond-precision timestamp
        (the original code used the same scale) - TODO confirm against the
        configured write precision. Lines without a numeric trailing token
        are skipped.
        """
        now = datetime.now(HistoryDBManage._CHINA_TZ)
        if isinstance(data, bytes):
            data = data.decode(errors="ignore")
        elif not isinstance(data, str):
            data = str(data)
        for line in data.split('\n'):
            stamp = line.rsplit(' ', 1)[-1]
            if not stamp.isdigit():
                continue
            enqueued = datetime.fromtimestamp(int(stamp) / 1_000_000_000, tz=HistoryDBManage._CHINA_TZ)
            delay_ms = (now - enqueued).total_seconds() * 1000
            print(f" 写入延迟: {delay_ms:.0f} ms (enqueue_time= {enqueued} , now= {now} ) ")

    @staticmethod
    def _onWriteError(conf, data, exception=None):
        """Report a batch that the client failed to write, with its cause."""
        print(" InfluxDB写入失败 ", exception)

    @staticmethod
    def _onWriteRetry(conf, data, exception=None):
        """Report a batch that the client is about to retry, with its cause."""
        print(" InfluxDB写入重试 ", exception)

    @staticmethod
    def _sqlLiteral(value):
        """Return ``value`` as a single-quoted SQL string literal (quotes doubled)."""
        return "'" + str(value).replace("'", "''") + "'"

    @staticmethod
    def _sqlIdentifier(name):
        """Return ``name`` as a double-quoted SQL identifier (quotes doubled)."""
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _frameToLists(df):
        """Split a query result into ``(values, times)`` lists.

        Anything that is not a DataFrame, or a missing column, yields an
        empty list for the affected part.
        """
        if not isinstance(df, pd.DataFrame):
            return [], []
        values = df["value"].tolist() if "value" in df.columns else []
        times = df["time"].tolist() if "time" in df.columns else []
        return values, times

    @staticmethod
    def _isMissingParquet(error):
        """Tell whether an exception message mentions a parquet file that was not found."""
        text = str(error).lower()
        return "parquet" in text and "not found" in text

    @classmethod
    def getAPIToken(cls, path='Static/InfluxDB.api'):
        """Read the API token from a text file.

        Args:
            path: Token file location (relative paths resolve against the
                current working directory).

        Returns:
            The file content with surrounding whitespace removed, or ``""``
            when the file cannot be read.
        """
        try:
            with open(path, 'r', encoding='utf-8') as f:
                # Token files normally end with a newline, which must not end
                # up inside the Authorization header.
                return f.read().strip()
        except (OSError, UnicodeError) as e:
            print(f" 读取token文件失败: {e} ")
            return ""

    def createDatabaseIfNotExists(self):
        """Probe the table with a trivial query.

        Despite its name this method only issues ``SELECT 1 ... LIMIT 1``; it
        never creates anything itself. A failing probe is reported and
        otherwise ignored.
        """
        sql = f'SELECT 1 FROM {self._sqlIdentifier(self.table)} LIMIT 1'
        try:
            self.client.query(sql)
        except Exception as e:
            print(f"探测表 {self.table} 失败: {e}")

    def writeVarValue(self, varName, value):
        """Queue one sample for asynchronous writing.

        Args:
            varName: Variable name, stored as the ``varName`` tag.
            value: Sample value; converted with ``float()`` by the writer.
        """
        # Timezone-aware on purpose. NOTE(review): the InfluxDB client is
        # understood to treat naive datetimes as UTC, which would shift
        # host-local times by the UTC offset - confirm against the client docs.
        enqueue_time = datetime.now(self._CHINA_TZ)
        self.writeQueue.put((varName, value, enqueue_time))

    def _writeWorker(self):
        """Drain the write queue until the stop flag is set.

        Each queued ``(varName, value, enqueue_time)`` becomes one point
        stamped with its enqueue time. A sample that cannot be converted or
        written is reported and dropped so that one bad value cannot stop
        all later writes.
        """
        while not self._stopWriteThread.is_set():
            try:
                # The timeout lets the loop notice the stop flag while idle.
                varName, value, enqueue_time = self.writeQueue.get(timeout=self._QUEUE_POLL_SECONDS)
            except queue.Empty:
                continue
            try:
                point = (
                    Point(self.table)
                    .tag("varName", varName)
                    .field("value", float(value))
                    .time(enqueue_time)
                )
                self.client.write(point)
            except Exception as e:
                print(f"写入变量 {varName} 失败: {e}")
            finally:
                self.writeQueue.task_done()

    def queryVarHistory(self, varName, startTime=None, endTime=None):
        """Return the stored history of one variable in chronological order.

        Args:
            varName: Variable (tag value) to look up.
            startTime: Optional inclusive lower time bound.
            endTime: Optional inclusive upper time bound.

        Returns:
            ``(data, timeList)``: values and their timestamps (converted to
            Asia/Shanghai by the query). Both lists are empty on failure.
            When the main query fails with a "parquet ... not found" error,
            the most recent 1000 points of the last 24 hours are returned
            instead.
        """
        conditions = [f'"varName" = {self._sqlLiteral(varName)}']
        if startTime:
            conditions.append(f"time >= {self._sqlLiteral(startTime)}")
        if endTime:
            conditions.append(f"time <= {self._sqlLiteral(endTime)}")
        where = " AND ".join(conditions)
        # tz() renders the time column in China local time.
        select = (
            "SELECT tz(time, 'Asia/Shanghai') AS time, value "
            f"FROM {self._sqlIdentifier(self.table)} WHERE {where}"
        )

        try:
            df = self.client.query(f"{select} ORDER BY time ASC LIMIT 100000", mode="pandas")
            data, timeList = self._frameToLists(df)
            if isinstance(df, pd.DataFrame):
                print(f" 查询到 {len(data)} 个数据点 ")
            return data, timeList
        except Exception as e:
            print(f" 查询历史数据失败: {e} ")
            if not self._isMissingParquet(e):
                return [], []

        # A storage file is missing: retry on a recent window only.
        print(f" 变量 ' {varName} ' 的历史数据文件损坏或丢失 ")
        print(" 尝试查询最近的数据... ")
        try:
            recent_sql = (
                f"{select} AND time >= now() - interval '24 hours' "
                "ORDER BY time DESC LIMIT 1000"
            )
            df = self.client.query(recent_sql, mode="pandas")
            data, timeList = self._frameToLists(df)
            if isinstance(df, pd.DataFrame):
                print(f" 查询到最近 {len(data)} 个数据点 ")
            # DESC + LIMIT selects the newest rows; flip them so callers get
            # the same oldest-first order as from the main query.
            data.reverse()
            timeList.reverse()
            return data, timeList
        except Exception as e2:
            print(f" 查询最近数据也失败: {e2} ")
            return [], []

    def getAllVarNames(self):
        """Return the distinct variable names stored in the table.

        Returns:
            A list of ``varName`` values; empty when the query fails or the
            result has no ``varName`` column.
        """
        sql = f'SELECT DISTINCT("varName") FROM {self._sqlIdentifier(self.table)}'
        try:
            df = self.client.query(sql, mode="pandas")
            if isinstance(df, pd.DataFrame) and 'varName' in df.columns:
                return df['varName'].tolist()
            return []
        except Exception as e:
            print(f" 获取变量名失败: {e} ")
            if self._isMissingParquet(e):
                # Missing storage file: print recovery hints for the operator.
                print(" 检测到Parquet文件缺失, 可能需要清理InfluxDB数据 ")
                print(" 建议解决方案: ")
                print(" 1. 重启InfluxDB服务 ")
                print(" 2. 检查磁盘空间 ")
                print(" 3. 清理损坏的数据文件 ")
                print(" 4. 重建数据库索引 ")
            return []

    @classmethod
    def deleteTable(cls, table, database='dcs', host='http://localhost:8181'):
        """Request a hard delete of a table through the HTTP API.

        The deletion is scheduled one minute in the future (UTC, second
        precision). The token is read via ``getAPIToken()``.

        Args:
            table: Table to delete.
            database: Database that owns the table.
            host: Base URL of the InfluxDB server.

        Raises:
            requests.RequestException: On connection failure or timeout.
        """
        token = cls.getAPIToken()
        scheduled = datetime.now(timezone.utc).replace(microsecond=0) + timedelta(minutes=1)
        hardDeleteAt = scheduled.strftime('%Y-%m-%dT%H:%M:%SZ')
        response = requests.delete(
            f"{host.rstrip('/')}/api/v3/configure/table",
            # Passed as params so that table names and the timestamp are
            # URL-encoded instead of being pasted into the query string.
            params={'db': database, 'table': table, 'hard_delete_at': hardDeleteAt},
            headers={'Authorization': f'Bearer {token}'},
            timeout=30,
        )
        print(hardDeleteAt)
        if response.status_code != 200:
            print(f" 删除失败: {response.status_code} {response.text} ")
        else:
            print(f" 已硬删除表 {table} 的所有历史数据。 ")

    def stopWriteThread(self):
        """Signal the writer thread to stop and wait up to one second for it."""
        self._stopWriteThread.set()
        self.writeThread.join(timeout=1)

    def close(self):
        """Stop the writer thread, then close the InfluxDB client."""
        self.stopWriteThread()
        self.client.close()
if __name__ == '__main__':
    # Manual smoke test: query one variable's history and print it.
    # The API token must not be committed to source control. It is taken from
    # the INFLUXDB3_AUTH_TOKEN environment variable; when that is unset the
    # empty string makes the constructor fall back to the token file.
    # NOTE(review): the token that used to be hard-coded here should be
    # revoked and re-issued.
    db = HistoryDBManage(
        database="dcs",
        table="p1",
        host="http://localhost:8181",
        token=os.environ.get("INFLUXDB3_AUTH_TOKEN", ""),
        org="dcs",
    )
    try:
        data, times = db.queryVarHistory("有源/无源4-20mA输入通道1")
        print(data, times)
    finally:
        # Always stop the writer thread and release the client, even when
        # the query raises.
        db.close()