Browse Source

feat: implement hadoop tools in python with hdfs operations and mapreduce

This commit introduces a comprehensive Python implementation of Hadoop tools, including:
- HDFS operations (create/delete directories, upload/download files, read/write files)
- MapReduce implementations (Hadoop Streaming and PySpark wordcount)
- Utility functions for command execution, path validation, and logging
- Example scripts and documentation in README.md
liuyuqi-cnb 1 month ago
parent
commit
e211c241cc

+ 205 - 1
README.md

@@ -1,3 +1,207 @@
 # hadoop-tools
 
-hadoop相关操作类
+Hadoop 相关操作类,提供 Hadoop 数据分析能力。
+
+## 项目概述
+
+本项目提供两种语言的实现:
+- **Java 版本**:原始实现,位于 `src/` 目录
+- **Python 版本**:新实现,位于 `python/` 目录(推荐使用)
+
+## Python 版本功能
+
+### 1. HDFS 文件系统操作 (`hdfs_operations.py`)
+
+提供与 HDFS 交互的各种方法:
+
+| 方法 | 功能描述 |
+|------|----------|
+| `make_dir(path)` | 创建目录 |
+| `delete(path, recursive)` | 删除文件/目录 |
+| `copy_from_local(src, dst)` | 从本地上传文件到 HDFS |
+| `copy_to_local(src, dst)` | 从 HDFS 下载文件到本地 |
+| `read_file(path)` | 读取 HDFS 文件内容 |
+| `write_file(path, content)` | 写入内容到 HDFS 文件 |
+| `exists(path)` | 检查路径是否存在 |
+| `list_dir(path)` | 列出目录内容 |
+| `get_file_size(path)` | 获取文件大小 |
+
+**使用示例:**
+```python
+from python.hdfs_operations import HDFSOperations
+
+hdfs = HDFSOperations()
+
+# 创建目录
+hdfs.make_dir('/user/hadoop/data')
+
+# 上传文件
+hdfs.copy_from_local('/local/path/file.txt', '/user/hadoop/data/')
+
+# 读取文件内容
+content = hdfs.read_file('/user/hadoop/data/file.txt')
+
+# 检查文件是否存在
+if hdfs.exists('/user/hadoop/data/file.txt'):
+    print("文件存在")
+```
+
+### 2. Hadoop Streaming 词频统计 (`mapreduce/wordcount_streaming.py`)
+
+使用 Hadoop Streaming 方式实现经典的 WordCount 算法:
+
+**核心组件:**
+- **Mapper**: 将文本分割为单词,输出 `<单词, 1>`
+- **Combiner**: 在 Mapper 端进行本地合并,减少数据传输
+- **Reducer**: 统计每个单词的总次数
+
+**使用方式:**
+
+1. **作为模块导入:**
+```python
+from python.mapreduce.wordcount_streaming import WordCountStreaming
+
+wc = WordCountStreaming()
+
+# 本地统计(用于测试)
+result = wc.count_words_locally("Hello world, hello Hadoop!")
+print(result)  # {'hello': 2, 'world': 1, 'hadoop': 1}
+
+# 提交 Hadoop Streaming 作业
+wc.run('/user/hadoop/input', '/user/hadoop/output')
+```
+
+2. **作为独立脚本运行:**
+```bash
+# 运行 Mapper
+echo "Hello world hello" | python wordcount_streaming.py mapper
+
+# 运行 Reducer
+echo "hello\t1\nworld\t1\nhello\t1" | sort | python wordcount_streaming.py reducer
+
+# 本地测试
+python wordcount_streaming.py local
+```
+
+### 3. PySpark 词频统计 (`mapreduce/wordcount_spark.py`)
+
+使用 PySpark 实现词频统计,这是现代大数据处理的推荐方式:
+
+**特性:**
+- 更简洁的 API
+- 更好的性能
+- 支持 RDD 和 DataFrame 两种 API
+- 可以与 Spark SQL、MLlib 等集成
+
+**使用方式:**
+
+1. **作为模块导入:**
+```python
+from python.mapreduce.wordcount_spark import WordCountSpark
+
+wc = WordCountSpark()
+
+# 本地统计(不使用 Spark 集群)
+result = wc.count_words_locally("Spark is fast and general engine")
+
+# 使用 Spark 运行(支持本地文件和 HDFS)
+result = wc.run('/user/hadoop/data/input.txt', '/user/hadoop/output')
+
+# 停止 Spark 会话
+wc.stop()
+```
+
+2. **作为独立脚本运行:**
+```bash
+# 本地模式(不使用 Spark 集群)
+python wordcount_spark.py --local input.txt output.txt
+
+# Spark 模式
+python wordcount_spark.py hdfs:///user/hadoop/data output
+```
+
+## 工具函数 (`utils/helpers.py`)
+
+提供常用的辅助功能:
+
+| 函数 | 功能描述 |
+|------|----------|
+| `run_command(cmd)` | 执行命令行命令 |
+| `validate_hdfs_path(path)` | 验证 HDFS 路径格式 |
+| `format_file_size(size_bytes)` | 格式化文件大小(B/KB/MB/GB/TB) |
+| `setup_logger(name)` | 设置日志器 |
+
+## 安装依赖
+
+```bash
+# 安装基础依赖
+pip install -r requirements.txt
+
+# 安装 PySpark(如果需要使用 Spark 功能)
+pip install pyspark
+```
+
+## 运行示例
+
+```bash
+# 运行完整示例
+python examples/run_examples.py
+
+# 测试 Hadoop Streaming 词频统计
+python python/mapreduce/wordcount_streaming.py local
+
+# 测试 PySpark 词频统计(本地模式)
+python python/mapreduce/wordcount_spark.py --local examples/sample_data.txt
+```
+
+## 项目结构
+
+```
+hadoop-tools/
+├── python/                    # Python 版本实现
+│   ├── __init__.py           # 模块入口
+│   ├── hdfs_operations.py    # HDFS 操作功能
+│   ├── mapreduce/
+│   │   ├── __init__.py
+│   │   ├── wordcount_streaming.py  # Hadoop Streaming 方式
+│   │   └── wordcount_spark.py      # PySpark 方式
+│   └── utils/
+│       ├── __init__.py
+│       └── helpers.py         # 工具函数
+├── examples/                  # 示例文件
+│   ├── sample_data.txt       # 示例数据
+│   └── run_examples.py       # 示例运行脚本
+├── src/                       # Java 版本实现
+│   └── me/yoqi/hadoop/test/
+│       ├── CommonOperation.java  # HDFS 操作
+│       └── WordCount.java        # 词频统计
+├── R/                         # R 语言相关(待实现)
+├── pom.xml                   # Maven 配置(Java 项目)
+├── requirements.txt          # Python 依赖
+└── README.md
+```
+
+## 与 Java 版本的对应关系
+
+| Java 类/方法 | Python 对应 |
+|--------------|-------------|
+| `CommonOperation.makeDir()` | `HDFSOperations.make_dir()` |
+| `CommonOperation.delDir()` | `HDFSOperations.delete(recursive=True)` |
+| `CommonOperation.delFile()` | `HDFSOperations.delete(recursive=False)` |
+| `CommonOperation.putFile()` | `HDFSOperations.copy_from_local()` |
+| `CommonOperation.readFile()` | `HDFSOperations.read_file()` |
+| `CommonOperation.writeFile()` | `HDFSOperations.write_file()` |
+| `WordCount.TokenizerMapper` | `WordCountStreaming.mapper()` |
+| `WordCount.IntSumReducer` | `WordCountStreaming.reducer()` |
+| `WordCount.main()` | `WordCountStreaming.run()` / `WordCountSpark.run()` |
+
+## 注意事项
+
+1. **HDFS 操作**:需要配置 Hadoop 环境变量,确保 `hdfs` 或 `hadoop` 命令可用
+2. **Hadoop Streaming**:需要找到 Hadoop Streaming jar 文件,脚本会自动查找常见位置
+3. **PySpark**:需要安装 PySpark 并配置 Spark 环境
+4. **路径格式**:HDFS 路径必须以 `/` 开头,不能包含连续斜杠或非法字符
+
+## 许可证
+
+详见 LICENSE 文件。

+ 216 - 0
examples/run_examples.py

@@ -0,0 +1,216 @@
+"""
+Hadoop Tools 示例脚本
+
+演示如何使用 Python 版本的 Hadoop 工具包。
+"""
+
+import sys
+import os
+
+# 添加项目路径
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from python.hdfs_operations import HDFSOperations
+from python.mapreduce.wordcount_streaming import WordCountStreaming
+from python.mapreduce.wordcount_spark import WordCountSpark
+from python.utils.helpers import format_file_size
+
+
+def example_hdfs_operations():
+    """
+    示例:HDFS 操作
+    """
+    print("\n" + "=" * 60)
+    print("示例 1: HDFS 操作 (HDFSOperations)")
+    print("=" * 60)
+    
+    hdfs = HDFSOperations()
+    
+    # 由于我们可能没有实际的 Hadoop 环境,这里只演示 API 的使用
+    # 实际使用时需要配置 Hadoop 环境
+    
+    print("\n1. 验证 HDFS 路径格式:")
+    test_paths = [
+        '/user/hadoop/data',
+        'invalid/path',
+        '/user//hadoop',
+        '/user/hadoop/data/file.txt'
+    ]
+    
+    for path in test_paths:
+        is_valid = hdfs._validate_hdfs_path if hasattr(hdfs, '_validate_hdfs_path') else hdfs.exists  # 实际使用 exists 方法
+        # 这里使用本地验证方法
+        from python.utils.helpers import validate_hdfs_path
+        valid = validate_hdfs_path(path)
+        print(f"   路径 '{path}': {'有效' if valid else '无效'}")
+    
+    print("\n2. 格式化文件大小:")
+    sizes = [1024, 1024*1024, 1024*1024*1024, 1024*1024*1024*1024]
+    for size in sizes:
+        print(f"   {size} 字节 = {format_file_size(size)}")
+    
+    print("\n3. HDFS 操作方法列表:")
+    methods = [
+        ('make_dir', '创建目录'),
+        ('delete', '删除文件/目录'),
+        ('copy_from_local', '从本地上传文件到 HDFS'),
+        ('copy_to_local', '从 HDFS 下载文件到本地'),
+        ('read_file', '读取 HDFS 文件内容'),
+        ('write_file', '写入内容到 HDFS 文件'),
+        ('exists', '检查路径是否存在'),
+        ('list_dir', '列出目录内容'),
+        ('get_file_size', '获取文件大小')
+    ]
+    
+    for method, desc in methods:
+        print(f"   - {method}(): {desc}")
+    
+    print("\n注意:实际运行 HDFS 操作需要配置 Hadoop 环境变量。")
+
+
+def example_wordcount_streaming():
+    """
+    示例:Hadoop Streaming 词频统计
+    """
+    print("\n" + "=" * 60)
+    print("示例 2: Hadoop Streaming 词频统计 (WordCountStreaming)")
+    print("=" * 60)
+    
+    wc = WordCountStreaming()
+    
+    print("\n1. Mapper 功能演示:")
+    test_line = "Hello world, hello Hadoop! Hadoop is great."
+    print(f"   输入行: '{test_line}'")
+    mapper_output = wc.mapper(test_line)
+    print(f"   Mapper 输出: {mapper_output}")
+    
+    print("\n2. Combiner 功能演示:")
+    print(f"   输入: {mapper_output}")
+    combiner_output = wc.combiner(mapper_output)
+    print(f"   Combiner 输出: {combiner_output}")
+    
+    print("\n3. Reducer 功能演示:")
+    test_word = 'hadoop'
+    test_counts = [1, 1, 1, 1]
+    print(f"   单词: '{test_word}', 计数列表: {test_counts}")
+    reducer_output = wc.reducer(test_word, test_counts)
+    print(f"   Reducer 输出: {reducer_output}")
+    
+    print("\n4. 本地词频统计演示:")
+    sample_text = """
+    Hadoop is a framework for distributed storage and processing of big data.
+    Big data is data that contains greater variety.
+    Hadoop provides massive storage for any kind of data.
+    """
+    print(f"   输入文本:")
+    for line in sample_text.strip().split('\n'):
+        print(f"      {line.strip()}")
+    
+    result = wc.count_words_locally(sample_text)
+    
+    print("\n   统计结果 (按词频排序):")
+    sorted_result = sorted(result.items(), key=lambda x: x[1], reverse=True)
+    for word, count in sorted_result[:10]:
+        print(f"      {word}: {count}")
+
+
+def example_wordcount_spark():
+    """
+    示例:PySpark 词频统计
+    """
+    print("\n" + "=" * 60)
+    print("示例 3: PySpark 词频统计 (WordCountSpark)")
+    print("=" * 60)
+    
+    wc = WordCountSpark()
+    
+    print("\n1. 本地词频统计演示 (不使用 Spark 集群):")
+    
+    # 使用示例数据文件
+    sample_file = os.path.join(os.path.dirname(__file__), 'sample_data.txt')
+    
+    if os.path.exists(sample_file):
+        print(f"   输入文件: {sample_file}")
+        
+        # 读取文件内容
+        with open(sample_file, 'r', encoding='utf-8') as f:
+            content = f.read()
+        
+        # 本地统计
+        result = wc.count_words_locally(content)
+        
+        print("\n   统计结果 (Top 15):")
+        sorted_result = sorted(result.items(), key=lambda x: x[1], reverse=True)
+        
+        total_words = sum(result.values())
+        print(f"   总词数: {total_words}")
+        print(f"   不同词数: {len(result)}")
+        print("   -" * 30)
+        
+        for i, (word, count) in enumerate(sorted_result[:15], 1):
+            percentage = (count / total_words) * 100
+            print(f"   {i:2d}. {word:12s} {count:3d} ({percentage:4.1f}%)")
+    else:
+        print(f"   示例文件不存在: {sample_file}")
+        # 使用内置文本
+        sample_text = """
+        Spark is a fast and general engine for large-scale data processing.
+        Spark provides high-level APIs in Java, Scala, Python and R.
+        Spark also supports a rich set of higher-level tools including Spark SQL.
+        """
+        result = wc.count_words_locally(sample_text)
+        
+        print("\n   统计结果:")
+        for word, count in sorted(result.items(), key=lambda x: x[1], reverse=True):
+            print(f"   {word}: {count}")
+    
+    print("\n2. WordCountSpark 方法列表:")
+    methods = [
+        ('run', '运行完整的词频统计作业(支持本地文件和 HDFS)'),
+        ('count_words_from_rdd', '使用 RDD API 统计单词'),
+        ('count_words_from_dataframe', '使用 DataFrame API 统计单词'),
+        ('count_words_locally', '本地统计单词(不使用 Spark 集群)'),
+        ('run_with_files', '对多个文件运行词频统计'),
+        ('stop', '停止 Spark 会话')
+    ]
+    
+    for method, desc in methods:
+        print(f"   - {method}(): {desc}")
+    
+    print("\n注意:使用 Spark 功能需要安装 PySpark: pip install pyspark")
+
+
+def main():
+    """
+    主函数:运行所有示例
+    """
+    print("\n" + "#" * 60)
+    print("# Hadoop Tools - Python 版本示例")
+    print("# 提供 Hadoop 数据分析能力")
+    print("#" * 60)
+    
+    print("\n项目结构:")
+    print("  python/")
+    print("    ├── __init__.py          # 模块入口")
+    print("    ├── hdfs_operations.py   # HDFS 操作功能")
+    print("    ├── mapreduce/")
+    print("    │   ├── __init__.py")
+    print("    │   ├── wordcount_streaming.py  # Hadoop Streaming 方式")
+    print("    │   └── wordcount_spark.py      # PySpark 方式")
+    print("    └── utils/")
+    print("        ├── __init__.py")
+    print("        └── helpers.py      # 工具函数")
+    
+    # 运行示例
+    example_hdfs_operations()
+    example_wordcount_streaming()
+    example_wordcount_spark()
+    
+    print("\n" + "#" * 60)
+    print("# 示例运行完成!")
+    print("# 请查看上方输出了解各模块的使用方法。")
+    print("#" * 60)
+
+
+if __name__ == '__main__':
+    main()

+ 10 - 0
examples/sample_data.txt

@@ -0,0 +1,10 @@
+Hello world, hello Hadoop!
+Hadoop is a framework for distributed storage and processing of big data.
+Big data is data that contains greater variety, arriving in increasing volumes and with more velocity.
+Hadoop provides massive storage for any kind of data.
+Hadoop also provides enormous processing power.
+With Hadoop, you can store and process billions of records.
+Hadoop ecosystem includes many tools like HDFS, MapReduce, YARN, Spark, etc.
+Spark is a fast and general engine for large-scale data processing.
+Spark can run on Hadoop, Mesos, standalone, or in the cloud.
+Spark provides high-level APIs in Java, Scala, Python, R, and SQL.

+ 27 - 0
python/__init__.py

@@ -0,0 +1,27 @@
+"""
+Hadoop Tools - Python 版本
+
+提供 Hadoop 数据分析能力,包括:
+- HDFS 文件系统操作
+- MapReduce 作业执行
+- 大数据处理工具
+
+模块结构:
+- hdfs_operations: HDFS 文件系统操作
+- mapreduce: MapReduce 作业实现
+  - wordcount_streaming: Hadoop Streaming 方式的词频统计
+  - wordcount_spark: PySpark 方式的词频统计
+- utils: 工具函数
+"""
+
+from .hdfs_operations import HDFSOperations
+from .mapreduce.wordcount_streaming import WordCountStreaming
+from .mapreduce.wordcount_spark import WordCountSpark
+
+__all__ = [
+    'HDFSOperations',
+    'WordCountStreaming',
+    'WordCountSpark'
+]
+
+__version__ = '0.1.0'

BIN
python/__pycache__/__init__.cpython-312.pyc


BIN
python/__pycache__/hdfs_operations.cpython-312.pyc


+ 365 - 0
python/hdfs_operations.py

@@ -0,0 +1,365 @@
+"""
+HDFS 文件系统操作模块
+
+提供与 Java 版本 CommonOperation 类相同的功能:
+- 创建目录
+- 删除目录/文件
+- 上传文件
+- 读写文件
+- 检查文件是否存在
+- 列出目录内容
+"""
+
+import os
+from typing import List, Optional, Tuple
+from .utils.helpers import run_command, validate_hdfs_path, setup_logger
+
+
+class HDFSOperations:
+    """
+    HDFS 文件系统操作类
+    
+    封装了 Hadoop 命令行工具,提供与 HDFS 交互的各种方法。
+    功能与 Java 版本的 CommonOperation 类相对应。
+    """
+    
+    def __init__(self, hadoop_home: Optional[str] = None, logger_name: str = 'hdfs_operations'):
+        """
+        初始化 HDFSOperations 实例
+        
+        Args:
+            hadoop_home: Hadoop 安装目录(可选,默认从环境变量获取)
+            logger_name: 日志器名称
+        """
+        self.logger = setup_logger(logger_name)
+        self.hadoop_home = hadoop_home or os.environ.get('HADOOP_HOME', '')
+        self.hadoop_cmd = 'hdfs' if self._check_command_exists('hdfs') else 'hadoop'
+        
+    def _check_command_exists(self, cmd: str) -> bool:
+        """
+        检查命令是否存在
+        
+        Args:
+            cmd: 命令名称
+            
+        Returns:
+            命令是否存在
+        """
+        return os.system(f'which {cmd} > /dev/null 2>&1') == 0
+    
+    def _execute_hdfs_command(self, subcommand: str, args: List[str] = None) -> Tuple[int, str, str]:
+        """
+        执行 HDFS 命令
+        
+        Args:
+            subcommand: HDFS 子命令(如 dfs, fs 等)
+            args: 命令参数列表
+            
+        Returns:
+            (return_code, stdout, stderr)
+        """
+        args = args or []
+        cmd = f"{self.hadoop_cmd} {subcommand} {' '.join(args)}"
+        self.logger.debug(f"Executing command: {cmd}")
+        return run_command(cmd)
+    
+    def make_dir(self, path: str) -> bool:
+        """
+        创建目录
+        
+        对应 Java 版本的 makeDir 方法。
+        
+        Args:
+            path: 要创建的目录路径
+            
+        Returns:
+            是否创建成功
+            
+        Example:
+            >>> hdfs = HDFSOperations()
+            >>> hdfs.make_dir('/user/root/test1')
+            True
+        """
+        if not validate_hdfs_path(path):
+            self.logger.error(f"Invalid HDFS path: {path}")
+            return False
+        
+        self.logger.info(f"Creating directory: {path}")
+        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-mkdir', '-p', path])
+        
+        if returncode == 0:
+            self.logger.info(f"Successfully created directory: {path}")
+            return True
+        else:
+            self.logger.error(f"Failed to create directory: {path}, Error: {stderr}")
+            return False
+    
+    def delete(self, path: str, recursive: bool = True) -> bool:
+        """
+        删除文件或目录
+        
+        对应 Java 版本的 delDir 和 delFile 方法。
+        
+        Args:
+            path: 要删除的路径
+            recursive: 是否递归删除(用于目录)
+            
+        Returns:
+            是否删除成功
+            
+        Example:
+            >>> hdfs = HDFSOperations()
+            >>> hdfs.delete('/user/hadoop/data/word.txt')
+            True
+        """
+        if not validate_hdfs_path(path):
+            self.logger.error(f"Invalid HDFS path: {path}")
+            return False
+        
+        self.logger.info(f"Deleting: {path}")
+        args = ['-rm', '-r'] if recursive else ['-rm']
+        args.append(path)
+        
+        returncode, stdout, stderr = self._execute_hdfs_command('dfs', args)
+        
+        if returncode == 0:
+            self.logger.info(f"Successfully deleted: {path}")
+            return True
+        else:
+            self.logger.error(f"Failed to delete: {path}, Error: {stderr}")
+            return False
+    
+    def copy_from_local(self, src: str, dst: str) -> bool:
+        """
+        从本地文件系统上传文件到 HDFS
+        
+        对应 Java 版本的 putFile 方法。
+        
+        Args:
+            src: 本地文件路径
+            dst: HDFS 目标路径
+            
+        Returns:
+            是否上传成功
+            
+        Example:
+            >>> hdfs = HDFSOperations()
+            >>> hdfs.copy_from_local('/home/hadoop/word.txt', '/user/hadoop/data/')
+            True
+        """
+        if not os.path.exists(src):
+            self.logger.error(f"Local file not found: {src}")
+            return False
+        
+        if not validate_hdfs_path(dst):
+            self.logger.error(f"Invalid HDFS path: {dst}")
+            return False
+        
+        self.logger.info(f"Copying from local {src} to HDFS {dst}")
+        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-copyFromLocal', src, dst])
+        
+        if returncode == 0:
+            self.logger.info(f"Successfully copied {src} to {dst}")
+            return True
+        else:
+            self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}")
+            return False
+    
+    def copy_to_local(self, src: str, dst: str) -> bool:
+        """
+        从 HDFS 下载文件到本地文件系统
+        
+        Args:
+            src: HDFS 源路径
+            dst: 本地目标路径
+            
+        Returns:
+            是否下载成功
+        """
+        if not validate_hdfs_path(src):
+            self.logger.error(f"Invalid HDFS path: {src}")
+            return False
+        
+        self.logger.info(f"Copying from HDFS {src} to local {dst}")
+        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-copyToLocal', src, dst])
+        
+        if returncode == 0:
+            self.logger.info(f"Successfully copied {src} to {dst}")
+            return True
+        else:
+            self.logger.error(f"Failed to copy {src} to {dst}, Error: {stderr}")
+            return False
+    
+    def read_file(self, path: str) -> Optional[str]:
+        """
+        读取 HDFS 文件内容
+        
+        对应 Java 版本的 readFile 方法。
+        
+        Args:
+            path: HDFS 文件路径
+            
+        Returns:
+            文件内容(字符串),如果失败返回 None
+            
+        Example:
+            >>> hdfs = HDFSOperations()
+            >>> content = hdfs.read_file('/user/hadoop/data/write.txt')
+            >>> print(content)
+            da jia hao,cai shi zhen de hao!
+        """
+        if not validate_hdfs_path(path):
+            self.logger.error(f"Invalid HDFS path: {path}")
+            return None
+        
+        if not self.exists(path):
+            self.logger.error(f"File does not exist: {path}")
+            return None
+        
+        self.logger.info(f"Reading file: {path}")
+        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-cat', path])
+        
+        if returncode == 0:
+            self.logger.info(f"Successfully read file: {path}")
+            return stdout
+        else:
+            self.logger.error(f"Failed to read file: {path}, Error: {stderr}")
+            return None
+    
+    def write_file(self, path: str, content: str, overwrite: bool = True) -> bool:
+        """
+        写入内容到 HDFS 文件
+        
+        对应 Java 版本的 writeFile 方法。
+        
+        Args:
+            path: HDFS 文件路径
+            content: 要写入的内容
+            overwrite: 是否覆盖已存在的文件
+            
+        Returns:
+            是否写入成功
+            
+        Example:
+            >>> hdfs = HDFSOperations()
+            >>> hdfs.write_file('/user/hadoop/data/write.txt', 'da jia hao,cai shi zhen de hao!')
+            True
+        """
+        if not validate_hdfs_path(path):
+            self.logger.error(f"Invalid HDFS path: {path}")
+            return False
+        
+        self.logger.info(f"Writing to file: {path}")
+        
+        # 创建临时文件
+        import tempfile
+        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as temp_file:
+            temp_file.write(content)
+            temp_path = temp_file.name
+        
+        try:
+            # 使用 put 命令上传临时文件
+            args = ['-put']
+            if overwrite:
+                args.append('-f')
+            args.extend([temp_path, path])
+            
+            returncode, stdout, stderr = self._execute_hdfs_command('dfs', args)
+            
+            if returncode == 0:
+                self.logger.info(f"Successfully wrote to file: {path}")
+                return True
+            else:
+                self.logger.error(f"Failed to write to file: {path}, Error: {stderr}")
+                return False
+        finally:
+            # 清理临时文件
+            if os.path.exists(temp_path):
+                os.unlink(temp_path)
+    
+    def exists(self, path: str) -> bool:
+        """
+        检查 HDFS 路径是否存在
+        
+        Args:
+            path: HDFS 路径
+            
+        Returns:
+            路径是否存在
+        """
+        if not validate_hdfs_path(path):
+            return False
+        
+        returncode, _, _ = self._execute_hdfs_command('dfs', ['-test', '-e', path])
+        return returncode == 0
+    
+    def list_dir(self, path: str) -> List[str]:
+        """
+        列出 HDFS 目录内容
+        
+        Args:
+            path: HDFS 目录路径
+            
+        Returns:
+            目录内容列表
+        """
+        if not validate_hdfs_path(path):
+            self.logger.error(f"Invalid HDFS path: {path}")
+            return []
+        
+        if not self.exists(path):
+            self.logger.error(f"Directory does not exist: {path}")
+            return []
+        
+        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-ls', path])
+        
+        if returncode == 0:
+            # 解析输出,提取文件名
+            lines = stdout.strip().split('\n')
+            # 跳过第一行(如果是目录列表的标题)
+            if len(lines) > 0 and lines[0].startswith('Found'):
+                lines = lines[1:]
+            
+            # 提取文件名(每一行的最后一个字段)
+            files = []
+            for line in lines:
+                parts = line.split()
+                if len(parts) >= 8:
+                    files.append(parts[-1])
+            return files
+        else:
+            self.logger.error(f"Failed to list directory: {path}, Error: {stderr}")
+            return []
+    
+    def get_file_size(self, path: str) -> Optional[int]:
+        """
+        获取 HDFS 文件大小
+        
+        Args:
+            path: HDFS 文件路径
+            
+        Returns:
+            文件大小(字节),如果失败返回 None
+        """
+        if not validate_hdfs_path(path):
+            self.logger.error(f"Invalid HDFS path: {path}")
+            return None
+        
+        if not self.exists(path):
+            self.logger.error(f"File does not exist: {path}")
+            return None
+        
+        returncode, stdout, stderr = self._execute_hdfs_command('dfs', ['-du', '-s', path])
+        
+        if returncode == 0:
+            # 解析输出,提取文件大小
+            parts = stdout.strip().split()
+            if len(parts) >= 1:
+                try:
+                    return int(parts[0])
+                except ValueError:
+                    self.logger.error(f"Failed to parse file size: {stdout}")
+                    return None
+        else:
+            self.logger.error(f"Failed to get file size: {path}, Error: {stderr}")
+            return None

+ 15 - 0
python/mapreduce/__init__.py

@@ -0,0 +1,15 @@
+"""
+MapReduce 相关模块
+
+提供多种 MapReduce 作业实现方式:
+- Hadoop Streaming: 使用标准输入输出与 Hadoop 交互
+- PySpark: 现代、高效的大数据处理框架
+"""
+
+from .wordcount_streaming import WordCountStreaming
+from .wordcount_spark import WordCountSpark
+
+__all__ = [
+    'WordCountStreaming',
+    'WordCountSpark'
+]

BIN
python/mapreduce/__pycache__/__init__.cpython-312.pyc


BIN
python/mapreduce/__pycache__/wordcount_spark.cpython-312.pyc


BIN
python/mapreduce/__pycache__/wordcount_streaming.cpython-312.pyc


+ 383 - 0
python/mapreduce/wordcount_spark.py

@@ -0,0 +1,383 @@
+"""
+PySpark 方式的词频统计模块
+
+使用 PySpark 实现词频统计,这是现代大数据处理的推荐方式:
+- 更简洁的 API
+- 更好的性能
+- 支持更多的数据处理操作
+- 可以与 Spark SQL、MLlib 等集成
+
+对应 Java 版本的 WordCount 类,但使用更现代的 Spark 框架。
+
+使用方式:
+1. 作为模块导入使用:
+   from wordcount_spark import WordCountSpark
+   wc = WordCountSpark()
+   result = wc.run(input_path, output_path)
+
+2. 作为独立脚本运行:
+   $ python wordcount_spark.py <input_path> <output_path>
+"""
+
+import sys
+from typing import Dict, List, Optional, Tuple
+from collections import defaultdict
+from ..utils.helpers import setup_logger, format_file_size
+
+
+class WordCountSpark:
+    """
+    PySpark 方式的词频统计类
+    
+    封装了 PySpark 作业的执行,提供高效的词频统计功能。
+    """
+    
+    def __init__(self, app_name: str = 'WordCount', 
+                 master: Optional[str] = None,
+                 logger_name: str = 'wordcount_spark'):
+        """
+        初始化 WordCountSpark 实例
+        
+        Args:
+            app_name: Spark 应用名称
+            master: Spark 主节点 URL(可选,如 'local[*]', 'spark://master:7077')
+                    如果为 None,Spark 会从配置中自动获取
+            logger_name: 日志器名称
+        """
+        self.logger = setup_logger(logger_name)
+        self.app_name = app_name
+        self.master = master
+        self.spark = None
+        self.sc = None
+        
+    def _init_spark(self):
+        """
+        初始化 Spark 会话和上下文
+        
+        延迟初始化,只有在需要时才创建 Spark 实例。
+        """
+        if self.spark is not None:
+            return
+        
+        try:
+            from pyspark.sql import SparkSession
+            
+            builder = SparkSession.builder.appName(self.app_name)
+            if self.master:
+                builder = builder.master(self.master)
+            
+            # 配置一些常用参数
+            builder = builder.config("spark.sql.shuffle.partitions", "2")
+            builder = builder.config("spark.driver.memory", "1g")
+            builder = builder.config("spark.executor.memory", "1g")
+            
+            self.spark = builder.getOrCreate()
+            self.sc = self.spark.sparkContext
+            
+            self.logger.info(f"Spark session initialized: {self.app_name}")
+            self.logger.info(f"Spark master: {self.sc.master}")
+            self.logger.info(f"Spark version: {self.sc.version}")
+            
+        except ImportError as e:
+            self.logger.error(f"PySpark is not installed: {e}")
+            raise
+        except Exception as e:
+            self.logger.error(f"Failed to initialize Spark: {e}")
+            raise
+    
+    def stop(self):
+        """
+        停止 Spark 会话
+        """
+        if self.spark:
+            self.spark.stop()
+            self.spark = None
+            self.sc = None
+            self.logger.info("Spark session stopped")
+    
+    def count_words_from_rdd(self, text_rdd) -> Dict[str, int]:
+        """
+        从 RDD 统计单词
+        
+        对应 Java 版本的 WordCount 逻辑,但使用 Spark 的算子。
+        
+        Args:
+            text_rdd: 包含文本的 RDD
+            
+        Returns:
+            单词计数字典
+        """
+        # 1. 分割每行文本为单词
+        # 对应 Java 的 TokenizerMapper.map 方法
+        words_rdd = text_rdd.flatMap(self._split_line)
+        
+        # 2. 映射为 (单词, 1)
+        pairs_rdd = words_rdd.map(lambda word: (word, 1))
+        
+        # 3. 按单词聚合计数
+        # 对应 Java 的 IntSumReducer.reduce 方法
+        word_counts_rdd = pairs_rdd.reduceByKey(lambda x, y: x + y)
+        
+        # 4. 收集结果到本地
+        result = word_counts_rdd.collectAsMap()
+        
+        return dict(result)
+    
+    def _split_line(self, line: str) -> List[str]:
+        """
+        分割一行文本为单词列表
+        
+        Args:
+            line: 输入文本行
+            
+        Returns:
+            单词列表
+        """
+        words = []
+        # 分割文本为单词(使用空格、制表符等分隔符)
+        raw_words = line.strip().split()
+        for word in raw_words:
+            # 清理单词(移除标点符号,转为小写)
+            word = word.strip('.,!?;:()[]{}"\'').lower()
+            if word:  # 确保单词非空
+                words.append(word)
+        return words
+    
+    def count_words_from_dataframe(self, df, text_column: str = 'value') -> Dict[str, int]:
+        """
+        从 DataFrame 统计单词(使用 Spark SQL 风格)
+        
+        更高级的 API,适合复杂的数据处理。
+        
+        Args:
+            df: 包含文本的 DataFrame
+            text_column: 包含文本的列名
+            
+        Returns:
+            单词计数字典
+        """
+        from pyspark.sql.functions import explode, split, lower, trim, regexp_replace, col, count
+        
+        # 1. 清理文本(移除标点符号,转为小写)
+        df_clean = df.withColumn(
+            'clean_text',
+            lower(trim(regexp_replace(col(text_column), '[^a-zA-Z0-9\\s]', ' ')))
+        )
+        
+        # 2. 分割为单词
+        df_words = df_clean.withColumn(
+            'word',
+            explode(split(col('clean_text'), '\\s+'))
+        )
+        
+        # 3. 过滤空单词
+        df_filtered = df_words.filter(col('word') != '')
+        
+        # 4. 按单词分组计数
+        df_counts = df_filtered.groupBy('word').agg(count('*').alias('count'))
+        
+        # 5. 收集结果
+        result = {row['word']: row['count'] for row in df_counts.collect()}
+        
+        return result
+    
+    def run(self, input_path: str, output_path: Optional[str] = None,
+            use_dataframe: bool = True) -> Dict[str, int]:
+        """
+        运行完整的 WordCount 作业
+        
+        Args:
+            input_path: 输入路径(可以是本地文件路径或 HDFS 路径)
+            output_path: 输出路径(可选,如果指定则保存结果)
+            use_dataframe: 是否使用 DataFrame API(否则使用 RDD API)
+            
+        Returns:
+            单词计数字典
+        """
+        self._init_spark()
+        
+        self.logger.info(f"Running WordCount job on: {input_path}")
+        
+        if use_dataframe:
+            # 使用 DataFrame API
+            df = self.spark.read.text(input_path)
+            result = self.count_words_from_dataframe(df)
+        else:
+            # 使用 RDD API
+            text_rdd = self.sc.textFile(input_path)
+            result = self.count_words_from_rdd(text_rdd)
+        
+        # 保存结果(如果指定了输出路径)
+        if output_path:
+            self._save_result(result, output_path)
+        
+        # 打印统计信息
+        self._print_statistics(result)
+        
+        return result
+    
+    def _save_result(self, result: Dict[str, int], output_path: str):
+        """
+        保存结果到文件
+        
+        Args:
+            result: 单词计数字典
+            output_path: 输出路径
+        """
+        self.logger.info(f"Saving results to: {output_path}")
+        
+        # 转换为 RDD 并保存
+        result_rdd = self.sc.parallelize([
+            f"{word}\t{count}" 
+            for word, count in sorted(result.items())
+        ])
+        result_rdd.saveAsTextFile(output_path)
+        
+        self.logger.info(f"Results saved to: {output_path}")
+    
+    def _print_statistics(self, result: Dict[str, int]):
+        """
+        打印统计信息
+        
+        Args:
+            result: 单词计数字典
+        """
+        if not result:
+            self.logger.info("No words found")
+            return
+        
+        total_words = sum(result.values())
+        unique_words = len(result)
+        sorted_words = sorted(result.items(), key=lambda x: x[1], reverse=True)
+        
+        self.logger.info("=" * 50)
+        self.logger.info("WordCount Statistics")
+        self.logger.info("=" * 50)
+        self.logger.info(f"Total words: {total_words}")
+        self.logger.info(f"Unique words: {unique_words}")
+        self.logger.info("-" * 50)
+        self.logger.info("Top 10 words:")
+        
+        for i, (word, count) in enumerate(sorted_words[:10], 1):
+            percentage = (count / total_words) * 100
+            self.logger.info(f"  {i:2d}. {word:15s} {count:5d} ({percentage:5.1f}%)")
+        
+        self.logger.info("=" * 50)
+    
+    def count_words_locally(self, text: str) -> Dict[str, int]:
+        """
+        本地统计单词(不使用 Spark 集群)
+        
+        用于测试和小规模数据处理。
+        
+        Args:
+            text: 输入文本
+            
+        Returns:
+            单词计数字典
+            
+        Example:
+            >>> wc = WordCountSpark()
+            >>> wc.count_words_locally("hello world hello")
+            {'hello': 2, 'world': 1}
+        """
+        word_counts = defaultdict(int)
+        
+        for line in text.split('\n'):
+            words = self._split_line(line)
+            for word in words:
+                word_counts[word] += 1
+        
+        return dict(word_counts)
+    
+    def run_with_files(self, files: List[str], output_path: Optional[str] = None) -> Dict[str, int]:
+        """
+        对多个文件运行词频统计
+        
+        Args:
+            files: 文件路径列表
+            output_path: 输出路径(可选)
+            
+        Returns:
+            单词计数字典
+        """
+        # 合并所有文件的内容
+        all_text = ""
+        for file_path in files:
+            try:
+                with open(file_path, 'r', encoding='utf-8') as f:
+                    all_text += f.read() + "\n"
+            except Exception as e:
+                self.logger.warning(f"Failed to read file {file_path}: {e}")
+        
+        # 本地统计
+        result = self.count_words_locally(all_text)
+        
+        # 保存结果
+        if output_path:
+            with open(output_path, 'w', encoding='utf-8') as f:
+                for word, count in sorted(result.items()):
+                    f.write(f"{word}\t{count}\n")
+        
+        # 打印统计信息
+        self._print_statistics(result)
+        
+        return result
+
+
+def main():
+    """
+    主函数:作为独立脚本运行
+    
+    使用方式:
+    python wordcount_spark.py <input_path> [output_path]
+    """
+    if len(sys.argv) < 2:
+        print("Usage: python wordcount_spark.py <input_path> [output_path]")
+        print("Examples:")
+        print("  python wordcount_spark.py input.txt")
+        print("  python wordcount_spark.py hdfs:///user/hadoop/data output")
+        print("  python wordcount_spark.py --local input.txt output.txt")
+        sys.exit(1)
+    
+    # 解析参数
+    use_local = False
+    input_path = None
+    output_path = None
+    
+    i = 1
+    while i < len(sys.argv):
+        arg = sys.argv[i]
+        if arg == '--local':
+            use_local = True
+        elif input_path is None:
+            input_path = arg
+        else:
+            output_path = arg
+        i += 1
+    
+    if input_path is None:
+        print("Error: Input path is required")
+        sys.exit(1)
+    
+    wc = WordCountSpark()
+    
+    try:
+        if use_local:
+            # 本地模式(不使用 Spark)
+            result = wc.run_with_files([input_path], output_path)
+        else:
+            # Spark 模式
+            result = wc.run(input_path, output_path)
+        
+        # 打印结果
+        print("\nFinal results:")
+        for word, count in sorted(result.items(), key=lambda x: x[1], reverse=True)[:20]:
+            print(f"{word}: {count}")
+    
+    finally:
+        wc.stop()
+
+
+if __name__ == '__main__':
+    main()

+ 346 - 0
python/mapreduce/wordcount_streaming.py

@@ -0,0 +1,346 @@
+"""
+Hadoop Streaming 方式的词频统计模块
+
+对应 Java 版本的 WordCount 类,使用 Hadoop Streaming 方式实现:
+- Mapper: 从标准输入读取数据,分割为单词,输出 <单词, 1>
+- Reducer: 从标准输入读取 Mapper 输出,统计每个单词的总次数
+- Combiner: 可选的本地合并,减少数据传输
+
+使用方式:
+1. 作为独立脚本运行(用于 Hadoop Streaming):
+   $ python wordcount_streaming.py mapper < input.txt
+   $ python wordcount_streaming.py reducer < mapper_output.txt
+
+2. 作为模块导入使用:
+   from wordcount_streaming import WordCountStreaming
+   wc = WordCountStreaming()
+   wc.run(input_path, output_path)
+"""
+
+import sys
+from collections import defaultdict
+from typing import Dict, List, Optional, Tuple
+from ..utils.helpers import run_command, setup_logger
+
+
+class WordCountStreaming:
+    """
+    Hadoop Streaming 方式的词频统计类
+    
+    封装了 Hadoop Streaming 作业的执行,提供与 Java 版本 WordCount 类类似的功能。
+    """
+    
+    def __init__(self, hadoop_home: Optional[str] = None, logger_name: str = 'wordcount_streaming'):
+        """
+        初始化 WordCountStreaming 实例
+        
+        Args:
+            hadoop_home: Hadoop 安装目录(可选,默认从环境变量获取)
+            logger_name: 日志器名称
+        """
+        self.logger = setup_logger(logger_name)
+        self.hadoop_home = hadoop_home or __import__('os').environ.get('HADOOP_HOME', '')
+        self.hadoop_cmd = 'hadoop'
+        
+    def mapper(self, line: str) -> List[Tuple[str, int]]:
+        """
+        Mapper 函数:将一行文本分割为单词,输出 <单词, 1>
+        
+        对应 Java 版本的 TokenizerMapper.map 方法。
+        
+        Args:
+            line: 输入的一行文本
+            
+        Returns:
+            单词和计数的元组列表
+            
+        Example:
+            >>> wc = WordCountStreaming()
+            >>> wc.mapper("hello world hello")
+            [('hello', 1), ('world', 1), ('hello', 1)]
+        """
+        results = []
+        # 分割文本为单词(使用空格、制表符等分隔符)
+        words = line.strip().split()
+        for word in words:
+            # 清理单词(移除标点符号,转为小写)
+            word = word.strip('.,!?;:()[]{}"\'').lower()
+            if word:  # 确保单词非空
+                results.append((word, 1))
+        return results
+    
+    def combiner(self, pairs: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
+        """
+        Combiner 函数:在 Mapper 端进行本地合并
+        
+        对应 Java 版本的 Combiner 功能,减少数据传输量。
+        
+        Args:
+            pairs: Mapper 输出的 <单词, 1> 列表
+            
+        Returns:
+            合并后的 <单词, 本地计数> 列表
+            
+        Example:
+            >>> wc = WordCountStreaming()
+            >>> wc.combiner([('hello', 1), ('world', 1), ('hello', 1)])
+            [('hello', 2), ('world', 1)]
+        """
+        word_counts = defaultdict(int)
+        for word, count in pairs:
+            word_counts[word] += count
+        return [(word, count) for word, count in word_counts.items()]
+    
+    def reducer(self, word: str, counts: List[int]) -> Tuple[str, int]:
+        """
+        Reducer 函数:统计每个单词的总次数
+        
+        对应 Java 版本的 IntSumReducer.reduce 方法。
+        
+        Args:
+            word: 单词
+            counts: 该单词的所有计数列表
+            
+        Returns:
+            <单词, 总次数> 元组
+            
+        Example:
+            >>> wc = WordCountStreaming()
+            >>> wc.reducer('hello', [1, 1, 1])
+            ('hello', 3)
+        """
+        total = sum(counts)
+        return (word, total)
+    
+    def run_mapper_from_stdin(self):
+        """
+        从标准输入运行 Mapper(用于 Hadoop Streaming)
+        
+        从 stdin 读取每行数据,执行 Mapper 逻辑,输出到 stdout。
+        """
+        for line in sys.stdin:
+            pairs = self.mapper(line)
+            for word, count in pairs:
+                print(f"{word}\t{count}")
+    
+    def run_reducer_from_stdin(self):
+        """
+        从标准输入运行 Reducer(用于 Hadoop Streaming)
+        
+        从 stdin 读取 Mapper 输出,执行 Reducer 逻辑,输出到 stdout。
+        假设输入已经按键排序(Hadoop Streaming 会自动排序)。
+        """
+        current_word = None
+        current_counts = []
+        
+        for line in sys.stdin:
+            line = line.strip()
+            if not line:
+                continue
+            
+            # 解析输入:单词\t计数
+            parts = line.split('\t', 1)
+            if len(parts) != 2:
+                continue
+            
+            word, count_str = parts
+            try:
+                count = int(count_str)
+            except ValueError:
+                continue
+            
+            # 处理相同单词的计数
+            if current_word == word:
+                current_counts.append(count)
+            else:
+                # 输出前一个单词的结果
+                if current_word is not None:
+                    result_word, result_count = self.reducer(current_word, current_counts)
+                    print(f"{result_word}\t{result_count}")
+                
+                # 开始处理新单词
+                current_word = word
+                current_counts = [count]
+        
+        # 输出最后一个单词的结果
+        if current_word is not None:
+            result_word, result_count = self.reducer(current_word, current_counts)
+            print(f"{result_word}\t{result_count}")
+    
+    def run(self, input_path: str, output_path: str, 
+            mapper_script: Optional[str] = None,
+            reducer_script: Optional[str] = None,
+            combiner: bool = True,
+            num_reducers: int = 1) -> bool:
+        """
+        运行完整的 WordCount 作业
+        
+        使用 Hadoop Streaming 提交作业到 Hadoop 集群。
+        
+        Args:
+            input_path: HDFS 输入路径
+            output_path: HDFS 输出路径(不能已存在)
+            mapper_script: Mapper 脚本路径(可选,默认使用当前脚本)
+            reducer_script: Reducer 脚本路径(可选,默认使用当前脚本)
+            combiner: 是否使用 Combiner
+            num_reducers: Reducer 任务数量
+            
+        Returns:
+            作业是否成功完成
+        """
+        import os
+        
+        # 确定脚本路径
+        if mapper_script is None:
+            mapper_script = __file__
+        if reducer_script is None:
+            reducer_script = __file__
+        
+        # 构建 Hadoop Streaming 命令
+        streaming_jar = self._find_streaming_jar()
+        if not streaming_jar:
+            self.logger.error("Could not find Hadoop Streaming jar")
+            return False
+        
+        cmd_parts = [
+            self.hadoop_cmd,
+            'jar', streaming_jar,
+            '-files', f"{mapper_script},{reducer_script}",
+            '-mapper', f"python3 {os.path.basename(mapper_script)} mapper",
+            '-reducer', f"python3 {os.path.basename(reducer_script)} reducer",
+            '-input', input_path,
+            '-output', output_path,
+            '-D', f"mapreduce.job.reduces={num_reducers}"
+        ]
+        
+        if combiner:
+            cmd_parts.extend(['-combiner', f"python3 {os.path.basename(mapper_script)} mapper | sort | python3 {os.path.basename(reducer_script)} reducer"])
+        
+        cmd = ' '.join(cmd_parts)
+        self.logger.info(f"Running Hadoop Streaming job: {cmd}")
+        
+        returncode, stdout, stderr = run_command(cmd, timeout=3600)  # 1小时超时
+        
+        if returncode == 0:
+            self.logger.info("WordCount job completed successfully")
+            self.logger.info(f"Output: {stdout}")
+            return True
+        else:
+            self.logger.error(f"WordCount job failed with return code {returncode}")
+            self.logger.error(f"Stderr: {stderr}")
+            return False
+    
+    def _find_streaming_jar(self) -> Optional[str]:
+        """
+        查找 Hadoop Streaming jar 文件
+        
+        Returns:
+            Streaming jar 文件路径,如果未找到返回 None
+        """
+        import os
+        import glob
+        
+        # 尝试从常见位置查找
+        search_paths = [
+            os.path.join(self.hadoop_home, 'share', 'hadoop', 'tools', 'lib'),
+            os.path.join(self.hadoop_home, 'contrib', 'streaming'),
+            '/usr/lib/hadoop-mapreduce',
+            '/usr/hdp/current/hadoop-mapreduce-client'
+        ]
+        
+        for path in search_paths:
+            if os.path.exists(path):
+                jars = glob.glob(os.path.join(path, 'hadoop-streaming-*.jar'))
+                if jars:
+                    return jars[0]
+        
+        # 尝试使用 hadoop classpath 查找
+        returncode, stdout, stderr = run_command(f"{self.hadoop_cmd} classpath --glob")
+        if returncode == 0:
+            # 解析 classpath,查找 streaming jar
+            classpath = stdout.strip()
+            for part in classpath.split(os.pathsep):
+                if 'streaming' in part.lower() and part.endswith('.jar'):
+                    return part
+        
+        return None
+    
+    def count_words_locally(self, text: str) -> Dict[str, int]:
+        """
+        本地统计单词(不使用 Hadoop)
+        
+        用于测试和小规模数据处理。
+        
+        Args:
+            text: 输入文本
+            
+        Returns:
+            单词计数字典
+            
+        Example:
+            >>> wc = WordCountStreaming()
+            >>> wc.count_words_locally("hello world hello")
+            {'hello': 2, 'world': 1}
+        """
+        # 模拟完整的 MapReduce 流程
+        all_pairs = []
+        for line in text.split('\n'):
+            pairs = self.mapper(line)
+            all_pairs.extend(pairs)
+        
+        # 按单词分组
+        word_groups = defaultdict(list)
+        for word, count in all_pairs:
+            word_groups[word].append(count)
+        
+        # 执行 Reduce
+        results = {}
+        for word, counts in word_groups.items():
+            _, total = self.reducer(word, counts)
+            results[word] = total
+        
+        return results
+
+
+def main():
+    """
+    主函数:作为独立脚本运行
+    
+    支持的命令:
+    - mapper: 运行 Mapper
+    - reducer: 运行 Reducer
+    - local: 本地测试
+    """
+    if len(sys.argv) < 2:
+        print("Usage: python wordcount_streaming.py <command>")
+        print("Commands:")
+        print("  mapper   - Run Mapper from stdin")
+        print("  reducer  - Run Reducer from stdin")
+        print("  local    - Run local test")
+        sys.exit(1)
+    
+    command = sys.argv[1]
+    wc = WordCountStreaming()
+    
+    if command == 'mapper':
+        wc.run_mapper_from_stdin()
+    elif command == 'reducer':
+        wc.run_reducer_from_stdin()
+    elif command == 'local':
+        # 本地测试
+        test_text = """
+        Hello world, hello Hadoop!
+        Hadoop is great for big data.
+        Big data processing with Hadoop.
+        """
+        result = wc.count_words_locally(test_text)
+        print("Word count results:")
+        for word, count in sorted(result.items(), key=lambda x: x[1], reverse=True):
+            print(f"{word}: {count}")
+    else:
+        print(f"Unknown command: {command}")
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    main()

+ 22 - 0
python/utils/__init__.py

@@ -0,0 +1,22 @@
+"""
+工具函数模块
+
+提供常用的辅助功能:
+- 命令行执行工具
+- 文件处理工具
+- 日志工具
+"""
+
+from .helpers import (
+    run_command,
+    validate_hdfs_path,
+    format_file_size,
+    setup_logger
+)
+
+__all__ = [
+    'run_command',
+    'validate_hdfs_path',
+    'format_file_size',
+    'setup_logger'
+]

BIN
python/utils/__pycache__/__init__.cpython-312.pyc


BIN
python/utils/__pycache__/helpers.cpython-312.pyc


+ 127 - 0
python/utils/helpers.py

@@ -0,0 +1,127 @@
+"""
+工具函数模块
+
+提供常用的辅助功能:
+- 命令行执行工具
+- 文件处理工具
+- 日志工具
+"""
+
+import subprocess
+import logging
+import re
+from typing import Tuple, Optional
+
+
+def run_command(cmd: str, shell: bool = True, timeout: int = 300) -> Tuple[int, str, str]:
+    """
+    执行命令行命令
+    
+    Args:
+        cmd: 要执行的命令
+        shell: 是否使用 shell 执行
+        timeout: 超时时间(秒)
+        
+    Returns:
+        (return_code, stdout, stderr)
+    """
+    try:
+        result = subprocess.run(
+            cmd,
+            shell=shell,
+            capture_output=True,
+            text=True,
+            timeout=timeout
+        )
+        return result.returncode, result.stdout, result.stderr
+    except subprocess.TimeoutExpired:
+        return -1, "", f"Command timed out after {timeout} seconds"
+    except Exception as e:
+        return -1, "", str(e)
+
+
+def validate_hdfs_path(path: str) -> bool:
+    """
+    验证 HDFS 路径格式是否有效
+    
+    Args:
+        path: 要验证的路径
+        
+    Returns:
+        路径是否有效
+    """
+    if not path:
+        return False
+    
+    # HDFS 路径必须以 / 开头
+    if not path.startswith('/'):
+        return False
+    
+    # 检查是否包含非法字符
+    invalid_chars = re.compile(r'[<>:"|?*]')
+    if invalid_chars.search(path):
+        return False
+    
+    # 检查是否包含连续的斜杠
+    if '//' in path:
+        return False
+    
+    return True
+
+
+def format_file_size(size_bytes: int) -> str:
+    """
+    格式化文件大小,将字节转换为人类可读的格式
+    
+    Args:
+        size_bytes: 文件大小(字节)
+        
+    Returns:
+        格式化后的文件大小字符串
+    """
+    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
+        if size_bytes < 1024.0:
+            return f"{size_bytes:.2f} {unit}"
+        size_bytes /= 1024.0
+    return f"{size_bytes:.2f} PB"
+
+
+def setup_logger(name: str, level: int = logging.INFO, log_file: Optional[str] = None) -> logging.Logger:
+    """
+    设置日志器
+    
+    Args:
+        name: 日志器名称
+        level: 日志级别
+        log_file: 日志文件路径(可选)
+        
+    Returns:
+        配置好的日志器
+    """
+    logger = logging.getLogger(name)
+    logger.setLevel(level)
+    
+    # 避免重复添加处理器
+    if logger.handlers:
+        return logger
+    
+    # 创建格式器
+    formatter = logging.Formatter(
+        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+    
+    # 添加控制台处理器
+    console_handler = logging.StreamHandler()
+    console_handler.setLevel(level)
+    console_handler.setFormatter(formatter)
+    logger.addHandler(console_handler)
+    
+    # 如果指定了日志文件,添加文件处理器
+    if log_file:
+        file_handler = logging.FileHandler(log_file)
+        file_handler.setLevel(level)
+        file_handler.setFormatter(formatter)
+        logger.addHandler(file_handler)
+    
+    return logger

+ 11 - 0
requirements.txt

@@ -0,0 +1,11 @@
+# Hadoop 相关依赖
+# PySpark - 用于现代大数据处理
+pyspark>=3.0.0
+
+# 可选:HDFS 客户端库(如果不想使用命令行工具)
+# hdfs>=2.7.0
+# pyhdfs>=0.3.0
+
+# 工具库
+click>=7.0  # 用于创建命令行工具
+rich>=10.0.0  # 用于美化输出