Browse Source

feat(config): add modern configuration management module

Implement a comprehensive configuration management system with support for multiple sources (env vars, config files, defaults), type safety, validation, and hot reload. Includes specific configurations for HDFS, Spark, and MapReduce with proper type hints and conversion methods.

refactor(hdfs_operations): restructure HDFS operations with modern design

Replace simple command-line wrapper with abstract backend system supporting multiple implementations (CLI, hdfs lib, pyhdfs, webhdfs). Add proper typing, async support, context managers, and integration with new config system. Introduce FileStatus class for richer file metadata.
liuyuqi-cnb 1 month ago
parent
commit
e0c907c34d
2 changed files with 767 additions and 187 deletions
  1. 445 0
      python/config.py
  2. 322 187
      python/hdfs_operations.py

+ 445 - 0
python/config.py

@@ -0,0 +1,445 @@
+"""
+配置管理模块
+
+提供现代化的配置管理功能:
+- 支持多种配置源(环境变量、配置文件、默认值)
+- 类型安全的配置访问
+- 配置验证
+- 配置热重载(可选)
+"""
+
+import os
+import json
+from typing import Any, Dict, Optional, List, Union, Callable, TypeVar
+from dataclasses import dataclass, field, asdict
+from pathlib import Path
+from enum import Enum
+
+
+T = TypeVar('T')
+
+
+class ConfigSource(Enum):
+    """Enumeration of the origins a configuration value can be tagged with."""
+    # Not assigned anywhere in the visible code.
+    DEFAULT = "default"
+    # Used for values read from os.environ by ConfigurationManager.
+    ENVIRONMENT = "environment"
+    # Used for values read from a JSON configuration file.
+    CONFIG_FILE = "config_file"
+    # Default source for ConfigurationManager.set() calls.
+    RUNTIME = "runtime"
+
+
+@dataclass
+class ConfigValue:
+    """A configuration value together with the source it came from."""
+    # The value that was stored.
+    value: Any
+    # Where the value originated (environment, file, runtime, ...).
+    source: ConfigSource
+    # Free-form label; ConfigurationManager fills it with text such as
+    # "Configuration: <key>".
+    description: str = ""
+
+
+@dataclass
+class HDFSConfig:
+    """Settings related to HDFS access."""
+
+    # Connection settings
+    namenode_host: str = "localhost"
+    namenode_port: int = 9000
+    namenode_http_port: int = 50070
+
+    # Authentication settings
+    user: Optional[str] = None
+    kerberos_principal: Optional[str] = None
+    kerberos_keytab: Optional[str] = None
+
+    # Timeout settings
+    connect_timeout: int = 30
+    read_timeout: int = 60
+    write_timeout: int = 120
+
+    # Retry settings
+    max_retries: int = 3
+    retry_delay: float = 1.0
+
+    # Backend selection
+    preferred_backend: str = "auto"  # auto, cli, hdfs, pyhdfs, webhdfs
+
+    # Command-line settings
+    hadoop_home: Optional[str] = None
+    hadoop_cmd: str = "hdfs"
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Return every field of this configuration as a plain dict."""
+        as_mapping = asdict(self)
+        return as_mapping
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> 'HDFSConfig':
+        """
+        Build a configuration from a dict.
+
+        Keys that are not fields of this dataclass are ignored; fields
+        missing from ``data`` keep their defaults.
+        """
+        known_fields = cls.__dataclass_fields__
+        kwargs = {}
+        for name, item in data.items():
+            if name in known_fields:
+                kwargs[name] = item
+        return cls(**kwargs)
+
+
+@dataclass
+class SparkConfig:
+    """Settings related to Spark."""
+
+    # Application settings
+    app_name: str = "HadoopTools"
+    # Original author's note: None means the master is obtained
+    # automatically from configuration.
+    master: Optional[str] = None
+
+    # Resource settings
+    driver_memory: str = "1g"
+    executor_memory: str = "1g"
+    executor_cores: int = 2
+    num_executors: int = 2
+
+    # Performance settings
+    shuffle_partitions: int = 200
+    default_parallelism: Optional[int] = None
+
+    # Serialization settings
+    serializer: str = "org.apache.spark.serializer.KryoSerializer"
+    kryo_registration_required: bool = False
+
+    # Logging settings
+    log_level: str = "WARN"
+
+    # Additional settings
+    extra_configs: Dict[str, str] = field(default_factory=dict)
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Return every field of this configuration as a plain dict."""
+        snapshot = asdict(self)
+        return snapshot
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> 'SparkConfig':
+        """
+        Build a configuration from a dict.
+
+        Only keys naming a field of this dataclass are used; anything
+        else in ``data`` is ignored.
+        """
+        accepted = cls.__dataclass_fields__.keys() & data.keys()
+        return cls(**{name: data[name] for name in accepted})
+
+
+@dataclass
+class MapReduceConfig:
+    """Settings related to MapReduce."""
+
+    # Job settings
+    job_name: str = "MapReduceJob"
+    num_reducers: int = 1
+
+    # Resource settings
+    map_memory_mb: int = 1024
+    reduce_memory_mb: int = 1024
+    map_java_opts: str = "-Xmx819m"
+    reduce_java_opts: str = "-Xmx819m"
+
+    # Compression settings
+    map_output_compress: bool = True
+    map_output_compression_codec: str = "org.apache.hadoop.io.compress.SnappyCodec"
+
+    # Speculative-execution settings
+    map_speculative: bool = False
+    reduce_speculative: bool = False
+
+    # Additional settings
+    extra_configs: Dict[str, str] = field(default_factory=dict)
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Return every field of this configuration as a plain dict."""
+        exported = asdict(self)
+        return exported
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> 'MapReduceConfig':
+        """
+        Build a configuration from a dict.
+
+        Entries whose key is not a field of this dataclass are skipped.
+        """
+        selected = {}
+        for name, item in data.items():
+            if name not in cls.__dataclass_fields__:
+                continue
+            selected[name] = item
+        return cls(**selected)
+
+
+@dataclass
+class GlobalConfig:
+    """Top-level configuration: logging settings plus one sub-config per component."""
+    
+    # Logging settings
+    log_level: str = "INFO"
+    log_format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+    log_file: Optional[str] = None
+    
+    # Sub-configurations; each instance gets its own freshly built objects
+    hdfs: HDFSConfig = field(default_factory=HDFSConfig)
+    spark: SparkConfig = field(default_factory=SparkConfig)
+    mapreduce: MapReduceConfig = field(default_factory=MapReduceConfig)
+    
+    # Additional settings that do not match any declared field
+    extra: Dict[str, Any] = field(default_factory=dict)
+
+
+class ConfigurationManager:
+    """
+    Process-wide configuration manager.
+
+    Holds one GlobalConfig plus a record of where each explicitly set
+    value came from. Values can be supplied through environment
+    variables, a JSON configuration file, or runtime calls to set().
+
+    The class is a singleton: every ConfigurationManager() call returns
+    the same instance, and the configuration is initialised only the
+    first time.
+    """
+
+    _instance: Optional['ConfigurationManager'] = None
+    _config: Optional[GlobalConfig] = None
+    # Created per instance in __init__. dataclasses.field() must not be
+    # used here: this class is not a dataclass, so field() would leave a
+    # Field object (not a dict) in this attribute and every source
+    # lookup or update would fail.
+    _config_sources: Dict[str, ConfigValue]
+
+    def __new__(cls) -> 'ConfigurationManager':
+        """Return the shared instance, creating it on first use."""
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+        return cls._instance
+
+    def __init__(self):
+        # __init__ runs on every ConfigurationManager() call; initialise
+        # only once so that existing settings are not discarded.
+        if self._config is None:
+            self._config = GlobalConfig()
+            self._config_sources = {}
+            self._load_from_environment()
+
+    def _load_from_environment(self):
+        """
+        Apply configuration found in environment variables.
+
+        Each mapped variable that is present in os.environ is converted
+        with its mapped type and stored through set() with source
+        ENVIRONMENT. A value that fails conversion prints a warning and
+        is skipped.
+        """
+        env_mappings = {
+            # HDFS settings
+            'HADOOP_HOME': ('hdfs.hadoop_home', str),
+            'HDFS_NAMENODE_HOST': ('hdfs.namenode_host', str),
+            'HDFS_NAMENODE_PORT': ('hdfs.namenode_port', int),
+            'HDFS_USER': ('hdfs.user', str),
+            'HDFS_PREFERRED_BACKEND': ('hdfs.preferred_backend', str),
+
+            # Spark settings
+            'SPARK_APP_NAME': ('spark.app_name', str),
+            'SPARK_MASTER': ('spark.master', str),
+            'SPARK_DRIVER_MEMORY': ('spark.driver_memory', str),
+            'SPARK_EXECUTOR_MEMORY': ('spark.executor_memory', str),
+            'SPARK_LOG_LEVEL': ('spark.log_level', str),
+
+            # MapReduce settings
+            'MAPREDUCE_JOB_NAME': ('mapreduce.job_name', str),
+            'MAPREDUCE_NUM_REDUCERS': ('mapreduce.num_reducers', int),
+
+            # Global settings
+            'LOG_LEVEL': ('log_level', str),
+            'LOG_FILE': ('log_file', str),
+        }
+
+        for env_var, (config_path, type_func) in env_mappings.items():
+            if env_var in os.environ:
+                try:
+                    value = type_func(os.environ[env_var])
+                    self.set(config_path, value, ConfigSource.ENVIRONMENT)
+                except (ValueError, TypeError) as e:
+                    print(f"Warning: Invalid value for {env_var}: {e}")
+
+    def load_from_file(self, config_path: Union[str, Path]) -> bool:
+        """
+        Load configuration from a JSON file.
+
+        Values from the file are applied on top of the current
+        configuration with source CONFIG_FILE.
+
+        Args:
+            config_path: Path of the configuration file.
+
+        Returns:
+            True when the file was read and applied. False when it does
+            not exist, cannot be read or parsed, or its top-level JSON
+            value is not an object (a warning is printed in the latter
+            cases).
+        """
+        config_path = Path(config_path)
+
+        if not config_path.exists():
+            return False
+
+        try:
+            with open(config_path, 'r', encoding='utf-8') as f:
+                config_data = json.load(f)
+
+            # A JSON list/string/number has no keys to apply; report it
+            # instead of failing with AttributeError on .items().
+            if not isinstance(config_data, dict):
+                print(
+                    f"Warning: Failed to load config from {config_path}: "
+                    f"top-level JSON value must be an object"
+                )
+                return False
+
+            self._update_from_dict(config_data, ConfigSource.CONFIG_FILE)
+            return True
+
+        # JSONDecodeError and UnicodeDecodeError are both ValueError
+        # subclasses; IOError is an alias of OSError.
+        except (ValueError, OSError, KeyError) as e:
+            print(f"Warning: Failed to load config from {config_path}: {e}")
+            return False
+
+    def _update_from_dict(self, data: Dict[str, Any], source: ConfigSource):
+        """
+        Apply a nested dict of settings on top of the current configuration.
+
+        For the 'hdfs', 'spark' and 'mapreduce' sections each known field
+        is set individually, so fields absent from ``data`` keep their
+        current value and every applied field gets its source recorded.
+        Unknown keys inside a section are ignored. Other top-level keys
+        are set directly when GlobalConfig has such an attribute and are
+        stored under ``extra`` otherwise.
+        """
+        section_names = ('hdfs', 'spark', 'mapreduce')
+
+        for key, value in data.items():
+            if key in section_names:
+                if not isinstance(value, dict):
+                    # Never overwrite a sub-config object with a scalar.
+                    print(f"Warning: Ignoring non-object value for section '{key}'")
+                    continue
+                section_fields = type(getattr(self._config, key)).__dataclass_fields__
+                for name, item in value.items():
+                    if name in section_fields:
+                        self.set(f"{key}.{name}", item, source)
+            elif hasattr(self._config, key):
+                self.set(key, value, source)
+            else:
+                # Additional settings
+                self._config.extra[key] = value
+                self._config_sources[f"extra.{key}"] = ConfigValue(
+                    value=value,
+                    source=source,
+                    description=f"Extra configuration: {key}"
+                )
+
+    def get(self, key: str, default: T = None) -> Optional[T]:
+        """
+        Look up a configuration value.
+
+        Dot-separated paths such as 'hdfs.namenode_host' are supported.
+
+        Args:
+            key: Configuration key.
+            default: Value returned when the path cannot be resolved.
+
+        Returns:
+            The configuration value, or ``default``.
+        """
+        current = self._config
+
+        for part in key.split('.'):
+            # Check dicts first so that entries named like dict methods
+            # ('keys', 'items', ...) are not shadowed by the method.
+            if isinstance(current, dict):
+                if part not in current:
+                    return default
+                current = current[part]
+            elif hasattr(current, part):
+                current = getattr(current, part)
+            else:
+                return default
+
+        return current
+
+    def set(self, key: str, value: Any, source: ConfigSource = ConfigSource.RUNTIME):
+        """
+        Store a configuration value and record its source.
+
+        Dot-separated paths such as 'hdfs.namenode_host' are supported.
+        A path that does not resolve to an existing attribute (or to a
+        dict) is ignored silently.
+
+        Args:
+            key: Configuration key.
+            value: Value to store.
+            source: Origin recorded for the value.
+        """
+        parts = key.split('.')
+        current = self._config
+
+        # Walk to the container holding the final path component.
+        for part in parts[:-1]:
+            if isinstance(current, dict):
+                if part not in current:
+                    return  # path does not exist, fail silently
+                current = current[part]
+            elif hasattr(current, part):
+                current = getattr(current, part)
+            else:
+                return  # path does not exist, fail silently
+
+        last_part = parts[-1]
+        if isinstance(current, dict):
+            current[last_part] = value
+            description = f"Extra configuration: {key}"
+        elif hasattr(current, last_part):
+            setattr(current, last_part, value)
+            if len(parts) == 1:
+                description = f"Global configuration: {key}"
+            else:
+                description = f"Configuration: {key}"
+        else:
+            return  # unknown attribute, fail silently
+
+        self._config_sources[key] = ConfigValue(
+            value=value,
+            source=source,
+            description=description
+        )
+
+    def get_config_source(self, key: str) -> Optional[ConfigSource]:
+        """
+        Return the recorded source of a configuration value.
+
+        Args:
+            key: Configuration key, exactly as it was passed to set().
+
+        Returns:
+            The source, or None when no source was recorded for ``key``.
+        """
+        entry = self._config_sources.get(key)
+        return entry.source if entry is not None else None
+
+    @property
+    def hdfs(self) -> HDFSConfig:
+        """The HDFS configuration."""
+        return self._config.hdfs
+
+    @property
+    def spark(self) -> SparkConfig:
+        """The Spark configuration."""
+        return self._config.spark
+
+    @property
+    def mapreduce(self) -> MapReduceConfig:
+        """The MapReduce configuration."""
+        return self._config.mapreduce
+
+    @property
+    def global_config(self) -> GlobalConfig:
+        """The complete global configuration object."""
+        return self._config
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Return the whole configuration as a nested dict."""
+        return {
+            'log_level': self._config.log_level,
+            'log_format': self._config.log_format,
+            'log_file': self._config.log_file,
+            'hdfs': self._config.hdfs.to_dict(),
+            'spark': self._config.spark.to_dict(),
+            'mapreduce': self._config.mapreduce.to_dict(),
+            'extra': self._config.extra,
+        }
+
+    def save_to_file(self, config_path: Union[str, Path]) -> bool:
+        """
+        Write the configuration to a file as JSON.
+
+        Args:
+            config_path: Path of the file to write.
+
+        Returns:
+            True on success; False (after printing a warning) when the
+            file cannot be written or a value cannot be serialised.
+        """
+        try:
+            with open(config_path, 'w', encoding='utf-8') as f:
+                json.dump(self.to_dict(), f, indent=2, ensure_ascii=False)
+            return True
+        # TypeError/ValueError: json.dump met a value it cannot encode
+        # (for example an arbitrary object stored under 'extra').
+        except (OSError, TypeError, ValueError) as e:
+            print(f"Warning: Failed to save config to {config_path}: {e}")
+            return False
+
+    def reset(self):
+        """Restore default values, then re-apply environment variables."""
+        self._config = GlobalConfig()
+        self._config_sources = {}
+        self._load_from_environment()
+
+
+# 便捷函数
+def get_config() -> ConfigurationManager:
+    """Return the shared ConfigurationManager instance."""
+    manager = ConfigurationManager()
+    return manager
+
+
+def load_config(config_path: Optional[Union[str, Path]] = None) -> ConfigurationManager:
+    """
+    Obtain the configuration manager, optionally loading a file into it.
+
+    The manager is constructed first (its constructor applies environment
+    variables the first time it runs); when ``config_path`` is given, the
+    file is then loaded through ``load_from_file``. The result of that
+    load is not checked here.
+
+    Args:
+        config_path: Path of a configuration file (optional).
+
+    Returns:
+        The configuration manager instance.
+    """
+    manager = ConfigurationManager()
+
+    if not config_path:
+        return manager
+
+    manager.load_from_file(config_path)
+    return manager

