"""
Configuration management module.

Provides a small configuration layer with:
- several configuration sources (environment variables, JSON files, defaults)
- typed configuration sections implemented as dataclasses
- dotted-path access (``'hdfs.namenode_host'``)
- per-key tracking of where a value came from
"""

import os
import json
from typing import Any, ClassVar, Dict, Optional, List, Union, Callable, TypeVar
from dataclasses import dataclass, field, asdict
from pathlib import Path
from enum import Enum

T = TypeVar('T')

# Sentinel used by the dotted-path helpers to tell "not found" apart from a
# stored value of ``None``.
_MISSING = object()


def _lookup(container: Any, name: str) -> Any:
    """Return the child ``name`` of ``container`` or ``_MISSING``.

    Dicts are searched by key only (never by attribute, so a key such as
    ``'items'`` is not confused with ``dict.items``).  Dataclass instances
    are searched by declared field only (so methods such as ``to_dict`` are
    not reachable through a configuration path).  Anything else has no
    children.
    """
    if isinstance(container, dict):
        return container.get(name, _MISSING)
    if name in getattr(type(container), '__dataclass_fields__', ()):
        return getattr(container, name, _MISSING)
    return _MISSING


class ConfigSource(Enum):
    """Enumeration of the places a configuration value can come from."""

    DEFAULT = "default"
    ENVIRONMENT = "environment"
    CONFIG_FILE = "config_file"
    RUNTIME = "runtime"


@dataclass
class ConfigValue:
    """A configuration value together with its origin."""

    value: Any
    source: ConfigSource
    description: str = ""


