helpers.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. """
  2. 工具函数模块
  3. 提供常用的辅助功能:
  4. - 命令行执行工具
  5. - 文件处理工具
  6. - 日志工具
  7. """
  8. import subprocess
  9. import logging
  10. import re
  11. from typing import Tuple, Optional
  12. def run_command(cmd: str, shell: bool = True, timeout: int = 300) -> Tuple[int, str, str]:
  13. """
  14. 执行命令行命令
  15. Args:
  16. cmd: 要执行的命令
  17. shell: 是否使用 shell 执行
  18. timeout: 超时时间(秒)
  19. Returns:
  20. (return_code, stdout, stderr)
  21. """
  22. try:
  23. result = subprocess.run(
  24. cmd,
  25. shell=shell,
  26. capture_output=True,
  27. text=True,
  28. timeout=timeout
  29. )
  30. return result.returncode, result.stdout, result.stderr
  31. except subprocess.TimeoutExpired:
  32. return -1, "", f"Command timed out after {timeout} seconds"
  33. except Exception as e:
  34. return -1, "", str(e)
  35. def validate_hdfs_path(path: str) -> bool:
  36. """
  37. 验证 HDFS 路径格式是否有效
  38. Args:
  39. path: 要验证的路径
  40. Returns:
  41. 路径是否有效
  42. """
  43. if not path:
  44. return False
  45. # HDFS 路径必须以 / 开头
  46. if not path.startswith('/'):
  47. return False
  48. # 检查是否包含非法字符
  49. invalid_chars = re.compile(r'[<>:"|?*]')
  50. if invalid_chars.search(path):
  51. return False
  52. # 检查是否包含连续的斜杠
  53. if '//' in path:
  54. return False
  55. return True
  56. def format_file_size(size_bytes: int) -> str:
  57. """
  58. 格式化文件大小,将字节转换为人类可读的格式
  59. Args:
  60. size_bytes: 文件大小(字节)
  61. Returns:
  62. 格式化后的文件大小字符串
  63. """
  64. for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
  65. if size_bytes < 1024.0:
  66. return f"{size_bytes:.2f} {unit}"
  67. size_bytes /= 1024.0
  68. return f"{size_bytes:.2f} PB"
  69. def setup_logger(name: str, level: int = logging.INFO, log_file: Optional[str] = None) -> logging.Logger:
  70. """
  71. 设置日志器
  72. Args:
  73. name: 日志器名称
  74. level: 日志级别
  75. log_file: 日志文件路径(可选)
  76. Returns:
  77. 配置好的日志器
  78. """
  79. logger = logging.getLogger(name)
  80. logger.setLevel(level)
  81. # 避免重复添加处理器
  82. if logger.handlers:
  83. return logger
  84. # 创建格式器
  85. formatter = logging.Formatter(
  86. '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  87. datefmt='%Y-%m-%d %H:%M:%S'
  88. )
  89. # 添加控制台处理器
  90. console_handler = logging.StreamHandler()
  91. console_handler.setLevel(level)
  92. console_handler.setFormatter(formatter)
  93. logger.addHandler(console_handler)
  94. # 如果指定了日志文件,添加文件处理器
  95. if log_file:
  96. file_handler = logging.FileHandler(log_file)
  97. file_handler.setLevel(level)
  98. file_handler.setFormatter(formatter)
  99. logger.addHandler(file_handler)
  100. return logger