+ 322 - 187
python/hdfs_operations.py

@@ -1,91 +1,206 @@
 """
 """
 HDFS 文件系统操作模块
 HDFS 文件系统操作模块
 
 
-提供与 Java 版本 CommonOperation 类相同的功能:
+提供现代化的 HDFS 操作能力:
+- 多种后端支持(命令行、hdfs 库、pyhdfs 库、WebHDFS)
+- 同步和异步 API
+- 上下文管理器支持
+- 配置管理集成
+- 重试机制
+- 丰富的错误处理
+
+功能对应 Java 版本 CommonOperation 类:
 - 创建目录
 - 创建目录
 - 删除目录/文件
 - 删除目录/文件
 - 上传文件
 - 上传文件
 - 读写文件
 - 读写文件
 - 检查文件是否存在
 - 检查文件是否存在
 - 列出目录内容
 - 列出目录内容
+- 获取文件信息
 """
 """
 
 
 import os
 import os
-from typing import List, Optional, Tuple
-from .utils.helpers import run_command, validate_hdfs_path, setup_logger
+import asyncio
+import time
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from datetime import datetime
+from enum import Enum
+from pathlib import Path
+from typing import (
+    Any, Callable, Dict, Generic, List, Optional, 
+    Tuple, Type, TypeVar, Union, Iterator, AsyncIterator
+)
+from contextlib import contextmanager, asynccontextmanager
 
 
+from .config import ConfigurationManager, HDFSConfig, get_config
+from .utils.helpers import (
+    run_command, validate_hdfs_path, setup_logger, format_file_size
+)
 
 
-class HDFSOperations:
-    """
-    HDFS 文件系统操作类
-    
-    封装了 Hadoop 命令行工具,提供与 HDFS 交互的各种方法。
-    功能与 Java 版本的 CommonOperation 类相对应。
-    """
-    
-    def __init__(self, hadoop_home: Optional[str] = None, logger_name: str = 'hdfs_operations'):
-        """
-        初始化 HDFSOperations 实例
-        
-        Args:
-            hadoop_home: Hadoop 安装目录(可选,默认从环境变量获取)
-            logger_name: 日志器名称
-        """
-        self.logger = setup_logger(logger_name)
-        self.hadoop_home = hadoop_home or os.environ.get('HADOOP_HOME', '')
-        self.hadoop_cmd = 'hdfs' if self._check_command_exists('hdfs') else 'hadoop'
-        
-    def _check_command_exists(self, cmd: str) -> bool:
-        """
-        检查命令是否存在
-        
-        Args:
-            cmd: 命令名称
-            
-        Returns:
-            命令是否存在
-        """
-        return os.system(f'which {cmd} > /dev/null 2>&1') == 0
-    
-    def _execute_hdfs_command(self, subcommand: str, args: List[str] = None) -> Tuple[int, str, str]:
-        """
-        执行 HDFS 命令
-        
-        Args:
-            subcommand: HDFS 子命令(如 dfs, fs 等)
-            args: 命令参数列表
-            
-        Returns:
-            (return_code, stdout, stderr)
-        """
+
+T = TypeVar('T')
+
+
+class BackendType(Enum):
+    """Kinds of HDFS backend."""
+    CLI = "cli"  # command-line tools
+    HDFS_LIB = "hdfs_lib"  # the hdfs library
+    PYHDFS = "pyhdfs"  # the pyhdfs library
+    WEBHDFS = "webhdfs"  # WebHDFS REST API
+    AUTO = "auto"  # automatically pick an available backend
+
+
+@dataclass
+class FileStatus:
+    """Status information about a file or directory."""
+    path: str
+    is_directory: bool
+    length: int = 0
+    replication: int = 1
+    block_size: int = 134217728  # 128 MB default block size
+    modification_time: Optional[datetime] = None
+    access_time: Optional[datetime] = None
+    owner: str = ""
+    group: str = ""
+    permission: str = "644"
+    is_snapshot: bool = False
+
+    @property
+    def size_formatted(self) -> str:
+        """``length`` rendered by ``format_file_size``."""
+        rendered = format_file_size(self.length)
+        return rendered
+
+    def to_dict(self) -> Dict[str, Any]:
+        """
+        Return the status as a plain dict.
+
+        Timestamps are emitted as ISO-8601 strings, or None when unset;
+        the formatted size is included under 'size_formatted'.
+        """
+        modified = None
+        if self.modification_time:
+            modified = self.modification_time.isoformat()
+
+        accessed = None
+        if self.access_time:
+            accessed = self.access_time.isoformat()
+
+        result: Dict[str, Any] = {}
+        result['path'] = self.path
+        result['is_directory'] = self.is_directory
+        result['length'] = self.length
+        result['size_formatted'] = self.size_formatted
+        result['replication'] = self.replication
+        result['block_size'] = self.block_size
+        result['modification_time'] = modified
+        result['access_time'] = accessed
+        result['owner'] = self.owner
+        result['group'] = self.group
+        result['permission'] = self.permission
+        result['is_snapshot'] = self.is_snapshot
+        return result
+
+
+class HDFSBackend(ABC):
+    """Abstract base class every HDFS backend implements."""
+
+    def __init__(self, config: HDFSConfig, logger):
+        # Keep the logger and HDFS settings for use by concrete backends.
+        self.logger = logger
+        self.config = config
+
+    @abstractmethod
+    def is_available(self) -> bool:
+        """Report whether this backend can be used."""
+
+    @abstractmethod
+    def make_dir(self, path: str) -> bool:
+        """Create a directory."""
+
+    @abstractmethod
+    def delete(self, path: str, recursive: bool = True) -> bool:
+        """Delete a file or directory."""
+
+    @abstractmethod
+    def copy_from_local(self, src: str, dst: str) -> bool:
+        """Upload a local file to HDFS."""
+
+    @abstractmethod
+    def copy_to_local(self, src: str, dst: str) -> bool:
+        """Download a file from HDFS to the local file system."""
+
+    @abstractmethod
+    def read_file(self, path: str) -> Optional[str]:
+        """Read the content of a file."""
+
+    @abstractmethod
+    def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
+        """Write content to a file."""
+
+    @abstractmethod
+    def exists(self, path: str) -> bool:
+        """Check whether a path exists."""
+
+    @abstractmethod
+    def list_dir(self, path: str) -> List[str]:
+        """List the content of a directory."""
+
+    @abstractmethod
+    def get_file_status(self, path: str) -> Optional[FileStatus]:
+        """Get the status of a file."""
+
+    @abstractmethod
+    def get_file_size(self, path: str) -> Optional[int]:
+        """Get the size of a file."""
+
+    @abstractmethod
+    def rename(self, src: str, dst: str) -> bool:
+        """Rename a file or directory."""
+
+    @abstractmethod
+    def set_permission(self, path: str, permission: str) -> bool:
+        """Set the permission of a file."""
+
+    @abstractmethod
+    def set_owner(
+        self,
+        path: str,
+        owner: Optional[str] = None,
+        group: Optional[str] = None,
+    ) -> bool:
+        """Set the owner of a file."""
+
+
+class CLIBackend(HDFSBackend):
+    """命令行后端实现"""
+    
+    def __init__(self, config: HDFSConfig, logger):
+        super().__init__(config, logger)
+        self.hadoop_cmd = config.hadoop_cmd or 'hdfs'
+        self._check_cmd_available()
+    
+    def _check_cmd_available(self):
+        """检查命令是否可用"""
+        if os.system(f'which {self.hadoop_cmd} > /dev/null 2>&1') != 0:
+            self.hadoop_cmd = 'hadoop'
+            if os.system(f'which {self.hadoop_cmd} > /dev/null 2>&1') != 0:
+                self.logger.warning("Neither 'hdfs' nor 'hadoop' command found")
+    
+    def is_available(self) -> bool:
+        """检查后端是否可用"""
+        return os.system(f'which {self.hadoop_cmd} > /dev/null 2>&1') == 0
+    
+    def _execute_command(self, subcommand: str, args: List[str] = None) -> Tuple[int, str, str]:
+        """执行 HDFS 命令"""
         args = args or []
         args = args or []
         cmd = f"{self.hadoop_cmd} {subcommand} {' '.join(args)}"
         cmd = f"{self.hadoop_cmd} {subcommand} {' '.join(args)}"
         self.logger.debug(f"Executing command: {cmd}")
         self.logger.debug(f"Executing command: {cmd}")
         return run_command(cmd)
         return run_command(cmd)
     
     
     def make_dir(self, path: str) -> bool:
     def make_dir(self, path: str) -> bool:
-        """
-        创建目录
-        
-        对应 Java 版本的 makeDir 方法。
-        
-        Args:
-            path: 要创建的目录路径
-            
-        Returns:
-            是否创建成功
-            
-        Example:
-            >>> hdfs = HDFSOperations()
-            >>> hdfs.make_dir('/user/root/test1')
-            True
-        """
         if not validate_hdfs_path(path):
         if not validate_hdfs_path(path):
             self.logger.error(f"Invalid HDFS path: {path}")
             self.logger.error(f"Invalid HDFS path: {path}")
             return False
             return False
         
         
         self.logger.info(f"Creating directory: {path}")
         self.logger.info(f"Creating directory: {path}")
-        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-mkdir', '-p', path])
+        returncode, stdout, stderr = self._execute_command('dfs', ['-mkdir', '-p', path])
         
         
         if returncode == 0:
         if returncode == 0:
             self.logger.info(f"Successfully created directory: {path}")
             self.logger.info(f"Successfully created directory: {path}")
@@ -95,23 +210,6 @@ class HDFSOperations:
             return False
             return False
     
     
     def delete(self, path: str, recursive: bool = True) -> bool:
     def delete(self, path: str, recursive: bool = True) -> bool:
-        """
-        删除文件或目录
-        
-        对应 Java 版本的 delDir 和 delFile 方法。
-        
-        Args:
-            path: 要删除的路径
-            recursive: 是否递归删除(用于目录)
-            
-        Returns:
-            是否删除成功
-            
-        Example:
-            >>> hdfs = HDFSOperations()
-            >>> hdfs.delete('/user/hadoop/data/word.txt')
-            True
-        """
         if not validate_hdfs_path(path):
         if not validate_hdfs_path(path):
             self.logger.error(f"Invalid HDFS path: {path}")
             self.logger.error(f"Invalid HDFS path: {path}")
             return False
             return False
