| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500 |
- """
- HDFS 文件系统操作模块
- 提供现代化的 HDFS 操作能力:
- - 多种后端支持(命令行、hdfs 库、pyhdfs 库、WebHDFS)
- - 同步和异步 API
- - 上下文管理器支持
- - 配置管理集成
- - 重试机制
- - 丰富的错误处理
- 功能对应 Java 版本 CommonOperation 类:
- - 创建目录
- - 删除目录/文件
- - 上传文件
- - 读写文件
- - 检查文件是否存在
- - 列出目录内容
- - 获取文件信息
- """
- import os
- import asyncio
- import time
- from abc import ABC, abstractmethod
- from dataclasses import dataclass, field
- from datetime import datetime
- from enum import Enum
- from pathlib import Path
- from typing import (
- Any, Callable, Dict, Generic, List, Optional,
- Tuple, Type, TypeVar, Union, Iterator, AsyncIterator
- )
- from contextlib import contextmanager, asynccontextmanager
- from .config import ConfigurationManager, HDFSConfig, get_config
- from .utils.helpers import (
- run_command, validate_hdfs_path, setup_logger, format_file_size
- )
# Generic type variable.
T = TypeVar("T")
class BackendType(Enum):
    """Identifiers for the supported HDFS access backends.

    Each member's value is its lowercase string identifier.
    """

    # Shell out to the command-line tool.
    CLI = "cli"
    # Use the `hdfs` Python library.
    HDFS_LIB = "hdfs_lib"
    # Use the `pyhdfs` Python library.
    PYHDFS = "pyhdfs"
    # Use the WebHDFS REST API.
    WEBHDFS = "webhdfs"
    # Automatically choose an available backend.
    AUTO = "auto"
@dataclass
class FileStatus:
    """Metadata for a single HDFS file or directory."""

    path: str
    is_directory: bool
    length: int = 0
    replication: int = 1
    block_size: int = 134217728  # 128 MB default block size
    modification_time: Optional[datetime] = None
    access_time: Optional[datetime] = None
    owner: str = ""
    group: str = ""
    permission: str = "644"
    is_snapshot: bool = False

    @property
    def size_formatted(self) -> str:
        """Human-readable form of ``length``, produced by ``format_file_size``."""
        return format_file_size(self.length)

    def to_dict(self) -> Dict[str, Any]:
        """Return the status as a plain dictionary.

        Timestamps are rendered with ``isoformat()`` (``None`` when unset), and
        the derived ``size_formatted`` value is included after ``length``.
        """
        def as_iso(moment):
            # Same truthiness test as a conditional expression: unset -> None.
            return moment.isoformat() if moment else None

        return dict(
            path=self.path,
            is_directory=self.is_directory,
            length=self.length,
            size_formatted=self.size_formatted,
            replication=self.replication,
            block_size=self.block_size,
            modification_time=as_iso(self.modification_time),
            access_time=as_iso(self.access_time),
            owner=self.owner,
            group=self.group,
            permission=self.permission,
            is_snapshot=self.is_snapshot,
        )
