""" HDFS 文件系统操作模块 提供与 Java 版本 CommonOperation 类相同的功能: - 创建目录 - 删除目录/文件 - 上传文件 - 读写文件 - 检查文件是否存在 - 列出目录内容 """ import os from typing import List, Optional, Tuple from .utils.helpers import run_command, validate_hdfs_path, setup_logger class HDFSOperations: """ HDFS 文件系统操作类 封装了 Hadoop 命令行工具,提供与 HDFS 交互的各种方法。 功能与 Java 版本的 CommonOperation 类相对应。 """ def __init__(self, hadoop_home: Optional[str] = None, logger_name: str = 'hdfs_operations'): """ 初始化 HDFSOperations 实例 Args: hadoop_home: Hadoop 安装目录(可选,默认从环境变量获取) logger_name: 日志器名称 """ self.logger = setup_logger(logger_name) self.hadoop_home = hadoop_home or os.environ.get('HADOOP_HOME', '') self.hadoop_cmd = 'hdfs' if self._check_command_exists('hdfs') else 'hadoop' def _check_command_exists(self, cmd: str) -> bool: """ 检查命令是否存在 Args: cmd: 命令名称 Returns: 命令是否存在 """ return os.system(f'which {cmd} > /dev/null 2>&1') == 0 def _execute_hdfs_command(self, subcommand: str, args: List[str] = None) -> Tuple[int, str, str]: """ 执行 HDFS 命令 Args: subcommand: HDFS 子命令(如 dfs, fs 等) args: 命令参数列表 Returns: (return_code, stdout, stderr) """ args = args or [] cmd = f"{self.hadoop_cmd} {subcommand} {' '.join(args)}" self.logger.debug(f"Executing command: {cmd}") return run_command(cmd) def make_dir(self, path: str) -> bool: """ 创建目录 对应 Java 版本的 makeDir 方法。 Args: path: 要创建的目录路径 Returns: 是否创建成功 Example: >>> hdfs = HDFSOperations() >>> hdfs.make_dir('/user/root/test1') True """ if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return False self.logger.info(f"Creating directory: {path}") returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-mkdir', '-p', path]) if returncode == 0: self.logger.info(f"Successfully created directory: {path}") return True else: self.logger.error(f"Failed to create directory: {path}, Error: {stderr}") return False def delete(self, path: str, recursive: bool = True) -> bool: """ 删除文件或目录 对应 Java 版本的 delDir 和 delFile 方法。 Args: path: 要删除的路径 recursive: 是否递归删除(用于目录) Returns: 是否删除成功 Example: >>> hdfs = HDFSOperations() >>> hdfs.delete('/user/hadoop/data/word.txt') True """ if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return False self.logger.info(f"Deleting: {path}") args = ['-rm', '-r'] if recursive else ['-rm'] args.append(path) returncode, stdout, stderr = self._execute_hdfs_command('dfs', args) if returncode == 0: self.logger.info(f"Successfully deleted: {path}") return True else: self.logger.error(f"Failed to delete: {path}, Error: {stderr}") return False def copy_from_local(self, src: str, dst: str) -> bool: """ 从本地文件系统上传文件到 HDFS 对应 Java 版本的 putFile 方法。 Args: src: 本地文件路径 dst: HDFS 目标路径 Returns: 是否上传成功 Example: >>> hdfs = HDFSOperations() >>> hdfs.copy_from_local('/home/hadoop/word.txt', '/user/hadoop/data/') True """ if not os.path.exists(src): self.logger.error(f"Local file not found: {src}") return False if not validate_hdfs_path(dst): self.logger.error(f"Invalid HDFS path: {dst}") return False self.logger.info(f"Copying from local {src} to HDFS {dst}") returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-copyFromLocal', src, dst]) if returncode == 0: self.logger.info(f"Successfully copied {src} to {dst}") return True else: self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}") return False def copy_to_local(self, src: str, dst: str) -> bool: """ 从 HDFS 下载文件到本地文件系统 Args: src: HDFS 源路径 dst: 本地目标路径 Returns: 是否下载成功 """ if not validate_hdfs_path(src): self.logger.error(f"Invalid HDFS path: {src}") return False self.logger.info(f"Copying from HDFS {src} to local {dst}") returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-copyToLocal', src, dst]) if returncode == 0: self.logger.info(f"Successfully copied {src} to {dst}") return True else: self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}") return False def read_file(self, path: str) -> Optional[str]: """ 读取 HDFS 文件内容 对应 Java 版本的 readFile 方法。 Args: path: HDFS 文件路径 Returns: 文件内容(字符串),如果失败返回 None Example: >>> hdfs = HDFSOperations() >>> content = hdfs.read_file('/user/hadoop/data/write.txt') >>> print(content) da jia hao,cai shi zhen de hao! """ if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return None if not self.exists(path): self.logger.error(f"File does not exist: {path}") return None self.logger.info(f"Reading file: {path}") returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-cat', path]) if returncode == 0: self.logger.info(f"Successfully read file: {path}") return stdout else: self.logger.error(f"Failed to read file: {path}, Error: {stderr}") return None def write_file(self, path: str, content: str, overwrite: bool = True) -> bool: """ 写入内容到 HDFS 文件 对应 Java 版本的 writeFile 方法。 Args: path: HDFS 文件路径 content: 要写入的内容 overwrite: 是否覆盖已存在的文件 Returns: 是否写入成功 Example: >>> hdfs = HDFSOperations() >>> hdfs.write_file('/user/hadoop/data/write.txt', 'da jia hao,cai shi zhen de hao!') True """ if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return False self.logger.info(f"Writing to file: {path}") # 创建临时文件 import tempfile with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as temp_file: temp_file.write(content) temp_path = temp_file.name try: # 使用 put 命令上传临时文件 args = ['-put'] if overwrite: args.append('-f') args.extend([temp_path, path]) returncode, stdout, stderr = self._execute_hdfs_command('dfs', args) if returncode == 0: self.logger.info(f"Successfully wrote to file: {path}") return True else: self.logger.error(f"Failed to write to file: {path}, Error: {stderr}") return False finally: # 清理临时文件 if os.path.exists(temp_path): os.unlink(temp_path) def exists(self, path: str) -> bool: """ 检查 HDFS 路径是否存在 Args: path: HDFS 路径 Returns: 路径是否存在 """ if not validate_hdfs_path(path): return False returncode, _, _ = self._execute_hdfs_command('dfs', ['-test', '-e', path]) return returncode == 0 def list_dir(self, path: str) -> List[str]: """ 列出 HDFS 目录内容 Args: path: HDFS 目录路径 Returns: 目录内容列表 """ if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return [] if not self.exists(path): self.logger.error(f"Directory does not exist: {path}") return [] returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-ls', path]) if returncode == 0: # 解析输出,提取文件名 lines = stdout.strip().split('\n') # 跳过第一行(如果是目录列表的标题) if len(lines) > 0 and lines[0].startswith('Found'): lines = lines[1:] # 提取文件名(每一行的最后一个字段) files = [] for line in lines: parts = line.split() if len(parts) >= 8: files.append(parts[-1]) return files else: self.logger.error(f"Failed to list directory: {path}, Error: {stderr}") return [] def get_file_size(self, path: str) -> Optional[int]: """ 获取 HDFS 文件大小 Args: path: HDFS 文件路径 Returns: 文件大小(字节),如果失败返回 None """ if not validate_hdfs_path(path): self.logger.error(f"Invalid HDFS path: {path}") return None if not self.exists(path): self.logger.error(f"File does not exist: {path}") return None returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-du', '-s', path]) if returncode == 0: # 解析输出,提取文件大小 parts = stdout.strip().split() if len(parts) >= 1: try: return int(parts[0]) except ValueError: self.logger.error(f"Failed to parse file size: {stdout}") return None else: self.logger.error(f"Failed to get file size: {path}, Error: {stderr}") return None