| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365 |
- """
- HDFS 文件系统操作模块
- 提供与 Java 版本 CommonOperation 类相同的功能:
- - 创建目录
- - 删除目录/文件
- - 上传文件
- - 读写文件
- - 检查文件是否存在
- - 列出目录内容
- """
import os
import re
import shlex
import shutil
import tempfile
from typing import List, Optional, Tuple

from .utils.helpers import run_command, validate_hdfs_path, setup_logger
class HDFSOperations:
    """
    HDFS file-system operations.

    Wraps the Hadoop command-line client and exposes one method per HDFS
    action. Functionally mirrors the Java ``CommonOperation`` class.

    Public methods validate their HDFS path(s) with ``validate_hdfs_path``,
    run the command through ``run_command``, log the outcome, and report
    failure through the return value (``False``, ``None`` or ``[]``).
    """

    # Mode column of an ``hdfs dfs -ls`` entry, e.g. ``drwxr-xr-x`` or
    # ``-rw-r--r--+`` (a trailing ``+`` marks an ACL). Used to tell real entry
    # lines apart from the "Found N items" header and stray log lines.
    _LS_MODE_RE = re.compile(r'^[dl-][rwxsStT-]{9}\+?$')

    def __init__(self, hadoop_home: Optional[str] = None, logger_name: str = 'hdfs_operations'):
        """
        Initialise an HDFSOperations instance.

        Args:
            hadoop_home: Hadoop installation directory. Defaults to the
                ``HADOOP_HOME`` environment variable (``''`` if unset). When
                non-empty, ``<hadoop_home>/bin`` is searched for the client
                executable before ``PATH``.
            logger_name: Name passed to ``setup_logger``.
        """
        self.logger = setup_logger(logger_name)
        self.hadoop_home = hadoop_home or os.environ.get('HADOOP_HOME', '')
        self.hadoop_cmd = self._resolve_hadoop_cmd(self.hadoop_home)

    def _check_command_exists(self, cmd: str) -> bool:
        """
        Check whether a command is available on ``PATH``.

        Args:
            cmd: Command name.

        Returns:
            True if ``cmd`` resolves to an executable on ``PATH``.
        """
        # shutil.which avoids spawning a shell. The previous
        # ``os.system('which ...')`` interpolated ``cmd`` into a shell string,
        # and it also required a ``which`` binary to be installed.
        return shutil.which(cmd) is not None

    def _resolve_hadoop_cmd(self, hadoop_home: str) -> str:
        """
        Choose the executable used to run HDFS shell commands.

        ``hdfs`` is preferred over ``hadoop``. For each candidate,
        ``<hadoop_home>/bin`` is checked first (when ``hadoop_home`` is
        non-empty), then ``PATH``.

        Args:
            hadoop_home: Hadoop installation directory, or ``''``.

        Returns:
            An absolute path when the executable was found under
            ``hadoop_home``, otherwise a bare command name. Falls back to
            ``'hadoop'`` when nothing is found.
        """
        for name in ('hdfs', 'hadoop'):
            if hadoop_home:
                found = shutil.which(name, path=os.path.join(hadoop_home, 'bin'))
                if found:
                    return found
            if self._check_command_exists(name):
                return name
        return 'hadoop'

    @staticmethod
    def _build_command(executable: str, subcommand: str, args: Optional[List[str]] = None) -> str:
        """
        Join the executable, subcommand and arguments into one command string.

        Every token goes through ``shlex.quote``. Tokens made only of safe
        characters (typical flags and paths) come out unchanged. Tokens with
        spaces or shell metacharacters are single-quoted, so they can neither
        split into several arguments nor inject extra commands.
        """
        # NOTE(review): assumes run_command parses the string with POSIX shell
        # rules (shell=True or shlex.split) -- confirm in utils.helpers.
        tokens = [executable, subcommand, *(args or [])]
        return ' '.join(shlex.quote(token) for token in tokens)

    def _execute_hdfs_command(self, subcommand: str, args: Optional[List[str]] = None) -> Tuple[int, str, str]:
        """
        Execute an HDFS command.

        Args:
            subcommand: HDFS subcommand (e.g. ``dfs``, ``fs``).
            args: Command arguments. Each one is shell-quoted.

        Returns:
            ``(return_code, stdout, stderr)`` as returned by ``run_command``.
        """
        cmd = self._build_command(self.hadoop_cmd, subcommand, args)
        self.logger.debug(f"Executing command: {cmd}")
        return run_command(cmd)

    def make_dir(self, path: str) -> bool:
        """
        Create a directory, including missing parents (``-mkdir -p``).

        Counterpart of the Java ``makeDir`` method.

        Args:
            path: HDFS directory to create.

        Returns:
            True if the command succeeded, False otherwise.

        Example:
            >>> hdfs = HDFSOperations()
            >>> hdfs.make_dir('/user/root/test1')
            True
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False

        self.logger.info(f"Creating directory: {path}")
        returncode, _, stderr = self._execute_hdfs_command('dfs', ['-mkdir', '-p', path])

        if returncode == 0:
            self.logger.info(f"Successfully created directory: {path}")
            return True
        self.logger.error(f"Failed to create directory: {path}, Error: {stderr}")
        return False

    def delete(self, path: str, recursive: bool = True) -> bool:
        """
        Delete a file or directory.

        Counterpart of the Java ``delDir`` and ``delFile`` methods.

        Args:
            path: HDFS path to delete.
            recursive: Pass ``-r`` to ``-rm``, which is needed to delete a
                directory.

        Returns:
            True if the command succeeded, False otherwise.

        Example:
            >>> hdfs = HDFSOperations()
            >>> hdfs.delete('/user/hadoop/data/word.txt')
            True
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False

        self.logger.info(f"Deleting: {path}")
        args = ['-rm', '-r'] if recursive else ['-rm']
        args.append(path)

        returncode, _, stderr = self._execute_hdfs_command('dfs', args)

        if returncode == 0:
            self.logger.info(f"Successfully deleted: {path}")
            return True
        self.logger.error(f"Failed to delete: {path}, Error: {stderr}")
        return False

    def copy_from_local(self, src: str, dst: str) -> bool:
        """
        Upload a file from the local file system to HDFS (``-copyFromLocal``).

        Counterpart of the Java ``putFile`` method.

        Args:
            src: Local source path. It must exist.
            dst: HDFS destination path.

        Returns:
            True if the upload succeeded, False otherwise.

        Example:
            >>> hdfs = HDFSOperations()
            >>> hdfs.copy_from_local('/home/hadoop/word.txt', '/user/hadoop/data/')
            True
        """
        if not os.path.exists(src):
            self.logger.error(f"Local file not found: {src}")
            return False

        if not validate_hdfs_path(dst):
            self.logger.error(f"Invalid HDFS path: {dst}")
            return False

        self.logger.info(f"Copying from local {src} to HDFS {dst}")
        returncode, _, stderr = self._execute_hdfs_command('dfs', ['-copyFromLocal', src, dst])

        if returncode == 0:
            self.logger.info(f"Successfully copied {src} to {dst}")
            return True
        self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}")
        return False

    def copy_to_local(self, src: str, dst: str) -> bool:
        """
        Download a file from HDFS to the local file system (``-copyToLocal``).

        Args:
            src: HDFS source path.
            dst: Local destination path.

        Returns:
            True if the download succeeded, False otherwise.
        """
        if not validate_hdfs_path(src):
            self.logger.error(f"Invalid HDFS path: {src}")
            return False

        self.logger.info(f"Copying from HDFS {src} to local {dst}")
        returncode, _, stderr = self._execute_hdfs_command('dfs', ['-copyToLocal', src, dst])

        if returncode == 0:
            self.logger.info(f"Successfully copied {src} to {dst}")
            return True
        self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}")
        return False

    def read_file(self, path: str) -> Optional[str]:
        """
        Read the contents of an HDFS file (``-cat``).

        Counterpart of the Java ``readFile`` method.

        Args:
            path: HDFS file path.

        Returns:
            The file contents as returned in ``run_command``'s stdout, or
            None if the path is invalid, missing, or the read failed.

        Example:
            >>> hdfs = HDFSOperations()
            >>> content = hdfs.read_file('/user/hadoop/data/write.txt')
            >>> print(content)
            da jia hao,cai shi zhen de hao!
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return None

        if not self.exists(path):
            self.logger.error(f"File does not exist: {path}")
            return None

        self.logger.info(f"Reading file: {path}")
        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-cat', path])

        if returncode == 0:
            self.logger.info(f"Successfully read file: {path}")
            return stdout
        self.logger.error(f"Failed to read file: {path}, Error: {stderr}")
        return None

    def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
        """
        Write a string to an HDFS file.

        The content is staged in a local UTF-8 temporary file. That file is
        then uploaded with ``-put`` and always removed afterwards.

        Counterpart of the Java ``writeFile`` method.

        Args:
            path: HDFS file path.
            content: Text to write.
            overwrite: Pass ``-f`` so an existing file is replaced.

        Returns:
            True if the upload succeeded, False otherwise.

        Example:
            >>> hdfs = HDFSOperations()
            >>> hdfs.write_file('/user/hadoop/data/write.txt', 'da jia hao,cai shi zhen de hao!')
            True
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return False

        self.logger.info(f"Writing to file: {path}")

        temp_path = None
        try:
            # Record the name before writing, so the finally clause can remove
            # the file even if write() raises. Use explicit UTF-8 so non-ASCII
            # content does not depend on the process locale.
            with tempfile.NamedTemporaryFile(
                mode='w', encoding='utf-8', delete=False, suffix='.txt'
            ) as temp_file:
                temp_path = temp_file.name
                temp_file.write(content)

            args = ['-put']
            if overwrite:
                args.append('-f')
            args.extend([temp_path, path])

            returncode, _, stderr = self._execute_hdfs_command('dfs', args)

            if returncode == 0:
                self.logger.info(f"Successfully wrote to file: {path}")
                return True
            self.logger.error(f"Failed to write to file: {path}, Error: {stderr}")
            return False
        finally:
            if temp_path is not None and os.path.exists(temp_path):
                os.unlink(temp_path)

    def exists(self, path: str) -> bool:
        """
        Check whether an HDFS path exists (``-test -e``).

        Args:
            path: HDFS path.

        Returns:
            True if the path is valid and the test command exits with 0.
        """
        if not validate_hdfs_path(path):
            return False

        returncode, _, _ = self._execute_hdfs_command('dfs', ['-test', '-e', path])
        return returncode == 0

    @classmethod
    def _parse_ls_output(cls, stdout: str) -> List[str]:
        """
        Extract the path column from ``hdfs dfs -ls`` output.

        Entry lines have eight whitespace-separated columns: mode,
        replication, owner, group, size, date, time, path. Splitting at most
        seven times keeps paths that contain spaces intact. Lines that do not
        have eight columns, or whose first column is not a mode string, are
        skipped. This drops the "Found N items" header and stray log lines.
        """
        entries = []
        for line in stdout.splitlines():
            columns = line.split(None, 7)
            if len(columns) == 8 and cls._LS_MODE_RE.match(columns[0]):
                entries.append(columns[7])
        return entries

    def list_dir(self, path: str) -> List[str]:
        """
        List the contents of an HDFS directory (``-ls``).

        Args:
            path: HDFS directory path.

        Returns:
            The entry paths as printed in the last column of ``-ls``, or an
            empty list if the path is invalid, missing, or listing failed.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return []

        if not self.exists(path):
            self.logger.error(f"Directory does not exist: {path}")
            return []

        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-ls', path])

        if returncode != 0:
            self.logger.error(f"Failed to list directory: {path}, Error: {stderr}")
            return []
        return self._parse_ls_output(stdout)

    def get_file_size(self, path: str) -> Optional[int]:
        """
        Get the size of an HDFS file (``-du -s``).

        Args:
            path: HDFS file path.

        Returns:
            The integer in the first output column of ``-du -s`` (the size in
            bytes), or None if the path is invalid, missing, the command
            failed, or the output could not be parsed.
        """
        if not validate_hdfs_path(path):
            self.logger.error(f"Invalid HDFS path: {path}")
            return None

        if not self.exists(path):
            self.logger.error(f"File does not exist: {path}")
            return None

        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-du', '-s', path])

        if returncode != 0:
            self.logger.error(f"Failed to get file size: {path}, Error: {stderr}")
            return None

        # Only the first column is used. The number of trailing columns varies
        # across Hadoop versions. Empty output (IndexError) is now logged
        # rather than silently returning None.
        try:
            return int(stdout.split()[0])
        except (IndexError, ValueError):
            self.logger.error(f"Failed to parse file size: {stdout}")
            return None
|