"""
HDFS file-system operations module.

Provides a modern interface for working with HDFS:
- Pluggable backends (``BackendType`` enumerates CLI, hdfs library, pyhdfs
  library and WebHDFS; this module implements the CLI and WebHDFS backends)
- Synchronous and asynchronous APIs
- Context-manager support
- Configuration-management integration
- Retry mechanism
- Rich error handling

The feature set mirrors the Java ``CommonOperation`` class:
- Create directories
- Delete directories/files
- Upload files
- Read and write files
- Check whether a path exists
- List directory contents
- Get file information
"""

import os
import asyncio
import shlex
import shutil
import tempfile
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import (
    Any, Callable, Dict, Generic, List, Optional, Tuple, Type, TypeVar, Union,
    Iterator, AsyncIterator
)
from contextlib import contextmanager, asynccontextmanager
from urllib.parse import quote

from .config import ConfigurationManager, HDFSConfig, get_config
from .utils.helpers import (
    run_command, validate_hdfs_path, setup_logger, format_file_size
)

T = TypeVar('T')

# Block size used when a backend does not report one (128 MB).
_DEFAULT_BLOCK_SIZE = 134217728


class BackendType(Enum):
    """Kinds of HDFS backend."""

    CLI = "cli"            # command-line tools
    HDFS_LIB = "hdfs_lib"  # hdfs library
    PYHDFS = "pyhdfs"      # pyhdfs library
    WEBHDFS = "webhdfs"    # WebHDFS REST API
    AUTO = "auto"          # pick the first available backend


@dataclass
class FileStatus:
    """Status information for a single HDFS path."""

    path: str
    is_directory: bool
    length: int = 0
    replication: int = 1
    block_size: int = _DEFAULT_BLOCK_SIZE  # 128 MB default block size
    modification_time: Optional[datetime] = None
    access_time: Optional[datetime] = None
    owner: str = ""
    group: str = ""
    permission: str = "644"
    is_snapshot: bool = False

    @property
    def size_formatted(self) -> str:
        """Return ``length`` rendered by ``format_file_size``."""
        return format_file_size(self.length)

    def to_dict(self) -> Dict[str, Any]:
        """Return the status as a plain dict; timestamps become ISO strings or None."""
        return {
            'path': self.path,
            'is_directory': self.is_directory,
            'length': self.length,
            'size_formatted': self.size_formatted,
            'replication': self.replication,
            'block_size': self.block_size,
            'modification_time': (
                self.modification_time.isoformat() if self.modification_time else None
            ),
            'access_time': self.access_time.isoformat() if self.access_time else None,
            'owner': self.owner,
            'group': self.group,
            'permission': self.permission,
            'is_snapshot': self.is_snapshot,
        }


class HDFSBackend(ABC):
    """Abstract base class every HDFS backend implements."""

    def __init__(self, config: HDFSConfig, logger):
        self.config = config
        self.logger = logger

    @abstractmethod
    def is_available(self) -> bool:
        """Return True if this backend can currently be used."""

    @abstractmethod
    def make_dir(self, path: str) -> bool:
        """Create a directory."""

    @abstractmethod
    def delete(self, path: str, recursive: bool = True) -> bool:
        """Delete a file or directory."""

    @abstractmethod
    def copy_from_local(self, src: str, dst: str) -> bool:
        """Upload a local file to HDFS."""

    @abstractmethod
    def copy_to_local(self, src: str, dst: str) -> bool:
        """Download an HDFS file to the local file system."""

    @abstractmethod
    def read_file(self, path: str) -> Optional[str]:
        """Return the file content, or None on failure."""

    @abstractmethod
    def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
        """Write ``content`` to a file."""

    @abstractmethod
    def exists(self, path: str) -> bool:
        """Return True if the path exists."""

    @abstractmethod
    def list_dir(self, path: str) -> List[str]:
        """Return the entries of a directory."""

    @abstractmethod
    def get_file_status(self, path: str) -> Optional[FileStatus]:
        """Return the status of a path, or None on failure."""

    @abstractmethod
    def get_file_size(self, path: str) -> Optional[int]:
        """Return the size of a path in bytes, or None on failure."""

    @abstractmethod
    def rename(self, src: str, dst: str) -> bool:
        """Rename a file or directory."""

    @abstractmethod
    def set_permission(self, path: str, permission: str) -> bool:
        """Set the permission of a path."""

    @abstractmethod
    def set_owner(self, path: str, owner: Optional[str] = None,
                  group: Optional[str] = None) -> bool:
        """Set the owner and/or group of a path."""


