# wordcount_streaming.py
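"""
Word-frequency counting via Hadoop Streaming (modernized version).

Counterpart of the Java ``WordCount`` class, implemented with Hadoop Streaming:
- Mapper: reads lines from standard input, splits them into words, emits <word, 1>
- Reducer: reads the Mapper output from standard input and totals each word's count
- Combiner: optional local aggregation that reduces data transfer

Modernized features:
- Integrated configuration management
- Improved error handling
- Type-safe data classes
- Enhanced statistics
- Flexible command-line arguments
"""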
import csv
import json
import os
import shlex
import sys
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Any, Collection, Dict, Iterator, List, Optional, Tuple

from ..config import ConfigurationManager, MapReduceConfig, get_config
from ..utils.helpers import run_command, setup_logger, format_file_size, default_value
  23. class OutputFormat(Enum):
  24. """
  25. 输出格式枚举
  26. """
  27. TEXT = 'text'
  28. JSON = 'json'
  29. CSV = 'csv'
  30. @dataclass
  31. class WordCountResult:
  32. """
  33. 词频统计结果数据类
  34. """
  35. total_words: int = 0
  36. unique_words: int = 0
  37. top_words: List[Tuple[str, int]] = field(default_factory=list)
  38. word_counts: Dict[str, int] = field(default_factory=dict)
  39. execution_time_ms: float = 0.0
  40. @property
  41. def most_frequent_word(self) -> Optional[Tuple[str, int]]:
  42. """
  43. 获取出现频率最高的单词
  44. Returns:
  45. (单词, 次数) 元组,如果没有单词返回 None
  46. """
  47. return self.top_words[0] if self.top_words else None
  48. @property
  49. def avg_word_frequency(self) -> float:
  50. """
  51. 计算平均词频
  52. Returns:
  53. 平均每个单词出现的次数
  54. """
  55. return self.total_words / self.unique_words if self.unique_words > 0 else 0.0
  56. def to_dict(self) -> Dict[str, Any]:
  57. """
  58. 转换为字典
  59. Returns:
  60. 字典表示
  61. """
  62. data = asdict(self)
  63. data['most_frequent_word'] = self.most_frequent_word
  64. data['avg_word_frequency'] = self.avg_word_frequency
  65. return data
  66. def to_json(self, indent: Optional[int] = None) -> str:
  67. """
  68. 转换为 JSON 字符串
  69. Args:
  70. indent: 缩进空格数
  71. Returns:
  72. JSON 字符串
  73. """
  74. return json.dumps(self.to_dict(), ensure_ascii=False, indent=indent)
  75. def save_to_file(self, file_path: str, format: OutputFormat = OutputFormat.JSON):
  76. """
  77. 保存结果到文件
  78. Args:
  79. file_path: 文件路径
  80. format: 输出格式
  81. """
  82. os.makedirs(os.path.dirname(os.path.abspath(file_path)), exist_ok=True)
  83. if format == OutputFormat.JSON:
  84. with open(file_path, 'w', encoding='utf-8') as f:
  85. json.dump(self.to_dict(), f, ensure_ascii=False, indent=2)
  86. elif format == OutputFormat.CSV:
  87. with open(file_path, 'w', encoding='utf-8') as f:
  88. f.write('word,count\n')
  89. for word, count in sorted(self.word_counts.items(), key=lambda x: x[1], reverse=True):
  90. f.write(f'"{word}",{count}\n')
  91. else:
  92. with open(file_path, 'w', encoding='utf-8') as f:
  93. f.write(f"Total words: {self.total_words}\n")
  94. f.write(f"Unique words: {self.unique_words}\n")
  95. f.write(f"Most frequent: {self.most_frequent_word}\n")
  96. f.write(f"\nTop 20 words:\n")
  97. for word, count in self.top_words[:20]:
  98. f.write(f"{word}: {count}\n")
  99. class WordCountStreaming:
  100. """
  101. Hadoop Streaming 方式的词频统计类 - 现代化版本
  102. 封装了 Hadoop Streaming 作业的执行,提供与 Java 版本 WordCount 类类似的功能。
  103. 现代化特性:
  104. - 配置管理集成
  105. - 改进的错误处理
  106. - 类型安全的数据类
  107. - 增强的统计功能
  108. - 灵活的命令行参数
  109. """
  110. def __init__(self,
  111. hadoop_home: Optional[str] = None,
  112. logger_name: str = 'wordcount_streaming',
  113. config: Optional[MapReduceConfig] = None):
  114. """
  115. 初始化 WordCountStreaming 实例
  116. Args:
  117. hadoop_home: Hadoop 安装目录(可选,默认从环境变量获取)
  118. logger_name: 日志器名称
  119. config: MapReduce 配置对象(可选)
  120. """
  121. self.logger = setup_logger(logger_name)
  122. # 获取配置
  123. if config is None:
  124. config = get_config().mapreduce
  125. self.config = config
  126. # Hadoop 配置
  127. self.hadoop_home = hadoop_home or os.environ.get('HADOOP_HOME', '')
  128. self.hadoop_cmd = 'hadoop'
  129. # 从配置中获取设置
  130. self.use_combiner = config.use_combiner
  131. self.num_reducers = config.num_reducers
  132. self.job_timeout = config.job_timeout
  133. def mapper(self, line: str,
  134. case_sensitive: bool = False,
  135. min_word_length: int = 1,
  136. stop_words: Optional[List[str]] = None) -> List[Tuple[str, int]]:
  137. """
  138. Mapper 函数:将一行文本分割为单词,输出 <单词, 1>
  139. 对应 Java 版本的 TokenizerMapper.map 方法。
  140. Args:
  141. line: 输入的一行文本
  142. case_sensitive: 是否区分大小写(默认 False)
  143. min_word_length: 最小单词长度(默认 1)
  144. stop_words: 停用词列表(可选)
  145. Returns:
  146. 单词和计数的元组列表
  147. Example:
  148. >>> wc = WordCountStreaming()
  149. >>> wc.mapper("hello world hello")
  150. [('hello', 1), ('world', 1), ('hello', 1)]
  151. """
  152. results = []
  153. # 分割文本为单词(使用空格、制表符等分隔符)
  154. words = line.strip().split()
  155. for word in words:
  156. # 清理单词(移除标点符号)
  157. word = word.strip('.,!?;:()[]{}"\'')
  158. # 处理大小写
  159. if not case_sensitive:
  160. word = word.lower()
  161. # 检查单词长度
  162. if len(word) < min_word_length:
  163. continue
  164. # 检查停用词
  165. if stop_words and word in stop_words:
  166. continue
  167. if word:
  168. results.append((word, 1))
  169. return results
  170. def combiner(self, pairs: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
  171. """
  172. Combiner 函数:在 Mapper 端进行本地合并
  173. 对应 Java 版本的 Combiner 功能,减少数据传输量。
  174. Args:
  175. pairs: Mapper 输出的 <单词, 1> 列表
  176. Returns:
  177. 合并后的 <单词, 本地计数> 列表
  178. Example:
  179. >>> wc = WordCountStreaming()
  180. >>> wc.combiner([('hello', 1), ('world', 1), ('hello', 1)])
  181. [('hello', 2), ('world', 1)]
  182. """
  183. word_counts = defaultdict(int)
  184. for word, count in pairs:
  185. word_counts[word] += count
  186. return [(word, count) for word, count in word_counts.items()]
  187. def reducer(self, word: str, counts: List[int]) -> Tuple[str, int]:
  188. """
  189. Reducer 函数:统计每个单词的总次数
  190. 对应 Java 版本的 IntSumReducer.reduce 方法。
  191. Args:
  192. word: 单词
  193. counts: 该单词的所有计数列表
  194. Returns:
  195. <单词, 总次数> 元组
  196. Example:
  197. >>> wc = WordCountStreaming()
  198. >>> wc.reducer('hello', [1, 1, 1])
  199. ('hello', 3)
  200. """
  201. total = sum(counts)
  202. return (word, total)
  203. def run_mapper_from_stdin(self,
  204. case_sensitive: bool = False,
  205. min_word_length: int = 1,
  206. stop_words: Optional[List[str]] = None):
  207. """
  208. 从标准输入运行 Mapper(用于 Hadoop Streaming)
  209. 从 stdin 读取每行数据,执行 Mapper 逻辑,输出到 stdout。
  210. Args:
  211. case_sensitive: 是否区分大小写
  212. min_word_length: 最小单词长度
  213. stop_words: 停用词列表
  214. """
  215. for line in sys.stdin:
  216. pairs = self.mapper(line, case_sensitive, min_word_length, stop_words)
  217. for word, count in pairs:
  218. print(f"{word}\t{count}")
  219. def run_reducer_from_stdin(self):
  220. """
  221. 从标准输入运行 Reducer(用于 Hadoop Streaming)
  222. 从 stdin 读取 Mapper 输出,执行 Reducer 逻辑,输出到 stdout。
  223. 假设输入已经按键排序(Hadoop Streaming 会自动排序)。
  224. """
  225. current_word = None
  226. current_counts = []
  227. for line in sys.stdin:
  228. line = line.strip()
  229. if not line:
  230. continue
  231. # 解析输入:单词\t计数
  232. parts = line.split('\t', 1)
  233. if len(parts) != 2:
  234. continue
  235. word, count_str = parts
  236. try:
  237. count = int(count_str)
  238. except ValueError:
  239. continue
  240. # 处理相同单词的计数
  241. if current_word == word:
  242. current_counts.append(count)
  243. else:
  244. # 输出前一个单词的结果
  245. if current_word is not None:
  246. result_word, result_count = self.reducer(current_word, current_counts)
  247. print(f"{result_word}\t{result_count}")
  248. # 开始处理新单词
  249. current_word = word
  250. current_counts = [count]
  251. # 输出最后一个单词的结果
  252. if current_word is not None:
  253. result_word, result_count = self.reducer(current_word, current_counts)
  254. print(f"{result_word}\t{result_count}")
  255. def run(self, input_path: str, output_path: str,
  256. mapper_script: Optional[str] = None,
  257. reducer_script: Optional[str] = None,
  258. combiner: Optional[bool] = None,
  259. num_reducers: Optional[int] = None,
  260. timeout: Optional[int] = None) -> bool:
  261. """
  262. 运行完整的 WordCount 作业
  263. 使用 Hadoop Streaming 提交作业到 Hadoop 集群。
  264. Args:
  265. input_path: HDFS 输入路径
  266. output_path: HDFS 输出路径(不能已存在)
  267. mapper_script: Mapper 脚本路径(可选,默认使用当前脚本)
  268. reducer_script: Reducer 脚本路径(可选,默认使用当前脚本)
  269. combiner: 是否使用 Combiner(可选,默认为配置中的设置)
  270. num_reducers: Reducer 任务数量(可选,默认为配置中的设置)
  271. timeout: 超时时间(秒,可选,默认为配置中的设置)
  272. Returns:
  273. 作业是否成功完成
  274. Raises:
  275. RuntimeError: 当找不到 Streaming jar 时
  276. ValueError: 当输入路径无效时
  277. """
  278. # 使用配置中的默认值
  279. use_combiner = default_value(combiner, self.use_combiner)
  280. reducers = default_value(num_reducers, self.num_reducers)
  281. job_timeout = default_value(timeout, self.job_timeout)
  282. # 确定脚本路径
  283. if mapper_script is None:
  284. mapper_script = __file__
  285. if reducer_script is None:
  286. reducer_script = __file__
  287. # 构建 Hadoop Streaming 命令
  288. streaming_jar = self._find_streaming_jar()
  289. if not streaming_jar:
  290. self.logger.error("Could not find Hadoop Streaming jar")
  291. raise RuntimeError("Hadoop Streaming jar not found")
  292. cmd_parts = [
  293. self.hadoop_cmd,
  294. 'jar', streaming_jar,
  295. '-files', f"{mapper_script},{reducer_script}",
  296. '-mapper', f"python3 {os.path.basename(mapper_script)} mapper",
  297. '-reducer', f"python3 {os.path.basename(reducer_script)} reducer",
  298. '-input', input_path,
  299. '-output', output_path,
  300. '-D', f"mapreduce.job.reduces={reducers}"
  301. ]
  302. if use_combiner:
  303. cmd_parts.extend(['-combiner', f"python3 {os.path.basename(mapper_script)} mapper | sort | python3 {os.path.basename(reducer_script)} reducer"])
  304. cmd = ' '.join(cmd_parts)
  305. self.logger.info(f"Running Hadoop Streaming job: {cmd}")
  306. returncode, stdout, stderr = run_command(cmd, timeout=job_timeout)
  307. if returncode == 0:
  308. self.logger.info("WordCount job completed successfully")
  309. self.logger.info(f"Output: {stdout}")
  310. return True
  311. else:
  312. self.logger.error(f"WordCount job failed with return code {returncode}")
  313. self.logger.error(f"Stderr: {stderr}")
  314. return False
  315. def _find_streaming_jar(self) -> Optional[str]:
  316. """
  317. 查找 Hadoop Streaming jar 文件
  318. Returns:
  319. Streaming jar 文件路径,如果未找到返回 None
  320. """
  321. import glob
  322. # 尝试从常见位置查找
  323. search_paths = [
  324. os.path.join(self.hadoop_home, 'share', 'hadoop', 'tools', 'lib'),
  325. os.path.join(self.hadoop_home, 'contrib', 'streaming'),
  326. '/usr/lib/hadoop-mapreduce',
  327. '/usr/hdp/current/hadoop-mapreduce-client'
  328. ]
  329. for path in search_paths:
  330. if os.path.exists(path):
  331. jars = glob.glob(os.path.join(path, 'hadoop-streaming-*.jar'))
  332. if jars:
  333. return jars[0]
  334. # 尝试使用 hadoop classpath 查找
  335. returncode, stdout, stderr = run_command(f"{self.hadoop_cmd} classpath --glob")
  336. if returncode == 0:
  337. # 解析 classpath,查找 streaming jar
  338. classpath = stdout.strip()
  339. for part in classpath.split(os.pathsep):
  340. if 'streaming' in part.lower() and part.endswith('.jar'):
  341. return part
  342. return None
  343. def count_words_locally(self, text: str,
  344. case_sensitive: bool = False,
  345. min_word_length: int = 1,
  346. stop_words: Optional[List[str]] = None,
  347. top_n: int = 10) -> WordCountResult:
  348. """
  349. 本地统计单词(不使用 Hadoop)
  350. 用于测试和小规模数据处理。
  351. Args:
  352. text: 输入文本
  353. case_sensitive: 是否区分大小写
  354. min_word_length: 最小单词长度
  355. stop_words: 停用词列表
  356. top_n: 返回前 N 个最常见的单词
  357. Returns:
  358. WordCountResult 结果对象
  359. Example:
  360. >>> wc = WordCountStreaming()
  361. >>> result = wc.count_words_locally("hello world hello")
  362. >>> result.total_words
  363. 3
  364. >>> result.unique_words
  365. 2
  366. """
  367. import time
  368. start_time = time.time()
  369. # 模拟完整的 MapReduce 流程
  370. all_pairs = []
  371. for line in text.split('\n'):
  372. pairs = self.mapper(line, case_sensitive, min_word_length, stop_words)
  373. all_pairs.extend(pairs)
  374. # 按单词分组
  375. word_groups = defaultdict(list)
  376. for word, count in all_pairs:
  377. word_groups[word].append(count)
  378. # 执行 Reduce
  379. word_counts = {}
  380. for word, counts in word_groups.items():
  381. _, total = self.reducer(word, counts)
  382. word_counts[word] = total
  383. # 计算统计信息
  384. total_words = sum(word_counts.values())
  385. unique_words = len(word_counts)
  386. top_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:top_n]
  387. # 计算执行时间
  388. execution_time_ms = (time.time() - start_time) * 1000
  389. return WordCountResult(
  390. total_words=total_words,
  391. unique_words=unique_words,
  392. top_words=top_words,
  393. word_counts=word_counts,
  394. execution_time_ms=execution_time_ms
  395. )
  396. def analyze_text(self, text: str) -> Dict[str, Any]:
  397. """
  398. 分析文本的详细统计信息
  399. Args:
  400. text: 输入文本
  401. Returns:
  402. 包含详细统计信息的字典
  403. """
  404. result = self.count_words_locally(text)
  405. # 计算额外的统计信息
  406. word_lengths = [len(word) for word in result.word_counts.keys()]
  407. analysis = {
  408. 'total_words': result.total_words,
  409. 'unique_words': result.unique_words,
  410. 'most_frequent_word': result.most_frequent_word,
  411. 'avg_word_frequency': result.avg_word_frequency,
  412. 'min_word_length': min(word_lengths) if word_lengths else 0,
  413. 'max_word_length': max(word_lengths) if word_lengths else 0,
  414. 'avg_word_length': sum(word_lengths) / len(word_lengths) if word_lengths else 0,
  415. 'word_frequency_distribution': self._get_frequency_distribution(result.word_counts),
  416. 'top_10_words': result.top_words[:10]
  417. }
  418. return analysis
  419. def _get_frequency_distribution(self, word_counts: Dict[str, int]) -> Dict[str, int]:
  420. """
  421. 获取词频分布
  422. Args:
  423. word_counts: 单词计数字典
  424. Returns:
  425. 词频分布(出现1次的单词数、出现2次的单词数等)
  426. """
  427. distribution = defaultdict(int)
  428. for count in word_counts.values():
  429. if count == 1:
  430. distribution['once'] += 1
  431. elif count <= 5:
  432. distribution['2-5'] += 1
  433. elif count <= 10:
  434. distribution['6-10'] += 1
  435. else:
  436. distribution['10+'] += 1
  437. return dict(distribution)
  438. def get_stop_words(self, language: str = 'english') -> List[str]:
  439. """
  440. 获取常见停用词列表
  441. Args:
  442. language: 语言(默认 'english')
  443. Returns:
  444. 停用词列表
  445. """
  446. # 常见英语停用词
  447. english_stop_words = [
  448. 'the', 'be', 'to', 'of', 'and', 'a', 'in', 'that', 'have', 'i',
  449. 'it', 'for', 'not', 'on', 'with', 'he', 'as', 'you', 'do',
  450. 'at', 'this', 'but', 'his', 'by', 'from', 'they', 'we', 'say',
  451. 'her', 'she', 'or', 'an', 'will', 'my', 'one', 'all', 'would',
  452. 'there', 'their', 'what', 'so', 'up', 'out', 'if', 'about',
  453. 'who', 'get', 'which', 'go', 'me', 'when', 'make', 'can',
  454. 'like', 'time', 'no', 'just', 'him', 'know', 'take', 'people',
  455. 'into', 'year', 'your', 'good', 'some', 'could', 'them', 'see',
  456. 'other', 'than', 'then', 'now', 'look', 'only', 'come', 'its',
  457. 'over', 'think', 'also', 'back', 'after', 'use', 'two', 'how',
  458. 'our', 'work', 'first', 'well', 'way', 'even', 'new', 'want',
  459. 'because', 'any', 'these', 'give', 'day', 'most', 'us'
  460. ]
  461. if language.lower() == 'english':
  462. return english_stop_words
  463. else:
  464. # 默认返回英语停用词
  465. return english_stop_words
  466. def main():
  467. """
  468. 主函数:作为独立脚本运行
  469. 支持的命令:
  470. - mapper: 运行 Mapper
  471. - reducer: 运行 Reducer
  472. - local: 本地测试
  473. - analyze: 分析文本
  474. """
  475. import argparse
  476. parser = argparse.ArgumentParser(
  477. description='Hadoop Streaming WordCount (Modern Version)',
  478. formatter_class=argparse.RawDescriptionHelpFormatter,
  479. epilog='''
  480. Examples:
  481. # 从标准输入运行 Mapper
  482. python wordcount_streaming.py mapper < input.txt
  483. # 从标准输入运行 Reducer
  484. python wordcount_streaming.py reducer < mapper_output.txt
  485. # 本地测试
  486. python wordcount_streaming.py local
  487. # 分析文本文件
  488. python wordcount_streaming.py analyze --file input.txt --stop-words
  489. # 使用增强的 Mapper 参数
  490. python wordcount_streaming.py mapper --case-sensitive --min-length 3 < input.txt
  491. '''
  492. )
  493. # 子命令
  494. subparsers = parser.add_subparsers(dest='command', help='Available commands')
  495. # mapper 子命令
  496. mapper_parser = subparsers.add_parser('mapper', help='Run Mapper from stdin')
  497. mapper_parser.add_argument('--case-sensitive', '-c', action='store_true',
  498. help='Case-sensitive word matching')
  499. mapper_parser.add_argument('--min-length', '-m', type=int, default=1,
  500. help='Minimum word length (default: 1)')
  501. mapper_parser.add_argument('--stop-words', '-s', action='store_true',
  502. help='Use common English stop words')
  503. mapper_parser.add_argument('--stop-words-file', type=str, default=None,
  504. help='Custom stop words file path')
  505. # reducer 子命令
  506. reducer_parser = subparsers.add_parser('reducer', help='Run Reducer from stdin')
  507. # local 子命令
  508. local_parser = subparsers.add_parser('local', help='Run local test')
  509. local_parser.add_argument('--file', '-f', type=str, default=None,
  510. help='Input file path (default: use sample text)')
  511. local_parser.add_argument('--case-sensitive', '-c', action='store_true',
  512. help='Case-sensitive word matching')
  513. local_parser.add_argument('--min-length', '-m', type=int, default=1,
  514. help='Minimum word length (default: 1)')
  515. local_parser.add_argument('--stop-words', '-s', action='store_true',
  516. help='Use common English stop words')
  517. local_parser.add_argument('--top-n', '-n', type=int, default=10,
  518. help='Number of top words to show (default: 10)')
  519. local_parser.add_argument('--format', '-fmt', type=str, default='text',
  520. choices=['text', 'json', 'csv'],
  521. help='Output format (default: text)')
  522. local_parser.add_argument('--output', '-o', type=str, default=None,
  523. help='Output file path (optional)')
  524. # analyze 子命令
  525. analyze_parser = subparsers.add_parser('analyze', help='Analyze text statistics')
  526. analyze_parser.add_argument('--file', '-f', type=str, required=True,
  527. help='Input file path')
  528. analyze_parser.add_argument('--format', '-fmt', type=str, default='json',
  529. choices=['text', 'json'],
  530. help='Output format (default: json)')
  531. args = parser.parse_args()
  532. if not args.command:
  533. parser.print_help()
  534. sys.exit(1)
  535. wc = WordCountStreaming()
  536. # 处理停用词
  537. stop_words = None
  538. if hasattr(args, 'stop_words') and args.stop_words:
  539. stop_words = wc.get_stop_words()
  540. if hasattr(args, 'stop_words_file') and args.stop_words_file:
  541. if os.path.exists(args.stop_words_file):
  542. with open(args.stop_words_file, 'r', encoding='utf-8') as f:
  543. custom_stop_words = [line.strip().lower() for line in f if line.strip()]
  544. if stop_words is None:
  545. stop_words = custom_stop_words
  546. else:
  547. stop_words.extend(custom_stop_words)
  548. if args.command == 'mapper':
  549. wc.run_mapper_from_stdin(
  550. case_sensitive=getattr(args, 'case_sensitive', False),
  551. min_word_length=getattr(args, 'min_length', 1),
  552. stop_words=stop_words
  553. )
  554. elif args.command == 'reducer':
  555. wc.run_reducer_from_stdin()
  556. elif args.command == 'local':
  557. # 本地测试
  558. if args.file and os.path.exists(args.file):
  559. with open(args.file, 'r', encoding='utf-8') as f:
  560. test_text = f.read()
  561. else:
  562. # 使用示例文本
  563. test_text = """
  564. Hello world, hello Hadoop!
  565. Hadoop is great for big data.
  566. Big data processing with Hadoop.
  567. Spark is fast and general engine for large-scale data processing.
  568. Hadoop provides massive storage for any kind of data.
  569. """
  570. # 执行统计
  571. result = wc.count_words_locally(
  572. test_text,
  573. case_sensitive=args.case_sensitive,
  574. min_word_length=args.min_length,
  575. stop_words=stop_words,
  576. top_n=args.top_n
  577. )
  578. # 确定输出格式
  579. output_format = OutputFormat(args.format)
  580. # 输出结果
  581. if args.output:
  582. result.save_to_file(args.output, output_format)
  583. print(f"Result saved to: {args.output}")
  584. else:
  585. if output_format == OutputFormat.JSON:
  586. print(result.to_json(indent=2))
  587. elif output_format == OutputFormat.CSV:
  588. print('word,count')
  589. for word, count in sorted(result.word_counts.items(), key=lambda x: x[1], reverse=True):
  590. print(f'"{word}",{count}')
  591. else:
  592. print(f"\n{'='*60}")
  593. print(f"Word Count Results (Local Mode)")
  594. print(f"{'='*60}")
  595. print(f"\nTotal words: {result.total_words:,}")
  596. print(f"Unique words: {result.unique_words:,}")
  597. print(f"Execution time: {result.execution_time_ms:.2f} ms")
  598. if result.most_frequent_word:
  599. print(f"\nMost frequent word: {result.most_frequent_word[0]} ({result.most_frequent_word[1]} times)")
  600. print(f"\nTop {args.top_n} words:")
  601. for word, count in result.top_words:
  602. print(f" {word}: {count}")
  603. elif args.command == 'analyze':
  604. # 分析文本
  605. with open(args.file, 'r', encoding='utf-8') as f:
  606. text = f.read()
  607. analysis = wc.analyze_text(text)
  608. if args.format == 'json':
  609. print(json.dumps(analysis, ensure_ascii=False, indent=2))
  610. else:
  611. print(f"\n{'='*60}")
  612. print(f"Text Analysis Report")
  613. print(f"{'='*60}")
  614. print(f"\nFile: {args.file}")
  615. print(f"\nBasic Statistics:")
  616. print(f" Total words: {analysis['total_words']:,}")
  617. print(f" Unique words: {analysis['unique_words']:,}")
  618. print(f" Most frequent: {analysis['most_frequent_word']}")
  619. print(f" Average frequency: {analysis['avg_word_frequency']:.2f}")
  620. print(f"\nWord Length Statistics:")
  621. print(f" Min word length: {analysis['min_word_length']}")
  622. print(f" Max word length: {analysis['max_word_length']}")
  623. print(f" Average word length: {analysis['avg_word_length']:.2f}")
  624. print(f"\nWord Frequency Distribution:")
  625. for key, value in analysis['word_frequency_distribution'].items():
  626. print(f" {key}: {value}")
  627. print(f"\nTop 10 Words:")
  628. for word, count in analysis['top_10_words']:
  629. print(f" {word}: {count}")
# Run the command-line interface only when this file is executed directly.
if __name__ == '__main__':
    main()