hdfs_operations.py 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385
  1. """
  2. HDFS 文件系统操作模块
  3. 提供现代化的 HDFS 操作能力:
  4. - 多种后端支持(命令行、hdfs 库、pyhdfs 库、WebHDFS)
  5. - 同步和异步 API
  6. - 上下文管理器支持
  7. - 配置管理集成
  8. - 重试机制
  9. - 丰富的错误处理
  10. 功能对应 Java 版本 CommonOperation 类:
  11. - 创建目录
  12. - 删除目录/文件
  13. - 上传文件
  14. - 读写文件
  15. - 检查文件是否存在
  16. - 列出目录内容
  17. - 获取文件信息
  18. """
  19. import os
  20. import asyncio
  21. import time
  22. from abc import ABC, abstractmethod
  23. from dataclasses import dataclass, field
  24. from datetime import datetime
  25. from enum import Enum
  26. from pathlib import Path
  27. from typing import (
  28. Any, Callable, Dict, Generic, List, Optional,
  29. Tuple, Type, TypeVar, Union, Iterator, AsyncIterator
  30. )
  31. from contextlib import contextmanager, asynccontextmanager
  32. from .config import ConfigurationManager, HDFSConfig, get_config
  33. from .utils.helpers import (
  34. run_command, validate_hdfs_path, setup_logger, format_file_size
  35. )
  36. T = TypeVar('T')
  37. class BackendType(Enum):
  38. """HDFS 后端类型"""
  39. CLI = "cli" # 命令行工具
  40. HDFS_LIB = "hdfs_lib" # hdfs 库
  41. PYHDFS = "pyhdfs" # pyhdfs 库
  42. WEBHDFS = "webhdfs" # WebHDFS REST API
  43. AUTO = "auto" # 自动选择可用的后端
  44. @dataclass
  45. class FileStatus:
  46. """文件状态信息"""
  47. path: str
  48. is_directory: bool
  49. length: int = 0
  50. replication: int = 1
  51. block_size: int = 134217728 # 128MB 默认块大小
  52. modification_time: Optional[datetime] = None
  53. access_time: Optional[datetime] = None
  54. owner: str = ""
  55. group: str = ""
  56. permission: str = "644"
  57. is_snapshot: bool = False
  58. @property
  59. def size_formatted(self) -> str:
  60. """格式化的文件大小"""
  61. return format_file_size(self.length)
  62. def to_dict(self) -> Dict[str, Any]:
  63. """转换为字典"""
  64. return {
  65. 'path': self.path,
  66. 'is_directory': self.is_directory,
  67. 'length': self.length,
  68. 'size_formatted': self.size_formatted,
  69. 'replication': self.replication,
  70. 'block_size': self.block_size,
  71. 'modification_time': self.modification_time.isoformat() if self.modification_time else None,
  72. 'access_time': self.access_time.isoformat() if self.access_time else None,
  73. 'owner': self.owner,
  74. 'group': self.group,
  75. 'permission': self.permission,
  76. 'is_snapshot': self.is_snapshot,
  77. }
  78. class HDFSBackend(ABC):
  79. """HDFS 后端抽象基类"""
  80. def __init__(self, config: HDFSConfig, logger):
  81. self.config = config
  82. self.logger = logger
  83. @abstractmethod
  84. def is_available(self) -> bool:
  85. """检查后端是否可用"""
  86. pass
  87. @abstractmethod
  88. def make_dir(self, path: str) -> bool:
  89. """创建目录"""
  90. pass
  91. @abstractmethod
  92. def delete(self, path: str, recursive: bool = True) -> bool:
  93. """删除文件或目录"""
  94. pass
  95. @abstractmethod
  96. def copy_from_local(self, src: str, dst: str) -> bool:
  97. """从本地上传文件到 HDFS"""
  98. pass
  99. @abstractmethod
  100. def copy_to_local(self, src: str, dst: str) -> bool:
  101. """从 HDFS 下载文件到本地"""
  102. pass
  103. @abstractmethod
  104. def read_file(self, path: str) -> Optional[str]:
  105. """读取文件内容"""
  106. pass
  107. @abstractmethod
  108. def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
  109. """写入文件内容"""
  110. pass
  111. @abstractmethod
  112. def exists(self, path: str) -> bool:
  113. """检查路径是否存在"""
  114. pass
  115. @abstractmethod
  116. def list_dir(self, path: str) -> List[str]:
  117. """列出目录内容"""
  118. pass
  119. @abstractmethod
  120. def get_file_status(self, path: str) -> Optional[FileStatus]:
  121. """获取文件状态"""
  122. pass
  123. @abstractmethod
  124. def get_file_size(self, path: str) -> Optional[int]:
  125. """获取文件大小"""
  126. pass
  127. @abstractmethod
  128. def rename(self, src: str, dst: str) -> bool:
  129. """重命名文件或目录"""
  130. pass
  131. @abstractmethod
  132. def set_permission(self, path: str, permission: str) -> bool:
  133. """设置文件权限"""
  134. pass
  135. @abstractmethod
  136. def set_owner(self, path: str, owner: Optional[str] = None,
  137. group: Optional[str] = None) -> bool:
  138. """设置文件所有者"""
  139. pass
  140. class CLIBackend(HDFSBackend):
  141. """命令行后端实现"""
  142. def __init__(self, config: HDFSConfig, logger):
  143. super().__init__(config, logger)
  144. self.hadoop_cmd = config.hadoop_cmd or 'hdfs'
  145. self._check_cmd_available()
  146. def _check_cmd_available(self):
  147. """检查命令是否可用"""
  148. if os.system(f'which {self.hadoop_cmd} > /dev/null 2>&1') != 0:
  149. self.hadoop_cmd = 'hadoop'
  150. if os.system(f'which {self.hadoop_cmd} > /dev/null 2>&1') != 0:
  151. self.logger.warning("Neither 'hdfs' nor 'hadoop' command found")
  152. def is_available(self) -> bool:
  153. """检查后端是否可用"""
  154. return os.system(f'which {self.hadoop_cmd} > /dev/null 2>&1') == 0
  155. def _execute_command(self, subcommand: str, args: List[str] = None) -> Tuple[int, str, str]:
  156. """执行 HDFS 命令"""
  157. args = args or []
  158. cmd = f"{self.hadoop_cmd} {subcommand} {' '.join(args)}"
  159. self.logger.debug(f"Executing command: {cmd}")
  160. return run_command(cmd)
  161. def make_dir(self, path: str) -> bool:
  162. if not validate_hdfs_path(path):
  163. self.logger.error(f"Invalid HDFS path: {path}")
  164. return False
  165. self.logger.info(f"Creating directory: {path}")
  166. returncode, stdout, stderr = self._execute_command('dfs', ['-mkdir', '-p', path])
  167. if returncode == 0:
  168. self.logger.info(f"Successfully created directory: {path}")
  169. return True
  170. else:
  171. self.logger.error(f"Failed to create directory: {path}, Error: {stderr}")
  172. return False
  173. def delete(self, path: str, recursive: bool = True) -> bool:
  174. if not validate_hdfs_path(path):
  175. self.logger.error(f"Invalid HDFS path: {path}")
  176. return False
  177. self.logger.info(f"Deleting: {path}")
  178. args = ['-rm', '-r'] if recursive else ['-rm']
  179. args.append(path)
  180. returncode, stdout, stderr = self._execute_command('dfs', args)
  181. if returncode == 0:
  182. self.logger.info(f"Successfully deleted: {path}")
  183. return True
  184. else:
  185. self.logger.error(f"Failed to delete: {path}, Error: {stderr}")
  186. return False
  187. def copy_from_local(self, src: str, dst: str) -> bool:
  188. if not os.path.exists(src):
  189. self.logger.error(f"Local file not found: {src}")
  190. return False
  191. if not validate_hdfs_path(dst):
  192. self.logger.error(f"Invalid HDFS path: {dst}")
  193. return False
  194. self.logger.info(f"Copying from local {src} to HDFS {dst}")
  195. returncode, stdout, stderr = self._execute_command('dfs', ['-copyFromLocal', src, dst])
  196. if returncode == 0:
  197. self.logger.info(f"Successfully copied {src} to {dst}")
  198. return True
  199. else:
  200. self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}")
  201. return False
  202. def copy_to_local(self, src: str, dst: str) -> bool:
  203. if not validate_hdfs_path(src):
  204. self.logger.error(f"Invalid HDFS path: {src}")
  205. return False
  206. self.logger.info(f"Copying from HDFS {src} to local {dst}")
  207. returncode, stdout, stderr = self._execute_command('dfs', ['-copyToLocal', src, dst])
  208. if returncode == 0:
  209. self.logger.info(f"Successfully copied {src} to {dst}")
  210. return True
  211. else:
  212. self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}")
  213. return False
  214. def read_file(self, path: str) -> Optional[str]:
  215. if not validate_hdfs_path(path):
  216. self.logger.error(f"Invalid HDFS path: {path}")
  217. return None
  218. if not self.exists(path):
  219. self.logger.error(f"File does not exist: {path}")
  220. return None
  221. self.logger.info(f"Reading file: {path}")
  222. returncode, stdout, stderr = self._execute_command('dfs', ['-cat', path])
  223. if returncode == 0:
  224. self.logger.info(f"Successfully read file: {path}")
  225. return stdout
  226. else:
  227. self.logger.error(f"Failed to read file: {path}, Error: {stderr}")
  228. return None
  229. def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
  230. import tempfile
  231. if not validate_hdfs_path(path):
  232. self.logger.error(f"Invalid HDFS path: {path}")
  233. return False
  234. self.logger.info(f"Writing to file: {path}")
  235. with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as temp_file:
  236. temp_file.write(content)
  237. temp_path = temp_file.name
  238. try:
  239. args = ['-put']
  240. if overwrite:
  241. args.append('-f')
  242. args.extend([temp_path, path])
  243. returncode, stdout, stderr = self._execute_command('dfs', args)
  244. if returncode == 0:
  245. self.logger.info(f"Successfully wrote to file: {path}")
  246. return True
  247. else:
  248. self.logger.error(f"Failed to write to file: {path}, Error: {stderr}")
  249. return False
  250. finally:
  251. if os.path.exists(temp_path):
  252. os.unlink(temp_path)
  253. def exists(self, path: str) -> bool:
  254. if not validate_hdfs_path(path):
  255. return False
  256. returncode, _, _ = self._execute_command('dfs', ['-test', '-e', path])
  257. return returncode == 0
  258. def list_dir(self, path: str) -> List[str]:
  259. if not validate_hdfs_path(path):
  260. self.logger.error(f"Invalid HDFS path: {path}")
  261. return []
  262. if not self.exists(path):
  263. self.logger.error(f"Directory does not exist: {path}")
  264. return []
  265. returncode, stdout, stderr = self._execute_command('dfs', ['-ls', path])
  266. if returncode == 0:
  267. lines = stdout.strip().split('\n')
  268. if len(lines) > 0 and lines[0].startswith('Found'):
  269. lines = lines[1:]
  270. files = []
  271. for line in lines:
  272. parts = line.split()
  273. if len(parts) >= 8:
  274. files.append(parts[-1])
  275. return files
  276. else:
  277. self.logger.error(f"Failed to list directory: {path}, Error: {stderr}")
  278. return []
  279. def get_file_status(self, path: str) -> Optional[FileStatus]:
  280. if not validate_hdfs_path(path):
  281. self.logger.error(f"Invalid HDFS path: {path}")
  282. return None
  283. if not self.exists(path):
  284. self.logger.error(f"Path does not exist: {path}")
  285. return None
  286. returncode, stdout, stderr = self._execute_command('dfs', ['-stat', '%F,%s,%r,%b,%y,%z,%u,%g,%a', path])
  287. if returncode == 0:
  288. parts = stdout.strip().split(',')
  289. if len(parts) >= 9:
  290. is_dir = parts[0] == 'directory'
  291. try:
  292. return FileStatus(
  293. path=path,
  294. is_directory=is_dir,
  295. length=int(parts[1]) if parts[1] else 0,
  296. replication=int(parts[2]) if parts[2] else 1,
  297. block_size=int(parts[3]) if parts[3] else 134217728,
  298. modification_time=datetime.strptime(parts[4], '%Y-%m-%d %H:%M:%S') if parts[4] else None,
  299. access_time=datetime.strptime(parts[5], '%Y-%m-%d %H:%M:%S') if parts[5] else None,
  300. owner=parts[6],
  301. group=parts[7],
  302. permission=parts[8],
  303. )
  304. except (ValueError, IndexError) as e:
  305. self.logger.warning(f"Failed to parse file status: {e}")
  306. # 备用方法:使用 -ls
  307. returncode, stdout, stderr = self._execute_command('dfs', ['-ls', '-d', path])
  308. if returncode == 0:
  309. parts = stdout.strip().split()
  310. if len(parts) >= 8:
  311. is_dir = parts[0].startswith('d')
  312. try:
  313. return FileStatus(
  314. path=path,
  315. is_directory=is_dir,
  316. length=int(parts[4]) if parts[4] else 0,
  317. replication=int(parts[1]) if parts[1] and not is_dir else 1,
  318. owner=parts[2],
  319. group=parts[3],
  320. )
  321. except (ValueError, IndexError):
  322. pass
  323. return None
  324. def get_file_size(self, path: str) -> Optional[int]:
  325. if not validate_hdfs_path(path):
  326. self.logger.error(f"Invalid HDFS path: {path}")
  327. return None
  328. if not self.exists(path):
  329. self.logger.error(f"File does not exist: {path}")
  330. return None
  331. returncode, stdout, stderr = self._execute_command('dfs', ['-du', '-s', path])
  332. if returncode == 0:
  333. parts = stdout.strip().split()
  334. if len(parts) >= 1:
  335. try:
  336. return int(parts[0])
  337. except ValueError:
  338. self.logger.error(f"Failed to parse file size: {stdout}")
  339. return None
  340. def rename(self, src: str, dst: str) -> bool:
  341. if not validate_hdfs_path(src):
  342. self.logger.error(f"Invalid source path: {src}")
  343. return False
  344. if not validate_hdfs_path(dst):
  345. self.logger.error(f"Invalid destination path: {dst}")
  346. return False
  347. self.logger.info(f"Renaming {src} to {dst}")
  348. returncode, stdout, stderr = self._execute_command('dfs', ['-mv', src, dst])
  349. if returncode == 0:
  350. self.logger.info(f"Successfully renamed {src} to {dst}")
  351. return True
  352. else:
  353. self.logger.error(f"Failed to rename {src} to {dst}, Error: {stderr}")
  354. return False
  355. def set_permission(self, path: str, permission: str) -> bool:
  356. if not validate_hdfs_path(path):
  357. self.logger.error(f"Invalid HDFS path: {path}")
  358. return False
  359. self.logger.info(f"Setting permission of {path} to {permission}")
  360. returncode, stdout, stderr = self._execute_command('dfs', ['-chmod', permission, path])
  361. if returncode == 0:
  362. self.logger.info(f"Successfully set permission of {path} to {permission}")
  363. return True
  364. else:
  365. self.logger.error(f"Failed to set permission of {path}, Error: {stderr}")
  366. return False
  367. def set_owner(self, path: str, owner: Optional[str] = None,
  368. group: Optional[str] = None) -> bool:
  369. if not validate_hdfs_path(path):
  370. self.logger.error(f"Invalid HDFS path: {path}")
  371. return False
  372. if owner is None and group is None:
  373. self.logger.error("At least one of owner or group must be specified")
  374. return False
  375. owner_str = owner if owner else ''
  376. group_str = f":{group}" if group else ''
  377. owner_group = f"{owner_str}{group_str}"
  378. self.logger.info(f"Setting owner of {path} to {owner_group}")
  379. returncode, stdout, stderr = self._execute_command('dfs', ['-chown', owner_group, path])
  380. if returncode == 0:
  381. self.logger.info(f"Successfully set owner of {path} to {owner_group}")
  382. return True
  383. else:
  384. self.logger.error(f"Failed to set owner of {path}, Error: {stderr}")
  385. return False
  386. class WebHDFSBackend(HDFSBackend):
  387. """WebHDFS REST API 后端实现"""
  388. def __init__(self, config: HDFSConfig, logger):
  389. super().__init__(config, logger)
  390. self.base_url = f"http://{config.namenode_host}:{config.namenode_http_port}/webhdfs/v1"
  391. self.user = config.user or os.environ.get('USER', 'hadoop')
  392. self._session = None
  393. def _get_session(self):
  394. """获取 HTTP 会话"""
  395. if self._session is None:
  396. try:
  397. import requests
  398. self._session = requests.Session()
  399. except ImportError:
  400. self.logger.error("requests library is required for WebHDFS backend")
  401. raise
  402. return self._session
  403. def is_available(self) -> bool:
  404. """检查后端是否可用"""
  405. try:
  406. import requests
  407. # 尝试连接 Namenode
  408. response = self._get_session().get(
  409. f"{self.base_url}/?op=LISTSTATUS",
  410. params={'user.name': self.user},
  411. timeout=5
  412. )
  413. return response.status_code in [200, 401, 403]
  414. except Exception:
  415. return False
  416. def make_dir(self, path: str) -> bool:
  417. try:
  418. session = self._get_session()
  419. response = session.put(
  420. f"{self.base_url}{path}",
  421. params={'op': 'MKDIRS', 'user.name': self.user},
  422. timeout=self.config.connect_timeout
  423. )
  424. return response.status_code == 200
  425. except Exception as e:
  426. self.logger.error(f"Failed to create directory: {e}")
  427. return False
  428. def delete(self, path: str, recursive: bool = True) -> bool:
  429. try:
  430. session = self._get_session()
  431. response = session.delete(
  432. f"{self.base_url}{path}",
  433. params={
  434. 'op': 'DELETE',
  435. 'recursive': str(recursive).lower(),
  436. 'user.name': self.user
  437. },
  438. timeout=self.config.connect_timeout
  439. )
  440. return response.status_code == 200
  441. except Exception as e:
  442. self.logger.error(f"Failed to delete: {e}")
  443. return False
  444. def copy_from_local(self, src: str, dst: str) -> bool:
  445. try:
  446. with open(src, 'rb') as f:
  447. content = f.read()
  448. return self.write_file(dst, content.decode('utf-8', errors='ignore'))
  449. except Exception as e:
  450. self.logger.error(f"Failed to copy from local: {e}")
  451. return False
  452. def copy_to_local(self, src: str, dst: str) -> bool:
  453. content = self.read_file(src)
  454. if content is None:
  455. return False
  456. try:
  457. with open(dst, 'w', encoding='utf-8') as f:
  458. f.write(content)
  459. return True
  460. except Exception as e:
  461. self.logger.error(f"Failed to copy to local: {e}")
  462. return False
  463. def read_file(self, path: str) -> Optional[str]:
  464. try:
  465. session = self._get_session()
  466. response = session.get(
  467. f"{self.base_url}{path}",
  468. params={'op': 'OPEN', 'user.name': self.user},
  469. timeout=self.config.read_timeout
  470. )
  471. if response.status_code == 200:
  472. return response.text
  473. else:
  474. self.logger.error(f"Failed to read file: {response.status_code}")
  475. return None
  476. except Exception as e:
  477. self.logger.error(f"Failed to read file: {e}")
  478. return None
  479. def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
  480. try:
  481. session = self._get_session()
  482. # 第一步:获取写入位置
  483. response = session.put(
  484. f"{self.base_url}{path}",
  485. params={
  486. 'op': 'CREATE',
  487. 'overwrite': str(overwrite).lower(),
  488. 'user.name': self.user
  489. },
  490. allow_redirects=False,
  491. timeout=self.config.connect_timeout
  492. )
  493. if response.status_code != 307:
  494. self.logger.error(f"Failed to get write location: {response.status_code}")
  495. return False
  496. # 第二步:写入数据到 DataNode
  497. location = response.headers.get('Location')
  498. if not location:
  499. self.logger.error("No Location header in response")
  500. return False
  501. response = session.put(
  502. location,
  503. data=content.encode('utf-8'),
  504. timeout=self.config.write_timeout
  505. )
  506. return response.status_code == 201
  507. except Exception as e:
  508. self.logger.error(f"Failed to write file: {e}")
  509. return False
  510. def exists(self, path: str) -> bool:
  511. status = self.get_file_status(path)
  512. return status is not None
  513. def list_dir(self, path: str) -> List[str]:
  514. try:
  515. session = self._get_session()
  516. response = session.get(
  517. f"{self.base_url}{path}",
  518. params={'op': 'LISTSTATUS', 'user.name': self.user},
  519. timeout=self.config.connect_timeout
  520. )
  521. if response.status_code == 200:
  522. data = response.json()
  523. files = []
  524. for status in data.get('FileStatuses', {}).get('FileStatus', []):
  525. files.append(f"{path.rstrip('/')}/{status['pathSuffix']}")
  526. return files
  527. else:
  528. self.logger.error(f"Failed to list directory: {response.status_code}")
  529. return []
  530. except Exception as e:
  531. self.logger.error(f"Failed to list directory: {e}")
  532. return []
  533. def get_file_status(self, path: str) -> Optional[FileStatus]:
  534. try:
  535. session = self._get_session()
  536. response = session.get(
  537. f"{self.base_url}{path}",
  538. params={'op': 'GETFILESTATUS', 'user.name': self.user},
  539. timeout=self.config.connect_timeout
  540. )
  541. if response.status_code == 200:
  542. data = response.json()
  543. status = data.get('FileStatus', {})
  544. return FileStatus(
  545. path=path,
  546. is_directory=status.get('type') == 'DIRECTORY',
  547. length=status.get('length', 0),
  548. replication=status.get('replication', 1),
  549. block_size=status.get('blockSize', 134217728),
  550. modification_time=datetime.fromtimestamp(status.get('modificationTime', 0) / 1000) if status.get('modificationTime') else None,
  551. access_time=datetime.fromtimestamp(status.get('accessTime', 0) / 1000) if status.get('accessTime') else None,
  552. owner=status.get('owner', ''),
  553. group=status.get('group', ''),
  554. permission=status.get('permission', '644'),
  555. )
  556. else:
  557. return None
  558. except Exception as e:
  559. self.logger.error(f"Failed to get file status: {e}")
  560. return None
  561. def get_file_size(self, path: str) -> Optional[int]:
  562. status = self.get_file_status(path)
  563. return status.length if status else None
  564. def rename(self, src: str, dst: str) -> bool:
  565. try:
  566. session = self._get_session()
  567. response = session.put(
  568. f"{self.base_url}{src}",
  569. params={
  570. 'op': 'RENAME',
  571. 'destination': dst,
  572. 'user.name': self.user
  573. },
  574. timeout=self.config.connect_timeout
  575. )
  576. return response.status_code == 200
  577. except Exception as e:
  578. self.logger.error(f"Failed to rename: {e}")
  579. return False
  580. def set_permission(self, path: str, permission: str) -> bool:
  581. try:
  582. session = self._get_session()
  583. response = session.put(
  584. f"{self.base_url}{path}",
  585. params={
  586. 'op': 'SETPERMISSION',
  587. 'permission': permission,
  588. 'user.name': self.user
  589. },
  590. timeout=self.config.connect_timeout
  591. )
  592. return response.status_code == 200
  593. except Exception as e:
  594. self.logger.error(f"Failed to set permission: {e}")
  595. return False
  596. def set_owner(self, path: str, owner: Optional[str] = None,
  597. group: Optional[str] = None) -> bool:
  598. try:
  599. session = self._get_session()
  600. params = {'op': 'SETOWNER', 'user.name': self.user}
  601. if owner:
  602. params['owner'] = owner
  603. if group:
  604. params['group'] = group
  605. response = session.put(
  606. f"{self.base_url}{path}",
  607. params=params,
  608. timeout=self.config.connect_timeout
  609. )
  610. return response.status_code == 200
  611. except Exception as e:
  612. self.logger.error(f"Failed to set owner: {e}")
  613. return False
  614. class BackendFactory:
  615. """后端工厂类"""
  616. _backends: Dict[BackendType, Type[HDFSBackend]] = {
  617. BackendType.CLI: CLIBackend,
  618. BackendType.WEBHDFS: WebHDFSBackend,
  619. }
  620. @classmethod
  621. def create(cls, backend_type: BackendType, config: HDFSConfig, logger) -> Optional[HDFSBackend]:
  622. """创建后端实例"""
  623. if backend_type not in cls._backends:
  624. logger.error(f"Unsupported backend type: {backend_type}")
  625. return None
  626. try:
  627. backend = cls._backends[backend_type](config, logger)
  628. if backend.is_available():
  629. return backend
  630. else:
  631. logger.warning(f"Backend {backend_type.value} is not available")
  632. return None
  633. except Exception as e:
  634. logger.error(f"Failed to create backend {backend_type.value}: {e}")
  635. return None
  636. @classmethod
  637. def create_auto(cls, config: HDFSConfig, logger) -> Optional[HDFSBackend]:
  638. """自动选择可用的后端"""
  639. priority_order = [
  640. BackendType.CLI,
  641. BackendType.WEBHDFS,
  642. ]
  643. for backend_type in priority_order:
  644. backend = cls.create(backend_type, config, logger)
  645. if backend:
  646. logger.info(f"Selected backend: {backend_type.value}")
  647. return backend
  648. logger.error("No available backend found")
  649. return None
  650. class HDFSOperations:
  651. """
  652. 现代化 HDFS 文件系统操作类
  653. 特性:
  654. - 多种后端支持(CLI、WebHDFS)
  655. - 同步和异步 API
  656. - 上下文管理器支持
  657. - 配置管理集成
  658. - 重试机制
  659. - 丰富的错误处理
  660. 功能与 Java 版本的 CommonOperation 类相对应。
  661. """
  662. def __init__(self,
  663. config: Optional[HDFSConfig] = None,
  664. config_manager: Optional[ConfigurationManager] = None,
  665. preferred_backend: Optional[BackendType] = None,
  666. logger_name: str = 'hdfs_operations'):
  667. """
  668. 初始化 HDFSOperations 实例
  669. Args:
  670. config: HDFS 配置(可选)
  671. config_manager: 配置管理器(可选)
  672. preferred_backend: 首选后端类型(可选)
  673. logger_name: 日志器名称
  674. """
  675. self.logger = setup_logger(logger_name)
  676. # 获取配置
  677. if config_manager is None:
  678. config_manager = get_config()
  679. if config is None:
  680. config = config_manager.hdfs
  681. self.config = config
  682. self._backend: Optional[HDFSBackend] = None
  683. # 选择后端
  684. backend_type = preferred_backend or BackendType.AUTO
  685. if backend_type == BackendType.AUTO:
  686. self._backend = BackendFactory.create_auto(config, self.logger)
  687. else:
  688. self._backend = BackendFactory.create(backend_type, config, self.logger)
  689. if self._backend is None:
  690. self.logger.warning("No HDFS backend available. Some operations may fail.")
  691. @property
  692. def backend(self) -> HDFSBackend:
  693. """获取后端实例"""
  694. if self._backend is None:
  695. raise RuntimeError("No HDFS backend available")
  696. return self._backend
  697. @property
  698. def is_available(self) -> bool:
  699. """检查后端是否可用"""
  700. return self._backend is not None and self._backend.is_available()
  701. @contextmanager
  702. def transaction(self):
  703. """
  704. 上下文管理器:事务支持
  705. 注意:HDFS 不支持真正的事务,这里只是提供一个上下文接口。
  706. """
  707. try:
  708. self.logger.info("Starting HDFS operation context")
  709. yield self
  710. except Exception as e:
  711. self.logger.error(f"HDFS operation failed: {e}")
  712. raise
  713. finally:
  714. self.logger.info("HDFS operation context completed")
  715. def _retry_operation(self, operation: Callable[[], T],
  716. max_retries: Optional[int] = None,
  717. retry_delay: Optional[float] = None) -> T:
  718. """
  719. 带重试机制的操作执行
  720. Args:
  721. operation: 要执行的操作
  722. max_retries: 最大重试次数(可选,默认使用配置)
  723. retry_delay: 重试延迟(可选,默认使用配置)
  724. Returns:
  725. 操作结果
  726. """
  727. max_retries = max_retries or self.config.max_retries
  728. retry_delay = retry_delay or self.config.retry_delay
  729. last_exception = None
  730. for attempt in range(max_retries + 1):
  731. try:
  732. return operation()
  733. except Exception as e:
  734. last_exception = e
  735. if attempt < max_retries:
  736. self.logger.warning(
  737. f"Operation failed (attempt {attempt + 1}/{max_retries + 1}), "
  738. f"retrying in {retry_delay}s: {e}"
  739. )
  740. time.sleep(retry_delay)
  741. retry_delay *= 2 # 指数退避
  742. raise last_exception
  743. # 同步 API
  744. def make_dir(self, path: str, retry: bool = True) -> bool:
  745. """
  746. 创建目录
  747. Args:
  748. path: 要创建的目录路径
  749. retry: 是否启用重试
  750. Returns:
  751. 是否创建成功
  752. """
  753. if not validate_hdfs_path(path):
  754. self.logger.error(f"Invalid HDFS path: {path}")
  755. return False
  756. def _operation():
  757. return self.backend.make_dir(path)
  758. if retry:
  759. return self._retry_operation(_operation)
  760. return _operation()
  def delete(self, path: str, recursive: bool = True, retry: bool = True) -> bool:
      """Delete a file or directory on HDFS.

      Args:
          path: HDFS path to remove.
          recursive: Forwarded to ``self.backend.delete``.
          retry: If true, the backend call goes through ``_retry_operation``.

      Returns:
          ``False`` when ``path`` is rejected by ``validate_hdfs_path``;
          otherwise the result of ``self.backend.delete``.
      """
      if not validate_hdfs_path(path):
          self.logger.error(f"Invalid HDFS path: {path}")
          return False

      def remove():
          return self.backend.delete(path, recursive)

      return self._retry_operation(remove) if retry else remove()
  def copy_from_local(self, src: str, dst: str, retry: bool = True) -> bool:
      """Upload a path from the local file system to HDFS.

      Args:
          src: Local source path; must exist.
          dst: HDFS destination path.
          retry: If true, the backend call goes through ``_retry_operation``.

      Returns:
          ``False`` when ``src`` does not exist locally or ``dst`` is rejected
          by ``validate_hdfs_path``; otherwise the result of
          ``self.backend.copy_from_local``.
      """
      # The local check runs first, so a missing source is reported even
      # when the destination is also invalid.
      source_present = os.path.exists(src)
      if not source_present:
          self.logger.error(f"Local file not found: {src}")
          return False

      if not validate_hdfs_path(dst):
          self.logger.error(f"Invalid HDFS path: {dst}")
          return False

      def upload():
          return self.backend.copy_from_local(src, dst)

      if not retry:
          return upload()
      return self._retry_operation(upload)
  def copy_to_local(self, src: str, dst: str, retry: bool = True) -> bool:
      """Download a path from HDFS to the local file system.

      Args:
          src: HDFS source path.
          dst: Local destination path (passed to the backend unchecked).
          retry: If true, the backend call goes through ``_retry_operation``.

      Returns:
          ``False`` when ``src`` is rejected by ``validate_hdfs_path``;
          otherwise the result of ``self.backend.copy_to_local``.
      """
      if not validate_hdfs_path(src):
          self.logger.error(f"Invalid HDFS path: {src}")
          return False

      def download():
          return self.backend.copy_to_local(src, dst)

      return self._retry_operation(download) if retry else download()
  def read_file(self, path: str, retry: bool = True) -> Optional[str]:
      """Read the content of an HDFS file.

      Args:
          path: HDFS file path.
          retry: If true, the backend read goes through ``_retry_operation``.
              The preceding existence check is not retried.

      Returns:
          ``None`` when ``path`` is rejected by ``validate_hdfs_path`` or
          ``self.exists`` reports it missing; otherwise the result of
          ``self.backend.read_file``.
      """
      if not validate_hdfs_path(path):
          self.logger.error(f"Invalid HDFS path: {path}")
          return None

      file_present = self.exists(path)
      if not file_present:
          self.logger.error(f"File does not exist: {path}")
          return None

      def fetch_content():
          return self.backend.read_file(path)

      if not retry:
          return fetch_content()
      return self._retry_operation(fetch_content)
  def write_file(self, path: str, content: str,
                 overwrite: bool = True, retry: bool = True) -> bool:
      """Write text content to an HDFS file.

      Args:
          path: HDFS file path.
          content: Text to write.
          overwrite: Forwarded to ``self.backend.write_file``.
          retry: If true, the backend call goes through ``_retry_operation``.

      Returns:
          ``False`` when ``path`` is rejected by ``validate_hdfs_path``;
          otherwise the result of ``self.backend.write_file``.
      """
      if not validate_hdfs_path(path):
          self.logger.error(f"Invalid HDFS path: {path}")
          return False

      def store_content():
          return self.backend.write_file(path, content, overwrite)

      return self._retry_operation(store_content) if retry else store_content()
  def exists(self, path: str) -> bool:
      """Check whether an HDFS path exists.

      Args:
          path: HDFS path to test.

      Returns:
          ``False`` (without logging) when ``path`` is rejected by
          ``validate_hdfs_path``; otherwise the result of
          ``self.backend.exists``.
      """
      if validate_hdfs_path(path):
          return self.backend.exists(path)
      return False
  def list_dir(self, path: str, retry: bool = True) -> List[str]:
      """List the entries of an HDFS directory.

      Args:
          path: HDFS directory path.
          retry: If true, the backend listing goes through
              ``_retry_operation``. The preceding existence check is not
              retried.

      Returns:
          An empty list when ``path`` is rejected by ``validate_hdfs_path``
          or ``self.exists`` reports it missing; otherwise the result of
          ``self.backend.list_dir``.
      """
      if not validate_hdfs_path(path):
          self.logger.error(f"Invalid HDFS path: {path}")
          return []

      directory_present = self.exists(path)
      if not directory_present:
          self.logger.error(f"Directory does not exist: {path}")
          return []

      def enumerate_entries():
          return self.backend.list_dir(path)

      if not retry:
          return enumerate_entries()
      return self._retry_operation(enumerate_entries)
  def get_file_status(self, path: str, retry: bool = True) -> Optional[FileStatus]:
      """Fetch the status record of an HDFS path.

      Args:
          path: HDFS path to inspect.
          retry: If true, the backend call goes through ``_retry_operation``.

      Returns:
          ``None`` when ``path`` is rejected by ``validate_hdfs_path``;
          otherwise the result of ``self.backend.get_file_status``.
      """
      if not validate_hdfs_path(path):
          self.logger.error(f"Invalid HDFS path: {path}")
          return None

      def query_status():
          return self.backend.get_file_status(path)

      return self._retry_operation(query_status) if retry else query_status()
  def get_file_size(self, path: str, retry: bool = True) -> Optional[int]:
      """Return the size of an HDFS file.

      Args:
          path: HDFS file path.
          retry: Forwarded to ``get_file_status``.

      Returns:
          The ``length`` attribute of the status returned by
          ``get_file_status``, or ``None`` when that status is falsy.
      """
      file_status = self.get_file_status(path, retry)
      if file_status:
          return file_status.length
      return None
  def rename(self, src: str, dst: str, retry: bool = True) -> bool:
      """Rename (move) an HDFS file or directory.

      Args:
          src: Existing HDFS path.
          dst: New HDFS path.
          retry: If true, the backend call goes through ``_retry_operation``.

      Returns:
          ``False`` when either path is rejected by ``validate_hdfs_path``
          (the source is checked first); otherwise the result of
          ``self.backend.rename``.
      """
      if not validate_hdfs_path(src):
          self.logger.error(f"Invalid source path: {src}")
          return False
      if not validate_hdfs_path(dst):
          self.logger.error(f"Invalid destination path: {dst}")
          return False

      def move():
          return self.backend.rename(src, dst)

      if not retry:
          return move()
      return self._retry_operation(move)
  def set_permission(self, path: str, permission: str, retry: bool = True) -> bool:
      """Set the permission of an HDFS path.

      Args:
          path: HDFS path to modify.
          permission: Permission string such as ``'755'``; passed to the
              backend without validation here.
          retry: If true, the backend call goes through ``_retry_operation``.

      Returns:
          ``False`` when ``path`` is rejected by ``validate_hdfs_path``;
          otherwise the result of ``self.backend.set_permission``.
      """
      if not validate_hdfs_path(path):
          self.logger.error(f"Invalid HDFS path: {path}")
          return False

      def apply_permission():
          return self.backend.set_permission(path, permission)

      return self._retry_operation(apply_permission) if retry else apply_permission()
  def set_owner(self, path: str, owner: Optional[str] = None,
                group: Optional[str] = None, retry: bool = True) -> bool:
      """Set the owner and/or group of an HDFS path.

      Args:
          path: HDFS path to modify.
          owner: New owner, or ``None`` to leave it unspecified.
          group: New group, or ``None`` to leave it unspecified.
          retry: If true, the backend call goes through ``_retry_operation``.

      Returns:
          ``False`` when ``path`` is rejected by ``validate_hdfs_path`` or
          when both ``owner`` and ``group`` are ``None``; otherwise the
          result of ``self.backend.set_owner``.
      """
      if not validate_hdfs_path(path):
          self.logger.error(f"Invalid HDFS path: {path}")
          return False

      nothing_to_change = owner is None and group is None
      if nothing_to_change:
          self.logger.error("At least one of owner or group must be specified")
          return False

      def apply_ownership():
          return self.backend.set_owner(path, owner, group)

      if not retry:
          return apply_ownership()
      return self._retry_operation(apply_ownership)
  979. # 异步 API(基于同步 API 的简单封装)
  async def make_dir_async(self, path: str, retry: bool = True) -> bool:
      """Create a directory without blocking the event loop.

      Runs the synchronous ``make_dir`` in the running loop's default
      executor.

      Args:
          path: Forwarded to ``make_dir``.
          retry: Forwarded to ``make_dir``.

      Returns:
          The value returned by ``make_dir``.
      """
      # get_running_loop() is the preferred accessor inside a coroutine;
      # positional arguments are handed to the executor directly.
      loop = asyncio.get_running_loop()
      return await loop.run_in_executor(None, self.make_dir, path, retry)
  async def delete_async(self, path: str, recursive: bool = True, retry: bool = True) -> bool:
      """Delete a file or directory without blocking the event loop.

      Runs the synchronous ``delete`` in the running loop's default executor.

      Args:
          path: Forwarded to ``delete``.
          recursive: Forwarded to ``delete``.
          retry: Forwarded to ``delete``.

      Returns:
          The value returned by ``delete``.
      """
      # get_running_loop() is the preferred accessor inside a coroutine;
      # positional arguments are handed to the executor directly.
      loop = asyncio.get_running_loop()
      return await loop.run_in_executor(None, self.delete, path, recursive, retry)
  async def copy_from_local_async(self, src: str, dst: str, retry: bool = True) -> bool:
      """Upload a local path to HDFS without blocking the event loop.

      Runs the synchronous ``copy_from_local`` in the running loop's default
      executor.

      Args:
          src: Forwarded to ``copy_from_local``.
          dst: Forwarded to ``copy_from_local``.
          retry: Forwarded to ``copy_from_local``.

      Returns:
          The value returned by ``copy_from_local``.
      """
      # get_running_loop() is the preferred accessor inside a coroutine;
      # positional arguments are handed to the executor directly.
      loop = asyncio.get_running_loop()
      return await loop.run_in_executor(None, self.copy_from_local, src, dst, retry)
  async def copy_to_local_async(self, src: str, dst: str, retry: bool = True) -> bool:
      """Download an HDFS path without blocking the event loop.

      Runs the synchronous ``copy_to_local`` in the running loop's default
      executor.

      Args:
          src: Forwarded to ``copy_to_local``.
          dst: Forwarded to ``copy_to_local``.
          retry: Forwarded to ``copy_to_local``.

      Returns:
          The value returned by ``copy_to_local``.
      """
      # get_running_loop() is the preferred accessor inside a coroutine;
      # positional arguments are handed to the executor directly.
      loop = asyncio.get_running_loop()
      return await loop.run_in_executor(None, self.copy_to_local, src, dst, retry)
  async def read_file_async(self, path: str, retry: bool = True) -> Optional[str]:
      """Read an HDFS file without blocking the event loop.

      Runs the synchronous ``read_file`` in the running loop's default
      executor.

      Args:
          path: Forwarded to ``read_file``.
          retry: Forwarded to ``read_file``.

      Returns:
          The value returned by ``read_file``.
      """
      # get_running_loop() is the preferred accessor inside a coroutine;
      # positional arguments are handed to the executor directly.
      loop = asyncio.get_running_loop()
      return await loop.run_in_executor(None, self.read_file, path, retry)
  async def write_file_async(self, path: str, content: str,
                             overwrite: bool = True, retry: bool = True) -> bool:
      """Write an HDFS file without blocking the event loop.

      Runs the synchronous ``write_file`` in the running loop's default
      executor.

      Args:
          path: Forwarded to ``write_file``.
          content: Forwarded to ``write_file``.
          overwrite: Forwarded to ``write_file``.
          retry: Forwarded to ``write_file``.

      Returns:
          The value returned by ``write_file``.
      """
      # get_running_loop() is the preferred accessor inside a coroutine;
      # positional arguments are handed to the executor directly.
      loop = asyncio.get_running_loop()
      return await loop.run_in_executor(
          None, self.write_file, path, content, overwrite, retry
      )
  async def exists_async(self, path: str) -> bool:
      """Check whether an HDFS path exists without blocking the event loop.

      Runs the synchronous ``exists`` in the running loop's default executor.

      Args:
          path: Forwarded to ``exists``.

      Returns:
          The value returned by ``exists``.
      """
      # get_running_loop() is the preferred accessor inside a coroutine;
      # the argument is handed to the executor directly.
      loop = asyncio.get_running_loop()
      return await loop.run_in_executor(None, self.exists, path)
  async def list_dir_async(self, path: str, retry: bool = True) -> List[str]:
      """List an HDFS directory without blocking the event loop.

      Runs the synchronous ``list_dir`` in the running loop's default
      executor.

      Args:
          path: Forwarded to ``list_dir``.
          retry: Forwarded to ``list_dir``.

      Returns:
          The value returned by ``list_dir``.
      """
      # get_running_loop() is the preferred accessor inside a coroutine;
      # positional arguments are handed to the executor directly.
      loop = asyncio.get_running_loop()
      return await loop.run_in_executor(None, self.list_dir, path, retry)
  async def get_file_status_async(self, path: str, retry: bool = True) -> Optional[FileStatus]:
      """Fetch an HDFS path's status without blocking the event loop.

      Runs the synchronous ``get_file_status`` in the running loop's default
      executor.

      Args:
          path: Forwarded to ``get_file_status``.
          retry: Forwarded to ``get_file_status``.

      Returns:
          The value returned by ``get_file_status``.
      """
      # get_running_loop() is the preferred accessor inside a coroutine;
      # positional arguments are handed to the executor directly.
      loop = asyncio.get_running_loop()
      return await loop.run_in_executor(None, self.get_file_status, path, retry)
  1017. # 便捷方法
  def upload_directory(self, local_dir: str, hdfs_dir: str,
                       recursive: bool = True, retry: bool = True) -> bool:
      """Upload the contents of a local directory to HDFS.

      Regular files are uploaded with ``copy_from_local``; sub-directories
      are uploaded by recursing when ``recursive`` is true and skipped
      otherwise. Entries that are neither a file nor a directory are skipped.

      Args:
          local_dir: Local directory to upload.
          hdfs_dir: HDFS target directory; created via ``make_dir``.
          recursive: Whether to descend into sub-directories.
          retry: Forwarded to ``make_dir`` / ``copy_from_local``.

      Returns:
          ``True`` when every attempted upload succeeded. ``False`` when
          ``local_dir`` is not a directory, when any step reports failure,
          or when any step raises (the error is logged, not propagated).
      """
      if not os.path.isdir(local_dir):
          self.logger.error(f"Local directory not found: {local_dir}")
          return False

      try:
          # Creating the target directory happens inside the try block so a
          # backend error at the top level is reported as False, exactly as
          # the same error in a nested directory already is.
          if not self.make_dir(hdfs_dir, retry):
              return False

          hdfs_base = hdfs_dir.rstrip('/')
          for item in os.listdir(local_dir):
              local_path = os.path.join(local_dir, item)
              hdfs_path = f"{hdfs_base}/{item}"

              if os.path.isfile(local_path):
                  if not self.copy_from_local(local_path, hdfs_path, retry):
                      self.logger.error(f"Failed to upload file: {local_path}")
                      return False
              elif os.path.isdir(local_path) and recursive:
                  if not self.upload_directory(local_path, hdfs_path, recursive, retry):
                      return False

          return True
      except Exception as e:
          self.logger.error(f"Failed to upload directory: {e}")
          return False
  def download_directory(self, hdfs_dir: str, local_dir: str,
                         recursive: bool = True, retry: bool = True) -> bool:
      """Download the contents of an HDFS directory to the local file system.

      Files are fetched with ``copy_to_local``; sub-directories are fetched
      by recursing when ``recursive`` is true and skipped otherwise. Entries
      whose status cannot be obtained are skipped with a warning.

      Args:
          hdfs_dir: HDFS source directory.
          local_dir: Local target directory; created if missing.
          recursive: Whether to descend into sub-directories.
          retry: Forwarded to ``list_dir`` / ``get_file_status`` /
              ``copy_to_local``.

      Returns:
          ``True`` when every attempted download succeeded. ``False`` when
          ``hdfs_dir`` does not exist, when any step reports failure, or
          when any step raises (the error is logged, not propagated).
      """
      try:
          # The precondition checks live inside the try block so that a
          # backend or OS error at the top level yields False, exactly as
          # the same error in a nested directory already does.
          if not self.exists(hdfs_dir):
              self.logger.error(f"HDFS directory not found: {hdfs_dir}")
              return False

          # Make sure the local target directory exists.
          os.makedirs(local_dir, exist_ok=True)

          for hdfs_path in self.list_dir(hdfs_dir, retry):
              # Strip a trailing slash first; basename() would otherwise
              # return an empty name and the entry would map onto local_dir.
              item_name = os.path.basename(hdfs_path.rstrip('/'))
              local_path = os.path.join(local_dir, item_name)

              status = self.get_file_status(hdfs_path, retry)
              if status is None:
                  # Best effort: keep going, but leave a trace of the skip.
                  self.logger.warning(
                      f"Skipping entry with unknown status: {hdfs_path}"
                  )
                  continue

              if status.is_directory:
                  if recursive:
                      if not self.download_directory(hdfs_path, local_path, recursive, retry):
                          return False
              else:
                  if not self.copy_to_local(hdfs_path, local_path, retry):
                      self.logger.error(f"Failed to download file: {hdfs_path}")
                      return False

          return True
      except Exception as e:
          self.logger.error(f"Failed to download directory: {e}")
          return False
  def walk(self, path: str) -> Iterator[Tuple[str, List[str], List[str]]]:
      """Traverse an HDFS directory tree top-down, in the manner of ``os.walk``.

      Yields nothing when ``path`` does not exist or is not a directory.
      Entries whose status is falsy are left out of both name lists. The
      ``dirnames`` list is read again after the yield, so a caller that
      edits it in place changes which sub-directories are visited.

      Args:
          path: Directory at which the traversal starts.

      Yields:
          ``(dirpath, dirnames, filenames)`` tuples, where the names are the
          base names of the entries returned by ``list_dir``.
      """
      root_status = self.get_file_status(path) if self.exists(path) else None
      if not (root_status and root_status.is_directory):
          return

      # Split the directory listing into sub-directory and file names.
      dirnames = []
      filenames = []
      for entry in self.list_dir(path):
          entry_status = self.get_file_status(entry)
          if not entry_status:
              continue
          bucket = dirnames if entry_status.is_directory else filenames
          bucket.append(os.path.basename(entry))

      yield path, dirnames, filenames

      # Descend into each sub-directory in listing order.
      parent = path.rstrip('/')
      for child in dirnames:
          yield from self.walk(f"{parent}/{child}")
  1118. # 便捷函数
  def create_hdfs_client(
      config: Optional[HDFSConfig] = None,
      preferred_backend: Optional[BackendType] = None
  ) -> HDFSOperations:
      """Build an ``HDFSOperations`` client.

      Args:
          config: Optional HDFS configuration, forwarded as ``config``.
          preferred_backend: Optional backend type, forwarded as
              ``preferred_backend``.

      Returns:
          The newly constructed ``HDFSOperations`` instance.
      """
      client = HDFSOperations(config=config, preferred_backend=preferred_backend)
      return client