Browse Source

feat(mapreduce): enhance wordcount streaming with modern features

- Add configuration management integration
- Improve error handling and type safety with dataclasses
- Add enhanced statistics and analysis capabilities
- Implement flexible command line arguments
- Support multiple output formats (text, json, csv)
- Add stop words filtering and case sensitivity options
- Include word length filtering and frequency analysis
liuyuqi-cnb 1 month ago
parent
commit
bd8c6c6740
2 changed files with 484 additions and 62 deletions
  1. 1 0
      .gitignore
  2. 483 62
      python/mapreduce/wordcount_streaming.py

+ 1 - 0
.gitignore

@@ -15,3 +15,4 @@ hs_err_pid*
 /build/
 /build/
 /target/
 /target/
 /bin/
 /bin/
+*.pyc

+ 483 - 62
python/mapreduce/wordcount_streaming.py

@@ -1,48 +1,170 @@
 """
 """
-Hadoop Streaming 方式的词频统计模块
+Hadoop Streaming 方式的词频统计模块 - 现代化版本
 
 
 对应 Java 版本的 WordCount 类,使用 Hadoop Streaming 方式实现:
 对应 Java 版本的 WordCount 类,使用 Hadoop Streaming 方式实现:
 - Mapper: 从标准输入读取数据,分割为单词,输出 <单词, 1>
 - Mapper: 从标准输入读取数据,分割为单词,输出 <单词, 1>
 - Reducer: 从标准输入读取 Mapper 输出,统计每个单词的总次数
 - Reducer: 从标准输入读取 Mapper 输出,统计每个单词的总次数
 - Combiner: 可选的本地合并,减少数据传输
 - Combiner: 可选的本地合并,减少数据传输
 
 
-使用方式:
-1. 作为独立脚本运行(用于 Hadoop Streaming):
-   $ python wordcount_streaming.py mapper < input.txt
-   $ python wordcount_streaming.py reducer < mapper_output.txt
-
-2. 作为模块导入使用:
-   from wordcount_streaming import WordCountStreaming
-   wc = WordCountStreaming()
-   wc.run(input_path, output_path)
+现代化特性:
+- 配置管理集成
+- 改进的错误处理
+- 类型安全的数据类
+- 增强的统计功能
+- 灵活的命令行参数
 """
 """
 
 
 import sys
 import sys
+import os
+import json
 from collections import defaultdict
 from collections import defaultdict
-from typing import Dict, List, Optional, Tuple
-from ..utils.helpers import run_command, setup_logger
+from dataclasses import dataclass, field, asdict
+from typing import Dict, List, Optional, Tuple, Any, Iterator
+from enum import Enum
+
+from ..utils.helpers import run_command, setup_logger, format_file_size, default_value
+from ..config import ConfigurationManager, MapReduceConfig, get_config
+
+
+class OutputFormat(Enum):
+    """
+    输出格式枚举
+    """
+    TEXT = 'text'
+    JSON = 'json'
+    CSV = 'csv'
+
+
+@dataclass
+class WordCountResult:
+    """
+    词频统计结果数据类
+    """
+    total_words: int = 0
+    unique_words: int = 0
+    top_words: List[Tuple[str, int]] = field(default_factory=list)
+    word_counts: Dict[str, int] = field(default_factory=dict)
+    execution_time_ms: float = 0.0
+    
+    @property
+    def most_frequent_word(self) -> Optional[Tuple[str, int]]:
+        """
+        获取出现频率最高的单词
+        
+        Returns:
+            (单词, 次数) 元组,如果没有单词返回 None
+        """
+        return self.top_words[0] if self.top_words else None
+    
+    @property
+    def avg_word_frequency(self) -> float:
+        """
+        计算平均词频
+        
+        Returns:
+            平均每个单词出现的次数
+        """
+        return self.total_words / self.unique_words if self.unique_words > 0 else 0.0
+    
+    def to_dict(self) -> Dict[str, Any]:
+        """
+        转换为字典
+        
+        Returns:
+            字典表示
+        """
+        data = asdict(self)
+        data['most_frequent_word'] = self.most_frequent_word
+        data['avg_word_frequency'] = self.avg_word_frequency
+        return data
+    
+    def to_json(self, indent: Optional[int] = None) -> str:
+        """
+        转换为 JSON 字符串
+        
+        Args:
+            indent: 缩进空格数
+            
+        Returns:
+            JSON 字符串
+        """
+        return json.dumps(self.to_dict(), ensure_ascii=False, indent=indent)
+    
+    def save_to_file(self, file_path: str, format: OutputFormat = OutputFormat.JSON):
+        """
+        保存结果到文件
+        
+        Args:
+            file_path: 文件路径
+            format: 输出格式
+        """
+        os.makedirs(os.path.dirname(os.path.abspath(file_path)), exist_ok=True)
+        
+        if format == OutputFormat.JSON:
+            with open(file_path, 'w', encoding='utf-8') as f:
+                json.dump(self.to_dict(), f, ensure_ascii=False, indent=2)
+        elif format == OutputFormat.CSV:
+            with open(file_path, 'w', encoding='utf-8') as f:
+                f.write('word,count\n')
+                for word, count in sorted(self.word_counts.items(), key=lambda x: x[1], reverse=True):
+                    f.write(f'"{word}",{count}\n')
+        else:
+            with open(file_path, 'w', encoding='utf-8') as f:
+                f.write(f"Total words: {self.total_words}\n")
+                f.write(f"Unique words: {self.unique_words}\n")
+                f.write(f"Most frequent: {self.most_frequent_word}\n")
+                f.write(f"\nTop 20 words:\n")
+                for word, count in self.top_words[:20]:
+                    f.write(f"{word}: {count}\n")
 
 
 
 
 class WordCountStreaming:
 class WordCountStreaming:
     """
     """
