hdfs_operations.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
"""
HDFS file-system operations module.

Provides modern HDFS operation capabilities:
- Multiple backends (command line, `hdfs` library, `pyhdfs` library, WebHDFS)
- Synchronous and asynchronous APIs
- Context-manager support
- Configuration-management integration
- Retry mechanism
- Rich error handling

Functionality mirrors the Java `CommonOperation` class:
- Create directories
- Delete directories/files
- Upload files
- Read and write files
- Check whether a file exists
- List directory contents
- Get file information

NOTE(review): only the command-line and WebHDFS backends are implemented in the
portion of this module reviewed here; confirm that the other listed backends
and the async/retry features are provided further down the file.
"""
  19. import os
  20. import asyncio
  21. import time
  22. from abc import ABC, abstractmethod
  23. from dataclasses import dataclass, field
  24. from datetime import datetime
  25. from enum import Enum
  26. from pathlib import Path
  27. from typing import (
  28. Any, Callable, Dict, Generic, List, Optional,
  29. Tuple, Type, TypeVar, Union, Iterator, AsyncIterator
  30. )
  31. from contextlib import contextmanager, asynccontextmanager
  32. from .config import ConfigurationManager, HDFSConfig, get_config
  33. from .utils.helpers import (
  34. run_command, validate_hdfs_path, setup_logger, format_file_size
  35. )
  36. T = TypeVar('T')
  37. class BackendType(Enum):
  38. """HDFS 后端类型"""
  39. CLI = "cli" # 命令行工具
  40. HDFS_LIB = "hdfs_lib" # hdfs 库
  41. PYHDFS = "pyhdfs" # pyhdfs 库
  42. WEBHDFS = "webhdfs" # WebHDFS REST API
  43. AUTO = "auto" # 自动选择可用的后端
  44. @dataclass
  45. class FileStatus:
  46. """文件状态信息"""
  47. path: str
  48. is_directory: bool
  49. length: int = 0
  50. replication: int = 1
  51. block_size: int = 134217728 # 128MB 默认块大小
  52. modification_time: Optional[datetime] = None
  53. access_time: Optional[datetime] = None
  54. owner: str = ""
  55. group: str = ""
  56. permission: str = "644"
  57. is_snapshot: bool = False
  58. @property
  59. def size_formatted(self) -> str:
  60. """格式化的文件大小"""
  61. return format_file_size(self.length)
  62. def to_dict(self) -> Dict[str, Any]:
  63. """转换为字典"""
  64. return {
  65. 'path': self.path,
  66. 'is_directory': self.is_directory,
  67. 'length': self.length,
  68. 'size_formatted': self.size_formatted,
  69. 'replication': self.replication,
  70. 'block_size': self.block_size,
  71. 'modification_time': self.modification_time.isoformat() if self.modification_time else None,
  72. 'access_time': self.access_time.isoformat() if self.access_time else None,
  73. 'owner': self.owner,
  74. 'group': self.group,
  75. 'permission': self.permission,
  76. 'is_snapshot': self.is_snapshot,
  77. }
  78. class HDFSBackend(ABC):
  79. """HDFS 后端抽象基类"""
  80. def __init__(self, config: HDFSConfig, logger):
  81. self.config = config
  82. self.logger = logger
  83. @abstractmethod
  84. def is_available(self) -> bool:
  85. """检查后端是否可用"""
  86. pass
  87. @abstractmethod
  88. def make_dir(self, path: str) -> bool:
  89. """创建目录"""
  90. pass
  91. @abstractmethod
  92. def delete(self, path: str, recursive: bool = True) -> bool:
  93. """删除文件或目录"""
  94. pass
  95. @abstractmethod
  96. def copy_from_local(self, src: str, dst: str) -> bool:
  97. """从本地上传文件到 HDFS"""
  98. pass
  99. @abstractmethod
  100. def copy_to_local(self, src: str, dst: str) -> bool:
  101. """从 HDFS 下载文件到本地"""
  102. pass
  103. @abstractmethod
  104. def read_file(self, path: str) -> Optional[str]:
  105. """读取文件内容"""
  106. pass
  107. @abstractmethod
  108. def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
  109. """写入文件内容"""
  110. pass
  111. @abstractmethod
  112. def exists(self, path: str) -> bool:
  113. """检查路径是否存在"""
  114. pass
  115. @abstractmethod
  116. def list_dir(self, path: str) -> List[str]:
  117. """列出目录内容"""
  118. pass
  119. @abstractmethod
  120. def get_file_status(self, path: str) -> Optional[FileStatus]:
  121. """获取文件状态"""
  122. pass
  123. @abstractmethod
  124. def get_file_size(self, path: str) -> Optional[int]:
  125. """获取文件大小"""
  126. pass
  127. @abstractmethod
  128. def rename(self, src: str, dst: str) -> bool:
  129. """重命名文件或目录"""
  130. pass
  131. @abstractmethod
  132. def set_permission(self, path: str, permission: str) -> bool:
  133. """设置文件权限"""
  134. pass
  135. @abstractmethod
  136. def set_owner(self, path: str, owner: Optional[str] = None,
  137. group: Optional[str] = None) -> bool:
  138. """设置文件所有者"""
  139. pass
  140. class CLIBackend(HDFSBackend):
  141. """命令行后端实现"""
  142. def __init__(self, config: HDFSConfig, logger):
  143. super().__init__(config, logger)
  144. self.hadoop_cmd = config.hadoop_cmd or 'hdfs'
  145. self._check_cmd_available()
  146. def _check_cmd_available(self):
  147. """检查命令是否可用"""
  148. if os.system(f'which {self.hadoop_cmd} > /dev/null 2>&1') != 0:
  149. self.hadoop_cmd = 'hadoop'
  150. if os.system(f'which {self.hadoop_cmd} > /dev/null 2>&1') != 0:
  151. self.logger.warning("Neither 'hdfs' nor 'hadoop' command found")
  152. def is_available(self) -> bool:
  153. """检查后端是否可用"""
  154. return os.system(f'which {self.hadoop_cmd} > /dev/null 2>&1') == 0
  155. def _execute_command(self, subcommand: str, args: List[str] = None) -> Tuple[int, str, str]:
  156. """执行 HDFS 命令"""
  157. args = args or []
  158. cmd = f"{self.hadoop_cmd} {subcommand} {' '.join(args)}"
  159. self.logger.debug(f"Executing command: {cmd}")
  160. return run_command(cmd)
  161. def make_dir(self, path: str) -> bool:
  162. if not validate_hdfs_path(path):
  163. self.logger.error(f"Invalid HDFS path: {path}")
  164. return False
  165. self.logger.info(f"Creating directory: {path}")
  166. returncode, stdout, stderr = self._execute_command('dfs', ['-mkdir', '-p', path])
  167. if returncode == 0:
  168. self.logger.info(f"Successfully created directory: {path}")
  169. return True
  170. else:
  171. self.logger.error(f"Failed to create directory: {path}, Error: {stderr}")
  172. return False
  173. def delete(self, path: str, recursive: bool = True) -> bool:
  174. if not validate_hdfs_path(path):
  175. self.logger.error(f"Invalid HDFS path: {path}")
  176. return False
  177. self.logger.info(f"Deleting: {path}")
  178. args = ['-rm', '-r'] if recursive else ['-rm']
  179. args.append(path)
  180. returncode, stdout, stderr = self._execute_command('dfs', args)
  181. if returncode == 0:
  182. self.logger.info(f"Successfully deleted: {path}")
  183. return True
  184. else:
  185. self.logger.error(f"Failed to delete: {path}, Error: {stderr}")
  186. return False
  187. def copy_from_local(self, src: str, dst: str) -> bool:
  188. if not os.path.exists(src):
  189. self.logger.error(f"Local file not found: {src}")
  190. return False
  191. if not validate_hdfs_path(dst):
  192. self.logger.error(f"Invalid HDFS path: {dst}")
  193. return False
  194. self.logger.info(f"Copying from local {src} to HDFS {dst}")
  195. returncode, stdout, stderr = self._execute_command('dfs', ['-copyFromLocal', src, dst])
  196. if returncode == 0:
  197. self.logger.info(f"Successfully copied {src} to {dst}")
  198. return True
  199. else:
  200. self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}")
  201. return False
  202. def copy_to_local(self, src: str, dst: str) -> bool:
  203. if not validate_hdfs_path(src):
  204. self.logger.error(f"Invalid HDFS path: {src}")
  205. return False
  206. self.logger.info(f"Copying from HDFS {src} to local {dst}")
  207. returncode, stdout, stderr = self._execute_command('dfs', ['-copyToLocal', src, dst])
  208. if returncode == 0:
  209. self.logger.info(f"Successfully copied {src} to {dst}")
  210. return True
  211. else:
  212. self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}")
  213. return False
  214. def read_file(self, path: str) -> Optional[str]:
  215. if not validate_hdfs_path(path):
  216. self.logger.error(f"Invalid HDFS path: {path}")
  217. return None
  218. if not self.exists(path):
  219. self.logger.error(f"File does not exist: {path}")
  220. return None
  221. self.logger.info(f"Reading file: {path}")
  222. returncode, stdout, stderr = self._execute_command('dfs', ['-cat', path])
  223. if returncode == 0:
  224. self.logger.info(f"Successfully read file: {path}")
  225. return stdout
  226. else:
  227. self.logger.error(f"Failed to read file: {path}, Error: {stderr}")
  228. return None
  229. def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
  230. import tempfile
  231. if not validate_hdfs_path(path):
  232. self.logger.error(f"Invalid HDFS path: {path}")
  233. return False
  234. self.logger.info(f"Writing to file: {path}")
  235. with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as temp_file:
  236. temp_file.write(content)
  237. temp_path = temp_file.name
  238. try:
  239. args = ['-put']
  240. if overwrite:
  241. args.append('-f')
  242. args.extend([temp_path, path])
  243. returncode, stdout, stderr = self._execute_command('dfs', args)
  244. if returncode == 0:
  245. self.logger.info(f"Successfully wrote to file: {path}")
  246. return True
  247. else:
  248. self.logger.error(f"Failed to write to file: {path}, Error: {stderr}")
  249. return False
  250. finally:
  251. if os.path.exists(temp_path):
  252. os.unlink(temp_path)
  253. def exists(self, path: str) -> bool:
  254. if not validate_hdfs_path(path):
  255. return False
  256. returncode, _, _ = self._execute_command('dfs', ['-test', '-e', path])
  257. return returncode == 0
  258. def list_dir(self, path: str) -> List[str]:
  259. if not validate_hdfs_path(path):
  260. self.logger.error(f"Invalid HDFS path: {path}")
  261. return []
  262. if not self.exists(path):
  263. self.logger.error(f"Directory does not exist: {path}")
  264. return []
  265. returncode, stdout, stderr = self._execute_command('dfs', ['-ls', path])
  266. if returncode == 0:
  267. lines = stdout.strip().split('\n')
  268. if len(lines) > 0 and lines[0].startswith('Found'):
  269. lines = lines[1:]
  270. files = []
  271. for line in lines:
  272. parts = line.split()
  273. if len(parts) >= 8:
  274. files.append(parts[-1])
  275. return files
  276. else:
  277. self.logger.error(f"Failed to list directory: {path}, Error: {stderr}")
  278. return []
  279. def get_file_status(self, path: str) -> Optional[FileStatus]:
  280. if not validate_hdfs_path(path):
  281. self.logger.error(f"Invalid HDFS path: {path}")
  282. return None
  283. if not self.exists(path):
  284. self.logger.error(f"Path does not exist: {path}")
  285. return None
  286. returncode, stdout, stderr = self._execute_command('dfs', ['-stat', '%F,%s,%r,%b,%y,%z,%u,%g,%a', path])
  287. if returncode == 0:
  288. parts = stdout.strip().split(',')
  289. if len(parts) >= 9:
  290. is_dir = parts[0] == 'directory'
  291. try:
  292. return FileStatus(
  293. path=path,
  294. is_directory=is_dir,
  295. length=int(parts[1]) if parts[1] else 0,
  296. replication=int(parts[2]) if parts[2] else 1,
  297. block_size=int(parts[3]) if parts[3] else 134217728,
  298. modification_time=datetime.strptime(parts[4], '%Y-%m-%d %H:%M:%S') if parts[4] else None,
  299. access_time=datetime.strptime(parts[5], '%Y-%m-%d %H:%M:%S') if parts[5] else None,
  300. owner=parts[6],
  301. group=parts[7],
  302. permission=parts[8],
  303. )
  304. except (ValueError, IndexError) as e:
  305. self.logger.warning(f"Failed to parse file status: {e}")
  306. # 备用方法:使用 -ls
  307. returncode, stdout, stderr = self._execute_command('dfs', ['-ls', '-d', path])
  308. if returncode == 0:
  309. parts = stdout.strip().split()
  310. if len(parts) >= 8:
  311. is_dir = parts[0].startswith('d')
  312. try:
  313. return FileStatus(
  314. path=path,
  315. is_directory=is_dir,
  316. length=int(parts[4]) if parts[4] else 0,
  317. replication=int(parts[1]) if parts[1] and not is_dir else 1,
  318. owner=parts[2],
  319. group=parts[3],
  320. )
  321. except (ValueError, IndexError):
  322. pass
  323. return None
  324. def get_file_size(self, path: str) -> Optional[int]:
  325. if not validate_hdfs_path(path):
  326. self.logger.error(f"Invalid HDFS path: {path}")
  327. return None
  328. if not self.exists(path):
  329. self.logger.error(f"File does not exist: {path}")
  330. return None
  331. returncode, stdout, stderr = self._execute_command('dfs', ['-du', '-s', path])
  332. if returncode == 0:
  333. parts = stdout.strip().split()
  334. if len(parts) >= 1:
  335. try:
  336. return int(parts[0])
  337. except ValueError:
  338. self.logger.error(f"Failed to parse file size: {stdout}")
  339. return None
  340. def rename(self, src: str, dst: str) -> bool:
  341. if not validate_hdfs_path(src):
  342. self.logger.error(f"Invalid source path: {src}")
  343. return False
  344. if not validate_hdfs_path(dst):
  345. self.logger.error(f"Invalid destination path: {dst}")
  346. return False
  347. self.logger.info(f"Renaming {src} to {dst}")
  348. returncode, stdout, stderr = self._execute_command('dfs', ['-mv', src, dst])
  349. if returncode == 0:
  350. self.logger.info(f"Successfully renamed {src} to {dst}")
  351. return True
  352. else:
  353. self.logger.error(f"Failed to rename {src} to {dst}, Error: {stderr}")
  354. return False
  355. def set_permission(self, path: str, permission: str) -> bool:
  356. if not validate_hdfs_path(path):
  357. self.logger.error(f"Invalid HDFS path: {path}")
  358. return False
  359. self.logger.info(f"Setting permission of {path} to {permission}")
  360. returncode, stdout, stderr = self._execute_command('dfs', ['-chmod', permission, path])
  361. if returncode == 0:
  362. self.logger.info(f"Successfully set permission of {path} to {permission}")
  363. return True
  364. else:
  365. self.logger.error(f"Failed to set permission of {path}, Error: {stderr}")
  366. return False
  367. def set_owner(self, path: str, owner: Optional[str] = None,
  368. group: Optional[str] = None) -> bool:
  369. if not validate_hdfs_path(path):
  370. self.logger.error(f"Invalid HDFS path: {path}")
  371. return False
  372. if owner is None and group is None:
  373. self.logger.error("At least one of owner or group must be specified")
  374. return False
  375. owner_str = owner if owner else ''
  376. group_str = f":{group}" if group else ''
  377. owner_group = f"{owner_str}{group_str}"
  378. self.logger.info(f"Setting owner of {path} to {owner_group}")
  379. returncode, stdout, stderr = self._execute_command('dfs', ['-chown', owner_group, path])
  380. if returncode == 0:
  381. self.logger.info(f"Successfully set owner of {path} to {owner_group}")
  382. return True
  383. else:
  384. self.logger.error(f"Failed to set owner of {path}, Error: {stderr}")
  385. return False
  386. class WebHDFSBackend(HDFSBackend):
  387. """WebHDFS REST API 后端实现"""
  388. def __init__(self, config: HDFSConfig, logger):
  389. super().__init__(config, logger)
  390. self.base_url = f"http://{config.namenode_host}:{config.namenode_http_port}/webhdfs/v1"
  391. self.user = config.user or os.environ.get('USER', 'hadoop')
  392. self._session = None
  393. def _get_session(self):
  394. """获取 HTTP 会话"""
  395. if self._session is None:
  396. try:
  397. import requests
  398. self._session = requests.Session()
  399. except ImportError:
  400. self.logger.error("requests library is required for WebHDFS backend")
  401. raise
  402. return self._session
  403. def is_available(self) -> bool:
  404. """检查后端是否可用"""
  405. try:
  406. import requests
  407. # 尝试连接 Namenode
  408. response = self._get_session().get(
  409. f"{self.base_url}/?op=LISTSTATUS",
  410. params={'user.name': self.user},
  411. timeout=5
  412. )
  413. return response.status_code in [200, 401, 403]
  414. except Exception:
  415. return False
  416. def make_dir(self, path: str) -> bool:
  417. try:
  418. session = self._get_session()
  419. response = session.put(
  420. f"{self.base_url}{path}",
  421. params={'op': 'MKDIRS', 'user.name': self.user},
  422. timeout=self.config.connect_timeout
  423. )
  424. return response.status_code == 200
  425. except Exception as e:
  426. self.logger.error(f"Failed to create directory: {e}")
  427. return False
  428. def delete(self, path: str, recursive: bool = True) -> bool:
  429. try:
  430. session = self._get_session()
  431. response = session.delete(
  432. f"{self.base_url}{path}",
  433. params={
  434. 'op': 'DELETE',
  435. 'recursive': str(recursive).lower(),
  436. 'user.name': self.user
  437. },
  438. timeout=self.config.connect_timeout
  439. )
  440. return response.status_code == 200
  441. except Exception as e:
  442. self.logger.error(f"Failed to delete: {e}")
  443. return False
  444. def copy_from_local(self, src: str, dst: str) -> bool:
  445. try:
  446. with open(src, 'rb') as f:
  447. content = f.read()
  448. return self.write_file(dst, content.decode('utf-8', errors='ignore'))
  449. except Exception as e:
  450. self.logger.error(f"Failed to copy from local: {e}")
  451. return False
  452. def copy_to_local(self, src: str, dst: str) -> bool:
  453. content = self.read_file(src)
  454. if content is None:
  455. return False
  456. try:
  457. with open(dst, 'w', encoding='utf-8') as f:
  458. f.write(content)
  459. return True
  460. except Exception as e:
  461. self.logger.error(f"Failed to copy to local: {e}")
  462. return False
  463. def read_file(self, path: str) -> Optional[str]:
  464. try:
  465. session = self._get_session()
  466. response = session.get(
  467. f"{self.base_url}{path}",
  468. params={'op': 'OPEN', 'user.name': self.user},
  469. timeout=self.config.read_timeout
  470. )
  471. if response.status_code == 200:
  472. return response.text
  473. else:
  474. self.logger.error(f"Failed to read file: {response.status_code}")
  475. return None
  476. except Exception as e:
  477. self.logger.error(f"Failed to read file: {e}")
  478. return None
  479. def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
  480. try:
  481. session = self._get_session()
  482. # 第一步:获取写入位置
  483. response = session.put(
  484. f"{self.base_url}{path}",
  485. params={
  486. 'op': 'CREATE',
  487. 'overwrite': str(overwrite).lower(),
  488. 'user.name': self.user
  489. },
  490. allow_redirects=False,
  491. timeout=self.config.connect_timeout
  492. )
  493. if response.status_code != 307:
  494. self.logger.error(f"Failed to get write location: {response.status_code}")
  495. return False
  496. # 第二步:写入数据到 DataNode
  497. location = response.headers.get('Location')
  498. if not location:
  499. self.logger.error("No Location header in response")
  500. return False
  501. response = session.put(
  502. location,
  503. data=content.encode('utf-8'),
  504. timeout=self.config.write_timeout
  505. )
  506. return response.status_code == 201
  507. except Exception as