You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
122 lines
3.9 KiB
Python
122 lines
3.9 KiB
Python
import os
|
|
import time
|
|
from datetime import datetime
|
|
|
|
from influxdb_client import InfluxDBClient, Point, WritePrecision, BucketsApi
|
|
from influxdb_client.client.write_api import SYNCHRONOUS
|
|
from influxdb_client.client.util import date_utils
|
|
from influxdb_client.client.util.date_utils import DateHelper
|
|
import dateutil.parser
|
|
from dateutil import tz
|
|
|
|
|
|
|
|
|
|
def parseDate(date_string: str):
    """Parse a timestamp string and return it as an aware datetime in UTC+8.

    Installed by ``HistoryDBManage.__init__`` as the ``parse_date`` hook of
    the InfluxDB client's date helper.

    Args:
        date_string: A date/time string understood by ``dateutil.parser``.

    Returns:
        A timezone-aware ``datetime`` expressed in China time (UTC+8).
    """
    parsed = dateutil.parser.parse(date_string)
    if parsed.tzinfo is None:
        # Timestamps are UTC by default; without this, astimezone() would
        # interpret a naive value in the host's local zone instead.
        parsed = parsed.replace(tzinfo=tz.tzutc())
    # A fixed +08:00 offset is what the IANA zone "Etc/GMT-8" denotes. The
    # previous lookup key 'ETC/GMT-8' is not the canonical spelling; when the
    # lookup fails gettz() returns None and astimezone(None) silently converts
    # to the machine's local zone.
    return parsed.astimezone(tz.tzoffset('UTC+8', 8 * 3600))
