| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385 |
- """
- HDFS 文件系统操作模块
- 提供现代化的 HDFS 操作能力:
- - 多种后端支持(命令行、WebHDFS;hdfs 库与 pyhdfs 库仅在 BackendType 中预留,尚未实现)
- - 同步和异步 API
- - 上下文管理器支持
- - 配置管理集成
- - 重试机制
- - 丰富的错误处理
- 功能对应 Java 版本 CommonOperation 类:
- - 创建目录
- - 删除目录/文件
- - 上传文件
- - 读写文件
- - 检查文件是否存在
- - 列出目录内容
- - 获取文件信息
- """
import asyncio
import os
import shlex
import shutil
import time
from abc import ABC, abstractmethod
from contextlib import asynccontextmanager, contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import (
    Any, AsyncIterator, Callable, Dict, Generic, Iterator, List, Optional,
    Tuple, Type, TypeVar, Union,
)
from urllib.parse import quote

from .config import ConfigurationManager, HDFSConfig, get_config
from .utils.helpers import (
    format_file_size, run_command, setup_logger, validate_hdfs_path,
)
# Generic result type of the zero-argument callables given to _retry_operation.
T = TypeVar("T")
class BackendType(Enum):
    """Identifiers for the HDFS backend implementations that can be requested.

    NOTE: BackendFactory in this module maps only CLI and WEBHDFS to concrete
    classes; AUTO asks the factory to pick the first usable one.
    """

    CLI = "cli"            # the hdfs/hadoop command-line tool
    HDFS_LIB = "hdfs_lib"  # the `hdfs` Python library
    PYHDFS = "pyhdfs"      # the `pyhdfs` Python library
    WEBHDFS = "webhdfs"    # the WebHDFS REST API
    AUTO = "auto"          # choose automatically
@dataclass
class FileStatus:
    """Metadata of a single HDFS file or directory as reported by a backend.

    Timestamps are optional because not every backend/command supplies them.
    """

    path: str
    is_directory: bool
    length: int = 0                       # size in bytes
    replication: int = 1
    block_size: int = 128 * 1024 * 1024   # 128 MiB default block size
    modification_time: Optional[datetime] = None
    access_time: Optional[datetime] = None
    owner: str = ""
    group: str = ""
    permission: str = "644"
    is_snapshot: bool = False

    @property
    def size_formatted(self) -> str:
        """Human-readable rendering of ``length`` via ``format_file_size``."""
        return format_file_size(self.length)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain dict of all fields plus ``size_formatted``.

        Datetimes are rendered with ``isoformat()``; missing ones become None.
        """
        def iso_or_none(moment: Optional[datetime]) -> Optional[str]:
            return moment.isoformat() if moment else None

        return dict(
            path=self.path,
            is_directory=self.is_directory,
            length=self.length,
            size_formatted=self.size_formatted,
            replication=self.replication,
            block_size=self.block_size,
            modification_time=iso_or_none(self.modification_time),
            access_time=iso_or_none(self.access_time),
            owner=self.owner,
            group=self.group,
            permission=self.permission,
            is_snapshot=self.is_snapshot,
        )
class HDFSBackend(ABC):
    """Abstract interface implemented by every HDFS backend.

    The HDFS configuration and a logger are stored on the instance as
    ``self.config`` and ``self.logger`` for use by subclasses.
    """

    def __init__(self, config: HDFSConfig, logger):
        self.logger = logger
        self.config = config

    @abstractmethod
    def is_available(self) -> bool:
        """Report whether this backend can currently be used."""
        ...

    @abstractmethod
    def make_dir(self, path: str) -> bool:
        """Create the directory ``path``; return True on success."""
        ...

    @abstractmethod
    def delete(self, path: str, recursive: bool = True) -> bool:
        """Delete a file or directory; return True on success."""
        ...

    @abstractmethod
    def copy_from_local(self, src: str, dst: str) -> bool:
        """Upload local file ``src`` to HDFS path ``dst``."""
        ...

    @abstractmethod
    def copy_to_local(self, src: str, dst: str) -> bool:
        """Download HDFS path ``src`` to local path ``dst``."""
        ...

    @abstractmethod
    def read_file(self, path: str) -> Optional[str]:
        """Return the file's contents as text, or None on failure."""
        ...

    @abstractmethod
    def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
        """Write ``content`` to ``path``; return True on success."""
        ...

    @abstractmethod
    def exists(self, path: str) -> bool:
        """Report whether ``path`` exists."""
        ...

    @abstractmethod
    def list_dir(self, path: str) -> List[str]:
        """Return the entries of directory ``path``."""
        ...

    @abstractmethod
    def get_file_status(self, path: str) -> Optional[FileStatus]:
        """Return metadata for ``path``, or None on failure."""
        ...

    @abstractmethod
    def get_file_size(self, path: str) -> Optional[int]:
        """Return the size of ``path`` in bytes, or None on failure."""
        ...

    @abstractmethod
    def rename(self, src: str, dst: str) -> bool:
        """Move/rename ``src`` to ``dst``; return True on success."""
        ...

    @abstractmethod
    def set_permission(self, path: str, permission: str) -> bool:
        """Change the permission bits of ``path``."""
        ...

    @abstractmethod
    def set_owner(self, path: str, owner: Optional[str] = None,
                  group: Optional[str] = None) -> bool:
        """Change the owner and/or group of ``path``."""
        ...
