| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445 |
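"""
Configuration management module.

Provides configuration management with:
- multiple configuration sources (environment variables, JSON config files,
  built-in defaults, runtime overrides)
- typed configuration access through per-subsystem dataclasses
- configuration validation (currently limited to type conversion of
  environment-variable values; invalid values are reported and skipped)
- configuration reload (optional; not automatic: call ``load_from_file``
  again to re-apply an updated file)
"""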
import json
import logging
import os
from dataclasses import asdict, dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, TypeVar, Union
# Generic type variable used for the `default` parameter of ConfigurationManager.get.
T = TypeVar("T")
class ConfigSource(Enum):
    """Origin of a configuration value."""

    DEFAULT = "default"          # presumably built-in defaults; not assigned in this module
    ENVIRONMENT = "environment"  # applied from an environment variable
    CONFIG_FILE = "config_file"  # applied from a JSON file via load_from_file
    RUNTIME = "runtime"          # applied by an explicit set() call (its default source)
@dataclass
class ConfigValue:
    """A configuration value paired with the source that set it."""

    value: Any
    source: ConfigSource
    # Human-readable note such as "Configuration: hdfs.user"; empty by default.
    description: str = field(default="")
@dataclass
class HDFSConfig:
    """Settings for connecting to and working with HDFS."""

    # NameNode endpoint
    namenode_host: str = "localhost"
    namenode_port: int = 9000
    namenode_http_port: int = 50070

    # Authentication (all unset by default)
    user: Optional[str] = None
    kerberos_principal: Optional[str] = None
    kerberos_keytab: Optional[str] = None

    # Timeouts (unit presumably seconds -- confirm with the code that reads them)
    connect_timeout: int = 30
    read_timeout: int = 60
    write_timeout: int = 120

    # Retry policy
    max_retries: int = 3
    retry_delay: float = 1.0

    # Client backend: one of "auto", "cli", "hdfs", "pyhdfs", "webhdfs"
    preferred_backend: str = "auto"

    # Command-line client
    hadoop_home: Optional[str] = None
    hadoop_cmd: str = "hdfs"

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict (via dataclasses.asdict)."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'HDFSConfig':
        """Build an instance from ``data``; keys that are not fields are ignored."""
        accepted = {}
        for name, value in data.items():
            if name in cls.__dataclass_fields__:
                accepted[name] = value
        return cls(**accepted)
@dataclass
class SparkConfig:
    """Spark application settings."""

    # Application
    app_name: str = "HadoopTools"
    master: Optional[str] = None  # None: obtained automatically from configuration

    # Resources
    driver_memory: str = "1g"
    executor_memory: str = "1g"
    executor_cores: int = 2
    num_executors: int = 2

    # Performance
    shuffle_partitions: int = 200
    default_parallelism: Optional[int] = None

    # Serialization
    serializer: str = "org.apache.spark.serializer.KryoSerializer"
    kryo_registration_required: bool = False

    # Logging
    log_level: str = "WARN"

    # Additional configuration entries (str -> str)
    extra_configs: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict (asdict copies nested containers)."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SparkConfig':
        """
        Build an instance from ``data``.

        Keys that are not fields are ignored. Dict values (``extra_configs``)
        are shallow-copied so the new config does not share a mutable dict with
        the caller's ``data``.

        Args:
            data: mapping of field name to value.

        Returns:
            A new SparkConfig.
        """
        known = cls.__dataclass_fields__
        kwargs = {
            name: dict(value) if isinstance(value, dict) else value
            for name, value in data.items()
            if name in known
        }
        return cls(**kwargs)
@dataclass
class MapReduceConfig:
    """MapReduce job settings."""

    # Job
    job_name: str = "MapReduceJob"
    num_reducers: int = 1

    # Resources (-Xmx819m is presumably ~80% of the 1024 MB container -- confirm)
    map_memory_mb: int = 1024
    reduce_memory_mb: int = 1024
    map_java_opts: str = "-Xmx819m"
    reduce_java_opts: str = "-Xmx819m"

    # Map-output compression
    map_output_compress: bool = True
    map_output_compression_codec: str = "org.apache.hadoop.io.compress.SnappyCodec"

    # Speculative execution
    map_speculative: bool = False
    reduce_speculative: bool = False

    # Additional configuration entries (str -> str)
    extra_configs: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict (asdict copies nested containers)."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MapReduceConfig':
        """
        Build an instance from ``data``.

        Keys that are not fields are ignored. Dict values (``extra_configs``)
        are shallow-copied so the new config does not share a mutable dict with
        the caller's ``data``.

        Args:
            data: mapping of field name to value.

        Returns:
            A new MapReduceConfig.
        """
        known = cls.__dataclass_fields__
        kwargs = {
            name: dict(value) if isinstance(value, dict) else value
            for name, value in data.items()
            if name in known
        }
        return cls(**kwargs)
@dataclass
class GlobalConfig:
    """Top-level configuration: logging settings plus one sub-config per subsystem."""

    # Logging settings
    log_level: str = "INFO"
    log_format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    log_file: Optional[str] = None

    # Per-subsystem settings; default_factory gives each GlobalConfig its own instances
    hdfs: HDFSConfig = field(default_factory=HDFSConfig)
    spark: SparkConfig = field(default_factory=SparkConfig)
    mapreduce: MapReduceConfig = field(default_factory=MapReduceConfig)

    # Values whose keys do not correspond to a known field
    extra: Dict[str, Any] = field(default_factory=dict)
class ConfigurationManager:
    """
    Configuration manager (process-wide singleton).

    Gives one access point to configuration assembled from several sources:
    the dataclass defaults of ``GlobalConfig``, environment variables (read when
    the singleton is first initialised and again on ``reset``), JSON files
    (``load_from_file``) and runtime ``set`` calls. Every value written through
    this class is recorded with its ``ConfigSource``; untouched defaults have no
    recorded source.
    """

    _instance: Optional['ConfigurationManager'] = None
    _config: Optional[GlobalConfig] = None
    # "dotted.key" -> ConfigValue. Created per instance in __init__:
    # dataclasses.field() only has meaning inside a @dataclass, so it cannot be
    # used as a class-level default on this plain class.
    _config_sources: Dict[str, ConfigValue]

    # GlobalConfig fields holding sub-config dataclasses; file data for these is
    # merged field by field instead of replacing the whole object.
    _SECTIONS = ('hdfs', 'spark', 'mapreduce')

    # Sentinel for "path segment not found" (distinct from a stored None).
    _MISSING = object()

    _logger = logging.getLogger(__name__)

    def __new__(cls) -> 'ConfigurationManager':
        """Singleton: every call returns the same instance."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self) -> None:
        # Python runs __init__ on every ConfigurationManager() call; state is
        # only built the first time.
        if self._config is None:
            self._config_sources = {}
            self._config = GlobalConfig()
            self._load_from_environment()

    def _load_from_environment(self) -> None:
        """
        Apply the environment variables listed below.

        Each variable that is set is converted with its type function and stored
        with ``ConfigSource.ENVIRONMENT``. Values that fail conversion are logged
        and skipped.
        """
        env_mappings = {
            # HDFS settings
            'HADOOP_HOME': ('hdfs.hadoop_home', str),
            'HDFS_NAMENODE_HOST': ('hdfs.namenode_host', str),
            'HDFS_NAMENODE_PORT': ('hdfs.namenode_port', int),
            'HDFS_USER': ('hdfs.user', str),
            'HDFS_PREFERRED_BACKEND': ('hdfs.preferred_backend', str),

            # Spark settings
            'SPARK_APP_NAME': ('spark.app_name', str),
            'SPARK_MASTER': ('spark.master', str),
            'SPARK_DRIVER_MEMORY': ('spark.driver_memory', str),
            'SPARK_EXECUTOR_MEMORY': ('spark.executor_memory', str),
            'SPARK_LOG_LEVEL': ('spark.log_level', str),

            # MapReduce settings
            'MAPREDUCE_JOB_NAME': ('mapreduce.job_name', str),
            'MAPREDUCE_NUM_REDUCERS': ('mapreduce.num_reducers', int),

            # Global settings
            'LOG_LEVEL': ('log_level', str),
            'LOG_FILE': ('log_file', str),
        }

        for env_var, (config_path, type_func) in env_mappings.items():
            raw = os.environ.get(env_var)
            if raw is None:
                continue
            # Only the conversion is guarded; set() failures are real bugs.
            try:
                value = type_func(raw)
            except (ValueError, TypeError) as e:
                self._logger.warning("Invalid value for %s: %s", env_var, e)
                continue
            self.set(config_path, value, ConfigSource.ENVIRONMENT)

    def load_from_file(self, config_path: Union[str, Path]) -> bool:
        """
        Load configuration from a JSON file and merge it into the current state.

        The top-level JSON value must be an object; see ``_update_from_dict``
        for how its keys are applied.

        Args:
            config_path: path of the configuration file.

        Returns:
            True if the file was read and applied; False if it does not exist or
            could not be read/parsed (the problem is logged in the latter case).
        """
        config_path = Path(config_path)

        if not config_path.exists():
            return False

        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config_data = json.load(f)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            self._logger.warning("Failed to load config from %s: %s", config_path, e)
            return False

        if not isinstance(config_data, dict):
            self._logger.warning(
                "Failed to load config from %s: top-level JSON value must be an object",
                config_path,
            )
            return False

        self._update_from_dict(config_data, ConfigSource.CONFIG_FILE)
        return True

    def _update_from_dict(self, data: Dict[str, Any], source: ConfigSource) -> None:
        """
        Merge a mapping (e.g. parsed JSON) into the current configuration.

        - ``hdfs`` / ``spark`` / ``mapreduce`` objects are merged field by field;
          fields not mentioned keep their current values (e.g. ones taken from
          environment variables). Unknown fields are ignored.
        - An ``extra`` object is merged key by key into ``GlobalConfig.extra``.
        - Other ``GlobalConfig`` fields are set directly.
        - Any remaining key is stored in ``GlobalConfig.extra``.

        A section or ``extra`` value that is not an object is logged and skipped
        rather than replacing the sub-config.
        """
        for key, value in data.items():
            if key in self._SECTIONS or key == 'extra':
                if not isinstance(value, dict):
                    self._logger.warning(
                        "Ignoring config section %r: expected an object, got %s",
                        key, type(value).__name__,
                    )
                elif key == 'extra':
                    for extra_key, extra_value in value.items():
                        self._set_extra(extra_key, extra_value, source)
                else:
                    self._merge_section(key, value, source)
            elif key in GlobalConfig.__dataclass_fields__:
                self.set(key, value, source)
            else:
                self._set_extra(key, value, source)

    def _merge_section(self, section: str, values: Dict[str, Any], source: ConfigSource) -> None:
        """Set each known dataclass field of sub-config ``section`` from ``values``."""
        target = getattr(self._config, section)
        known = getattr(type(target), '__dataclass_fields__', {})
        for name, value in values.items():
            if name in known:
                self.set(f"{section}.{name}", value, source)

    def _set_extra(self, key: str, value: Any, source: ConfigSource) -> None:
        """Store ``value`` in ``GlobalConfig.extra`` under ``key`` and record its source."""
        self._config.extra[key] = value
        self._config_sources[f"extra.{key}"] = ConfigValue(
            value=value,
            source=source,
            description=f"Extra configuration: {key}"
        )

    @classmethod
    def _child(cls, node: Any, name: str) -> Any:
        """
        Return member ``name`` of ``node``, or ``cls._MISSING`` if it has none.

        Dicts are looked up by key only, so keys such as "items" or "keys"
        resolve to stored values rather than to dict methods; other objects are
        looked up by attribute.
        """
        if isinstance(node, dict):
            return node.get(name, cls._MISSING)
        return getattr(node, name, cls._MISSING)

    def get(self, key: str, default: T = None) -> Optional[T]:
        """
        Get a configuration value.

        Supports dotted paths such as 'hdfs.namenode_host' or 'extra.my_key'.

        Args:
            key: configuration key.
            default: value returned when any path segment does not exist.

        Returns:
            The configuration value, or ``default``.
        """
        node: Any = self._config
        for part in key.split('.'):
            node = self._child(node, part)
            if node is self._MISSING:
                return default
        return node

    def set(self, key: str, value: Any, source: ConfigSource = ConfigSource.RUNTIME) -> None:
        """
        Set a configuration value and record where it came from.

        Supports dotted paths such as 'hdfs.namenode_host'. If an intermediate
        segment does not exist, or the final attribute does not exist on a
        non-dict object, nothing happens (silent no-op). Inside a dict (e.g.
        ``extra``) the final key is created if absent.

        Args:
            key: configuration key.
            value: new value.
            source: where the value came from (defaults to RUNTIME).
        """
        parts = key.split('.')

        node: Any = self._config
        for part in parts[:-1]:
            node = self._child(node, part)
            if node is self._MISSING:
                return  # path does not exist: silently ignored

        last = parts[-1]
        if isinstance(node, dict):
            node[last] = value
            description = f"Extra configuration: {key}"
        elif hasattr(node, last):
            setattr(node, last, value)
            if len(parts) == 1:
                description = f"Global configuration: {key}"
            else:
                description = f"Configuration: {key}"
        else:
            return

        self._config_sources[key] = ConfigValue(
            value=value,
            source=source,
            description=description
        )

    def get_config_source(self, key: str) -> Optional[ConfigSource]:
        """
        Return the source recorded for ``key``.

        Args:
            key: configuration key, exactly as it was passed to ``set`` (values
                loaded into ``extra`` are recorded as 'extra.<key>').

        Returns:
            The recorded ConfigSource, or None if the key was never written
            through this manager.
        """
        entry = self._config_sources.get(key)
        return entry.source if entry is not None else None

    @property
    def hdfs(self) -> HDFSConfig:
        """The HDFS sub-configuration."""
        return self._config.hdfs

    @property
    def spark(self) -> SparkConfig:
        """The Spark sub-configuration."""
        return self._config.spark

    @property
    def mapreduce(self) -> MapReduceConfig:
        """The MapReduce sub-configuration."""
        return self._config.mapreduce

    @property
    def global_config(self) -> GlobalConfig:
        """The whole GlobalConfig object."""
        return self._config

    def to_dict(self) -> Dict[str, Any]:
        """Return the configuration as plain dicts (``extra`` is a shallow copy)."""
        return {
            'log_level': self._config.log_level,
            'log_format': self._config.log_format,
            'log_file': self._config.log_file,
            'hdfs': self._config.hdfs.to_dict(),
            'spark': self._config.spark.to_dict(),
            'mapreduce': self._config.mapreduce.to_dict(),
            'extra': dict(self._config.extra),
        }

    def save_to_file(self, config_path: Union[str, Path]) -> bool:
        """
        Save the configuration to a JSON file.

        Args:
            config_path: destination file path.

        Returns:
            True on success; False if the configuration is not JSON-serializable
            or the file cannot be written (the problem is logged).
        """
        # Serialize before opening the file so an unserializable value does not
        # leave a truncated file behind.
        try:
            payload = json.dumps(self.to_dict(), indent=2, ensure_ascii=False)
        except (TypeError, ValueError) as e:
            self._logger.warning("Failed to save config to %s: %s", config_path, e)
            return False

        try:
            with open(config_path, 'w', encoding='utf-8') as f:
                f.write(payload)
        except OSError as e:
            self._logger.warning("Failed to save config to %s: %s", config_path, e)
            return False
        return True

    def reset(self) -> None:
        """Restore defaults, forget recorded sources, then re-apply environment variables."""
        self._config = GlobalConfig()
        self._config_sources = {}
        self._load_from_environment()
# Convenience functions
def get_config() -> ConfigurationManager:
    """Return the shared ConfigurationManager instance (created on first use)."""
    manager = ConfigurationManager()
    return manager
def load_config(config_path: Optional[Union[str, Path]] = None) -> ConfigurationManager:
    """
    Return the configuration manager, optionally applying a JSON config file.

    Environment variables are read when the singleton manager is first
    constructed. If ``config_path`` is truthy, that file is then applied on top
    of the current configuration via ``load_from_file``, so values it contains
    take precedence over environment values. The boolean result of
    ``load_from_file`` is not checked: a file that is missing or cannot be
    read/parsed is simply skipped.

    Args:
        config_path: optional path to a JSON configuration file.

    Returns:
        The ConfigurationManager singleton.
    """
    manager = ConfigurationManager()
    if not config_path:
        return manager
    manager.load_from_file(config_path)
    return manager
|