430 lines
23 KiB
Python
430 lines
23 KiB
Python
![]() |
# Ziran Gao 20240905
|
|||
|
# 智能二次运维项目中,自定义函数合集
|
|||
|
#-------------2024年10月11日--------------------------------------------------------------------------------------------------------
|
|||
|
#增加存入定值至定值数据库
|
|||
|
#重写从excel中读数据功能。这里面那个暂时没用。
|
|||
|
#-------------2024年9月13日--------------------------------------------------------------------------------------------------------
|
|||
|
#增加连接至变位数据库,数据库平台调试未完成
|
|||
|
#增加从excel中读取基础参数功能
|
|||
|
#-------------2024年9月5日--------------------------------------------------------------------------------------------------------
|
|||
|
# 增加连接至主数据库函数,注意只可连接至ISMS_BASE数据库。
|
|||
|
# encoding:utf-8
|
|||
|
|
|||
|
#------------------主数据库连接函数--------------------------------------------------------------------------------------------------
|
|||
|
import pymssql
|
|||
|
import pandas as pd
|
|||
|
import redis
|
|||
|
import msgpack
|
|||
|
import warnings
|
|||
|
|
|||
|
|
|||
|
import math
|
|||
|
def connectMainDatabase():
|
|||
|
try:
|
|||
|
conn = pymssql.connect(host='127.0.0.1', #IP地址,按需更改
|
|||
|
user='sa', #数据库用户名,按需更改
|
|||
|
password='sa', #数据库密码,按需更改
|
|||
|
database='ISMS_BASE', #连接数据库名称,按需更改
|
|||
|
charset='utf8',
|
|||
|
tds_version=r'7.0') #重要:pymssql中需声明tds版本,勿轻易升级pymssql包,如升级,请查阅对应TDS版本并修改,否则会报错。
|
|||
|
print("连接成功")
|
|||
|
except pymssql.Error as e:
|
|||
|
print("连接失败"+str(e))
|
|||
|
return conn
|
|||
|
#---------------------------------------------------------------------------------------------------------------------------------\
|
|||
|
#------------------变位遥信数据库连接函数--------------------------------------------------------------------------------------------------
|
|||
|
#import pymssql
|
|||
|
def connectDataDatabase():
|
|||
|
try:
|
|||
|
conn = pymssql.connect(host='127.0.0.1', #IP地址,按需更改
|
|||
|
user='sa', #数据库用户名,按需更改
|
|||
|
password='sa', #数据库密码,按需更改
|
|||
|
database='ISMS_DATA', #连接数据库名称,按需更改
|
|||
|
charset='utf8',
|
|||
|
tds_version=r'7.0') #重要:pymssql中需声明tds版本,勿轻易升级pymssql包,如升级,请查阅对应TDS版本并修改,否则会报错。
|
|||
|
print("连接成功")
|
|||
|
except pymssql.Error as e:
|
|||
|
print("连接失败"+str(e))
|
|||
|
return conn
|
|||
|
#---------------------------------------------------------------------------------------------------------------------------------
|
|||
|
#------------------从Excel中读参数函数(当前没用)--------------------------------------------------------------------------------------------------
|
|||
|
# 输入 PwrSuplyWay是字符串,表示sheet的名字,没有这个名字会报错,islarge表示是大所还是小所,1是大所2是小所
|
|||
|
# EXCEL格式需用sheet表示供电模式,供电模式的名称应与设定一致,不需要表头,变量名字勿随意修改。有一个大所文件和一个小所文件,分别用于展示不同的数据。
|
|||
|
|
|||
|
|
|||
|
def ImportExcelParameter(PwrSuplyWay, islarge):
|
|||
|
if islarge == 1: #变电所
|
|||
|
ImportedParameter = pd.read_excel('变电所定值整定基础参数表.xlsx',sheet_name=PwrSuplyWay,header=None)
|
|||
|
ImportedParameter_dict = ImportedParameter.set_index(0)[1].to_dict()
|
|||
|
elif islarge == 2: #分区所
|
|||
|
df = pd.read_excel('分区所定值整定基础参数表.xlsx',sheet_name=PwrSuplyWay,header=None)
|
|||
|
ImportedParameter_dict = ImportedParameter.set_index(0)[1].to_dict()
|
|||
|
else:
|
|||
|
raise ValueError(f"Invalid value for 'islarge': {islarge}. Expected 1 for 变电所 or 2 for 分区所.")
|
|||
|
return(ImportedParameter_dict)
|
|||
|
|
|||
|
#--------------------------------------------------------------------------------------------------------------------------------------------------
|
|||
|
#------------------------------------检查输入是否是可计算的量-----------------------------------------------------------------------------------------
|
|||
|
# 如果输入是int/float/complex即可正常工作,否则报错。
|
|||
|
def check_input(value):
|
|||
|
if not isinstance(value, (int, float, complex)):
|
|||
|
raise ValueError(f"输入的值 '{value}' 不是一个可运算的数字类型")
|
|||
|
|
|||
|
#---------------------------------------------------------------------------------------------------------------------------------
|
|||
|
#-----------------------------------将计算出的定值存进对应数据库--------------------------------------------------------------------
|
|||
|
# 输入数据库名字和定值词典变量。
|
|||
|
# 当前数据库包括馈线0区数据库,馈线1区数据库,馈线2区数据库,AT数据库,并补数据库,主变数据库
|
|||
|
def SavePresetValuetoSQLSERVER(table_name,mydict):
|
|||
|
import pymssql #在python中编辑SQL SERVER数据库
|
|||
|
#import Self_defined_functions #引入自定义函数文件
|
|||
|
conn = connectMainDatabase() #连接至ISMS_BASE数据库
|
|||
|
cursor = conn.cursor() #获取pymssql提供的SQL Server编辑游标
|
|||
|
sql_truncate = f"TRUNCATE TABLE {table_name}" #把原有表格清空
|
|||
|
sql_insert = f"INSERT INTO {table_name} (PresetValueName, Value) VALUES (%s, %d)" #把新定值存入
|
|||
|
cursor.execute(sql_truncate) #执行清空语句
|
|||
|
conn.commit()
|
|||
|
# 遍历字典并插入每个定值
|
|||
|
for preset_name, preset_value in mydict.items(): #执行存入语句
|
|||
|
cursor.execute(sql_insert, (preset_name, preset_value))
|
|||
|
|
|||
|
# 提交更改并关闭连接
|
|||
|
conn.commit() # 提交事务,确保更改生效
|
|||
|
cursor.close() # 关闭游标
|
|||
|
conn.close() # 关闭数据库连接
|
|||
|
|
|||
|
#---------------------------------------------------------------------------------------------------------------------------------
|
|||
|
#-----------------------------------将读取到的遥测量对应数据库(im_Obtained_YC)--------------------------------------------------------------------
|
|||
|
def obtainYC():
|
|||
|
redis_host = "192.168.110.229" # 替换为实际的主机名或 IP
|
|||
|
redis_port = 36379 # 替换为实际的端口
|
|||
|
redis_password = "yunda123" # 替换为实际的密码
|
|||
|
|
|||
|
# 使用 redis-py 配置 Redis 连接
|
|||
|
client = redis.Redis(
|
|||
|
host=redis_host,
|
|||
|
port=redis_port,
|
|||
|
db=0, # 指定默认数据库为 0
|
|||
|
password=redis_password,
|
|||
|
socket_timeout=20, # 等价于 ConnectTimeout
|
|||
|
socket_connect_timeout=20 # 等价于 SyncTimeout
|
|||
|
)
|
|||
|
|
|||
|
# 测试连接和数据操作
|
|||
|
try:
|
|||
|
client.ping()
|
|||
|
print("Successfully connected to Redis!")
|
|||
|
# 示例操作:设置和获取一个键
|
|||
|
client.set("your_key", "your_value")
|
|||
|
value = client.get("your_key")
|
|||
|
data = client.hgetall("telemeteringModelList_Zongzi") # 使用 HashSet 的键 遥测
|
|||
|
if data:
|
|||
|
conn = connectMainDatabase() #连接至ISMS_BASE数据库
|
|||
|
cursor = conn.cursor() #获取pymssql提供的SQL Server编辑游标
|
|||
|
sql_truncate = f"TRUNCATE TABLE im_Obtained_YC;" #把原有表格清空
|
|||
|
cursor.execute(sql_truncate) #执行清空语句
|
|||
|
conn.commit()
|
|||
|
# 遍历并解码每个字段
|
|||
|
decoded_data = {}
|
|||
|
for key, value in data.items():
|
|||
|
# 值都使用 MessagePack 序列化
|
|||
|
#print(key)
|
|||
|
decoded_value_YC = msgpack.unpackb(value)
|
|||
|
#print(decoded_value_YC)
|
|||
|
sql_insert = f"INSERT INTO im_Obtained_YC (PresetValueName, Value, Unit, ismsBaseYCId, EquipmentinfoID, LastValue, PresetValueID) VALUES (%s, %d, %s, %s, %s, %d, %s)"
|
|||
|
#print(decoded_value_YC.items())
|
|||
|
cursor.execute(sql_insert, (decoded_value_YC['Name'], decoded_value_YC['ResultValue'], decoded_value_YC['Unit'], decoded_value_YC['ismsbaseYCId'], decoded_value_YC['EquipmentInfoId'], decoded_value_YC['LastResultValue'], decoded_value_YC['Id']))
|
|||
|
conn.commit()
|
|||
|
#Self_defined_functions.SaveObtainedYCtoSQLSERVER(cursor,decoded_value_YC)
|
|||
|
|
|||
|
cursor.close() # 关闭游标
|
|||
|
conn.close() # 关闭数据库连接
|
|||
|
except redis.ConnectionError as e:
|
|||
|
print(f"Redis connection error: {e}")
|
|||
|
|
|||
|
#---------------------------------------------------------------------------------------------------------------------------------
|
|||
|
#-----------------------------------将读取到的遥测量按装置整理归类--------------------------------------------------------------------
|
|||
|
def obtainYC_Device():
|
|||
|
redis_host = "192.168.110.229" # 替换为实际的主机名或 IP
|
|||
|
redis_port = 36379 # 替换为实际的端口
|
|||
|
redis_password = "yunda123" # 替换为实际的密码
|
|||
|
|
|||
|
# 使用 redis-py 配置 Redis 连接
|
|||
|
client = redis.Redis(
|
|||
|
host=redis_host,
|
|||
|
port=redis_port,
|
|||
|
db=0, # 指定默认数据库为 0
|
|||
|
password=redis_password,
|
|||
|
socket_timeout=20, # 等价于 ConnectTimeout
|
|||
|
socket_connect_timeout=20 # 等价于 SyncTimeout
|
|||
|
)
|
|||
|
|
|||
|
# 测试连接和数据操作
|
|||
|
try:
|
|||
|
client.ping()
|
|||
|
print("Successfully connected to Redis!")
|
|||
|
# 示例操作:设置和获取一个键
|
|||
|
client.set("your_key", "your_value")
|
|||
|
value = client.get("your_key")
|
|||
|
data = client.hgetall("telemeteringModelList_Zongzi") # 使用 HashSet 的键 遥测
|
|||
|
#print(data)
|
|||
|
if data:
|
|||
|
conn = connectMainDatabase() #连接至ISMS_BASE数据库
|
|||
|
cursor = conn.cursor() #获取pymssql提供的SQL Server编辑游标
|
|||
|
YC_List_Dist = {}
|
|||
|
YC_List_Prtct = {}
|
|||
|
YC_List_MnTrns = {}
|
|||
|
for key, value in data.items():
|
|||
|
decoded_value_YC = msgpack.unpackb(value)
|
|||
|
#print(decoded_value_YC)
|
|||
|
sql_find = f"SELECT * FROM im_DeviceData WHERE ID = '{decoded_value_YC['ismsbaseYCId']}'"
|
|||
|
cursor.execute(sql_find)
|
|||
|
Devicelist = cursor.fetchall()
|
|||
|
#rint(Devicelist)
|
|||
|
if Devicelist != []:
|
|||
|
DeviceID = Devicelist[0][1]
|
|||
|
sql_find_1 = f"SELECT * FROM im_ProtectDevice_new WHERE DeviceID = '{DeviceID}'"
|
|||
|
cursor.execute(sql_find_1)
|
|||
|
Devicetype_1 = cursor.fetchall()
|
|||
|
Devicetype = Devicetype_1[0][2].encode('latin1').decode('gbk')
|
|||
|
if Devicetype.find('馈线保护测控装置') != -1:#把4.0排除在外了,以后很可能得改。:
|
|||
|
if YC_List_Prtct.get(Devicetype) == None:
|
|||
|
YC_List_Prtct[Devicetype] = {}
|
|||
|
YC_List_Prtct[Devicetype][decoded_value_YC['Name']] = decoded_value_YC['ResultValue']
|
|||
|
if Devicetype.find('故障测距装置') != -1:
|
|||
|
if YC_List_Dist.get(Devicetype) == None:
|
|||
|
YC_List_Dist[Devicetype] = {}
|
|||
|
YC_List_Dist[Devicetype][decoded_value_YC['Name']] = decoded_value_YC['ResultValue']
|
|||
|
if Devicetype.find('主变测控装置') != -1:
|
|||
|
if YC_List_MnTrns.get(Devicetype) == None:
|
|||
|
YC_List_MnTrns[Devicetype] = {}
|
|||
|
YC_List_MnTrns[Devicetype][decoded_value_YC['Name']] = decoded_value_YC['ResultValue']
|
|||
|
cursor.close() # 关闭游标
|
|||
|
conn.close() # 关闭数据库连接
|
|||
|
except redis.ConnectionError as e:
|
|||
|
print(f"Redis connection error: {e}")
|
|||
|
return YC_List_MnTrns, YC_List_Prtct,YC_List_Dist
|
|||
|
|
|||
|
#---------------------------------------------------------------------------------------------------------------------------------
|
|||
|
#-----------------------------------将读取到的遥信量对应数据库(im_Obtained_YX)--------------------------------------------------------------------
|
|||
|
def obtainYX():
|
|||
|
redis_host = "192.168.110.229" # 替换为实际的主机名或 IP
|
|||
|
redis_port = 36379 # 替换为实际的端口
|
|||
|
redis_password = "yunda123" # 替换为实际的密码
|
|||
|
|
|||
|
# 使用 redis-py 配置 Redis 连接
|
|||
|
client = redis.Redis(
|
|||
|
host=redis_host,
|
|||
|
port=redis_port,
|
|||
|
db=0, # 指定默认数据库为 0
|
|||
|
password=redis_password,
|
|||
|
socket_timeout=20, # 等价于 ConnectTimeout
|
|||
|
socket_connect_timeout=20 # 等价于 SyncTimeout
|
|||
|
)
|
|||
|
|
|||
|
# 测试连接和数据操作
|
|||
|
try:
|
|||
|
client.ping()
|
|||
|
print("Successfully connected to Redis!")
|
|||
|
# 示例操作:设置和获取一个键
|
|||
|
client.set("your_key", "your_value")
|
|||
|
value = client.get("your_key")
|
|||
|
data = client.hgetall("telesignalisationModelList_Zongzi") # 使用 HashSet 的键 遥测
|
|||
|
if data:
|
|||
|
conn = connectMainDatabase() #连接至ISMS_BASE数据库
|
|||
|
cursor = conn.cursor() #获取pymssql提供的SQL Server编辑游标
|
|||
|
sql_truncate = f"TRUNCATE TABLE im_Obtained_YX;" #把原有表格清空
|
|||
|
cursor.execute(sql_truncate) #执行清空语句
|
|||
|
conn.commit()
|
|||
|
# 遍历并解码每个字段
|
|||
|
decoded_data = {}
|
|||
|
for key, value in data.items():
|
|||
|
# 值都使用 MessagePack 序列化
|
|||
|
#print(key)
|
|||
|
decoded_value_YX = msgpack.unpackb(value)
|
|||
|
#print(decoded_value_YC)
|
|||
|
sql_insert = f"INSERT INTO im_Obtained_YX (PresetValueName, Value, YesValue, ismsBaseYXId, EquipmentinfoID, NoValue, PresetValueID) VALUES (%s, %d, %s, %s, %s, %d, %s)"
|
|||
|
#print(decoded_value_YC.items())
|
|||
|
cursor.execute(sql_insert, (decoded_value_YX['Name'], decoded_value_YX['ResultValue'], decoded_value_YX['YesContent'], decoded_value_YX['ismsbaseYXId'], decoded_value_YX['EquipmentInfoId'], decoded_value_YX['NoContent'], decoded_value_YX['Id']))
|
|||
|
conn.commit()
|
|||
|
#Self_defined_functions.SaveObtainedYCtoSQLSERVER(cursor,decoded_value_YC)
|
|||
|
#print(decoded_value_YX)
|
|||
|
cursor.close() # 关闭游标
|
|||
|
conn.close() # 关闭数据库连接
|
|||
|
except redis.ConnectionError as e:
|
|||
|
print(f"Redis connection error: {e}")
|
|||
|
#---------------------------------------------------------------------------------------------------------------------------------
|
|||
|
#-----------------------------------将读取到的定值量存进对应数据库(im_Obtained_SysValue和im_Obtained_UsrValue)--------------------------------------------------------------------
|
|||
|
def obtainPresetValue():#这直接拉所有装置的定值,后面可能再加一个分开拉某一装置的吧
|
|||
|
redis_host = "192.168.110.229" # 替换为实际的主机名或 IP
|
|||
|
redis_port = 36379 # 替换为实际的端口
|
|||
|
redis_password = "yunda123" # 替换为实际的密码
|
|||
|
conn = connectMainDatabase() #连接至ISMS_BASE数据库
|
|||
|
cursor = conn.cursor() #获取pymssql提供的SQL Server编辑游标
|
|||
|
sql_truncate = f"TRUNCATE TABLE im_Obtained_SysValue;" #把原有表格清空
|
|||
|
cursor.execute(sql_truncate) #执行清空语句
|
|||
|
conn.commit()
|
|||
|
sql_truncate = f"TRUNCATE TABLE im_Obtained_UsrValue;" #把原有表格清空
|
|||
|
cursor.execute(sql_truncate) #执行清空语句
|
|||
|
conn.commit()
|
|||
|
# 使用 redis-py 配置 Redis 连接
|
|||
|
client = redis.Redis(
|
|||
|
host=redis_host,
|
|||
|
port=redis_port,
|
|||
|
db=0, # 指定默认数据库为 0
|
|||
|
password=redis_password,
|
|||
|
socket_timeout=20, # 等价于 ConnectTimeout
|
|||
|
socket_connect_timeout=20 # 等价于 SyncTimeout
|
|||
|
)
|
|||
|
# 测试连接和数据操作
|
|||
|
try:
|
|||
|
client.ping()
|
|||
|
print("Successfully connected to Redis!")
|
|||
|
|
|||
|
# 获取所有键
|
|||
|
all_keys = client.keys('*')
|
|||
|
|
|||
|
# 筛选出前缀为 DZ_ 的键
|
|||
|
dz_keys = [key for key in all_keys if key.decode('utf-8').startswith('DZ_')]
|
|||
|
|
|||
|
# 打印筛选后的键
|
|||
|
for key in dz_keys:
|
|||
|
#print(key.decode('utf-8'))
|
|||
|
address = key.decode('utf-8').split('_')[2]
|
|||
|
if address == 'System':
|
|||
|
client.set("your_key", "your_value")
|
|||
|
value = client.get("your_key")
|
|||
|
data = client.hgetall(key)
|
|||
|
#print(data)
|
|||
|
for key, value in data.items():
|
|||
|
# 值都使用 MessagePack 序列化
|
|||
|
#print(key)
|
|||
|
decoded_value_Pre_sys = msgpack.unpackb(value)
|
|||
|
#print(decoded_value_Pre_sys)
|
|||
|
#print(decoded_value_YC)
|
|||
|
sql_insert = f"INSERT INTO im_Obtained_SysValue (PresetValueName, Value, ID, DeviceID) VALUES (%s, %d, %s, %s)"
|
|||
|
#print(decoded_value_YC.items())
|
|||
|
cursor.execute(sql_insert, (decoded_value_Pre_sys['DzComment'], decoded_value_Pre_sys['Value'], decoded_value_Pre_sys['Id'], decoded_value_Pre_sys['DeviceId']))
|
|||
|
conn.commit()
|
|||
|
#Self_defined_functions.SaveObtainedYCtoSQLSERVER(cursor,decoded_value_YC)
|
|||
|
if address == 'User':
|
|||
|
client.set("your_key", "your_value")
|
|||
|
value = client.get("your_key")
|
|||
|
data = client.hgetall(key)
|
|||
|
for key, value in data.items():
|
|||
|
# 值都使用 MessagePack 序列化
|
|||
|
#print(key)
|
|||
|
decoded_value_Pre_Usr = msgpack.unpackb(value)
|
|||
|
#print(decoded_value_Pre_Usr)
|
|||
|
sql_insert = f"INSERT INTO im_Obtained_UsrValue (PresetValueName, Value, ID, DeviceID) VALUES (%s, %d, %s, %s)"
|
|||
|
#print(decoded_value_YC.items())
|
|||
|
cursor.execute(sql_insert, (decoded_value_Pre_Usr['DzComment'], decoded_value_Pre_Usr['Value'], decoded_value_Pre_Usr['Id'], decoded_value_Pre_Usr['DeviceId']))
|
|||
|
conn.commit()
|
|||
|
cursor.close() # 关闭游标
|
|||
|
conn.close() # 关闭数据库连接
|
|||
|
|
|||
|
|
|||
|
except redis.ConnectionError as e:
|
|||
|
print(f"Redis connection error: {e}")
|
|||
|
|
|||
|
#---------------------------------------------------------------------------------------------------------------------------------
|
|||
|
#-----------------------------------定值校验-读取装置定值--------------------------------------------------------------------
|
|||
|
def getdevicePresetValue(device):
|
|||
|
conn = connectMainDatabase()#获取当前装置的所有定值
|
|||
|
cursor = conn.cursor() #获取pymssql提供的SQL Server编辑游标
|
|||
|
sql_read_Usr = f"SELECT * FROM im_Obtained_UsrValue WHERE deviceID = '" + device + "';"
|
|||
|
#print(sql_read_Usr)
|
|||
|
cursor.execute(sql_read_Usr)
|
|||
|
results_Usr = cursor.fetchall()
|
|||
|
Preset_dict = {}
|
|||
|
for row in results_Usr:
|
|||
|
Preset_dict.update({row[1].encode('latin1').decode('gbk'):row[2]})
|
|||
|
results_Usr_dict = {row[0]: row[1:] for row in results_Usr}
|
|||
|
sql_read_Sys = f"SELECT * FROM im_Obtained_SysValue WHERE deviceID = '" + device + "';"
|
|||
|
cursor.execute(sql_read_Sys)
|
|||
|
results_Sys = cursor.fetchall()
|
|||
|
results_Sys_dict = {row[0]: row[1:] for row in results_Sys}
|
|||
|
for row in results_Sys:
|
|||
|
Preset_dict.update({row[1].encode('latin1').decode('gbk'):row[2]})
|
|||
|
sql_read_devicetype = f"SELECT * FROM im_ProtectDevice_new WHERE deviceID = '" + device + "';"
|
|||
|
cursor.execute(sql_read_devicetype)
|
|||
|
results_devicetype = cursor.fetchall()
|
|||
|
type_flag = results_devicetype[0][2].encode('latin1').decode('gbk')
|
|||
|
if type_flag.find('主变保护')!=-1:
|
|||
|
device_type_flag = 1
|
|||
|
if type_flag.find('馈线')!=-1:
|
|||
|
device_type_flag = 2
|
|||
|
if type_flag.find('自耦变')!=-1:
|
|||
|
device_type_flag = 3
|
|||
|
if type_flag.find('并联补偿')!=-1:
|
|||
|
device_type_flag = 4
|
|||
|
if type_flag.find('测距')!=-1:
|
|||
|
device_type_flag = 5
|
|||
|
if type_flag.find('动力变')!=-1:
|
|||
|
device_type_flag = 5
|
|||
|
cursor.close() # 关闭游标
|
|||
|
conn.close() # 关闭数据库连接
|
|||
|
return Preset_dict,device_type_flag
|
|||
|
|
|||
|
#---------------------------------------------------------------------------------------------------------------------------------
|
|||
|
#-----------------------------------定值校验-定值合格/不合格--------------------------------------------------------------------
|
|||
|
def checkPresetFlag(PresetName,PresetResult,checked_preset,checked_fail_reason,FailReason='',):
|
|||
|
if PresetResult == 1:#通过
|
|||
|
if PresetName in checked_preset and checked_preset[PresetName] == '校验不通过':
|
|||
|
pass
|
|||
|
else:
|
|||
|
checked_preset[PresetName] = '校验通过'
|
|||
|
else:#不通过
|
|||
|
if PresetName in checked_fail_reason:
|
|||
|
checked_preset[PresetName] = '校验不通过'
|
|||
|
checked_fail_reason[PresetName] = checked_fail_reason[PresetName] + FailReason
|
|||
|
else:
|
|||
|
checked_preset[PresetName] = '校验不通过'
|
|||
|
checked_fail_reason[PresetName] = FailReason
|
|||
|
return checked_preset,checked_fail_reason
|
|||
|
#---------------------------------------------------------------------------------------------------------------------------------
|
|||
|
#-----------------------------------定值校验用,第一个判断是否是连续自然数,第二个判断是否递增,第三个判断字典里是不是所有值相等,第四个比较校验值相对误差-------------------------------------------------------------------
|
|||
|
def is_continuous(lst):
|
|||
|
for i in range(1, len(lst)):
|
|||
|
if lst[i] - lst[i - 1]!= 1:
|
|||
|
return False
|
|||
|
return True
|
|||
|
def is_increasing(dictionary):
|
|||
|
values = list(dictionary.values())
|
|||
|
for i in range(1, len(values)):
|
|||
|
if values[i] <= values[i - 1]:
|
|||
|
return False
|
|||
|
return True
|
|||
|
def are_all_values_equal(dictionary):
|
|||
|
if not dictionary:
|
|||
|
return True # 空字典视为所有值相等
|
|||
|
first_value = next(iter(dictionary.values())) # 获取第一个值作为基准
|
|||
|
return all(value == first_value for value in dictionary.values())
|
|||
|
def are_values_equal_with_tolerance(a, b ,tol = 0.05):
|
|||
|
if a == 0 or b == 0:
|
|||
|
return abs(a - b) < 0.001 # 使用绝对误差比较
|
|||
|
else:
|
|||
|
return abs(a - b) / max(abs(a), abs(b)) < tol
|
|||
|
|
|||
|
|
|||
|
def vector_add(A_magnitude, A_angle_deg, B_magnitude, B_angle_deg):
|
|||
|
|
|||
|
# 将角度转换为弧度
|
|||
|
A_angle_rad = math.radians(A_angle_deg)
|
|||
|
B_angle_rad = math.radians(B_angle_deg)
|
|||
|
|
|||
|
# 将矢量转换为坐标表示
|
|||
|
A_x = A_magnitude * math.cos(A_angle_rad)
|
|||
|
A_y = A_magnitude * math.sin(A_angle_rad)
|
|||
|
|
|||
|
B_x = B_magnitude * math.cos(B_angle_rad)
|
|||
|
B_y = B_magnitude * math.sin(B_angle_rad)
|
|||
|
|
|||
|
# 向量加法
|
|||
|
sum_x = A_x + B_x
|
|||
|
sum_y = A_y + B_y
|
|||
|
|
|||
|
# 计算和矢量的模长和角度
|
|||
|
sum_magnitude = math.hypot(sum_x, sum_y)
|
|||
|
sum_angle_rad = math.atan2(sum_y, sum_x)
|
|||
|
sum_angle_deg = math.degrees(sum_angle_rad)
|
|||
|
return(sum_magnitude,sum_angle_deg,sum_angle_rad)
|