class CLIBackend(HDFSBackend):
    """HDFS backend that shells out to the ``hdfs`` (or ``hadoop``) command-line tool.

    Every operation becomes ``<hadoop_cmd> dfs <args...>`` executed through
    ``run_command``; a zero return code means success. Failures are logged and
    reported through the return value (False / None / []) instead of raised.
    """

    # Block size reported when the CLI output does not include one (128 MiB).
    _DEFAULT_BLOCK_SIZE = 134217728

    # `hdfs dfs -stat` format: type (%F), length in bytes (%b), replication (%r),
    # block size (%o), modification / access time as epoch milliseconds (%Y / %X),
    # owner (%u), group (%g), octal permission (%a).
    _STAT_FORMAT = '%F,%b,%r,%o,%Y,%X,%u,%g,%a'

    def __init__(self, config: HDFSConfig, logger):
        super().__init__(config, logger)
        self.hadoop_cmd = config.hadoop_cmd or 'hdfs'
        self._check_cmd_available()

    def _check_cmd_available(self):
        """Fall back to ``hadoop`` when the configured command is not on PATH; warn if neither is."""
        # shutil.which avoids spawning a shell and cannot be injected via hadoop_cmd.
        if shutil.which(self.hadoop_cmd) is None:
            self.hadoop_cmd = 'hadoop'
            if shutil.which(self.hadoop_cmd) is None:
                self.logger.warning("Neither 'hdfs' nor 'hadoop' command found")

    def is_available(self) -> bool:
        """Return True if the selected command can be found on PATH."""
        return shutil.which(self.hadoop_cmd) is not None

    def _execute_command(self, subcommand: str,
                         args: Optional[List[str]] = None) -> Tuple[int, str, str]:
        """Run ``<hadoop_cmd> <subcommand> <args...>``; return (returncode, stdout, stderr).

        Each argument is shell-quoted so paths containing spaces or shell
        metacharacters reach HDFS intact instead of being split or interpreted.
        ``shlex.quote`` leaves ordinary paths and format strings unchanged.
        NOTE(review): assumes ``run_command`` parses the string with shell/shlex
        rules — confirm against utils.helpers.
        """
        quoted_args = ' '.join(shlex.quote(str(arg)) for arg in (args or []))
        cmd = f"{self.hadoop_cmd} {subcommand} {quoted_args}"
        self.logger.debug(f"Executing command: {cmd}")
        return run_command(cmd)

    def _check_path(self, path: str, label: str = 'HDFS') -> bool:
        """Validate an HDFS path, logging ``Invalid <label> path: ...`` if it is rejected."""
        if validate_hdfs_path(path):
            return True
        self.logger.error(f"Invalid {label} path: {path}")
        return False

    def _run_dfs(self, args: List[str], success_msg: str, failure_msg: str) -> bool:
        """Run a ``dfs`` subcommand, log the outcome and return whether it succeeded."""
        returncode, _, stderr = self._execute_command('dfs', args)
        if returncode == 0:
            self.logger.info(success_msg)
            return True
        self.logger.error(f"{failure_msg}, Error: {stderr}")
        return False

    @staticmethod
    def _from_epoch_millis(value: str) -> Optional[datetime]:
        """Convert an epoch-milliseconds string to a naive local datetime.

        Returns None for 0, empty or non-numeric values (e.g. when the Hadoop
        version does not understand a format specifier and echoes it back).
        """
        value = value.strip()
        if not value.isdigit() or int(value) == 0:
            return None
        return datetime.fromtimestamp(int(value) / 1000)

    def _parse_stat_output(self, path: str, stdout: str) -> Optional[FileStatus]:
        """Build a FileStatus from ``-stat _STAT_FORMAT`` output; None if it cannot be parsed."""
        parts = stdout.strip().split(',')
        if len(parts) < 9:
            return None
        try:
            return FileStatus(
                path=path,
                is_directory=parts[0] == 'directory',
                length=int(parts[1]) if parts[1] else 0,
                replication=int(parts[2]) if parts[2] else 1,
                block_size=int(parts[3]) if parts[3] else self._DEFAULT_BLOCK_SIZE,
                modification_time=self._from_epoch_millis(parts[4]),
                access_time=self._from_epoch_millis(parts[5]),
                owner=parts[6],
                group=parts[7],
                permission=parts[8],
            )
        except (ValueError, IndexError) as e:
            self.logger.warning(f"Failed to parse file status: {e}")
            return None

    @staticmethod
    def _parse_ls_output(path: str, stdout: str) -> Optional[FileStatus]:
        """Build a partial FileStatus from a single ``-ls -d`` line; None if it cannot be parsed."""
        parts = stdout.strip().split()
        if len(parts) < 8:
            return None
        is_dir = parts[0].startswith('d')
        try:
            return FileStatus(
                path=path,
                is_directory=is_dir,
                length=int(parts[4]) if parts[4] else 0,
                # The replication column is not numeric for directories.
                replication=int(parts[1]) if parts[1] and not is_dir else 1,
                owner=parts[2],
                group=parts[3],
            )
        except (ValueError, IndexError):
            return None

    def make_dir(self, path: str) -> bool:
        """Create ``path`` (with ``-p``, so parents are created too)."""
        if not self._check_path(path):
            return False
        self.logger.info(f"Creating directory: {path}")
        return self._run_dfs(['-mkdir', '-p', path],
                             f"Successfully created directory: {path}",
                             f"Failed to create directory: {path}")

    def delete(self, path: str, recursive: bool = True) -> bool:
        """Remove ``path`` with ``-rm`` (adding ``-r`` when ``recursive``)."""
        if not self._check_path(path):
            return False
        self.logger.info(f"Deleting: {path}")
        args = ['-rm', '-r', path] if recursive else ['-rm', path]
        return self._run_dfs(args,
                             f"Successfully deleted: {path}",
                             f"Failed to delete: {path}")

    def copy_from_local(self, src: str, dst: str) -> bool:
        """Upload local ``src`` to HDFS ``dst`` with ``-copyFromLocal``."""
        if not os.path.exists(src):
            self.logger.error(f"Local file not found: {src}")
            return False
        if not self._check_path(dst):
            return False
        self.logger.info(f"Copying from local {src} to HDFS {dst}")
        return self._run_dfs(['-copyFromLocal', src, dst],
                             f"Successfully copied {src} to {dst}",
                             f"Failed to copy {src} to {dst}")

    def copy_to_local(self, src: str, dst: str) -> bool:
        """Download HDFS ``src`` to local ``dst`` with ``-copyToLocal``."""
        if not self._check_path(src):
            return False
        self.logger.info(f"Copying from HDFS {src} to local {dst}")
        return self._run_dfs(['-copyToLocal', src, dst],
                             f"Successfully copied {src} to {dst}",
                             f"Failed to copy {src} to {dst}")

    def read_file(self, path: str) -> Optional[str]:
        """Return the output of ``-cat path``, or None if the path is invalid/missing or the command fails."""
        if not self._check_path(path):
            return None

        if not self.exists(path):
            self.logger.error(f"File does not exist: {path}")
            return None

        self.logger.info(f"Reading file: {path}")
        returncode, stdout, stderr = self._execute_command('dfs', ['-cat', path])
        if returncode == 0:
            self.logger.info(f"Successfully read file: {path}")
            return stdout
        self.logger.error(f"Failed to read file: {path}, Error: {stderr}")
        return None

    def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
        """Write ``content`` to ``path`` by staging it in a local temp file and running ``-put``.

        ``overwrite`` adds ``-f`` so an existing file is replaced.
        """
        import tempfile

        if not self._check_path(path):
            return False

        self.logger.info(f"Writing to file: {path}")

        # Create the temp file before entering try/finally so it is removed even
        # if writing the content fails. Encode explicitly as UTF-8 rather than
        # relying on the platform's locale default.
        fd, temp_path = tempfile.mkstemp(suffix='.txt')
        try:
            with os.fdopen(fd, 'w', encoding='utf-8') as temp_file:
                temp_file.write(content)

            args = ['-put', '-f'] if overwrite else ['-put']
            args.extend([temp_path, path])
            return self._run_dfs(args,
                                 f"Successfully wrote to file: {path}",
                                 f"Failed to write to file: {path}")
        finally:
            if os.path.exists(temp_path):
                os.unlink(temp_path)

    def exists(self, path: str) -> bool:
        """Return True if ``-test -e path`` succeeds; invalid paths are reported as missing."""
        if not validate_hdfs_path(path):
            return False
        returncode, _, _ = self._execute_command('dfs', ['-test', '-e', path])
        return returncode == 0

    def list_dir(self, path: str) -> List[str]:
        """Return the paths listed by ``-ls path`` ([] on any failure)."""
        if not self._check_path(path):
            return []

        if not self.exists(path):
            self.logger.error(f"Directory does not exist: {path}")
            return []

        returncode, stdout, stderr = self._execute_command('dfs', ['-ls', path])
        if returncode != 0:
            self.logger.error(f"Failed to list directory: {path}, Error: {stderr}")
            return []

        files = []
        for line in stdout.splitlines():
            # Skip blank lines and the "Found N items" header.
            if not line.strip() or line.startswith('Found'):
                continue
            # -ls rows have 7 metadata columns followed by the path; splitting at
            # most 7 times keeps paths containing spaces intact.
            parts = line.split(None, 7)
            if len(parts) == 8:
                files.append(parts[7])
        return files

    def get_file_status(self, path: str) -> Optional[FileStatus]:
        """Return metadata for ``path`` via ``-stat``, falling back to ``-ls -d`` if that fails."""
        if not self._check_path(path):
            return None

        if not self.exists(path):
            self.logger.error(f"Path does not exist: {path}")
            return None

        returncode, stdout, _ = self._execute_command('dfs', ['-stat', self._STAT_FORMAT, path])
        if returncode == 0:
            status = self._parse_stat_output(path, stdout)
            if status is not None:
                return status

        # Fallback: a partial status (no times/permission) from `-ls -d`.
        returncode, stdout, _ = self._execute_command('dfs', ['-ls', '-d', path])
        if returncode == 0:
            return self._parse_ls_output(path, stdout)
        return None

    def get_file_size(self, path: str) -> Optional[int]:
        """Return the first column of ``-du -s path`` (total bytes), or None on failure."""
        if not self._check_path(path):
            return None

        if not self.exists(path):
            self.logger.error(f"File does not exist: {path}")
            return None

        returncode, stdout, _ = self._execute_command('dfs', ['-du', '-s', path])
        if returncode == 0:
            parts = stdout.strip().split()
            if parts:
                try:
                    return int(parts[0])
                except ValueError:
                    self.logger.error(f"Failed to parse file size: {stdout}")
        return None

    def rename(self, src: str, dst: str) -> bool:
        """Move ``src`` to ``dst`` with ``-mv``."""
        if not self._check_path(src, 'source'):
            return False
        if not self._check_path(dst, 'destination'):
            return False
        self.logger.info(f"Renaming {src} to {dst}")
        return self._run_dfs(['-mv', src, dst],
                             f"Successfully renamed {src} to {dst}",
                             f"Failed to rename {src} to {dst}")

    def set_permission(self, path: str, permission: str) -> bool:
        """Apply ``-chmod permission path``."""
        if not self._check_path(path):
            return False
        self.logger.info(f"Setting permission of {path} to {permission}")
        return self._run_dfs(['-chmod', permission, path],
                             f"Successfully set permission of {path} to {permission}",
                             f"Failed to set permission of {path}")

    def set_owner(self, path: str, owner: Optional[str] = None,
                  group: Optional[str] = None) -> bool:
        """Apply ``-chown [owner][:group] path``; at least one of owner/group is required."""
        if not self._check_path(path):
            return False

        if owner is None and group is None:
            self.logger.error("At least one of owner or group must be specified")
            return False

        owner_group = f"{owner or ''}{f':{group}' if group else ''}"

        self.logger.info(f"Setting owner of {path} to {owner_group}")
        return self._run_dfs(['-chown', owner_group, path],
                             f"Successfully set owner of {path} to {owner_group}",
                             f"Failed to set owner of {path}")