class CLIBackend(HDFSBackend):
    """Backend that shells out to the ``hdfs`` / ``hadoop`` command."""

    # Hadoop `-stat` specifiers: type, length in bytes, replication, block
    # size, modification time, access time, owner, group, octal permission.
    _STAT_FORMAT = '%F,%b,%r,%o,%y,%x,%u,%g,%a'
    _STAT_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, config: HDFSConfig, logger):
        super().__init__(config, logger)
        self.hadoop_cmd = config.hadoop_cmd or 'hdfs'
        self._check_cmd_available()

    def _check_cmd_available(self):
        """Fall back to ``hadoop`` if the configured command is not on PATH."""
        if shutil.which(self.hadoop_cmd) is None:
            self.hadoop_cmd = 'hadoop'
            if shutil.which(self.hadoop_cmd) is None:
                self.logger.warning("Neither 'hdfs' nor 'hadoop' command found")

    def is_available(self) -> bool:
        """Return True if the selected command can be found on PATH."""
        return shutil.which(self.hadoop_cmd) is not None

    def _execute_command(self, subcommand: str,
                         args: Optional[List[str]] = None) -> Tuple[int, str, str]:
        """
        Run ``<hadoop_cmd> <subcommand> <args...>`` through ``run_command``.

        Every argument is shell-quoted so that paths containing spaces or
        shell metacharacters are passed as a single literal argument.
        NOTE(review): assumes ``run_command`` parses the string with shell
        rules (a shell or ``shlex.split``) - confirm against the helper.

        Returns:
            The ``(returncode, stdout, stderr)`` tuple from ``run_command``.
        """
        quoted_args = ' '.join(shlex.quote(str(arg)) for arg in (args or []))
        cmd = f"{self.hadoop_cmd} {subcommand} {quoted_args}"
        self.logger.debug(f"Executing command: {cmd}")
        return run_command(cmd)

    def _log_result(self, returncode: int, stderr: str,
                    success_msg: str, failure_msg: str) -> bool:
        """Log the outcome of a command and return True when it exited with 0."""
        if returncode == 0:
            self.logger.info(success_msg)
            return True
        self.logger.error(f"{failure_msg}, Error: {stderr}")
        return False

    def make_dir(self, path: str) -> bool:
        """Create ``path`` including missing parents (``-mkdir -p``)."""
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False
        self.logger.info(f"Creating directory: {path}")
        returncode, _, stderr = self._execute_command('dfs', ['-mkdir', '-p', path])
        return self._log_result(
            returncode, stderr,
            f"Successfully created directory: {path}",
            f"Failed to create directory: {path}",
        )

    def delete(self, path: str, recursive: bool = True) -> bool:
        """Delete ``path`` (``-rm``, plus ``-r`` when ``recursive``)."""
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False
        self.logger.info(f"Deleting: {path}")
        args = ['-rm', '-r'] if recursive else ['-rm']
        args.append(path)
        returncode, _, stderr = self._execute_command('dfs', args)
        return self._log_result(
            returncode, stderr,
            f"Successfully deleted: {path}",
            f"Failed to delete: {path}",
        )

    def copy_from_local(self, src: str, dst: str) -> bool:
        """Upload local ``src`` to HDFS ``dst`` (``-copyFromLocal``)."""
        if not os.path.exists(src):
            self.logger.error(f"Local file not found: {src}")
            return False
        if not validate_hdfs_path(dst):
            self.logger.error(f"Invalid HDFS path: {dst}")
            return False
        self.logger.info(f"Copying from local {src} to HDFS {dst}")
        returncode, _, stderr = self._execute_command('dfs', ['-copyFromLocal', src, dst])
        return self._log_result(
            returncode, stderr,
            f"Successfully copied {src} to {dst}",
            f"Failed to copy {src} to {dst}",
        )

    def copy_to_local(self, src: str, dst: str) -> bool:
        """Download HDFS ``src`` to local ``dst`` (``-copyToLocal``)."""
        if not validate_hdfs_path(src):
            self.logger.error(f"Invalid HDFS path: {src}")
            return False
        self.logger.info(f"Copying from HDFS {src} to local {dst}")
        returncode, _, stderr = self._execute_command('dfs', ['-copyToLocal', src, dst])
        return self._log_result(
            returncode, stderr,
            f"Successfully copied {src} to {dst}",
            f"Failed to copy {src} to {dst}",
        )

    def read_file(self, path: str) -> Optional[str]:
        """Return the output of ``-cat path``, or None if the path is invalid, missing or unreadable."""
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return None
        if not self.exists(path):
            self.logger.error(f"File does not exist: {path}")
            return None
        self.logger.info(f"Reading file: {path}")
        returncode, stdout, stderr = self._execute_command('dfs', ['-cat', path])
        if returncode == 0:
            self.logger.info(f"Successfully read file: {path}")
            return stdout
        self.logger.error(f"Failed to read file: {path}, Error: {stderr}")
        return None

    def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
        """
        Write ``content`` to ``path`` by staging it in a local temp file and
        uploading it with ``-put`` (``-f`` when ``overwrite``).

        The temp file is written as UTF-8 and is always removed afterwards.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False
        self.logger.info(f"Writing to file: {path}")
        temp_file = tempfile.NamedTemporaryFile(
            mode='w', delete=False, suffix='.txt', encoding='utf-8'
        )
        temp_path = temp_file.name
        try:
            # Close the file before uploading so the content is flushed.
            with temp_file:
                temp_file.write(content)
            args = ['-put']
            if overwrite:
                args.append('-f')
            args.extend([temp_path, path])
            returncode, _, stderr = self._execute_command('dfs', args)
            return self._log_result(
                returncode, stderr,
                f"Successfully wrote to file: {path}",
                f"Failed to write to file: {path}",
            )
        finally:
            if os.path.exists(temp_path):
                os.unlink(temp_path)

    def exists(self, path: str) -> bool:
        """Return True if ``-test -e path`` exits with 0."""
        if not validate_hdfs_path(path):
            return False
        returncode, _, _ = self._execute_command('dfs', ['-test', '-e', path])
        return returncode == 0

    def list_dir(self, path: str) -> List[str]:
        """
        Return the paths printed by ``-ls path``.

        Returns an empty list if the path is invalid, missing, or the
        command fails.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return []
        if not self.exists(path):
            self.logger.error(f"Directory does not exist: {path}")
            return []
        returncode, stdout, stderr = self._execute_command('dfs', ['-ls', path])
        if returncode != 0:
            self.logger.error(f"Failed to list directory: {path}, Error: {stderr}")
            return []
        lines = stdout.strip().split('\n')
        # Skip the "Found N items" header line when present.
        if lines and lines[0].startswith('Found'):
            lines = lines[1:]
        files = []
        for line in lines:
            # Split into at most 8 fields so that a path containing spaces
            # stays intact in the last field.
            parts = line.split(None, 7)
            if len(parts) >= 8:
                files.append(parts[7])
        return files

    def _status_from_stat(self, path: str) -> Optional[FileStatus]:
        """Build a FileStatus from ``-stat`` output; None if it fails or cannot be parsed."""
        returncode, stdout, _ = self._execute_command(
            'dfs', ['-stat', self._STAT_FORMAT, path]
        )
        if returncode != 0:
            return None
        parts = stdout.strip().split(',')
        if len(parts) < 9:
            return None
        time_format = self._STAT_TIME_FORMAT
        try:
            return FileStatus(
                path=path,
                is_directory=parts[0] == 'directory',
                length=int(parts[1]) if parts[1] else 0,
                replication=int(parts[2]) if parts[2] else 1,
                block_size=int(parts[3]) if parts[3] else _DEFAULT_BLOCK_SIZE,
                modification_time=(
                    datetime.strptime(parts[4], time_format) if parts[4] else None
                ),
                access_time=(
                    datetime.strptime(parts[5], time_format) if parts[5] else None
                ),
                owner=parts[6],
                group=parts[7],
                permission=parts[8],
            )
        except (ValueError, IndexError) as e:
            self.logger.warning(f"Failed to parse file status: {e}")
            return None

    def _status_from_ls(self, path: str) -> Optional[FileStatus]:
        """Build a reduced FileStatus from ``-ls -d`` output; None on failure."""
        returncode, stdout, _ = self._execute_command('dfs', ['-ls', '-d', path])
        if returncode != 0:
            return None
        parts = stdout.strip().split()
        if len(parts) < 8:
            return None
        try:
            return FileStatus(
                path=path,
                is_directory=parts[0].startswith('d'),
                length=int(parts[4]),
                # The replication column is "-" when not numeric (e.g. for
                # directories); fall back to 1 in that case.
                replication=int(parts[1]) if parts[1].isdigit() else 1,
                owner=parts[2],
                group=parts[3],
            )
        except (ValueError, IndexError) as e:
            self.logger.warning(f"Failed to parse file status: {e}")
            return None

    def get_file_status(self, path: str) -> Optional[FileStatus]:
        """
        Return the status of ``path``.

        Tries ``-stat`` first and falls back to parsing ``-ls -d`` when the
        stat output is unavailable or cannot be parsed. Returns None if the
        path is invalid, missing, or neither command yields a status.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return None
        if not self.exists(path):
            self.logger.error(f"Path does not exist: {path}")
            return None
        status = self._status_from_stat(path)
        if status is not None:
            return status
        # Fallback: derive what we can from `-ls -d`.
        return self._status_from_ls(path)

    def get_file_size(self, path: str) -> Optional[int]:
        """Return the first field of ``-du -s path`` as an int, or None on failure."""
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return None
        if not self.exists(path):
            self.logger.error(f"File does not exist: {path}")
            return None
        returncode, stdout, _ = self._execute_command('dfs', ['-du', '-s', path])
        if returncode == 0:
            parts = stdout.strip().split()
            if parts:
                try:
                    return int(parts[0])
                except ValueError:
                    self.logger.error(f"Failed to parse file size: {stdout}")
        return None

    def rename(self, src: str, dst: str) -> bool:
        """Move ``src`` to ``dst`` (``-mv``)."""
        if not validate_hdfs_path(src):
            self.logger.error(f"Invalid source path: {src}")
            return False
        if not validate_hdfs_path(dst):
            self.logger.error(f"Invalid destination path: {dst}")
            return False
        self.logger.info(f"Renaming {src} to {dst}")
        returncode, _, stderr = self._execute_command('dfs', ['-mv', src, dst])
        return self._log_result(
            returncode, stderr,
            f"Successfully renamed {src} to {dst}",
            f"Failed to rename {src} to {dst}",
        )

    def set_permission(self, path: str, permission: str) -> bool:
        """Apply ``permission`` to ``path`` (``-chmod``)."""
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False
        self.logger.info(f"Setting permission of {path} to {permission}")
        returncode, _, stderr = self._execute_command('dfs', ['-chmod', permission, path])
        return self._log_result(
            returncode, stderr,
            f"Successfully set permission of {path} to {permission}",
            f"Failed to set permission of {path}",
        )

    def set_owner(self, path: str, owner: Optional[str] = None,
                  group: Optional[str] = None) -> bool:
        """
        Change owner and/or group of ``path`` (``-chown owner:group``).

        At least one of ``owner`` and ``group`` must be given.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False
        if owner is None and group is None:
            self.logger.error("At least one of owner or group must be specified")
            return False
        owner_str = owner if owner else ''
        group_str = f":{group}" if group else ''
        owner_group = f"{owner_str}{group_str}"
        self.logger.info(f"Setting owner of {path} to {owner_group}")
        returncode, _, stderr = self._execute_command('dfs', ['-chown', owner_group, path])
        return self._log_result(
            returncode, stderr,
            f"Successfully set owner of {path} to {owner_group}",
            f"Failed to set owner of {path}",
        )


