# hadoop-tools Hadoop 相关操作类,提供 Hadoop 数据分析能力。 ## 项目概述 本项目提供两种语言的实现: - **Java 版本**:原始实现,位于 `src/` 目录 - **Python 版本**:新实现,位于 `python/` 目录(推荐使用) ## Python 版本功能 ### 1. HDFS 文件系统操作 (`hdfs_operations.py`) 提供与 HDFS 交互的各种方法: | 方法 | 功能描述 | |------|----------| | `make_dir(path)` | 创建目录 | | `delete(path, recursive)` | 删除文件/目录 | | `copy_from_local(src, dst)` | 从本地上传文件到 HDFS | | `copy_to_local(src, dst)` | 从 HDFS 下载文件到本地 | | `read_file(path)` | 读取 HDFS 文件内容 | | `write_file(path, content)` | 写入内容到 HDFS 文件 | | `exists(path)` | 检查路径是否存在 | | `list_dir(path)` | 列出目录内容 | | `get_file_size(path)` | 获取文件大小 | **使用示例:** ```python from python.hdfs_operations import HDFSOperations hdfs = HDFSOperations() # 创建目录 hdfs.make_dir('/user/hadoop/data') # 上传文件 hdfs.copy_from_local('/local/path/file.txt', '/user/hadoop/data/') # 读取文件内容 content = hdfs.read_file('/user/hadoop/data/file.txt') # 检查文件是否存在 if hdfs.exists('/user/hadoop/data/file.txt'): print("文件存在") ``` ### 2. Hadoop Streaming 词频统计 (`mapreduce/wordcount_streaming.py`) 使用 Hadoop Streaming 方式实现经典的 WordCount 算法: **核心组件:** - **Mapper**: 将文本分割为单词,输出 `<单词, 1>` - **Combiner**: 在 Mapper 端进行本地合并,减少数据传输 - **Reducer**: 统计每个单词的总次数 **使用方式:** 1. **作为模块导入:** ```python from python.mapreduce.wordcount_streaming import WordCountStreaming wc = WordCountStreaming() # 本地统计(用于测试) result = wc.count_words_locally("Hello world, hello Hadoop!") print(result) # {'hello': 2, 'world': 1, 'hadoop': 1} # 提交 Hadoop Streaming 作业 wc.run('/user/hadoop/input', '/user/hadoop/output') ``` 2. **作为独立脚本运行:** ```bash # 运行 Mapper echo "Hello world hello" | python wordcount_streaming.py mapper # 运行 Reducer echo "hello\t1\nworld\t1\nhello\t1" | sort | python wordcount_streaming.py reducer # 本地测试 python wordcount_streaming.py local ``` ### 3. PySpark 词频统计 (`mapreduce/wordcount_spark.py`) 使用 PySpark 实现词频统计,这是现代大数据处理的推荐方式: **特性:** - 更简洁的 API - 更好的性能 - 支持 RDD 和 DataFrame 两种 API - 可以与 Spark SQL、MLlib 等集成 **使用方式:** 1. **作为模块导入:** ```python from python.mapreduce.wordcount_spark import WordCountSpark wc = WordCountSpark() # 本地统计(不使用 Spark 集群) result = wc.count_words_locally("Spark is fast and general engine") # 使用 Spark 运行(支持本地文件和 HDFS) result = wc.run('/user/hadoop/data/input.txt', '/user/hadoop/output') # 停止 Spark 会话 wc.stop() ``` 2. **作为独立脚本运行:** ```bash # 本地模式(不使用 Spark 集群) python wordcount_spark.py --local input.txt output.txt # Spark 模式 python wordcount_spark.py hdfs:///user/hadoop/data output ``` ## 工具函数 (`utils/helpers.py`) 提供常用的辅助功能: | 函数 | 功能描述 | |------|----------| | `run_command(cmd)` | 执行命令行命令 | | `validate_hdfs_path(path)` | 验证 HDFS 路径格式 | | `format_file_size(size_bytes)` | 格式化文件大小(B/KB/MB/GB/TB) | | `setup_logger(name)` | 设置日志器 | ## 安装依赖 ```bash # 安装基础依赖 pip install -r requirements.txt # 安装 PySpark(如果需要使用 Spark 功能) pip install pyspark ``` ## 运行示例 ```bash # 运行完整示例 python examples/run_examples.py # 测试 Hadoop Streaming 词频统计 python python/mapreduce/wordcount_streaming.py local # 测试 PySpark 词频统计(本地模式) python python/mapreduce/wordcount_spark.py --local examples/sample_data.txt ``` ## 项目结构 ``` hadoop-tools/ ├── python/ # Python 版本实现 │ ├── __init__.py # 模块入口 │ ├── hdfs_operations.py # HDFS 操作功能 │ ├── mapreduce/ │ │ ├── __init__.py │ │ ├── wordcount_streaming.py # Hadoop Streaming 方式 │ │ └── wordcount_spark.py # PySpark 方式 │ └── utils/ │ ├── __init__.py │ └── helpers.py # 工具函数 ├── examples/ # 示例文件 │ ├── sample_data.txt # 示例数据 │ └── run_examples.py # 示例运行脚本 ├── src/ # Java 版本实现 │ └── me/yoqi/hadoop/test/ │ ├── CommonOperation.java # HDFS 操作 │ └── WordCount.java # 词频统计 ├── R/ # R 语言相关(待实现) ├── pom.xml # Maven 配置(Java 项目) ├── requirements.txt # Python 依赖 └── README.md ``` ## 与 Java 版本的对应关系 | Java 类/方法 | Python 对应 | |--------------|-------------| | `CommonOperation.makeDir()` | `HDFSOperations.make_dir()` | | `CommonOperation.delDir()` | `HDFSOperations.delete(recursive=True)` | | `CommonOperation.delFile()` | `HDFSOperations.delete(recursive=False)` | | `CommonOperation.putFile()` | `HDFSOperations.copy_from_local()` | | `CommonOperation.readFile()` | `HDFSOperations.read_file()` | | `CommonOperation.writeFile()` | `HDFSOperations.write_file()` | | `WordCount.TokenizerMapper` | `WordCountStreaming.mapper()` | | `WordCount.IntSumReducer` | `WordCountStreaming.reducer()` | | `WordCount.main()` | `WordCountStreaming.run()` / `WordCountSpark.run()` | ## 注意事项 1. **HDFS 操作**:需要配置 Hadoop 环境变量,确保 `hdfs` 或 `hadoop` 命令可用 2. **Hadoop Streaming**:需要找到 Hadoop Streaming jar 文件,脚本会自动查找常见位置 3. **PySpark**:需要安装 PySpark 并配置 Spark 环境 4. **路径格式**:HDFS 路径必须以 `/` 开头,不能包含连续斜杠或非法字符 ## 许可证 详见 LICENSE 文件。