class WebHDFSBackend(HDFSBackend):
    """HDFS backend that uses the WebHDFS REST API over HTTP (requires ``requests``).

    Requests go to ``http://<namenode_host>:<namenode_http_port>/webhdfs/v1<path>``
    with a ``user.name`` query parameter. Failures are logged and reported via
    the return value instead of raised.
    """

    def __init__(self, config: HDFSConfig, logger):
        super().__init__(config, logger)
        self.base_url = f"http://{config.namenode_host}:{config.namenode_http_port}/webhdfs/v1"
        self.user = config.user or os.environ.get('USER', 'hadoop')
        self._session = None

    def _get_session(self):
        """Lazily create and cache a ``requests.Session``; re-raises ImportError if requests is missing."""
        if self._session is None:
            try:
                import requests
                self._session = requests.Session()
            except ImportError:
                self.logger.error("requests library is required for WebHDFS backend")
                raise
        return self._session

    def _url(self, path: str) -> str:
        """Return the REST URL for ``path``, percent-encoding everything except '/'.

        Without encoding, characters such as '#', '?' or '%' in an HDFS path would
        be parsed as URL syntax and address the wrong resource.
        """
        return f"{self.base_url}{quote(path, safe='/')}"

    @staticmethod
    def _boolean_result(response) -> bool:
        """Interpret the reply of MKDIRS / DELETE / RENAME.

        These operations can answer HTTP 200 with ``{"boolean": false}`` when
        nothing was done (e.g. deleting a missing path), so the status code alone
        is not enough. A 200 reply without a ``boolean`` field counts as success.
        """
        if response.status_code != 200:
            return False
        try:
            payload = response.json()
        except ValueError:
            return True
        if isinstance(payload, dict) and 'boolean' in payload:
            return bool(payload['boolean'])
        return True

    @staticmethod
    def _millis_to_datetime(value) -> Optional[datetime]:
        """Convert an epoch-milliseconds value to a naive local datetime (None when 0/missing)."""
        return datetime.fromtimestamp(value / 1000) if value else None

    def is_available(self) -> bool:
        """Return True if a LISTSTATUS of '/' gets 200, 401 or 403 (i.e. the server answers)."""
        try:
            response = self._get_session().get(
                f"{self.base_url}/",
                params={'op': 'LISTSTATUS', 'user.name': self.user},
                timeout=5,
            )
            return response.status_code in (200, 401, 403)
        except Exception:
            return False

    def make_dir(self, path: str) -> bool:
        """Create ``path`` with MKDIRS; True only if the server reports success."""
        try:
            response = self._get_session().put(
                self._url(path),
                params={'op': 'MKDIRS', 'user.name': self.user},
                timeout=self.config.connect_timeout,
            )
            return self._boolean_result(response)
        except Exception as e:
            self.logger.error(f"Failed to create directory: {e}")
            return False

    def delete(self, path: str, recursive: bool = True) -> bool:
        """Delete ``path``; False if the server reports nothing was deleted."""
        try:
            response = self._get_session().delete(
                self._url(path),
                params={
                    'op': 'DELETE',
                    'recursive': str(recursive).lower(),
                    'user.name': self.user,
                },
                timeout=self.config.connect_timeout,
            )
            return self._boolean_result(response)
        except Exception as e:
            self.logger.error(f"Failed to delete: {e}")
            return False

    def _open(self, path: str) -> Optional[bytes]:
        """Fetch the raw bytes of ``path`` via OPEN; None on failure."""
        try:
            response = self._get_session().get(
                self._url(path),
                params={'op': 'OPEN', 'user.name': self.user},
                timeout=self.config.read_timeout,
            )
            if response.status_code == 200:
                return response.content
            self.logger.error(f"Failed to read file: {response.status_code}")
            return None
        except Exception as e:
            self.logger.error(f"Failed to read file: {e}")
            return None

    def _create(self, path: str, data: bytes, overwrite: bool) -> bool:
        """Upload ``data`` to ``path`` using the two-step CREATE protocol."""
        try:
            session = self._get_session()

            # Step 1: the NameNode answers with a 307 redirect to the write location.
            response = session.put(
                self._url(path),
                params={
                    'op': 'CREATE',
                    'overwrite': str(overwrite).lower(),
                    'user.name': self.user,
                },
                allow_redirects=False,
                timeout=self.config.connect_timeout,
            )
            if response.status_code != 307:
                self.logger.error(f"Failed to get write location: {response.status_code}")
                return False

            location = response.headers.get('Location')
            if not location:
                self.logger.error("No Location header in response")
                return False

            # Step 2: send the bytes to the redirected location.
            response = session.put(location, data=data, timeout=self.config.write_timeout)
            return response.status_code == 201
        except Exception as e:
            self.logger.error(f"Failed to write file: {e}")
            return False

    def copy_from_local(self, src: str, dst: str) -> bool:
        """Upload local ``src`` to ``dst`` byte-for-byte (binary-safe), overwriting ``dst``."""
        try:
            with open(src, 'rb') as f:
                data = f.read()
        except Exception as e:
            self.logger.error(f"Failed to copy from local: {e}")
            return False
        return self._create(dst, data, overwrite=True)

    def copy_to_local(self, src: str, dst: str) -> bool:
        """Download ``src`` into local ``dst`` byte-for-byte (binary-safe)."""
        data = self._open(src)
        if data is None:
            return False

        try:
            with open(dst, 'wb') as f:
                f.write(data)
            return True
        except Exception as e:
            self.logger.error(f"Failed to copy to local: {e}")
            return False

    def read_file(self, path: str) -> Optional[str]:
        """Return the file's contents decoded as UTF-8 (invalid bytes replaced), or None on failure."""
        data = self._open(path)
        if data is None:
            return None
        # Decode explicitly instead of using response.text: charset guessing on
        # an octet-stream body is unreliable, and write_file encodes UTF-8.
        return data.decode('utf-8', errors='replace')

    def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
        """Write ``content`` encoded as UTF-8 to ``path``."""
        try:
            data = content.encode('utf-8')
        except Exception as e:
            self.logger.error(f"Failed to write file: {e}")
            return False
        return self._create(path, data, overwrite)

    def exists(self, path: str) -> bool:
        """Return True if GETFILESTATUS succeeds for ``path``."""
        return self.get_file_status(path) is not None

    def list_dir(self, path: str) -> List[str]:
        """Return full paths of the entries reported by LISTSTATUS ([] on failure)."""
        try:
            response = self._get_session().get(
                self._url(path),
                params={'op': 'LISTSTATUS', 'user.name': self.user},
                timeout=self.config.connect_timeout,
            )
            if response.status_code != 200:
                self.logger.error(f"Failed to list directory: {response.status_code}")
                return []

            base = path.rstrip('/')
            files = []
            for status in response.json().get('FileStatuses', {}).get('FileStatus', []):
                suffix = status.get('pathSuffix', '')
                # An empty suffix means `path` itself is a file: report it as-is
                # rather than as "path/".
                files.append(f"{base}/{suffix}" if suffix else path)
            return files
        except Exception as e:
            self.logger.error(f"Failed to list directory: {e}")
            return []

    def get_file_status(self, path: str) -> Optional[FileStatus]:
        """Return metadata from GETFILESTATUS, or None if the request does not return 200."""
        try:
            response = self._get_session().get(
                self._url(path),
                params={'op': 'GETFILESTATUS', 'user.name': self.user},
                timeout=self.config.connect_timeout,
            )
            if response.status_code != 200:
                return None

            status = response.json().get('FileStatus', {})
            return FileStatus(
                path=path,
                is_directory=status.get('type') == 'DIRECTORY',
                length=status.get('length', 0),
                replication=status.get('replication', 1),
                block_size=status.get('blockSize', 134217728),
                modification_time=self._millis_to_datetime(status.get('modificationTime')),
                access_time=self._millis_to_datetime(status.get('accessTime')),
                owner=status.get('owner', ''),
                group=status.get('group', ''),
                permission=status.get('permission', '644'),
            )
        except Exception as e:
            self.logger.error(f"Failed to get file status: {e}")
            return None

    def get_file_size(self, path: str) -> Optional[int]:
        """Return ``length`` from GETFILESTATUS, or None on failure."""
        status = self.get_file_status(path)
        return status.length if status else None

    def rename(self, src: str, dst: str) -> bool:
        """Rename ``src`` to ``dst``; False if the server reports the rename did not happen."""
        try:
            response = self._get_session().put(
                self._url(src),
                params={
                    'op': 'RENAME',
                    'destination': dst,
                    'user.name': self.user,
                },
                timeout=self.config.connect_timeout,
            )
            return self._boolean_result(response)
        except Exception as e:
            self.logger.error(f"Failed to rename: {e}")
            return False

    def set_permission(self, path: str, permission: str) -> bool:
        """Apply SETPERMISSION; True on HTTP 200."""
        try:
            response = self._get_session().put(
                self._url(path),
                params={
                    'op': 'SETPERMISSION',
                    'permission': permission,
                    'user.name': self.user,
                },
                timeout=self.config.connect_timeout,
            )
            return response.status_code == 200
        except Exception as e:
            self.logger.error(f"Failed to set permission: {e}")
            return False

    def set_owner(self, path: str, owner: Optional[str] = None,
                  group: Optional[str] = None) -> bool:
        """Apply SETOWNER with whichever of owner/group is given; True on HTTP 200."""
        try:
            params = {'op': 'SETOWNER', 'user.name': self.user}
            if owner:
                params['owner'] = owner
            if group:
                params['group'] = group

            response = self._get_session().put(
                self._url(path),
                params=params,
                timeout=self.config.connect_timeout,
            )
            return response.status_code == 200
        except Exception as e:
            self.logger.error(f"Failed to set owner: {e}")
            return False