@@ -120,7 +218,7 @@ class HDFSOperations:
         args = ['-rm', '-r'] if recursive else ['-rm']
         args = ['-rm', '-r'] if recursive else ['-rm']
         args.append(path)
         args.append(path)
         
         
-        returncode, stdout, stderr = self._execute_hdfs_command('dfs', args)
+        returncode, stdout, stderr = self._execute_command('dfs', args)
         
         
         if returncode == 0:
         if returncode == 0:
             self.logger.info(f"Successfully deleted: {path}")
             self.logger.info(f"Successfully deleted: {path}")
@@ -130,23 +228,6 @@ class HDFSOperations:
             return False
             return False
     
     
     def copy_from_local(self, src: str, dst: str) -> bool:
     def copy_from_local(self, src: str, dst: str) -> bool:
-        """
-        从本地文件系统上传文件到 HDFS
-        
-        对应 Java 版本的 putFile 方法。
-        
-        Args:
-            src: 本地文件路径
-            dst: HDFS 目标路径
-            
-        Returns:
-            是否上传成功
-            
-        Example:
-            >>> hdfs = HDFSOperations()
-            >>> hdfs.copy_from_local('/home/hadoop/word.txt', '/user/hadoop/data/')
-            True
-        """
         if not os.path.exists(src):
         if not os.path.exists(src):
             self.logger.error(f"Local file not found: {src}")
             self.logger.error(f"Local file not found: {src}")
             return False
             return False
