"""
Word-count module for Hadoop Streaming (modernized version).

Python counterpart of the Java ``WordCount`` class, implemented with Hadoop
Streaming:

- Mapper: reads lines from stdin, splits them into words, emits ``<word, 1>``.
- Reducer: reads the (key-sorted) Mapper output from stdin and sums the counts
  of each word.
- Combiner: optional map-side aggregation that reduces shuffle traffic. Word
  counting is a plain sum, so the Reducer itself is used as the Combiner.

Modernized features:

- Configuration-management integration
- Improved error handling
- Type-safe data classes
- Extended statistics
- Flexible command-line arguments
"""

import csv
import glob
import json
import os
import shlex
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Any, Collection, Dict, Iterator, List, Optional, TextIO, Tuple

from ..utils.helpers import run_command, setup_logger, format_file_size, default_value
from ..config import ConfigurationManager, MapReduceConfig, get_config


# Characters stripped from both ends of every token by the mapper.
_PUNCTUATION = '.,!?;:()[]{}"\''

# Artifacts that match ``hadoop-streaming-*.jar`` but are not runnable jars.
_NON_RUNTIME_JAR_SUFFIXES = ('-sources.jar', '-tests.jar', '-javadoc.jar')

# Common English stop words; ``get_stop_words`` returns a fresh list copy.
_ENGLISH_STOP_WORDS = (
    'the', 'be', 'to', 'of', 'and', 'a', 'in', 'that', 'have', 'i',
    'it', 'for', 'not', 'on', 'with', 'he', 'as', 'you', 'do', 'at',
    'this', 'but', 'his', 'by', 'from', 'they', 'we', 'say', 'her', 'she',
    'or', 'an', 'will', 'my', 'one', 'all', 'would', 'there', 'their', 'what',
    'so', 'up', 'out', 'if', 'about', 'who', 'get', 'which', 'go', 'me',
    'when', 'make', 'can', 'like', 'time', 'no', 'just', 'him', 'know', 'take',
    'people', 'into', 'year', 'your', 'good', 'some', 'could', 'them', 'see', 'other',
    'than', 'then', 'now', 'look', 'only', 'come', 'its', 'over', 'think', 'also',
    'back', 'after', 'use', 'two', 'how', 'our', 'work', 'first', 'well', 'way',
    'even', 'new', 'want', 'because', 'any', 'these', 'give', 'day', 'most', 'us',
)

# Sample input used by the ``local`` command when no ``--file`` is given.
_SAMPLE_TEXT = """
Hello world, hello Hadoop!
Hadoop is great for big data.
Big data processing with Hadoop.
Spark is fast and general engine for large-scale data processing.
Hadoop provides massive storage for any kind of data.
"""


class OutputFormat(Enum):
    """Supported output formats for word-count results."""

    TEXT = 'text'
    JSON = 'json'
    CSV = 'csv'