-    Hadoop Streaming 方式的词频统计类
+    Hadoop Streaming 方式的词频统计类 - 现代化版本
     
     
     封装了 Hadoop Streaming 作业的执行,提供与 Java 版本 WordCount 类类似的功能。
     封装了 Hadoop Streaming 作业的执行,提供与 Java 版本 WordCount 类类似的功能。
+    
+    现代化特性:
+    - 配置管理集成
+    - 改进的错误处理
+    - 类型安全的数据类
+    - 增强的统计功能
+    - 灵活的命令行参数
     """
     """
     
     
-    def __init__(self, hadoop_home: Optional[str] = None, logger_name: str = 'wordcount_streaming'):
+    def __init__(self, 
+                 hadoop_home: Optional[str] = None, 
+                 logger_name: str = 'wordcount_streaming',
+                 config: Optional[MapReduceConfig] = None):
         """
         """
         初始化 WordCountStreaming 实例
         初始化 WordCountStreaming 实例
         
         
         Args:
         Args:
             hadoop_home: Hadoop 安装目录(可选,默认从环境变量获取)
             hadoop_home: Hadoop 安装目录(可选,默认从环境变量获取)
             logger_name: 日志器名称
             logger_name: 日志器名称
+            config: MapReduce 配置对象(可选)
         """
         """
         self.logger = setup_logger(logger_name)
         self.logger = setup_logger(logger_name)
-        self.hadoop_home = hadoop_home or __import__('os').environ.get('HADOOP_HOME', '')
+        
+        # 获取配置
+        if config is None:
+            config = get_config().mapreduce
+        
+        self.config = config
+        
+        # Hadoop 配置
+        self.hadoop_home = hadoop_home or os.environ.get('HADOOP_HOME', '')
         self.hadoop_cmd = 'hadoop'
         self.hadoop_cmd = 'hadoop'
         
         
-    def mapper(self, line: str) -> List[Tuple[str, int]]:
+        # 从配置中获取设置
+        self.use_combiner = config.use_combiner
+        self.num_reducers = config.num_reducers
+        self.job_timeout = config.job_timeout
+    
+    def mapper(self, line: str, 
+               case_sensitive: bool = False,
+               min_word_length: int = 1,
+               stop_words: Optional[List[str]] = None) -> List[Tuple[str, int]]:
         """
         """
         Mapper 函数:将一行文本分割为单词,输出 <单词, 1>
         Mapper 函数:将一行文本分割为单词,输出 <单词, 1>
         
         
@@ -50,6 +172,9 @@ class WordCountStreaming:
         
         
         Args:
         Args:
             line: 输入的一行文本
             line: 输入的一行文本
+            case_sensitive: 是否区分大小写(默认 False)
+            min_word_length: 最小单词长度(默认 1)
+            stop_words: 停用词列表(可选)
             
             
         Returns:
         Returns:
             单词和计数的元组列表
             单词和计数的元组列表