|
|
|
|
|
|
class HistoryDBManage():
    """Helper around an InfluxDB client bound to one bucket and one measurement.

    Points are written to measurement ``mem`` inside ``bucket``; every point
    carries a ``varName`` tag and a single integer field called ``value``.

    Attributes:
        org: InfluxDB organisation used for every request.
        url: Address of the InfluxDB server.
    """

    org = "DCS"
    url = "http://localhost:8086"

    def __init__(self, bucket, mem=None, isCelery=True):
        """Load the API token, open the client and create the API handles.

        Args:
            bucket: Name of the bucket all operations act on.
            mem: Measurement name used for writes, queries and deletes.
            isCelery: Selects which relative path the token file is read from
                (see ``getToken``).
        """
        self.getToken(isCelery)
        self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
        self.writeApi = self.client.write_api(write_options=SYNCHRONOUS)
        self.deleteApi = self.client.delete_api()
        self.queryApi = self.client.query_api()
        self.bucketApi = self.client.buckets_api()
        self.mem = mem
        self.bucket = bucket
        # Replace the client's module-level date helper so that timestamps in
        # query results are parsed by parseDate. This is process-wide state.
        date_utils.date_helper = DateHelper()
        date_utils.date_helper.parse_date = parseDate
        # Passed verbatim to Flux `range(start: ...)` by the query methods.
        # NOTE(review): presumably Unix seconds (early 1970, i.e. "everything");
        # confirm this is not a truncated timestamp.
        self.startTime = 1680534

    def __del__(self):
        """Close the client if it was created."""
        # __init__ can fail before self.client is assigned (e.g. the token
        # file is missing); the finalizer must not raise in that case.
        client = getattr(self, "client", None)
        if client is not None:
            client.close()

    def getToken(self, isCelery):
        """Read the API token from disk into ``self.token``.

        Args:
            isCelery: When true the token is read from
                ``../../../Static/InfluxDB.api``, otherwise from
                ``Static/InfluxDB.api``. Both are relative to the current
                working directory.

        Raises:
            OSError: If the token file cannot be opened.
        """
        path = '../../../Static/InfluxDB.api' if isCelery else 'Static/InfluxDB.api'
        with open(path, 'r', encoding='utf-8') as f:
            # Drop surrounding whitespace: a trailing newline in the file
            # would otherwise become part of the token.
            self.token = f.read().strip()

    @staticmethod
    def _quote(value):
        """Return ``value`` as a double-quoted literal with ``\\`` and ``"`` escaped."""
        text = str(value).replace('\\', '\\\\').replace('"', '\\"')
        return '"{}"'.format(text)

    @staticmethod
    def _deletePredicate(mem, varName=None):
        """Build the delete predicate for a measurement and optional varName tag."""
        # String values in a delete predicate must be double-quoted.
        predicate = '_measurement={}'.format(HistoryDBManage._quote(mem))
        if varName is not None:
            predicate += ' AND varName={}'.format(HistoryDBManage._quote(varName))
        return predicate

    def _buildQuery(self, varName=None):
        """Build the Flux query selecting the ``value`` field of ``self.mem``.

        When ``varName`` is given, the query is additionally restricted to
        records whose ``varName`` tag equals it.
        """
        stages = [
            'from(bucket: {})'.format(self._quote(self.bucket)),
            'range(start: {})'.format(self.startTime),
            'filter(fn: (r) => r._measurement == {})'.format(self._quote(self.mem)),
        ]
        if varName is not None:
            stages.append('filter(fn: (r) => r.varName == {})'.format(self._quote(varName)))
        stages.append('filter(fn: (r) => r._field == "value")')
        return ' |> '.join(stages)

    def _ensureBucket(self):
        """Create ``self.bucket`` if it does not exist; return True when created."""
        if self.bucketApi.find_bucket_by_name(self.bucket) is not None:
            return False
        self.bucketApi.create_bucket(bucket_name=self.bucket, org=self.org)
        return True

    def _delete(self, predicate):
        """Delete everything matching ``predicate`` from ``self.bucket``."""
        self.deleteApi.delete(
            start='1970-01-01T00:00:00Z',
            stop='2099-01-01T00:00:00Z',
            predicate=predicate,
            bucket=self.bucket,
            org=self.org,
        )

    def writeFun(self, varName, value):
        """Write one integer sample for ``varName`` stamped with the current UTC time.

        Args:
            varName: Value of the ``varName`` tag on the point.
            value: Sample value; converted with ``int()``. If the conversion
                fails the error is printed and nothing is written.
        """
        try:
            value = int(value)
        except (TypeError, ValueError) as e:
            # A bad value has nothing to do with the bucket; report and skip.
            print(e)
            return
        point = (
            Point(self.mem)
            .tag("varName", varName)
            .field("value", value)
            .time(datetime.utcnow(), WritePrecision.NS)
        )
        try:
            self.writeApi.write(self.bucket, self.org, point)
        except Exception as e:
            print(e)
            # Only when the bucket was actually missing is it created, and
            # the point is then written again so the sample is not lost.
            if self._ensureBucket():
                self.writeApi.write(self.bucket, self.org, point)

    def queryFun(self, varName):
        """Fetch all stored samples of ``varName``.

        Args:
            varName: Value of the ``varName`` tag to select.

        Returns:
            A tuple ``(data, timeList)``: the ``_value`` of every record and,
            in the same order, its ``_time`` formatted as ``HH:MM:SS:mmm``.
        """
        tables = self.queryApi.query(self._buildQuery(varName), org=self.org)
        data = []
        timeList = []
        for table in tables:
            for record in table.records:
                data.append(record['_value'])
                # %f yields microseconds; cutting three digits leaves milliseconds.
                timeList.append(record['_time'].strftime("%H:%M:%S:%f")[:-3])
        return data, timeList

    def deleteMem(self):
        """Delete every point of measurement ``self.mem`` from the bucket."""
        self._delete(self._deletePredicate(self.mem))

    def deleteFun(self, varName):
        """Delete the points of ``self.mem`` whose ``varName`` tag equals ``varName``."""
        self._delete(self._deletePredicate(self.mem, varName))

    def deleteBucket(self):
        """Delete ``self.bucket``; does nothing when the bucket does not exist."""
        bucket = self.bucketApi.find_bucket_by_name(self.bucket)
        if bucket is None:
            return
        self.bucketApi.delete_bucket(bucket)

    def queryMem(self):
        """Run the ``keys()`` query over the whole bucket and return the raw result."""
        # NOTE(review): keys(column: ...) appears to name the output column
        # rather than select a column, and the literal here has a leading
        # space (" _measurement"). If the goal is to list measurement names
        # this query likely needs to change; left as-is pending confirmation.
        query = 'from(bucket: "{}") |> range(start: 0) |> keys(column: " _measurement")'.format(self.bucket)
        return self.queryApi.query(query, org=self.org)

    def queryVarName(self):
        """Return the set of ``varName`` tag values stored under ``self.mem``.

        If the query fails, the error is printed, the bucket is created when
        it is missing, and an empty set is returned.
        """
        try:
            tables = self.queryApi.query(self._buildQuery(), org=self.org)
        except Exception as e:
            print(e)
            self._ensureBucket()
            return set()
        return {record['varName'] for table in tables for record in table.records}
|
|
|
|
if __name__ == '__main__':
    # Manual smoke test: the current Unix time doubles as the measurement
    # name, so each run works on its own measurement in the 'history' bucket.
    stamp = time.time()
    print(stamp)
    manager = HistoryDBManage(mem=stamp, bucket='history')
    for sample in (1, 2, 3):
        manager.writeFun('test1', sample)
    manager.queryFun('test1')
    manager.queryMem()
    manager.deleteMem()
    # Bucket removal is intentionally left disabled:
    # manager.deleteBucket()
|
|
|
|
|
|
|