class BackendFactory:
    """Creates HDFSBackend instances from BackendType values."""

    # Backend types that have a concrete implementation.
    _backends: Dict[BackendType, Type[HDFSBackend]] = {
        BackendType.CLI: CLIBackend,
        BackendType.WEBHDFS: WebHDFSBackend,
    }

    @classmethod
    def create(cls, backend_type: BackendType, config: HDFSConfig, logger) -> Optional[HDFSBackend]:
        """Instantiate ``backend_type`` and return it only if it reports itself available.

        Unknown types, construction errors and unavailable backends are logged
        and yield None.
        """
        backend_cls = cls._backends.get(backend_type)
        if backend_cls is None:
            logger.error(f"Unsupported backend type: {backend_type}")
            return None

        try:
            instance = backend_cls(config, logger)
            usable = instance.is_available()
        except Exception as e:
            logger.error(f"Failed to create backend {backend_type.value}: {e}")
            return None

        if not usable:
            logger.warning(f"Backend {backend_type.value} is not available")
            return None
        return instance

    @classmethod
    def create_auto(cls, config: HDFSConfig, logger) -> Optional[HDFSBackend]:
        """Return the first available backend, trying CLI before WebHDFS; None if none works."""
        for candidate in (BackendType.CLI, BackendType.WEBHDFS):
            backend = cls.create(candidate, config, logger)
            if backend is not None:
                logger.info(f"Selected backend: {candidate.value}")
                return backend

        logger.error("No available backend found")
        return None
