# Source file: query_device_filter.py (20 KB)
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 查询td_d_log_result表,筛选符合条件的device_id和channel_no组合
  5. 筛选条件:
  6. - task_id = 2
  7. - 对于每个device_id和channel_no的组合,取最新的10条记录(按channel_no的generate_time)
  8. - 如果这10个value值中有超过3个值大于20,则记录该(device_id, channel_no)组合
  9. """
  10. import pymysql
  11. import sys
  12. import os
  13. import zlib
  14. import json
  15. from collections import defaultdict
  16. from datetime import datetime
  17. # 设置控制台输出编码(Windows需要,Linux默认UTF-8)
  18. if sys.platform == 'win32':
  19. try:
  20. os.system('chcp 65001 >nul 2>&1')
  21. except:
  22. pass
  23. else:
  24. # Linux环境下设置UTF-8编码
  25. import locale
  26. try:
  27. locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
  28. except:
  29. try:
  30. locale.setlocale(locale.LC_ALL, 'C.UTF-8')
  31. except:
  32. pass
  33. def parse_file_data(file_name, use_fdfs=False, fdfs_client=None):
  34. """
  35. 解析振动数据文件
  36. 参数:
  37. file_name: 文件路径(FDFS路径或本地路径)
  38. use_fdfs: 是否使用FDFS客户端(如果为True,需要提供fdfs_client)
  39. fdfs_client: FDFS客户端对象(可选)
  40. 返回:
  41. 解析后的数据(字典或列表)
  42. """
  43. try:
  44. # 1.1 从 FDFS 下载文件数据,先把文件名转成 UTF-8 编码的二进制串
  45. if use_fdfs and fdfs_client:
  46. # 使用FDFS客户端下载
  47. data = fdfs_client.download_bytes(bytes(file_name, encoding="utf8"))
  48. else:
  49. # 如果没有FDFS客户端,尝试从本地文件系统读取
  50. # 将FDFS路径转换为本地路径
  51. if file_name.startswith('group1/M00/'):
  52. local_path = file_name.replace('group1/M00/', '/home/soft/data/fdfs/storage/data/', 1)
  53. elif file_name.startswith('group1/M00'):
  54. local_path = file_name.replace('group1/M00', '/home/soft/data/fdfs/storage/data', 1)
  55. else:
  56. local_path = file_name
  57. # 从本地文件系统读取
  58. with open(local_path, 'rb') as f:
  59. data = f.read()
  60. # 1.2 检查下载的二进制数据开头是否是 {(JSON 格式的标志)
  61. if data[0:1] == b'{':
  62. # 如果是 JSON 二进制串,直接解码成字符串
  63. data = data.decode()
  64. else:
  65. # 如果不是 JSON 开头,说明数据是压缩的,先解压
  66. ss = zlib.decompress(data).decode()
  67. # 修正格式:单引号改双引号(JSON 要求双引号)、nan 改成 0(避免 JSON 解析报错)
  68. ss = ss.replace('\'', '"').replace("nan", "0")
  69. # 把修正后的字符串解析成 JSON 格式的 Python 数据(字典/列表)
  70. data = json.loads(ss)
  71. return data
  72. except FileNotFoundError as e:
  73. # 文件未找到,静默返回None
  74. return None
  75. except zlib.error as e:
  76. print(f" 警告: 解压缩失败 {file_name}: {e}")
  77. return None
  78. except json.JSONDecodeError as e:
  79. print(f" 警告: JSON解析失败 {file_name}: {e}")
  80. return None
  81. except Exception as e:
  82. print(f" 警告: 解析文件失败 {file_name}: {e}")
  83. return None
  84. def calculate_asymmetry(data):
  85. """
  86. 计算不对称性
  87. 从data中提取raw_y数组,然后计算不对称性
  88. 返回 (abs_正, abs_负)
  89. """
  90. try:
  91. # 如果data是字典,提取raw_y字段
  92. if isinstance(data, dict):
  93. if 'raw_y' not in data:
  94. return None, None
  95. values_list = data['raw_y']
  96. elif isinstance(data, list):
  97. # 如果是列表,直接使用(兼容旧代码)
  98. values_list = data
  99. else:
  100. return None, None
  101. # 确保values_list是列表
  102. if not isinstance(values_list, list):
  103. return None, None
  104. # 转换为数值列表
  105. values = []
  106. for v in values_list:
  107. try:
  108. val = float(v)
  109. values.append(val)
  110. except (ValueError, TypeError):
  111. continue
  112. if len(values) == 0:
  113. return None, None
  114. # 分离正半轴和负半轴的值
  115. positive_values = [v for v in values if v > 0]
  116. negative_values = [abs(v) for v in values if v < 0] # 负半轴取绝对值
  117. # 计算和
  118. sum_positive = sum(positive_values)
  119. sum_negative = sum(negative_values)
  120. # 计算不对称性
  121. if sum_positive > 0:
  122. abs_positive = abs((sum_positive - sum_negative) / sum_positive)
  123. else:
  124. abs_positive = None
  125. if sum_negative > 0:
  126. abs_negative = abs((sum_positive - sum_negative) / sum_negative)
  127. else:
  128. abs_negative = None
  129. return abs_positive, abs_negative
  130. except Exception as e:
  131. return None, None
  132. def query_filtered_devices(fdfs_client=None):
  133. """
  134. 查询符合条件的device_id和channel_no组合
  135. 参数:
  136. fdfs_client: FDFS客户端对象(可选,用于下载文件)
  137. """
  138. # 数据库连接配置
  139. db_config = {
  140. 'host': '127.0.0.1',
  141. 'port': 3306,
  142. 'user': 'prod',
  143. 'password': 'hmdmxjIvfIjIoflL',
  144. 'database': 'iot',
  145. 'charset': 'utf8mb4',
  146. 'connect_timeout': 10
  147. }
  148. connection = None
  149. try:
  150. print("正在连接到数据库...")
  151. connection = pymysql.connect(**db_config)
  152. print("[成功] 数据库连接成功!\n")
  153. with connection.cursor() as cursor:
  154. # 查询所有唯一的device_id和channel_no组合,且task_id=2
  155. print("查询所有device_id和channel_no的组合...")
  156. query_unique = """
  157. SELECT DISTINCT device_id, channel_no
  158. FROM td_d_log_result
  159. WHERE task_id = '2'
  160. """
  161. cursor.execute(query_unique)
  162. device_channel_pairs = cursor.fetchall()
  163. print(f"找到 {len(device_channel_pairs)} 个device_id和channel_no的组合\n")
  164. # 存储符合条件的(device_id, channel_no)组合
  165. filtered_channel_list = []
  166. # 对每个device_id和channel_no组合进行处理
  167. for idx, (device_id, channel_no) in enumerate(device_channel_pairs, 1):
  168. # 查询该组合的最新10条记录(按generate_time降序,按channel_no)
  169. query_latest = """
  170. SELECT value, generate_time
  171. FROM td_d_log_result
  172. WHERE device_id = %s
  173. AND channel_no = %s
  174. AND task_id = '2'
  175. ORDER BY generate_time DESC
  176. LIMIT 10
  177. """
  178. cursor.execute(query_latest, (device_id, channel_no))
  179. records = cursor.fetchall()
  180. if len(records) < 10:
  181. # 如果记录数少于10条,跳过
  182. continue
  183. # 统计value值大于20的个数
  184. count_over_20 = 0
  185. values = []
  186. for value_str, generate_time in records:
  187. try:
  188. # value字段是varchar类型,需要转换为float
  189. value = float(value_str) if value_str else 0
  190. values.append(value)
  191. if value > 20:
  192. count_over_20 += 1
  193. except (ValueError, TypeError):
  194. # 如果转换失败,跳过该值
  195. continue
  196. # 如果超过3个值大于20,记录该(device_id, channel_no)组合
  197. if count_over_20 > 3:
  198. filtered_channel_list.append((device_id, channel_no))
  199. print(f"[{idx}] device_id: {device_id}, channel_no: {channel_no}, "
  200. f"超过20的值数量: {count_over_20}/10, 最新10个值: {values}")
  201. # 查询device_name
  202. print("\n查询设备名称...")
  203. device_info_map = {}
  204. if filtered_channel_list:
  205. # 获取所有唯一的device_id
  206. unique_device_ids = list(set([device_id for device_id, channel_no in filtered_channel_list]))
  207. # 批量查询device_name
  208. placeholders = ','.join(['%s'] * len(unique_device_ids))
  209. query_device_name = f"""
  210. SELECT device_id, device_name
  211. FROM td_d_device_info
  212. WHERE device_id IN ({placeholders})
  213. """
  214. cursor.execute(query_device_name, tuple(unique_device_ids))
  215. device_info_results = cursor.fetchall()
  216. # 构建device_id到device_name的映射
  217. for device_id, device_name in device_info_results:
  218. device_info_map[device_id] = device_name if device_name else '(未设置设备名称)'
  219. # 查询channel_name
  220. print("查询通道名称...")
  221. channel_info_map = {}
  222. if filtered_channel_list:
  223. # 批量查询channel_name(使用device_id和channel_no组合)
  224. query_channel_name = """
  225. SELECT device_id, channel_no, channel_name
  226. FROM td_d_device_channel
  227. WHERE (device_id, channel_no) IN (
  228. """
  229. # 构建IN子句的占位符
  230. placeholders = ','.join(['(%s, %s)'] * len(filtered_channel_list))
  231. query_channel_name += placeholders + ")"
  232. # 准备参数:将(device_id, channel_no)元组展开
  233. params = []
  234. for device_id, channel_no in filtered_channel_list:
  235. params.extend([device_id, int(channel_no)]) # channel_no需要转换为int
  236. cursor.execute(query_channel_name, tuple(params))
  237. channel_info_results = cursor.fetchall()
  238. # 构建(device_id, channel_no)到channel_name的映射
  239. for device_id, channel_no, channel_name in channel_info_results:
  240. channel_info_map[(device_id, str(channel_no))] = channel_name if channel_name else '(未设置通道名称)'
  241. # 查询每个(device_id, channel_no)组合最近10条的file_name
  242. print("查询每个channel_no最近10条文件记录...")
  243. channel_file_map = {}
  244. if filtered_channel_list:
  245. for device_id, channel_no in filtered_channel_list:
  246. # 查询该(device_id, channel_no)组合最近10条记录的file_name(按generate_time降序,task_id=2)
  247. # 同时关联查询td_d_log_result表的generate_time
  248. query_file_name = """
  249. SELECT lf.file_name, lf.generate_time as file_generate_time, lr.generate_time as result_generate_time
  250. FROM td_d_log_file lf
  251. LEFT JOIN td_d_log_result lr ON (
  252. lf.device_id = lr.device_id
  253. AND lf.channel_no = lr.channel_no
  254. AND lf.task_id = lr.task_id
  255. AND lf.generate_time = lr.generate_time
  256. )
  257. WHERE lf.device_id = %s
  258. AND lf.channel_no = %s
  259. AND lf.task_id = '2'
  260. ORDER BY lf.generate_time DESC
  261. LIMIT 10
  262. """
  263. cursor.execute(query_file_name, (device_id, channel_no))
  264. file_results = cursor.fetchall()
  265. # 提取file_name列表,同时解析文件计算不对称性(使用FDFS路径)
  266. file_info_list = []
  267. for file_name, file_generate_time, result_generate_time in file_results:
  268. if file_name:
  269. # 解析文件并计算不对称性(使用原始FDFS路径,如果提供了fdfs_client则使用FDFS,否则从本地读取)
  270. data = parse_file_data(file_name, use_fdfs=(fdfs_client is not None), fdfs_client=fdfs_client)
  271. abs_positive, abs_negative = calculate_asymmetry(data) if data is not None else (None, None)
  272. file_info_list.append({
  273. 'file_name': file_name, # 保持原始FDFS路径
  274. 'file_generate_time': file_generate_time,
  275. 'result_generate_time': result_generate_time,
  276. 'abs_positive': abs_positive,
  277. 'abs_negative': abs_negative
  278. })
  279. channel_file_map[(device_id, channel_no)] = file_info_list
  280. # 生成输出文件(包含时间戳)
  281. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  282. output_filename = f"query_result_{timestamp}.txt"
  283. output_filepath = os.path.join(os.getcwd(), output_filename)
  284. # 打印所有符合条件的device_id、device_name、channel_no、channel_name和最近10条file_name
  285. print("\n" + "=" * 60)
  286. print("符合条件的device_id和channel_no组合列表:")
  287. print("=" * 60)
  288. # 准备输出内容(同时用于打印和写入文件)
  289. output_lines = []
  290. output_lines.append("=" * 60)
  291. output_lines.append("符合条件的device_id和channel_no组合列表:")
  292. output_lines.append("=" * 60)
  293. output_lines.append("")
  294. if filtered_channel_list:
  295. # 按device_id和channel_no排序后打印
  296. sorted_channels = sorted(filtered_channel_list, key=lambda x: (x[0], int(x[1]) if x[1].isdigit() else x[1]))
  297. displayed_count = 0 # 记录实际显示的记录数
  298. for device_id, channel_no in sorted_channels:
  299. device_name = device_info_map.get(device_id, '(未找到设备名称)')
  300. channel_name = channel_info_map.get((device_id, channel_no), '(未找到通道名称)')
  301. # 过滤掉"未找到通道名称"的记录
  302. if channel_name == '(未找到通道名称)':
  303. continue
  304. displayed_count += 1 # 增加显示计数
  305. file_names = channel_file_map.get((device_id, channel_no), [])
  306. device_info_str = f"\n设备ID: {device_id}"
  307. device_name_str = f"设备名称: {device_name}"
  308. channel_no_str = f"通道号: {channel_no}"
  309. channel_name_str = f"通道名称: {channel_name}"
  310. file_count_str = f"最近10条文件记录({len(file_names)}条):"
  311. print(device_info_str)
  312. print(device_name_str)
  313. print(channel_no_str)
  314. print(channel_name_str)
  315. print(file_count_str)
  316. output_lines.append(device_info_str)
  317. output_lines.append(device_name_str)
  318. output_lines.append(channel_no_str)
  319. output_lines.append(channel_name_str)
  320. output_lines.append(file_count_str)
  321. if file_names:
  322. for idx, file_info in enumerate(file_names, 1):
  323. file_name = file_info['file_name']
  324. file_generate_time = file_info.get('file_generate_time')
  325. result_generate_time = file_info.get('result_generate_time')
  326. abs_positive = file_info.get('abs_positive')
  327. abs_negative = file_info.get('abs_negative')
  328. # 格式化输出
  329. file_time_str = str(file_generate_time) if file_generate_time else "N/A"
  330. result_time_str = str(result_generate_time) if result_generate_time else "N/A"
  331. abs_positive_str = f"{abs_positive:.6f}" if abs_positive is not None else "N/A"
  332. abs_negative_str = f"{abs_negative:.6f}" if abs_negative is not None else "N/A"
  333. file_line = f" {idx}. {file_name}"
  334. time_line = f" generate_time(td_d_log_result): {result_time_str}"
  335. abs_line = f" abs正: {abs_positive_str}, abs负: {abs_negative_str}"
  336. print(file_line)
  337. print(time_line)
  338. print(abs_line)
  339. output_lines.append(file_line)
  340. output_lines.append(time_line)
  341. output_lines.append(abs_line)
  342. else:
  343. no_file_str = " (无文件记录)"
  344. print(no_file_str)
  345. output_lines.append(no_file_str)
  346. separator = "-" * 60
  347. print(separator)
  348. output_lines.append(separator)
  349. total_str = f"\n总计: {displayed_count} 个(device_id, channel_no)组合(已过滤掉未找到通道名称的记录)"
  350. print(total_str)
  351. output_lines.append(total_str)
  352. else:
  353. no_result_str = "未找到符合条件的(device_id, channel_no)组合"
  354. print(no_result_str)
  355. output_lines.append(no_result_str)
  356. # 将结果写入文件
  357. try:
  358. with open(output_filepath, 'w', encoding='utf-8') as f:
  359. f.write('\n'.join(output_lines))
  360. f.write('\n')
  361. print(f"\n[成功] 查询结果已保存到文件: {output_filepath}")
  362. except Exception as e:
  363. print(f"\n[警告] 保存结果到文件失败: {e}")
  364. return filtered_channel_list
  365. except pymysql.Error as e:
  366. error_code, error_msg = e.args
  367. print("[失败] 数据库操作失败!")
  368. print(f"[失败] 错误代码: {error_code}")
  369. print(f"[失败] 错误信息: {error_msg}")
  370. return None
  371. except Exception as e:
  372. print("[失败] 发生未知错误!")
  373. print(f"[失败] 错误信息: {e}")
  374. import traceback
  375. traceback.print_exc()
  376. return None
  377. finally:
  378. # 关闭数据库连接
  379. if connection:
  380. connection.close()
  381. print("\n[成功] 数据库连接已关闭")
  382. if __name__ == "__main__":
  383. print("=" * 60)
  384. print("设备和通道筛选工具")
  385. print("=" * 60)
  386. print("筛选条件:")
  387. print("- task_id = 2")
  388. print("- 每个device_id下channel_no的最新10条记录(按channel_no的generate_time)")
  389. print("- 这10个value值中有超过3个值大于20")
  390. print("=" * 60 + "\n")
  391. result = query_filtered_devices()
  392. if result is not None:
  393. sys.exit(0)
  394. else:
  395. sys.exit(1)