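"""
Word count implemented with Hadoop Streaming.

Python counterpart of the Java WordCount class, built on Hadoop Streaming:
- Mapper: reads lines from standard input, splits them into words, and emits <word, 1>
- Reducer: reads the key-sorted Mapper output from standard input and sums the count for each word
- Combiner: optional map-side aggregation that reduces the amount of data shuffled to the reducers

Usage:
1. As a standalone script (this is how Hadoop Streaming invokes it):
   $ python wordcount_streaming.py mapper < input.txt > mapper_output.txt
   $ sort mapper_output.txt | python wordcount_streaming.py reducer
   The reducer expects input sorted by key, so `sort` stands in for the
   Hadoop shuffle when testing locally.
2. As an importable module:
   from wordcount_streaming import WordCountStreaming
   wc = WordCountStreaming()
   wc.run(input_path, output_path)
"""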
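The reducer in this module assumes that its input is sorted by key. On a cluster, the Hadoop shuffle guarantees this. When the mapper is piped into the reducer by hand, something else has to do the sorting.

The sketch below shows that contract in plain Python, independent of this module. `map_lines` and `reduce_sorted` are illustrative stand-ins that mirror the tokenization and summing used here. `sorted()` plays the role of the shuffle or of the shell `sort`.

```python
from itertools import groupby
from typing import Iterable, Iterator, List, Tuple

PUNCT = '.,!?;:()[]{}"\''


def map_lines(lines: Iterable[str]) -> Iterator[str]:
    """Emit one 'word<TAB>1' record per token, like the streaming mapper."""
    for line in lines:
        for token in line.split():
            word = token.strip(PUNCT).lower()
            if word:
                yield f"{word}\t1"


def reduce_sorted(records: Iterable[str]) -> List[Tuple[str, int]]:
    """Sum counts over runs of equal keys; only correct if input is key-sorted."""
    parsed = (rec.split('\t', 1) for rec in records)
    return [
        (word, sum(int(count) for _, count in group))
        for word, group in groupby(parsed, key=lambda kv: kv[0])
    ]


text = ["Hello world, hello Hadoop!", "Hadoop is great."]
records = list(map_lines(text))

# With the sort (shuffle) step, each word forms exactly one group.
print(reduce_sorted(sorted(records)))
# Without it, the two non-adjacent "hello" records are reduced separately.
print(reduce_sorted(records))
```

`itertools.groupby` only merges adjacent equal keys. That adjacency assumption is exactly the one the `current_word` bookkeeping in `run_reducer_from_stdin` relies on.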
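import logging
import shlex
import subprocess
import sys
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

try:
    from ..utils.helpers import run_command, setup_logger
except ImportError:
    # Hadoop Streaming runs this file as a plain script
    # ("python3 wordcount_streaming.py mapper"), so there is no parent package
    # and the relative import fails. These stand-ins only mirror how the helpers
    # are called in this module; they are assumptions, not utils.helpers itself.
    def setup_logger(name: str) -> logging.Logger:
        # Log to stderr: under Hadoop Streaming, stdout carries the key/value output.
        logger = logging.getLogger(name)
        if not logger.handlers:
            logger.addHandler(logging.StreamHandler(sys.stderr))
            logger.setLevel(logging.INFO)
        return logger

    def run_command(cmd: str, timeout: Optional[int] = None) -> Tuple[int, str, str]:
        # Returns (returncode, stdout, stderr), matching the call sites below.
        try:
            proc = subprocess.run(shlex.split(cmd), capture_output=True,
                                  text=True, timeout=timeout)
        except (OSError, subprocess.TimeoutExpired) as exc:
            return 1, '', str(exc)
        return proc.returncode, proc.stdout, proc.stderr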


class WordCountStreaming:
    """
    Word count via Hadoop Streaming.

    Wraps submission of a Hadoop Streaming job and provides functionality
    similar to the Java WordCount class.
    """

    def __init__(self, hadoop_home: Optional[str] = None, logger_name: str = 'wordcount_streaming'):
        """
        Initialize a WordCountStreaming instance.

        Args:
            hadoop_home: Hadoop installation directory (optional; defaults to
                the HADOOP_HOME environment variable)
            logger_name: name of the logger
        """
        import os

        self.logger = setup_logger(logger_name)
        self.hadoop_home = hadoop_home or os.environ.get('HADOOP_HOME', '')
        self.hadoop_cmd = 'hadoop'

    def mapper(self, line: str) -> List[Tuple[str, int]]:
        """
        Mapper: split one line of text into words and emit <word, 1> for each.

        Corresponds to TokenizerMapper.map in the Java version.

        Args:
            line: one line of input text

        Returns:
            A list of (word, count) tuples

        Example:
            >>> wc = WordCountStreaming()
            >>> wc.mapper("hello world hello")
            [('hello', 1), ('world', 1), ('hello', 1)]
        """
        results = []
        # Split on whitespace (spaces, tabs, and so on)
        words = line.strip().split()
        for word in words:
            # Normalize: strip surrounding punctuation and lowercase
            word = word.strip('.,!?;:()[]{}"\'').lower()
            if word:  # skip tokens that consisted only of punctuation
                results.append((word, 1))
        return results

    def combiner(self, pairs: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
        """
        Combiner: aggregate counts locally on the map side.

        Corresponds to the Combiner in the Java version; it reduces the amount
        of data transferred to the reducers.

        Args:
            pairs: the <word, 1> pairs emitted by the Mapper

        Returns:
            A list of <word, local count> pairs

        Example:
            >>> wc = WordCountStreaming()
            >>> wc.combiner([('hello', 1), ('world', 1), ('hello', 1)])
            [('hello', 2), ('world', 1)]
        """
        word_counts = defaultdict(int)
        for word, count in pairs:
            word_counts[word] += count
        return list(word_counts.items())

    def reducer(self, word: str, counts: List[int]) -> Tuple[str, int]:
        """
        Reducer: compute the total count for one word.

        Corresponds to IntSumReducer.reduce in the Java version.

        Args:
            word: the word
            counts: all counts recorded for that word

        Returns:
            A (word, total) tuple

        Example:
            >>> wc = WordCountStreaming()
            >>> wc.reducer('hello', [1, 1, 1])
            ('hello', 3)
        """
        return (word, sum(counts))

    def run_mapper_from_stdin(self):
        """
        Run the Mapper over standard input (entry point for Hadoop Streaming).

        Reads stdin line by line, applies the Mapper, and writes tab-separated
        <word, count> lines to stdout.
        """
        for line in sys.stdin:
            for word, count in self.mapper(line):
                print(f"{word}\t{count}")

    def run_reducer_from_stdin(self):
        """
        Run the Reducer over standard input (entry point for Hadoop Streaming).

        Reads tab-separated Mapper output from stdin, applies the Reducer, and
        writes <word, total> lines to stdout. The input must be sorted by key;
        Hadoop Streaming sorts map output by key before the reduce phase.
        """
        current_word = None
        current_counts = []

        for line in sys.stdin:
            line = line.strip()
            if not line:
                continue

            # Parse "word<TAB>count"; skip malformed records
            parts = line.split('\t', 1)
            if len(parts) != 2:
                continue

            word, count_str = parts
            try:
                count = int(count_str)
            except ValueError:
                continue

            if current_word == word:
                # Same key as the previous record: keep accumulating
                current_counts.append(count)
            else:
                # Key changed: emit the result for the previous word
                if current_word is not None:
                    result_word, result_count = self.reducer(current_word, current_counts)
                    print(f"{result_word}\t{result_count}")

                # Start accumulating the new word
                current_word = word
                current_counts = [count]

        # Emit the result for the last word
        if current_word is not None:
            result_word, result_count = self.reducer(current_word, current_counts)
            print(f"{result_word}\t{result_count}")

    def run(self, input_path: str, output_path: str,
            mapper_script: Optional[str] = None,
            reducer_script: Optional[str] = None,
            combiner: bool = True,
            num_reducers: int = 1) -> bool:
        """
        Run the complete WordCount job.

        Submits the job to the Hadoop cluster through Hadoop Streaming.

        Args:
            input_path: HDFS input path
            output_path: HDFS output path (must not already exist)
            mapper_script: path to the Mapper script (optional; defaults to this file)
            reducer_script: path to the Reducer script (optional; defaults to this file)
            combiner: whether to run a Combiner
            num_reducers: number of Reducer tasks

        Returns:
            True if the job completed successfully, False otherwise
        """
        import os
        import shlex

        # Default to this file for both roles
        if mapper_script is None:
            mapper_script = __file__
        if reducer_script is None:
            reducer_script = __file__

        streaming_jar = self._find_streaming_jar()
        if not streaming_jar:
            self.logger.error("Could not find Hadoop Streaming jar")
            return False

        mapper_name = os.path.basename(mapper_script)
        reducer_name = os.path.basename(reducer_script)
        # Ship each script once; both roles default to the same file.
        files = ','.join(dict.fromkeys([mapper_script, reducer_script]))

        # Generic options (-D, -files) must come before the streaming options,
        # otherwise the Hadoop Streaming command fails.
        cmd_parts = [
            self.hadoop_cmd,
            'jar', streaming_jar,
            '-D', f"mapreduce.job.reduces={num_reducers}",
            '-files', files,
            '-mapper', f"python3 {mapper_name} mapper",
            '-reducer', f"python3 {reducer_name} reducer",
            '-input', input_path,
            '-output', output_path,
        ]

        if combiner:
            # Summation is associative and commutative, so the reducer can double
            # as the combiner. Its input is already "word<TAB>count" records:
            # running the mapper on it again would count the numbers as words,
            # and shell pipes are not supported inside streaming commands.
            cmd_parts.extend(['-combiner', f"python3 {reducer_name} reducer"])

        # Quote each argument so multi-word values such as
        # "python3 wordcount_streaming.py mapper" remain a single argument.
        cmd = ' '.join(shlex.quote(part) for part in cmd_parts)
        self.logger.info(f"Running Hadoop Streaming job: {cmd}")

        returncode, stdout, stderr = run_command(cmd, timeout=3600)  # one-hour timeout

        if returncode == 0:
            self.logger.info("WordCount job completed successfully")
            self.logger.info(f"Output: {stdout}")
            return True

        self.logger.error(f"WordCount job failed with return code {returncode}")
        self.logger.error(f"Stderr: {stderr}")
        return False

    def _find_streaming_jar(self) -> Optional[str]:
        """
        Locate the Hadoop Streaming jar.

        Returns:
            Path to the streaming jar, or None if it cannot be found
        """
        import glob
        import os

        # Common install locations; the HADOOP_HOME-relative ones only make
        # sense when HADOOP_HOME is known (otherwise they resolve against the cwd).
        search_paths = []
        if self.hadoop_home:
            search_paths += [
                os.path.join(self.hadoop_home, 'share', 'hadoop', 'tools', 'lib'),
                os.path.join(self.hadoop_home, 'contrib', 'streaming'),
            ]
        search_paths += [
            '/usr/lib/hadoop-mapreduce',
            '/usr/hdp/current/hadoop-mapreduce-client',
        ]

        for path in search_paths:
            if os.path.isdir(path):
                jars = sorted(glob.glob(os.path.join(path, 'hadoop-streaming-*.jar')))
                if jars:
                    return jars[0]

        # Fall back to scanning the expanded Hadoop classpath for a streaming jar
        returncode, stdout, stderr = run_command(f"{self.hadoop_cmd} classpath --glob")
        if returncode == 0:
            for part in stdout.strip().split(os.pathsep):
                if 'streaming' in part.lower() and part.endswith('.jar'):
                    return part

        return None

    def count_words_locally(self, text: str) -> Dict[str, int]:
        """
        Count words locally, without Hadoop.

        Intended for testing and small inputs.

        Args:
            text: input text

        Returns:
            A dict mapping each word to its count

        Example:
            >>> wc = WordCountStreaming()
            >>> wc.count_words_locally("hello world hello")
            {'hello': 2, 'world': 1}
        """
        # Simulate the full MapReduce flow: map ...
        all_pairs = []
        for line in text.split('\n'):
            all_pairs.extend(self.mapper(line))

        # ... shuffle (group the counts by word) ...
        word_groups = defaultdict(list)
        for word, count in all_pairs:
            word_groups[word].append(count)

        # ... and reduce
        results = {}
        for word, counts in word_groups.items():
            _, total = self.reducer(word, counts)
            results[word] = total

        return results
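The `run` method above joins a list of arguments into one command string. The sketch below builds that command as an argument list, following the same rules:

- Generic options (`-D`, `-files`) come before the streaming options, as Hadoop Streaming requires.
- The reducer is reused as the combiner.
- `shlex.join` (Python 3.8+) keeps multi-word values such as `python3 wordcount_streaming.py mapper` intact as single arguments.

`build_streaming_cmd` is a hypothetical helper, not part of this module. The jar and HDFS paths are made up for illustration.

```python
import os
import shlex
from typing import List


def build_streaming_cmd(jar: str, script: str, input_path: str, output_path: str,
                        num_reducers: int = 1, use_combiner: bool = True) -> List[str]:
    """Assemble a Hadoop Streaming argv list (hypothetical helper)."""
    name = os.path.basename(script)
    argv = [
        'hadoop', 'jar', jar,
        # Generic options first ...
        '-D', f'mapreduce.job.reduces={num_reducers}',
        '-files', script,
        # ... then the streaming options.
        '-mapper', f'python3 {name} mapper',
        '-reducer', f'python3 {name} reducer',
        '-input', input_path,
        '-output', output_path,
    ]
    if use_combiner:
        # Summation is associative, so the reducer doubles as the combiner.
        argv += ['-combiner', f'python3 {name} reducer']
    return argv


argv = build_streaming_cmd('/opt/hadoop/hadoop-streaming.jar',  # hypothetical path
                           '/home/user/wordcount_streaming.py',
                           '/data/in', '/data/out', num_reducers=2)
print(shlex.join(argv))
```

Keeping the command as a list until the last moment also lets it be passed directly to `subprocess.run` without any shell quoting.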


def main():
    """
    Entry point when the file is run as a standalone script.

    Supported commands:
    - mapper: run the Mapper over stdin
    - reducer: run the Reducer over stdin
    - local: run a local test on a built-in sample
    """
    if len(sys.argv) < 2:
        print("Usage: python wordcount_streaming.py <command>")
        print("Commands:")
        print(" mapper - Run Mapper from stdin")
        print(" reducer - Run Reducer from stdin")
        print(" local - Run local test")
        sys.exit(1)

    command = sys.argv[1]
    wc = WordCountStreaming()

    if command == 'mapper':
        wc.run_mapper_from_stdin()
    elif command == 'reducer':
        wc.run_reducer_from_stdin()
    elif command == 'local':
        # Local test on a small built-in sample
        test_text = """
            Hello world, hello Hadoop!
            Hadoop is great for big data.
            Big data processing with Hadoop.
        """
        result = wc.count_words_locally(test_text)
        print("Word count results:")
        # Most frequent words first
        for word, count in sorted(result.items(), key=lambda x: x[1], reverse=True):
            print(f"{word}: {count}")
    else:
        print(f"Unknown command: {command}")
        sys.exit(1)


if __name__ == '__main__':
    main()
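The map-side Combiner only gives correct totals because summation is associative and commutative. Adding up per-split partial counts and then adding up those partials yields the same totals as adding up every `<word, 1>` pair at once.

The check below tests that property on two hypothetical input splits. `combine` mirrors `WordCountStreaming.combiner` and `final_reduce` mirrors the reducer, but both are standalone stand-ins.

```python
from collections import Counter
from typing import Dict, List, Tuple


def combine(pairs: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
    """Map-side partial sums for one split."""
    partial: Dict[str, int] = {}
    for word, count in pairs:
        partial[word] = partial.get(word, 0) + count
    return list(partial.items())


def final_reduce(pairs: List[Tuple[str, int]]) -> Dict[str, int]:
    """Reduce-side totals over all (possibly pre-combined) pairs."""
    totals: Counter = Counter()
    for word, count in pairs:
        totals[word] += count
    return dict(totals)


split_a = [('hello', 1), ('world', 1), ('hello', 1)]
split_b = [('hello', 1), ('hadoop', 1)]

without_combiner = final_reduce(split_a + split_b)
with_combiner = final_reduce(combine(split_a) + combine(split_b))
print(with_combiner == without_combiner)  # True
print(len(split_a + split_b), len(combine(split_a) + combine(split_b)))  # 5 4
```

The totals match, and fewer pairs reach the reduce side. The saving grows with the number of repeated words per split.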