hdfs_operations.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. """
  2. HDFS 文件系统操作模块
  3. 提供与 Java 版本 CommonOperation 类相同的功能:
  4. - 创建目录
  5. - 删除目录/文件
  6. - 上传文件
  7. - 读写文件
  8. - 检查文件是否存在
  9. - 列出目录内容
  10. """
  11. import os
  12. from typing import List, Optional, Tuple
  13. from .utils.helpers import run_command, validate_hdfs_path, setup_logger
  14. class HDFSOperations:
  15. """
  16. HDFS 文件系统操作类
  17. 封装了 Hadoop 命令行工具,提供与 HDFS 交互的各种方法。
  18. 功能与 Java 版本的 CommonOperation 类相对应。
  19. """
  def __init__(self, hadoop_home: Optional[str] = None, logger_name: str = 'hdfs_operations'):
      """Create an HDFSOperations instance.

      Args:
          hadoop_home: Hadoop installation directory. When omitted (or
              empty) the ``HADOOP_HOME`` environment variable is used,
              falling back to an empty string if that is unset too.
          logger_name: Name handed to ``setup_logger``.
      """
      self.logger = setup_logger(logger_name)

      if not hadoop_home:
          hadoop_home = os.environ.get('HADOOP_HOME', '')
      self.hadoop_home = hadoop_home

      # Use the `hdfs` launcher when it can be found, otherwise `hadoop`.
      if self._check_command_exists('hdfs'):
          self.hadoop_cmd = 'hdfs'
      else:
          self.hadoop_cmd = 'hadoop'
  30. def _check_command_exists(self, cmd: str) -> bool:
  31. """
  32. 检查命令是否存在
  33. Args:
  34. cmd: 命令名称
  35. Returns:
  36. 命令是否存在
  37. """
  38. return os.system(f'which {cmd} > /dev/null 2>&1') == 0
  def _execute_hdfs_command(self, subcommand: str, args: Optional[List[str]] = None) -> Tuple[int, str, str]:
      """Run ``<hadoop_cmd> <subcommand> <args...>`` through ``run_command``.

      Each argument is shell-quoted before being joined into the command
      string, so a path containing spaces or shell metacharacters is
      passed on as one literal argument instead of being split or
      interpreted.

      Args:
          subcommand: HDFS sub-command (e.g. ``dfs``, ``fs``).
          args: Arguments for the sub-command; ``None`` means no arguments.

      Returns:
          The value returned by ``run_command``, annotated as
          ``(return_code, stdout, stderr)``.
      """
      # Function-scope import: keeps this method self-contained.
      import shlex

      # NOTE(review): assumes run_command hands the string to a shell or to
      # shlex.split, both of which honour POSIX quoting -- confirm in helpers.
      quoted_args = [shlex.quote(str(arg)) for arg in (args or [])]
      cmd = ' '.join([self.hadoop_cmd, subcommand, *quoted_args])
      self.logger.debug(f"Executing command: {cmd}")
      return run_command(cmd)
  def make_dir(self, path: str) -> bool:
      """Create a directory on HDFS (``dfs -mkdir -p``).

      Counterpart of the Java version's ``makeDir`` method.

      Args:
          path: Directory path to create.

      Returns:
          True when the command exits with status 0; False when the path
          is rejected by ``validate_hdfs_path`` or the command fails.

      Example:
          >>> hdfs = HDFSOperations()
          >>> hdfs.make_dir('/user/root/test1')
          True
      """
      if not validate_hdfs_path(path):
          self.logger.error(f"Invalid HDFS path: {path}")
          return False

      self.logger.info(f"Creating directory: {path}")
      mkdir_args = ['-mkdir', '-p', path]
      status, _out, err = self._execute_hdfs_command('dfs', mkdir_args)

      if status != 0:
          self.logger.error(f"Failed to create directory: {path}, Error: {err}")
          return False

      self.logger.info(f"Successfully created directory: {path}")
      return True
  def delete(self, path: str, recursive: bool = True) -> bool:
      """Remove a file or directory from HDFS (``dfs -rm``).

      Counterpart of the Java version's ``delDir`` and ``delFile`` methods.

      Args:
          path: Path to delete.
          recursive: When True, ``-r`` is added so directories are removed
              recursively.

      Returns:
          True when the command exits with status 0; False when the path
          is rejected by ``validate_hdfs_path`` or the command fails.

      Example:
          >>> hdfs = HDFSOperations()
          >>> hdfs.delete('/user/hadoop/data/word.txt')
          True
      """
      if not validate_hdfs_path(path):
          self.logger.error(f"Invalid HDFS path: {path}")
          return False

      self.logger.info(f"Deleting: {path}")

      rm_args = ['-rm']
      if recursive:
          rm_args.append('-r')
      rm_args.append(path)

      status, _out, err = self._execute_hdfs_command('dfs', rm_args)
      if status != 0:
          self.logger.error(f"Failed to delete: {path}, Error: {err}")
          return False

      self.logger.info(f"Successfully deleted: {path}")
      return True
  103. def copy_from_local(self, src: str, dst: str) -> bool:
  104. """
  105. 从本地文件系统上传文件到 HDFS
  106. 对应 Java 版本的 putFile 方法。
  107. Args:
  108. src: 本地文件路径
  109. dst: HDFS 目标路径
  110. Returns:
  111. 是否上传成功
  112. Example:
  113. >>> hdfs = HDFSOperations()
  114. >>> hdfs.copy_from_local('/home/hadoop/word.txt', '/user/hadoop/data/')
  115. True
  116. """
  117. if not os.path.exists(src):
  118. self.logger.error(f"Local file not found: {src}")
  119. return False
  120. if not validate_hdfs_path(dst):
  121. self.logger.error(f"Invalid HDFS path: {dst}")
  122. return False
  123. self.logger.info(f"Copying from local {src} to HDFS {dst}")
  124. returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-copyFromLocal', src, dst])
  125. if returncode == 0:
  126. self.logger.info(f"Successfully copied {src} to {dst}")
  127. return True
  128. else:
  129. self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}")
  130. return False
  def copy_to_local(self, src: str, dst: str) -> bool:
      """Download a file from HDFS to the local file system (``dfs -copyToLocal``).

      Args:
          src: Source path on HDFS.
          dst: Destination path on the local file system.

      Returns:
          True when the command exits with status 0; False when ``src``
          is rejected by ``validate_hdfs_path`` or the command fails.
      """
      if not validate_hdfs_path(src):
          self.logger.error(f"Invalid HDFS path: {src}")
          return False

      self.logger.info(f"Copying from HDFS {src} to local {dst}")
      download_args = ['-copyToLocal', src, dst]
      status, _out, err = self._execute_hdfs_command('dfs', download_args)

      if status != 0:
          self.logger.error(f"Failed to copy {src} to {dst}, Error: {err}")
          return False

      self.logger.info(f"Successfully copied {src} to {dst}")
      return True
  def read_file(self, path: str) -> Optional[str]:
      """Return the content of an HDFS file (``dfs -cat``).

      Counterpart of the Java version's ``readFile`` method.

      Args:
          path: HDFS file path.

      Returns:
          The command's stdout as a string, or None when the path is
          rejected by ``validate_hdfs_path``, does not exist, or the
          command fails.

      Example:
          >>> hdfs = HDFSOperations()
          >>> content = hdfs.read_file('/user/hadoop/data/write.txt')
          >>> print(content)
          da jia hao,cai shi zhen de hao!
      """
      if not validate_hdfs_path(path):
          self.logger.error(f"Invalid HDFS path: {path}")
          return None

      # Existence is probed separately so a missing file gets its own message.
      if not self.exists(path):
          self.logger.error(f"File does not exist: {path}")
          return None

      self.logger.info(f"Reading file: {path}")
      status, text, err = self._execute_hdfs_command('dfs', ['-cat', path])

      if status != 0:
          self.logger.error(f"Failed to read file: {path}, Error: {err}")
          return None

      self.logger.info(f"Successfully read file: {path}")
      return text
  def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
      """Write ``content`` to an HDFS file.

      Counterpart of the Java version's ``writeFile`` method. The content
      is staged in a local temporary file which is then uploaded with
      ``dfs -put``; the temporary file is removed afterwards, including
      when staging or uploading raises.

      Args:
          path: HDFS file path.
          content: Text to write. It is encoded as UTF-8 and written
              without newline translation.
          overwrite: When True, ``-f`` is passed to ``-put`` so an
              existing file is replaced.

      Returns:
          True when the upload exits with status 0; False when the path
          is rejected by ``validate_hdfs_path`` or the upload fails.

      Example:
          >>> hdfs = HDFSOperations()
          >>> hdfs.write_file('/user/hadoop/data/write.txt', 'da jia hao,cai shi zhen de hao!')
          True
      """
      import tempfile

      if not validate_hdfs_path(path):
          self.logger.error(f"Invalid HDFS path: {path}")
          return False

      self.logger.info(f"Writing to file: {path}")

      temp_path = None
      try:
          # delete=False: the file has to outlive the `with` block so the
          # Hadoop CLI can read it by name. An explicit encoding and
          # newline='' keep the bytes independent of locale and platform.
          with tempfile.NamedTemporaryFile(
              mode='w',
              delete=False,
              suffix='.txt',
              encoding='utf-8',
              newline='',
          ) as temp_file:
              # Record the name before writing so a failed write is still
              # cleaned up by the `finally` clause below.
              temp_path = temp_file.name
              temp_file.write(content)

          # Upload the staged file with `put`.
          args = ['-put']
          if overwrite:
              args.append('-f')
          args.extend([temp_path, path])

          returncode, _, stderr = self._execute_hdfs_command('dfs', args)
          if returncode == 0:
              self.logger.info(f"Successfully wrote to file: {path}")
              return True

          self.logger.error(f"Failed to write to file: {path}, Error: {stderr}")
          return False
      finally:
          # Remove the staged temporary file if it was created.
          if temp_path is not None:
              try:
                  os.unlink(temp_path)
              except FileNotFoundError:
                  pass
  def exists(self, path: str) -> bool:
      """Check whether a path exists on HDFS (``dfs -test -e``).

      Args:
          path: HDFS path.

      Returns:
          True when the path passes ``validate_hdfs_path`` and the test
          command exits with status 0; False otherwise.
      """
      if validate_hdfs_path(path):
          status, _out, _err = self._execute_hdfs_command('dfs', ['-test', '-e', path])
          return status == 0
      return False
  def list_dir(self, path: str) -> List[str]:
      """List the entries of an HDFS directory (``dfs -ls``).

      Args:
          path: HDFS directory path.

      Returns:
          The path column of every entry line in the ``-ls`` output. An
          empty list is returned when the path is rejected by
          ``validate_hdfs_path``, does not exist, or the command fails.
      """
      if not validate_hdfs_path(path):
          self.logger.error(f"Invalid HDFS path: {path}")
          return []

      if not self.exists(path):
          self.logger.error(f"Directory does not exist: {path}")
          return []

      returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-ls', path])
      if returncode != 0:
          self.logger.error(f"Failed to list directory: {path}, Error: {stderr}")
          return []

      entries = []
      for line in stdout.splitlines():
          # An entry line has 8 columns: permissions, replication, owner,
          # group, size, date, time, path. Splitting at most 7 times keeps
          # a path that contains spaces in one piece. Shorter lines (such
          # as the "Found N items" header or blank lines) are skipped.
          fields = line.split(None, 7)
          if len(fields) == 8:
              entries.append(fields[7].rstrip())
      return entries
  def get_file_size(self, path: str) -> Optional[int]:
      """Return the size reported by ``dfs -du -s`` for an HDFS path.

      Args:
          path: HDFS file path.

      Returns:
          The first field of the command output parsed as an integer
          (size in bytes), or None when the path is rejected by
          ``validate_hdfs_path``, does not exist, the command fails, or
          the output is empty or not numeric.
      """
      if not validate_hdfs_path(path):
          self.logger.error(f"Invalid HDFS path: {path}")
          return None

      if not self.exists(path):
          self.logger.error(f"File does not exist: {path}")
          return None

      returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-du', '-s', path])
      if returncode != 0:
          self.logger.error(f"Failed to get file size: {path}, Error: {stderr}")
          return None

      fields = stdout.split()
      if not fields:
          # A successful exit with no output is reported explicitly rather
          # than falling off the end of the function.
          self.logger.error(f"Failed to parse file size: {stdout}")
          return None

      try:
          return int(fields[0])
      except ValueError:
          self.logger.error(f"Failed to parse file size: {stdout}")
          return None