| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383 |
- """
- PySpark 方式的词频统计模块
- 使用 PySpark 实现词频统计,这是现代大数据处理的推荐方式:
- - 更简洁的 API
- - 更好的性能
- - 支持更多的数据处理操作
- - 可以与 Spark SQL、MLlib 等集成
- 对应 Java 版本的 WordCount 类,但使用更现代的 Spark 框架。
- 使用方式:
- 1. 作为模块导入使用:
- from wordcount_spark import WordCountSpark
- wc = WordCountSpark()
- result = wc.run(input_path, output_path)
- 2. 作为独立脚本运行:
- $ python wordcount_spark.py [--local] <input_path> [output_path]
- """
- import sys
- from typing import Dict, List, Optional, Tuple
- from collections import defaultdict
- from ..utils.helpers import setup_logger, format_file_size
class WordCountSpark:
    """
    Word-frequency counting backed by PySpark.

    Manages a lazily created Spark session and offers three counting paths
    that apply the same tokenization rule: split on whitespace, strip the
    characters in ``_EDGE_PUNCTUATION`` from both ends of each token,
    lowercase, and drop tokens that end up empty.

    * DataFrame API: ``run(..., use_dataframe=True)`` (the default).
    * RDD API: ``run(..., use_dataframe=False)``, the Spark counterpart of the
      Java ``WordCount`` job (TokenizerMapper / IntSumReducer).
    * Plain Python, no Spark: ``count_words_locally`` and ``run_with_files``.
    """

    # Characters stripped from both ends of every token by all counting paths.
    _EDGE_PUNCTUATION = '.,!?;:()[]{}"\''

    # Session settings applied to the builder unless overridden through the
    # ``spark_config`` constructor argument. Never mutated at runtime.
    _DEFAULT_SPARK_CONFIG = {
        "spark.sql.shuffle.partitions": "2",
        "spark.driver.memory": "1g",
        "spark.executor.memory": "1g",
    }

    def __init__(self, app_name: str = 'WordCount',
                 master: Optional[str] = None,
                 logger_name: str = 'wordcount_spark',
                 spark_config: Optional[Dict[str, str]] = None):
        """
        Create an instance; the Spark session itself is started lazily.

        Args:
            app_name: Spark application name.
            master: Spark master URL (e.g. 'local[*]', 'spark://master:7077').
                When None, Spark takes the master from its own configuration.
            logger_name: Name passed to ``setup_logger``.
            spark_config: Extra ``SparkSession.builder.config`` entries,
                merged over (and overriding) ``_DEFAULT_SPARK_CONFIG``.
        """
        self.logger = setup_logger(logger_name)
        self.app_name = app_name
        self.master = master
        # Copy so later changes to the caller's dict do not leak in.
        self.spark_config = dict(spark_config) if spark_config else {}
        self.spark = None
        self.sc = None

    def _init_spark(self):
        """
        Start (or attach to) the Spark session on first use.

        Does nothing if this instance already holds a session.

        Raises:
            ImportError: PySpark is not installed (logged, then re-raised).
            Exception: Anything raised while building the session (logged,
                then re-raised).
        """
        if self.spark is not None:
            return

        try:
            from pyspark.sql import SparkSession
        except ImportError as e:
            self.logger.error(f"PySpark is not installed: {e}")
            raise

        try:
            builder = SparkSession.builder.appName(self.app_name)
            if self.master:
                builder = builder.master(self.master)

            # User-supplied entries win over the defaults.
            settings = {**self._DEFAULT_SPARK_CONFIG, **self.spark_config}
            for key, value in settings.items():
                builder = builder.config(key, value)

            self.spark = builder.getOrCreate()
            self.sc = self.spark.sparkContext
        except Exception as e:
            self.logger.error(f"Failed to initialize Spark: {e}")
            raise

        self.logger.info(f"Spark session initialized: {self.app_name}")
        self.logger.info(f"Spark master: {self.sc.master}")
        self.logger.info(f"Spark version: {self.sc.version}")

    def stop(self):
        """
        Stop the Spark session held by this instance, if any.

        Safe to call repeatedly or when no session was ever started. If
        ``getOrCreate`` attached to a pre-existing session, that session is
        stopped as well.
        """
        if self.spark:
            self.spark.stop()
            self.spark = None
            self.sc = None
            self.logger.info("Spark session stopped")

    def count_words_from_rdd(self, text_rdd) -> Dict[str, int]:
        """
        Count words in an RDD of text lines using RDD transformations.

        Mirrors the Java ``WordCount`` job: ``flatMap`` plays the role of
        ``TokenizerMapper.map`` and ``reduceByKey`` that of
        ``IntSumReducer.reduce``.

        Args:
            text_rdd: RDD whose elements are lines of text.

        Returns:
            Mapping of word -> occurrence count, collected to the driver.
        """
        # _split_line is a staticmethod, so this is a plain function. Shipping
        # it to executors therefore does not pickle ``self``, whose
        # SparkContext may only be used on the driver.
        split_line = self._split_line

        word_counts_rdd = (
            text_rdd.flatMap(split_line)
            .map(lambda word: (word, 1))
            .reduceByKey(lambda x, y: x + y)
        )
        return dict(word_counts_rdd.collectAsMap())

    @staticmethod
    def _split_line(line: str, _strip_chars: str = _EDGE_PUNCTUATION) -> List[str]:
        """
        Tokenize one line of text.

        Splits on whitespace, strips ``_strip_chars`` from both ends of each
        token, lowercases it, and drops tokens that become empty.

        Args:
            line: Input text line.
            _strip_chars: Internal; bound to ``_EDGE_PUNCTUATION`` at class
                creation. Callers should not pass it.

        Returns:
            List of cleaned words, in order of appearance.
        """
        # str.split() with no argument already ignores surrounding whitespace.
        cleaned = (token.strip(_strip_chars).lower() for token in line.split())
        return [word for word in cleaned if word]

    def count_words_from_dataframe(self, df, text_column: str = 'value') -> Dict[str, int]:
        """
        Count words in a DataFrame column using Spark SQL functions.

        Applies the same rule as ``_split_line`` so DataFrame, RDD and local
        results agree. Non-ASCII letters are kept, and punctuation is removed
        only at token edges.

        Args:
            df: DataFrame containing the text.
            text_column: Name of the column holding the text.

        Returns:
            Mapping of word -> occurrence count, collected to the driver.
        """
        import re

        from pyspark.sql.functions import col, explode, lower, regexp_replace, split

        # Java regex equivalent of str.strip(_EDGE_PUNCTUATION): one run of the
        # characters anchored at the start, or one anchored at the end.
        edge_run = '[' + re.escape(self._EDGE_PUNCTUATION) + ']+'
        edge_pattern = '^' + edge_run + '|' + edge_run + '$'

        # Whitespace split. Java's \s matches ASCII whitespace only, unlike
        # Python's str.split(); ordinary text tokenizes the same either way.
        tokens = df.select(explode(split(col(text_column), r'\s+')).alias('token'))
        words = tokens.select(
            lower(regexp_replace(col('token'), edge_pattern, '')).alias('word')
        )

        # Empty strings come from leading/trailing whitespace and tokens that
        # were pure punctuation.
        counts = words.filter(col('word') != '').groupBy('word').count()

        return {row['word']: row['count'] for row in counts.collect()}

    def run(self, input_path: str, output_path: Optional[str] = None,
            use_dataframe: bool = True) -> Dict[str, int]:
        """
        Run the complete WordCount job through Spark.

        Args:
            input_path: Input location (local path or HDFS path).
            output_path: If given, results are also written there via
                ``_save_result``.
            use_dataframe: Use the DataFrame API when True, the RDD API otherwise.

        Returns:
            Mapping of word -> occurrence count.
        """
        self._init_spark()

        self.logger.info(f"Running WordCount job on: {input_path}")

        if use_dataframe:
            # spark.read.text yields one row per line in a column named 'value'.
            df = self.spark.read.text(input_path)
            result = self.count_words_from_dataframe(df)
        else:
            text_rdd = self.sc.textFile(input_path)
            result = self.count_words_from_rdd(text_rdd)

        if output_path:
            self._save_result(result, output_path)

        self._print_statistics(result)

        return result

    def _save_result(self, result: Dict[str, int], output_path: str):
        """
        Write counts through Spark, one "word<TAB>count" line per word.

        Lines are sorted by word. ``saveAsTextFile`` writes a directory of
        part files and fails if ``output_path`` already exists. Requires an
        initialized session.

        Args:
            result: Mapping of word -> count.
            output_path: Destination directory (local or HDFS).
        """
        self.logger.info(f"Saving results to: {output_path}")

        result_rdd = self.sc.parallelize([
            f"{word}\t{count}"
            for word, count in sorted(result.items())
        ])
        result_rdd.saveAsTextFile(output_path)

        self.logger.info(f"Results saved to: {output_path}")

    def _print_statistics(self, result: Dict[str, int]):
        """
        Log totals and the ten most frequent words.

        Args:
            result: Mapping of word -> count.
        """
        if not result:
            self.logger.info("No words found")
            return

        # Non-zero: every counted word has count >= 1.
        total_words = sum(result.values())
        unique_words = len(result)
        sorted_words = sorted(result.items(), key=lambda x: x[1], reverse=True)

        self.logger.info("=" * 50)
        self.logger.info("WordCount Statistics")
        self.logger.info("=" * 50)
        self.logger.info(f"Total words: {total_words}")
        self.logger.info(f"Unique words: {unique_words}")
        self.logger.info("-" * 50)
        self.logger.info("Top 10 words:")

        for i, (word, count) in enumerate(sorted_words[:10], 1):
            percentage = (count / total_words) * 100
            self.logger.info(f"  {i:2d}. {word:15s} {count:5d} ({percentage:5.1f}%)")

        self.logger.info("=" * 50)

    def count_words_locally(self, text: str) -> Dict[str, int]:
        """
        Count words in a string in pure Python, without Spark.

        Intended for tests and small inputs.

        Args:
            text: Input text; may span multiple lines.

        Returns:
            Mapping of word -> count, in order of first appearance.

        Example:
            >>> wc = WordCountSpark()
            >>> wc.count_words_locally("hello world hello")
            {'hello': 2, 'world': 1}
        """
        word_counts = defaultdict(int)

        for line in text.split('\n'):
            for word in self._split_line(line):
                word_counts[word] += 1

        return dict(word_counts)

    def run_with_files(self, files: List[str], output_path: Optional[str] = None) -> Dict[str, int]:
        """
        Count words across local files in pure Python, without Spark.

        A file that cannot be opened, read, or decoded as UTF-8 is skipped as
        a whole, with a warning.

        Args:
            files: Paths of the files to count.
            output_path: If given, a local file receiving one
                "word<TAB>count" line per word, sorted by word.

        Returns:
            Mapping of word -> total count, in order of first appearance
            across the files.
        """
        totals = defaultdict(int)
        for file_path in files:
            try:
                with open(file_path, 'r', encoding='utf-8') as handle:
                    text = handle.read()
            except (OSError, ValueError) as e:  # ValueError covers UnicodeDecodeError
                self.logger.warning(f"Failed to read file {file_path}: {e}")
                continue
            # Count per file and merge, instead of concatenating every file
            # into one ever-growing string.
            for word, count in self.count_words_locally(text).items():
                totals[word] += count

        result = dict(totals)

        if output_path:
            with open(output_path, 'w', encoding='utf-8') as f:
                f.writelines(f"{word}\t{count}\n" for word, count in sorted(result.items()))

        self._print_statistics(result)

        return result
