wordcount_streaming.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
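"""
Word-frequency counting with Hadoop Streaming.

Python counterpart of the Java ``WordCount`` class, implemented for Hadoop Streaming:
- Mapper: reads lines from stdin, splits them into words and emits ``<word, 1>``.
- Reducer: reads the mapper output from stdin and sums the total count of each word.
  The reducer expects its input sorted by word (Hadoop Streaming sorts it automatically).
- Combiner: optional map-side local aggregation that reduces the amount of data transferred.

Usage:
1. As a standalone script (the Hadoop Streaming style). When testing locally,
   sort the mapper output before feeding it to the reducer, as Hadoop would:
   $ python wordcount_streaming.py mapper < input.txt > mapper_output.txt
   $ sort mapper_output.txt | python wordcount_streaming.py reducer
2. As an importable module:
   from <package>.wordcount_streaming import WordCountStreaming  # package path depends on the project layout
   wc = WordCountStreaming()
   wc.run(input_path, output_path)

Note: this module imports its helpers with a package-relative import
(``from ..utils.helpers import ...``). Running the file directly by path, or
importing it as a top-level module, therefore raises ImportError. Run it as
part of its package (e.g. ``python -m <package>.wordcount_streaming mapper``)
or make the script self-contained.
"""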
  16. import sys
  17. from collections import defaultdict
  18. from typing import Dict, List, Optional, Tuple
  19. from ..utils.helpers import run_command, setup_logger
  20. class WordCountStreaming:
  21. """
  22. Hadoop Streaming 方式的词频统计类
  23. 封装了 Hadoop Streaming 作业的执行,提供与 Java 版本 WordCount 类类似的功能。
  24. """
  25. def __init__(self, hadoop_home: Optional[str] = None, logger_name: str = 'wordcount_streaming'):
  26. """
  27. 初始化 WordCountStreaming 实例
  28. Args:
  29. hadoop_home: Hadoop 安装目录(可选,默认从环境变量获取)
  30. logger_name: 日志器名称
  31. """
  32. self.logger = setup_logger(logger_name)
  33. self.hadoop_home = hadoop_home or __import__('os').environ.get('HADOOP_HOME', '')
  34. self.hadoop_cmd = 'hadoop'
  35. def mapper(self, line: str) -> List[Tuple[str, int]]:
  36. """
  37. Mapper 函数:将一行文本分割为单词,输出 <单词, 1>
  38. 对应 Java 版本的 TokenizerMapper.map 方法。
  39. Args:
  40. line: 输入的一行文本
  41. Returns:
  42. 单词和计数的元组列表
  43. Example:
  44. >>> wc = WordCountStreaming()
  45. >>> wc.mapper("hello world hello")
  46. [('hello', 1), ('world', 1), ('hello', 1)]
  47. """
  48. results = []
  49. # 分割文本为单词(使用空格、制表符等分隔符)
  50. words = line.strip().split()
  51. for word in words:
  52. # 清理单词(移除标点符号,转为小写)
  53. word = word.strip('.,!?;:()[]{}"\'').lower()
  54. if word: # 确保单词非空
  55. results.append((word, 1))
  56. return results
  57. def combiner(self, pairs: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
  58. """
  59. Combiner 函数:在 Mapper 端进行本地合并
  60. 对应 Java 版本的 Combiner 功能,减少数据传输量。
  61. Args:
  62. pairs: Mapper 输出的 <单词, 1> 列表
  63. Returns:
  64. 合并后的 <单词, 本地计数> 列表
  65. Example:
  66. >>> wc = WordCountStreaming()
  67. >>> wc.combiner([('hello', 1), ('world', 1), ('hello', 1)])
  68. [('hello', 2), ('world', 1)]
  69. """
  70. word_counts = defaultdict(int)
  71. for word, count in pairs:
  72. word_counts[word] += count
  73. return [(word, count) for word, count in word_counts.items()]
  74. def reducer(self, word: str, counts: List[int]) -> Tuple[str, int]:
  75. """
  76. Reducer 函数:统计每个单词的总次数
  77. 对应 Java 版本的 IntSumReducer.reduce 方法。
  78. Args:
  79. word: 单词
  80. counts: 该单词的所有计数列表
  81. Returns:
  82. <单词, 总次数> 元组
  83. Example:
  84. >>> wc = WordCountStreaming()
  85. >>> wc.reducer('hello', [1, 1, 1])
  86. ('hello', 3)
  87. """
  88. total = sum(counts)
  89. return (word, total)
  90. def run_mapper_from_stdin(self):
  91. """
  92. 从标准输入运行 Mapper(用于 Hadoop Streaming)
  93. 从 stdin 读取每行数据,执行 Mapper 逻辑,输出到 stdout。
  94. """
  95. for line in sys.stdin:
  96. pairs = self.mapper(line)
  97. for word, count in pairs:
  98. print(f"{word}\t{count}")
  99. def run_reducer_from_stdin(self):
  100. """
  101. 从标准输入运行 Reducer(用于 Hadoop Streaming)
  102. 从 stdin 读取 Mapper 输出,执行 Reducer 逻辑,输出到 stdout。
  103. 假设输入已经按键排序(Hadoop Streaming 会自动排序)。
  104. """
  105. current_word = None
  106. current_counts = []
  107. for line in sys.stdin:
  108. line = line.strip()
  109. if not line:
  110. continue
  111. # 解析输入:单词\t计数
  112. parts = line.split('\t', 1)
  113. if len(parts) != 2:
  114. continue
  115. word, count_str = parts
  116. try:
  117. count = int(count_str)
  118. except ValueError:
  119. continue
  120. # 处理相同单词的计数
  121. if current_word == word:
  122. current_counts.append(count)
  123. else:
  124. # 输出前一个单词的结果
  125. if current_word is not None:
  126. result_word, result_count = self.reducer(current_word, current_counts)
  127. print(f"{result_word}\t{result_count}")
  128. # 开始处理新单词
  129. current_word = word
  130. current_counts = [count]
  131. # 输出最后一个单词的结果
  132. if current_word is not None:
  133. result_word, result_count = self.reducer(current_word, current_counts)
  134. print(f"{result_word}\t{result_count}")
  135. def run(self, input_path: str, output_path: str,
  136. mapper_script: Optional[str] = None,
  137. reducer_script: Optional[str] = None,
  138. combiner: bool = True,
  139. num_reducers: int = 1) -> bool:
  140. """
  141. 运行完整的 WordCount 作业
  142. 使用 Hadoop Streaming 提交作业到 Hadoop 集群。
  143. Args:
  144. input_path: HDFS 输入路径
  145. output_path: HDFS 输出路径(不能已存在)
  146. mapper_script: Mapper 脚本路径(可选,默认使用当前脚本)
  147. reducer_script: Reducer 脚本路径(可选,默认使用当前脚本)
  148. combiner: 是否使用 Combiner
  149. num_reducers: Reducer 任务数量
  150. Returns:
  151. 作业是否成功完成
  152. """
  153. import os
  154. # 确定脚本路径
  155. if mapper_script is None:
  156. mapper_script = __file__
  157. if reducer_script is None:
  158. reducer_script = __file__
  159. # 构建 Hadoop Streaming 命令
  160. streaming_jar = self._find_streaming_jar()
  161. if not streaming_jar:
  162. self.logger.error("Could not find Hadoop Streaming jar")
  163. return False
  164. cmd_parts = [
  165. self.hadoop_cmd,
  166. 'jar', streaming_jar,
  167. '-files', f"{mapper_script},{reducer_script}",
  168. '-mapper', f"python3 {os.path.basename(mapper_script)} mapper",
  169. '-reducer', f"python3 {os.path.basename(reducer_script)} reducer",
  170. '-input', input_path,
  171. '-output', output_path,
  172. '-D', f"mapreduce.job.reduces={num_reducers}"
  173. ]
  174. if combiner:
  175. cmd_parts.extend(['-combiner', f"python3 {os.path.basename(mapper_script)} mapper | sort | python3 {os.path.basename(reducer_script)} reducer"])
  176. cmd = ' '.join(cmd_parts)
  177. self.logger.info(f"Running Hadoop Streaming job: {cmd}")
  178. returncode, stdout, stderr = run_command(cmd, timeout=3600) # 1小时超时
  179. if returncode == 0:
  180. self.logger.info("WordCount job completed successfully")
  181. self.logger.info(f"Output: {stdout}")
  182. return True
  183. else:
  184. self.logger.error(f"WordCount job failed with return code {returncode}")
  185. self.logger.error(f"Stderr: {stderr}")
  186. return False
  187. def _find_streaming_jar(self) -> Optional[str]:
  188. """
  189. 查找 Hadoop Streaming jar 文件
  190. Returns:
  191. Streaming jar 文件路径,如果未找到返回 None
  192. """
  193. import os
  194. import glob
  195. # 尝试从常见位置查找
  196. search_paths = [
  197. os.path.join(self.hadoop_home, 'share', 'hadoop', 'tools', 'lib'),
  198. os.path.join(self.hadoop_home, 'contrib', 'streaming'),
  199. '/usr/lib/hadoop-mapreduce',
  200. '/usr/hdp/current/hadoop-mapreduce-client'
  201. ]
  202. for path in search_paths:
  203. if os.path.exists(path):
  204. jars = glob.glob(os.path.join(path, 'hadoop-streaming-*.jar'))
  205. if jars:
  206. return jars[0]
  207. # 尝试使用 hadoop classpath 查找
  208. returncode, stdout, stderr = run_command(f"{self.hadoop_cmd} classpath --glob")
  209. if returncode == 0:
  210. # 解析 classpath,查找 streaming jar
  211. classpath = stdout.strip()
  212. for part in classpath.split(os.pathsep):
  213. if 'streaming' in part.lower() and part.endswith('.jar'):
  214. return part
  215. return None
  216. def count_words_locally(self, text: str) -> Dict[str, int]:
  217. """
  218. 本地统计单词(不使用 Hadoop)
  219. 用于测试和小规模数据处理。
  220. Args:
  221. text: 输入文本
  222. Returns:
  223. 单词计数字典
  224. Example:
  225. >>> wc = WordCountStreaming()
  226. >>> wc.count_words_locally("hello world hello")
  227. {'hello': 2, 'world': 1}
  228. """
  229. # 模拟完整的 MapReduce 流程
  230. all_pairs = []
  231. for line in text.split('\n'):
  232. pairs = self.mapper(line)
  233. all_pairs.extend(pairs)
  234. # 按单词分组
  235. word_groups = defaultdict(list)
  236. for word, count in all_pairs:
  237. word_groups[word].append(count)
  238. # 执行 Reduce
  239. results = {}
  240. for word, counts in word_groups.items():
  241. _, total = self.reducer(word, counts)
  242. results[word] = total
  243. return results
  244. def main():
  245. """
  246. 主函数:作为独立脚本运行
  247. 支持的命令:
  248. - mapper: 运行 Mapper
  249. - reducer: 运行 Reducer
  250. - local: 本地测试
  251. """
  252. if len(sys.argv) < 2:
  253. print("Usage: python wordcount_streaming.py <command>")
  254. print("Commands:")
  255. print(" mapper - Run Mapper from stdin")
  256. print(" reducer - Run Reducer from stdin")
  257. print(" local - Run local test")
  258. sys.exit(1)
  259. command = sys.argv[1]
  260. wc = WordCountStreaming()
  261. if command == 'mapper':
  262. wc.run_mapper_from_stdin()
  263. elif command == 'reducer':
  264. wc.run_reducer_from_stdin()
  265. elif command == 'local':
  266. # 本地测试
  267. test_text = """
  268. Hello world, hello Hadoop!
  269. Hadoop is great for big data.
  270. Big data processing with Hadoop.
  271. """
  272. result = wc.count_words_locally(test_text)
  273. print("Word count results:")
  274. for word, count in sorted(result.items(), key=lambda x: x[1], reverse=True):
  275. print(f"{word}: {count}")
  276. else:
  277. print(f"Unknown command: {command}")
  278. sys.exit(1)
  279. if __name__ == '__main__':
  280. main()