""" Hadoop Streaming 方式的词频统计模块 对应 Java 版本的 WordCount 类,使用 Hadoop Streaming 方式实现: - Mapper: 从标准输入读取数据,分割为单词,输出 <单词, 1> - Reducer: 从标准输入读取 Mapper 输出,统计每个单词的总次数 - Combiner: 可选的本地合并,减少数据传输 使用方式: 1. 作为独立脚本运行(用于 Hadoop Streaming): $ python wordcount_streaming.py mapper < input.txt $ python wordcount_streaming.py reducer < mapper_output.txt 2. 作为模块导入使用: from wordcount_streaming import WordCountStreaming wc = WordCountStreaming() wc.run(input_path, output_path) """ import sys from collections import defaultdict from typing import Dict, List, Optional, Tuple from ..utils.helpers import run_command, setup_logger class WordCountStreaming: """ Hadoop Streaming 方式的词频统计类 封装了 Hadoop Streaming 作业的执行,提供与 Java 版本 WordCount 类类似的功能。 """ def __init__(self, hadoop_home: Optional[str] = None, logger_name: str = 'wordcount_streaming'): """ 初始化 WordCountStreaming 实例 Args: hadoop_home: Hadoop 安装目录(可选,默认从环境变量获取) logger_name: 日志器名称 """ self.logger = setup_logger(logger_name) self.hadoop_home = hadoop_home or __import__('os').environ.get('HADOOP_HOME', '') self.hadoop_cmd = 'hadoop' def mapper(self, line: str) -> List[Tuple[str, int]]: """ Mapper 函数:将一行文本分割为单词,输出 <单词, 1> 对应 Java 版本的 TokenizerMapper.map 方法。 Args: line: 输入的一行文本 Returns: 单词和计数的元组列表 Example: >>> wc = WordCountStreaming() >>> wc.mapper("hello world hello") [('hello', 1), ('world', 1), ('hello', 1)] """ results = [] # 分割文本为单词(使用空格、制表符等分隔符) words = line.strip().split() for word in words: # 清理单词(移除标点符号,转为小写) word = word.strip('.,!?;:()[]{}"\'').lower() if word: # 确保单词非空 results.append((word, 1)) return results def combiner(self, pairs: List[Tuple[str, int]]) -> List[Tuple[str, int]]: """ Combiner 函数:在 Mapper 端进行本地合并 对应 Java 版本的 Combiner 功能,减少数据传输量。 Args: pairs: Mapper 输出的 <单词, 1> 列表 Returns: 合并后的 <单词, 本地计数> 列表 Example: >>> wc = WordCountStreaming() >>> wc.combiner([('hello', 1), ('world', 1), ('hello', 1)]) [('hello', 2), ('world', 1)] """ word_counts = defaultdict(int) for word, count in pairs: word_counts[word] += count return [(word, count) for word, count in word_counts.items()] def reducer(self, word: str, counts: List[int]) -> Tuple[str, int]: """ Reducer 函数:统计每个单词的总次数 对应 Java 版本的 IntSumReducer.reduce 方法。 Args: word: 单词 counts: 该单词的所有计数列表 Returns: <单词, 总次数> 元组 Example: >>> wc = WordCountStreaming() >>> wc.reducer('hello', [1, 1, 1]) ('hello', 3) """ total = sum(counts) return (word, total) def run_mapper_from_stdin(self): """ 从标准输入运行 Mapper(用于 Hadoop Streaming) 从 stdin 读取每行数据,执行 Mapper 逻辑,输出到 stdout。 """ for line in sys.stdin: pairs = self.mapper(line) for word, count in pairs: print(f"{word}\t{count}") def run_reducer_from_stdin(self): """ 从标准输入运行 Reducer(用于 Hadoop Streaming) 从 stdin 读取 Mapper 输出,执行 Reducer 逻辑,输出到 stdout。 假设输入已经按键排序(Hadoop Streaming 会自动排序)。 """ current_word = None current_counts = [] for line in sys.stdin: line = line.strip() if not line: continue # 解析输入:单词\t计数 parts = line.split('\t', 1) if len(parts) != 2: continue word, count_str = parts try: count = int(count_str) except ValueError: continue # 处理相同单词的计数 if current_word == word: current_counts.append(count) else: # 输出前一个单词的结果 if current_word is not None: result_word, result_count = self.reducer(current_word, current_counts) print(f"{result_word}\t{result_count}") # 开始处理新单词 current_word = word current_counts = [count] # 输出最后一个单词的结果 if current_word is not None: result_word, result_count = self.reducer(current_word, current_counts) print(f"{result_word}\t{result_count}") def run(self, input_path: str, output_path: str, mapper_script: Optional[str] = None, reducer_script: Optional[str] = None, combiner: bool = True, num_reducers: int = 1) -> bool: """ 运行完整的 WordCount 作业 使用 Hadoop Streaming 提交作业到 Hadoop 集群。 Args: input_path: HDFS 输入路径 output_path: HDFS 输出路径(不能已存在) mapper_script: Mapper 脚本路径(可选,默认使用当前脚本) reducer_script: Reducer 脚本路径(可选,默认使用当前脚本) combiner: 是否使用 Combiner num_reducers: Reducer 任务数量 Returns: 作业是否成功完成 """ import os # 确定脚本路径 if mapper_script is None: mapper_script = __file__ if reducer_script is None: reducer_script = __file__ # 构建 Hadoop Streaming 命令 streaming_jar = self._find_streaming_jar() if not streaming_jar: self.logger.error("Could not find Hadoop Streaming jar") return False cmd_parts = [ self.hadoop_cmd, 'jar', streaming_jar, '-files', f"{mapper_script},{reducer_script}", '-mapper', f"python3 {os.path.basename(mapper_script)} mapper", '-reducer', f"python3 {os.path.basename(reducer_script)} reducer", '-input', input_path, '-output', output_path, '-D', f"mapreduce.job.reduces={num_reducers}" ] if combiner: cmd_parts.extend(['-combiner', f"python3 {os.path.basename(mapper_script)} mapper | sort | python3 {os.path.basename(reducer_script)} reducer"]) cmd = ' '.join(cmd_parts) self.logger.info(f"Running Hadoop Streaming job: {cmd}") returncode, stdout, stderr = run_command(cmd, timeout=3600) # 1小时超时 if returncode == 0: self.logger.info("WordCount job completed successfully") self.logger.info(f"Output: {stdout}") return True else: self.logger.error(f"WordCount job failed with return code {returncode}") self.logger.error(f"Stderr: {stderr}") return False def _find_streaming_jar(self) -> Optional[str]: """ 查找 Hadoop Streaming jar 文件 Returns: Streaming jar 文件路径,如果未找到返回 None """ import os import glob # 尝试从常见位置查找 search_paths = [ os.path.join(self.hadoop_home, 'share', 'hadoop', 'tools', 'lib'), os.path.join(self.hadoop_home, 'contrib', 'streaming'), '/usr/lib/hadoop-mapreduce', '/usr/hdp/current/hadoop-mapreduce-client' ] for path in search_paths: if os.path.exists(path): jars = glob.glob(os.path.join(path, 'hadoop-streaming-*.jar')) if jars: return jars[0] # 尝试使用 hadoop classpath 查找 returncode, stdout, stderr = run_command(f"{self.hadoop_cmd} classpath --glob") if returncode == 0: # 解析 classpath,查找 streaming jar classpath = stdout.strip() for part in classpath.split(os.pathsep): if 'streaming' in part.lower() and part.endswith('.jar'): return part return None def count_words_locally(self, text: str) -> Dict[str, int]: """ 本地统计单词(不使用 Hadoop) 用于测试和小规模数据处理。 Args: text: 输入文本 Returns: 单词计数字典 Example: >>> wc = WordCountStreaming() >>> wc.count_words_locally("hello world hello") {'hello': 2, 'world': 1} """ # 模拟完整的 MapReduce 流程 all_pairs = [] for line in text.split('\n'): pairs = self.mapper(line) all_pairs.extend(pairs) # 按单词分组 word_groups = defaultdict(list) for word, count in all_pairs: word_groups[word].append(count) # 执行 Reduce results = {} for word, counts in word_groups.items(): _, total = self.reducer(word, counts) results[word] = total return results def main(): """ 主函数:作为独立脚本运行 支持的命令: - mapper: 运行 Mapper - reducer: 运行 Reducer - local: 本地测试 """ if len(sys.argv) < 2: print("Usage: python wordcount_streaming.py ") print("Commands:") print(" mapper - Run Mapper from stdin") print(" reducer - Run Reducer from stdin") print(" local - Run local test") sys.exit(1) command = sys.argv[1] wc = WordCountStreaming() if command == 'mapper': wc.run_mapper_from_stdin() elif command == 'reducer': wc.run_reducer_from_stdin() elif command == 'local': # 本地测试 test_text = """ Hello world, hello Hadoop! Hadoop is great for big data. Big data processing with Hadoop. """ result = wc.count_words_locally(test_text) print("Word count results:") for word, count in sorted(result.items(), key=lambda x: x[1], reverse=True): print(f"{word}: {count}") else: print(f"Unknown command: {command}") sys.exit(1) if __name__ == '__main__': main()