@dataclass
class WordCountResult:
    """
    Result of a word-count run.

    Attributes:
        total_words: Number of words counted (after filtering).
        unique_words: Number of distinct words.
        top_words: ``(word, count)`` pairs, most frequent first.
        word_counts: Count of every distinct word.
        execution_time_ms: Wall-clock duration of the run in milliseconds.
    """

    total_words: int = 0
    unique_words: int = 0
    top_words: List[Tuple[str, int]] = field(default_factory=list)
    word_counts: Dict[str, int] = field(default_factory=dict)
    execution_time_ms: float = 0.0

    @property
    def most_frequent_word(self) -> Optional[Tuple[str, int]]:
        """
        Return the most frequent word.

        Returns:
            The first ``(word, count)`` entry of ``top_words``, or None when
            there are no words.
        """
        return self.top_words[0] if self.top_words else None

    @property
    def avg_word_frequency(self) -> float:
        """
        Return the average number of occurrences per distinct word.

        Returns:
            ``total_words / unique_words``, or 0.0 when there are no words.
        """
        return self.total_words / self.unique_words if self.unique_words > 0 else 0.0

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the result to a plain dictionary.

        Returns:
            The dataclass fields plus the derived ``most_frequent_word`` and
            ``avg_word_frequency`` values.
        """
        data = asdict(self)
        data['most_frequent_word'] = self.most_frequent_word
        data['avg_word_frequency'] = self.avg_word_frequency
        return data

    def to_json(self, indent: Optional[int] = None) -> str:
        """
        Serialize the result to a JSON string.

        Args:
            indent: Number of spaces used for indentation (None = compact).

        Returns:
            JSON string; non-ASCII words are written verbatim.
        """
        return json.dumps(self.to_dict(), ensure_ascii=False, indent=indent)

    def write_csv(self, stream: TextIO) -> None:
        """
        Write ``word_counts`` as CSV to *stream*, most frequent word first.

        The header is ``word,count``. Every word is quoted, and embedded double
        quotes are escaped by doubling them, so a token such as ``say"what``
        still yields a well-formed row.

        Args:
            stream: Writable text stream (open files should use ``newline=''``).
        """
        stream.write('word,count\n')
        # QUOTE_NONNUMERIC quotes the word and leaves the integer count bare,
        # which matches the historical '"word",count' layout.
        writer = csv.writer(stream, quoting=csv.QUOTE_NONNUMERIC, lineterminator='\n')
        for word, count in sorted(self.word_counts.items(), key=lambda item: item[1], reverse=True):
            writer.writerow([word, count])

    def save_to_file(self, file_path: str, format: OutputFormat = OutputFormat.JSON):
        """
        Save the result to a file, creating parent directories as needed.

        Args:
            file_path: Destination path.
            format: Output format; an ``OutputFormat`` member or its string
                value (``'text'``, ``'json'``, ``'csv'``).

        Raises:
            ValueError: If *format* is not a valid output format.
        """
        # Accept plain strings too; previously 'json'/'csv' strings silently
        # fell through to the text branch.
        format = OutputFormat(format)
        os.makedirs(os.path.dirname(os.path.abspath(file_path)), exist_ok=True)

        if format == OutputFormat.JSON:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(self.to_dict(), f, ensure_ascii=False, indent=2)
        elif format == OutputFormat.CSV:
            # newline='' as recommended for the csv module.
            with open(file_path, 'w', encoding='utf-8', newline='') as f:
                self.write_csv(f)
        else:
            shown = self.top_words[:20]
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(f"Total words: {self.total_words}\n")
                f.write(f"Unique words: {self.unique_words}\n")
                f.write(f"Most frequent: {self.most_frequent_word}\n")
                # Report how many entries are actually listed (top_words is
                # usually limited to top_n, not 20).
                f.write(f"\nTop {len(shown)} words:\n")
                for word, count in shown:
                    f.write(f"{word}: {count}\n")


class WordCountStreaming:
    """
    Word count implemented with Hadoop Streaming (modernized version).

    Wraps submission of a Hadoop Streaming job and provides functionality
    similar to the Java ``WordCount`` class, plus a local (non-Hadoop) mode
    for testing and small inputs.
    """

    def __init__(self, hadoop_home: Optional[str] = None,
                 logger_name: str = 'wordcount_streaming',
                 config: Optional[MapReduceConfig] = None):
        """
        Initialize a WordCountStreaming instance.

        Args:
            hadoop_home: Hadoop installation directory (defaults to the
                ``HADOOP_HOME`` environment variable, or '' if unset).
            logger_name: Logger name passed to ``setup_logger``.
            config: MapReduce configuration (defaults to ``get_config().mapreduce``).
        """
        self.logger = setup_logger(logger_name)

        if config is None:
            config = get_config().mapreduce
        self.config = config

        self.hadoop_home = hadoop_home or os.environ.get('HADOOP_HOME', '')
        self.hadoop_cmd = 'hadoop'

        # Job defaults taken from the configuration.
        self.use_combiner = config.use_combiner
        self.num_reducers = config.num_reducers
        self.job_timeout = config.job_timeout

    def mapper(self, line: str, case_sensitive: bool = False,
               min_word_length: int = 1,
               stop_words: Optional[Collection[str]] = None) -> List[Tuple[str, int]]:
        """
        Split one line of text into words and emit ``<word, 1>`` pairs.

        Counterpart of the Java ``TokenizerMapper.map`` method.

        Args:
            line: One line of input text.
            case_sensitive: Keep the original case when True (default: lowercase).
            min_word_length: Words shorter than this are dropped (default 1).
            stop_words: Words to drop. Any container supporting ``in`` works;
                pass a set for O(1) lookups.

        Returns:
            List of ``(word, 1)`` tuples in input order.

        Example:
            >>> wc = WordCountStreaming()
            >>> wc.mapper("hello world hello")
            [('hello', 1), ('world', 1), ('hello', 1)]
        """
        results = []
        for token in line.split():
            # Strip surrounding punctuation before case folding so the length
            # check sees the final form of the word.
            word = token.strip(_PUNCTUATION)
            if not case_sensitive:
                word = word.lower()
            if len(word) < min_word_length:
                continue
            if stop_words and word in stop_words:
                continue
            if word:
                results.append((word, 1))
        return results

    def combiner(self, pairs: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
        """
        Aggregate Mapper output locally (map-side combine).

        Args:
            pairs: ``<word, count>`` pairs produced by the Mapper.

        Returns:
            ``<word, local count>`` pairs, in first-occurrence order.

        Example:
            >>> wc = WordCountStreaming()
            >>> wc.combiner([('hello', 1), ('world', 1), ('hello', 1)])
            [('hello', 2), ('world', 1)]
        """
        word_counts = defaultdict(int)
        for word, count in pairs:
            word_counts[word] += count
        return list(word_counts.items())

    def reducer(self, word: str, counts: List[int]) -> Tuple[str, int]:
        """
        Sum all counts of one word.

        Counterpart of the Java ``IntSumReducer.reduce`` method.

        Args:
            word: The word.
            counts: All partial counts collected for the word.

        Returns:
            ``(word, total)`` tuple.

        Example:
            >>> wc = WordCountStreaming()
            >>> wc.reducer('hello', [1, 1, 1])
            ('hello', 3)
        """
        return (word, sum(counts))

    def run_mapper_from_stdin(self, case_sensitive: bool = False,
                              min_word_length: int = 1,
                              stop_words: Optional[Collection[str]] = None):
        """
        Run the Mapper over stdin and write ``word\\tcount`` lines to stdout.

        Entry point of the Hadoop Streaming map task.

        Args:
            case_sensitive: Keep the original case of words.
            min_word_length: Minimum word length.
            stop_words: Words to drop.
        """
        # Build the lookup set once instead of scanning a list for every word.
        stop_set = frozenset(stop_words) if stop_words else None
        write = sys.stdout.write
        for line in sys.stdin:
            for word, count in self.mapper(line, case_sensitive, min_word_length, stop_set):
                write(f"{word}\t{count}\n")

    def run_reducer_from_stdin(self):
        """
        Run the Reducer over stdin and write ``word\\tcount`` lines to stdout.

        Entry point of the Hadoop Streaming reduce task (also used as the
        combiner). Assumes the input is grouped by key, as Hadoop Streaming
        sorts map output before the reducer. Malformed lines (no tab, or a
        non-integer count) are skipped.
        """
        current_word: Optional[str] = None
        current_counts: List[int] = []
        write = sys.stdout.write

        for raw_line in sys.stdin:
            line = raw_line.strip()
            if not line:
                continue

            # Expected record format: word<TAB>count
            parts = line.split('\t', 1)
            if len(parts) != 2:
                continue
            word, count_str = parts
            try:
                count = int(count_str)
            except ValueError:
                continue

            if word == current_word:
                current_counts.append(count)
                continue

            # Key changed: flush the previous word before starting the new one.
            if current_word is not None:
                result_word, result_count = self.reducer(current_word, current_counts)
                write(f"{result_word}\t{result_count}\n")
            current_word = word
            current_counts = [count]

        # Flush the final word.
        if current_word is not None:
            result_word, result_count = self.reducer(current_word, current_counts)
            write(f"{result_word}\t{result_count}\n")

    def run(self, input_path: str, output_path: str,
            mapper_script: Optional[str] = None,
            reducer_script: Optional[str] = None,
            combiner: Optional[bool] = None,
            num_reducers: Optional[int] = None,
            timeout: Optional[int] = None) -> bool:
        """
        Submit the complete WordCount job to the Hadoop cluster via Streaming.

        Args:
            input_path: HDFS input path.
            output_path: HDFS output path (must not exist yet).
            mapper_script: Mapper script path (defaults to this file).
            reducer_script: Reducer script path (defaults to this file).
            combiner: Whether to use a combiner (defaults to the config value).
            num_reducers: Number of reduce tasks (defaults to the config value).
            timeout: Timeout in seconds (defaults to the config value).

        Returns:
            True if the job finished with exit code 0, False otherwise.

        Raises:
            RuntimeError: If the Hadoop Streaming jar cannot be found.
            ValueError: If input_path or output_path is empty.
        """
        if not input_path:
            raise ValueError("input_path must be a non-empty HDFS path")
        if not output_path:
            raise ValueError("output_path must be a non-empty HDFS path")

        use_combiner = default_value(combiner, self.use_combiner)
        reducers = default_value(num_reducers, self.num_reducers)
        job_timeout = default_value(timeout, self.job_timeout)

        # NOTE(review): by default this module itself is shipped and executed as
        # a standalone script, but its package-relative imports (``from ..utils``)
        # fail outside the package — confirm a self-contained script is shipped.
        if mapper_script is None:
            mapper_script = __file__
        if reducer_script is None:
            reducer_script = __file__

        streaming_jar = self._find_streaming_jar()
        if not streaming_jar:
            self.logger.error("Could not find Hadoop Streaming jar")
            raise RuntimeError("Hadoop Streaming jar not found")

        # dict.fromkeys de-duplicates while keeping order (mapper and reducer
        # are usually the same file).
        shipped_files = ','.join(dict.fromkeys([mapper_script, reducer_script]))
        reducer_cmd = f"python3 {os.path.basename(reducer_script)} reducer"

        cmd_parts = [
            self.hadoop_cmd, 'jar', streaming_jar,
            # Generic options (-D, -files) must precede the streaming options,
            # otherwise Hadoop Streaming rejects the command.
            '-D', f"mapreduce.job.reduces={reducers}",
            '-files', shipped_files,
            '-mapper', f"python3 {os.path.basename(mapper_script)} mapper",
            '-reducer', reducer_cmd,
            '-input', input_path,
            '-output', output_path,
        ]
        if use_combiner:
            # Summation is associative and commutative, so the reducer doubles
            # as the combiner. Its input is already "word\tcount" records, which
            # must not be re-tokenized by the mapper.
            cmd_parts.extend(['-combiner', reducer_cmd])

        # Quote every argument: the -mapper/-reducer/-combiner values contain
        # spaces and would otherwise be split into separate arguments.
        # NOTE(review): assumes run_command interprets the string with shell-like
        # word splitting (shell=True or shlex.split) — confirm.
        cmd = ' '.join(shlex.quote(part) for part in cmd_parts)
        self.logger.info(f"Running Hadoop Streaming job: {cmd}")

        returncode, stdout, stderr = run_command(cmd, timeout=job_timeout)

        if returncode == 0:
            self.logger.info("WordCount job completed successfully")
            self.logger.info(f"Output: {stdout}")
            return True

        self.logger.error(f"WordCount job failed with return code {returncode}")
        self.logger.error(f"Stderr: {stderr}")
        return False

    def _find_streaming_jar(self) -> Optional[str]:
        """
        Locate the Hadoop Streaming jar.

        Searches well-known installation directories first, then falls back to
        ``hadoop classpath --glob``.

        Returns:
            Path of the streaming jar, or None if it cannot be found.
        """
        search_paths = []
        # Joining onto an empty HADOOP_HOME would silently search paths
        # relative to the current working directory.
        if self.hadoop_home:
            search_paths.extend([
                os.path.join(self.hadoop_home, 'share', 'hadoop', 'tools', 'lib'),
                os.path.join(self.hadoop_home, 'contrib', 'streaming'),
            ])
        search_paths.extend([
            '/usr/lib/hadoop-mapreduce',
            '/usr/hdp/current/hadoop-mapreduce-client',
        ])

        for path in search_paths:
            if not os.path.isdir(path):
                continue
            # Sort for a deterministic pick and skip sources/tests/javadoc jars.
            jars = [
                jar for jar in sorted(glob.glob(os.path.join(path, 'hadoop-streaming-*.jar')))
                if not jar.endswith(_NON_RUNTIME_JAR_SUFFIXES)
            ]
            if jars:
                return jars[0]

        returncode, stdout, _stderr = run_command(f"{self.hadoop_cmd} classpath --glob")
        if returncode == 0:
            for part in stdout.strip().split(os.pathsep):
                if ('streaming' in part.lower() and part.endswith('.jar')
                        and not part.endswith(_NON_RUNTIME_JAR_SUFFIXES)):
                    return part

        return None

    def count_words_locally(self, text: str, case_sensitive: bool = False,
                            min_word_length: int = 1,
                            stop_words: Optional[Collection[str]] = None,
                            top_n: int = 10) -> WordCountResult:
        """
        Count words locally, without Hadoop, by simulating map -> group -> reduce.

        Intended for tests and small inputs.

        Args:
            text: Input text.
            case_sensitive: Keep the original case of words.
            min_word_length: Minimum word length.
            stop_words: Words to drop.
            top_n: Number of most frequent words kept in ``top_words``
                (negative values are treated as 0).

        Returns:
            A WordCountResult.

        Example:
            >>> wc = WordCountStreaming()
            >>> result = wc.count_words_locally("hello world hello")
            >>> result.total_words
            3
            >>> result.unique_words
            2
        """
        start = time.perf_counter()
        stop_set = frozenset(stop_words) if stop_words else None

        # Map and group by word (the "shuffle" step); dict order follows the
        # first occurrence of each word.
        word_groups = defaultdict(list)
        for line in text.split('\n'):
            for word, count in self.mapper(line, case_sensitive, min_word_length, stop_set):
                word_groups[word].append(count)

        # Reduce.
        word_counts = {}
        for word, counts in word_groups.items():
            _, total = self.reducer(word, counts)
            word_counts[word] = total

        top_words = sorted(word_counts.items(), key=lambda item: item[1], reverse=True)[:max(top_n, 0)]

        return WordCountResult(
            total_words=sum(word_counts.values()),
            unique_words=len(word_counts),
            top_words=top_words,
            word_counts=word_counts,
            execution_time_ms=(time.perf_counter() - start) * 1000,
        )

    def analyze_text(self, text: str, case_sensitive: bool = False,
                     min_word_length: int = 1,
                     stop_words: Optional[Collection[str]] = None) -> Dict[str, Any]:
        """
        Compute detailed statistics for a text.

        Args:
            text: Input text.
            case_sensitive: Keep the original case of words.
            min_word_length: Minimum word length.
            stop_words: Words to drop.

        Returns:
            Dictionary with counts, word-length statistics, the frequency
            distribution and the top 10 words.
        """
        result = self.count_words_locally(text, case_sensitive=case_sensitive,
                                          min_word_length=min_word_length,
                                          stop_words=stop_words)

        word_lengths = [len(word) for word in result.word_counts]

        return {
            'total_words': result.total_words,
            'unique_words': result.unique_words,
            'most_frequent_word': result.most_frequent_word,
            'avg_word_frequency': result.avg_word_frequency,
            'min_word_length': min(word_lengths) if word_lengths else 0,
            'max_word_length': max(word_lengths) if word_lengths else 0,
            'avg_word_length': sum(word_lengths) / len(word_lengths) if word_lengths else 0,
            'word_frequency_distribution': self._get_frequency_distribution(result.word_counts),
            'top_10_words': result.top_words[:10],
        }

    def _get_frequency_distribution(self, word_counts: Dict[str, int]) -> Dict[str, int]:
        """
        Bucket words by how often they occur.

        Buckets: 'once' (1), '2-5', '6-10', and '10+' (which in fact means
        more than 10; the key is kept for compatibility). Empty buckets are
        omitted.

        Args:
            word_counts: Mapping of word to count.

        Returns:
            Mapping of bucket name to number of words in that bucket.
        """
        distribution = defaultdict(int)
        for count in word_counts.values():
            if count == 1:
                distribution['once'] += 1
            elif count <= 5:
                distribution['2-5'] += 1
            elif count <= 10:
                distribution['6-10'] += 1
            else:
                distribution['10+'] += 1
        return dict(distribution)

    def get_stop_words(self, language: str = 'english') -> List[str]:
        """
        Return a list of common stop words.

        Args:
            language: Language name; only English is bundled, and any other
                value falls back to the English list.

        Returns:
            A new list each call, so callers may extend it freely.
        """
        # Only English is bundled; every language currently maps to it.
        return list(_ENGLISH_STOP_WORDS)


def main():
    """
    Command-line entry point.

    Commands:
        - mapper: run the Mapper over stdin
        - reducer: run the Reducer over stdin
        - local: local test run
        - analyze: analyze a text file
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Hadoop Streaming WordCount (Modern Version)',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  # 从标准输入运行 Mapper
  python wordcount_streaming.py mapper < input.txt

  # 从标准输入运行 Reducer
  python wordcount_streaming.py reducer < mapper_output.txt

  # 本地测试
  python wordcount_streaming.py local

  # 分析文本文件
  python wordcount_streaming.py analyze --file input.txt --stop-words

  # 使用增强的 Mapper 参数
  python wordcount_streaming.py mapper --case-sensitive --min-length 3 < input.txt