- class HDFSOperations:
- """
- 现代化 HDFS 文件系统操作类
-
- 特性:
- - 多种后端支持(CLI、WebHDFS)
- - 同步和异步 API
- - 上下文管理器支持
- - 配置管理集成
- - 重试机制
- - 丰富的错误处理
-
- 功能与 Java 版本的 CommonOperation 类相对应。
- """
-
- def __init__(self,
- config: Optional[HDFSConfig] = None,
- config_manager: Optional[ConfigurationManager] = None,
- preferred_backend: Optional[BackendType] = None,
- logger_name: str = 'hdfs_operations'):
- """
- 初始化 HDFSOperations 实例
-
- Args:
- config: HDFS 配置(可选)
- config_manager: 配置管理器(可选)
- preferred_backend: 首选后端类型(可选)
- logger_name: 日志器名称
- """
- self.logger = setup_logger(logger_name)
-
- # 获取配置
- if config_manager is None:
- config_manager = get_config()
-
- if config is None:
- config = config_manager.hdfs
-
- self.config = config
- self._backend: Optional[HDFSBackend] = None
-
- # 选择后端
- backend_type = preferred_backend or BackendType.AUTO
- if backend_type == BackendType.AUTO:
- self._backend = BackendFactory.create_auto(config, self.logger)
- else:
- self._backend = BackendFactory.create(backend_type, config, self.logger)
-
- if self._backend is None:
- self.logger.warning("No HDFS backend available. Some operations may fail.")
-
- @property
- def backend(self) -> HDFSBackend:
- """获取后端实例"""
- if self._backend is None:
- raise RuntimeError("No HDFS backend available")
- return self._backend
-
- @property
- def is_available(self) -> bool:
- """检查后端是否可用"""
- return self._backend is not None and self._backend.is_available()
-
- @contextmanager
- def transaction(self):
- """
- 上下文管理器:事务支持
-
- 注意:HDFS 不支持真正的事务,这里只是提供一个上下文接口。
- """
- try:
- self.logger.info("Starting HDFS operation context")
- yield self
- except Exception as e:
- self.logger.error(f"HDFS operation failed: {e}")
- raise
- finally:
- self.logger.info("HDFS operation context completed")
-
- def _retry_operation(self, operation: Callable[[], T],
- max_retries: Optional[int] = None,
- retry_delay: Optional[float] = None) -> T:
- """
- 带重试机制的操作执行
-
- Args:
- operation: 要执行的操作
- max_retries: 最大重试次数(可选,默认使用配置)
- retry_delay: 重试延迟(可选,默认使用配置)
-
- Returns:
- 操作结果
- """
- max_retries = max_retries or self.config.max_retries
- retry_delay = retry_delay or self.config.retry_delay
-
- last_exception = None
-
- for attempt in range(max_retries + 1):
- try:
- return operation()
- except Exception as e:
- last_exception = e
- if attempt < max_retries:
- self.logger.warning(
- f"Operation failed (attempt {attempt + 1}/{max_retries + 1}), "
- f"retrying in {retry_delay}s: {e}"
- )
- time.sleep(retry_delay)
- retry_delay *= 2 # 指数退避
-
- raise last_exception
-
- # 同步 API
-
- def make_dir(self, path: str, retry: bool = True) -> bool:
- """
- 创建目录
-
- Args:
- path: 要创建的目录路径
- retry: 是否启用重试
-
- Returns:
- 是否创建成功
- """
- if not validate_hdfs_path(path):
- self.logger.error(f"Invalid HDFS path: {path}")
- return False
-
- def _operation():
- return self.backend.make_dir(path)
-
- if retry:
- return self._retry_operation(_operation)
- return _operation()
-
- def delete(self, path: str, recursive: bool = True, retry: bool = True) -> bool:
- """
- 删除文件或目录
-
- Args:
- path: 要删除的路径
- recursive: 是否递归删除
- retry: 是否启用重试
-
- Returns:
- 是否删除成功
- """
- if not validate_hdfs_path(path):
- self.logger.error(f"Invalid HDFS path: {path}")
- return False
-
- def _operation():
- return self.backend.delete(path, recursive)
-
- if retry:
- return self._retry_operation(_operation)
- return _operation()
-
- def copy_from_local(self, src: str, dst: str, retry: bool = True) -> bool:
- """
- 从本地文件系统上传文件到 HDFS
-
- Args:
- src: 本地文件路径
- dst: HDFS 目标路径
- retry: 是否启用重试
-
- Returns:
- 是否上传成功
- """
- if not os.path.exists(src):
- self.logger.error(f"Local file not found: {src}")
- return False
-
- if not validate_hdfs_path(dst):
- self.logger.error(f"Invalid HDFS path: {dst}")
- return False
-
- def _operation():
- return self.backend.copy_from_local(src, dst)
-
- if retry:
- return self._retry_operation(_operation)
- return _operation()
-
- def copy_to_local(self, src: str, dst: str, retry: bool = True) -> bool:
- """
- 从 HDFS 下载文件到本地文件系统
-
- Args:
- src: HDFS 源路径
- dst: 本地目标路径
- retry: 是否启用重试
-
- Returns:
- 是否下载成功
- """
- if not validate_hdfs_path(src):
- self.logger.error(f"Invalid HDFS path: {src}")
- return False
-
- def _operation():
- return self.backend.copy_to_local(src, dst)
-
- if retry:
- return self._retry_operation(_operation)
- return _operation()
-
- def read_file(self, path: str, retry: bool = True) -> Optional[str]:
- """
- 读取 HDFS 文件内容
-
- Args:
- path: HDFS 文件路径
- retry: 是否启用重试
-
- Returns:
- 文件内容(字符串),如果失败返回 None
- """
- if not validate_hdfs_path(path):
- self.logger.error(f"Invalid HDFS path: {path}")
- return None
-
- if not self.exists(path):
- self.logger.error(f"File does not exist: {path}")
- return None
-
- def _operation():
- return self.backend.read_file(path)
-
- if retry:
- return self._retry_operation(_operation)
- return _operation()
-
- def write_file(self, path: str, content: str,
- overwrite: bool = True, retry: bool = True) -> bool:
- """
- 写入内容到 HDFS 文件
-
- Args:
- path: HDFS 文件路径
- content: 要写入的内容
- overwrite: 是否覆盖已存在的文件
- retry: 是否启用重试
-
- Returns:
- 是否写入成功
- """
- if not validate_hdfs_path(path):
- self.logger.error(f"Invalid HDFS path: {path}")
- return False
-
- def _operation():
- return self.backend.write_file(path, content, overwrite)
-
- if retry:
- return self._retry_operation(_operation)
- return _operation()
-
- def exists(self, path: str) -> bool:
- """
- 检查 HDFS 路径是否存在
-
- Args:
- path: HDFS 路径
-
- Returns:
- 路径是否存在
- """
- if not validate_hdfs_path(path):
- return False
-
- return self.backend.exists(path)
-
- def list_dir(self, path: str, retry: bool = True) -> List[str]:
- """
- 列出 HDFS 目录内容
-
- Args:
- path: HDFS 目录路径
- retry: 是否启用重试
-
- Returns:
- 目录内容列表
- """
- if not validate_hdfs_path(path):
- self.logger.error(f"Invalid HDFS path: {path}")
- return []
-
- if not self.exists(path):
- self.logger.error(f"Directory does not exist: {path}")
- return []
-
- def _operation():
- return self.backend.list_dir(path)
-
- if retry:
- return self._retry_operation(_operation)
- return _operation()
-
- def get_file_status(self, path: str, retry: bool = True) -> Optional[FileStatus]:
- """
- 获取 HDFS 文件状态
-
- Args:
- path: HDFS 文件路径
- retry: 是否启用重试
-
- Returns:
- 文件状态对象,如果失败返回 None
- """
- if not validate_hdfs_path(path):
- self.logger.error(f"Invalid HDFS path: {path}")
- return None
-
- def _operation():
- return self.backend.get_file_status(path)
-
- if retry:
- return self._retry_operation(_operation)
- return _operation()
-
def get_file_size(self, path: str, retry: bool = True) -> Optional[int]:
    """
    Return the size in bytes of the HDFS file at ``path``.

    The size is read from the ``length`` attribute of the object returned by
    ``self.get_file_status``.

    Args:
        path: HDFS file path.
        retry: Forwarded to ``self.get_file_status``.

    Returns:
        The file length, or None when no status could be obtained.
    """
    status = self.get_file_status(path, retry)
    # Compare against None explicitly: a status object that happens to be
    # falsy (e.g. defines __bool__/__len__) must not be mistaken for failure.
    if status is None:
        return None
    return status.length
