|
|
@@ -0,0 +1,457 @@
|
|
|
+#!/usr/bin/env python
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+查询td_d_log_result表,筛选符合条件的device_id和channel_no组合
|
|
|
+筛选条件:
|
|
|
+- task_id = 2
|
|
|
+- 对于每个device_id和channel_no的组合,取最新的10条记录(按channel_no的generate_time)
|
|
|
+- 如果这10个value值中有超过3个值大于20,则记录该(device_id, channel_no)组合
|
|
|
+"""
|
|
|
+
|
|
|
+import pymysql
|
|
|
+import sys
|
|
|
+import os
|
|
|
+import zlib
|
|
|
+import json
|
|
|
+from collections import defaultdict
|
|
|
+from datetime import datetime
|
|
|
+
|
|
|
+# 设置控制台输出编码(Windows需要,Linux默认UTF-8)
|
|
|
+if sys.platform == 'win32':
|
|
|
+ try:
|
|
|
+ os.system('chcp 65001 >nul 2>&1')
|
|
|
+ except:
|
|
|
+ pass
|
|
|
+else:
|
|
|
+ # Linux环境下设置UTF-8编码
|
|
|
+ import locale
|
|
|
+ try:
|
|
|
+ locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
|
|
|
+ except:
|
|
|
+ try:
|
|
|
+ locale.setlocale(locale.LC_ALL, 'C.UTF-8')
|
|
|
+ except:
|
|
|
+ pass
|
|
|
+
|
|
|
+def parse_file_data(file_name, use_fdfs=False, fdfs_client=None):
|
|
|
+ """
|
|
|
+ 解析振动数据文件
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ file_name: 文件路径(FDFS路径或本地路径)
|
|
|
+ use_fdfs: 是否使用FDFS客户端(如果为True,需要提供fdfs_client)
|
|
|
+ fdfs_client: FDFS客户端对象(可选)
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ 解析后的数据(字典或列表)
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 1.1 从 FDFS 下载文件数据,先把文件名转成 UTF-8 编码的二进制串
|
|
|
+ if use_fdfs and fdfs_client:
|
|
|
+ # 使用FDFS客户端下载
|
|
|
+ data = fdfs_client.download_bytes(bytes(file_name, encoding="utf8"))
|
|
|
+ else:
|
|
|
+ # 如果没有FDFS客户端,尝试从本地文件系统读取
|
|
|
+ # 将FDFS路径转换为本地路径
|
|
|
+ if file_name.startswith('group1/M00/'):
|
|
|
+ local_path = file_name.replace('group1/M00/', '/home/soft/data/fdfs/storage/data/', 1)
|
|
|
+ elif file_name.startswith('group1/M00'):
|
|
|
+ local_path = file_name.replace('group1/M00', '/home/soft/data/fdfs/storage/data', 1)
|
|
|
+ else:
|
|
|
+ local_path = file_name
|
|
|
+
|
|
|
+ # 从本地文件系统读取
|
|
|
+ with open(local_path, 'rb') as f:
|
|
|
+ data = f.read()
|
|
|
+
|
|
|
+ # 1.2 检查下载的二进制数据开头是否是 {(JSON 格式的标志)
|
|
|
+ if data[0:1] == b'{':
|
|
|
+ # 如果是 JSON 二进制串,直接解码成字符串
|
|
|
+ data = data.decode()
|
|
|
+ else:
|
|
|
+ # 如果不是 JSON 开头,说明数据是压缩的,先解压
|
|
|
+ ss = zlib.decompress(data).decode()
|
|
|
+ # 修正格式:单引号改双引号(JSON 要求双引号)、nan 改成 0(避免 JSON 解析报错)
|
|
|
+ ss = ss.replace('\'', '"').replace("nan", "0")
|
|
|
+ # 把修正后的字符串解析成 JSON 格式的 Python 数据(字典/列表)
|
|
|
+ data = json.loads(ss)
|
|
|
+
|
|
|
+ return data
|
|
|
+ except FileNotFoundError as e:
|
|
|
+ # 文件未找到,静默返回None
|
|
|
+ return None
|
|
|
+ except zlib.error as e:
|
|
|
+ print(f" 警告: 解压缩失败 {file_name}: {e}")
|
|
|
+ return None
|
|
|
+ except json.JSONDecodeError as e:
|
|
|
+ print(f" 警告: JSON解析失败 {file_name}: {e}")
|
|
|
+ return None
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 警告: 解析文件失败 {file_name}: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+def calculate_asymmetry(data):
|
|
|
+ """
|
|
|
+ 计算不对称性
|
|
|
+ 从data中提取raw_y数组,然后计算不对称性
|
|
|
+ 返回 (abs_正, abs_负)
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 如果data是字典,提取raw_y字段
|
|
|
+ if isinstance(data, dict):
|
|
|
+ if 'raw_y' not in data:
|
|
|
+ return None, None
|
|
|
+ values_list = data['raw_y']
|
|
|
+ elif isinstance(data, list):
|
|
|
+ # 如果是列表,直接使用(兼容旧代码)
|
|
|
+ values_list = data
|
|
|
+ else:
|
|
|
+ return None, None
|
|
|
+
|
|
|
+ # 确保values_list是列表
|
|
|
+ if not isinstance(values_list, list):
|
|
|
+ return None, None
|
|
|
+
|
|
|
+ # 转换为数值列表
|
|
|
+ values = []
|
|
|
+ for v in values_list:
|
|
|
+ try:
|
|
|
+ val = float(v)
|
|
|
+ values.append(val)
|
|
|
+ except (ValueError, TypeError):
|
|
|
+ continue
|
|
|
+
|
|
|
+ if len(values) == 0:
|
|
|
+ return None, None
|
|
|
+
|
|
|
+ # 分离正半轴和负半轴的值
|
|
|
+ positive_values = [v for v in values if v > 0]
|
|
|
+ negative_values = [abs(v) for v in values if v < 0] # 负半轴取绝对值
|
|
|
+
|
|
|
+ # 计算和
|
|
|
+ sum_positive = sum(positive_values)
|
|
|
+ sum_negative = sum(negative_values)
|
|
|
+
|
|
|
+ # 计算不对称性
|
|
|
+ if sum_positive > 0:
|
|
|
+ abs_positive = abs((sum_positive - sum_negative) / sum_positive)
|
|
|
+ else:
|
|
|
+ abs_positive = None
|
|
|
+
|
|
|
+ if sum_negative > 0:
|
|
|
+ abs_negative = abs((sum_positive - sum_negative) / sum_negative)
|
|
|
+ else:
|
|
|
+ abs_negative = None
|
|
|
+
|
|
|
+ return abs_positive, abs_negative
|
|
|
+ except Exception as e:
|
|
|
+ return None, None
|
|
|
+
|
|
|
+def query_filtered_devices(fdfs_client=None):
|
|
|
+ """
|
|
|
+ 查询符合条件的device_id和channel_no组合
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ fdfs_client: FDFS客户端对象(可选,用于下载文件)
|
|
|
+ """
|
|
|
+ # 数据库连接配置
|
|
|
+ db_config = {
|
|
|
+ 'host': '127.0.0.1',
|
|
|
+ 'port': 3306,
|
|
|
+ 'user': 'prod',
|
|
|
+ 'password': 'hmdmxjIvfIjIoflL',
|
|
|
+ 'database': 'iot',
|
|
|
+ 'charset': 'utf8mb4',
|
|
|
+ 'connect_timeout': 10
|
|
|
+ }
|
|
|
+
|
|
|
+ connection = None
|
|
|
+ try:
|
|
|
+ print("正在连接到数据库...")
|
|
|
+ connection = pymysql.connect(**db_config)
|
|
|
+ print("[成功] 数据库连接成功!\n")
|
|
|
+
|
|
|
+ with connection.cursor() as cursor:
|
|
|
+ # 查询所有唯一的device_id和channel_no组合,且task_id=2
|
|
|
+ print("查询所有device_id和channel_no的组合...")
|
|
|
+ query_unique = """
|
|
|
+ SELECT DISTINCT device_id, channel_no
|
|
|
+ FROM td_d_log_result
|
|
|
+ WHERE task_id = '2'
|
|
|
+ """
|
|
|
+ cursor.execute(query_unique)
|
|
|
+ device_channel_pairs = cursor.fetchall()
|
|
|
+ print(f"找到 {len(device_channel_pairs)} 个device_id和channel_no的组合\n")
|
|
|
+
|
|
|
+ # 存储符合条件的(device_id, channel_no)组合
|
|
|
+ filtered_channel_list = []
|
|
|
+
|
|
|
+ # 对每个device_id和channel_no组合进行处理
|
|
|
+ for idx, (device_id, channel_no) in enumerate(device_channel_pairs, 1):
|
|
|
+ # 查询该组合的最新10条记录(按generate_time降序,按channel_no)
|
|
|
+ query_latest = """
|
|
|
+ SELECT value, generate_time
|
|
|
+ FROM td_d_log_result
|
|
|
+ WHERE device_id = %s
|
|
|
+ AND channel_no = %s
|
|
|
+ AND task_id = '2'
|
|
|
+ ORDER BY generate_time DESC
|
|
|
+ LIMIT 10
|
|
|
+ """
|
|
|
+ cursor.execute(query_latest, (device_id, channel_no))
|
|
|
+ records = cursor.fetchall()
|
|
|
+
|
|
|
+ if len(records) < 10:
|
|
|
+ # 如果记录数少于10条,跳过
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 统计value值大于20的个数
|
|
|
+ count_over_20 = 0
|
|
|
+ values = []
|
|
|
+
|
|
|
+ for value_str, generate_time in records:
|
|
|
+ try:
|
|
|
+ # value字段是varchar类型,需要转换为float
|
|
|
+ value = float(value_str) if value_str else 0
|
|
|
+ values.append(value)
|
|
|
+ if value > 20:
|
|
|
+ count_over_20 += 1
|
|
|
+ except (ValueError, TypeError):
|
|
|
+ # 如果转换失败,跳过该值
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 如果超过3个值大于20,记录该(device_id, channel_no)组合
|
|
|
+ if count_over_20 > 3:
|
|
|
+ filtered_channel_list.append((device_id, channel_no))
|
|
|
+ print(f"[{idx}] device_id: {device_id}, channel_no: {channel_no}, "
|
|
|
+ f"超过20的值数量: {count_over_20}/10, 最新10个值: {values}")
|
|
|
+
|
|
|
+ # 查询device_name
|
|
|
+ print("\n查询设备名称...")
|
|
|
+ device_info_map = {}
|
|
|
+ if filtered_channel_list:
|
|
|
+ # 获取所有唯一的device_id
|
|
|
+ unique_device_ids = list(set([device_id for device_id, channel_no in filtered_channel_list]))
|
|
|
+
|
|
|
+ # 批量查询device_name
|
|
|
+ placeholders = ','.join(['%s'] * len(unique_device_ids))
|
|
|
+ query_device_name = f"""
|
|
|
+ SELECT device_id, device_name
|
|
|
+ FROM td_d_device_info
|
|
|
+ WHERE device_id IN ({placeholders})
|
|
|
+ """
|
|
|
+ cursor.execute(query_device_name, tuple(unique_device_ids))
|
|
|
+ device_info_results = cursor.fetchall()
|
|
|
+
|
|
|
+ # 构建device_id到device_name的映射
|
|
|
+ for device_id, device_name in device_info_results:
|
|
|
+ device_info_map[device_id] = device_name if device_name else '(未设置设备名称)'
|
|
|
+
|
|
|
+ # 查询channel_name
|
|
|
+ print("查询通道名称...")
|
|
|
+ channel_info_map = {}
|
|
|
+ if filtered_channel_list:
|
|
|
+ # 批量查询channel_name(使用device_id和channel_no组合)
|
|
|
+ query_channel_name = """
|
|
|
+ SELECT device_id, channel_no, channel_name
|
|
|
+ FROM td_d_device_channel
|
|
|
+ WHERE (device_id, channel_no) IN (
|
|
|
+ """
|
|
|
+ # 构建IN子句的占位符
|
|
|
+ placeholders = ','.join(['(%s, %s)'] * len(filtered_channel_list))
|
|
|
+ query_channel_name += placeholders + ")"
|
|
|
+
|
|
|
+ # 准备参数:将(device_id, channel_no)元组展开
|
|
|
+ params = []
|
|
|
+ for device_id, channel_no in filtered_channel_list:
|
|
|
+ params.extend([device_id, int(channel_no)]) # channel_no需要转换为int
|
|
|
+
|
|
|
+ cursor.execute(query_channel_name, tuple(params))
|
|
|
+ channel_info_results = cursor.fetchall()
|
|
|
+
|
|
|
+ # 构建(device_id, channel_no)到channel_name的映射
|
|
|
+ for device_id, channel_no, channel_name in channel_info_results:
|
|
|
+ channel_info_map[(device_id, str(channel_no))] = channel_name if channel_name else '(未设置通道名称)'
|
|
|
+
|
|
|
+ # 查询每个(device_id, channel_no)组合最近10条的file_name
|
|
|
+ print("查询每个channel_no最近10条文件记录...")
|
|
|
+ channel_file_map = {}
|
|
|
+ if filtered_channel_list:
|
|
|
+ for device_id, channel_no in filtered_channel_list:
|
|
|
+ # 查询该(device_id, channel_no)组合最近10条记录的file_name(按generate_time降序,task_id=2)
|
|
|
+ # 同时关联查询td_d_log_result表的generate_time
|
|
|
+ query_file_name = """
|
|
|
+ SELECT lf.file_name, lf.generate_time as file_generate_time, lr.generate_time as result_generate_time
|
|
|
+ FROM td_d_log_file lf
|
|
|
+ LEFT JOIN td_d_log_result lr ON (
|
|
|
+ lf.device_id = lr.device_id
|
|
|
+ AND lf.channel_no = lr.channel_no
|
|
|
+ AND lf.task_id = lr.task_id
|
|
|
+ AND lf.generate_time = lr.generate_time
|
|
|
+ )
|
|
|
+ WHERE lf.device_id = %s
|
|
|
+ AND lf.channel_no = %s
|
|
|
+ AND lf.task_id = '2'
|
|
|
+ ORDER BY lf.generate_time DESC
|
|
|
+ LIMIT 10
|
|
|
+ """
|
|
|
+ cursor.execute(query_file_name, (device_id, channel_no))
|
|
|
+ file_results = cursor.fetchall()
|
|
|
+
|
|
|
+ # 提取file_name列表,同时解析文件计算不对称性(使用FDFS路径)
|
|
|
+ file_info_list = []
|
|
|
+ for file_name, file_generate_time, result_generate_time in file_results:
|
|
|
+ if file_name:
|
|
|
+ # 解析文件并计算不对称性(使用原始FDFS路径,如果提供了fdfs_client则使用FDFS,否则从本地读取)
|
|
|
+ data = parse_file_data(file_name, use_fdfs=(fdfs_client is not None), fdfs_client=fdfs_client)
|
|
|
+ abs_positive, abs_negative = calculate_asymmetry(data) if data is not None else (None, None)
|
|
|
+
|
|
|
+ file_info_list.append({
|
|
|
+ 'file_name': file_name, # 保持原始FDFS路径
|
|
|
+ 'file_generate_time': file_generate_time,
|
|
|
+ 'result_generate_time': result_generate_time,
|
|
|
+ 'abs_positive': abs_positive,
|
|
|
+ 'abs_negative': abs_negative
|
|
|
+ })
|
|
|
+ channel_file_map[(device_id, channel_no)] = file_info_list
|
|
|
+
|
|
|
+ # 生成输出文件(包含时间戳)
|
|
|
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
+ output_filename = f"query_result_{timestamp}.txt"
|
|
|
+ output_filepath = os.path.join(os.getcwd(), output_filename)
|
|
|
+
|
|
|
+ # 打印所有符合条件的device_id、device_name、channel_no、channel_name和最近10条file_name
|
|
|
+ print("\n" + "=" * 60)
|
|
|
+ print("符合条件的device_id和channel_no组合列表:")
|
|
|
+ print("=" * 60)
|
|
|
+
|
|
|
+ # 准备输出内容(同时用于打印和写入文件)
|
|
|
+ output_lines = []
|
|
|
+ output_lines.append("=" * 60)
|
|
|
+ output_lines.append("符合条件的device_id和channel_no组合列表:")
|
|
|
+ output_lines.append("=" * 60)
|
|
|
+ output_lines.append("")
|
|
|
+
|
|
|
+ if filtered_channel_list:
|
|
|
+ # 按device_id和channel_no排序后打印
|
|
|
+ sorted_channels = sorted(filtered_channel_list, key=lambda x: (x[0], int(x[1]) if x[1].isdigit() else x[1]))
|
|
|
+ displayed_count = 0 # 记录实际显示的记录数
|
|
|
+ for device_id, channel_no in sorted_channels:
|
|
|
+ device_name = device_info_map.get(device_id, '(未找到设备名称)')
|
|
|
+ channel_name = channel_info_map.get((device_id, channel_no), '(未找到通道名称)')
|
|
|
+
|
|
|
+ # 过滤掉"未找到通道名称"的记录
|
|
|
+ if channel_name == '(未找到通道名称)':
|
|
|
+ continue
|
|
|
+
|
|
|
+ displayed_count += 1 # 增加显示计数
|
|
|
+ file_names = channel_file_map.get((device_id, channel_no), [])
|
|
|
+
|
|
|
+ device_info_str = f"\n设备ID: {device_id}"
|
|
|
+ device_name_str = f"设备名称: {device_name}"
|
|
|
+ channel_no_str = f"通道号: {channel_no}"
|
|
|
+ channel_name_str = f"通道名称: {channel_name}"
|
|
|
+ file_count_str = f"最近10条文件记录({len(file_names)}条):"
|
|
|
+
|
|
|
+ print(device_info_str)
|
|
|
+ print(device_name_str)
|
|
|
+ print(channel_no_str)
|
|
|
+ print(channel_name_str)
|
|
|
+ print(file_count_str)
|
|
|
+
|
|
|
+ output_lines.append(device_info_str)
|
|
|
+ output_lines.append(device_name_str)
|
|
|
+ output_lines.append(channel_no_str)
|
|
|
+ output_lines.append(channel_name_str)
|
|
|
+ output_lines.append(file_count_str)
|
|
|
+
|
|
|
+ if file_names:
|
|
|
+ for idx, file_info in enumerate(file_names, 1):
|
|
|
+ file_name = file_info['file_name']
|
|
|
+ file_generate_time = file_info.get('file_generate_time')
|
|
|
+ result_generate_time = file_info.get('result_generate_time')
|
|
|
+ abs_positive = file_info.get('abs_positive')
|
|
|
+ abs_negative = file_info.get('abs_negative')
|
|
|
+
|
|
|
+ # 格式化输出
|
|
|
+ file_time_str = str(file_generate_time) if file_generate_time else "N/A"
|
|
|
+ result_time_str = str(result_generate_time) if result_generate_time else "N/A"
|
|
|
+ abs_positive_str = f"{abs_positive:.6f}" if abs_positive is not None else "N/A"
|
|
|
+ abs_negative_str = f"{abs_negative:.6f}" if abs_negative is not None else "N/A"
|
|
|
+
|
|
|
+ file_line = f" {idx}. {file_name}"
|
|
|
+ time_line = f" generate_time(td_d_log_result): {result_time_str}"
|
|
|
+ abs_line = f" abs正: {abs_positive_str}, abs负: {abs_negative_str}"
|
|
|
+
|
|
|
+ print(file_line)
|
|
|
+ print(time_line)
|
|
|
+ print(abs_line)
|
|
|
+
|
|
|
+ output_lines.append(file_line)
|
|
|
+ output_lines.append(time_line)
|
|
|
+ output_lines.append(abs_line)
|
|
|
+ else:
|
|
|
+ no_file_str = " (无文件记录)"
|
|
|
+ print(no_file_str)
|
|
|
+ output_lines.append(no_file_str)
|
|
|
+
|
|
|
+ separator = "-" * 60
|
|
|
+ print(separator)
|
|
|
+ output_lines.append(separator)
|
|
|
+
|
|
|
+ total_str = f"\n总计: {displayed_count} 个(device_id, channel_no)组合(已过滤掉未找到通道名称的记录)"
|
|
|
+ print(total_str)
|
|
|
+ output_lines.append(total_str)
|
|
|
+ else:
|
|
|
+ no_result_str = "未找到符合条件的(device_id, channel_no)组合"
|
|
|
+ print(no_result_str)
|
|
|
+ output_lines.append(no_result_str)
|
|
|
+
|
|
|
+ # 将结果写入文件
|
|
|
+ try:
|
|
|
+ with open(output_filepath, 'w', encoding='utf-8') as f:
|
|
|
+ f.write('\n'.join(output_lines))
|
|
|
+ f.write('\n')
|
|
|
+ print(f"\n[成功] 查询结果已保存到文件: {output_filepath}")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"\n[警告] 保存结果到文件失败: {e}")
|
|
|
+
|
|
|
+ return filtered_channel_list
|
|
|
+
|
|
|
+ except pymysql.Error as e:
|
|
|
+ error_code, error_msg = e.args
|
|
|
+ print("[失败] 数据库操作失败!")
|
|
|
+ print(f"[失败] 错误代码: {error_code}")
|
|
|
+ print(f"[失败] 错误信息: {error_msg}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print("[失败] 发生未知错误!")
|
|
|
+ print(f"[失败] 错误信息: {e}")
|
|
|
+ import traceback
|
|
|
+ traceback.print_exc()
|
|
|
+ return None
|
|
|
+
|
|
|
+ finally:
|
|
|
+ # 关闭数据库连接
|
|
|
+ if connection:
|
|
|
+ connection.close()
|
|
|
+ print("\n[成功] 数据库连接已关闭")
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ print("=" * 60)
|
|
|
+ print("设备和通道筛选工具")
|
|
|
+ print("=" * 60)
|
|
|
+ print("筛选条件:")
|
|
|
+ print("- task_id = 2")
|
|
|
+ print("- 每个device_id下channel_no的最新10条记录(按channel_no的generate_time)")
|
|
|
+ print("- 这10个value值中有超过3个值大于20")
|
|
|
+ print("=" * 60 + "\n")
|
|
|
+
|
|
|
+ result = query_filtered_devices()
|
|
|
+
|
|
|
+ if result is not None:
|
|
|
+ sys.exit(0)
|
|
|
+ else:
|
|
|
+ sys.exit(1)
|
|
|
+
|