def main():
    """
    Run word counting from the command line.

    Usage:
        python wordcount_spark.py [--local] <input_path> [output_path]

    With ``--local``, counting runs in-process via
    ``WordCountSpark.run_with_files`` and Spark is not started. Otherwise
    ``WordCountSpark.run`` executes the PySpark job. The 20 most frequent
    words are printed at the end.

    Exits with status 1 on a usage error: no arguments, no input path, or
    more than two positional arguments.
    """
    usage = "Usage: python wordcount_spark.py [--local] <input_path> [output_path]"

    if len(sys.argv) < 2:
        print(usage)
        print("Examples:")
        print("  python wordcount_spark.py input.txt")
        print("  python wordcount_spark.py hdfs:///user/hadoop/data output")
        print("  python wordcount_spark.py --local input.txt output.txt")
        sys.exit(1)

    # '--local' may appear anywhere; every other argument is positional.
    args = sys.argv[1:]
    use_local = '--local' in args
    positional = [arg for arg in args if arg != '--local']

    if not positional:
        print("Error: Input path is required")
        sys.exit(1)
    if len(positional) > 2:
        # Reject extra paths instead of silently letting the last one win.
        print(f"Error: Unexpected extra arguments: {' '.join(positional[2:])}")
        print(usage)
        sys.exit(1)

    input_path = positional[0]
    output_path = positional[1] if len(positional) > 1 else None

    wc = WordCountSpark()

    try:
        if use_local:
            # Local mode: plain Python, no Spark session.
            result = wc.run_with_files([input_path], output_path)
        else:
            result = wc.run(input_path, output_path)

        print("\nFinal results:")
        top_words = sorted(result.items(), key=lambda item: item[1], reverse=True)[:20]
        for word, count in top_words:
            print(f"{word}: {count}")

    finally:
        # No-op when no session was started (e.g. --local mode).
        wc.stop()
if __name__ == "__main__":
    # Executed only when the file is run as a script, not when imported.
    main()
|