-
def rename(self, src: str, dst: str, retry: bool = True) -> bool:
    """
    Rename (move) an HDFS file or directory.

    Both paths are checked with ``validate_hdfs_path``, source first; the
    first invalid one is logged and the call returns False without touching
    the backend.

    Args:
        src: Existing HDFS path.
        dst: New HDFS path.
        retry: When True, the backend call is run through
            ``self._retry_operation``.

    Returns:
        False for an invalid path; otherwise the value returned by
        ``backend.rename`` (or the retry wrapper).
    """
    for role, candidate in (("source", src), ("destination", dst)):
        if not validate_hdfs_path(candidate):
            self.logger.error(f"Invalid {role} path: {candidate}")
            return False

    def _move():
        return self.backend.rename(src, dst)

    return self._retry_operation(_move) if retry else _move()
-
def set_permission(self, path: str, permission: str, retry: bool = True) -> bool:
    """
    Set the permission bits of an HDFS path.

    Args:
        path: HDFS path to modify.
        permission: Permission string forwarded unchanged to the backend
            (e.g. '755'); it is not validated here.
        retry: When True, the backend call is run through
            ``self._retry_operation``.

    Returns:
        False (after logging an error) for an invalid path; otherwise the
        value returned by ``backend.set_permission`` (or the retry wrapper).
    """
    if validate_hdfs_path(path):
        def _chmod():
            return self.backend.set_permission(path, permission)

        if retry:
            return self._retry_operation(_chmod)
        return _chmod()

    self.logger.error(f"Invalid HDFS path: {path}")
    return False