class HDFSBackend(ABC):
    """Abstract interface implemented by every HDFS access backend.

    A backend is constructed with the HDFS configuration and a logger and must
    implement each filesystem operation declared below. Operations typed
    ``-> bool`` report success or failure; those typed ``Optional[...]`` let an
    implementation signal "unavailable / failed" with ``None``.
    """

    def __init__(self, config: HDFSConfig, logger):
        self.logger = logger
        self.config = config

    @abstractmethod
    def is_available(self) -> bool:
        """Return whether this backend can be used in the current environment."""
        ...

    @abstractmethod
    def make_dir(self, path: str) -> bool:
        """Create the directory ``path``."""
        ...

    @abstractmethod
    def delete(self, path: str, recursive: bool = True) -> bool:
        """Remove the file or directory at ``path`` (recursively by default)."""
        ...

    @abstractmethod
    def copy_from_local(self, src: str, dst: str) -> bool:
        """Upload the local file ``src`` to the HDFS location ``dst``."""
        ...

    @abstractmethod
    def copy_to_local(self, src: str, dst: str) -> bool:
        """Download the HDFS file ``src`` to the local location ``dst``."""
        ...

    @abstractmethod
    def read_file(self, path: str) -> Optional[str]:
        """Return the text content of ``path``, or ``None`` on failure."""
        ...

    @abstractmethod
    def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
        """Write ``content`` to ``path``, replacing it when ``overwrite`` is set."""
        ...

    @abstractmethod
    def exists(self, path: str) -> bool:
        """Return whether ``path`` exists."""
        ...

    @abstractmethod
    def list_dir(self, path: str) -> List[str]:
        """Return the entries contained in the directory ``path``."""
        ...

    @abstractmethod
    def get_file_status(self, path: str) -> Optional[FileStatus]:
        """Return metadata for ``path``, or ``None`` on failure."""
        ...

    @abstractmethod
    def get_file_size(self, path: str) -> Optional[int]:
        """Return the size of ``path``, or ``None`` on failure."""
        ...

    @abstractmethod
    def rename(self, src: str, dst: str) -> bool:
        """Move / rename ``src`` to ``dst``."""
        ...

    @abstractmethod
    def set_permission(self, path: str, permission: str) -> bool:
        """Change the permission bits of ``path`` to ``permission``."""
        ...

    @abstractmethod
    def set_owner(self, path: str, owner: Optional[str] = None,
                  group: Optional[str] = None) -> bool:
        """Change the owner and/or group of ``path``."""
        ...