'''
    )

    def add_filter_arguments(subparser):
        """Add the word-filtering options shared by mapper, local and analyze."""
        subparser.add_argument('--case-sensitive', '-c', action='store_true',
                               help='Case-sensitive word matching')
        subparser.add_argument('--min-length', '-m', type=int, default=1,
                               help='Minimum word length (default: 1)')
        subparser.add_argument('--stop-words', '-s', action='store_true',
                               help='Use common English stop words')
        subparser.add_argument('--stop-words-file', type=str, default=None,
                               help='Custom stop words file path')

    subparsers = parser.add_subparsers(dest='command', help='Available commands')

    mapper_parser = subparsers.add_parser('mapper', help='Run Mapper from stdin')
    add_filter_arguments(mapper_parser)

    subparsers.add_parser('reducer', help='Run Reducer from stdin')

    local_parser = subparsers.add_parser('local', help='Run local test')
    local_parser.add_argument('--file', '-f', type=str, default=None,
                              help='Input file path (default: use sample text)')
    add_filter_arguments(local_parser)
    local_parser.add_argument('--top-n', '-n', type=int, default=10,
                              help='Number of top words to show (default: 10)')
    local_parser.add_argument('--format', '-fmt', type=str, default='text',
                              choices=['text', 'json', 'csv'],
                              help='Output format (default: text)')
    local_parser.add_argument('--output', '-o', type=str, default=None,
                              help='Output file path (optional)')

    # The epilog example uses `analyze --stop-words`, so analyze needs the
    # filter options too.
    analyze_parser = subparsers.add_parser('analyze', help='Analyze text statistics')
    analyze_parser.add_argument('--file', '-f', type=str, required=True,
                                help='Input file path')
    add_filter_arguments(analyze_parser)
    analyze_parser.add_argument('--format', '-fmt', type=str, default='json',
                                choices=['text', 'json'],
                                help='Output format (default: json)')

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    wc = WordCountStreaming()

    # Resolve stop words (the reducer command has no filter options).
    stop_words: Optional[List[str]] = None
    if getattr(args, 'stop_words', False):
        stop_words = wc.get_stop_words()
    stop_words_file = getattr(args, 'stop_words_file', None)
    if stop_words_file:
        if os.path.exists(stop_words_file):
            with open(stop_words_file, 'r', encoding='utf-8') as f:
                custom_stop_words = [line.strip().lower() for line in f if line.strip()]
            stop_words = custom_stop_words if stop_words is None else stop_words + custom_stop_words
        else:
            # Keep going (best effort), but make the missing file visible; in a
            # Hadoop task this ends up in the task's stderr log.
            print(f"Warning: stop words file not found: {stop_words_file}", file=sys.stderr)

    if args.command == 'mapper':
        wc.run_mapper_from_stdin(
            case_sensitive=args.case_sensitive,
            min_word_length=args.min_length,
            stop_words=stop_words,
        )

    elif args.command == 'reducer':
        wc.run_reducer_from_stdin()

    elif args.command == 'local':
        if args.file:
            # A mistyped path must not silently fall back to the sample text.
            if not os.path.exists(args.file):
                parser.error(f"Input file not found: {args.file}")
            with open(args.file, 'r', encoding='utf-8') as f:
                test_text = f.read()
        else:
            test_text = _SAMPLE_TEXT

        result = wc.count_words_locally(
            test_text,
            case_sensitive=args.case_sensitive,
            min_word_length=args.min_length,
            stop_words=stop_words,
            top_n=args.top_n,
        )

        output_format = OutputFormat(args.format)

        if args.output:
            result.save_to_file(args.output, output_format)
            print(f"Result saved to: {args.output}")
        elif output_format == OutputFormat.JSON:
            print(result.to_json(indent=2))
        elif output_format == OutputFormat.CSV:
            result.write_csv(sys.stdout)
        else:
            print(f"\n{'=' * 60}")
            print("Word Count Results (Local Mode)")
            print(f"{'=' * 60}")
            print(f"\nTotal words: {result.total_words:,}")
            print(f"Unique words: {result.unique_words:,}")
            print(f"Execution time: {result.execution_time_ms:.2f} ms")
            if result.most_frequent_word:
                print(f"\nMost frequent word: {result.most_frequent_word[0]} ({result.most_frequent_word[1]} times)")
            print(f"\nTop {args.top_n} words:")
            for word, count in result.top_words:
                print(f" {word}: {count}")

    elif args.command == 'analyze':
        if not os.path.exists(args.file):
            parser.error(f"Input file not found: {args.file}")
        with open(args.file, 'r', encoding='utf-8') as f:
            text = f.read()

        analysis = wc.analyze_text(
            text,
            case_sensitive=args.case_sensitive,
            min_word_length=args.min_length,
            stop_words=stop_words,
        )

        if args.format == 'json':
            print(json.dumps(analysis, ensure_ascii=False, indent=2))
        else:
            print(f"\n{'=' * 60}")
            print("Text Analysis Report")
            print(f"{'=' * 60}")
            print(f"\nFile: {args.file}")
            print("\nBasic Statistics:")
            print(f" Total words: {analysis['total_words']:,}")
            print(f" Unique words: {analysis['unique_words']:,}")
            print(f" Most frequent: {analysis['most_frequent_word']}")
            print(f" Average frequency: {analysis['avg_word_frequency']:.2f}")
            print("\nWord Length Statistics:")
            print(f" Min word length: {analysis['min_word_length']}")
            print(f" Max word length: {analysis['max_word_length']}")
            print(f" Average word length: {analysis['avg_word_length']:.2f}")
            print("\nWord Frequency Distribution:")
            for key, value in analysis['word_frequency_distribution'].items():
                print(f" {key}: {value}")
            print("\nTop 10 Words:")
            for word, count in analysis['top_10_words']:
                print(f" {word}: {count}")


if __name__ == '__main__':
    main()