#!/usr/bin/env python # -*- coding: utf-8 -*- """ 解析振动数据文件 从FDFS下载文件并解析数据 """ import zlib import json import sys import os # 设置控制台输出编码(Windows需要,Linux默认UTF-8) if sys.platform == 'win32': try: os.system('chcp 65001 >nul 2>&1') except: pass else: # Linux环境下设置UTF-8编码 import locale try: locale.setlocale(locale.LC_ALL, 'en_US.UTF-8') except: try: locale.setlocale(locale.LC_ALL, 'C.UTF-8') except: pass def parse_file_data(file_name, use_fdfs=False, fdfs_client=None): """ 解析文件数据 参数: file_name: 文件路径(FDFS路径或本地路径) use_fdfs: 是否使用FDFS客户端(如果为True,需要提供fdfs_client) fdfs_client: FDFS客户端对象(可选) 返回: 解析后的数据(字典或列表) """ try: # 1.1 从 FDFS 下载文件数据,先把文件名转成 UTF-8 编码的二进制串 if use_fdfs and fdfs_client: # 使用FDFS客户端下载 data = fdfs_client.download_bytes(bytes(file_name, encoding="utf8")) else: # 如果没有FDFS客户端,尝试从本地文件系统读取 # 将FDFS路径转换为本地路径 if file_name.startswith('group1/M00/'): local_path = file_name.replace('group1/M00/', '/home/soft/data/fdfs/storage/data/', 1) elif file_name.startswith('group1/M00'): local_path = file_name.replace('group1/M00', '/home/soft/data/fdfs/storage/data', 1) else: local_path = file_name # 从本地文件系统读取 with open(local_path, 'rb') as f: data = f.read() # 1.2 检查下载的二进制数据开头是否是 {(JSON 格式的标志) if data[0:1] == b'{': # 如果是 JSON 二进制串,直接解码成字符串 data = data.decode() else: # 如果不是 JSON 开头,说明数据是压缩的,先解压 ss = zlib.decompress(data).decode() # 修正格式:单引号改双引号(JSON 要求双引号)、nan 改成 0(避免 JSON 解析报错) ss = ss.replace('\'', '"').replace("nan", "0") # 把修正后的字符串解析成 JSON 格式的 Python 数据(字典/列表) data = json.loads(ss) return data except FileNotFoundError as e: print(f"[错误] 文件未找到: {e}") return None except zlib.error as e: print(f"[错误] 解压缩失败: {e}") return None except json.JSONDecodeError as e: print(f"[错误] JSON解析失败: {e}") return None except Exception as e: print(f"[错误] 解析文件时发生错误: {e}") import traceback traceback.print_exc() return None def calculate_asymmetry(data): """ 计算不对称性 从data中提取raw_y数组,然后计算不对称性 返回 (abs_正, abs_负) """ try: # 如果data是字典,提取raw_y字段 if isinstance(data, dict): if 'raw_y' not in data: return None, None values_list = data['raw_y'] elif isinstance(data, list): # 如果是列表,直接使用(兼容旧代码) values_list = data else: return None, None # 确保values_list是列表 if not isinstance(values_list, list): return None, None # 转换为数值列表 values = [] for v in values_list: try: val = float(v) values.append(val) except (ValueError, TypeError): continue if len(values) == 0: return None, None # 分离正半轴和负半轴的值 positive_values = [v for v in values if v > 0] negative_values = [abs(v) for v in values if v < 0] # 负半轴取绝对值 # 计算和 sum_positive = sum(positive_values) sum_negative = sum(negative_values) # 计算不对称性 if sum_positive > 0: abs_positive = abs((sum_positive - sum_negative) / sum_positive) else: abs_positive = None if sum_negative > 0: abs_negative = abs((sum_positive - sum_negative) / sum_negative) else: abs_negative = None return abs_positive, abs_negative except Exception as e: print(f"[错误] 计算不对称性时发生错误: {e}") return None, None def main(): """主函数""" # 从命令行参数获取文件路径 if len(sys.argv) < 2: print("[错误] 请提供文件路径作为参数") print("使用方法: python parse_file.py ") print("示例: python parse_file.py group1/M00/4C/AD/wKgUZWlQzpKAcUtmAA03b1suhI44618492") sys.exit(1) file_name = sys.argv[1] print("=" * 60) print("文件解析工具") print("=" * 60) print(f"文件路径: {file_name}") print("-" * 60) # 解析文件 print("正在解析文件...") data = parse_file_data(file_name, use_fdfs=False) if data is not None: print("\n[成功] 文件解析成功!") print("=" * 60) print("解析后的数据:") print("=" * 60) # 打印数据 # 如果是字典或列表,使用json.dumps格式化输出 if isinstance(data, (dict, list)): print(json.dumps(data, indent=2, ensure_ascii=False)) else: print(data) print("=" * 60) # 打印数据类型和基本信息 print(f"\n数据类型: {type(data).__name__}") if isinstance(data, dict): print(f"字典键数量: {len(data)}") print(f"字典键: {list(data.keys())}") elif isinstance(data, list): print(f"列表长度: {len(data)}") if len(data) > 0: print(f"第一个元素类型: {type(data[0]).__name__}") if isinstance(data[0], (dict, list)): print(f"第一个元素: {json.dumps(data[0], indent=2, ensure_ascii=False)[:200]}...") # 计算不对称性 print("\n" + "=" * 60) print("不对称性计算:") print("=" * 60) abs_positive, abs_negative = calculate_asymmetry(data) if abs_positive is not None and abs_negative is not None: print(f"abs正: {abs_positive:.10f}") print(f"abs负: {abs_negative:.10f}") else: print("[警告] 无法计算不对称性(数据中可能缺少raw_y字段或raw_y为空)") if isinstance(data, dict) and 'raw_y' in data: raw_y = data['raw_y'] print(f"raw_y类型: {type(raw_y).__name__}") if isinstance(raw_y, list): print(f"raw_y长度: {len(raw_y)}") else: print("\n[失败] 文件解析失败!") sys.exit(1) if __name__ == "__main__": main()