run_examples.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. """
  2. Hadoop Tools 示例脚本
  3. 演示如何使用 Python 版本的 Hadoop 工具包。
  4. """
  5. import sys
  6. import os
  7. # 添加项目路径
  8. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  9. from python.hdfs_operations import HDFSOperations
  10. from python.mapreduce.wordcount_streaming import WordCountStreaming
  11. from python.mapreduce.wordcount_spark import WordCountSpark
  12. from python.utils.helpers import format_file_size
  13. def example_hdfs_operations():
  14. """
  15. 示例:HDFS 操作
  16. """
  17. print("\n" + "=" * 60)
  18. print("示例 1: HDFS 操作 (HDFSOperations)")
  19. print("=" * 60)
  20. hdfs = HDFSOperations()
  21. # 由于我们可能没有实际的 Hadoop 环境,这里只演示 API 的使用
  22. # 实际使用时需要配置 Hadoop 环境
  23. print("\n1. 验证 HDFS 路径格式:")
  24. test_paths = [
  25. '/user/hadoop/data',
  26. 'invalid/path',
  27. '/user//hadoop',
  28. '/user/hadoop/data/file.txt'
  29. ]
  30. for path in test_paths:
  31. is_valid = hdfs._validate_hdfs_path if hasattr(hdfs, '_validate_hdfs_path') else hdfs.exists # 实际使用 exists 方法
  32. # 这里使用本地验证方法
  33. from python.utils.helpers import validate_hdfs_path
  34. valid = validate_hdfs_path(path)
  35. print(f" 路径 '{path}': {'有效' if valid else '无效'}")
  36. print("\n2. 格式化文件大小:")
  37. sizes = [1024, 1024*1024, 1024*1024*1024, 1024*1024*1024*1024]
  38. for size in sizes:
  39. print(f" {size} 字节 = {format_file_size(size)}")
  40. print("\n3. HDFS 操作方法列表:")
  41. methods = [
  42. ('make_dir', '创建目录'),
  43. ('delete', '删除文件/目录'),
  44. ('copy_from_local', '从本地上传文件到 HDFS'),
  45. ('copy_to_local', '从 HDFS 下载文件到本地'),
  46. ('read_file', '读取 HDFS 文件内容'),
  47. ('write_file', '写入内容到 HDFS 文件'),
  48. ('exists', '检查路径是否存在'),
  49. ('list_dir', '列出目录内容'),
  50. ('get_file_size', '获取文件大小')
  51. ]
  52. for method, desc in methods:
  53. print(f" - {method}(): {desc}")
  54. print("\n注意:实际运行 HDFS 操作需要配置 Hadoop 环境变量。")
  55. def example_wordcount_streaming():
  56. """
  57. 示例:Hadoop Streaming 词频统计
  58. """
  59. print("\n" + "=" * 60)
  60. print("示例 2: Hadoop Streaming 词频统计 (WordCountStreaming)")
  61. print("=" * 60)
  62. wc = WordCountStreaming()
  63. print("\n1. Mapper 功能演示:")
  64. test_line = "Hello world, hello Hadoop! Hadoop is great."
  65. print(f" 输入行: '{test_line}'")
  66. mapper_output = wc.mapper(test_line)
  67. print(f" Mapper 输出: {mapper_output}")
  68. print("\n2. Combiner 功能演示:")
  69. print(f" 输入: {mapper_output}")
  70. combiner_output = wc.combiner(mapper_output)
  71. print(f" Combiner 输出: {combiner_output}")
  72. print("\n3. Reducer 功能演示:")
  73. test_word = 'hadoop'
  74. test_counts = [1, 1, 1, 1]
  75. print(f" 单词: '{test_word}', 计数列表: {test_counts}")
  76. reducer_output = wc.reducer(test_word, test_counts)
  77. print(f" Reducer 输出: {reducer_output}")
  78. print("\n4. 本地词频统计演示:")
  79. sample_text = """
  80. Hadoop is a framework for distributed storage and processing of big data.
  81. Big data is data that contains greater variety.
  82. Hadoop provides massive storage for any kind of data.
  83. """
  84. print(f" 输入文本:")
  85. for line in sample_text.strip().split('\n'):
  86. print(f" {line.strip()}")
  87. result = wc.count_words_locally(sample_text)
  88. print("\n 统计结果 (按词频排序):")
  89. sorted_result = sorted(result.items(), key=lambda x: x[1], reverse=True)
  90. for word, count in sorted_result[:10]:
  91. print(f" {word}: {count}")
  92. def example_wordcount_spark():
  93. """
  94. 示例:PySpark 词频统计
  95. """
  96. print("\n" + "=" * 60)
  97. print("示例 3: PySpark 词频统计 (WordCountSpark)")
  98. print("=" * 60)
  99. wc = WordCountSpark()
  100. print("\n1. 本地词频统计演示 (不使用 Spark 集群):")
  101. # 使用示例数据文件
  102. sample_file = os.path.join(os.path.dirname(__file__), 'sample_data.txt')
  103. if os.path.exists(sample_file):
  104. print(f" 输入文件: {sample_file}")
  105. # 读取文件内容
  106. with open(sample_file, 'r', encoding='utf-8') as f:
  107. content = f.read()
  108. # 本地统计
  109. result = wc.count_words_locally(content)
  110. print("\n 统计结果 (Top 15):")
  111. sorted_result = sorted(result.items(), key=lambda x: x[1], reverse=True)
  112. total_words = sum(result.values())
  113. print(f" 总词数: {total_words}")
  114. print(f" 不同词数: {len(result)}")
  115. print(" -" * 30)
  116. for i, (word, count) in enumerate(sorted_result[:15], 1):
  117. percentage = (count / total_words) * 100
  118. print(f" {i:2d}. {word:12s} {count:3d} ({percentage:4.1f}%)")
  119. else:
  120. print(f" 示例文件不存在: {sample_file}")
  121. # 使用内置文本
  122. sample_text = """
  123. Spark is a fast and general engine for large-scale data processing.
  124. Spark provides high-level APIs in Java, Scala, Python and R.
  125. Spark also supports a rich set of higher-level tools including Spark SQL.
  126. """
  127. result = wc.count_words_locally(sample_text)
  128. print("\n 统计结果:")
  129. for word, count in sorted(result.items(), key=lambda x: x[1], reverse=True):
  130. print(f" {word}: {count}")
  131. print("\n2. WordCountSpark 方法列表:")
  132. methods = [
  133. ('run', '运行完整的词频统计作业(支持本地文件和 HDFS)'),
  134. ('count_words_from_rdd', '使用 RDD API 统计单词'),
  135. ('count_words_from_dataframe', '使用 DataFrame API 统计单词'),
  136. ('count_words_locally', '本地统计单词(不使用 Spark 集群)'),
  137. ('run_with_files', '对多个文件运行词频统计'),
  138. ('stop', '停止 Spark 会话')
  139. ]
  140. for method, desc in methods:
  141. print(f" - {method}(): {desc}")
  142. print("\n注意:使用 Spark 功能需要安装 PySpark: pip install pyspark")
  143. def main():
  144. """
  145. 主函数:运行所有示例
  146. """
  147. print("\n" + "#" * 60)
  148. print("# Hadoop Tools - Python 版本示例")
  149. print("# 提供 Hadoop 数据分析能力")
  150. print("#" * 60)
  151. print("\n项目结构:")
  152. print(" python/")
  153. print(" ├── __init__.py # 模块入口")
  154. print(" ├── hdfs_operations.py # HDFS 操作功能")
  155. print(" ├── mapreduce/")
  156. print(" │ ├── __init__.py")
  157. print(" │ ├── wordcount_streaming.py # Hadoop Streaming 方式")
  158. print(" │ └── wordcount_spark.py # PySpark 方式")
  159. print(" └── utils/")
  160. print(" ├── __init__.py")
  161. print(" └── helpers.py # 工具函数")
  162. # 运行示例
  163. example_hdfs_operations()
  164. example_wordcount_streaming()
  165. example_wordcount_spark()
  166. print("\n" + "#" * 60)
  167. print("# 示例运行完成!")
  168. print("# 请查看上方输出了解各模块的使用方法。")
  169. print("#" * 60)
  170. if __name__ == '__main__':
  171. main()