""" PySpark 方式的词频统计模块 使用 PySpark 实现词频统计,这是现代大数据处理的推荐方式: - 更简洁的 API - 更好的性能 - 支持更多的数据处理操作 - 可以与 Spark SQL、MLlib 等集成 对应 Java 版本的 WordCount 类,但使用更现代的 Spark 框架。 使用方式: 1. 作为模块导入使用: from wordcount_spark import WordCountSpark wc = WordCountSpark() result = wc.run(input_path, output_path) 2. 作为独立脚本运行: $ python wordcount_spark.py """ import sys from typing import Dict, List, Optional, Tuple from collections import defaultdict from ..utils.helpers import setup_logger, format_file_size class WordCountSpark: """ PySpark 方式的词频统计类 封装了 PySpark 作业的执行,提供高效的词频统计功能。 """ def __init__(self, app_name: str = 'WordCount', master: Optional[str] = None, logger_name: str = 'wordcount_spark'): """ 初始化 WordCountSpark 实例 Args: app_name: Spark 应用名称 master: Spark 主节点 URL(可选,如 'local[*]', 'spark://master:7077') 如果为 None,Spark 会从配置中自动获取 logger_name: 日志器名称 """ self.logger = setup_logger(logger_name) self.app_name = app_name self.master = master self.spark = None self.sc = None def _init_spark(self): """ 初始化 Spark 会话和上下文 延迟初始化,只有在需要时才创建 Spark 实例。 """ if self.spark is not None: return try: from pyspark.sql import SparkSession builder = SparkSession.builder.appName(self.app_name) if self.master: builder = builder.master(self.master) # 配置一些常用参数 builder = builder.config("spark.sql.shuffle.partitions", "2") builder = builder.config("spark.driver.memory", "1g") builder = builder.config("spark.executor.memory", "1g") self.spark = builder.getOrCreate() self.sc = self.spark.sparkContext self.logger.info(f"Spark session initialized: {self.app_name}") self.logger.info(f"Spark master: {self.sc.master}") self.logger.info(f"Spark version: {self.sc.version}") except ImportError as e: self.logger.error(f"PySpark is not installed: {e}") raise except Exception as e: self.logger.error(f"Failed to initialize Spark: {e}") raise def stop(self): """ 停止 Spark 会话 """ if self.spark: self.spark.stop() self.spark = None self.sc = None self.logger.info("Spark session stopped") def count_words_from_rdd(self, text_rdd) -> Dict[str, int]: """ 从 RDD 统计单词 对应 Java 版本的 WordCount 逻辑,但使用 Spark 的算子。 Args: text_rdd: 包含文本的 RDD Returns: 单词计数字典 """ # 1. 分割每行文本为单词 # 对应 Java 的 TokenizerMapper.map 方法 words_rdd = text_rdd.flatMap(self._split_line) # 2. 映射为 (单词, 1) pairs_rdd = words_rdd.map(lambda word: (word, 1)) # 3. 按单词聚合计数 # 对应 Java 的 IntSumReducer.reduce 方法 word_counts_rdd = pairs_rdd.reduceByKey(lambda x, y: x + y) # 4. 收集结果到本地 result = word_counts_rdd.collectAsMap() return dict(result) def _split_line(self, line: str) -> List[str]: """ 分割一行文本为单词列表 Args: line: 输入文本行 Returns: 单词列表 """ words = [] # 分割文本为单词(使用空格、制表符等分隔符) raw_words = line.strip().split() for word in raw_words: # 清理单词(移除标点符号,转为小写) word = word.strip('.,!?;:()[]{}"\'').lower() if word: # 确保单词非空 words.append(word) return words def count_words_from_dataframe(self, df, text_column: str = 'value') -> Dict[str, int]: """ 从 DataFrame 统计单词(使用 Spark SQL 风格) 更高级的 API,适合复杂的数据处理。 Args: df: 包含文本的 DataFrame text_column: 包含文本的列名 Returns: 单词计数字典 """ from pyspark.sql.functions import explode, split, lower, trim, regexp_replace, col, count # 1. 清理文本(移除标点符号,转为小写) df_clean = df.withColumn( 'clean_text', lower(trim(regexp_replace(col(text_column), '[^a-zA-Z0-9\\s]', ' '))) ) # 2. 分割为单词 df_words = df_clean.withColumn( 'word', explode(split(col('clean_text'), '\\s+')) ) # 3. 过滤空单词 df_filtered = df_words.filter(col('word') != '') # 4. 按单词分组计数 df_counts = df_filtered.groupBy('word').agg(count('*').alias('count')) # 5. 收集结果 result = {row['word']: row['count'] for row in df_counts.collect()} return result def run(self, input_path: str, output_path: Optional[str] = None, use_dataframe: bool = True) -> Dict[str, int]: """ 运行完整的 WordCount 作业 Args: input_path: 输入路径(可以是本地文件路径或 HDFS 路径) output_path: 输出路径(可选,如果指定则保存结果) use_dataframe: 是否使用 DataFrame API(否则使用 RDD API) Returns: 单词计数字典 """ self._init_spark() self.logger.info(f"Running WordCount job on: {input_path}") if use_dataframe: # 使用 DataFrame API df = self.spark.read.text(input_path) result = self.count_words_from_dataframe(df) else: # 使用 RDD API text_rdd = self.sc.textFile(input_path) result = self.count_words_from_rdd(text_rdd) # 保存结果(如果指定了输出路径) if output_path: self._save_result(result, output_path) # 打印统计信息 self._print_statistics(result) return result def _save_result(self, result: Dict[str, int], output_path: str): """ 保存结果到文件 Args: result: 单词计数字典 output_path: 输出路径 """ self.logger.info(f"Saving results to: {output_path}") # 转换为 RDD 并保存 result_rdd = self.sc.parallelize([ f"{word}\t{count}" for word, count in sorted(result.items()) ]) result_rdd.saveAsTextFile(output_path) self.logger.info(f"Results saved to: {output_path}") def _print_statistics(self, result: Dict[str, int]): """ 打印统计信息 Args: result: 单词计数字典 """ if not result: self.logger.info("No words found") return total_words = sum(result.values()) unique_words = len(result) sorted_words = sorted(result.items(), key=lambda x: x[1], reverse=True) self.logger.info("=" * 50) self.logger.info("WordCount Statistics") self.logger.info("=" * 50) self.logger.info(f"Total words: {total_words}") self.logger.info(f"Unique words: {unique_words}") self.logger.info("-" * 50) self.logger.info("Top 10 words:") for i, (word, count) in enumerate(sorted_words[:10], 1): percentage = (count / total_words) * 100 self.logger.info(f" {i:2d}. {word:15s} {count:5d} ({percentage:5.1f}%)") self.logger.info("=" * 50) def count_words_locally(self, text: str) -> Dict[str, int]: """ 本地统计单词(不使用 Spark 集群) 用于测试和小规模数据处理。 Args: text: 输入文本 Returns: 单词计数字典 Example: >>> wc = WordCountSpark() >>> wc.count_words_locally("hello world hello") {'hello': 2, 'world': 1} """ word_counts = defaultdict(int) for line in text.split('\n'): words = self._split_line(line) for word in words: word_counts[word] += 1 return dict(word_counts) def run_with_files(self, files: List[str], output_path: Optional[str] = None) -> Dict[str, int]: """ 对多个文件运行词频统计 Args: files: 文件路径列表 output_path: 输出路径(可选) Returns: 单词计数字典 """ # 合并所有文件的内容 all_text = "" for file_path in files: try: with open(file_path, 'r', encoding='utf-8') as f: all_text += f.read() + "\n" except Exception as e: self.logger.warning(f"Failed to read file {file_path}: {e}") # 本地统计 result = self.count_words_locally(all_text) # 保存结果 if output_path: with open(output_path, 'w', encoding='utf-8') as f: for word, count in sorted(result.items()): f.write(f"{word}\t{count}\n") # 打印统计信息 self._print_statistics(result) return result def main(): """ 主函数:作为独立脚本运行 使用方式: python wordcount_spark.py [output_path] """ if len(sys.argv) < 2: print("Usage: python wordcount_spark.py [output_path]") print("Examples:") print(" python wordcount_spark.py input.txt") print(" python wordcount_spark.py hdfs:///user/hadoop/data output") print(" python wordcount_spark.py --local input.txt output.txt") sys.exit(1) # 解析参数 use_local = False input_path = None output_path = None i = 1 while i < len(sys.argv): arg = sys.argv[i] if arg == '--local': use_local = True elif input_path is None: input_path = arg else: output_path = arg i += 1 if input_path is None: print("Error: Input path is required") sys.exit(1) wc = WordCountSpark() try: if use_local: # 本地模式(不使用 Spark) result = wc.run_with_files([input_path], output_path) else: # Spark 模式 result = wc.run(input_path, output_path) # 打印结果 print("\nFinal results:") for word, count in sorted(result.items(), key=lambda x: x[1], reverse=True)[:20]: print(f"{word}: {count}") finally: wc.stop() if __name__ == '__main__': main()