class CLIBackend(HDFSBackend):
    """HDFS backend that drives the ``hdfs`` / ``hadoop`` command-line tool.

    Each operation becomes a ``<hadoop_cmd> dfs ...`` invocation executed via
    ``run_command``. A zero return code means success; on failure stderr is
    logged and the method returns False / None / an empty list. HDFS path
    arguments are checked with ``validate_hdfs_path`` before any command runs.
    """

    # Format string for ``dfs -stat``: type (%F), length in bytes (%b),
    # replication (%r), block size (%o), modification date (%y), access date
    # (%x), owner (%u), group (%g), octal permission (%a). Note that %b is the
    # file length and %o the block size. If the output cannot be parsed (e.g.
    # an older Hadoop release lacking a specifier), get_file_status falls back
    # to ``dfs -ls -d``.
    _STAT_FORMAT = '%F,%b,%r,%o,%y,%x,%u,%g,%a'
    # Hadoop prints %y / %x in UTC as "yyyy-MM-dd HH:mm:ss"; they are parsed
    # into naive datetimes.
    _STAT_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    # Used when the block-size field of the stat output is empty (128 MB).
    _DEFAULT_BLOCK_SIZE = 134217728

    def __init__(self, config: HDFSConfig, logger):
        super().__init__(config, logger)
        self.hadoop_cmd = config.hadoop_cmd or 'hdfs'
        self._check_cmd_available()

    @staticmethod
    def _command_exists(cmd: str) -> bool:
        """Return True if ``cmd`` resolves to an executable on ``PATH``."""
        import shutil  # local import, like tempfile in write_file

        # shutil.which avoids spawning a shell with config text interpolated.
        return shutil.which(cmd) is not None

    def _check_cmd_available(self):
        """Fall back to ``hadoop`` when the configured command is not on PATH.

        If neither command is found only a warning is logged; the backend is
        still constructed and ``is_available()`` will report False.
        """
        if not self._command_exists(self.hadoop_cmd):
            self.hadoop_cmd = 'hadoop'
            if not self._command_exists(self.hadoop_cmd):
                self.logger.warning("Neither 'hdfs' nor 'hadoop' command found")

    def is_available(self) -> bool:
        """Return True if the selected CLI command is on ``PATH``."""
        return self._command_exists(self.hadoop_cmd)

    def _execute_command(self, subcommand: str,
                         args: Optional[List[str]] = None) -> Tuple[int, str, str]:
        """Run ``<hadoop_cmd> <subcommand> <args...>`` via ``run_command``.

        Every argument is passed through ``shlex.quote`` so HDFS paths with
        spaces or shell metacharacters reach the CLI as a single literal
        argument instead of being split or interpreted by a shell. Arguments
        consisting only of shell-safe characters are left unchanged.
        ``hadoop_cmd`` comes from configuration and is not quoted.

        Returns:
            ``(returncode, stdout, stderr)`` as produced by ``run_command``.
        """
        import shlex

        quoted_args = [shlex.quote(arg) for arg in (args or [])]
        cmd = ' '.join([self.hadoop_cmd, subcommand, *quoted_args])
        self.logger.debug(f"Executing command: {cmd}")
        return run_command(cmd)

    def make_dir(self, path: str) -> bool:
        """Create ``path`` including missing parents (``-mkdir -p``)."""
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False

        self.logger.info(f"Creating directory: {path}")
        returncode, _, stderr = self._execute_command('dfs', ['-mkdir', '-p', path])
        if returncode != 0:
            self.logger.error(f"Failed to create directory: {path}, Error: {stderr}")
            return False
        self.logger.info(f"Successfully created directory: {path}")
        return True

    def delete(self, path: str, recursive: bool = True) -> bool:
        """Remove ``path`` with ``-rm`` (``-rm -r`` when ``recursive``)."""
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False

        self.logger.info(f"Deleting: {path}")
        args = ['-rm', '-r', path] if recursive else ['-rm', path]
        returncode, _, stderr = self._execute_command('dfs', args)
        if returncode != 0:
            self.logger.error(f"Failed to delete: {path}, Error: {stderr}")
            return False
        self.logger.info(f"Successfully deleted: {path}")
        return True

    def copy_from_local(self, src: str, dst: str) -> bool:
        """Upload local file ``src`` to HDFS ``dst`` (``-copyFromLocal``)."""
        if not os.path.exists(src):
            self.logger.error(f"Local file not found: {src}")
            return False

        if not validate_hdfs_path(dst):
            self.logger.error(f"Invalid HDFS path: {dst}")
            return False

        self.logger.info(f"Copying from local {src} to HDFS {dst}")
        returncode, _, stderr = self._execute_command('dfs', ['-copyFromLocal', src, dst])
        if returncode != 0:
            self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}")
            return False
        self.logger.info(f"Successfully copied {src} to {dst}")
        return True

    def copy_to_local(self, src: str, dst: str) -> bool:
        """Download HDFS ``src`` to local ``dst`` (``-copyToLocal``)."""
        if not validate_hdfs_path(src):
            self.logger.error(f"Invalid HDFS path: {src}")
            return False

        self.logger.info(f"Copying from HDFS {src} to local {dst}")
        returncode, _, stderr = self._execute_command('dfs', ['-copyToLocal', src, dst])
        if returncode != 0:
            self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}")
            return False
        self.logger.info(f"Successfully copied {src} to {dst}")
        return True

    def read_file(self, path: str) -> Optional[str]:
        """Return the stdout of ``-cat path``, or None if missing/failed."""
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return None

        if not self.exists(path):
            self.logger.error(f"File does not exist: {path}")
            return None

        self.logger.info(f"Reading file: {path}")
        returncode, stdout, stderr = self._execute_command('dfs', ['-cat', path])
        if returncode != 0:
            self.logger.error(f"Failed to read file: {path}, Error: {stderr}")
            return None
        self.logger.info(f"Successfully read file: {path}")
        return stdout

    def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
        """Write ``content`` to ``path`` via a local temp file and ``-put``.

        ``-put -f`` is used when ``overwrite`` is True; otherwise the upload
        fails if ``path`` already exists. The temp file is written as UTF-8
        and is always removed, even if writing it or the upload fails.
        """
        import tempfile

        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False

        self.logger.info(f"Writing to file: {path}")

        # Create the temp file first so the finally clause can always clean
        # it up, including when writing the content raises.
        fd, temp_path = tempfile.mkstemp(suffix='.txt')
        try:
            with os.fdopen(fd, 'w', encoding='utf-8') as temp_file:
                temp_file.write(content)

            args = ['-put', '-f'] if overwrite else ['-put']
            args.extend([temp_path, path])

            returncode, _, stderr = self._execute_command('dfs', args)
            if returncode != 0:
                self.logger.error(f"Failed to write to file: {path}, Error: {stderr}")
                return False
            self.logger.info(f"Successfully wrote to file: {path}")
            return True
        finally:
            if os.path.exists(temp_path):
                os.unlink(temp_path)

    def exists(self, path: str) -> bool:
        """Return True if ``-test -e path`` succeeds (False for invalid paths)."""
        if not validate_hdfs_path(path):
            return False

        returncode, _, _ = self._execute_command('dfs', ['-test', '-e', path])
        return returncode == 0

    def list_dir(self, path: str) -> List[str]:
        """Return the paths printed by ``-ls path`` (empty list on failure)."""
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return []

        if not self.exists(path):
            self.logger.error(f"Directory does not exist: {path}")
            return []

        returncode, stdout, stderr = self._execute_command('dfs', ['-ls', path])
        if returncode != 0:
            self.logger.error(f"Failed to list directory: {path}, Error: {stderr}")
            return []

        lines = [line for line in stdout.splitlines() if line.strip()]
        # Skip the "Found N items" header line.
        if lines and lines[0].startswith('Found'):
            lines = lines[1:]

        files = []
        for line in lines:
            # Columns: permissions, replication, owner, group, size, date,
            # time, path. Split at most 7 times so that a path containing
            # spaces stays intact as the last field.
            parts = line.split(None, 7)
            if len(parts) == 8:
                files.append(parts[7])
        return files

    def get_file_status(self, path: str) -> Optional[FileStatus]:
        """Return a FileStatus for ``path`` using ``-stat``, falling back to ``-ls -d``.

        The ``-ls -d`` fallback yields only type, length, replication, owner
        and group; the remaining fields keep their FileStatus defaults.
        Returns None if the path is invalid, missing, or neither command's
        output can be parsed.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return None

        if not self.exists(path):
            self.logger.error(f"Path does not exist: {path}")
            return None

        returncode, stdout, _ = self._execute_command('dfs', ['-stat', self._STAT_FORMAT, path])
        if returncode == 0:
            status = self._parse_stat_output(path, stdout)
            if status is not None:
                return status

        # Fallback: -ls -d
        returncode, stdout, _ = self._execute_command('dfs', ['-ls', '-d', path])
        if returncode == 0:
            return self._parse_ls_output(path, stdout)

        return None

    def _parse_stat_output(self, path: str, output: str) -> Optional[FileStatus]:
        """Build a FileStatus from ``-stat _STAT_FORMAT`` output; None if unparsable."""
        parts = output.strip().split(',')
        if len(parts) < 9:
            self.logger.warning(f"Unexpected stat output for {path}: {output!r}")
            return None

        time_format = self._STAT_TIME_FORMAT
        try:
            return FileStatus(
                path=path,
                is_directory=parts[0] == 'directory',
                length=int(parts[1]) if parts[1] else 0,
                replication=int(parts[2]) if parts[2] else 1,
                block_size=int(parts[3]) if parts[3] else self._DEFAULT_BLOCK_SIZE,
                modification_time=datetime.strptime(parts[4], time_format) if parts[4] else None,
                access_time=datetime.strptime(parts[5], time_format) if parts[5] else None,
                owner=parts[6],
                group=parts[7],
                permission=parts[8],
            )
        except (ValueError, IndexError) as e:
            self.logger.warning(f"Failed to parse file status: {e}")
            return None

    def _parse_ls_output(self, path: str, output: str) -> Optional[FileStatus]:
        """Build a partial FileStatus from ``-ls -d`` output; None if unparsable."""
        parts = output.strip().split()
        if len(parts) < 8:
            self.logger.debug(f"Unexpected ls output for {path}: {output!r}")
            return None

        is_dir = parts[0].startswith('d')
        try:
            return FileStatus(
                path=path,
                is_directory=is_dir,
                length=int(parts[4]) if parts[4] else 0,
                # Replication is only parsed for files; directories keep 1.
                replication=int(parts[1]) if parts[1] and not is_dir else 1,
                owner=parts[2],
                group=parts[3],
            )
        except (ValueError, IndexError) as e:
            self.logger.debug(f"Failed to parse ls output for {path}: {e}")
            return None

    def get_file_size(self, path: str) -> Optional[int]:
        """Return the first number printed by ``-du -s path``, or None on failure."""
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return None

        if not self.exists(path):
            self.logger.error(f"File does not exist: {path}")
            return None

        returncode, stdout, stderr = self._execute_command('dfs', ['-du', '-s', path])
        if returncode != 0:
            self.logger.error(f"Failed to get file size: {path}, Error: {stderr}")
            return None

        parts = stdout.split()
        if not parts:
            self.logger.error(f"Failed to parse file size: {stdout}")
            return None
        try:
            return int(parts[0])
        except ValueError:
            self.logger.error(f"Failed to parse file size: {stdout}")
            return None

    def rename(self, src: str, dst: str) -> bool:
        """Move ``src`` to ``dst`` with ``-mv``."""
        if not validate_hdfs_path(src):
            self.logger.error(f"Invalid source path: {src}")
            return False

        if not validate_hdfs_path(dst):
            self.logger.error(f"Invalid destination path: {dst}")
            return False

        self.logger.info(f"Renaming {src} to {dst}")
        returncode, _, stderr = self._execute_command('dfs', ['-mv', src, dst])
        if returncode != 0:
            self.logger.error(f"Failed to rename {src} to {dst}, Error: {stderr}")
            return False
        self.logger.info(f"Successfully renamed {src} to {dst}")
        return True

    def set_permission(self, path: str, permission: str) -> bool:
        """Apply ``-chmod permission path``."""
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False

        self.logger.info(f"Setting permission of {path} to {permission}")
        returncode, _, stderr = self._execute_command('dfs', ['-chmod', permission, path])
        if returncode != 0:
            self.logger.error(f"Failed to set permission of {path}, Error: {stderr}")
            return False
        self.logger.info(f"Successfully set permission of {path} to {permission}")
        return True

    def set_owner(self, path: str, owner: Optional[str] = None,
                  group: Optional[str] = None) -> bool:
        """Apply ``-chown [owner][:group] path``.

        At least one non-empty ``owner`` or ``group`` is required; an empty
        string counts as unspecified, since ``chown ""`` cannot succeed.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False

        if not owner and not group:
            self.logger.error("At least one of owner or group must be specified")
            return False

        owner_group = (owner or '') + (f":{group}" if group else '')

        self.logger.info(f"Setting owner of {path} to {owner_group}")
        returncode, _, stderr = self._execute_command('dfs', ['-chown', owner_group, path])
        if returncode != 0:
            self.logger.error(f"Failed to set owner of {path}, Error: {stderr}")
            return False
        self.logger.info(f"Successfully set owner of {path} to {owner_group}")
        return True
- class WebHDFSBackend(HDFSBackend):
- """WebHDFS REST API 后端实现"""
-
- def __init__(self, config: HDFSConfig, logger):
- super().__init__(config, logger)
- self.base_url = f"http://{config.namenode_host}:{config.namenode_http_port}/webhdfs/v1"
- self.user = config.user or os.environ.get('USER', 'hadoop')
- self._session = None
-
- def _get_session(self):
- """获取 HTTP 会话"""
- if self._session is None:
- try:
- import requests
- self._session = requests.Session()
- except ImportError:
- self.logger.error("requests library is required for WebHDFS backend")
- raise
- return self._session
-
- def is_available(self) -> bool:
- """检查后端是否可用"""
- try:
|