class WebHDFSBackend(HDFSBackend):
    """Backend that talks to the WebHDFS REST API using ``requests``."""

    def __init__(self, config: HDFSConfig, logger):
        super().__init__(config, logger)
        self.base_url = (
            f"http://{config.namenode_host}:{config.namenode_http_port}/webhdfs/v1"
        )
        self.user = config.user or os.environ.get('USER', 'hadoop')
        self._session = None

    def _get_session(self):
        """Return the lazily created ``requests.Session``; re-raises ImportError."""
        if self._session is None:
            try:
                import requests
                self._session = requests.Session()
            except ImportError:
                self.logger.error("requests library is required for WebHDFS backend")
                raise
        return self._session

    def _url(self, path: str) -> str:
        """Return the WebHDFS URL for ``path`` with the path percent-encoded."""
        # Without encoding, characters such as '#', '?' or '%' in a path
        # would be interpreted as URL syntax instead of path content.
        return f"{self.base_url}{quote(path, safe='/')}"

    def is_available(self) -> bool:
        """Return True if the NameNode answers a LISTSTATUS on ``/`` with 200, 401 or 403."""
        try:
            response = self._get_session().get(
                f"{self.base_url}/",
                params={'op': 'LISTSTATUS', 'user.name': self.user},
                timeout=5
            )
            return response.status_code in [200, 401, 403]
        except Exception:
            return False

    def make_dir(self, path: str) -> bool:
        """Create ``path`` with the MKDIRS operation."""
        try:
            response = self._get_session().put(
                self._url(path),
                params={'op': 'MKDIRS', 'user.name': self.user},
                timeout=self.config.connect_timeout
            )
            return response.status_code == 200
        except Exception as e:
            self.logger.error(f"Failed to create directory: {e}")
            return False

    def delete(self, path: str, recursive: bool = True) -> bool:
        """Delete ``path`` with the DELETE operation."""
        try:
            response = self._get_session().delete(
                self._url(path),
                params={
                    'op': 'DELETE',
                    'recursive': str(recursive).lower(),
                    'user.name': self.user
                },
                timeout=self.config.connect_timeout
            )
            return response.status_code == 200
        except Exception as e:
            self.logger.error(f"Failed to delete: {e}")
            return False

    def _open(self, path: str):
        """Issue an OPEN request; return the response on HTTP 200, else log and return None."""
        try:
            response = self._get_session().get(
                self._url(path),
                params={'op': 'OPEN', 'user.name': self.user},
                timeout=self.config.read_timeout
            )
        except Exception as e:
            self.logger.error(f"Failed to read file: {e}")
            return None
        if response.status_code == 200:
            return response
        self.logger.error(f"Failed to read file: {response.status_code}")
        return None

    def _put_bytes(self, path: str, data: bytes, overwrite: bool = True) -> bool:
        """
        Upload ``data`` to ``path`` with the two-step CREATE operation.

        The first request (redirects disabled) must answer 307 with a
        ``Location`` header; the payload is then PUT to that location and
        the upload succeeds when that request answers 201.
        """
        try:
            session = self._get_session()
            # Step 1: ask where to write.
            response = session.put(
                self._url(path),
                params={
                    'op': 'CREATE',
                    'overwrite': str(overwrite).lower(),
                    'user.name': self.user
                },
                allow_redirects=False,
                timeout=self.config.connect_timeout
            )
            if response.status_code != 307:
                self.logger.error(f"Failed to get write location: {response.status_code}")
                return False
            # Step 2: send the data to the returned location.
            location = response.headers.get('Location')
            if not location:
                self.logger.error("No Location header in response")
                return False
            response = session.put(
                location,
                data=data,
                timeout=self.config.write_timeout
            )
            return response.status_code == 201
        except Exception as e:
            self.logger.error(f"Failed to write file: {e}")
            return False

    def copy_from_local(self, src: str, dst: str) -> bool:
        """Upload local ``src`` to ``dst`` byte-for-byte, overwriting ``dst``."""
        try:
            with open(src, 'rb') as f:
                data = f.read()
        except Exception as e:
            self.logger.error(f"Failed to copy from local: {e}")
            return False
        # Upload the raw bytes: decoding to text first would corrupt
        # binary or non-UTF-8 files.
        return self._put_bytes(dst, data)

    def copy_to_local(self, src: str, dst: str) -> bool:
        """Download ``src`` to local ``dst`` byte-for-byte."""
        response = self._open(src)
        if response is None:
            return False
        try:
            with open(dst, 'wb') as f:
                f.write(response.content)
            return True
        except Exception as e:
            self.logger.error(f"Failed to copy to local: {e}")
            return False

    def read_file(self, path: str) -> Optional[str]:
        """Return the text of ``path`` (as decoded by ``requests``), or None on failure."""
        response = self._open(path)
        return response.text if response is not None else None

    def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
        """Write ``content`` to ``path`` encoded as UTF-8."""
        try:
            data = content.encode('utf-8')
        except Exception as e:
            self.logger.error(f"Failed to write file: {e}")
            return False
        return self._put_bytes(path, data, overwrite)

    def exists(self, path: str) -> bool:
        """Return True if GETFILESTATUS succeeds for ``path``."""
        return self.get_file_status(path) is not None

    def list_dir(self, path: str) -> List[str]:
        """Return ``path``-prefixed entries from LISTSTATUS; empty list on failure."""
        try:
            response = self._get_session().get(
                self._url(path),
                params={'op': 'LISTSTATUS', 'user.name': self.user},
                timeout=self.config.connect_timeout
            )
            if response.status_code != 200:
                self.logger.error(f"Failed to list directory: {response.status_code}")
                return []
            entries = response.json().get('FileStatuses', {}).get('FileStatus', [])
            prefix = path.rstrip('/')
            return [f"{prefix}/{entry['pathSuffix']}" for entry in entries]
        except Exception as e:
            self.logger.error(f"Failed to list directory: {e}")
            return []

    @staticmethod
    def _from_millis(value) -> Optional[datetime]:
        """Convert a millisecond timestamp to a datetime; falsy values give None."""
        return datetime.fromtimestamp(value / 1000) if value else None

    def get_file_status(self, path: str) -> Optional[FileStatus]:
        """Return the GETFILESTATUS result for ``path``; None on a non-200 answer or error."""
        try:
            response = self._get_session().get(
                self._url(path),
                params={'op': 'GETFILESTATUS', 'user.name': self.user},
                timeout=self.config.connect_timeout
            )
            if response.status_code != 200:
                return None
            status = response.json().get('FileStatus', {})
            return FileStatus(
                path=path,
                is_directory=status.get('type') == 'DIRECTORY',
                length=status.get('length', 0),
                replication=status.get('replication', 1),
                block_size=status.get('blockSize', _DEFAULT_BLOCK_SIZE),
                modification_time=self._from_millis(status.get('modificationTime')),
                access_time=self._from_millis(status.get('accessTime')),
                owner=status.get('owner', ''),
                group=status.get('group', ''),
                permission=status.get('permission', '644'),
            )
        except Exception as e:
            self.logger.error(f"Failed to get file status: {e}")
            return None

    def get_file_size(self, path: str) -> Optional[int]:
        """Return the ``length`` from the file status, or None if unavailable."""
        status = self.get_file_status(path)
        return status.length if status else None

    def rename(self, src: str, dst: str) -> bool:
        """Rename ``src`` to ``dst`` with the RENAME operation."""
        try:
            response = self._get_session().put(
                self._url(src),
                params={
                    'op': 'RENAME',
                    'destination': dst,
                    'user.name': self.user
                },
                timeout=self.config.connect_timeout
            )
            return response.status_code == 200
        except Exception as e:
            self.logger.error(f"Failed to rename: {e}")
            return False

    def set_permission(self, path: str, permission: str) -> bool:
        """Set the permission of ``path`` with the SETPERMISSION operation."""
        try:
            response = self._get_session().put(
                self._url(path),
                params={
                    'op': 'SETPERMISSION',
                    'permission': permission,
                    'user.name': self.user
                },
                timeout=self.config.connect_timeout
            )
            return response.status_code == 200
        except Exception as e:
            self.logger.error(f"Failed to set permission: {e}")
            return False

    def set_owner(self, path: str, owner: Optional[str] = None,
                  group: Optional[str] = None) -> bool:
        """Set owner and/or group of ``path`` with the SETOWNER operation."""
        try:
            params = {'op': 'SETOWNER', 'user.name': self.user}
            if owner:
                params['owner'] = owner
            if group:
                params['group'] = group
            response = self._get_session().put(
                self._url(path),
                params=params,
                timeout=self.config.connect_timeout
            )
            return response.status_code == 200
        except Exception as e:
            self.logger.error(f"Failed to set owner: {e}")
            return False


