""" 工具函数模块 提供常用的辅助功能: - 命令行执行工具 - 文件处理工具 - 日志工具 """ import subprocess import logging import re from typing import Tuple, Optional def run_command(cmd: str, shell: bool = True, timeout: int = 300) -> Tuple[int, str, str]: """ 执行命令行命令 Args: cmd: 要执行的命令 shell: 是否使用 shell 执行 timeout: 超时时间(秒) Returns: (return_code, stdout, stderr) """ try: result = subprocess.run( cmd, shell=shell, capture_output=True, text=True, timeout=timeout ) return result.returncode, result.stdout, result.stderr except subprocess.TimeoutExpired: return -1, "", f"Command timed out after {timeout} seconds" except Exception as e: return -1, "", str(e) def validate_hdfs_path(path: str) -> bool: """ 验证 HDFS 路径格式是否有效 Args: path: 要验证的路径 Returns: 路径是否有效 """ if not path: return False # HDFS 路径必须以 / 开头 if not path.startswith('/'): return False # 检查是否包含非法字符 invalid_chars = re.compile(r'[<>:"|?*]') if invalid_chars.search(path): return False # 检查是否包含连续的斜杠 if '//' in path: return False return True def format_file_size(size_bytes: int) -> str: """ 格式化文件大小,将字节转换为人类可读的格式 Args: size_bytes: 文件大小(字节) Returns: 格式化后的文件大小字符串 """ for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if size_bytes < 1024.0: return f"{size_bytes:.2f} {unit}" size_bytes /= 1024.0 return f"{size_bytes:.2f} PB" def setup_logger(name: str, level: int = logging.INFO, log_file: Optional[str] = None) -> logging.Logger: """ 设置日志器 Args: name: 日志器名称 level: 日志级别 log_file: 日志文件路径(可选) Returns: 配置好的日志器 """ logger = logging.getLogger(name) logger.setLevel(level) # 避免重复添加处理器 if logger.handlers: return logger # 创建格式器 formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 添加控制台处理器 console_handler = logging.StreamHandler() console_handler.setLevel(level) console_handler.setFormatter(formatter) logger.addHandler(console_handler) # 如果指定了日志文件,添加文件处理器 if log_file: file_handler = logging.FileHandler(log_file) file_handler.setLevel(level) file_handler.setFormatter(formatter) logger.addHandler(file_handler) return logger