Hadoop 相关操作类,提供 Hadoop 数据分析能力。
本项目提供两种语言的实现:
src/ 目录python/ 目录(推荐使用)hdfs_operations.py)提供与 HDFS 交互的各种方法:
| 方法 | 功能描述 |
|---|---|
make_dir(path) |
创建目录 |
delete(path, recursive) |
删除文件/目录 |
copy_from_local(src, dst) |
从本地上传文件到 HDFS |
copy_to_local(src, dst) |
从 HDFS 下载文件到本地 |
read_file(path) |
读取 HDFS 文件内容 |
write_file(path, content) |
写入内容到 HDFS 文件 |
exists(path) |
检查路径是否存在 |
list_dir(path) |
列出目录内容 |
get_file_size(path) |
获取文件大小 |
使用示例:
from python.hdfs_operations import HDFSOperations
hdfs = HDFSOperations()
# 创建目录
hdfs.make_dir('/user/hadoop/data')
# 上传文件
hdfs.copy_from_local('/local/path/file.txt', '/user/hadoop/data/')
# 读取文件内容
content = hdfs.read_file('/user/hadoop/data/file.txt')
# 检查文件是否存在
if hdfs.exists('/user/hadoop/data/file.txt'):
print("文件存在")
mapreduce/wordcount_streaming.py)使用 Hadoop Streaming 方式实现经典的 WordCount 算法:
核心组件:
<单词, 1>使用方式:
作为模块导入:
from python.mapreduce.wordcount_streaming import WordCountStreaming
wc = WordCountStreaming()
# 本地统计(用于测试)
result = wc.count_words_locally("Hello world, hello Hadoop!")
print(result) # {'hello': 2, 'world': 1, 'hadoop': 1}
# 提交 Hadoop Streaming 作业
wc.run('/user/hadoop/input', '/user/hadoop/output')
作为独立脚本运行:
# 运行 Mapper
echo "Hello world hello" | python wordcount_streaming.py mapper
# 运行 Reducer
echo "hello\t1\nworld\t1\nhello\t1" | sort | python wordcount_streaming.py reducer
# 本地测试
python wordcount_streaming.py local
mapreduce/wordcount_spark.py)使用 PySpark 实现词频统计,这是现代大数据处理的推荐方式:
特性:
使用方式:
作为模块导入:
from python.mapreduce.wordcount_spark import WordCountSpark
wc = WordCountSpark()
# 本地统计(不使用 Spark 集群)
result = wc.count_words_locally("Spark is fast and general engine")
# 使用 Spark 运行(支持本地文件和 HDFS)
result = wc.run('/user/hadoop/data/input.txt', '/user/hadoop/output')
# 停止 Spark 会话
wc.stop()
作为独立脚本运行:
# 本地模式(不使用 Spark 集群)
python wordcount_spark.py --local input.txt output.txt
# Spark 模式
python wordcount_spark.py hdfs:///user/hadoop/data output
utils/helpers.py)提供常用的辅助功能:
| 函数 | 功能描述 |
|---|---|
run_command(cmd) |
执行命令行命令 |
validate_hdfs_path(path) |
验证 HDFS 路径格式 |
format_file_size(size_bytes) |
格式化文件大小(B/KB/MB/GB/TB) |
setup_logger(name) |
设置日志器 |
# 安装基础依赖
pip install -r requirements.txt
# 安装 PySpark(如果需要使用 Spark 功能)
pip install pyspark
# 运行完整示例
python examples/run_examples.py
# 测试 Hadoop Streaming 词频统计
python python/mapreduce/wordcount_streaming.py local
# 测试 PySpark 词频统计(本地模式)
python python/mapreduce/wordcount_spark.py --local examples/sample_data.txt
hadoop-tools/
├── python/ # Python 版本实现
│ ├── __init__.py # 模块入口
│ ├── hdfs_operations.py # HDFS 操作功能
│ ├── mapreduce/
│ │ ├── __init__.py
│ │ ├── wordcount_streaming.py # Hadoop Streaming 方式
│ │ └── wordcount_spark.py # PySpark 方式
│ └── utils/
│ ├── __init__.py
│ └── helpers.py # 工具函数
├── examples/ # 示例文件
│ ├── sample_data.txt # 示例数据
│ └── run_examples.py # 示例运行脚本
├── src/ # Java 版本实现
│ └── me/yoqi/hadoop/test/
│ ├── CommonOperation.java # HDFS 操作
│ └── WordCount.java # 词频统计
├── R/ # R 语言相关(待实现)
├── pom.xml # Maven 配置(Java 项目)
├── requirements.txt # Python 依赖
└── README.md
| Java 类/方法 | Python 对应 |
|---|---|
CommonOperation.makeDir() |
HDFSOperations.make_dir() |
CommonOperation.delDir() |
HDFSOperations.delete(recursive=True) |
CommonOperation.delFile() |
HDFSOperations.delete(recursive=False) |
CommonOperation.putFile() |
HDFSOperations.copy_from_local() |
CommonOperation.readFile() |
HDFSOperations.read_file() |
CommonOperation.writeFile() |
HDFSOperations.write_file() |
WordCount.TokenizerMapper |
WordCountStreaming.mapper() |
WordCount.IntSumReducer |
WordCountStreaming.reducer() |
WordCount.main() |
WordCountStreaming.run() / WordCountSpark.run() |
hdfs 或 hadoop 命令可用/ 开头,不能包含连续斜杠或非法字符详见 LICENSE 文件。