You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

188 lines
7.8 KiB
Python

2 years ago
# coding=utf-8
from time import time, sleep, strftime
import re
from utils import core
import random, json
from procedure.manage_procedure.models import InitProcedure
# init操作函数
def init_operation(runconfig, section_big_sort):
runconfig.run_text, runconfig.run_sort, runconfig.run_result = "pass", section_big_sort[1]["sort"], True
tmp_number = u'%s-%s-%s-%s' % (
runconfig.run_usecase_number, runconfig.run_operation_section, runconfig.run_big_sort_index, runconfig.run_sort)
if not runconfig.continue_run:
initProcedure = InitProcedure.get(InitProcedure.id == 1)
for init_formula in json.loads(initProcedure.operation):
set_var = init_formula[0].split("=")[0]
set_value = init_formula[0].split("=")[1]
set_location = init_formula[1]
# TODO core.Client.IoController.set_value @shurk
# io_controller.set_var(set_var, set_value, address=set_location)
core.Client.CurrentProcedureEditor.SetOperationsNumberValue(
tmp_number, runconfig.run_text, runconfig.run_result)
if runconfig.is_loop_run:
if not runconfig.run_result:
runconfig.save_loop_result()
else:
runconfig.save_run_result()
runconfig.continue_run = False
return runconfig.run_result
# write操作函数
# 如果包含startdelay则再当前的self.start_delay_dict里面添加一个delay的时间{"startdelay1":time()}
def write_operation(runconfig, section_big_sort):
if section_big_sort[1]["startdelay"] != "":
runconfig.start_delay_dict[section_big_sort[1]["startdelay"]] = time()
if section_big_sort[1]["delay"] != "":
check_time = section_big_sort[1]["delay"].split(",")
# 如果当前操作的delay时间大于上一步的操作时间
while (time() - runconfig.start_delay_dict[check_time[0]]) < float(check_time[1]):
read_loop(runconfig)
sleep(runconfig.sleep_time)
# 调用远程的设置值操作
runconfig.run_text = section_big_sort[1]["formula"].replace(",", "\n")
runconfig.run_sort = section_big_sort[1]["sort"]
if not runconfig.continue_run:
runconfig.run_result = remote_set(runconfig, section_big_sort[1]["location"])
sleep(runconfig.sleep_time)
tmp_number = u'%s-%s-%s-%s' % (
runconfig.run_usecase_number, runconfig.run_operation_section, runconfig.run_big_sort_index, runconfig.run_sort)
if not runconfig.continue_run:
core.Client.CurrentProcedureEditor.SetOperationsNumberValue(
tmp_number, runconfig.run_text, runconfig.run_result)
if runconfig.is_loop_run:
if not runconfig.run_result:
runconfig.save_loop_result()
else:
runconfig.save_run_result()
runconfig.continue_run = False
return runconfig.run_result
# 调用远程的设置方法
def remote_set(runconfig, location):
for item in runconfig.run_text.split(","):
set_var = item.split("=")[0]
set_value = item.split("=")[1]
# TODO core.Client.IoController.set_value @shurk
# if not self.io_controller.set_var(set_var, set_value, address=location):
# return False
return random.choice([True, False])
# read操作函数需要开始时间和read操作集合
def read_operation(runconfig, section_big_sort):
global READ_RESULT
READ_RESULT = {}
start_read = time() # read循环操作的时间
for k in range(1, len(section_big_sort)):
if section_big_sort[k]["delay"] != "":
try:
start_read = runconfig.start_delay_dict[section_big_sort[k]["delay"].split(",")[0]]
except:
start_read = time()
break
delay_time_list = []
for j in range(1, len(section_big_sort)):
delay_time_list.append(int(get_delay_time(section_big_sort[j])))
max_delay_time = max(delay_time_list)
# 循环检查read参数
while (time() - start_read) <= (max_delay_time + runconfig.run_interval):
runconfig.current_loop_list = get_loop_section(section_big_sort, start_read)
# 调用loop操作
read_loop(runconfig)
sleep(runconfig.sleep_time)
runconfig.continue_run = False
for item in READ_RESULT.values():
if not item:
return False
return True
# 循环操作
def read_loop(runconfig):
"""
遍历循环列表中的检查操作["location,formula1,formula2",""]这种格式
@return:
"""
for item in runconfig.current_loop_list:
check_formula = item["formula"].split(",")
location = check_formula[0]
runconfig.run_sort = item["sort"]
# 对一个检查中的多个公式进行循环设置
if not runconfig.continue_run:
remote_read(runconfig, check_formula, location)
if runconfig.is_loop_run:
if not runconfig.run_result:
runconfig.save_loop_result()
else:
runconfig.save_run_result()
# 调用远程的取值方法
def remote_read(runconfig, check_formula, location):
runconfig.run_text = ""
for i in range(1, len(check_formula)):
var_name = check_formula[i].split("=")[0]
expected_values = float(check_formula[i].split("=")[1])
# TODO core.Client.IoController.set_value @shurk
# tuple_result = runconfig.io_controller.check_var(var_name, expected_values, address=location)
tuple_result = (random.choice([True, False]), "aa")
runconfig.run_text += var_name + "=" + str(tuple_result[1]) + "\n"
runconfig.run_result = tuple_result[0]
global READ_RESULT
READ_RESULT[runconfig.run_sort] = runconfig.run_result
tmp_number = u'%s-%s-%s-%s' % (
runconfig.run_usecase_number, runconfig.run_operation_section, runconfig.run_big_sort_index,
runconfig.run_sort)
core.Client.CurrentProcedureEditor.SetOperationsNumberValue(
tmp_number, runconfig.run_text, runconfig.run_result)
# 获取循环数组
def get_loop_section(section, start_time):
loop_section = []
v_loop_section = []
# 先组合为一个循环执行的数组,从索引1开始第0个是read标识
for i in range(1, len(section)):
if (time() - start_time) > int(get_delay_time(section[i])):
loop_section.append(section[i])
# 将delay和前面有重复的操作延时改为-1
for j in range(0, len(loop_section)):
if int(get_delay_time(loop_section[j])) > 0:
for k in range(0, j):
# 比较检查的公式位置和变量是否相同如果相同则将delay_time设为-1
if compare_formula(loop_section[k]["formula"], loop_section[j]["formula"]):
loop_section[k]["delay"] = "-1"
# 去掉delay_time 小于0的操作
for m in range(0, len(loop_section)):
if int(get_delay_time(loop_section[m])) >= 0:
v_loop_section.append(loop_section[m])
return v_loop_section
# 比较befor_formula是否包含在after_formula中
def compare_formula(befor_formula, after_formula):
tmp_formual = after_formula.split(",")
compare_list = []
compare_list.append(tmp_formual[0])
for i in range(1, len(tmp_formual)):
compare_list.append(tmp_formual[i].split("=")[0])
# 若有一个公式不包含在after_formula中则返回False
for item in befor_formula.split(","):
if re.match(r'([0-9a-zA-Z\_]+)', item).group() not in compare_list:
return False
return True
# 返回当前操作中delay的时间
def get_delay_time(section_big_sort):
if section_big_sort["delay"] != "" and "," not in section_big_sort["delay"]:
return int(section_big_sort["delay"])
elif section_big_sort["delay"] != "":
return section_big_sort["delay"].split(",")[1]
else:
return 0