@dataclass
class HDFSConfig:
    """HDFS related settings."""

    # Connection
    namenode_host: str = "localhost"
    namenode_port: int = 9000
    namenode_http_port: int = 50070

    # Authentication
    user: Optional[str] = None
    kerberos_principal: Optional[str] = None
    kerberos_keytab: Optional[str] = None

    # Timeouts
    connect_timeout: int = 30
    read_timeout: int = 60
    write_timeout: int = 120

    # Retry policy
    max_retries: int = 3
    retry_delay: float = 1.0

    # Backend selection
    preferred_backend: str = "auto"  # auto, cli, hdfs, pyhdfs, webhdfs

    # Command line
    hadoop_home: Optional[str] = None
    hadoop_cmd: str = "hdfs"

    def to_dict(self) -> Dict[str, Any]:
        """Return the settings as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'HDFSConfig':
        """Build an instance from ``data``, ignoring keys that are not fields."""
        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})


@dataclass
class SparkConfig:
    """Spark related settings."""

    # Application
    app_name: str = "HadoopTools"
    master: Optional[str] = None  # None means "resolve from configuration"

    # Resources
    driver_memory: str = "1g"
    executor_memory: str = "1g"
    executor_cores: int = 2
    num_executors: int = 2

    # Performance
    shuffle_partitions: int = 200
    default_parallelism: Optional[int] = None

    # Serialization
    serializer: str = "org.apache.spark.serializer.KryoSerializer"
    kryo_registration_required: bool = False

    # Logging
    log_level: str = "WARN"

    # Additional settings
    extra_configs: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the settings as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SparkConfig':
        """Build an instance from ``data``, ignoring keys that are not fields."""
        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})


@dataclass
class MapReduceConfig:
    """MapReduce related settings."""

    # Job
    job_name: str = "MapReduceJob"
    num_reducers: int = 1

    # Resources
    map_memory_mb: int = 1024
    reduce_memory_mb: int = 1024
    map_java_opts: str = "-Xmx819m"
    reduce_java_opts: str = "-Xmx819m"

    # Compression
    map_output_compress: bool = True
    map_output_compression_codec: str = "org.apache.hadoop.io.compress.SnappyCodec"

    # Speculative execution
    map_speculative: bool = False
    reduce_speculative: bool = False

    # Additional settings
    extra_configs: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the settings as a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MapReduceConfig':
        """Build an instance from ``data``, ignoring keys that are not fields."""
        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})


@dataclass
class GlobalConfig:
    """Top-level configuration: logging settings plus the three sections."""

    # Logging
    log_level: str = "INFO"
    log_format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    log_file: Optional[str] = None

    # Sections
    hdfs: HDFSConfig = field(default_factory=HDFSConfig)
    spark: SparkConfig = field(default_factory=SparkConfig)
    mapreduce: MapReduceConfig = field(default_factory=MapReduceConfig)

    # Free-form keys that do not match any declared field
    extra: Dict[str, Any] = field(default_factory=dict)


class ConfigurationManager:
    """
    Process-wide configuration manager (singleton).

    Offers one interface over several configuration sources.  Values are
    addressed with dotted paths such as ``'hdfs.namenode_host'``; the source
    of every explicitly set value is remembered and can be queried with
    :meth:`get_config_source`.
    """

    _instance: ClassVar[Optional['ConfigurationManager']] = None
    # Class-level default so ``__init__`` can detect the first initialisation.
    _config: Optional[GlobalConfig] = None
    # Populated per instance in ``__init__``.  (This used to be assigned
    # ``dataclasses.field(...)``, which outside a dataclass is just a
    # ``Field`` object and not a dict.)
    _config_sources: Dict[str, ConfigValue]

    # Section name -> dataclass used to build that section from a dict.
    _SECTION_TYPES: ClassVar[Dict[str, Any]] = {
        'hdfs': HDFSConfig,
        'spark': SparkConfig,
        'mapreduce': MapReduceConfig,
    }

    def __new__(cls) -> 'ConfigurationManager':
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # ``__init__`` runs on every ``ConfigurationManager()`` call, so only
        # initialise state the first time.
        if self._config is None:
            self._config = GlobalConfig()
            self._config_sources = {}
            self._load_from_environment()

    def _load_from_environment(self):
        """Apply every recognised environment variable that is set."""
        env_mappings = {
            # HDFS
            'HADOOP_HOME': ('hdfs.hadoop_home', str),
            'HDFS_NAMENODE_HOST': ('hdfs.namenode_host', str),
            'HDFS_NAMENODE_PORT': ('hdfs.namenode_port', int),
            'HDFS_USER': ('hdfs.user', str),
            'HDFS_PREFERRED_BACKEND': ('hdfs.preferred_backend', str),
            # Spark
            'SPARK_APP_NAME': ('spark.app_name', str),
            'SPARK_MASTER': ('spark.master', str),
            'SPARK_DRIVER_MEMORY': ('spark.driver_memory', str),
            'SPARK_EXECUTOR_MEMORY': ('spark.executor_memory', str),
            'SPARK_LOG_LEVEL': ('spark.log_level', str),
            # MapReduce
            'MAPREDUCE_JOB_NAME': ('mapreduce.job_name', str),
            'MAPREDUCE_NUM_REDUCERS': ('mapreduce.num_reducers', int),
            # Global
            'LOG_LEVEL': ('log_level', str),
            'LOG_FILE': ('log_file', str),
        }

        for env_var, (config_path, type_func) in env_mappings.items():
            raw = os.environ.get(env_var)
            if raw is None:
                continue
            # Only the conversion is guarded: a failure here means the
            # variable holds a malformed value, which is worth a warning but
            # must not abort start-up.
            try:
                value = type_func(raw)
            except (ValueError, TypeError) as e:
                print(f"Warning: Invalid value for {env_var}: {e}")
            else:
                self.set(config_path, value, ConfigSource.ENVIRONMENT)

    def load_from_file(self, config_path: Union[str, Path]) -> bool:
        """
        Load configuration from a JSON file.

        Args:
            config_path: Path of the configuration file.

        Returns:
            True when the file was read and applied; False when it does not
            exist, cannot be read, is not valid JSON, or its top level is not
            a JSON object.
        """
        config_path = Path(config_path)

        if not config_path.exists():
            return False

        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config_data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError, IOError, KeyError) as e:
            print(f"Warning: Failed to load config from {config_path}: {e}")
            return False

        if not isinstance(config_data, dict):
            # e.g. a file containing a JSON list: nothing sensible to apply.
            print(
                f"Warning: Failed to load config from {config_path}: "
                f"top-level JSON value must be an object"
            )
            return False

        self._update_from_dict(config_data, ConfigSource.CONFIG_FILE)
        return True

    def _record_section_sources(self, section: str, section_cls: Any,
                                values: Dict[str, Any], source: ConfigSource):
        """Refresh source tracking after ``section`` was rebuilt from ``values``."""
        # The section object was replaced wholesale, so whatever was recorded
        # for its keys before no longer describes the current values.
        prefix = f"{section}."
        for stale in [k for k in self._config_sources if k.startswith(prefix)]:
            del self._config_sources[stale]

        for name, value in values.items():
            if name in section_cls.__dataclass_fields__:
                path = f"{section}.{name}"
                self._config_sources[path] = ConfigValue(
                    value=value,
                    source=source,
                    description=f"Configuration: {path}"
                )

    def _update_from_dict(self, data: Dict[str, Any], source: ConfigSource):
        """Apply the key/value pairs of ``data`` to the configuration.

        ``hdfs``/``spark``/``mapreduce`` dict values replace the matching
        section (fields absent from the dict fall back to their defaults).
        Other declared top-level fields are set directly.  Unknown keys are
        stored under ``extra``.
        """
        for key, value in data.items():
            section_cls = self._SECTION_TYPES.get(key)
            if section_cls is not None or key == 'extra':
                if not isinstance(value, dict):
                    # Overwriting a section (or ``extra``) with a scalar would
                    # break every later access, so refuse it.
                    print(f"Warning: Ignoring non-object value for '{key}'")
                    continue
                if section_cls is not None:
                    setattr(self._config, key, section_cls.from_dict(value))
                    self._record_section_sources(key, section_cls, value, source)
                else:
                    self.set(key, value, source)
            elif key in GlobalConfig.__dataclass_fields__:
                self.set(key, value, source)
            else:
                # Unknown key: keep it as free-form extra configuration.
                self._config.extra[key] = value
                self._config_sources[f"extra.{key}"] = ConfigValue(
                    value=value,
                    source=source,
                    description=f"Extra configuration: {key}"
                )

    def get(self, key: str, default: Optional[T] = None) -> Optional[T]:
        """
        Return the value stored at ``key``.

        Args:
            key: Dotted path such as ``'hdfs.namenode_host'`` or
                ``'extra.some_key'``.
            default: Value returned when any path component is missing.

        Returns:
            The stored value, or ``default`` when the path does not exist.
        """
        current: Any = self._config
        for part in key.split('.'):
            current = _lookup(current, part)
            if current is _MISSING:
                return default
        return current

    def set(self, key: str, value: Any, source: ConfigSource = ConfigSource.RUNTIME):
        """
        Store ``value`` at ``key`` and remember where it came from.

        A path whose parent does not exist, or whose last component is not a
        declared field of the targeted dataclass, is ignored silently.  Dict
        containers (such as ``extra``) accept any final key.

        Args:
            key: Dotted path such as ``'hdfs.namenode_host'``.
            value: Value to store.
            source: Origin of the value.
        """
        *parents, leaf = key.split('.')

        target: Any = self._config
        for part in parents:
            target = _lookup(target, part)
            if target is _MISSING:
                return  # Path does not exist: fail silently by design.

        if isinstance(target, dict):
            target[leaf] = value
            description = f"Extra configuration: {key}"
        elif leaf in getattr(type(target), '__dataclass_fields__', ()):
            setattr(target, leaf, value)
            if parents:
                description = f"Configuration: {key}"
            else:
                description = f"Global configuration: {key}"
        else:
            return  # Unknown field: fail silently by design.

        self._config_sources[key] = ConfigValue(
            value=value,
            source=source,
            description=description
        )

    def get_config_source(self, key: str) -> Optional[ConfigSource]:
        """
        Return the origin of the value stored at ``key``.

        Args:
            key: Dotted configuration path.

        Returns:
            The recorded source, or None when the key was never set
            explicitly (i.e. it still holds its default).
        """
        recorded = self._config_sources.get(key)
        return None if recorded is None else recorded.source

    @property
    def hdfs(self) -> HDFSConfig:
        """The HDFS section."""
        return self._config.hdfs

    @property
    def spark(self) -> SparkConfig:
        """The Spark section."""
        return self._config.spark

    @property
    def mapreduce(self) -> MapReduceConfig:
        """The MapReduce section."""
        return self._config.mapreduce

    @property
    def global_config(self) -> GlobalConfig:
        """The whole configuration object."""
        return self._config

    def to_dict(self) -> Dict[str, Any]:
        """Return the whole configuration as a nested plain dict.

        Keys: ``log_level``, ``log_format``, ``log_file``, ``hdfs``,
        ``spark``, ``mapreduce``, ``extra``.  The result is a copy, so
        mutating it does not change the live configuration.
        """
        return asdict(self._config)

    def save_to_file(self, config_path: Union[str, Path]) -> bool:
        """
        Write the configuration to ``config_path`` as JSON.

        Args:
            config_path: Destination file path.

        Returns:
            True on success, False when serialisation or writing failed.
        """
        try:
            # Serialise before opening the file so that a value JSON cannot
            # encode does not truncate an existing configuration file.
            payload = json.dumps(self.to_dict(), indent=2, ensure_ascii=False)
            with open(config_path, 'w', encoding='utf-8') as f:
                f.write(payload)
            return True
        except (IOError, TypeError, ValueError) as e:
            print(f"Warning: Failed to save config to {config_path}: {e}")
            return False

    def reset(self):
        """Restore defaults, then re-apply environment variables."""
        self._config = GlobalConfig()
        self._config_sources = {}
        self._load_from_environment()


# Convenience functions

def get_config() -> ConfigurationManager:
    """Return the shared configuration manager instance."""
    return ConfigurationManager()


def load_config(config_path: Optional[Union[str, Path]] = None) -> ConfigurationManager:
    """
    Return the shared configuration manager, optionally loading a file.

    Environment variables are applied when the manager is first created; the
    file, when given, is applied afterwards.

    Args:
        config_path: Optional path of a JSON configuration file.

    Returns:
        The configuration manager instance.
    """
    config = ConfigurationManager()

    if config_path:
        config.load_from_file(config_path)

    return config