""" HDFS 文件系统操作模块 提供现代化的 HDFS 操作能力: - 多种后端支持(命令行、hdfs 库、pyhdfs 库、WebHDFS) - 同步和异步 API - 上下文管理器支持 - 配置管理集成 - 重试机制 - 丰富的错误处理 功能对应 Java 版本 CommonOperation 类: - 创建目录 - 删除目录/文件 - 上传文件 - 读写文件 - 检查文件是否存在 - 列出目录内容 - 获取文件信息 """ import os import asyncio import time from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime from enum import Enum from pathlib import Path from typing import ( Any, Callable, Dict, Generic, List, Optional, Tuple, Type, TypeVar, Union, Iterator, AsyncIterator ) from contextlib import contextmanager, asynccontextmanager from .config import ConfigurationManager, HDFSConfig, get_config from .utils.helpers import ( run_command, validate_hdfs_path, setup_logger, format_file_size ) T = TypeVar('T') class BackendType(Enum): """HDFS 后端类型""" CLI = "cli" # 命令行工具 HDFS_LIB = "hdfs_lib" # hdfs 库 PYHDFS = "pyhdfs" # pyhdfs 库 WEBHDFS = "webhdfs" # WebHDFS REST API AUTO = "auto" # 自动选择可用的后端 @dataclass class FileStatus: """文件状态信息""" path: str is_directory: bool length: int = 0 replication: int = 1 block_size: int = 134217728 # 128MB 默认块大小 modification_time: Optional[datetime] = None access_time: Optional[datetime] = None owner: str = "" group: str = "" permission: str = "644" is_snapshot: bool = False @property def size_formatted(self) -> str: """格式化的文件大小""" return format_file_size(self.length) def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { 'path': self.path, 'is_directory': self.is_directory, 'length': self.length, 'size_formatted': self.size_formatted, 'replication': self.replication, 'block_size': self.block_size, 'modification_time': self.modification_time.isoformat() if self.modification_time else None, 'access_time': self.access_time.isoformat() if self.access_time else None, 'owner': self.owner, 'group': self.group, 'permission': self.permission, 'is_snapshot': self.is_snapshot, } class HDFSBackend(ABC): """HDFS 后端抽象基类""" def __init__(self, config: HDFSConfig, logger): self.config = config self.logger = logger @abstractmethod def is_available(self) -> bool: """检查后端是否可用""" pass @abstractmethod def make_dir(self, path: str) -> bool: """创建目录""" pass @abstractmethod def delete(self, path: str, recursive: bool = True) -> bool: """删除文件或目录""" pass @abstractmethod def copy_from_local(self, src: str, dst: str) -> bool: """从本地上传文件到 HDFS""" pass @abstractmethod def copy_to_local(self, src: str, dst: str) -> bool: """从 HDFS 下载文件到本地""" pass @abstractmethod def read_file(self, path: str) -> Optional[str]: """读取文件内容""" pass @abstractmethod def write_file(self, path: str, content: str, overwrite: bool = True) -> bool: """写入文件内容""" pass @abstractmethod def exists(self, path: str) -> bool: """检查路径是否存在""" pass @abstractmethod def list_dir(self, path: str) -> List[str]: """列出目录内容""" pass @abstractmethod def get_file_status(self, path: str) -> Optional[FileStatus]: """获取文件状态""" pass @abstractmethod def get_file_size(self, path: str) -> Optional[int]: """获取文件大小""" pass @abstractmethod def rename(self, src: str, dst: str) -> bool: """重命名文件或目录""" pass @abstractmethod def set_permission(self, path: str, permission: str) -> bool: """设置文件权限""" pass @abstractmethod def set_owner(self, path: str, owner: Optional[str] = None, group: Optional[str] = None) -> bool: """设置文件所有者""" pass class CLIBackend(HDFSBackend): """命令行后端实现""" def __init__(self, config: HDFSConfig, logger): super().__init__(config, logger) self.hadoop_cmd = config.hadoop_cmd or 'hdfs' self._check_cmd_available() def _check_cmd_available(self): """检查命令是否可用""" if os.system(f'which {self.hadoop_cmd} > /dev/null 2>&1') != 0: self.hadoop_cmd = 'hadoop' if os.system(f'which {self.hadoop_cmd} > /dev/null 2>&1') != 0: self.logger.warning("Neither 'hdfs' nor 'hadoop' command found") def is_available(self) -> bool: """检查后端是否可用""" return os.system(f'which {self.hadoop_cmd} > /dev/null 2>&1') == 0 def _execute_command(self, subcommand: str, args: List[str] = None) -> Tuple[int, str, str]: """执行 HDFS 命令""" args = args or [] cmd = f"{self.hadoop_cmd} {subcommand} {' '.join(args)}" self.logger.debug(f"Executing command: {cmd}") return run_command(cmd) def make_dir(self, path: str) -> bool: if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return False self.logger.info(f"Creating directory: {path}") returncode, stdout, stderr = self._execute_command('dfs', ['-mkdir', '-p', path]) if returncode == 0: self.logger.info(f"Successfully created directory: {path}") return True else: self.logger.error(f"Failed to create directory: {path}, Error: {stderr}") return False def delete(self, path: str, recursive: bool = True) -> bool: if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return False self.logger.info(f"Deleting: {path}") args = ['-rm', '-r'] if recursive else ['-rm'] args.append(path) returncode, stdout, stderr = self._execute_command('dfs', args) if returncode == 0: self.logger.info(f"Successfully deleted: {path}") return True else: self.logger.error(f"Failed to delete: {path}, Error: {stderr}") return False def copy_from_local(self, src: str, dst: str) -> bool: if not os.path.exists(src): self.logger.error(f"Local file not found: {src}") return False if not validate_hdfs_path(dst): self.logger.error(f"Invalid HDFS path: {dst}") return False self.logger.info(f"Copying from local {src} to HDFS {dst}") returncode, stdout, stderr = self._execute_command('dfs', ['-copyFromLocal', src, dst]) if returncode == 0: self.logger.info(f"Successfully copied {src} to {dst}") return True else: self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}") return False def copy_to_local(self, src: str, dst: str) -> bool: if not validate_hdfs_path(src): self.logger.error(f"Invalid HDFS path: {src}") return False self.logger.info(f"Copying from HDFS {src} to local {dst}") returncode, stdout, stderr = self._execute_command('dfs', ['-copyToLocal', src, dst]) if returncode == 0: self.logger.info(f"Successfully copied {src} to {dst}") return True else: self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}") return False def read_file(self, path: str) -> Optional[str]: if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return None if not self.exists(path): self.logger.error(f"File does not exist: {path}") return None self.logger.info(f"Reading file: {path}") returncode, stdout, stderr = self._execute_command('dfs', ['-cat', path]) if returncode == 0: self.logger.info(f"Successfully read file: {path}") return stdout else: self.logger.error(f"Failed to read file: {path}, Error: {stderr}") return None def write_file(self, path: str, content: str, overwrite: bool = True) -> bool: import tempfile if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return False self.logger.info(f"Writing to file: {path}") with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as temp_file: temp_file.write(content) temp_path = temp_file.name try: args = ['-put'] if overwrite: args.append('-f') args.extend([temp_path, path]) returncode, stdout, stderr = self._execute_command('dfs', args) if returncode == 0: self.logger.info(f"Successfully wrote to file: {path}") return True else: self.logger.error(f"Failed to write to file: {path}, Error: {stderr}") return False finally: if os.path.exists(temp_path): os.unlink(temp_path) def exists(self, path: str) -> bool: if not validate_hdfs_path(path): return False returncode, _, _ = self._execute_command('dfs', ['-test', '-e', path]) return returncode == 0 def list_dir(self, path: str) -> List[str]: if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return [] if not self.exists(path): self.logger.error(f"Directory does not exist: {path}") return [] returncode, stdout, stderr = self._execute_command('dfs', ['-ls', path]) if returncode == 0: lines = stdout.strip().split('\n') if len(lines) > 0 and lines[0].startswith('Found'): lines = lines[1:] files = [] for line in lines: parts = line.split() if len(parts) >= 8: files.append(parts[-1]) return files else: self.logger.error(f"Failed to list directory: {path}, Error: {stderr}") return [] def get_file_status(self, path: str) -> Optional[FileStatus]: if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return None if not self.exists(path): self.logger.error(f"Path does not exist: {path}") return None returncode, stdout, stderr = self._execute_command('dfs', ['-stat', '%F,%s,%r,%b,%y,%z,%u,%g,%a', path]) if returncode == 0: parts = stdout.strip().split(',') if len(parts) >= 9: is_dir = parts[0] == 'directory' try: return FileStatus( path=path, is_directory=is_dir, length=int(parts[1]) if parts[1] else 0, replication=int(parts[2]) if parts[2] else 1, block_size=int(parts[3]) if parts[3] else 134217728, modification_time=datetime.strptime(parts[4], '%Y-%m-%d %H:%M:%S') if parts[4] else None, access_time=datetime.strptime(parts[5], '%Y-%m-%d %H:%M:%S') if parts[5] else None, owner=parts[6], group=parts[7], permission=parts[8], ) except (ValueError, IndexError) as e: self.logger.warning(f"Failed to parse file status: {e}") # 备用方法:使用 -ls returncode, stdout, stderr = self._execute_command('dfs', ['-ls', '-d', path]) if returncode == 0: parts = stdout.strip().split() if len(parts) >= 8: is_dir = parts[0].startswith('d') try: return FileStatus( path=path, is_directory=is_dir, length=int(parts[4]) if parts[4] else 0, replication=int(parts[1]) if parts[1] and not is_dir else 1, owner=parts[2], group=parts[3], ) except (ValueError, IndexError): pass return None def get_file_size(self, path: str) -> Optional[int]: if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return None if not self.exists(path): self.logger.error(f"File does not exist: {path}") return None returncode, stdout, stderr = self._execute_command('dfs', ['-du', '-s', path]) if returncode == 0: parts = stdout.strip().split() if len(parts) >= 1: try: return int(parts[0]) except ValueError: self.logger.error(f"Failed to parse file size: {stdout}") return None def rename(self, src: str, dst: str) -> bool: if not validate_hdfs_path(src): self.logger.error(f"Invalid source path: {src}") return False if not validate_hdfs_path(dst): self.logger.error(f"Invalid destination path: {dst}") return False self.logger.info(f"Renaming {src} to {dst}") returncode, stdout, stderr = self._execute_command('dfs', ['-mv', src, dst]) if returncode == 0: self.logger.info(f"Successfully renamed {src} to {dst}") return True else: self.logger.error(f"Failed to rename {src} to {dst}, Error: {stderr}") return False def set_permission(self, path: str, permission: str) -> bool: if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return False self.logger.info(f"Setting permission of {path} to {permission}") returncode, stdout, stderr = self._execute_command('dfs', ['-chmod', permission, path]) if returncode == 0: self.logger.info(f"Successfully set permission of {path} to {permission}") return True else: self.logger.error(f"Failed to set permission of {path}, Error: {stderr}") return False def set_owner(self, path: str, owner: Optional[str] = None, group: Optional[str] = None) -> bool: if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return False if owner is None and group is None: self.logger.error("At least one of owner or group must be specified") return False owner_str = owner if owner else '' group_str = f":{group}" if group else '' owner_group = f"{owner_str}{group_str}" self.logger.info(f"Setting owner of {path} to {owner_group}") returncode, stdout, stderr = self._execute_command('dfs', ['-chown', owner_group, path]) if returncode == 0: self.logger.info(f"Successfully set owner of {path} to {owner_group}") return True else: self.logger.error(f"Failed to set owner of {path}, Error: {stderr}") return False class WebHDFSBackend(HDFSBackend): """WebHDFS REST API 后端实现""" def __init__(self, config: HDFSConfig, logger): super().__init__(config, logger) self.base_url = f"http://{config.namenode_host}:{config.namenode_http_port}/webhdfs/v1" self.user = config.user or os.environ.get('USER', 'hadoop') self._session = None def _get_session(self): """获取 HTTP 会话""" if self._session is None: try: import requests self._session = requests.Session() except ImportError: self.logger.error("requests library is required for WebHDFS backend") raise return self._session def is_available(self) -> bool: """检查后端是否可用""" try: import requests # 尝试连接 Namenode response = self._get_session().get( f"{self.base_url}/?op=LISTSTATUS", params={'user.name': self.user}, timeout=5 ) return response.status_code in [200, 401, 403] except Exception: return False def make_dir(self, path: str) -> bool: try: session = self._get_session() response = session.put( f"{self.base_url}{path}", params={'op': 'MKDIRS', 'user.name': self.user}, timeout=self.config.connect_timeout ) return response.status_code == 200 except Exception as e: self.logger.error(f"Failed to create directory: {e}") return False def delete(self, path: str, recursive: bool = True) -> bool: try: session = self._get_session() response = session.delete( f"{self.base_url}{path}", params={ 'op': 'DELETE', 'recursive': str(recursive).lower(), 'user.name': self.user }, timeout=self.config.connect_timeout ) return response.status_code == 200 except Exception as e: self.logger.error(f"Failed to delete: {e}") return False def copy_from_local(self, src: str, dst: str) -> bool: try: with open(src, 'rb') as f: content = f.read() return self.write_file(dst, content.decode('utf-8', errors='ignore')) except Exception as e: self.logger.error(f"Failed to copy from local: {e}") return False def copy_to_local(self, src: str, dst: str) -> bool: content = self.read_file(src) if content is None: return False try: with open(dst, 'w', encoding='utf-8') as f: f.write(content) return True except Exception as e: self.logger.error(f"Failed to copy to local: {e}") return False def read_file(self, path: str) -> Optional[str]: try: session = self._get_session() response = session.get( f"{self.base_url}{path}", params={'op': 'OPEN', 'user.name': self.user}, timeout=self.config.read_timeout ) if response.status_code == 200: return response.text else: self.logger.error(f"Failed to read file: {response.status_code}") return None except Exception as e: self.logger.error(f"Failed to read file: {e}") return None def write_file(self, path: str, content: str, overwrite: bool = True) -> bool: try: session = self._get_session() # 第一步:获取写入位置 response = session.put( f"{self.base_url}{path}", params={ 'op': 'CREATE', 'overwrite': str(overwrite).lower(), 'user.name': self.user }, allow_redirects=False, timeout=self.config.connect_timeout ) if response.status_code != 307: self.logger.error(f"Failed to get write location: {response.status_code}") return False # 第二步:写入数据到 DataNode location = response.headers.get('Location') if not location: self.logger.error("No Location header in response") return False response = session.put( location, data=content.encode('utf-8'), timeout=self.config.write_timeout ) return response.status_code == 201 except Exception as