# config.py
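"""
Configuration management module.

Provides modern configuration management features:
- Multiple configuration sources (environment variables, config files, defaults)
- Type-safe configuration access
- Configuration validation
- Configuration hot reloading (optional)
"""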
import json
import os
from dataclasses import asdict, dataclass, field, fields
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, TypeVar, Union
  15. T = TypeVar('T')
  16. class ConfigSource(Enum):
  17. """配置来源枚举"""
  18. DEFAULT = "default"
  19. ENVIRONMENT = "environment"
  20. CONFIG_FILE = "config_file"
  21. RUNTIME = "runtime"
  22. @dataclass
  23. class ConfigValue:
  24. """配置值及其来源"""
  25. value: Any
  26. source: ConfigSource
  27. description: str = ""
  28. @dataclass
  29. class HDFSConfig:
  30. """HDFS 相关配置"""
  31. # 连接配置
  32. namenode_host: str = "localhost"
  33. namenode_port: int = 9000
  34. namenode_http_port: int = 50070
  35. # 认证配置
  36. user: Optional[str] = None
  37. kerberos_principal: Optional[str] = None
  38. kerberos_keytab: Optional[str] = None
  39. # 连接超时配置
  40. connect_timeout: int = 30
  41. read_timeout: int = 60
  42. write_timeout: int = 120
  43. # 重试配置
  44. max_retries: int = 3
  45. retry_delay: float = 1.0
  46. # 后端选择
  47. preferred_backend: str = "auto" # auto, cli, hdfs, pyhdfs, webhdfs
  48. # 命令行配置
  49. hadoop_home: Optional[str] = None
  50. hadoop_cmd: str = "hdfs"
  51. def to_dict(self) -> Dict[str, Any]:
  52. """转换为字典"""
  53. return asdict(self)
  54. @classmethod
  55. def from_dict(cls, data: Dict[str, Any]) -> 'HDFSConfig':
  56. """从字典创建配置"""
  57. return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})
  58. @dataclass
  59. class SparkConfig:
  60. """Spark 相关配置"""
  61. # 应用配置
  62. app_name: str = "HadoopTools"
  63. master: Optional[str] = None # None 表示从配置自动获取
  64. # 资源配置
  65. driver_memory: str = "1g"
  66. executor_memory: str = "1g"
  67. executor_cores: int = 2
  68. num_executors: int = 2
  69. # 性能配置
  70. shuffle_partitions: int = 200
  71. default_parallelism: Optional[int] = None
  72. # 序列化配置
  73. serializer: str = "org.apache.spark.serializer.KryoSerializer"
  74. kryo_registration_required: bool = False
  75. # 日志配置
  76. log_level: str = "WARN"
  77. # 额外配置
  78. extra_configs: Dict[str, str] = field(default_factory=dict)
  79. def to_dict(self) -> Dict[str, Any]:
  80. """转换为字典"""
  81. return asdict(self)
  82. @classmethod
  83. def from_dict(cls, data: Dict[str, Any]) -> 'SparkConfig':
  84. """从字典创建配置"""
  85. return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})
  86. @dataclass
  87. class MapReduceConfig:
  88. """MapReduce 相关配置"""
  89. # 作业配置
  90. job_name: str = "MapReduceJob"
  91. num_reducers: int = 1
  92. # 资源配置
  93. map_memory_mb: int = 1024
  94. reduce_memory_mb: int = 1024
  95. map_java_opts: str = "-Xmx819m"
  96. reduce_java_opts: str = "-Xmx819m"
  97. # 压缩配置
  98. map_output_compress: bool = True
  99. map_output_compression_codec: str = "org.apache.hadoop.io.compress.SnappyCodec"
  100. # 推测执行配置
  101. map_speculative: bool = False
  102. reduce_speculative: bool = False
  103. # 额外配置
  104. extra_configs: Dict[str, str] = field(default_factory=dict)
  105. def to_dict(self) -> Dict[str, Any]:
  106. """转换为字典"""
  107. return asdict(self)
  108. @classmethod
  109. def from_dict(cls, data: Dict[str, Any]) -> 'MapReduceConfig':
  110. """从字典创建配置"""
  111. return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})
  112. @dataclass
  113. class GlobalConfig:
  114. """全局配置"""
  115. # 日志配置
  116. log_level: str = "INFO"
  117. log_format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  118. log_file: Optional[str] = None
  119. # 子配置
  120. hdfs: HDFSConfig = field(default_factory=HDFSConfig)
  121. spark: SparkConfig = field(default_factory=SparkConfig)
  122. mapreduce: MapReduceConfig = field(default_factory=MapReduceConfig)
  123. # 额外配置
  124. extra: Dict[str, Any] = field(default_factory=dict)
  125. class ConfigurationManager:
  126. """
  127. 配置管理器
  128. 提供统一的配置管理接口,支持多种配置源。
  129. """
  130. _instance: Optional['ConfigurationManager'] = None
  131. _config: Optional[GlobalConfig] = None
  132. _config_sources: Dict[str, ConfigValue] = field(default_factory=dict)
  133. def __new__(cls) -> 'ConfigurationManager':
  134. """单例模式"""
  135. if cls._instance is None:
  136. cls._instance = super().__new__(cls)
  137. return cls._instance
  138. def __init__(self):
  139. if self._config is None:
  140. self._config = GlobalConfig()
  141. self._load_from_environment()
  142. def _load_from_environment(self):
  143. """从环境变量加载配置"""
  144. env_mappings = {
  145. # HDFS 配置
  146. 'HADOOP_HOME': ('hdfs.hadoop_home', str),
  147. 'HDFS_NAMENODE_HOST': ('hdfs.namenode_host', str),
  148. 'HDFS_NAMENODE_PORT': ('hdfs.namenode_port', int),
  149. 'HDFS_USER': ('hdfs.user', str),
  150. 'HDFS_PREFERRED_BACKEND': ('hdfs.preferred_backend', str),
  151. # Spark 配置
  152. 'SPARK_APP_NAME': ('spark.app_name', str),
  153. 'SPARK_MASTER': ('spark.master', str),
  154. 'SPARK_DRIVER_MEMORY': ('spark.driver_memory', str),
  155. 'SPARK_EXECUTOR_MEMORY': ('spark.executor_memory', str),
  156. 'SPARK_LOG_LEVEL': ('spark.log_level', str),
  157. # MapReduce 配置
  158. 'MAPREDUCE_JOB_NAME': ('mapreduce.job_name', str),
  159. 'MAPREDUCE_NUM_REDUCERS': ('mapreduce.num_reducers', int),
  160. # 全局配置
  161. 'LOG_LEVEL': ('log_level', str),
  162. 'LOG_FILE': ('log_file', str),
  163. }
  164. for env_var, (config_path, type_func) in env_mappings.items():
  165. if env_var in os.environ:
  166. try:
  167. value = type_func(os.environ[env_var])
  168. self.set(config_path, value, ConfigSource.ENVIRONMENT)
  169. except (ValueError, TypeError) as e:
  170. print(f"Warning: Invalid value for {env_var}: {e}")
  171. def load_from_file(self, config_path: Union[str, Path]) -> bool:
  172. """
  173. 从配置文件加载配置
  174. 支持 JSON 格式的配置文件。
  175. Args:
  176. config_path: 配置文件路径
  177. Returns:
  178. 是否加载成功
  179. """
  180. config_path = Path(config_path)
  181. if not config_path.exists():
  182. return False
  183. try:
  184. with open(config_path, 'r', encoding='utf-8') as f:
  185. config_data = json.load(f)
  186. self._update_from_dict(config_data, ConfigSource.CONFIG_FILE)
  187. return True
  188. except (json.JSONDecodeError, IOError, KeyError) as e:
  189. print(f"Warning: Failed to load config from {config_path}: {e}")
  190. return False
  191. def _update_from_dict(self, data: Dict[str, Any], source: ConfigSource):
  192. """从字典更新配置"""
  193. # 处理顶层配置
  194. for key, value in data.items():
  195. if key == 'hdfs' and isinstance(value, dict):
  196. self._config.hdfs = HDFSConfig.from_dict(value)
  197. elif key == 'spark' and isinstance(value, dict):
  198. self._config.spark = SparkConfig.from_dict(value)
  199. elif key == 'mapreduce' and isinstance(value, dict):
  200. self._config.mapreduce = MapReduceConfig.from_dict(value)
  201. elif hasattr(self._config, key):
  202. self.set(key, value, source)
  203. else:
  204. # 额外配置
  205. self._config.extra[key] = value
  206. self._config_sources[f"extra.{key}"] = ConfigValue(
  207. value=value,
  208. source=source,
  209. description=f"Extra configuration: {key}"
  210. )
  211. def get(self, key: str, default: T = None) -> Optional[T]:
  212. """
  213. 获取配置值
  214. 支持点分隔的路径,如 'hdfs.namenode_host'
  215. Args:
  216. key: 配置键
  217. default: 默认值
  218. Returns:
  219. 配置值
  220. """
  221. parts = key.split('.')
  222. current = self._config
  223. for part in parts:
  224. if hasattr(current, part):
  225. current = getattr(current, part)
  226. elif isinstance(current, dict) and part in current:
  227. current = current[part]
  228. else:
  229. return default
  230. return current
  231. def set(self, key: str, value: Any, source: ConfigSource = ConfigSource.RUNTIME):
  232. """
  233. 设置配置值
  234. 支持点分隔的路径,如 'hdfs.namenode_host'
  235. Args:
  236. key: 配置键
  237. value: 配置值
  238. source: 配置来源
  239. """
  240. parts = key.split('.')
  241. if len(parts) == 1:
  242. # 顶层配置
  243. if hasattr(self._config, parts[0]):
  244. setattr(self._config, parts[0], value)
  245. self._config_sources[key] = ConfigValue(
  246. value=value,
  247. source=source,
  248. description=f"Global configuration: {key}"
  249. )
  250. else:
  251. # 嵌套配置
  252. current = self._config
  253. for part in parts[:-1]:
  254. if hasattr(current, part):
  255. current = getattr(current, part)
  256. elif isinstance(current, dict) and part in current:
  257. current = current[part]
  258. else:
  259. return # 路径不存在,静默失败
  260. # 设置最后一个属性
  261. last_part = parts[-1]
  262. if hasattr(current, last_part):
  263. setattr(current, last_part, value)
  264. self._config_sources[key] = ConfigValue(
  265. value=value,
  266. source=source,
  267. description=f"Configuration: {key}"
  268. )
  269. elif isinstance(current, dict):
  270. current[last_part] = value
  271. self._config_sources[key] = ConfigValue(
  272. value=value,
  273. source=source,
  274. description=f"Extra configuration: {key}"
  275. )
  276. def get_config_source(self, key: str) -> Optional[ConfigSource]:
  277. """
  278. 获取配置值的来源
  279. Args:
  280. key: 配置键
  281. Returns:
  282. 配置来源
  283. """
  284. if key in self._config_sources:
  285. return self._config_sources[key].source
  286. return None
  287. @property
  288. def hdfs(self) -> HDFSConfig:
  289. """获取 HDFS 配置"""
  290. return self._config.hdfs
  291. @property
  292. def spark(self) -> SparkConfig:
  293. """获取 Spark 配置"""
  294. return self._config.spark
  295. @property
  296. def mapreduce(self) -> MapReduceConfig:
  297. """获取 MapReduce 配置"""
  298. return self._config.mapreduce
  299. @property
  300. def global_config(self) -> GlobalConfig:
  301. """获取全局配置"""
  302. return self._config
  303. def to_dict(self) -> Dict[str, Any]:
  304. """转换为字典"""
  305. return {
  306. 'log_level': self._config.log_level,
  307. 'log_format': self._config.log_format,
  308. 'log_file': self._config.log_file,
  309. 'hdfs': self._config.hdfs.to_dict(),
  310. 'spark': self._config.spark.to_dict(),
  311. 'mapreduce': self._config.mapreduce.to_dict(),
  312. 'extra': self._config.extra,
  313. }
  314. def save_to_file(self, config_path: Union[str, Path]) -> bool:
  315. """
  316. 保存配置到文件
  317. Args:
  318. config_path: 配置文件路径
  319. Returns:
  320. 是否保存成功
  321. """
  322. try:
  323. with open(config_path, 'w', encoding='utf-8') as f:
  324. json.dump(self.to_dict(), f, indent=2, ensure_ascii=False)
  325. return True
  326. except IOError as e:
  327. print(f"Warning: Failed to save config to {config_path}: {e}")
  328. return False
  329. def reset(self):
  330. """重置配置为默认值"""
  331. self._config = GlobalConfig()
  332. self._config_sources.clear()
  333. self._load_from_environment()
# Convenience functions
  335. def get_config() -> ConfigurationManager:
  336. """获取配置管理器实例"""
  337. return ConfigurationManager()
  338. def load_config(config_path: Optional[Union[str, Path]] = None) -> ConfigurationManager:
  339. """
  340. 加载配置
  341. 优先从指定路径加载,然后从环境变量加载。
  342. Args:
  343. config_path: 配置文件路径(可选)
  344. Returns:
  345. 配置管理器实例
  346. """
  347. config = ConfigurationManager()
  348. if config_path:
  349. config.load_from_file(config_path)
  350. return config