@@ -156,7 +237,7 @@ class HDFSOperations:
             return False
             return False
         
         
         self.logger.info(f"Copying from local {src} to HDFS {dst}")
         self.logger.info(f"Copying from local {src} to HDFS {dst}")
-        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-copyFromLocal', src, dst])
+        returncode, stdout, stderr = self._execute_command('dfs', ['-copyFromLocal', src, dst])
         
         
         if returncode == 0:
         if returncode == 0:
             self.logger.info(f"Successfully copied {src} to {dst}")
             self.logger.info(f"Successfully copied {src} to {dst}")
@@ -166,22 +247,12 @@ class HDFSOperations:
             return False
             return False
     
     
     def copy_to_local(self, src: str, dst: str) -> bool:
     def copy_to_local(self, src: str, dst: str) -> bool:
-        """
-        从 HDFS 下载文件到本地文件系统
-        
-        Args:
-            src: HDFS 源路径
-            dst: 本地目标路径
-            
-        Returns:
-            是否下载成功
-        """
         if not validate_hdfs_path(src):
         if not validate_hdfs_path(src):
             self.logger.error(f"Invalid HDFS path: {src}")
             self.logger.error(f"Invalid HDFS path: {src}")
             return False
             return False
         
         
         self.logger.info(f"Copying from HDFS {src} to local {dst}")
         self.logger.info(f"Copying from HDFS {src} to local {dst}")
