""" Hadoop Tools 示例脚本 演示如何使用 Python 版本的 Hadoop 工具包。 """ import sys import os # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from python.hdfs_operations import HDFSOperations from python.mapreduce.wordcount_streaming import WordCountStreaming from python.mapreduce.wordcount_spark import WordCountSpark from python.utils.helpers import format_file_size def example_hdfs_operations(): """ 示例:HDFS 操作 """ print("\n" + "=" * 60) print("示例 1: HDFS 操作 (HDFSOperations)") print("=" * 60) hdfs = HDFSOperations() # 由于我们可能没有实际的 Hadoop 环境,这里只演示 API 的使用 # 实际使用时需要配置 Hadoop 环境 print("\n1. 验证 HDFS 路径格式:") test_paths = [ '/user/hadoop/data', 'invalid/path', '/user//hadoop', '/user/hadoop/data/file.txt' ] for path in test_paths: is_valid = hdfs._validate_hdfs_path if hasattr(hdfs, '_validate_hdfs_path') else hdfs.exists # 实际使用 exists 方法 # 这里使用本地验证方法 from python.utils.helpers import validate_hdfs_path valid = validate_hdfs_path(path) print(f" 路径 '{path}': {'有效' if valid else '无效'}") print("\n2. 格式化文件大小:") sizes = [1024, 1024*1024, 1024*1024*1024, 1024*1024*1024*1024] for size in sizes: print(f" {size} 字节 = {format_file_size(size)}") print("\n3. HDFS 操作方法列表:") methods = [ ('make_dir', '创建目录'), ('delete', '删除文件/目录'), ('copy_from_local', '从本地上传文件到 HDFS'), ('copy_to_local', '从 HDFS 下载文件到本地'), ('read_file', '读取 HDFS 文件内容'), ('write_file', '写入内容到 HDFS 文件'), ('exists', '检查路径是否存在'), ('list_dir', '列出目录内容'), ('get_file_size', '获取文件大小') ] for method, desc in methods: print(f" - {method}(): {desc}") print("\n注意:实际运行 HDFS 操作需要配置 Hadoop 环境变量。") def example_wordcount_streaming(): """ 示例:Hadoop Streaming 词频统计 """ print("\n" + "=" * 60) print("示例 2: Hadoop Streaming 词频统计 (WordCountStreaming)") print("=" * 60) wc = WordCountStreaming() print("\n1. Mapper 功能演示:") test_line = "Hello world, hello Hadoop! Hadoop is great." print(f" 输入行: '{test_line}'") mapper_output = wc.mapper(test_line) print(f" Mapper 输出: {mapper_output}") print("\n2. Combiner 功能演示:") print(f" 输入: {mapper_output}") combiner_output = wc.combiner(mapper_output) print(f" Combiner 输出: {combiner_output}") print("\n3. Reducer 功能演示:") test_word = 'hadoop' test_counts = [1, 1, 1, 1] print(f" 单词: '{test_word}', 计数列表: {test_counts}") reducer_output = wc.reducer(test_word, test_counts) print(f" Reducer 输出: {reducer_output}") print("\n4. 本地词频统计演示:") sample_text = """ Hadoop is a framework for distributed storage and processing of big data. Big data is data that contains greater variety. Hadoop provides massive storage for any kind of data. """ print(f" 输入文本:") for line in sample_text.strip().split('\n'): print(f" {line.strip()}") result = wc.count_words_locally(sample_text) print("\n 统计结果 (按词频排序):") sorted_result = sorted(result.items(), key=lambda x: x[1], reverse=True) for word, count in sorted_result[:10]: print(f" {word}: {count}") def example_wordcount_spark(): """ 示例:PySpark 词频统计 """ print("\n" + "=" * 60) print("示例 3: PySpark 词频统计 (WordCountSpark)") print("=" * 60) wc = WordCountSpark() print("\n1. 本地词频统计演示 (不使用 Spark 集群):") # 使用示例数据文件 sample_file = os.path.join(os.path.dirname(__file__), 'sample_data.txt') if os.path.exists(sample_file): print(f" 输入文件: {sample_file}") # 读取文件内容 with open(sample_file, 'r', encoding='utf-8') as f: content = f.read() # 本地统计 result = wc.count_words_locally(content) print("\n 统计结果 (Top 15):") sorted_result = sorted(result.items(), key=lambda x: x[1], reverse=True) total_words = sum(result.values()) print(f" 总词数: {total_words}") print(f" 不同词数: {len(result)}") print(" -" * 30) for i, (word, count) in enumerate(sorted_result[:15], 1): percentage = (count / total_words) * 100 print(f" {i:2d}. {word:12s} {count:3d} ({percentage:4.1f}%)") else: print(f" 示例文件不存在: {sample_file}") # 使用内置文本 sample_text = """ Spark is a fast and general engine for large-scale data processing. Spark provides high-level APIs in Java, Scala, Python and R. Spark also supports a rich set of higher-level tools including Spark SQL. """ result = wc.count_words_locally(sample_text) print("\n 统计结果:") for word, count in sorted(result.items(), key=lambda x: x[1], reverse=True): print(f" {word}: {count}") print("\n2. WordCountSpark 方法列表:") methods = [ ('run', '运行完整的词频统计作业(支持本地文件和 HDFS)'), ('count_words_from_rdd', '使用 RDD API 统计单词'), ('count_words_from_dataframe', '使用 DataFrame API 统计单词'), ('count_words_locally', '本地统计单词(不使用 Spark 集群)'), ('run_with_files', '对多个文件运行词频统计'), ('stop', '停止 Spark 会话') ] for method, desc in methods: print(f" - {method}(): {desc}") print("\n注意:使用 Spark 功能需要安装 PySpark: pip install pyspark") def main(): """ 主函数:运行所有示例 """ print("\n" + "#" * 60) print("# Hadoop Tools - Python 版本示例") print("# 提供 Hadoop 数据分析能力") print("#" * 60) print("\n项目结构:") print(" python/") print(" ├── __init__.py # 模块入口") print(" ├── hdfs_operations.py # HDFS 操作功能") print(" ├── mapreduce/") print(" │ ├── __init__.py") print(" │ ├── wordcount_streaming.py # Hadoop Streaming 方式") print(" │ └── wordcount_spark.py # PySpark 方式") print(" └── utils/") print(" ├── __init__.py") print(" └── helpers.py # 工具函数") # 运行示例 example_hdfs_operations() example_wordcount_streaming() example_wordcount_spark() print("\n" + "#" * 60) print("# 示例运行完成!") print("# 请查看上方输出了解各模块的使用方法。") print("#" * 60) if __name__ == '__main__': main()