#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Find faulty (device_id, channel_no) measurement points.

Reads the tables td_d_log_result and td_d_log_file and only considers rows
with task_id = 2.  A (device_id, channel_no) pair is reported when it matches
at least one of two rules.

Rule 1 (value>30):
- take the newest 10 td_d_log_result rows of the pair (by generate_time);
- average their ``value`` column;
- report the pair when the average is greater than 30.

Rule 2 (abs>2):
- take the newest 10 td_d_log_file rows of the pair (by generate_time);
- compute the two asymmetry ratios of every file;
- report the pair when at least 3 files have a ratio greater than 2.

Output per reported pair:
- device ID
- device name
- channel name
- problem result (the average for rule 1; the ratios of all 10 files for
  rule 2)
"""
import json
import locale
import os
import re
import sys
import zlib
from collections import defaultdict
from datetime import datetime

try:
    import pymysql
except ImportError:
    # The file parsing / asymmetry helpers do not need the database driver;
    # only query_filtered_devices() does, and it reports the missing module.
    pymysql = None


# Number of newest rows inspected per (device_id, channel_no) pair.
_RECENT_RECORD_COUNT = 10
# Rule 1: the average of the inspected values must exceed this.
_VALUE_THRESHOLD = 30
# Rule 2: a file counts as a hit when one of its ratios exceeds this ...
_ASYMMETRY_THRESHOLD = 2
# ... and the pair is reported when at least this many files are hits.
_MIN_ASYMMETRY_HITS = 3

# FDFS logical prefix and the local directory it is mapped to when no FDFS
# client is supplied.
_FDFS_PREFIX = 'group1/M00'
_FDFS_LOCAL_ROOT = '/home/soft/data/fdfs/storage/data'

# Matches the bare token ``nan`` but not words that merely contain it.
_NAN_TOKEN = re.compile(r'\bnan\b')

_NO_ASYMMETRY = (None, None, None, None)


def _configure_console_encoding():
    """Best-effort switch of the console / locale to UTF-8."""
    if sys.platform == 'win32':
        try:
            os.system('chcp 65001 >nul 2>&1')
        except Exception:
            # Purely cosmetic; never let it stop the script.
            pass
        return

    for locale_name in ('en_US.UTF-8', 'C.UTF-8'):
        try:
            locale.setlocale(locale.LC_ALL, locale_name)
        except locale.Error:
            continue
        return


_configure_console_encoding()


def _loads_lenient(text):
    """Parse *text* as JSON, repairing Python-repr style payloads if needed.

    Strict JSON is tried first so that valid documents are never altered.
    Only when that fails are single quotes turned into double quotes and bare
    ``nan`` tokens replaced by ``0``.

    Raises:
        json.JSONDecodeError: if the text is still not valid JSON afterwards.
    """
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        repaired = _NAN_TOKEN.sub('0', text.replace("'", '"'))
        return json.loads(repaired)


def parse_file_data(file_name, use_fdfs=False, fdfs_client=None):
    """
    Load and parse one vibration data file.

    Args:
        file_name: FDFS path or local path of the file.
        use_fdfs: download through ``fdfs_client`` when True (and a client is
            given); otherwise read from the local file system.
        fdfs_client: optional FDFS client object; its ``download_bytes``
            method is called with the UTF-8 encoded file name.

    Returns:
        The parsed JSON document (dict or list), or None when the file is
        missing or cannot be decompressed / parsed.  Every failure except a
        missing file prints a warning.
    """
    try:
        if use_fdfs and fdfs_client:
            data = fdfs_client.download_bytes(bytes(file_name, encoding="utf8"))
        else:
            # Without a client, map the FDFS logical path onto the local
            # storage directory; other paths are used as they are.
            if file_name.startswith(_FDFS_PREFIX):
                local_path = _FDFS_LOCAL_ROOT + file_name[len(_FDFS_PREFIX):]
            else:
                local_path = file_name
            with open(local_path, 'rb') as f:
                data = f.read()

        # A leading "{" marks an uncompressed JSON document; anything else is
        # treated as a zlib-compressed payload.
        if data[0:1] == b'{':
            text = data.decode()
        else:
            text = zlib.decompress(data).decode()
        return _loads_lenient(text)
    except FileNotFoundError:
        # Missing files are expected; skip them silently.
        return None
    except zlib.error as e:
        print(f" 警告: 解压缩失败 {file_name}: {e}")
        return None
    except json.JSONDecodeError as e:
        print(f" 警告: JSON解析失败 {file_name}: {e}")
        return None
    except Exception as e:
        # Best effort: one unreadable file must not abort the whole scan.
        print(f" 警告: 解析文件失败 {file_name}: {e}")
        return None


def calculate_asymmetry(data):
    """
    Compute the positive/negative asymmetry of a sample series.

    Args:
        data: a dict holding the samples under the key ``raw_y``, or the
            sample list itself.

    Returns:
        A tuple ``(abs_positive, abs_negative, count_positive,
        count_negative)``:

        - abs_positive: sum of positive samples divided by the sum of the
          absolute negative samples, or None when there are no negatives;
        - abs_negative: the inverse ratio, or None when there are no
          positives;
        - count_positive / count_negative: number of samples on each side.

        All four entries are None when no numeric sample can be extracted.
    """
    try:
        if isinstance(data, dict):
            samples = data.get('raw_y')
        elif isinstance(data, list):
            samples = data
        else:
            return _NO_ASYMMETRY

        if not isinstance(samples, list):
            return _NO_ASYMMETRY

        values = []
        for sample in samples:
            try:
                values.append(float(sample))
            except (ValueError, TypeError):
                # Non-numeric entries are ignored.
                continue

        if not values:
            return _NO_ASYMMETRY

        positive_values = [v for v in values if v > 0]
        negative_values = [abs(v) for v in values if v < 0]

        sum_positive = sum(positive_values)
        sum_negative = sum(negative_values)

        abs_positive = abs(sum_positive / sum_negative) if sum_negative > 0 else None
        abs_negative = abs(sum_negative / sum_positive) if sum_positive > 0 else None

        return abs_positive, abs_negative, len(positive_values), len(negative_values)
    except Exception:
        # Best effort: malformed data yields "no result" instead of an error.
        return _NO_ASYMMETRY


def _load_db_config():
    """Return the pymysql connection settings.

    Each setting can be overridden through an environment variable; the
    defaults are the values this script has always used.
    """
    # NOTE(review): the default password is kept only for backward
    # compatibility; it should be rotated and supplied via IOT_DB_PASSWORD.
    return {
        'host': os.environ.get('IOT_DB_HOST', '127.0.0.1'),
        'port': int(os.environ.get('IOT_DB_PORT', '3306')),
        'user': os.environ.get('IOT_DB_USER', 'prod'),
        'password': os.environ.get('IOT_DB_PASSWORD', 'hmdmxjIvfIjIoflL'),
        'database': os.environ.get('IOT_DB_NAME', 'iot'),
        'charset': 'utf8mb4',
        'connect_timeout': 10,
    }


def _flag_high_average_channels(cursor, device_channel_pairs, flagged):
    """Rule 1: record pairs whose newest values average above the threshold."""
    print("=" * 60)
    print("阶段一:筛选value平均值>30的故障测点")
    print("=" * 60)

    query_latest = """
        SELECT value, generate_time
        FROM td_d_log_result
        WHERE device_id = %s AND channel_no = %s AND task_id = '2'
        ORDER BY generate_time DESC
        LIMIT %s
    """

    for idx, (device_id, channel_no) in enumerate(device_channel_pairs, 1):
        cursor.execute(query_latest, (device_id, channel_no, _RECENT_RECORD_COUNT))
        records = cursor.fetchall()

        # Pairs with too little history are not judged.
        if len(records) < _RECENT_RECORD_COUNT:
            continue

        values = []
        for value_str, _generate_time in records:
            try:
                # The column is read as text; an empty value counts as 0.
                values.append(float(value_str) if value_str else 0)
            except (ValueError, TypeError):
                continue

        if not values:
            continue

        avg_value = sum(values) / len(values)
        if avg_value > _VALUE_THRESHOLD:
            flagged.setdefault((device_id, channel_no), {})['value>30'] = avg_value
            print(f"[{idx}] device_id: {device_id}, channel_no: {channel_no}, "
                  f"平均值: {avg_value:.2f}")


def _flag_asymmetric_channels(cursor, device_channel_pairs, flagged, fdfs_client):
    """Rule 2: record pairs with enough strongly asymmetric recent files."""
    print("\n" + "=" * 60)
    print("阶段二:筛选abs>2的故障测点")
    print("=" * 60)

    query_file_name = """
        SELECT lf.file_name, lf.generate_time as file_generate_time
        FROM td_d_log_file lf
        WHERE lf.device_id = %s AND lf.channel_no = %s AND lf.task_id = '2'
        ORDER BY lf.generate_time DESC
        LIMIT %s
    """

    for idx, (device_id, channel_no) in enumerate(device_channel_pairs, 1):
        cursor.execute(query_file_name, (device_id, channel_no, _RECENT_RECORD_COUNT))
        file_results = cursor.fetchall()

        if len(file_results) < _RECENT_RECORD_COUNT:
            continue

        count_abs_over_2 = 0
        # One (generate_time, abs_positive, abs_negative, count_positive,
        # count_negative) entry per file, including files without data, so
        # the report always lists every inspected file.
        abs_values_list = []

        for file_name, file_generate_time in file_results:
            if not file_name:
                abs_values_list.append((file_generate_time,) + _NO_ASYMMETRY)
                continue

            data = parse_file_data(
                file_name,
                use_fdfs=(fdfs_client is not None),
                fdfs_client=fdfs_client,
            )
            result = calculate_asymmetry(data) if data is not None else _NO_ASYMMETRY
            abs_positive, abs_negative, count_positive, count_negative = result
            abs_values_list.append(
                (file_generate_time, abs_positive, abs_negative, count_positive, count_negative)
            )

            # A file is a hit when either ratio exceeds the threshold.
            positive_hit = abs_positive is not None and abs(abs_positive) > _ASYMMETRY_THRESHOLD
            negative_hit = abs_negative is not None and abs(abs_negative) > _ASYMMETRY_THRESHOLD
            if positive_hit or negative_hit:
                count_abs_over_2 += 1

        if count_abs_over_2 >= _MIN_ASYMMETRY_HITS:
            flagged.setdefault((device_id, channel_no), {})['abs>2'] = abs_values_list
            print(f"[{idx}] device_id: {device_id}, channel_no: {channel_no}, "
                  f"abs>2的记录数量: {count_abs_over_2}/{_RECENT_RECORD_COUNT}")


def _fetch_device_names(cursor, channel_keys):
    """Return a ``device_id -> device_name`` map for the reported pairs."""
    device_info_map = {}
    if not channel_keys:
        return device_info_map

    unique_device_ids = list({device_id for device_id, _channel_no in channel_keys})
    placeholders = ','.join(['%s'] * len(unique_device_ids))
    query_device_name = f"""
        SELECT device_id, device_name
        FROM td_d_device_info
        WHERE device_id IN ({placeholders})
    """
    cursor.execute(query_device_name, tuple(unique_device_ids))

    for device_id, device_name in cursor.fetchall():
        device_info_map[device_id] = device_name if device_name else '(未设置设备名称)'
    return device_info_map


def _fetch_channel_names(cursor, channel_keys):
    """Return a ``(device_id, channel_no) -> channel_name`` map.

    The map is keyed by the pairs exactly as they appear in *channel_keys*.
    The query itself matches on the integer form of the channel number, so
    pairs whose channel number is not an integer are left out of the query
    and simply get no name.
    """
    # (device_id, int channel number) -> original keys sharing that number.
    keys_by_number = defaultdict(list)
    for key in channel_keys:
        device_id, channel_no = key
        try:
            keys_by_number[(device_id, int(channel_no))].append(key)
        except (TypeError, ValueError):
            continue

    channel_info_map = {}
    if not keys_by_number:
        return channel_info_map

    placeholders = ','.join(['(%s, %s)'] * len(keys_by_number))
    query_channel_name = f"""
        SELECT device_id, channel_no, channel_name
        FROM td_d_device_channel
        WHERE (device_id, channel_no) IN ({placeholders})
    """
    params = [part for number_key in keys_by_number for part in number_key]
    cursor.execute(query_channel_name, tuple(params))

    for device_id, channel_no, channel_name in cursor.fetchall():
        try:
            number_key = (device_id, int(channel_no))
        except (TypeError, ValueError):
            continue
        label = channel_name if channel_name else '(未设置通道名称)'
        for key in keys_by_number.get(number_key, ()):
            channel_info_map[key] = label
    return channel_info_map


def _channel_sort_key(key):
    """Sort by device, numeric channels first (by value), then the others."""
    device_id, channel_no = key
    text = str(channel_no)
    if text.isdecimal():
        return (device_id, 0, int(text), '')
    return (device_id, 1, 0, text)


def _format_optional(value, spec=''):
    """Format *value* with *spec*, or return "N/A" when it is None."""
    return format(value, spec) if value is not None else "N/A"


def _print_report(channel_keys, flagged, device_info_map, channel_info_map):
    """Print the fault list and return the lines destined for the result file.

    Pairs without a channel name are left out of the listing and the total.
    """
    print("\n" + "=" * 60)
    print("符合条件的故障测点列表:")
    print("=" * 60)

    # The file additionally carries a short field legend.
    output_lines = [
        "=" * 60,
        "符合条件的故障测点列表:",
        "=" * 60,
        "",
        "字段说明:",
        "- 设备ID: 设备标识",
        "- 设备名称: 设备名称",
        "- 通道名称: 通道名称",
        "- 问题结果: value>30时显示平均值;abs>2时显示10个文件的abs正和abs负值",
        "",
    ]

    def emit(line):
        print(line)
        output_lines.append(line)

    if not channel_keys:
        emit("未找到符合条件的故障测点")
        return output_lines

    displayed_count = 0
    for key in sorted(channel_keys, key=_channel_sort_key):
        channel_name = channel_info_map.get(key)
        if channel_name is None:
            continue

        device_id, _channel_no = key
        device_name = device_info_map.get(device_id, '(未找到设备名称)')
        result_data = flagged.get(key, {})
        displayed_count += 1

        emit(f"设备ID: {device_id}")
        emit(f"设备名称: {device_name}")
        emit(f"通道名称: {channel_name}")

        if 'value>30' in result_data:
            avg_value = result_data['value>30']
            emit(f"问题结果(value>30): 平均值 = {avg_value:.2f}")

        if 'abs>2' in result_data:
            emit("问题结果(abs>2):")
            for idx, entry in enumerate(result_data['abs>2'], 1):
                generate_time, abs_pos, abs_neg, count_pos, count_neg = entry
                time_str = str(generate_time) if generate_time else "N/A"
                abs_pos_str = _format_optional(abs_pos, '.6f')
                abs_neg_str = _format_optional(abs_neg, '.6f')
                count_pos_str = _format_optional(count_pos)
                count_neg_str = _format_optional(count_neg)
                emit(f" 文件{idx}: generate_time = {time_str}, abs正 = {abs_pos_str}, abs负 = {abs_neg_str}, 正半轴点数量 = {count_pos_str}, 负半轴点数量 = {count_neg_str}")

        emit("-" * 60)

    emit(f"\n总计: {displayed_count} 个故障测点(已过滤掉未找到通道名称的记录)")
    return output_lines


def _save_report(output_filepath, output_lines):
    """Write the report as UTF-8; a failure only prints a warning."""
    try:
        with open(output_filepath, 'w', encoding='utf-8') as f:
            f.write('\n'.join(output_lines))
            f.write('\n')
        print(f"\n[成功] 查询结果已保存到文件: {output_filepath}")
    except (OSError, UnicodeError) as e:
        print(f"\n[警告] 保存结果到文件失败: {e}")


def query_filtered_devices(fdfs_client=None):
    """
    Find all faulty (device_id, channel_no) pairs and write a report.

    The report is printed to the console and saved as
    ``query_result_<timestamp>.txt`` in the current working directory.

    Args:
        fdfs_client: optional FDFS client used to download the data files;
            without it the files are read from the local file system.

    Returns:
        The list of reported ``(device_id, channel_no)`` pairs (it may be
        empty), or None when the database driver is missing or any error
        occurred.
    """
    if pymysql is None:
        print("[失败] 未安装pymysql模块,无法连接数据库")
        return None

    connection = None
    try:
        print("正在连接到数据库...")
        connection = pymysql.connect(**_load_db_config())
        print("[成功] 数据库连接成功!\n")

        with connection.cursor() as cursor:
            print("查询所有device_id和channel_no的组合...")
            query_unique = """
                SELECT DISTINCT device_id, channel_no
                FROM td_d_log_result
                WHERE task_id = '2'
            """
            cursor.execute(query_unique)
            device_channel_pairs = cursor.fetchall()
            print(f"找到 {len(device_channel_pairs)} 个device_id和channel_no的组合\n")

            # {(device_id, channel_no): {'value>30': average,
            #                            'abs>2': [per-file tuples]}}
            filtered_channel_dict = {}
            _flag_high_average_channels(cursor, device_channel_pairs, filtered_channel_dict)
            _flag_asymmetric_channels(
                cursor, device_channel_pairs, filtered_channel_dict, fdfs_client
            )
            filtered_channel_list = list(filtered_channel_dict)

            print("\n查询设备名称...")
            device_info_map = _fetch_device_names(cursor, filtered_channel_list)

            print("查询通道名称...")
            channel_info_map = _fetch_channel_names(cursor, filtered_channel_list)

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_filepath = os.path.join(os.getcwd(), f"query_result_{timestamp}.txt")

            output_lines = _print_report(
                filtered_channel_list,
                filtered_channel_dict,
                device_info_map,
                channel_info_map,
            )
            _save_report(output_filepath, output_lines)

            return filtered_channel_list

    except pymysql.Error as e:
        # Not every pymysql error carries exactly (code, message).
        if len(e.args) >= 2:
            error_code, error_msg = e.args[0], e.args[1]
        else:
            error_code, error_msg = "N/A", str(e)
        print("[失败] 数据库操作失败!")
        print(f"[失败] 错误代码: {error_code}")
        print(f"[失败] 错误信息: {error_msg}")
        return None
    except Exception as e:
        print("[失败] 发生未知错误!")
        print(f"[失败] 错误信息: {e}")
        import traceback
        traceback.print_exc()
        return None
    finally:
        if connection is not None:
            connection.close()
            print("\n[成功] 数据库连接已关闭")


def main():
    """Print the rule summary, run the query and return the exit code."""
    print("=" * 60)
    print("设备和通道筛选工具")
    print("=" * 60)
    print("筛选条件:")
    print("- task_id = 2")
    print("")
    print("第一种类型(value>30):")
    print(" - 每个(device_id, channel_no)组合的最新10条记录(按generate_time)")
    print(" - 计算这10个value值的平均值,如果平均值大于30")
    print("")
    print("第二种类型(abs>2):")
    print(" - 每个(device_id, channel_no)组合的最新10条file_name记录(按generate_time)")
    print(" - 计算每条记录的不对称性(abs正、abs负)")
    print(" - 如果有3条或以上记录的abs正或abs负绝对值大于2")
    print("=" * 60 + "\n")

    result = query_filtered_devices()
    return 0 if result is not None else 1


if __name__ == "__main__":
    sys.exit(main())