You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

188 lines
7.8 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# coding=utf-8
from time import time, sleep, strftime
import re
from utils import core
import random, json
from procedure.manage_procedure.models import InitProcedure
# init操作函数
def init_operation(runconfig, section_big_sort):
runconfig.run_text, runconfig.run_sort, runconfig.run_result = "pass", section_big_sort[1]["sort"], True
tmp_number = u'%s-%s-%s-%s' % (
runconfig.run_usecase_number, runconfig.run_operation_section, runconfig.run_big_sort_index, runconfig.run_sort)
if not runconfig.continue_run:
initProcedure = InitProcedure.get(InitProcedure.id == 1)
for init_formula in json.loads(initProcedure.operation):
set_var = init_formula[0].split("=")[0]
set_value = init_formula[0].split("=")[1]
set_location = init_formula[1]
# TODO core.Client.IoController.set_value @shurk
# io_controller.set_var(set_var, set_value, address=set_location)
core.Client.CurrentProcedureEditor.SetOperationsNumberValue(
tmp_number, runconfig.run_text, runconfig.run_result)
if runconfig.is_loop_run:
if not runconfig.run_result:
runconfig.save_loop_result()
else:
runconfig.save_run_result()
runconfig.continue_run = False
return runconfig.run_result
# write操作函数
# 如果包含startdelay则再当前的self.start_delay_dict里面添加一个delay的时间{"startdelay1":time()}
def write_operation(runconfig, section_big_sort):
if section_big_sort[1]["startdelay"] != "":
runconfig.start_delay_dict[section_big_sort[1]["startdelay"]] = time()
if section_big_sort[1]["delay"] != "":
check_time = section_big_sort[1]["delay"].split(",")
# 如果当前操作的delay时间大于上一步的操作时间
while (time() - runconfig.start_delay_dict[check_time[0]]) < float(check_time[1]):
read_loop(runconfig)
sleep(runconfig.sleep_time)
# 调用远程的设置值操作
runconfig.run_text = section_big_sort[1]["formula"].replace(",", "\n")
runconfig.run_sort = section_big_sort[1]["sort"]
if not runconfig.continue_run:
runconfig.run_result = remote_set(runconfig, section_big_sort[1]["location"])
sleep(runconfig.sleep_time)
tmp_number = u'%s-%s-%s-%s' % (
runconfig.run_usecase_number, runconfig.run_operation_section, runconfig.run_big_sort_index, runconfig.run_sort)
if not runconfig.continue_run:
core.Client.CurrentProcedureEditor.SetOperationsNumberValue(
tmp_number, runconfig.run_text, runconfig.run_result)
if runconfig.is_loop_run:
if not runconfig.run_result:
runconfig.save_loop_result()
else:
runconfig.save_run_result()
runconfig.continue_run = False
return runconfig.run_result
# 调用远程的设置方法
def remote_set(runconfig, location):
for item in runconfig.run_text.split(","):
set_var = item.split("=")[0]
set_value = item.split("=")[1]
# TODO core.Client.IoController.set_value @shurk
# if not self.io_controller.set_var(set_var, set_value, address=location):
# return False
return random.choice([True, False])
# read操作函数需要开始时间和read操作集合
def read_operation(runconfig, section_big_sort):
global READ_RESULT
READ_RESULT = {}
start_read = time() # read循环操作的时间
for k in range(1, len(section_big_sort)):
if section_big_sort[k]["delay"] != "":
try:
start_read = runconfig.start_delay_dict[section_big_sort[k]["delay"].split(",")[0]]
except:
start_read = time()
break
delay_time_list = []
for j in range(1, len(section_big_sort)):
delay_time_list.append(int(get_delay_time(section_big_sort[j])))
max_delay_time = max(delay_time_list)
# 循环检查read参数
while (time() - start_read) <= (max_delay_time + runconfig.run_interval):
runconfig.current_loop_list = get_loop_section(section_big_sort, start_read)
# 调用loop操作
read_loop(runconfig)
sleep(runconfig.sleep_time)
runconfig.continue_run = False
for item in READ_RESULT.values():
if not item:
return False
return True
# 循环操作
def read_loop(runconfig):
"""
遍历循环列表中的检查操作["location,formula1,formula2",""]这种格式
@return:
"""
for item in runconfig.current_loop_list:
check_formula = item["formula"].split(",")
location = check_formula[0]
runconfig.run_sort = item["sort"]
# 对一个检查中的多个公式进行循环设置
if not runconfig.continue_run:
remote_read(runconfig, check_formula, location)
if runconfig.is_loop_run:
if not runconfig.run_result:
runconfig.save_loop_result()
else:
runconfig.save_run_result()
# 调用远程的取值方法
def remote_read(runconfig, check_formula, location):
runconfig.run_text = ""
for i in range(1, len(check_formula)):
var_name = check_formula[i].split("=")[0]
expected_values = float(check_formula[i].split("=")[1])
# TODO core.Client.IoController.set_value @shurk
# tuple_result = runconfig.io_controller.check_var(var_name, expected_values, address=location)
tuple_result = (random.choice([True, False]), "aa")
runconfig.run_text += var_name + "=" + str(tuple_result[1]) + "\n"
runconfig.run_result = tuple_result[0]
global READ_RESULT
READ_RESULT[runconfig.run_sort] = runconfig.run_result
tmp_number = u'%s-%s-%s-%s' % (
runconfig.run_usecase_number, runconfig.run_operation_section, runconfig.run_big_sort_index,
runconfig.run_sort)
core.Client.CurrentProcedureEditor.SetOperationsNumberValue(
tmp_number, runconfig.run_text, runconfig.run_result)
# 获取循环数组
def get_loop_section(section, start_time):
loop_section = []
v_loop_section = []
# 先组合为一个循环执行的数组,从索引1开始第0个是read标识
for i in range(1, len(section)):
if (time() - start_time) > int(get_delay_time(section[i])):
loop_section.append(section[i])
# 将delay和前面有重复的操作延时改为-1
for j in range(0, len(loop_section)):
if int(get_delay_time(loop_section[j])) > 0:
for k in range(0, j):
# 比较检查的公式位置和变量是否相同如果相同则将delay_time设为-1
if compare_formula(loop_section[k]["formula"], loop_section[j]["formula"]):
loop_section[k]["delay"] = "-1"
# 去掉delay_time 小于0的操作
for m in range(0, len(loop_section)):
if int(get_delay_time(loop_section[m])) >= 0:
v_loop_section.append(loop_section[m])
return v_loop_section
# 比较befor_formula是否包含在after_formula中
def compare_formula(befor_formula, after_formula):
tmp_formual = after_formula.split(",")
compare_list = []
compare_list.append(tmp_formual[0])
for i in range(1, len(tmp_formual)):
compare_list.append(tmp_formual[i].split("=")[0])
# 若有一个公式不包含在after_formula中则返回False
for item in befor_formula.split(","):
if re.match(r'([0-9a-zA-Z\_]+)', item).group() not in compare_list:
return False
return True
# 返回当前操作中delay的时间
def get_delay_time(section_big_sort):
if section_big_sort["delay"] != "" and "," not in section_big_sort["delay"]:
return int(section_big_sort["delay"])
elif section_big_sort["delay"] != "":
return section_big_sort["delay"].split(",")[1]
else:
return 0