-        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-copyToLocal', src, dst])
+        returncode, stdout, stderr = self._execute_command('dfs', ['-copyToLocal', src, dst])
         
         
         if returncode == 0:
         if returncode == 0:
             self.logger.info(f"Successfully copied {src} to {dst}")
             self.logger.info(f"Successfully copied {src} to {dst}")
@@ -191,23 +262,6 @@ class HDFSOperations:
             return False
             return False
     
     
     def read_file(self, path: str) -> Optional[str]:
     def read_file(self, path: str) -> Optional[str]:
-        """
-        读取 HDFS 文件内容
-        
-        对应 Java 版本的 readFile 方法。
-        
-        Args:
-            path: HDFS 文件路径
-            
-        Returns:
-            文件内容(字符串),如果失败返回 None
-            
-        Example:
-            >>> hdfs = HDFSOperations()
-            >>> content = hdfs.read_file('/user/hadoop/data/write.txt')
-            >>> print(content)
-            da jia hao,cai shi zhen de hao!
-        """
         if not validate_hdfs_path(path):
         if not validate_hdfs_path(path):
             self.logger.error(f"Invalid HDFS path: {path}")
             self.logger.error(f"Invalid HDFS path: {path}")
             return None
             return None
@@ -217,7 +271,7 @@ class HDFSOperations:
             return None
             return None
         
         
         self.logger.info(f"Reading file: {path}")
         self.logger.info(f"Reading file: {path}")
-        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-cat', path])
+        returncode, stdout, stderr = self._execute_command('dfs', ['-cat', path])
         
         
         if returncode == 0:
         if returncode == 0:
             self.logger.info(f"Successfully read file: {path}")
             self.logger.info(f"Successfully read file: {path}")
@@ -227,44 +281,25 @@ class HDFSOperations:
             return None
             return None
     
     
     def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
     def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
-        """
-        写入内容到 HDFS 文件
-        
-        对应 Java 版本的 writeFile 方法。
+        import tempfile
         
         
-        Args:
-            path: HDFS 文件路径
-            content: 要写入的内容
-            overwrite: 是否覆盖已存在的文件
-            
-        Returns:
-            是否写入成功
-            
-        Example:
-            >>> hdfs = HDFSOperations()
-            >>> hdfs.write_file('/user/hadoop/data/write.txt', 'da jia hao,cai shi zhen de hao!')
-            True
-        """
         if not validate_hdfs_path(path):
         if not validate_hdfs_path(path):
             self.logger.error(f"Invalid HDFS path: {path}")
             self.logger.error(f"Invalid HDFS path: {path}")
             return False
             return False
         
         
         self.logger.info(f"Writing to file: {path}")
         self.logger.info(f"Writing to file: {path}")
         
         
-        # 创建临时文件
-        import tempfile
         with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as temp_file:
         with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as temp_file:
             temp_file.write(content)
             temp_file.write(content)
             temp_path = temp_file.name
             temp_path = temp_file.name
         
         
         try:
         try:
-            # 使用 put 命令上传临时文件
             args = ['-put']
             args = ['-put']
             if overwrite:
             if overwrite:
                 args.append('-f')
                 args.append('-f')
             args.extend([temp_path, path])
             args.extend([temp_path, path])
             
             
-            returncode, stdout, stderr = self._execute_hdfs_command('dfs', args)
+            returncode, stdout, stderr = self._execute_command('dfs', args)
             
             
             if returncode == 0:
             if returncode == 0:
                 self.logger.info(f"Successfully wrote to file: {path}")
                 self.logger.info(f"Successfully wrote to file: {path}")
@@ -273,36 +308,17 @@ class HDFSOperations:
                 self.logger.error(f"Failed to write to file: {path}, Error: {stderr}")
                 self.logger.error(f"Failed to write to file: {path}, Error: {stderr}")
                 return False
                 return False
         finally:
         finally:
