| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 解析振动数据文件
- 从FDFS下载文件并解析数据
- """
- import zlib
- import json
- import sys
- import os
- # 设置控制台输出编码(Windows需要,Linux默认UTF-8)
- if sys.platform == 'win32':
- try:
- os.system('chcp 65001 >nul 2>&1')
- except:
- pass
- else:
- # Linux环境下设置UTF-8编码
- import locale
- try:
- locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
- except:
- try:
- locale.setlocale(locale.LC_ALL, 'C.UTF-8')
- except:
- pass
- def parse_file_data(file_name, use_fdfs=False, fdfs_client=None):
- """
- 解析文件数据
-
- 参数:
- file_name: 文件路径(FDFS路径或本地路径)
- use_fdfs: 是否使用FDFS客户端(如果为True,需要提供fdfs_client)
- fdfs_client: FDFS客户端对象(可选)
-
- 返回:
- 解析后的数据(字典或列表)
- """
- try:
- # 1.1 从 FDFS 下载文件数据,先把文件名转成 UTF-8 编码的二进制串
- if use_fdfs and fdfs_client:
- # 使用FDFS客户端下载
- data = fdfs_client.download_bytes(bytes(file_name, encoding="utf8"))
- else:
- # 如果没有FDFS客户端,尝试从本地文件系统读取
- # 将FDFS路径转换为本地路径
- if file_name.startswith('group1/M00/'):
- local_path = file_name.replace('group1/M00/', '/home/soft/data/fdfs/storage/data/', 1)
- elif file_name.startswith('group1/M00'):
- local_path = file_name.replace('group1/M00', '/home/soft/data/fdfs/storage/data', 1)
- else:
- local_path = file_name
-
- # 从本地文件系统读取
- with open(local_path, 'rb') as f:
- data = f.read()
-
- # 1.2 检查下载的二进制数据开头是否是 {(JSON 格式的标志)
- if data[0:1] == b'{':
- # 如果是 JSON 二进制串,直接解码成字符串
- data = data.decode()
- else:
- # 如果不是 JSON 开头,说明数据是压缩的,先解压
- ss = zlib.decompress(data).decode()
- # 修正格式:单引号改双引号(JSON 要求双引号)、nan 改成 0(避免 JSON 解析报错)
- ss = ss.replace('\'', '"').replace("nan", "0")
- # 把修正后的字符串解析成 JSON 格式的 Python 数据(字典/列表)
- data = json.loads(ss)
-
- return data
- except FileNotFoundError as e:
- print(f"[错误] 文件未找到: {e}")
- return None
- except zlib.error as e:
- print(f"[错误] 解压缩失败: {e}")
- return None
- except json.JSONDecodeError as e:
- print(f"[错误] JSON解析失败: {e}")
- return None
- except Exception as e:
- print(f"[错误] 解析文件时发生错误: {e}")
- import traceback
- traceback.print_exc()
- return None
- def calculate_asymmetry(data):
- """
- 计算不对称性
- 从data中提取raw_y数组,然后计算不对称性
- 返回 (abs_正, abs_负)
- """
- try:
- # 如果data是字典,提取raw_y字段
- if isinstance(data, dict):
- if 'raw_y' not in data:
- return None, None
- values_list = data['raw_y']
- elif isinstance(data, list):
- # 如果是列表,直接使用(兼容旧代码)
- values_list = data
- else:
- return None, None
-
- # 确保values_list是列表
- if not isinstance(values_list, list):
- return None, None
-
- # 转换为数值列表
- values = []
- for v in values_list:
- try:
- val = float(v)
- values.append(val)
- except (ValueError, TypeError):
- continue
-
- if len(values) == 0:
- return None, None
-
- # 分离正半轴和负半轴的值
- positive_values = [v for v in values if v > 0]
- negative_values = [abs(v) for v in values if v < 0] # 负半轴取绝对值
-
- # 计算和
- sum_positive = sum(positive_values)
- sum_negative = sum(negative_values)
-
- # 计算不对称性
- if sum_positive > 0:
- abs_positive = abs((sum_positive - sum_negative) / sum_positive)
- else:
- abs_positive = None
-
- if sum_negative > 0:
- abs_negative = abs((sum_positive - sum_negative) / sum_negative)
- else:
- abs_negative = None
-
- return abs_positive, abs_negative
- except Exception as e:
- print(f"[错误] 计算不对称性时发生错误: {e}")
- return None, None
- def main():
- """主函数"""
- # 从命令行参数获取文件路径
- if len(sys.argv) < 2:
- print("[错误] 请提供文件路径作为参数")
- print("使用方法: python parse_file.py <file_name>")
- print("示例: python parse_file.py group1/M00/4C/AD/wKgUZWlQzpKAcUtmAA03b1suhI44618492")
- sys.exit(1)
-
- file_name = sys.argv[1]
-
- print("=" * 60)
- print("文件解析工具")
- print("=" * 60)
- print(f"文件路径: {file_name}")
- print("-" * 60)
-
- # 解析文件
- print("正在解析文件...")
- data = parse_file_data(file_name, use_fdfs=False)
-
- if data is not None:
- print("\n[成功] 文件解析成功!")
- print("=" * 60)
- print("解析后的数据:")
- print("=" * 60)
-
- # 打印数据
- # 如果是字典或列表,使用json.dumps格式化输出
- if isinstance(data, (dict, list)):
- print(json.dumps(data, indent=2, ensure_ascii=False))
- else:
- print(data)
-
- print("=" * 60)
-
- # 打印数据类型和基本信息
- print(f"\n数据类型: {type(data).__name__}")
- if isinstance(data, dict):
- print(f"字典键数量: {len(data)}")
- print(f"字典键: {list(data.keys())}")
- elif isinstance(data, list):
- print(f"列表长度: {len(data)}")
- if len(data) > 0:
- print(f"第一个元素类型: {type(data[0]).__name__}")
- if isinstance(data[0], (dict, list)):
- print(f"第一个元素: {json.dumps(data[0], indent=2, ensure_ascii=False)[:200]}...")
-
- # 计算不对称性
- print("\n" + "=" * 60)
- print("不对称性计算:")
- print("=" * 60)
- abs_positive, abs_negative = calculate_asymmetry(data)
-
- if abs_positive is not None and abs_negative is not None:
- print(f"abs正: {abs_positive:.10f}")
- print(f"abs负: {abs_negative:.10f}")
- else:
- print("[警告] 无法计算不对称性(数据中可能缺少raw_y字段或raw_y为空)")
- if isinstance(data, dict) and 'raw_y' in data:
- raw_y = data['raw_y']
- print(f"raw_y类型: {type(raw_y).__name__}")
- if isinstance(raw_y, list):
- print(f"raw_y长度: {len(raw_y)}")
- else:
- print("\n[失败] 文件解析失败!")
- sys.exit(1)
- if __name__ == "__main__":
- main()
|