query_device_filter.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 查询td_d_log_result表和td_d_log_file表,筛选符合条件的device_id和channel_no组合
  5. 筛选条件(两种类型):
  6. - task_id = 2
  7. 第一种类型(value>30):
  8. - 对于每个device_id和channel_no的组合,取最新的10条记录(按generate_time)
  9. - 计算这10个value值的平均值
  10. - 如果平均值大于30,则记录该(device_id, channel_no)组合
  11. 第二种类型(abs>2):
  12. - 对于每个device_id和channel_no的组合,取最新的10条file_name记录(按generate_time)
  13. - 计算每条记录的不对称性(abs正、abs负)
  14. - 如果有3条或以上记录的abs正或abs负绝对值大于2,则记录该(device_id, channel_no)组合
  15. 输出字段:
  16. - 设备ID
  17. - 设备名称
  18. - 通道名称
  19. - 问题结果(value>30时显示平均值;abs>2时显示10个文件的abs正和abs负值)
  20. """
  21. import pymysql
  22. import sys
  23. import os
  24. import zlib
  25. import json
  26. from collections import defaultdict
  27. from datetime import datetime
  28. # 设置控制台输出编码(Windows需要,Linux默认UTF-8)
  29. if sys.platform == 'win32':
  30. try:
  31. os.system('chcp 65001 >nul 2>&1')
  32. except:
  33. pass
  34. else:
  35. # Linux环境下设置UTF-8编码
  36. import locale
  37. try:
  38. locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
  39. except:
  40. try:
  41. locale.setlocale(locale.LC_ALL, 'C.UTF-8')
  42. except:
  43. pass
  44. def parse_file_data(file_name, use_fdfs=False, fdfs_client=None):
  45. """
  46. 解析振动数据文件
  47. 参数:
  48. file_name: 文件路径(FDFS路径或本地路径)
  49. use_fdfs: 是否使用FDFS客户端(如果为True,需要提供fdfs_client)
  50. fdfs_client: FDFS客户端对象(可选)
  51. 返回:
  52. 解析后的数据(字典或列表)
  53. """
  54. try:
  55. # 1.1 从 FDFS 下载文件数据,先把文件名转成 UTF-8 编码的二进制串
  56. if use_fdfs and fdfs_client:
  57. # 使用FDFS客户端下载
  58. data = fdfs_client.download_bytes(bytes(file_name, encoding="utf8"))
  59. else:
  60. # 如果没有FDFS客户端,尝试从本地文件系统读取
  61. # 将FDFS路径转换为本地路径
  62. if file_name.startswith('group1/M00/'):
  63. local_path = file_name.replace('group1/M00/', '/home/soft/data/fdfs/storage/data/', 1)
  64. elif file_name.startswith('group1/M00'):
  65. local_path = file_name.replace('group1/M00', '/home/soft/data/fdfs/storage/data', 1)
  66. else:
  67. local_path = file_name
  68. # 从本地文件系统读取
  69. with open(local_path, 'rb') as f:
  70. data = f.read()
  71. # 1.2 检查下载的二进制数据开头是否是 {(JSON 格式的标志)
  72. if data[0:1] == b'{':
  73. # 如果是 JSON 二进制串,直接解码成字符串
  74. data = data.decode()
  75. else:
  76. # 如果不是 JSON 开头,说明数据是压缩的,先解压
  77. ss = zlib.decompress(data).decode()
  78. # 修正格式:单引号改双引号(JSON 要求双引号)、nan 改成 0(避免 JSON 解析报错)
  79. ss = ss.replace('\'', '"').replace("nan", "0")
  80. # 把修正后的字符串解析成 JSON 格式的 Python 数据(字典/列表)
  81. data = json.loads(ss)
  82. return data
  83. except FileNotFoundError as e:
  84. # 文件未找到,静默返回None
  85. return None
  86. except zlib.error as e:
  87. print(f" 警告: 解压缩失败 {file_name}: {e}")
  88. return None
  89. except json.JSONDecodeError as e:
  90. print(f" 警告: JSON解析失败 {file_name}: {e}")
  91. return None
  92. except Exception as e:
  93. print(f" 警告: 解析文件失败 {file_name}: {e}")
  94. return None
  95. def calculate_asymmetry(data):
  96. """
  97. 计算不对称性
  98. 从data中提取raw_y数组,然后计算不对称性
  99. 返回 (abs_正, abs_负, 正半轴点数量, 负半轴点数量)
  100. """
  101. try:
  102. # 如果data是字典,提取raw_y字段
  103. if isinstance(data, dict):
  104. if 'raw_y' not in data:
  105. return None, None, None, None
  106. values_list = data['raw_y']
  107. elif isinstance(data, list):
  108. # 如果是列表,直接使用(兼容旧代码)
  109. values_list = data
  110. else:
  111. return None, None, None, None
  112. # 确保values_list是列表
  113. if not isinstance(values_list, list):
  114. return None, None, None, None
  115. # 转换为数值列表
  116. values = []
  117. for v in values_list:
  118. try:
  119. val = float(v)
  120. values.append(val)
  121. except (ValueError, TypeError):
  122. continue
  123. if len(values) == 0:
  124. return None, None, None, None
  125. # 分离正半轴和负半轴的值
  126. positive_values = [v for v in values if v > 0]
  127. negative_values = [abs(v) for v in values if v < 0] # 负半轴取绝对值
  128. # 计算正半轴和负半轴的点数量
  129. count_positive = len(positive_values)
  130. count_negative = len(negative_values)
  131. # 计算和
  132. sum_positive = sum(positive_values)
  133. sum_negative = sum(negative_values)
  134. # 计算不对称性
  135. if sum_negative > 0:
  136. abs_positive = abs(sum_positive / sum_negative)
  137. else:
  138. abs_positive = None
  139. if sum_positive > 0:
  140. abs_negative = abs(sum_negative / sum_positive)
  141. else:
  142. abs_negative = None
  143. return abs_positive, abs_negative, count_positive, count_negative
  144. except Exception as e:
  145. return None, None, None, None
  146. def query_filtered_devices(fdfs_client=None):
  147. """
  148. 查询符合条件的device_id和channel_no组合
  149. 参数:
  150. fdfs_client: FDFS客户端对象(可选,用于下载文件)
  151. """
  152. # 数据库连接配置
  153. db_config = {
  154. 'host': '127.0.0.1',
  155. 'port': 3306,
  156. 'user': 'prod',
  157. 'password': 'hmdmxjIvfIjIoflL',
  158. 'database': 'iot',
  159. 'charset': 'utf8mb4',
  160. 'connect_timeout': 10
  161. }
  162. connection = None
  163. try:
  164. print("正在连接到数据库...")
  165. connection = pymysql.connect(**db_config)
  166. print("[成功] 数据库连接成功!\n")
  167. with connection.cursor() as cursor:
  168. # 查询所有唯一的device_id和channel_no组合,且task_id=2
  169. print("查询所有device_id和channel_no的组合...")
  170. query_unique = """
  171. SELECT DISTINCT device_id, channel_no
  172. FROM td_d_log_result
  173. WHERE task_id = '2'
  174. """
  175. cursor.execute(query_unique)
  176. device_channel_pairs = cursor.fetchall()
  177. print(f"找到 {len(device_channel_pairs)} 个device_id和channel_no的组合\n")
  178. # 存储符合条件的(device_id, channel_no)组合及其问题结果
  179. # 格式: {(device_id, channel_no): {'value>30': 平均值, 'abs>2': [(生成时间, abs正, abs负, 正半轴点数量, 负半轴点数量), ...]}}
  180. filtered_channel_dict = {}
  181. # ===== 阶段一:筛选value>30的故障测点(基于平均值) =====
  182. print("=" * 60)
  183. print("阶段一:筛选value平均值>30的故障测点")
  184. print("=" * 60)
  185. for idx, (device_id, channel_no) in enumerate(device_channel_pairs, 1):
  186. # 查询该组合的最新10条记录(按generate_time降序)
  187. query_latest = """
  188. SELECT value, generate_time
  189. FROM td_d_log_result
  190. WHERE device_id = %s
  191. AND channel_no = %s
  192. AND task_id = '2'
  193. ORDER BY generate_time DESC
  194. LIMIT 10
  195. """
  196. cursor.execute(query_latest, (device_id, channel_no))
  197. records = cursor.fetchall()
  198. if len(records) < 10:
  199. # 如果记录数少于10条,跳过
  200. continue
  201. # 计算最近10个value的平均值
  202. values = []
  203. for value_str, generate_time in records:
  204. try:
  205. # value字段是varchar类型,需要转换为float
  206. value = float(value_str) if value_str else 0
  207. values.append(value)
  208. except (ValueError, TypeError):
  209. # 如果转换失败,跳过该值
  210. continue
  211. # 如果有效值数量不足,跳过
  212. if len(values) == 0:
  213. continue
  214. # 计算平均值
  215. avg_value = sum(values) / len(values)
  216. # 如果平均值大于30,记录该(device_id, channel_no)组合(类型1:value>30)
  217. if avg_value > 30:
  218. key = (device_id, channel_no)
  219. if key not in filtered_channel_dict:
  220. filtered_channel_dict[key] = {}
  221. filtered_channel_dict[key]['value>30'] = avg_value
  222. print(f"[{idx}] device_id: {device_id}, channel_no: {channel_no}, "
  223. f"平均值: {avg_value:.2f}")
  224. # ===== 阶段二:筛选abs>2的故障测点(基于所有组合) =====
  225. print("\n" + "=" * 60)
  226. print("阶段二:筛选abs>2的故障测点")
  227. print("=" * 60)
  228. for idx, (device_id, channel_no) in enumerate(device_channel_pairs, 1):
  229. # 查询该(device_id, channel_no)组合最近10条记录的file_name(按generate_time降序,task_id=2)
  230. query_file_name = """
  231. SELECT lf.file_name, lf.generate_time as file_generate_time
  232. FROM td_d_log_file lf
  233. WHERE lf.device_id = %s
  234. AND lf.channel_no = %s
  235. AND lf.task_id = '2'
  236. ORDER BY lf.generate_time DESC
  237. LIMIT 10
  238. """
  239. cursor.execute(query_file_name, (device_id, channel_no))
  240. file_results = cursor.fetchall()
  241. if len(file_results) < 10:
  242. # 如果文件记录数少于10条,跳过
  243. continue
  244. # 统计abs正或abs负绝对值大于2的个数,同时保存所有10个文件的abs值、点数量和generate_time
  245. count_abs_over_2 = 0
  246. abs_values_list = [] # 保存所有10个文件的(生成时间, abs正, abs负, 正半轴点数量, 负半轴点数量)值
  247. for file_name, file_generate_time in file_results:
  248. if file_name:
  249. # 解析文件并计算不对称性
  250. data = parse_file_data(file_name, use_fdfs=(fdfs_client is not None), fdfs_client=fdfs_client)
  251. result = calculate_asymmetry(data) if data is not None else (None, None, None, None)
  252. abs_positive, abs_negative, count_positive, count_negative = result
  253. # 保存abs值、点数量和generate_time
  254. abs_values_list.append((file_generate_time, abs_positive, abs_negative, count_positive, count_negative))
  255. # 检查abs正或abs负的绝对值是否大于2(只要其中一个大于2就算一条)
  256. if (abs_positive is not None and abs(abs_positive) > 2) or (abs_negative is not None and abs(abs_negative) > 2):
  257. count_abs_over_2 += 1
  258. else:
  259. # 如果file_name为空,也添加None值以保持列表长度
  260. abs_values_list.append((file_generate_time, None, None, None, None))
  261. # 如果有3条或以上记录的abs>2,记录该(device_id, channel_no)组合(类型2:abs>2)
  262. if count_abs_over_2 >= 3:
  263. key = (device_id, channel_no)
  264. if key not in filtered_channel_dict:
  265. filtered_channel_dict[key] = {}
  266. filtered_channel_dict[key]['abs>2'] = abs_values_list
  267. print(f"[{idx}] device_id: {device_id}, channel_no: {channel_no}, "
  268. f"abs>2的记录数量: {count_abs_over_2}/10")
  269. # 转换为列表格式(用于后续处理)
  270. filtered_channel_list = list(filtered_channel_dict.keys())
  271. # 查询device_name
  272. print("\n查询设备名称...")
  273. device_info_map = {}
  274. if filtered_channel_list:
  275. # 获取所有唯一的device_id
  276. unique_device_ids = list(set([device_id for device_id, channel_no in filtered_channel_list]))
  277. # 批量查询device_name
  278. placeholders = ','.join(['%s'] * len(unique_device_ids))
  279. query_device_name = f"""
  280. SELECT device_id, device_name
  281. FROM td_d_device_info
  282. WHERE device_id IN ({placeholders})
  283. """
  284. cursor.execute(query_device_name, tuple(unique_device_ids))
  285. device_info_results = cursor.fetchall()
  286. # 构建device_id到device_name的映射
  287. for device_id, device_name in device_info_results:
  288. device_info_map[device_id] = device_name if device_name else '(未设置设备名称)'
  289. # 查询channel_name
  290. print("查询通道名称...")
  291. channel_info_map = {}
  292. if filtered_channel_list:
  293. # 批量查询channel_name(使用device_id和channel_no组合)
  294. query_channel_name = """
  295. SELECT device_id, channel_no, channel_name
  296. FROM td_d_device_channel
  297. WHERE (device_id, channel_no) IN (
  298. """
  299. # 构建IN子句的占位符
  300. placeholders = ','.join(['(%s, %s)'] * len(filtered_channel_list))
  301. query_channel_name += placeholders + ")"
  302. # 准备参数:将(device_id, channel_no)元组展开
  303. params = []
  304. for device_id, channel_no in filtered_channel_list:
  305. params.extend([device_id, int(channel_no)]) # channel_no需要转换为int
  306. cursor.execute(query_channel_name, tuple(params))
  307. channel_info_results = cursor.fetchall()
  308. # 构建(device_id, channel_no)到channel_name的映射
  309. for device_id, channel_no, channel_name in channel_info_results:
  310. channel_info_map[(device_id, str(channel_no))] = channel_name if channel_name else '(未设置通道名称)'
  311. # 生成输出文件(包含时间戳)
  312. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  313. output_filename = f"query_result_{timestamp}.txt"
  314. output_filepath = os.path.join(os.getcwd(), output_filename)
  315. # 打印所有符合条件的设备信息
  316. print("\n" + "=" * 60)
  317. print("符合条件的故障测点列表:")
  318. print("=" * 60)
  319. # 准备输出内容(同时用于打印和写入文件)
  320. output_lines = []
  321. output_lines.append("=" * 60)
  322. output_lines.append("符合条件的故障测点列表:")
  323. output_lines.append("=" * 60)
  324. output_lines.append("")
  325. output_lines.append("字段说明:")
  326. output_lines.append("- 设备ID: 设备标识")
  327. output_lines.append("- 设备名称: 设备名称")
  328. output_lines.append("- 通道名称: 通道名称")
  329. output_lines.append("- 问题结果: value>30时显示平均值;abs>2时显示10个文件的abs正和abs负值")
  330. output_lines.append("")
  331. if filtered_channel_list:
  332. # 按device_id和channel_no排序后打印
  333. sorted_channels = sorted(filtered_channel_list, key=lambda x: (x[0], int(x[1]) if x[1].isdigit() else x[1]))
  334. displayed_count = 0 # 记录实际显示的记录数
  335. for device_id, channel_no in sorted_channels:
  336. device_name = device_info_map.get(device_id, '(未找到设备名称)')
  337. channel_name = channel_info_map.get((device_id, channel_no), '(未找到通道名称)')
  338. # 过滤掉"未找到通道名称"的记录
  339. if channel_name == '(未找到通道名称)':
  340. continue
  341. displayed_count += 1 # 增加显示计数
  342. # 获取问题结果
  343. result_data = filtered_channel_dict.get((device_id, channel_no), {})
  344. # 格式化输出
  345. device_info_str = f"设备ID: {device_id}"
  346. device_name_str = f"设备名称: {device_name}"
  347. channel_name_str = f"通道名称: {channel_name}"
  348. print(device_info_str)
  349. print(device_name_str)
  350. print(channel_name_str)
  351. output_lines.append(device_info_str)
  352. output_lines.append(device_name_str)
  353. output_lines.append(channel_name_str)
  354. # 输出问题结果
  355. problem_result_lines = []
  356. if 'value>30' in result_data:
  357. avg_value = result_data['value>30']
  358. result_line = f"问题结果(value>30): 平均值 = {avg_value:.2f}"
  359. print(result_line)
  360. problem_result_lines.append(result_line)
  361. if 'abs>2' in result_data:
  362. abs_values_list = result_data['abs>2']
  363. result_line = f"问题结果(abs>2):"
  364. print(result_line)
  365. problem_result_lines.append(result_line)
  366. for idx, (generate_time, abs_pos, abs_neg, count_pos, count_neg) in enumerate(abs_values_list, 1):
  367. time_str = str(generate_time) if generate_time else "N/A"
  368. abs_pos_str = f"{abs_pos:.6f}" if abs_pos is not None else "N/A"
  369. abs_neg_str = f"{abs_neg:.6f}" if abs_neg is not None else "N/A"
  370. count_pos_str = str(count_pos) if count_pos is not None else "N/A"
  371. count_neg_str = str(count_neg) if count_neg is not None else "N/A"
  372. detail_line = f" 文件{idx}: generate_time = {time_str}, abs正 = {abs_pos_str}, abs负 = {abs_neg_str}, 正半轴点数量 = {count_pos_str}, 负半轴点数量 = {count_neg_str}"
  373. print(detail_line)
  374. problem_result_lines.append(detail_line)
  375. output_lines.extend(problem_result_lines)
  376. print("-" * 60)
  377. output_lines.append("-" * 60)
  378. total_str = f"\n总计: {displayed_count} 个故障测点(已过滤掉未找到通道名称的记录)"
  379. print(total_str)
  380. output_lines.append(total_str)
  381. else:
  382. no_result_str = "未找到符合条件的故障测点"
  383. print(no_result_str)
  384. output_lines.append(no_result_str)
  385. # 将结果写入文件
  386. try:
  387. with open(output_filepath, 'w', encoding='utf-8') as f:
  388. f.write('\n'.join(output_lines))
  389. f.write('\n')
  390. print(f"\n[成功] 查询结果已保存到文件: {output_filepath}")
  391. except Exception as e:
  392. print(f"\n[警告] 保存结果到文件失败: {e}")
  393. return filtered_channel_list
  394. except pymysql.Error as e:
  395. error_code, error_msg = e.args
  396. print("[失败] 数据库操作失败!")
  397. print(f"[失败] 错误代码: {error_code}")
  398. print(f"[失败] 错误信息: {error_msg}")
  399. return None
  400. except Exception as e:
  401. print("[失败] 发生未知错误!")
  402. print(f"[失败] 错误信息: {e}")
  403. import traceback
  404. traceback.print_exc()
  405. return None
  406. finally:
  407. # 关闭数据库连接
  408. if connection:
  409. connection.close()
  410. print("\n[成功] 数据库连接已关闭")
  411. if __name__ == "__main__":
  412. print("=" * 60)
  413. print("设备和通道筛选工具")
  414. print("=" * 60)
  415. print("筛选条件:")
  416. print("- task_id = 2")
  417. print("")
  418. print("第一种类型(value>30):")
  419. print(" - 每个(device_id, channel_no)组合的最新10条记录(按generate_time)")
  420. print(" - 计算这10个value值的平均值,如果平均值大于30")
  421. print("")
  422. print("第二种类型(abs>2):")
  423. print(" - 每个(device_id, channel_no)组合的最新10条file_name记录(按generate_time)")
  424. print(" - 计算每条记录的不对称性(abs正、abs负)")
  425. print(" - 如果有3条或以上记录的abs正或abs负绝对值大于2")
  426. print("=" * 60 + "\n")
  427. result = query_filtered_devices()
  428. if result is not None:
  429. sys.exit(0)
  430. else:
  431. sys.exit(1)