| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- """
- 工具函数模块
- 提供常用的辅助功能:
- - 命令行执行工具
- - 文件处理工具
- - 日志工具
- """
- import subprocess
- import logging
- import re
- from typing import Tuple, Optional
def run_command(cmd: str, shell: bool = True, timeout: int = 300) -> Tuple[int, str, str]:
    """Run a command and capture its exit code, stdout and stderr.

    The function never raises: every failure (timeout, missing executable,
    malformed command string, ...) is reported through the returned tuple.

    Args:
        cmd: The command line to execute.
        shell: If True (default), ``cmd`` is handed to the system shell as-is.
            If False, ``cmd`` is split into an argument list on POSIX so that
            commands with arguments still work without a shell.
        timeout: Maximum run time in seconds before the command is aborted.

    Returns:
        A ``(return_code, stdout, stderr)`` tuple. ``return_code`` is ``-1``
        and ``stdout`` is empty when the command timed out or could not be
        started; ``stderr`` then holds a description of the failure.

    Note:
        With ``shell=True`` the string is interpreted by the shell, so it must
        not be built from untrusted input.
    """
    import os
    import shlex

    try:
        # Without a shell, POSIX treats a plain string as the *program name*,
        # so "ls -l" would look for an executable literally called "ls -l".
        # Tokenize it first. Windows accepts a command string directly.
        if not shell and isinstance(cmd, str) and os.name == "posix":
            cmd = shlex.split(cmd)

        result = subprocess.run(
            cmd,
            shell=shell,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return -1, "", f"Command timed out after {timeout} seconds"
    except Exception as e:
        # Best-effort contract: report any launch failure (including a bad
        # quoting error from shlex) instead of propagating it.
        return -1, "", str(e)

    return result.returncode, result.stdout, result.stderr
def validate_hdfs_path(path: str) -> bool:
    """Check whether a string is a well-formed absolute HDFS path.

    A path is accepted when it is non-empty, starts with ``/``, contains none
    of the characters ``< > : " | ? *`` and has no empty segment (``//``).

    Args:
        path: The path to validate.

    Returns:
        True if the path passes every check, False otherwise.
    """
    # Empty values and relative paths are rejected up front.
    if not path or not path.startswith('/'):
        return False

    # Characters that are not allowed anywhere in the path.
    forbidden = '<>:"|?*'
    has_forbidden_char = any(ch in forbidden for ch in path)

    # Consecutive slashes would denote an empty path segment.
    has_empty_segment = '//' in path

    return not (has_forbidden_char or has_empty_segment)
def format_file_size(size_bytes: int) -> str:
    """Convert a byte count into a human-readable string.

    The value is divided by 1024 until its magnitude drops below 1024 and is
    then rendered with two decimals and the matching unit (B, KB, MB, GB, TB).
    Anything that is still 1024 TB or more is expressed in PB.

    Args:
        size_bytes: Size in bytes. Negative values (e.g. a size delta) are
            scaled by magnitude and keep their sign.

    Returns:
        The formatted size, e.g. ``"1.50 KB"``.
    """
    # Work on a local copy instead of rebinding the argument.
    size = float(size_bytes)

    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        # Compare by magnitude so that negative sizes are scaled as well;
        # a signed comparison would leave every negative value in bytes.
        if abs(size) < 1024.0:
            return f"{size:.2f} {unit}"
        size /= 1024.0

    return f"{size:.2f} PB"
def setup_logger(name: str, level: int = logging.INFO, log_file: Optional[str] = None) -> logging.Logger:
    """Create or fetch a named logger with console and optional file output.

    The function is safe to call repeatedly for the same name: handlers are
    never duplicated. A console handler is added only when the logger has no
    handlers yet; a file handler is added whenever ``log_file`` is given and
    no handler for that file is attached, so a later call can still add file
    output to an already configured logger.

    Args:
        name: Logger name passed to ``logging.getLogger``.
        level: Level applied to the logger and to every handler created here.
        log_file: Optional path of a log file to write to (UTF-8 encoded).

    Returns:
        The configured logger.
    """
    import os

    logger = logging.getLogger(name)
    logger.setLevel(level)

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # Only a logger with no handlers at all gets the console handler, so
    # repeated calls do not stack up duplicate console output.
    if not logger.handlers:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(level)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

    if log_file:
        # FileHandler stores its target as an absolute path in baseFilename;
        # compare against that to detect an already attached file handler.
        target = os.path.abspath(log_file)
        already_attached = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == target
            for handler in logger.handlers
        )
        if not already_attached:
            # Explicit UTF-8 keeps non-ASCII messages intact regardless of
            # the platform's default encoding.
            file_handler = logging.FileHandler(log_file, encoding='utf-8')
            file_handler.setLevel(level)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    return logger
|