-
def set_owner(self, path: str, owner: Optional[str] = None,
              group: Optional[str] = None, retry: bool = True) -> bool:
    """
    Change the owner and/or group of an HDFS path.

    Args:
        path: HDFS path to modify.
        owner: New owner, or None to leave it to the backend's handling.
        group: New group, or None to leave it to the backend's handling.
        retry: When True, the backend call is run through
            ``self._retry_operation``.

    Returns:
        False (after logging an error) when the path is invalid or when both
        ``owner`` and ``group`` are None; otherwise the value returned by
        ``backend.set_owner`` (or the retry wrapper).
    """
    if not validate_hdfs_path(path):
        self.logger.error(f"Invalid HDFS path: {path}")
        return False

    if all(value is None for value in (owner, group)):
        self.logger.error("At least one of owner or group must be specified")
        return False

    def _chown():
        return self.backend.set_owner(path, owner, group)

    if not retry:
        return _chown()
    return self._retry_operation(_chown)
-
- # 异步 API(基于同步 API 的简单封装)
-
async def make_dir_async(self, path: str, retry: bool = True) -> bool:
    """
    Asynchronously create an HDFS directory.

    Runs the blocking ``self.make_dir(path, retry)`` in the running loop's
    default executor and returns its result.
    """
    # get_running_loop() is the documented way to obtain the loop inside a
    # coroutine; get_event_loop() has policy-dependent behaviour.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: self.make_dir(path, retry))
-
async def delete_async(self, path: str, recursive: bool = True, retry: bool = True) -> bool:
    """
    Asynchronously delete an HDFS file or directory.

    Runs the blocking ``self.delete(path, recursive, retry)`` in the running
    loop's default executor and returns its result.
    """
    # Preferred over get_event_loop() inside coroutines.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: self.delete(path, recursive, retry))
-
async def copy_from_local_async(self, src: str, dst: str, retry: bool = True) -> bool:
    """
    Asynchronously upload a local file to HDFS.

    Runs the blocking ``self.copy_from_local(src, dst, retry)`` in the running
    loop's default executor and returns its result.
    """
    # Preferred over get_event_loop() inside coroutines.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: self.copy_from_local(src, dst, retry))
-
async def copy_to_local_async(self, src: str, dst: str, retry: bool = True) -> bool:
    """
    Asynchronously download an HDFS file to the local filesystem.

    Runs the blocking ``self.copy_to_local(src, dst, retry)`` in the running
    loop's default executor and returns its result.
    """
    # Preferred over get_event_loop() inside coroutines.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: self.copy_to_local(src, dst, retry))
-
async def read_file_async(self, path: str, retry: bool = True) -> Optional[str]:
    """
    Asynchronously read an HDFS file.

    Runs the blocking ``self.read_file(path, retry)`` in the running loop's
    default executor and returns its result.
    """
    # Preferred over get_event_loop() inside coroutines.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: self.read_file(path, retry))
-
async def write_file_async(self, path: str, content: str,
                           overwrite: bool = True, retry: bool = True) -> bool:
    """
    Asynchronously write content to an HDFS file.

    Runs the blocking ``self.write_file(path, content, overwrite, retry)`` in
    the running loop's default executor and returns its result.
    """
    # Preferred over get_event_loop() inside coroutines.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: self.write_file(path, content, overwrite, retry))
-
async def exists_async(self, path: str) -> bool:
    """
    Asynchronously check whether an HDFS path exists.

    Runs the blocking ``self.exists(path)`` in the running loop's default
    executor and returns its result.
    """
    # Preferred over get_event_loop() inside coroutines.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: self.exists(path))
-
async def list_dir_async(self, path: str, retry: bool = True) -> List[str]:
    """
    Asynchronously list an HDFS directory.

    Runs the blocking ``self.list_dir(path, retry)`` in the running loop's
    default executor and returns its result.
    """
    # Preferred over get_event_loop() inside coroutines.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: self.list_dir(path, retry))
-
async def get_file_status_async(self, path: str, retry: bool = True) -> Optional[FileStatus]:
    """
    Asynchronously fetch the status of an HDFS path.

    Runs the blocking ``self.get_file_status(path, retry)`` in the running
    loop's default executor and returns its result.
    """
    # Preferred over get_event_loop() inside coroutines.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: self.get_file_status(path, retry))
