wordcount_spark.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. """
  2. PySpark 方式的词频统计模块
  3. 使用 PySpark 实现词频统计,这是现代大数据处理的推荐方式:
  4. - 更简洁的 API
  5. - 更好的性能
  6. - 支持更多的数据处理操作
  7. - 可以与 Spark SQL、MLlib 等集成
  8. 对应 Java 版本的 WordCount 类,但使用更现代的 Spark 框架。
  9. 使用方式:
  10. 1. 作为模块导入使用:
  11. from wordcount_spark import WordCountSpark
  12. wc = WordCountSpark()
  13. result = wc.run(input_path, output_path)
  14. 2. 作为独立脚本运行:
  15. $ python wordcount_spark.py [--local] <input_path> [output_path]
  16. """
  17. import sys
  18. from typing import Dict, List, Optional, Tuple
  19. from collections import defaultdict
  20. from ..utils.helpers import setup_logger, format_file_size
  21. class WordCountSpark:
  22. """
  23. PySpark 方式的词频统计类
  24. 封装了 PySpark 作业的执行,提供高效的词频统计功能。
  25. """
  26. def __init__(self, app_name: str = 'WordCount',
  27. master: Optional[str] = None,
  28. logger_name: str = 'wordcount_spark'):
  29. """
  30. 初始化 WordCountSpark 实例
  31. Args:
  32. app_name: Spark 应用名称
  33. master: Spark 主节点 URL(可选,如 'local[*]', 'spark://master:7077')
  34. 如果为 None,Spark 会从配置中自动获取
  35. logger_name: 日志器名称
  36. """
  37. self.logger = setup_logger(logger_name)
  38. self.app_name = app_name
  39. self.master = master
  40. self.spark = None
  41. self.sc = None
  42. def _init_spark(self):
  43. """
  44. 初始化 Spark 会话和上下文
  45. 延迟初始化,只有在需要时才创建 Spark 实例。
  46. """
  47. if self.spark is not None:
  48. return
  49. try:
  50. from pyspark.sql import SparkSession
  51. builder = SparkSession.builder.appName(self.app_name)
  52. if self.master:
  53. builder = builder.master(self.master)
  54. # 配置一些常用参数
  55. builder = builder.config("spark.sql.shuffle.partitions", "2")
  56. builder = builder.config("spark.driver.memory", "1g")
  57. builder = builder.config("spark.executor.memory", "1g")
  58. self.spark = builder.getOrCreate()
  59. self.sc = self.spark.sparkContext
  60. self.logger.info(f"Spark session initialized: {self.app_name}")
  61. self.logger.info(f"Spark master: {self.sc.master}")
  62. self.logger.info(f"Spark version: {self.sc.version}")
  63. except ImportError as e:
  64. self.logger.error(f"PySpark is not installed: {e}")
  65. raise
  66. except Exception as e:
  67. self.logger.error(f"Failed to initialize Spark: {e}")
  68. raise
  69. def stop(self):
  70. """
  71. 停止 Spark 会话
  72. """
  73. if self.spark:
  74. self.spark.stop()
  75. self.spark = None
  76. self.sc = None
  77. self.logger.info("Spark session stopped")
  78. def count_words_from_rdd(self, text_rdd) -> Dict[str, int]:
  79. """
  80. 从 RDD 统计单词
  81. 对应 Java 版本的 WordCount 逻辑,但使用 Spark 的算子。
  82. Args:
  83. text_rdd: 包含文本的 RDD
  84. Returns:
  85. 单词计数字典
  86. """
  87. # 1. 分割每行文本为单词
  88. # 对应 Java 的 TokenizerMapper.map 方法
  89. words_rdd = text_rdd.flatMap(self._split_line)
  90. # 2. 映射为 (单词, 1)
  91. pairs_rdd = words_rdd.map(lambda word: (word, 1))
  92. # 3. 按单词聚合计数
  93. # 对应 Java 的 IntSumReducer.reduce 方法
  94. word_counts_rdd = pairs_rdd.reduceByKey(lambda x, y: x + y)
  95. # 4. 收集结果到本地
  96. result = word_counts_rdd.collectAsMap()
  97. return dict(result)
  98. def _split_line(self, line: str) -> List[str]:
  99. """
  100. 分割一行文本为单词列表
  101. Args:
  102. line: 输入文本行
  103. Returns:
  104. 单词列表
  105. """
  106. words = []
  107. # 分割文本为单词(使用空格、制表符等分隔符)
  108. raw_words = line.strip().split()
  109. for word in raw_words:
  110. # 清理单词(移除标点符号,转为小写)
  111. word = word.strip('.,!?;:()[]{}"\'').lower()
  112. if word: # 确保单词非空
  113. words.append(word)
  114. return words
  115. def count_words_from_dataframe(self, df, text_column: str = 'value') -> Dict[str, int]:
  116. """
  117. 从 DataFrame 统计单词(使用 Spark SQL 风格)
  118. 更高级的 API,适合复杂的数据处理。
  119. Args:
  120. df: 包含文本的 DataFrame
  121. text_column: 包含文本的列名
  122. Returns:
  123. 单词计数字典
  124. """
  125. from pyspark.sql.functions import explode, split, lower, trim, regexp_replace, col, count
  126. # 1. 清理文本(移除标点符号,转为小写)
  127. df_clean = df.withColumn(
  128. 'clean_text',
  129. lower(trim(regexp_replace(col(text_column), '[^a-zA-Z0-9\\s]', ' ')))
  130. )
  131. # 2. 分割为单词
  132. df_words = df_clean.withColumn(
  133. 'word',
  134. explode(split(col('clean_text'), '\\s+'))
  135. )
  136. # 3. 过滤空单词
  137. df_filtered = df_words.filter(col('word') != '')
  138. # 4. 按单词分组计数
  139. df_counts = df_filtered.groupBy('word').agg(count('*').alias('count'))
  140. # 5. 收集结果
  141. result = {row['word']: row['count'] for row in df_counts.collect()}
  142. return result
  143. def run(self, input_path: str, output_path: Optional[str] = None,
  144. use_dataframe: bool = True) -> Dict[str, int]:
  145. """
  146. 运行完整的 WordCount 作业
  147. Args:
  148. input_path: 输入路径(可以是本地文件路径或 HDFS 路径)
  149. output_path: 输出路径(可选,如果指定则保存结果)
  150. use_dataframe: 是否使用 DataFrame API(否则使用 RDD API)
  151. Returns:
  152. 单词计数字典
  153. """
  154. self._init_spark()
  155. self.logger.info(f"Running WordCount job on: {input_path}")
  156. if use_dataframe:
  157. # 使用 DataFrame API
  158. df = self.spark.read.text(input_path)
  159. result = self.count_words_from_dataframe(df)
  160. else:
  161. # 使用 RDD API
  162. text_rdd = self.sc.textFile(input_path)
  163. result = self.count_words_from_rdd(text_rdd)
  164. # 保存结果(如果指定了输出路径)
  165. if output_path:
  166. self._save_result(result, output_path)
  167. # 打印统计信息
  168. self._print_statistics(result)
  169. return result
  170. def _save_result(self, result: Dict[str, int], output_path: str):
  171. """
  172. 保存结果到文件
  173. Args:
  174. result: 单词计数字典
  175. output_path: 输出路径
  176. """
  177. self.logger.info(f"Saving results to: {output_path}")
  178. # 转换为 RDD 并保存
  179. result_rdd = self.sc.parallelize([
  180. f"{word}\t{count}"
  181. for word, count in sorted(result.items())
  182. ])
  183. result_rdd.saveAsTextFile(output_path)
  184. self.logger.info(f"Results saved to: {output_path}")
  185. def _print_statistics(self, result: Dict[str, int]):
  186. """
  187. 打印统计信息
  188. Args:
  189. result: 单词计数字典
  190. """
  191. if not result:
  192. self.logger.info("No words found")
  193. return
  194. total_words = sum(result.values())
  195. unique_words = len(result)
  196. sorted_words = sorted(result.items(), key=lambda x: x[1], reverse=True)
  197. self.logger.info("=" * 50)
  198. self.logger.info("WordCount Statistics")
  199. self.logger.info("=" * 50)
  200. self.logger.info(f"Total words: {total_words}")
  201. self.logger.info(f"Unique words: {unique_words}")
  202. self.logger.info("-" * 50)
  203. self.logger.info("Top 10 words:")
  204. for i, (word, count) in enumerate(sorted_words[:10], 1):
  205. percentage = (count / total_words) * 100
  206. self.logger.info(f" {i:2d}. {word:15s} {count:5d} ({percentage:5.1f}%)")
  207. self.logger.info("=" * 50)
  208. def count_words_locally(self, text: str) -> Dict[str, int]:
  209. """
  210. 本地统计单词(不使用 Spark 集群)
  211. 用于测试和小规模数据处理。
  212. Args:
  213. text: 输入文本
  214. Returns:
  215. 单词计数字典
  216. Example:
  217. >>> wc = WordCountSpark()
  218. >>> wc.count_words_locally("hello world hello")
  219. {'hello': 2, 'world': 1}
  220. """
  221. word_counts = defaultdict(int)
  222. for line in text.split('\n'):
  223. words = self._split_line(line)
  224. for word in words:
  225. word_counts[word] += 1
  226. return dict(word_counts)
  227. def run_with_files(self, files: List[str], output_path: Optional[str] = None) -> Dict[str, int]:
  228. """
  229. 对多个文件运行词频统计
  230. Args:
  231. files: 文件路径列表
  232. output_path: 输出路径(可选)
  233. Returns:
  234. 单词计数字典
  235. """
  236. # 合并所有文件的内容
  237. all_text = ""
  238. for file_path in files:
  239. try:
  240. with open(file_path, 'r', encoding='utf-8') as f:
  241. all_text += f.read() + "\n"
  242. except Exception as e:
  243. self.logger.warning(f"Failed to read file {file_path}: {e}")
  244. # 本地统计
  245. result = self.count_words_locally(all_text)
  246. # 保存结果
  247. if output_path:
  248. with open(output_path, 'w', encoding='utf-8') as f:
  249. for word, count in sorted(result.items()):
  250. f.write(f"{word}\t{count}\n")
  251. # 打印统计信息
  252. self._print_statistics(result)
  253. return result
  254. def main():
  255. """
  256. 主函数:作为独立脚本运行
  257. 使用方式:
  258. python wordcount_spark.py <input_path> [output_path]
  259. """
  260. if len(sys.argv) < 2:
  261. print("Usage: python wordcount_spark.py <input_path> [output_path]")
  262. print("Examples:")
  263. print(" python wordcount_spark.py input.txt")
  264. print(" python wordcount_spark.py hdfs:///user/hadoop/data output")
  265. print(" python wordcount_spark.py --local input.txt output.txt")
  266. sys.exit(1)
  267. # 解析参数
  268. use_local = False
  269. input_path = None
  270. output_path = None
  271. i = 1
  272. while i < len(sys.argv):
  273. arg = sys.argv[i]
  274. if arg == '--local':
  275. use_local = True
  276. elif input_path is None:
  277. input_path = arg
  278. else:
  279. output_path = arg
  280. i += 1
  281. if input_path is None:
  282. print("Error: Input path is required")
  283. sys.exit(1)
  284. wc = WordCountSpark()
  285. try:
  286. if use_local:
  287. # 本地模式(不使用 Spark)
  288. result = wc.run_with_files([input_path], output_path)
  289. else:
  290. # Spark 模式
  291. result = wc.run(input_path, output_path)
  292. # 打印结果
  293. print("\nFinal results:")
  294. for word, count in sorted(result.items(), key=lambda x: x[1], reverse=True)[:20]:
  295. print(f"{word}: {count}")
  296. finally:
  297. wc.stop()
  298. if __name__ == '__main__':
  299. main()