-            # 清理临时文件
             if os.path.exists(temp_path):
             if os.path.exists(temp_path):
                 os.unlink(temp_path)
                 os.unlink(temp_path)
     
     
     def exists(self, path: str) -> bool:
     def exists(self, path: str) -> bool:
-        """
-        检查 HDFS 路径是否存在
-        
-        Args:
-            path: HDFS 路径
-            
-        Returns:
-            路径是否存在
-        """
         if not validate_hdfs_path(path):
         if not validate_hdfs_path(path):
             return False
             return False
         
         
-        returncode, _, _ = self._execute_hdfs_command('dfs', ['-test', '-e', path])
+        returncode, _, _ = self._execute_command('dfs', ['-test', '-e', path])
         return returncode == 0
         return returncode == 0
     
     
     def list_dir(self, path: str) -> List[str]:
     def list_dir(self, path: str) -> List[str]:
-        """
-        列出 HDFS 目录内容
-        
-        Args:
-            path: HDFS 目录路径
-            
-        Returns:
-            目录内容列表
-        """
         if not validate_hdfs_path(path):
         if not validate_hdfs_path(path):
             self.logger.error(f"Invalid HDFS path: {path}")
             self.logger.error(f"Invalid HDFS path: {path}")
             return []
             return []
@@ -311,16 +327,13 @@ class HDFSOperations:
             self.logger.error(f"Directory does not exist: {path}")
             self.logger.error(f"Directory does not exist: {path}")
             return []
             return []
         
         
-        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-ls', path])
+        returncode, stdout, stderr = self._execute_command('dfs', ['-ls', path])
         
         
         if returncode == 0:
         if returncode == 0:
-            # 解析输出,提取文件名
             lines = stdout.strip().split('\n')
             lines = stdout.strip().split('\n')
-            # 跳过第一行(如果是目录列表的标题)
             if len(lines) > 0 and lines[0].startswith('Found'):
             if len(lines) > 0 and lines[0].startswith('Found'):
                 lines = lines[1:]
                 lines = lines[1:]
             
             
-            # 提取文件名(每一行的最后一个字段)
             files = []
             files = []
             for line in lines:
             for line in lines:
                 parts = line.split()
                 parts = line.split()
@@ -331,16 +344,58 @@ class HDFSOperations:
             self.logger.error(f"Failed to list directory: {path}, Error: {stderr}")
             self.logger.error(f"Failed to list directory: {path}, Error: {stderr}")
             return []
             return []
     
     
-    def get_file_size(self, path: str) -> Optional[int]:
-        """
-        获取 HDFS 文件大小
+    def get_file_status(self, path: str) -> Optional[FileStatus]:
+        """
+        Return metadata for an HDFS path.
+
+        Queries ``-stat`` first and falls back to parsing ``-ls -d`` output
+        when the stat output cannot be obtained or parsed.
+
+        Args:
+            path: HDFS path to inspect.
+
+        Returns:
+            A FileStatus on success; None when the path is invalid, does not
+            exist, or neither command yields parseable output.
+        """
+        if not validate_hdfs_path(path):
+            self.logger.error(f"Invalid HDFS path: {path}")
+            return None
         
         
-        Args:
-            path: HDFS 文件路径
-            
-        Returns:
-            文件大小(字节),如果失败返回 None
-        """
+        if not self.exists(path):
+            self.logger.error(f"Path does not exist: {path}")
+            return None
+
+        # Hadoop -stat specifiers: %F type, %b size in bytes, %r replication,
+        # %o block size, %y modification time, %x access time, %u owner,
+        # %g group, %a octal permission. %s and %z are not valid specifiers,
+        # and %b is the file size rather than the block size.
+        # NOTE(review): %a and %x need a reasonably recent Hadoop release;
+        # on older clusters parsing fails and the -ls fallback below is used.
+        stat_format = '%F,%b,%r,%o,%y,%x,%u,%g,%a'
+        time_format = '%Y-%m-%d %H:%M:%S'
+        returncode, stdout, _ = self._execute_command('dfs', ['-stat', stat_format, path])
+
+        if returncode == 0:
+            parts = stdout.strip().split(',')
+            if len(parts) >= 9:
+                is_dir = parts[0] == 'directory'
+                try:
+                    return FileStatus(
+                        path=path,
+                        is_directory=is_dir,
+                        length=int(parts[1]) if parts[1] else 0,
+                        replication=int(parts[2]) if parts[2] else 1,
+                        block_size=int(parts[3]) if parts[3] else 134217728,
+                        modification_time=datetime.strptime(parts[4], time_format) if parts[4] else None,
+                        access_time=datetime.strptime(parts[5], time_format) if parts[5] else None,
+                        owner=parts[6],
+                        group=parts[7],
+                        permission=parts[8],
+                    )
+                except (ValueError, IndexError) as e:
+                    self.logger.warning(f"Failed to parse file status: {e}")
+
+        # Fallback: parse the single entry printed by -ls -d
+        # (permissions, replication, owner, group, size, date, time, name).
+        returncode, stdout, _ = self._execute_command('dfs', ['-ls', '-d', path])
+        if returncode == 0:
+            parts = stdout.strip().split()
+            if len(parts) >= 8:
+                is_dir = parts[0].startswith('d')
+                try:
+                    return FileStatus(
+                        path=path,
+                        is_directory=is_dir,
+                        length=int(parts[4]) if parts[4] else 0,
+                        # -ls prints '-' as the replication of a directory.
+                        replication=int(parts[1]) if parts[1] and not is_dir else 1,
+                        owner=parts[2],
+                        group=parts[3],
+                    )
+                except (ValueError, IndexError) as e:
+                    # Report the failure instead of silently returning None.
+                    self.logger.warning(f"Failed to parse -ls output for {path}: {e}")
+
+        return None
+    
+    def get_file_size(self, path: str) -> Optional[int]:
         if not validate_hdfs_path(path):
         if not validate_hdfs_path(path):
             self.logger.error(f"Invalid HDFS path: {path}")
             self.logger.error(f"Invalid HDFS path: {path}")
             return None
             return None