-
- # 便捷方法
-
def upload_directory(self, local_dir: str, hdfs_dir: str,
                     recursive: bool = True, retry: bool = True) -> bool:
    """
    Upload a local directory tree to HDFS.

    The HDFS directory for each level is created with ``self.make_dir``.
    Regular files are uploaded with ``self.copy_from_local`` to
    ``<hdfs_dir>/<name>``. Entries that are neither files nor directories are
    skipped.

    When ``recursive`` is True, sub-directories are uploaded the same way.
    Symlinked directories are followed. A link that resolves to one of its
    own ancestor directories (a cycle) is skipped with a warning instead of
    being recursed into without bound.

    Args:
        local_dir: Local source directory.
        hdfs_dir: HDFS destination directory.
        recursive: Whether to descend into sub-directories.
        retry: Forwarded to ``make_dir`` and ``copy_from_local``.

    Returns:
        True if every reachable file was uploaded. False if ``local_dir`` is
        not a directory, a directory could not be created, a file upload
        failed, or an exception was raised.
    """
    if not os.path.isdir(local_dir):
        self.logger.error(f"Local directory not found: {local_dir}")
        return False

    def _upload_tree(src_dir: str, dst_dir: str, ancestors: frozenset) -> bool:
        # Uploads one directory level. ``ancestors`` holds the resolved paths
        # of the directories on the current recursion path only, so
        # non-cyclic symlinks to already-visited siblings are still uploaded.
        real_dir = os.path.realpath(src_dir)
        if real_dir in ancestors:
            self.logger.warning(f"Skipping symlink cycle: {src_dir} -> {real_dir}")
            return True
        ancestors = ancestors | {real_dir}

        # Make sure the destination directory exists before uploading into it.
        if not self.make_dir(dst_dir, retry):
            return False

        for item in os.listdir(src_dir):
            local_path = os.path.join(src_dir, item)
            hdfs_path = f"{dst_dir.rstrip('/')}/{item}"

            if os.path.isfile(local_path):
                if not self.copy_from_local(local_path, hdfs_path, retry):
                    self.logger.error(f"Failed to upload file: {local_path}")
                    return False
            elif os.path.isdir(local_path) and recursive:
                if not _upload_tree(local_path, hdfs_path, ancestors):
                    return False

        return True

    try:
        return _upload_tree(local_dir, hdfs_dir, frozenset())
    except Exception as e:
        self.logger.error(f"Failed to upload directory: {e}")
        return False
-
def download_directory(self, hdfs_dir: str, local_dir: str,
                       recursive: bool = True, retry: bool = True) -> bool:
    """
    Download an HDFS directory tree to the local filesystem.

    Each entry returned by ``self.list_dir`` is classified with
    ``self.get_file_status``. Files are fetched with ``self.copy_to_local``
    into ``local_dir``. Directories are downloaded recursively (via
    ``self.download_directory``) only when ``recursive`` is True.

    Args:
        hdfs_dir: HDFS source directory.
        local_dir: Local destination directory; created if missing.
        recursive: Whether to descend into sub-directories.
        retry: Forwarded to ``list_dir``, ``get_file_status`` and
            ``copy_to_local``.

    Returns:
        True if every entry was downloaded. False if ``hdfs_dir`` does not
        exist, the local directory cannot be created, an entry's status
        cannot be read, a file download fails, or any exception is raised.
    """
    try:
        if not self.exists(hdfs_dir):
            self.logger.error(f"HDFS directory not found: {hdfs_dir}")
            return False

        # Inside the try so a local OSError (e.g. a file in the way) is
        # reported as False, like every other failure, instead of raising.
        os.makedirs(local_dir, exist_ok=True)

        for hdfs_path in self.list_dir(hdfs_dir, retry):
            # rstrip guards against an entry with a trailing slash, which
            # would otherwise produce an empty basename.
            item_name = os.path.basename(hdfs_path.rstrip('/'))
            local_path = os.path.join(local_dir, item_name)

            status = self.get_file_status(hdfs_path, retry)
            if status is None:
                # Treat a missing status as a failure rather than skipping the
                # entry, so a partial download is never reported as success.
                self.logger.error(f"Failed to get file status: {hdfs_path}")
                return False

            if status.is_directory:
                if recursive and not self.download_directory(hdfs_path, local_path, recursive, retry):
                    return False
            elif not self.copy_to_local(hdfs_path, local_path, retry):
                self.logger.error(f"Failed to download file: {hdfs_path}")
                return False

        return True
    except Exception as e:
        self.logger.error(f"Failed to download directory: {e}")
        return False
-
def walk(self, path: str) -> Iterator[Tuple[str, List[str], List[str]]]:
    """
    Traverse an HDFS directory tree top-down, in the manner of ``os.walk``.

    Yields ``(dirpath, dirnames, filenames)`` for ``path`` first. After the
    caller resumes, it recurses into each name still present in
    ``dirnames``, so removing names from that list prunes the traversal.
    Names are basenames of the entries returned by ``self.list_dir``.
    Entries whose status lookup returns a falsy value are left out.
    Nothing is yielded when ``path`` does not exist or is not a directory.

    Args:
        path: Directory to start from.

    Yields:
        ``(dirpath, dirnames, filenames)`` tuples.
    """
    if self.exists(path):
        root_status = self.get_file_status(path)
        if root_status and root_status.is_directory:
            subdirs: List[str] = []
            plain_files: List[str] = []
            for entry in self.list_dir(path):
                entry_status = self.get_file_status(entry)
                if not entry_status:
                    continue
                bucket = subdirs if entry_status.is_directory else plain_files
                bucket.append(os.path.basename(entry))

            yield path, subdirs, plain_files

            parent = path.rstrip('/')
            for child in subdirs:
                yield from self.walk(f"{parent}/{child}")
- # 便捷函数
def create_hdfs_client(
    config: Optional[HDFSConfig] = None,
    preferred_backend: Optional[BackendType] = None,
) -> HDFSOperations:
    """
    Build an ``HDFSOperations`` client.

    Convenience factory that forwards both arguments unchanged, as keyword
    arguments, to the ``HDFSOperations`` constructor.

    Args:
        config: Optional HDFS configuration.
        preferred_backend: Optional backend type to prefer.

    Returns:
        The newly constructed ``HDFSOperations`` instance.
    """
    client_kwargs = {"config": config, "preferred_backend": preferred_backend}
    return HDFSOperations(**client_kwargs)
|