| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- """
- Hadoop Tools 示例脚本
- 演示如何使用 Python 版本的 Hadoop 工具包。
- """
- import sys
- import os
- # 添加项目路径
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from python.hdfs_operations import HDFSOperations
- from python.mapreduce.wordcount_streaming import WordCountStreaming
- from python.mapreduce.wordcount_spark import WordCountSpark
- from python.utils.helpers import format_file_size
def example_hdfs_operations():
    """Demonstrate the HDFS helper API without touching a real cluster.

    Prints three sections to stdout: validation results for a few sample
    HDFS paths, formatted renderings of several byte counts, and a
    hard-coded list of operation names with short descriptions.

    Returns:
        None. All output goes to stdout.
    """
    print("\n" + "=" * 60)
    print("示例 1: HDFS 操作 (HDFSOperations)")
    print("=" * 60)

    # Shows how the client object is created. No method is called on it
    # here because a real Hadoop environment may not be available; actual
    # use requires a configured Hadoop environment.
    hdfs = HDFSOperations()

    print("\n1. 验证 HDFS 路径格式:")
    test_paths = [
        '/user/hadoop/data',
        'invalid/path',
        '/user//hadoop',
        '/user/hadoop/data/file.txt',
    ]

    # Imported once before the loop rather than on every iteration; the
    # standalone helper is used so the check runs locally.
    from python.utils.helpers import validate_hdfs_path

    for path in test_paths:
        valid = validate_hdfs_path(path)
        print(f" 路径 '{path}': {'有效' if valid else '无效'}")

    print("\n2. 格式化文件大小:")
    # 1 KiB, 1 MiB, 1 GiB and 1 TiB expressed in bytes.
    sizes = [1024 ** power for power in range(1, 5)]
    for size in sizes:
        print(f" {size} 字节 = {format_file_size(size)}")

    print("\n3. HDFS 操作方法列表:")
    # (method name, description) pairs; purely informational, nothing is
    # invoked on the client.
    methods = [
        ('make_dir', '创建目录'),
        ('delete', '删除文件/目录'),
        ('copy_from_local', '从本地上传文件到 HDFS'),
        ('copy_to_local', '从 HDFS 下载文件到本地'),
        ('read_file', '读取 HDFS 文件内容'),
        ('write_file', '写入内容到 HDFS 文件'),
        ('exists', '检查路径是否存在'),
        ('list_dir', '列出目录内容'),
        ('get_file_size', '获取文件大小'),
    ]

    for method, desc in methods:
        print(f" - {method}(): {desc}")

    print("\n注意:实际运行 HDFS 操作需要配置 Hadoop 环境变量。")
def example_wordcount_streaming():
    """Walk through the WordCountStreaming API step by step.

    Feeds one sample line through ``mapper`` and ``combiner``, calls
    ``reducer`` with a fixed word and count list, then runs
    ``count_words_locally`` on a short text and prints up to ten entries
    ordered by descending count. Everything is written to stdout.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("示例 2: Hadoop Streaming 词频统计 (WordCountStreaming)")
    print(rule)

    counter = WordCountStreaming()

    # Step 1: mapper on a single input line.
    print("\n1. Mapper 功能演示:")
    input_line = "Hello world, hello Hadoop! Hadoop is great."
    print(f" 输入行: '{input_line}'")
    mapped = counter.mapper(input_line)
    print(f" Mapper 输出: {mapped}")

    # Step 2: combiner applied to the mapper output.
    print("\n2. Combiner 功能演示:")
    print(f" 输入: {mapped}")
    combined = counter.combiner(mapped)
    print(f" Combiner 输出: {combined}")

    # Step 3: reducer on one word with its list of counts.
    print("\n3. Reducer 功能演示:")
    word_in = 'hadoop'
    counts_in = [1, 1, 1, 1]
    print(f" 单词: '{word_in}', 计数列表: {counts_in}")
    reduced = counter.reducer(word_in, counts_in)
    print(f" Reducer 输出: {reduced}")

    # Step 4: local word count over a small multi-line text.
    print("\n4. 本地词频统计演示:")
    sample_text = (
        "\n"
        "Hadoop is a framework for distributed storage and processing of big data.\n"
        "Big data is data that contains greater variety.\n"
        "Hadoop provides massive storage for any kind of data.\n"
    )
    print(" 输入文本:")
    for raw_line in sample_text.strip().split('\n'):
        shown = raw_line.strip()
        print(f" {shown}")

    frequencies = counter.count_words_locally(sample_text)

    print("\n 统计结果 (按词频排序):")
    # Stable in-place sort, highest count first; only the top ten are shown.
    ranking = list(frequencies.items())
    ranking.sort(key=lambda entry: entry[1], reverse=True)
    for token, occurrences in ranking[:10]:
        print(f" {token}: {occurrences}")
def example_wordcount_spark():
    """Show local word counting through the WordCountSpark wrapper.

    Looks for ``sample_data.txt`` next to this script. If it exists, its
    content is counted with ``count_words_locally`` and the top 15 words are
    printed with totals and percentages. Otherwise a built-in text is
    counted and every word is printed. A hard-coded list of method names
    with descriptions is printed at the end. Everything goes to stdout.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("示例 3: PySpark 词频统计 (WordCountSpark)")
    print(rule)

    counter = WordCountSpark()

    print("\n1. 本地词频统计演示 (不使用 Spark 集群):")

    # The sample data file is expected in the same directory as this script.
    script_dir = os.path.dirname(__file__)
    sample_file = os.path.join(script_dir, 'sample_data.txt')

    if not os.path.exists(sample_file):
        print(f" 示例文件不存在: {sample_file}")
        # Fall back to a built-in text.
        sample_text = (
            "\n"
            "Spark is a fast and general engine for large-scale data processing.\n"
            "Spark provides high-level APIs in Java, Scala, Python and R.\n"
            "Spark also supports a rich set of higher-level tools including Spark SQL.\n"
        )
        frequencies = counter.count_words_locally(sample_text)

        print("\n 统计结果:")
        ranking = list(frequencies.items())
        ranking.sort(key=lambda entry: entry[1], reverse=True)
        for token, occurrences in ranking:
            print(f" {token}: {occurrences}")
    else:
        print(f" 输入文件: {sample_file}")

        # Read the whole file, then count locally.
        with open(sample_file, 'r', encoding='utf-8') as handle:
            content = handle.read()

        frequencies = counter.count_words_locally(content)

        print("\n 统计结果 (Top 15):")
        ranking = list(frequencies.items())
        ranking.sort(key=lambda entry: entry[1], reverse=True)

        total_words = sum(frequencies.values())
        print(f" 总词数: {total_words}")
        print(f" 不同词数: {len(frequencies)}")
        print(" -" * 30)

        # Rank starts at 1; percentage is each word's share of all words.
        for rank, (token, occurrences) in enumerate(ranking[:15], start=1):
            share = occurrences / total_words * 100
            print(f" {rank:2d}. {token:12s} {occurrences:3d} ({share:4.1f}%)")

    print("\n2. WordCountSpark 方法列表:")
    # (method name, description) pairs; informational only.
    method_notes = (
        ('run', '运行完整的词频统计作业(支持本地文件和 HDFS)'),
        ('count_words_from_rdd', '使用 RDD API 统计单词'),
        ('count_words_from_dataframe', '使用 DataFrame API 统计单词'),
        ('count_words_locally', '本地统计单词(不使用 Spark 集群)'),
        ('run_with_files', '对多个文件运行词频统计'),
        ('stop', '停止 Spark 会话'),
    )

    for name, note in method_notes:
        print(f" - {name}(): {note}")

    print("\n注意:使用 Spark 功能需要安装 PySpark: pip install pyspark")
def main():
    """Run every example in order, framed by a header and a footer banner.

    Prints a title banner and a sketch of the package layout, calls the
    HDFS, Hadoop Streaming and PySpark example functions in that order, and
    finishes with a completion banner. Everything goes to stdout.
    """
    banner = "#" * 60

    print("\n" + banner)
    print("# Hadoop Tools - Python 版本示例")
    print("# 提供 Hadoop 数据分析能力")
    print(banner)

    print("\n项目结构:")
    # One entry per output line of the package tree.
    layout = (
        " python/",
        " ├── __init__.py # 模块入口",
        " ├── hdfs_operations.py # HDFS 操作功能",
        " ├── mapreduce/",
        " │ ├── __init__.py",
        " │ ├── wordcount_streaming.py # Hadoop Streaming 方式",
        " │ └── wordcount_spark.py # PySpark 方式",
        " └── utils/",
        " ├── __init__.py",
        " └── helpers.py # 工具函数",
    )
    for row in layout:
        print(row)

    # Run the examples in a fixed order.
    for demo in (
        example_hdfs_operations,
        example_wordcount_streaming,
        example_wordcount_spark,
    ):
        demo()

    print("\n" + banner)
    print("# 示例运行完成!")
    print("# 请查看上方输出了解各模块的使用方法。")
    print(banner)
# Script entry point: run the examples only when this file is executed
# directly, not when it is imported.
if "__main__" == __name__:
    main()
|