You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
	
	
		
			122 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Python
		
	
		
		
			
		
	
	
			122 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Python
		
	
| 
											7 months ago
										 | import os | ||
|  | import time | ||
|  | from datetime import datetime | ||
|  | 
 | ||
|  | from influxdb_client import InfluxDBClient, Point, WritePrecision, BucketsApi | ||
|  | from influxdb_client.client.write_api import SYNCHRONOUS | ||
|  | from influxdb_client.client.util import date_utils | ||
|  | from influxdb_client.client.util.date_utils import DateHelper | ||
|  | import dateutil.parser | ||
|  | from dateutil import tz | ||
|  | 
 | ||
|  | 
 | ||
|  | 
 | ||
|  | 
 | ||
|  | def parseDate(date_string: str): | ||
|  | 	# 默认UTC时间转中国时间 | ||
|  | 	return dateutil.parser.parse(date_string).astimezone(tz.gettz('ETC/GMT-8')) | ||
|  | 
 | ||
|  | 
 | ||
|  | class HistoryDBManage(): | ||
|  | 	org = "DCS" | ||
|  | 	# bucket = "history" | ||
|  | 	url = "http://localhost:8086" | ||
|  | 
 | ||
|  | 	def __init__(self, bucket, mem = None, isCelery = True): | ||
|  | 		self.getToken(isCelery) | ||
|  | 		self.client = InfluxDBClient(url = self.url, token = self.token, org = self.org) | ||
|  | 		self.writeApi = self.client.write_api(write_options = SYNCHRONOUS) | ||
|  | 		self.deleteApi = self.client.delete_api() | ||
|  | 		self.queryApi = self.client.query_api() | ||
|  | 		self.bucketApi = self.client.buckets_api() | ||
|  | 		self.mem = mem | ||
|  | 		self.bucket = bucket | ||
|  | 		date_utils.date_helper = DateHelper() | ||
|  | 		date_utils.date_helper.parse_date = parseDate | ||
|  | 		# if self.mem: | ||
|  | 		self.startTime = 1680534 | ||
|  | 
 | ||
|  | 	def __del__(self): | ||
|  | 		self.client.close() | ||
|  | 
 | ||
|  | 	def getToken(self, isCelery): | ||
|  | 		if not isCelery: | ||
|  | 			with open('Static/InfluxDB.api', 'r', encoding='utf-8') as f: | ||
|  | 				self.token = f.read() | ||
|  | 		else: | ||
|  | 			with open('../../../Static/InfluxDB.api', 'r', encoding='utf-8') as f: | ||
|  | 				self.token = f.read() | ||
|  | 
 | ||
|  | 
 | ||
|  | 
 | ||
|  | 	def writeFun(self, varName, value): | ||
|  | 		try: | ||
|  | 			value = int(value) | ||
|  | 			point = Point(self.mem).tag("varName", varName).field("value", value).time(datetime.utcnow(), WritePrecision.NS) | ||
|  | 			self.writeApi.write(self.bucket, self.org, point) | ||
|  | 		except Exception as e: | ||
|  | 			print(e) | ||
|  | 			BucketsApi(self.client).create_bucket(bucket_name = self.bucket, org = self.org) | ||
|  | 
 | ||
|  | 	def queryFun(self, varName): | ||
|  | 		# startTime = time.mktime(time.strptime("%Y-%m-%d %H:%M:%S", mem)) | ||
|  | 		data = [] | ||
|  | 		timeList = [] | ||
|  | 		query = ' from(bucket:"{}")\
 | ||
|  | 					|> range(start: {})\ | ||
|  | 					|> filter(fn:(r) => r._measurement == "{}")\ | ||
|  | 					|> filter(fn:(r) => r.varName == "{}")\ | ||
|  | 					|> filter(fn:(r) => r._field == "value" )'.format(self.bucket, self.startTime, self.mem, varName) | ||
|  | 		results = self.queryApi.query(query, org = self.org) | ||
|  | 		for result in results:    | ||
|  | 			for record in result.records: | ||
|  | 				data.append(record['_value']) | ||
|  | 				timeList.append(record['_time'].strftime("%H:%M:%S:%f")[:-3]) | ||
|  | 		return data, timeList | ||
|  | 
 | ||
|  | 	def deleteMem(self): | ||
|  | 		self.deleteApi.delete(start = '1970-01-01T00:00:00Z', stop = '2099-01-01T00:00:00Z', predicate = '_measurement={}'.format(self.mem), bucket = self.bucket, org = self.org, ) | ||
|  | 
 | ||
|  | 	def deleteFun(self, varName): | ||
|  | 		self.deleteApi.delete(start = '1970-01-01T00:00:00Z', stop = '2099-01-01T00:00:00Z', predicate = '_measurement={} AND varName={}'.format(self.mem, varName), bucket = self.bucket, org = self.org, ) | ||
|  | 
 | ||
|  | 	def deleteBucket(self): | ||
|  | 		bucket = self.bucketApi.find_bucket_by_name(self.bucket) | ||
|  | 		self.bucketApi.delete_bucket(bucket) | ||
|  | 
 | ||
|  | 	def queryMem(self): | ||
|  | 		query = 'from(bucket: "{}") |> range(start: 0) |> keys(column: " _measurement")'.format(self.bucket) | ||
|  | 		result = self.queryApi.query(query) | ||
|  | 		return result | ||
|  | 
 | ||
|  | 	def queryVarName(self): | ||
|  | 		tagSet = set() | ||
|  | 		query = ' from(bucket:"{}")\
 | ||
|  | 					|> range(start: {})\ | ||
|  | 					|> filter(fn:(r) => r._measurement == "{}")\ | ||
|  | 					|> filter(fn:(r) => r._field == "value")'.format(self.bucket, self.startTime, self.mem) | ||
|  | 		try: | ||
|  | 			results = self.queryApi.query(query, org = self.org) | ||
|  | 		except Exception as e: | ||
|  | 			BucketsApi(self.client).create_bucket(bucket_name = self.bucket, org = self.org) | ||
|  | 			return tagSet | ||
|  | 		for result in results: | ||
|  | 			for record in result.records: | ||
|  | 				tagSet.add(record['varName']) | ||
|  | 		return tagSet | ||
|  | 
 | ||
|  | if __name__ == '__main__': | ||
|  | 	t = time.time() | ||
|  | 	# t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t)) | ||
|  | 	print(t) | ||
|  | 	h = HistoryDBManage(mem = t, bucket = 'history') | ||
|  | 	h.writeFun('test1', 1) | ||
|  | 	h.writeFun('test1', 2) | ||
|  | 	h.writeFun('test1', 3) | ||
|  | 	h.queryFun('test1') | ||
|  | 	h.queryMem() | ||
|  | 	h.deleteMem() | ||
|  | 	# h.deleteBucket() | ||
|  | 
 | ||
|  | 
 | ||
|  | 	 |