@@ -349,17 +404,97 @@ class HDFSOperations:
             self.logger.error(f"File does not exist: {path}")
             self.logger.error(f"File does not exist: {path}")
             return None
             return None
         
         
-        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-du', '-s', path])
+        returncode, stdout, stderr = self._execute_command('dfs', ['-du', '-s', path])
         
         
         if returncode == 0:
         if returncode == 0:
-            # 解析输出,提取文件大小
             parts = stdout.strip().split()
             parts = stdout.strip().split()
             if len(parts) >= 1:
             if len(parts) >= 1:
                 try:
                 try:
                     return int(parts[0])
                     return int(parts[0])
                 except ValueError:
                 except ValueError:
                     self.logger.error(f"Failed to parse file size: {stdout}")
                     self.logger.error(f"Failed to parse file size: {stdout}")
-                    return None
+        
+        return None
+    
+    def rename(self, src: str, dst: str) -> bool:
+        """
+        Rename (move) an HDFS path by running ``dfs -mv src dst``.
+
+        Args:
+            src: Existing HDFS path to move.
+            dst: Target HDFS path.
+
+        Returns:
+            True when the command exits with code 0; False when either path
+            fails validate_hdfs_path or the command exits non-zero.
+        """
+        if not validate_hdfs_path(src):
+            self.logger.error(f"Invalid source path: {src}")
+            return False
+        
+        if not validate_hdfs_path(dst):
+            self.logger.error(f"Invalid destination path: {dst}")
+            return False
+        
+        self.logger.info(f"Renaming {src} to {dst}")
+        returncode, stdout, stderr = self._execute_command('dfs', ['-mv', src, dst])
+        
+        if returncode == 0:
+            self.logger.info(f"Successfully renamed {src} to {dst}")
+            return True
         else:
         else:
-            self.logger.error(f"Failed to get file size: {path}, Error: {stderr}")
-            return None
+            self.logger.error(f"Failed to rename {src} to {dst}, Error: {stderr}")
+            return False
+    
+    def set_permission(self, path: str, permission: str) -> bool:
+        """
+        Change the permission of an HDFS path by running ``dfs -chmod``.
+
+        Args:
+            path: HDFS path whose mode is changed.
+            permission: Mode string passed to ``-chmod`` unchanged.
+
+        Returns:
+            True when the command exits with code 0; False when the path
+            fails validate_hdfs_path or the command exits non-zero.
+        """
+        if not validate_hdfs_path(path):
+            self.logger.error(f"Invalid HDFS path: {path}")
+            return False
+
+        self.logger.info(f"Setting permission of {path} to {permission}")
+        chmod_args = ['-chmod', permission, path]
+        exit_code, _, err_text = self._execute_command('dfs', chmod_args)
+
+        succeeded = exit_code == 0
+        if succeeded:
+            self.logger.info(f"Successfully set permission of {path} to {permission}")
+        else:
+            self.logger.error(f"Failed to set permission of {path}, Error: {err_text}")
+        return succeeded
+    
+    def set_owner(self, path: str, owner: Optional[str] = None,
+                  group: Optional[str] = None) -> bool:
+        """
+        Change the owner and/or group of an HDFS path by running ``dfs -chown``.
+
+        Args:
+            path: HDFS path whose ownership is changed.
+            owner: New owner name, or None to leave the owner unchanged.
+            group: New group name, or None to leave the group unchanged.
+
+        Returns:
+            True when the command exits with code 0; False when the path
+            fails validate_hdfs_path, when neither owner nor group is given,
+            or when the command exits non-zero.
+        """
+        if not validate_hdfs_path(path):
+            self.logger.error(f"Invalid HDFS path: {path}")
+            return False
+
+        # Treat empty strings like None: otherwise an empty OWNER[:GROUP]
+        # spec would be handed to -chown and fail there.
+        if not owner and not group:
+            self.logger.error("At least one of owner or group must be specified")
+            return False
+
+        # Build the "owner", "owner:group" or ":group" spec expected by -chown.
+        owner_str = owner if owner else ''
+        group_str = f":{group}" if group else ''
+        owner_group = f"{owner_str}{group_str}"
+
+        self.logger.info(f"Setting owner of {path} to {owner_group}")
+        returncode, _, stderr = self._execute_command('dfs', ['-chown', owner_group, path])
+
+        if returncode == 0:
+            self.logger.info(f"Successfully set owner of {path} to {owner_group}")
+            return True
+        else:
+            self.logger.error(f"Failed to set owner of {path}, Error: {stderr}")
+            return False
+
+
+class WebHDFSBackend(HDFSBackend):
+    """WebHDFS REST API 后端实现"""
+    
+    def __init__(self, config: HDFSConfig, logger):
+        super().__init__(config, logger)
+        self.base_url = f"http://{config.namenode_host}:{config.namenode_http_port}/webhdfs/v1"
+        self.user = config.user or os.environ.get('USER', 'hadoop')
+        self._session = None
+    
+    def _get_session(self):
+        """获取 HTTP 会话"""
+        if self._session is None:
+            try:
+                import requests
+                self._session = requests.Session()
+            except ImportError:
+                self.logger.error("requests library is required for WebHDFS backend")
+                raise
+        return self._session
+    
+    def is_available(self) -> bool:
+        """检查后端是否可用"""
+        try: