hadoop相关操作类

liuyuqi-cnb bd8c6c6740 feat(mapreduce): enhance wordcount streaming with modern features 1 month ago
R ba13c6f073 添加 R/HadoopStreaming 9 years ago
auto_install 1164bd2fb2 添加hadoop基础操作: 9 years ago
examples e211c241cc feat: implement hadoop tools in python with hdfs operations and mapreduce 1 month ago
python bd8c6c6740 feat(mapreduce): enhance wordcount streaming with modern features 1 month ago
src 1164bd2fb2 添加hadoop基础操作: 9 years ago
.gitignore bd8c6c6740 feat(mapreduce): enhance wordcount streaming with modern features 1 month ago
LICENSE 60c7a2afda Initial commit 9 years ago
README.md e211c241cc feat: implement hadoop tools in python with hdfs operations and mapreduce 1 month ago
pom.xml 1164bd2fb2 添加hadoop基础操作: 9 years ago
requirements.txt b4beccafda feat(hdfs_operations): implement comprehensive HDFS operations class 1 month ago

README.md

hadoop-tools

Hadoop 相关操作类,提供 Hadoop 数据分析能力。

项目概述

本项目提供两种语言的实现:

  • Java 版本:原始实现,位于 src/ 目录
  • Python 版本:新实现,位于 python/ 目录(推荐使用)

Python 版本功能

1. HDFS 文件系统操作 (hdfs_operations.py)

提供与 HDFS 交互的各种方法:

方法 功能描述
make_dir(path) 创建目录
delete(path, recursive) 删除文件/目录
copy_from_local(src, dst) 从本地上传文件到 HDFS
copy_to_local(src, dst) 从 HDFS 下载文件到本地
read_file(path) 读取 HDFS 文件内容
write_file(path, content) 写入内容到 HDFS 文件
exists(path) 检查路径是否存在
list_dir(path) 列出目录内容
get_file_size(path) 获取文件大小

使用示例:

from python.hdfs_operations import HDFSOperations

hdfs = HDFSOperations()

# 创建目录
hdfs.make_dir('/user/hadoop/data')

# 上传文件
hdfs.copy_from_local('/local/path/file.txt', '/user/hadoop/data/')

# 读取文件内容
content = hdfs.read_file('/user/hadoop/data/file.txt')

# 检查文件是否存在
if hdfs.exists('/user/hadoop/data/file.txt'):
    print("文件存在")

2. Hadoop Streaming 词频统计 (mapreduce/wordcount_streaming.py)

使用 Hadoop Streaming 方式实现经典的 WordCount 算法:

核心组件:

  • Mapper: 将文本分割为单词,输出 <单词, 1>
  • Combiner: 在 Mapper 端进行本地合并,减少数据传输
  • Reducer: 统计每个单词的总次数

使用方式:

  1. 作为模块导入:

    from python.mapreduce.wordcount_streaming import WordCountStreaming
    
    wc = WordCountStreaming()
    
    # 本地统计(用于测试)
    result = wc.count_words_locally("Hello world, hello Hadoop!")
    print(result)  # {'hello': 2, 'world': 1, 'hadoop': 1}
    
    # 提交 Hadoop Streaming 作业
    wc.run('/user/hadoop/input', '/user/hadoop/output')
    
  2. 作为独立脚本运行:

    # 运行 Mapper
    echo "Hello world hello" | python wordcount_streaming.py mapper
    
    # 运行 Reducer
    echo "hello\t1\nworld\t1\nhello\t1" | sort | python wordcount_streaming.py reducer
    
    # 本地测试
    python wordcount_streaming.py local
    

3. PySpark 词频统计 (mapreduce/wordcount_spark.py)

使用 PySpark 实现词频统计,这是现代大数据处理的推荐方式:

特性:

  • 更简洁的 API
  • 更好的性能
  • 支持 RDD 和 DataFrame 两种 API
  • 可以与 Spark SQL、MLlib 等集成

使用方式:

  1. 作为模块导入:

    from python.mapreduce.wordcount_spark import WordCountSpark
    
    wc = WordCountSpark()
    
    # 本地统计(不使用 Spark 集群)
    result = wc.count_words_locally("Spark is fast and general engine")
    
    # 使用 Spark 运行(支持本地文件和 HDFS)
    result = wc.run('/user/hadoop/data/input.txt', '/user/hadoop/output')
    
    # 停止 Spark 会话
    wc.stop()
    
  2. 作为独立脚本运行:

    # 本地模式(不使用 Spark 集群)
    python wordcount_spark.py --local input.txt output.txt
    
    # Spark 模式
    python wordcount_spark.py hdfs:///user/hadoop/data output
    

工具函数 (utils/helpers.py)

提供常用的辅助功能:

函数 功能描述
run_command(cmd) 执行命令行命令
validate_hdfs_path(path) 验证 HDFS 路径格式
format_file_size(size_bytes) 格式化文件大小(B/KB/MB/GB/TB)
setup_logger(name) 设置日志器

安装依赖

# 安装基础依赖
pip install -r requirements.txt

# 安装 PySpark(如果需要使用 Spark 功能)
pip install pyspark

运行示例

# 运行完整示例
python examples/run_examples.py

# 测试 Hadoop Streaming 词频统计
python python/mapreduce/wordcount_streaming.py local

# 测试 PySpark 词频统计(本地模式)
python python/mapreduce/wordcount_spark.py --local examples/sample_data.txt

项目结构

hadoop-tools/
├── python/                    # Python 版本实现
│   ├── __init__.py           # 模块入口
│   ├── hdfs_operations.py    # HDFS 操作功能
│   ├── mapreduce/
│   │   ├── __init__.py
│   │   ├── wordcount_streaming.py  # Hadoop Streaming 方式
│   │   └── wordcount_spark.py      # PySpark 方式
│   └── utils/
│       ├── __init__.py
│       └── helpers.py         # 工具函数
├── examples/                  # 示例文件
│   ├── sample_data.txt       # 示例数据
│   └── run_examples.py       # 示例运行脚本
├── src/                       # Java 版本实现
│   └── me/yoqi/hadoop/test/
│       ├── CommonOperation.java  # HDFS 操作
│       └── WordCount.java        # 词频统计
├── R/                         # R 语言相关(待实现)
├── pom.xml                   # Maven 配置(Java 项目)
├── requirements.txt          # Python 依赖
└── README.md

与 Java 版本的对应关系

Java 类/方法 Python 对应
CommonOperation.makeDir() HDFSOperations.make_dir()
CommonOperation.delDir() HDFSOperations.delete(recursive=True)
CommonOperation.delFile() HDFSOperations.delete(recursive=False)
CommonOperation.putFile() HDFSOperations.copy_from_local()
CommonOperation.readFile() HDFSOperations.read_file()
CommonOperation.writeFile() HDFSOperations.write_file()
WordCount.TokenizerMapper WordCountStreaming.mapper()
WordCount.IntSumReducer WordCountStreaming.reducer()
WordCount.main() WordCountStreaming.run() / WordCountSpark.run()

注意事项

  1. HDFS 操作:需要配置 Hadoop 环境变量,确保 hdfshadoop 命令可用
  2. Hadoop Streaming:需要找到 Hadoop Streaming jar 文件,脚本会自动查找常见位置
  3. PySpark:需要安装 PySpark 并配置 Spark 环境
  4. 路径格式:HDFS 路径必须以 / 开头,不能包含连续斜杠或非法字符

许可证

详见 LICENSE 文件。