class BackendFactory:
    """Factory that instantiates backends and checks their availability."""

    _backends: Dict[BackendType, Type[HDFSBackend]] = {
        BackendType.CLI: CLIBackend,
        BackendType.WEBHDFS: WebHDFSBackend,
    }

    @classmethod
    def create(cls, backend_type: BackendType, config: HDFSConfig,
               logger) -> Optional[HDFSBackend]:
        """
        Create a backend of ``backend_type``.

        Returns:
            The backend, or None if the type is unsupported, construction
            raises, or the backend reports itself unavailable.
        """
        if backend_type not in cls._backends:
            logger.error(f"Unsupported backend type: {backend_type}")
            return None
        try:
            backend = cls._backends[backend_type](config, logger)
            if backend.is_available():
                return backend
            logger.warning(f"Backend {backend_type.value} is not available")
            return None
        except Exception as e:
            logger.error(f"Failed to create backend {backend_type.value}: {e}")
            return None

    @classmethod
    def create_auto(cls, config: HDFSConfig, logger) -> Optional[HDFSBackend]:
        """Return the first available backend, trying CLI and then WebHDFS."""
        priority_order = [
            BackendType.CLI,
            BackendType.WEBHDFS,
        ]
        for backend_type in priority_order:
            backend = cls.create(backend_type, config, logger)
            if backend:
                logger.info(f"Selected backend: {backend_type.value}")
                return backend
        logger.error("No available backend found")
        return None


class HDFSOperations:
    """
    High-level HDFS file-system operations.

    Features:
    - Multiple backends (CLI, WebHDFS)
    - Synchronous and asynchronous APIs
    - Context-manager support
    - Configuration-management integration
    - Retry mechanism
    - Rich error handling

    The feature set corresponds to the Java ``CommonOperation`` class.
    """

    def __init__(self,
                 config: Optional[HDFSConfig] = None,
                 config_manager: Optional[ConfigurationManager] = None,
                 preferred_backend: Optional[BackendType] = None,
                 logger_name: str = 'hdfs_operations'):
        """
        Initialise the instance and select a backend.

        Args:
            config: HDFS configuration (optional; defaults to
                ``config_manager.hdfs``).
            config_manager: Configuration manager (optional; defaults to
                ``get_config()``).
            preferred_backend: Backend type to use (optional; AUTO if omitted).
            logger_name: Name passed to ``setup_logger``.
        """
        self.logger = setup_logger(logger_name)

        # Resolve the configuration.
        if config_manager is None:
            config_manager = get_config()
        if config is None:
            config = config_manager.hdfs
        self.config = config

        # Select the backend.
        self._backend: Optional[HDFSBackend] = None
        backend_type = preferred_backend or BackendType.AUTO
        if backend_type == BackendType.AUTO:
            self._backend = BackendFactory.create_auto(config, self.logger)
        else:
            self._backend = BackendFactory.create(backend_type, config, self.logger)

        if self._backend is None:
            self.logger.warning("No HDFS backend available. Some operations may fail.")

    @property
    def backend(self) -> HDFSBackend:
        """Return the selected backend.

        Raises:
            RuntimeError: If no backend could be created.
        """
        if self._backend is None:
            raise RuntimeError("No HDFS backend available")
        return self._backend

    @property
    def is_available(self) -> bool:
        """Return True if a backend was selected and reports itself available."""
        return self._backend is not None and self._backend.is_available()

    @contextmanager
    def transaction(self):
        """
        Context manager that brackets a group of operations with log messages.

        Note: HDFS has no real transactions; this only provides a context
        interface. Exceptions are logged and re-raised.
        """
        try:
            self.logger.info("Starting HDFS operation context")
            yield self
        except Exception as e:
            self.logger.error(f"HDFS operation failed: {e}")
            raise
        finally:
            self.logger.info("HDFS operation context completed")

    def _retry_operation(self,
                         operation: Callable[[], T],
                         max_retries: Optional[int] = None,
                         retry_delay: Optional[float] = None) -> T:
        """
        Run ``operation``, retrying with exponential backoff when it raises.

        Only exceptions trigger a retry; a falsy return value (e.g. ``False``
        from a backend) is returned as-is.

        Args:
            operation: Zero-argument callable to execute.
            max_retries: Maximum number of retries (``None`` uses the
                configured value; ``0`` disables retrying).
            retry_delay: Initial delay in seconds, doubled after each
                failed attempt (``None`` uses the configured value).

        Returns:
            Whatever ``operation`` returns.

        Raises:
            Exception: The last exception raised by ``operation`` once all
                attempts are exhausted.
        """
        # Test against None so that an explicit 0 is honoured.
        if max_retries is None:
            max_retries = self.config.max_retries
        if retry_delay is None:
            retry_delay = self.config.retry_delay
        # Guarantee at least one attempt even for a negative retry count.
        max_retries = max(0, max_retries)

        last_exception = None
        for attempt in range(max_retries + 1):
            try:
                return operation()
            except Exception as e:
                last_exception = e
                if attempt < max_retries:
                    self.logger.warning(
                        f"Operation failed (attempt {attempt + 1}/{max_retries + 1}), "
                        f"retrying in {retry_delay}s: {e}"
                    )
                    time.sleep(retry_delay)
                    retry_delay *= 2  # exponential backoff
        raise last_exception

    def _execute(self, operation: Callable[[], T], retry: bool) -> T:
        """Run ``operation`` through the retry wrapper when ``retry`` is set, else directly."""
        if retry:
            return self._retry_operation(operation)
        return operation()

    # Synchronous API

    def make_dir(self, path: str, retry: bool = True) -> bool:
        """
        Create a directory.

        Args:
            path: Directory path to create.
            retry: Whether to enable retrying.

        Returns:
            True on success.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False
        return self._execute(lambda: self.backend.make_dir(path), retry)

    def delete(self, path: str, recursive: bool = True, retry: bool = True) -> bool:
        """
        Delete a file or directory.

        Args:
            path: Path to delete.
            recursive: Whether to delete recursively.
            retry: Whether to enable retrying.

        Returns:
            True on success.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False
        return self._execute(lambda: self.backend.delete(path, recursive), retry)

    def copy_from_local(self, src: str, dst: str, retry: bool = True) -> bool:
        """
        Upload a file from the local file system to HDFS.

        Args:
            src: Local file path.
            dst: HDFS destination path.
            retry: Whether to enable retrying.

        Returns:
            True on success.
        """
        if not os.path.exists(src):
            self.logger.error(f"Local file not found: {src}")
            return False
        if not validate_hdfs_path(dst):
            self.logger.error(f"Invalid HDFS path: {dst}")
            return False
        return self._execute(lambda: self.backend.copy_from_local(src, dst), retry)

    def copy_to_local(self, src: str, dst: str, retry: bool = True) -> bool:
        """
        Download a file from HDFS to the local file system.

        Args:
            src: HDFS source path.
            dst: Local destination path.
            retry: Whether to enable retrying.

        Returns:
            True on success.
        """
        if not validate_hdfs_path(src):
            self.logger.error(f"Invalid HDFS path: {src}")
            return False
        return self._execute(lambda: self.backend.copy_to_local(src, dst), retry)

    def read_file(self, path: str, retry: bool = True) -> Optional[str]:
        """
        Read the content of an HDFS file.

        Args:
            path: HDFS file path.
            retry: Whether to enable retrying.

        Returns:
            The file content as a string, or None on failure.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return None
        if not self.exists(path):
            self.logger.error(f"File does not exist: {path}")
            return None
        return self._execute(lambda: self.backend.read_file(path), retry)

    def write_file(self, path: str, content: str, overwrite: bool = True,
                   retry: bool = True) -> bool:
        """
        Write content to an HDFS file.

        Args:
            path: HDFS file path.
            content: Content to write.
            overwrite: Whether to overwrite an existing file.
            retry: Whether to enable retrying.

        Returns:
            True on success.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False
        return self._execute(
            lambda: self.backend.write_file(path, content, overwrite), retry
        )

    def exists(self, path: str) -> bool:
        """
        Check whether an HDFS path exists.

        Args:
            path: HDFS path.

        Returns:
            True if the path is valid and exists.
        """
        if not validate_hdfs_path(path):
            return False
        return self.backend.exists(path)

    def list_dir(self, path: str, retry: bool = True) -> List[str]:
        """
        List the contents of an HDFS directory.

        Args:
            path: HDFS directory path.
            retry: Whether to enable retrying.

        Returns:
            The directory entries; empty if the path is invalid or missing.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return []
        if not self.exists(path):
            self.logger.error(f"Directory does not exist: {path}")
            return []
        return self._execute(lambda: self.backend.list_dir(path), retry)

    def get_file_status(self, path: str, retry: bool = True) -> Optional[FileStatus]:
        """
        Get the status of an HDFS path.

        Args:
            path: HDFS path.
            retry: Whether to enable retrying.

        Returns:
            A FileStatus, or None on failure.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return None
        return self._execute(lambda: self.backend.get_file_status(path), retry)

    def get_file_size(self, path: str, retry: bool = True) -> Optional[int]:
        """
        Get the size of an HDFS file.

        Args:
            path: HDFS file path.
            retry: Whether to enable retrying.

        Returns:
            The size in bytes, or None on failure.
        """
        status = self.get_file_status(path, retry)
        return status.length if status else None

    def rename(self, src: str, dst: str, retry: bool = True) -> bool:
        """
        Rename an HDFS file or directory.

        Args:
            src: Source path.
            dst: Destination path.
            retry: Whether to enable retrying.

        Returns:
            True on success.
        """
        if not validate_hdfs_path(src):
            self.logger.error(f"Invalid source path: {src}")
            return False
        if not validate_hdfs_path(dst):
            self.logger.error(f"Invalid destination path: {dst}")
            return False
        return self._execute(lambda: self.backend.rename(src, dst), retry)

    def set_permission(self, path: str, permission: str, retry: bool = True) -> bool:
        """
        Set the permission of an HDFS path.

        Args:
            path: HDFS path.
            permission: Permission string (e.g. '755').
            retry: Whether to enable retrying.

        Returns:
            True on success.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False
        return self._execute(
            lambda: self.backend.set_permission(path, permission), retry
        )

    def set_owner(self, path: str, owner: Optional[str] = None,
                  group: Optional[str] = None, retry: bool = True) -> bool:
        """
        Set the owner and/or group of an HDFS path.

        Args:
            path: HDFS path.
            owner: New owner (optional).
            group: New group (optional).
            retry: Whether to enable retrying.

        Returns:
            True on success; False if neither owner nor group is given.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False
        if owner is None and group is None:
            self.logger.error("At least one of owner or group must be specified")
            return False
        return self._execute(lambda: self.backend.set_owner(path, owner, group), retry)

    # Asynchronous API (thin wrappers that run the synchronous API in the
    # event loop's default executor)

    async def _run_in_executor(self, func: Callable[..., T], *args: Any) -> T:
        """Run ``func(*args)`` in the running loop's default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    async def make_dir_async(self, path: str, retry: bool = True) -> bool:
        """Asynchronous variant of ``make_dir``."""
        return await self._run_in_executor(self.make_dir, path, retry)

    async def delete_async(self, path: str, recursive: bool = True,
                           retry: bool = True) -> bool:
        """Asynchronous variant of ``delete``."""
        return await self._run_in_executor(self.delete, path, recursive, retry)

    async def copy_from_local_async(self, src: str, dst: str, retry: bool = True) -> bool:
        """Asynchronous variant of ``copy_from_local``."""
        return await self._run_in_executor(self.copy_from_local, src, dst, retry)

    async def copy_to_local_async(self, src: str, dst: str, retry: bool = True) -> bool:
        """Asynchronous variant of ``copy_to_local``."""
        return await self._run_in_executor(self.copy_to_local, src, dst, retry)

    async def read_file_async(self, path: str, retry: bool = True) -> Optional[str]:
        """Asynchronous variant of ``read_file``."""
        return await self._run_in_executor(self.read_file, path, retry)

    async def write_file_async(self, path: str, content: str, overwrite: bool = True,
                               retry: bool = True) -> bool:
        """Asynchronous variant of ``write_file``."""
        return await self._run_in_executor(
            self.write_file, path, content, overwrite, retry
        )

    async def exists_async(self, path: str) -> bool:
        """Asynchronous variant of ``exists``."""
        return await self._run_in_executor(self.exists, path)

    async def list_dir_async(self, path: str, retry: bool = True) -> List[str]:
        """Asynchronous variant of ``list_dir``."""
        return await self._run_in_executor(self.list_dir, path, retry)

    async def get_file_status_async(self, path: str,
                                    retry: bool = True) -> Optional[FileStatus]:
        """Asynchronous variant of ``get_file_status``."""
        return await self._run_in_executor(self.get_file_status, path, retry)

    # Convenience methods

    def upload_directory(self, local_dir: str, hdfs_dir: str,
                         recursive: bool = True, retry: bool = True) -> bool:
        """
        Upload a whole local directory to HDFS.

        Args:
            local_dir: Local directory path.
            hdfs_dir: HDFS destination directory.
            recursive: Whether to upload sub-directories as well.
            retry: Whether to enable retrying.

        Returns:
            True if every entry was uploaded; False at the first failure.
        """
        if not os.path.isdir(local_dir):
            self.logger.error(f"Local directory not found: {local_dir}")
            return False

        # Make sure the destination directory exists.
        if not self.make_dir(hdfs_dir, retry):
            return False

        try:
            for item in os.listdir(local_dir):
                local_path = os.path.join(local_dir, item)
                hdfs_path = f"{hdfs_dir.rstrip('/')}/{item}"
                if os.path.isfile(local_path):
                    if not self.copy_from_local(local_path, hdfs_path, retry):
                        self.logger.error(f"Failed to upload file: {local_path}")
                        return False
                elif os.path.isdir(local_path) and recursive:
                    if not self.upload_directory(local_path, hdfs_path, recursive, retry):
                        return False
            return True
        except Exception as e:
            self.logger.error(f"Failed to upload directory: {e}")
            return False

    def download_directory(self, hdfs_dir: str, local_dir: str,
                           recursive: bool = True, retry: bool = True) -> bool:
        """
        Download a whole directory from HDFS.

        Entries whose status cannot be determined are skipped with a warning.

        Args:
            hdfs_dir: HDFS source directory.
            local_dir: Local destination directory (created if missing).
            recursive: Whether to download sub-directories as well.
            retry: Whether to enable retrying.

        Returns:
            True if every downloadable entry was fetched; False at the
            first failure.
        """
        if not self.exists(hdfs_dir):
            self.logger.error(f"HDFS directory not found: {hdfs_dir}")
            return False

        # Make sure the local directory exists.
        os.makedirs(local_dir, exist_ok=True)

        try:
            for hdfs_path in self.list_dir(hdfs_dir, retry):
                local_path = os.path.join(local_dir, os.path.basename(hdfs_path))
                status = self.get_file_status(hdfs_path, retry)
                if status is None:
                    self.logger.warning(f"Skipping path with unknown status: {hdfs_path}")
                    continue
                if status.is_directory:
                    if recursive and not self.download_directory(
                            hdfs_path, local_path, recursive, retry):
                        return False
                elif not self.copy_to_local(hdfs_path, local_path, retry):
                    self.logger.error(f"Failed to download file: {hdfs_path}")
                    return False
            return True
        except Exception as e:
            self.logger.error(f"Failed to download directory: {e}")
            return False

    def walk(self, path: str) -> Iterator[Tuple[str, List[str], List[str]]]:
        """
        Walk an HDFS directory tree top-down, similar to ``os.walk()``.

        Yields nothing if ``path`` does not exist or is not a directory.

        Args:
            path: Directory to start from.

        Yields:
            ``(dirpath, dirnames, filenames)`` tuples.
        """
        if not self.exists(path):
            return
        status = self.get_file_status(path)
        if not status or not status.is_directory:
            return

        # Classify the directory entries.
        dirnames: List[str] = []
        filenames: List[str] = []
        for item in self.list_dir(path):
            item_status = self.get_file_status(item)
            if not item_status:
                continue
            target = dirnames if item_status.is_directory else filenames
            target.append(os.path.basename(item))

        yield path, dirnames, filenames

        # Recurse into sub-directories.
        for dirname in dirnames:
            yield from self.walk(f"{path.rstrip('/')}/{dirname}")


# Convenience function
def create_hdfs_client(
    config: Optional[HDFSConfig] = None,
    preferred_backend: Optional[BackendType] = None
) -> HDFSOperations:
    """
    Create an HDFS client instance.

    Args:
        config: HDFS configuration (optional).
        preferred_backend: Preferred backend type (optional).

    Returns:
        An ``HDFSOperations`` instance.
    """
    return HDFSOperations(config=config, preferred_backend=preferred_backend)