@@ -60,13 +185,29 @@ class WordCountStreaming:
             [('hello', 1), ('world', 1), ('hello', 1)]
             [('hello', 1), ('world', 1), ('hello', 1)]
         """
         """
         results = []
         results = []
+        
         # 分割文本为单词(使用空格、制表符等分隔符)
         # 分割文本为单词(使用空格、制表符等分隔符)
         words = line.strip().split()
         words = line.strip().split()
+        
         for word in words:
         for word in words:
-            # 清理单词(移除标点符号,转为小写)
-            word = word.strip('.,!?;:()[]{}"\'').lower()
-            if word:  # 确保单词非空
+            # 清理单词(移除标点符号)
+            word = word.strip('.,!?;:()[]{}"\'')
+            
+            # 处理大小写
+            if not case_sensitive:
+                word = word.lower()
+            
+            # 检查单词长度
+            if len(word) < min_word_length:
+                continue
+            
+            # 检查停用词
+            if stop_words and word in stop_words:
+                continue
+            
+            if word:
                 results.append((word, 1))
                 results.append((word, 1))
+        
         return results
         return results
     
     
     def combiner(self, pairs: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
     def combiner(self, pairs: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
@@ -112,14 +253,22 @@ class WordCountStreaming:
         total = sum(counts)
         total = sum(counts)
         return (word, total)
         return (word, total)
     
     
-    def run_mapper_from_stdin(self):
+    def run_mapper_from_stdin(self, 
+                               case_sensitive: bool = False,
+                               min_word_length: int = 1,
+                               stop_words: Optional[List[str]] = None):
         """
         """
         从标准输入运行 Mapper(用于 Hadoop Streaming)
         从标准输入运行 Mapper(用于 Hadoop Streaming)
         
         
         从 stdin 读取每行数据,执行 Mapper 逻辑,输出到 stdout。
         从 stdin 读取每行数据,执行 Mapper 逻辑,输出到 stdout。
+        
+        Args:
+            case_sensitive: 是否区分大小写
+            min_word_length: 最小单词长度
+            stop_words: 停用词列表
         """
         """
         for line in sys.stdin:
         for line in sys.stdin:
-            pairs = self.mapper(line)
+            pairs = self.mapper(line, case_sensitive, min_word_length, stop_words)
             for word, count in pairs:
             for word, count in pairs:
                 print(f"{word}\t{count}")
                 print(f"{word}\t{count}")
     
     
@@ -170,8 +319,9 @@ class WordCountStreaming:
     def run(self, input_path: str, output_path: str, 
     def run(self, input_path: str, output_path: str, 
             mapper_script: Optional[str] = None,
             mapper_script: Optional[str] = None,
             reducer_script: Optional[str] = None,
             reducer_script: Optional[str] = None,
-            combiner: bool = True,
-            num_reducers: int = 1) -> bool:
+            combiner: Optional[bool] = None,
+            num_reducers: Optional[int] = None,
+            timeout: Optional[int] = None) -> bool:
         """
         """
         运行完整的 WordCount 作业
         运行完整的 WordCount 作业
         
         
@@ -182,13 +332,21 @@ class WordCountStreaming:
             output_path: HDFS 输出路径(不能已存在)
             output_path: HDFS 输出路径(不能已存在)
             mapper_script: Mapper 脚本路径(可选,默认使用当前脚本)
             mapper_script: Mapper 脚本路径(可选,默认使用当前脚本)
             reducer_script: Reducer 脚本路径(可选,默认使用当前脚本)
             reducer_script: Reducer 脚本路径(可选,默认使用当前脚本)
-            combiner: 是否使用 Combiner
-            num_reducers: Reducer 任务数量
+            combiner: 是否使用 Combiner(可选,默认为配置中的设置)
+            num_reducers: Reducer 任务数量(可选,默认为配置中的设置)
+            timeout: 超时时间(秒,可选,默认为配置中的设置)
             
             
         Returns:
         Returns:
             作业是否成功完成
             作业是否成功完成
+            
+        Raises:
+            RuntimeError: 当找不到 Streaming jar 时
+            ValueError: 当输入路径无效时
         """
         """
-        import os
+        # 使用配置中的默认值
+        use_combiner = default_value(combiner, self.use_combiner)
+        reducers = default_value(num_reducers, self.num_reducers)
+        job_timeout = default_value(timeout, self.job_timeout)
         
         
         # 确定脚本路径
         # 确定脚本路径
         if mapper_script is None:
         if mapper_script is None:
@@ -200,7 +358,7 @@ class WordCountStreaming:
         streaming_jar = self._find_streaming_jar()
         streaming_jar = self._find_streaming_jar()
         if not streaming_jar:
         if not streaming_jar:
             self.logger.error("Could not find Hadoop Streaming jar")
             self.logger.error("Could not find Hadoop Streaming jar")
-            return False
+            raise RuntimeError("Hadoop Streaming jar not found")
         
         
         cmd_parts = [
         cmd_parts = [
             self.hadoop_cmd,
             self.hadoop_cmd,
@@ -210,16 +368,16 @@ class WordCountStreaming:
             '-reducer', f"python3 {os.path.basename(reducer_script)} reducer",
             '-reducer', f"python3 {os.path.basename(reducer_script)} reducer",
             '-input', input_path,
             '-input', input_path,
             '-output', output_path,
             '-output', output_path,
-            '-D', f"mapreduce.job.reduces={num_reducers}"
+            '-D', f"mapreduce.job.reduces={reducers}"
         ]
         ]
         
         
-        if combiner:
+        if use_combiner:
             cmd_parts.extend(['-combiner', f"python3 {os.path.basename(mapper_script)} mapper | sort | python3 {os.path.basename(reducer_script)} reducer"])
             cmd_parts.extend(['-combiner', f"python3 {os.path.basename(mapper_script)} mapper | sort | python3 {os.path.basename(reducer_script)} reducer"])
         
         
         cmd = ' '.join(cmd_parts)
         cmd = ' '.join(cmd_parts)
         self.logger.info(f"Running Hadoop Streaming job: {cmd}")
         self.logger.info(f"Running Hadoop Streaming job: {cmd}")
         
         
-        returncode, stdout, stderr = run_command(cmd, timeout=3600)  # 1小时超时
+        returncode, stdout, stderr = run_command(cmd, timeout=job_timeout)
         
         
         if returncode == 0:
         if returncode == 0:
             self.logger.info("WordCount job completed successfully")
             self.logger.info("WordCount job completed successfully")
@@ -237,7 +395,6 @@ class WordCountStreaming:
         Returns:
         Returns:
             Streaming jar 文件路径,如果未找到返回 None
             Streaming jar 文件路径,如果未找到返回 None
         """
         """
-        import os
         import glob
         import glob
         
         
         # 尝试从常见位置查找
         # 尝试从常见位置查找
@@ -265,7 +422,11 @@ class WordCountStreaming:
         
         
         return None
         return None
     
     
-    def count_words_locally(self, text: str) -> Dict[str, int]:
+    def count_words_locally(self, text: str,
+                             case_sensitive: bool = False,
+                             min_word_length: int = 1,
+                             stop_words: Optional[List[str]] = None,
+                             top_n: int = 10) -> WordCountResult:
         """
         """
         本地统计单词(不使用 Hadoop)
         本地统计单词(不使用 Hadoop)
         
         
@@ -273,19 +434,29 @@ class WordCountStreaming:
         
         
         Args:
         Args:
             text: 输入文本
             text: 输入文本
+            case_sensitive: 是否区分大小写
+            min_word_length: 最小单词长度
+            stop_words: 停用词列表
+            top_n: 返回前 N 个最常见的单词
             
             
         Returns:
         Returns:
-            单词计数字典
+            WordCountResult 结果对象
             
             
         Example:
         Example:
             >>> wc = WordCountStreaming()
             >>> wc = WordCountStreaming()
-            >>> wc.count_words_locally("hello world hello")
-            {'hello': 2, 'world': 1}
+            >>> result = wc.count_words_locally("hello world hello")
+            >>> result.total_words
+            3
+            >>> result.unique_words
+            2
         """
         """
+        import time
+        start_time = time.time()
+        
         # 模拟完整的 MapReduce 流程
         # 模拟完整的 MapReduce 流程
         all_pairs = []
         all_pairs = []
         for line in text.split('\n'):
         for line in text.split('\n'):
-            pairs = self.mapper(line)
+            pairs = self.mapper(line, case_sensitive, min_word_length, stop_words)
             all_pairs.extend(pairs)
             all_pairs.extend(pairs)
         
         
         # 按单词分组
         # 按单词分组
@@ -294,12 +465,109 @@ class WordCountStreaming:
             word_groups[word].append(count)
             word_groups[word].append(count)
         
         
         # 执行 Reduce
         # 执行 Reduce
-        results = {}
+        word_counts = {}
         for word, counts in word_groups.items():
         for word, counts in word_groups.items():
             _, total = self.reducer(word, counts)
             _, total = self.reducer(word, counts)
-            results[word] = total
+            word_counts[word] = total
         
         
-        return results
+        # 计算统计信息
+        total_words = sum(word_counts.values())
+        unique_words = len(word_counts)
+        top_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:top_n]
+        
+        # 计算执行时间
+        execution_time_ms = (time.time() - start_time) * 1000
+        
+        return WordCountResult(
+            total_words=total_words,
+            unique_words=unique_words,
+            top_words=top_words,
+            word_counts=word_counts,
+            execution_time_ms=execution_time_ms
+        )
+    
+    def analyze_text(self, text: str) -> Dict[str, Any]:
+        """
+        分析文本的详细统计信息
+        
+        Args:
+            text: 输入文本
+            
+        Returns:
+            包含详细统计信息的字典
+        """
+        result = self.count_words_locally(text)
+        
+        # 计算额外的统计信息
+        word_lengths = [len(word) for word in result.word_counts.keys()]
+        
+        analysis = {
+            'total_words': result.total_words,
+            'unique_words': result.unique_words,
+            'most_frequent_word': result.most_frequent_word,
+            'avg_word_frequency': result.avg_word_frequency,
+            'min_word_length': min(word_lengths) if word_lengths else 0,
+            'max_word_length': max(word_lengths) if word_lengths else 0,
+            'avg_word_length': sum(word_lengths) / len(word_lengths) if word_lengths else 0,
+            'word_frequency_distribution': self._get_frequency_distribution(result.word_counts),
+            'top_10_words': result.top_words[:10]
+        }
+        
+        return analysis
+    
+    def _get_frequency_distribution(self, word_counts: Dict[str, int]) -> Dict[str, int]:
+        """
+        获取词频分布
+        
+        Args:
+            word_counts: 单词计数字典
+            
+        Returns:
+            词频分布: 按出现次数分桶('once'、'2-5'、'6-10'、'10+' 即超过 10 次),值为落入该桶的单词数;没有单词的桶不会出现在结果中
+        """
+        distribution = defaultdict(int)
+        for count in word_counts.values():
+            if count == 1:
+                distribution['once'] += 1
+            elif count <= 5:
+                distribution['2-5'] += 1
+            elif count <= 10:
+                distribution['6-10'] += 1
+            else:
+                distribution['10+'] += 1
+        return dict(distribution)
+    
+    def get_stop_words(self, language: str = 'english') -> List[str]:
+        """
+        获取常见停用词列表
+        
+        Args:
+            language: 语言(默认 'english')
+            
+        Returns:
+            停用词列表
+        """
+        # 常见英语停用词
+        english_stop_words = [
+            'the', 'be', 'to', 'of', 'and', 'a', 'in', 'that', 'have', 'i',
+            'it', 'for', 'not', 'on', 'with', 'he', 'as', 'you', 'do',
+            'at', 'this', 'but', 'his', 'by', 'from', 'they', 'we', 'say',
+            'her', 'she', 'or', 'an', 'will', 'my', 'one', 'all', 'would',
+            'there', 'their', 'what', 'so', 'up', 'out', 'if', 'about',
+            'who', 'get', 'which', 'go', 'me', 'when', 'make', 'can',
+            'like', 'time', 'no', 'just', 'him', 'know', 'take', 'people',
+            'into', 'year', 'your', 'good', 'some', 'could', 'them', 'see',
+            'other', 'than', 'then', 'now', 'look', 'only', 'come', 'its',
+            'over', 'think', 'also', 'back', 'after', 'use', 'two', 'how',
+            'our', 'work', 'first', 'well', 'way', 'even', 'new', 'want',
+            'because', 'any', 'these', 'give', 'day', 'most', 'us'
+        ]
+        
+        if language.lower() == 'english':
+            return english_stop_words
+        else:
+            # 默认返回英语停用词
+            return english_stop_words
 
 
 
 
 def main():
 def main():
@@ -310,36 +578,189 @@ def main():
     - mapper: 运行 Mapper
     - mapper: 运行 Mapper
     - reducer: 运行 Reducer
     - reducer: 运行 Reducer
     - local: 本地测试
     - local: 本地测试
+    - analyze: 分析文本
     """
     """
-    if len(sys.argv) < 2:
-        print("Usage: python wordcount_streaming.py <command>")
-        print("Commands:")
-        print("  mapper   - Run Mapper from stdin")
-        print("  reducer  - Run Reducer from stdin")
-        print("  local    - Run local test")
+    import argparse
+    
+    parser = argparse.ArgumentParser(
+        description='Hadoop Streaming WordCount (Modern Version)',
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog='''
+Examples:
+  # 从标准输入运行 Mapper
+  python wordcount_streaming.py mapper < input.txt
+  
+  # 从标准输入运行 Reducer
+  python wordcount_streaming.py reducer < mapper_output.txt
+  
+  # 本地测试
+  python wordcount_streaming.py local
+  
+  # 分析文本文件
+  python wordcount_streaming.py analyze --file input.txt --format text
+  
+  # 使用增强的 Mapper 参数
+  python wordcount_streaming.py mapper --case-sensitive --min-length 3 < input.txt
+        '''
+    )
+    
+    # 子命令
+    subparsers = parser.add_subparsers(dest='command', help='Available commands')
+    
+    # mapper 子命令
+    mapper_parser = subparsers.add_parser('mapper', help='Run Mapper from stdin')
+    mapper_parser.add_argument('--case-sensitive', '-c', action='store_true',
+                               help='Case-sensitive word matching')
+    mapper_parser.add_argument('--min-length', '-m', type=int, default=1,
+                               help='Minimum word length (default: 1)')
+    mapper_parser.add_argument('--stop-words', '-s', action='store_true',
+                               help='Use common English stop words')
+    mapper_parser.add_argument('--stop-words-file', type=str, default=None,
+                               help='Custom stop words file path')
+    
+    # reducer 子命令
+    reducer_parser = subparsers.add_parser('reducer', help='Run Reducer from stdin')
+    
+    # local 子命令
+    local_parser = subparsers.add_parser('local', help='Run local test')
+    local_parser.add_argument('--file', '-f', type=str, default=None,
+                              help='Input file path (default: use sample text)')
+    local_parser.add_argument('--case-sensitive', '-c', action='store_true',
+                               help='Case-sensitive word matching')
+    local_parser.add_argument('--min-length', '-m', type=int, default=1,
+                               help='Minimum word length (default: 1)')
+    local_parser.add_argument('--stop-words', '-s', action='store_true',
+                               help='Use common English stop words')
+    local_parser.add_argument('--top-n', '-n', type=int, default=10,
+                               help='Number of top words to show (default: 10)')
+    local_parser.add_argument('--format', '-fmt', type=str, default='text',
+                               choices=['text', 'json', 'csv'],
+                               help='Output format (default: text)')
+    local_parser.add_argument('--output', '-o', type=str, default=None,
+                               help='Output file path (optional)')
+    
+    # analyze 子命令
+    analyze_parser = subparsers.add_parser('analyze', help='Analyze text statistics')
+    analyze_parser.add_argument('--file', '-f', type=str, required=True,
+                                help='Input file path')
+    analyze_parser.add_argument('--format', '-fmt', type=str, default='json',
+                               choices=['text', 'json'],
+                               help='Output format (default: json)')
+    
+    args = parser.parse_args()
+    
+    if not args.command:
+        parser.print_help()
         sys.exit(1)
         sys.exit(1)
     
     
-    command = sys.argv[1]
     wc = WordCountStreaming()
     wc = WordCountStreaming()
     
     
-    if command == 'mapper':
-        wc.run_mapper_from_stdin()
-    elif command == 'reducer':
+    # 处理停用词
+    stop_words = None
+    if hasattr(args, 'stop_words') and args.stop_words:
+        stop_words = wc.get_stop_words()
+    
+    if hasattr(args, 'stop_words_file') and args.stop_words_file:
+        if os.path.exists(args.stop_words_file):
+            with open(args.stop_words_file, 'r', encoding='utf-8') as f:
+                custom_stop_words = [line.strip().lower() for line in f if line.strip()]
+            if stop_words is None:
+                stop_words = custom_stop_words
+            else:
+                stop_words.extend(custom_stop_words)
+    
+    if args.command == 'mapper':
+        wc.run_mapper_from_stdin(
+            case_sensitive=getattr(args, 'case_sensitive', False),
+            min_word_length=getattr(args, 'min_length', 1),
+            stop_words=stop_words
+        )
+    
+    elif args.command == 'reducer':
         wc.run_reducer_from_stdin()
         wc.run_reducer_from_stdin()
-    elif command == 'local':
+    
+    elif args.command == 'local':
         # 本地测试
         # 本地测试
-        test_text = """
-        Hello world, hello Hadoop!
-        Hadoop is great for big data.
-        Big data processing with Hadoop.
-        """
-        result = wc.count_words_locally(test_text)
-        print("Word count results:")
-        for word, count in sorted(result.items(), key=lambda x: x[1], reverse=True):
-            print(f"{word}: {count}")
-    else:
-        print(f"Unknown command: {command}")
-        sys.exit(1)
+        if args.file and os.path.exists(args.file):
+            with open(args.file, 'r', encoding='utf-8') as f:
+                test_text = f.read()
+        else:
+            # 使用示例文本
+            test_text = """
+            Hello world, hello Hadoop!
+            Hadoop is great for big data.
+            Big data processing with Hadoop.
+            Spark is fast and general engine for large-scale data processing.
+            Hadoop provides massive storage for any kind of data.
+            """
+        
+        # 执行统计
+        result = wc.count_words_locally(
+            test_text,
+            case_sensitive=args.case_sensitive,
+            min_word_length=args.min_length,
+            stop_words=stop_words,
+            top_n=args.top_n
+        )
+        
+        # 确定输出格式
+        output_format = OutputFormat(args.format)
+        
+        # 输出结果
+        if args.output:
+            result.save_to_file(args.output, output_format)
+            print(f"Result saved to: {args.output}")
+        else:
+            if output_format == OutputFormat.JSON:
+                print(result.to_json(indent=2))
+            elif output_format == OutputFormat.CSV:
+                print('word,count')
+                for word, count in sorted(result.word_counts.items(), key=lambda x: x[1], reverse=True):
+                    print(f'"{word}",{count}')
+            else:
+                print(f"\n{'='*60}")
+                print(f"Word Count Results (Local Mode)")
+                print(f"{'='*60}")
+                print(f"\nTotal words: {result.total_words:,}")
+                print(f"Unique words: {result.unique_words:,}")
+                print(f"Execution time: {result.execution_time_ms:.2f} ms")
+                
+                if result.most_frequent_word:
+                    print(f"\nMost frequent word: {result.most_frequent_word[0]} ({result.most_frequent_word[1]} times)")
+                
+                print(f"\nTop {args.top_n} words:")
+                for word, count in result.top_words:
+                    print(f"  {word}: {count}")
+    
+    elif args.command == 'analyze':
+        # 分析文本
+        with open(args.file, 'r', encoding='utf-8') as f:
+            text = f.read()
+        
+        analysis = wc.analyze_text(text)
+        
+        if args.format == 'json':
+            print(json.dumps(analysis, ensure_ascii=False, indent=2))
+        else:
+            print(f"\n{'='*60}")
+            print(f"Text Analysis Report")
+            print(f"{'='*60}")
+            print(f"\nFile: {args.file}")
+            print(f"\nBasic Statistics:")
+            print(f"  Total words: {analysis['total_words']:,}")
+            print(f"  Unique words: {analysis['unique_words']:,}")
+            print(f"  Most frequent: {analysis['most_frequent_word']}")
+            print(f"  Average frequency: {analysis['avg_word_frequency']:.2f}")
+            print(f"\nWord Length Statistics:")
+            print(f"  Min word length: {analysis['min_word_length']}")
+            print(f"  Max word length: {analysis['max_word_length']}")
+            print(f"  Average word length: {analysis['avg_word_length']:.2f}")
+            print(f"\nWord Frequency Distribution:")
+            for key, value in analysis['word_frequency_distribution'].items():
+                print(f"  {key}: {value}")
+            print(f"\nTop 10 Words:")
+            for word, count in analysis['top_10_words']:
+                print(f"  {word}: {count}")
 
 
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':