| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767 |
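"""
Word-frequency counting via Hadoop Streaming - modernized version.

Counterpart of the Java WordCount class, implemented with Hadoop Streaming:
- Mapper: reads data from standard input, splits it into words, emits <word, 1>
- Reducer: reads the Mapper output from standard input and totals each word
- Combiner: optional local merge that reduces the amount of data transferred

Modernized features:
- Integration with configuration management
- Improved error handling
- Type-safe data classes
- Extended statistics
- Flexible command-line arguments
"""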
- import sys
- import os
- import json
- from collections import defaultdict
- from dataclasses import dataclass, field, asdict
- from typing import Dict, List, Optional, Tuple, Any, Iterator
- from enum import Enum
- from ..utils.helpers import run_command, setup_logger, format_file_size, default_value
- from ..config import ConfigurationManager, MapReduceConfig, get_config
class OutputFormat(Enum):
    """Serialization formats accepted when exporting word-count results."""

    TEXT = 'text'  # plain-text summary
    JSON = 'json'  # JSON document
    CSV = 'csv'    # "word,count" rows
@dataclass
class WordCountResult:
    """Container for the outcome of a word-count run.

    Besides the raw fields it exposes two derived statistics
    (``most_frequent_word`` and ``avg_word_frequency``) and helpers that
    serialize the result to a dict, a JSON string, or a file.
    """

    # Sum of all per-word counts.
    total_words: int = 0
    # Number of distinct words.
    unique_words: int = 0
    # (word, count) pairs; the first entry is treated as the most frequent one.
    top_words: List[Tuple[str, int]] = field(default_factory=list)
    # Mapping of word -> number of occurrences.
    word_counts: Dict[str, int] = field(default_factory=dict)
    # Duration of the run in milliseconds.
    execution_time_ms: float = 0.0

    @property
    def most_frequent_word(self) -> Optional[Tuple[str, int]]:
        """Return the first ``(word, count)`` entry of ``top_words``.

        Returns:
            The leading ``(word, count)`` tuple, or ``None`` when
            ``top_words`` is empty.
        """
        return self.top_words[0] if self.top_words else None

    @property
    def avg_word_frequency(self) -> float:
        """Return the mean number of occurrences per distinct word.

        Returns:
            ``total_words / unique_words``, or ``0.0`` when there are no
            distinct words (avoids a division by zero).
        """
        return self.total_words / self.unique_words if self.unique_words > 0 else 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Convert the result, including the derived statistics, to a dict.

        Returns:
            All dataclass fields plus ``most_frequent_word`` and
            ``avg_word_frequency``.
        """
        data = asdict(self)
        data['most_frequent_word'] = self.most_frequent_word
        data['avg_word_frequency'] = self.avg_word_frequency
        return data

    def to_json(self, indent: Optional[int] = None) -> str:
        """Serialize ``to_dict()`` as a JSON string.

        Args:
            indent: Number of spaces used for indentation (``None`` for the
                compact form).

        Returns:
            The JSON text; non-ASCII characters are written unescaped.
        """
        return json.dumps(self.to_dict(), ensure_ascii=False, indent=indent)

    def save_to_file(self, file_path: str, format: OutputFormat = OutputFormat.JSON):
        """Write the result to ``file_path``, creating parent directories.

        Args:
            file_path: Destination file path.
            format: ``OutputFormat.JSON`` writes ``to_dict()`` as indented
                JSON, ``OutputFormat.CSV`` writes ``word,count`` rows sorted
                by descending count, and any other value writes a plain-text
                summary.
        """
        import csv

        os.makedirs(os.path.dirname(os.path.abspath(file_path)), exist_ok=True)

        if format == OutputFormat.JSON:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(self.to_dict(), f, ensure_ascii=False, indent=2)
        elif format == OutputFormat.CSV:
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write('word,count\n')
                # QUOTE_NONNUMERIC keeps the `"word",count` layout while the
                # csv module doubles any quote character embedded in a word,
                # which a hand-built f-string row would leave unescaped.
                writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC, lineterminator='\n')
                writer.writerows(
                    sorted(self.word_counts.items(), key=lambda item: item[1], reverse=True)
                )
        else:
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(f"Total words: {self.total_words}\n")
                f.write(f"Unique words: {self.unique_words}\n")
                f.write(f"Most frequent: {self.most_frequent_word}\n")
                f.write("\nTop 20 words:\n")
                for word, count in self.top_words[:20]:
                    f.write(f"{word}: {count}\n")
class WordCountStreaming:
    """Word-frequency counter built around Hadoop Streaming.

    Mirrors the Java ``WordCount`` job: ``mapper`` emits ``<word, 1>`` pairs,
    ``combiner`` merges them locally and ``reducer`` totals the counts per
    word. ``run`` submits the job through the ``hadoop`` command, while
    ``count_words_locally`` and ``analyze_text`` run the same pipeline
    in-process without Hadoop.
    """

    # Characters stripped from both ends of every token by ``mapper``.
    _PUNCTUATION = '.,!?;:()[]{}"\''

    # File-name suffixes of auxiliary jars that must not be picked as the
    # Streaming jar.
    _AUX_JAR_SUFFIXES = ('-sources.jar', '-tests.jar', '-javadoc.jar')

    # Common English stop words returned by ``get_stop_words``.
    _ENGLISH_STOP_WORDS = (
        'the', 'be', 'to', 'of', 'and', 'a', 'in', 'that', 'have', 'i',
        'it', 'for', 'not', 'on', 'with', 'he', 'as', 'you', 'do',
        'at', 'this', 'but', 'his', 'by', 'from', 'they', 'we', 'say',
        'her', 'she', 'or', 'an', 'will', 'my', 'one', 'all', 'would',
        'there', 'their', 'what', 'so', 'up', 'out', 'if', 'about',
        'who', 'get', 'which', 'go', 'me', 'when', 'make', 'can',
        'like', 'time', 'no', 'just', 'him', 'know', 'take', 'people',
        'into', 'year', 'your', 'good', 'some', 'could', 'them', 'see',
        'other', 'than', 'then', 'now', 'look', 'only', 'come', 'its',
        'over', 'think', 'also', 'back', 'after', 'use', 'two', 'how',
        'our', 'work', 'first', 'well', 'way', 'even', 'new', 'want',
        'because', 'any', 'these', 'give', 'day', 'most', 'us',
    )

    def __init__(self,
                 hadoop_home: Optional[str] = None,
                 logger_name: str = 'wordcount_streaming',
                 config: Optional["MapReduceConfig"] = None):
        """Initialise the instance.

        Args:
            hadoop_home: Hadoop installation directory; falls back to the
                ``HADOOP_HOME`` environment variable, then to ``''``.
            logger_name: Name passed to ``setup_logger``.
            config: MapReduce configuration; when omitted,
                ``get_config().mapreduce`` is used.
        """
        self.logger = setup_logger(logger_name)

        if config is None:
            config = get_config().mapreduce
        self.config = config

        # Hadoop location and executable.
        self.hadoop_home = hadoop_home or os.environ.get('HADOOP_HOME', '')
        self.hadoop_cmd = 'hadoop'

        # Job defaults taken from the configuration object.
        self.use_combiner = config.use_combiner
        self.num_reducers = config.num_reducers
        self.job_timeout = config.job_timeout

    def mapper(self, line: str,
               case_sensitive: bool = False,
               min_word_length: int = 1,
               stop_words: Optional[List[str]] = None) -> List[Tuple[str, int]]:
        """Split one line of text into words and emit ``(word, 1)`` pairs.

        Counterpart of the Java ``TokenizerMapper.map`` method.

        Args:
            line: One line of input text.
            case_sensitive: Keep the original case when ``True``; otherwise
                every word is lower-cased.
            min_word_length: Words shorter than this are dropped.
            stop_words: Words to skip. Any container supporting ``in``
                works; a set gives constant-time lookups.

        Returns:
            One ``(word, 1)`` tuple per kept word, in input order.

        Example:
            >>> wc = WordCountStreaming()
            >>> wc.mapper("hello world hello")
            [('hello', 1), ('world', 1), ('hello', 1)]
        """
        pairs = []
        # str.split() with no argument splits on any run of whitespace.
        for token in line.strip().split():
            word = token.strip(self._PUNCTUATION)
            if not case_sensitive:
                word = word.lower()
            # A token made only of punctuation is empty after stripping.
            if not word or len(word) < min_word_length:
                continue
            if stop_words and word in stop_words:
                continue
            pairs.append((word, 1))
        return pairs

    def combiner(self, pairs: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
        """Merge mapper output locally by summing the counts per word.

        Args:
            pairs: ``(word, count)`` tuples as produced by ``mapper``.

        Returns:
            One ``(word, local_total)`` tuple per distinct word, ordered by
            first appearance.

        Example:
            >>> wc = WordCountStreaming()
            >>> wc.combiner([('hello', 1), ('world', 1), ('hello', 1)])
            [('hello', 2), ('world', 1)]
        """
        totals = defaultdict(int)
        for word, count in pairs:
            totals[word] += count
        return list(totals.items())

    def reducer(self, word: str, counts: List[int]) -> Tuple[str, int]:
        """Total all counts collected for one word.

        Counterpart of the Java ``IntSumReducer.reduce`` method.

        Args:
            word: The word being reduced.
            counts: Every count emitted for that word.

        Returns:
            A ``(word, total)`` tuple.

        Example:
            >>> wc = WordCountStreaming()
            >>> wc.reducer('hello', [1, 1, 1])
            ('hello', 3)
        """
        return (word, sum(counts))

    def run_mapper_from_stdin(self,
                              case_sensitive: bool = False,
                              min_word_length: int = 1,
                              stop_words: Optional[List[str]] = None):
        """Run the mapper over stdin and print ``word<TAB>count`` lines.

        Args:
            case_sensitive: Forwarded to ``mapper``.
            min_word_length: Forwarded to ``mapper``.
            stop_words: Words to skip.
        """
        # Build the lookup set once instead of scanning a list per token.
        stop_set = frozenset(stop_words) if stop_words else None
        for line in sys.stdin:
            for word, count in self.mapper(line, case_sensitive, min_word_length, stop_set):
                print(f"{word}\t{count}")

    def run_reducer_from_stdin(self):
        """Run the reducer over stdin and print ``word<TAB>total`` lines.

        Expects ``word<TAB>count`` lines in which equal words are adjacent
        (i.e. input sorted by key). Blank lines, lines without a tab and
        lines whose count is not an integer are skipped.
        """
        current_word = None
        current_counts = []

        for raw_line in sys.stdin:
            line = raw_line.strip()
            if not line:
                continue

            parts = line.split('\t', 1)
            if len(parts) != 2:
                continue

            word, count_text = parts
            try:
                count = int(count_text)
            except ValueError:
                continue

            if word == current_word:
                current_counts.append(count)
                continue

            # The key changed: flush the finished group, then start a new one.
            if current_word is not None:
                self._emit_reduced(current_word, current_counts)
            current_word = word
            current_counts = [count]

        # Flush the last group.
        if current_word is not None:
            self._emit_reduced(current_word, current_counts)

    def _emit_reduced(self, word: str, counts: List[int]) -> None:
        """Reduce one group of counts and print it as ``word<TAB>total``."""
        result_word, result_count = self.reducer(word, counts)
        print(f"{result_word}\t{result_count}")

    def run(self, input_path: str, output_path: str,
            mapper_script: Optional[str] = None,
            reducer_script: Optional[str] = None,
            combiner: Optional[bool] = None,
            num_reducers: Optional[int] = None,
            timeout: Optional[int] = None) -> bool:
        """Submit the word-count job through Hadoop Streaming.

        Args:
            input_path: HDFS input path.
            output_path: HDFS output path (must not exist yet).
            mapper_script: Mapper script path; defaults to this file.
            reducer_script: Reducer script path; defaults to this file.
            combiner: Whether to use a combiner; defaults to the configured
                value.
            num_reducers: Number of reduce tasks; defaults to the configured
                value.
            timeout: Timeout in seconds; defaults to the configured value.

        Returns:
            ``True`` when the command exits with status 0, else ``False``.

        Raises:
            ValueError: If ``input_path`` or ``output_path`` is empty.
            RuntimeError: If the Hadoop Streaming jar cannot be found.
        """
        import shlex

        if not input_path or not output_path:
            raise ValueError("input_path and output_path must be non-empty")

        # Fall back to the configured defaults for anything not given.
        use_combiner = default_value(combiner, self.use_combiner)
        reducers = default_value(num_reducers, self.num_reducers)
        job_timeout = default_value(timeout, self.job_timeout)

        # NOTE(review): this module uses package-relative imports, so shipping
        # it on its own via -files presumably fails at import time on the task
        # nodes - confirm, or pass standalone mapper/reducer scripts.
        if mapper_script is None:
            mapper_script = __file__
        if reducer_script is None:
            reducer_script = __file__

        streaming_jar = self._find_streaming_jar()
        if not streaming_jar:
            self.logger.error("Could not find Hadoop Streaming jar")
            raise RuntimeError("Hadoop Streaming jar not found")

        argv = self._build_streaming_command(
            streaming_jar, input_path, output_path,
            mapper_script, reducer_script, use_combiner, reducers,
        )
        # Quote every argument: the -mapper/-reducer values contain spaces and
        # must reach Hadoop as single arguments.
        cmd = ' '.join(shlex.quote(part) for part in argv)
        self.logger.info(f"Running Hadoop Streaming job: {cmd}")

        returncode, stdout, stderr = run_command(cmd, timeout=job_timeout)

        if returncode == 0:
            self.logger.info("WordCount job completed successfully")
            self.logger.info(f"Output: {stdout}")
            return True

        self.logger.error(f"WordCount job failed with return code {returncode}")
        self.logger.error(f"Stderr: {stderr}")
        return False

    def _build_streaming_command(self, streaming_jar: str,
                                 input_path: str, output_path: str,
                                 mapper_script: str, reducer_script: str,
                                 use_combiner: bool, reducers: int) -> List[str]:
        """Build the ``hadoop jar ...`` argument list for the streaming job."""
        mapper_cmd = f"python3 {os.path.basename(mapper_script)} mapper"
        reducer_cmd = f"python3 {os.path.basename(reducer_script)} reducer"
        # Ship each script once even when mapper and reducer are the same file.
        shipped_files = list(dict.fromkeys([mapper_script, reducer_script]))

        argv = [
            self.hadoop_cmd,
            'jar', streaming_jar,
            # Generic options (-D, -files) must precede the streaming options.
            '-D', f"mapreduce.job.reduces={reducers}",
            '-files', ','.join(shipped_files),
            '-mapper', mapper_cmd,
            '-reducer', reducer_cmd,
            '-input', input_path,
            '-output', output_path,
        ]
        if use_combiner:
            # Summing is associative, so the reducer doubles as the combiner.
            # Combiner input is already "word<TAB>count" records, so it must
            # not be tokenized by the mapper again.
            argv.extend(['-combiner', reducer_cmd])
        return argv

    def _find_streaming_jar(self) -> Optional[str]:
        """Locate the Hadoop Streaming jar.

        Looks in the usual directories first, then scans the output of
        ``hadoop classpath --glob``.

        Returns:
            Path of the jar, or ``None`` when none is found.
        """
        import glob

        candidate_dirs = []
        # Only derive paths from hadoop_home when it is set; joining onto ''
        # would search directories relative to the current working directory.
        if self.hadoop_home:
            candidate_dirs.append(
                os.path.join(self.hadoop_home, 'share', 'hadoop', 'tools', 'lib'))
            candidate_dirs.append(
                os.path.join(self.hadoop_home, 'contrib', 'streaming'))
        candidate_dirs.append('/usr/lib/hadoop-mapreduce')
        candidate_dirs.append('/usr/hdp/current/hadoop-mapreduce-client')

        for directory in candidate_dirs:
            pattern = os.path.join(directory, 'hadoop-streaming-*.jar')
            # Sort for a deterministic pick; glob order is unspecified.
            jars = sorted(
                jar for jar in glob.glob(pattern)
                if not jar.endswith(self._AUX_JAR_SUFFIXES)
            )
            if jars:
                return jars[0]

        # Fall back to whatever the hadoop command reports on its classpath.
        returncode, stdout, stderr = run_command(f"{self.hadoop_cmd} classpath --glob")
        if returncode == 0:
            for entry in stdout.strip().split(os.pathsep):
                if 'streaming' in entry.lower() and entry.endswith('.jar'):
                    return entry

        return None

    def count_words_locally(self, text: str,
                            case_sensitive: bool = False,
                            min_word_length: int = 1,
                            stop_words: Optional[List[str]] = None,
                            top_n: int = 10) -> "WordCountResult":
        """Count words in-process, without Hadoop.

        Runs the map, group and reduce steps in memory; intended for tests
        and small inputs.

        Args:
            text: Input text.
            case_sensitive: Forwarded to ``mapper``.
            min_word_length: Forwarded to ``mapper``.
            stop_words: Words to skip.
            top_n: Number of most frequent words kept in ``top_words``.

        Returns:
            A ``WordCountResult`` holding the totals and the elapsed time.

        Example:
            >>> wc = WordCountStreaming()
            >>> result = wc.count_words_locally("hello world hello")
            >>> result.total_words
            3
            >>> result.unique_words
            2
        """
        import time
        # perf_counter is monotonic, unlike time.time(), so the elapsed time
        # is immune to system clock adjustments.
        started = time.perf_counter()

        stop_set = frozenset(stop_words) if stop_words else None

        # Map: tokenize every line, grouping the emitted counts by word.
        grouped = defaultdict(list)
        for line in text.split('\n'):
            for word, count in self.mapper(line, case_sensitive, min_word_length, stop_set):
                grouped[word].append(count)

        # Reduce: total each group.
        word_counts = {}
        for word, counts in grouped.items():
            _, total = self.reducer(word, counts)
            word_counts[word] = total

        ranked = sorted(word_counts.items(), key=lambda item: item[1], reverse=True)
        elapsed_ms = (time.perf_counter() - started) * 1000

        return WordCountResult(
            total_words=sum(word_counts.values()),
            unique_words=len(word_counts),
            top_words=ranked[:top_n],
            word_counts=word_counts,
            execution_time_ms=elapsed_ms,
        )

    def analyze_text(self, text: str) -> Dict[str, Any]:
        """Compute detailed statistics for ``text``.

        Args:
            text: Input text.

        Returns:
            A dict with word totals, the most frequent word, average
            frequency, min/max/average word length, the frequency
            distribution and the top 10 words.
        """
        result = self.count_words_locally(text)
        lengths = [len(word) for word in result.word_counts]

        return {
            'total_words': result.total_words,
            'unique_words': result.unique_words,
            'most_frequent_word': result.most_frequent_word,
            'avg_word_frequency': result.avg_word_frequency,
            'min_word_length': min(lengths) if lengths else 0,
            'max_word_length': max(lengths) if lengths else 0,
            'avg_word_length': sum(lengths) / len(lengths) if lengths else 0,
            'word_frequency_distribution': self._get_frequency_distribution(result.word_counts),
            'top_10_words': result.top_words[:10],
        }

    def _get_frequency_distribution(self, word_counts: Dict[str, int]) -> Dict[str, int]:
        """Bucket words by how often they occur.

        Args:
            word_counts: Mapping of word -> count.

        Returns:
            Number of words per bucket, keyed ``'once'``, ``'2-5'``,
            ``'6-10'`` and ``'10+'``; empty buckets are omitted.
        """
        distribution = defaultdict(int)
        for count in word_counts.values():
            if count == 1:
                bucket = 'once'
            elif count <= 5:
                bucket = '2-5'
            elif count <= 10:
                bucket = '6-10'
            else:
                bucket = '10+'
            distribution[bucket] += 1
        return dict(distribution)

    def get_stop_words(self, language: str = 'english') -> List[str]:
        """Return a list of common stop words.

        Args:
            language: Requested language. Only English is available, so the
                English list is returned for every value.

        Returns:
            A new list on every call, safe for the caller to modify.
        """
        return list(self._ENGLISH_STOP_WORDS)
def _build_arg_parser():
    """Create the CLI parser with the mapper/reducer/local/analyze sub-commands."""
    import argparse

    parser = argparse.ArgumentParser(
        description='Hadoop Streaming WordCount (Modern Version)',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  # 从标准输入运行 Mapper
  python wordcount_streaming.py mapper < input.txt

  # 从标准输入运行 Reducer
  python wordcount_streaming.py reducer < mapper_output.txt

  # 本地测试
  python wordcount_streaming.py local

  # 分析文本文件
  python wordcount_streaming.py analyze --file input.txt

  # 使用增强的 Mapper 参数
  python wordcount_streaming.py mapper --case-sensitive --min-length 3 < input.txt
'''
    )

    subparsers = parser.add_subparsers(dest='command', help='Available commands')

    # mapper sub-command
    mapper_parser = subparsers.add_parser('mapper', help='Run Mapper from stdin')
    mapper_parser.add_argument('--case-sensitive', '-c', action='store_true',
                               help='Case-sensitive word matching')
    mapper_parser.add_argument('--min-length', '-m', type=int, default=1,
                               help='Minimum word length (default: 1)')
    mapper_parser.add_argument('--stop-words', '-s', action='store_true',
                               help='Use common English stop words')
    mapper_parser.add_argument('--stop-words-file', type=str, default=None,
                               help='Custom stop words file path')

    # reducer sub-command (takes no options)
    subparsers.add_parser('reducer', help='Run Reducer from stdin')

    # local sub-command
    local_parser = subparsers.add_parser('local', help='Run local test')
    local_parser.add_argument('--file', '-f', type=str, default=None,
                              help='Input file path (default: use sample text)')
    local_parser.add_argument('--case-sensitive', '-c', action='store_true',
                              help='Case-sensitive word matching')
    local_parser.add_argument('--min-length', '-m', type=int, default=1,
                              help='Minimum word length (default: 1)')
    local_parser.add_argument('--stop-words', '-s', action='store_true',
                              help='Use common English stop words')
    local_parser.add_argument('--top-n', '-n', type=int, default=10,
                              help='Number of top words to show (default: 10)')
    local_parser.add_argument('--format', '-fmt', type=str, default='text',
                              choices=['text', 'json', 'csv'],
                              help='Output format (default: text)')
    local_parser.add_argument('--output', '-o', type=str, default=None,
                              help='Output file path (optional)')

    # analyze sub-command
    analyze_parser = subparsers.add_parser('analyze', help='Analyze text statistics')
    analyze_parser.add_argument('--file', '-f', type=str, required=True,
                                help='Input file path')
    analyze_parser.add_argument('--format', '-fmt', type=str, default='json',
                                choices=['text', 'json'],
                                help='Output format (default: json)')

    return parser


def _resolve_stop_words(args, wc):
    """Collect the stop words requested on the command line, or ``None``."""
    stop_words = None
    # Not every sub-command defines these options, hence getattr with defaults.
    if getattr(args, 'stop_words', False):
        stop_words = wc.get_stop_words()

    stop_words_file = getattr(args, 'stop_words_file', None)
    if stop_words_file:
        try:
            with open(stop_words_file, 'r', encoding='utf-8') as f:
                custom_stop_words = [line.strip().lower() for line in f if line.strip()]
        except FileNotFoundError:
            # Warn on stderr (stdout carries mapper output) instead of
            # silently running without the requested stop words.
            print(f"Warning: stop words file not found: {stop_words_file}", file=sys.stderr)
        else:
            if stop_words is None:
                stop_words = custom_stop_words
            else:
                stop_words.extend(custom_stop_words)

    return stop_words


def _run_local(args, wc, stop_words, parser):
    """Handle the ``local`` sub-command: count words in a file or sample text."""
    if args.file:
        if not os.path.exists(args.file):
            # An explicitly requested file must exist; silently falling back
            # to the sample text would report results for the wrong input.
            parser.error(f"input file not found: {args.file}")
        with open(args.file, 'r', encoding='utf-8') as f:
            test_text = f.read()
    else:
        # Built-in sample text.
        test_text = """
        Hello world, hello Hadoop!
        Hadoop is great for big data.
        Big data processing with Hadoop.
        Spark is fast and general engine for large-scale data processing.
        Hadoop provides massive storage for any kind of data.
        """

    result = wc.count_words_locally(
        test_text,
        case_sensitive=args.case_sensitive,
        min_word_length=args.min_length,
        stop_words=stop_words,
        top_n=args.top_n
    )

    output_format = OutputFormat(args.format)

    if args.output:
        result.save_to_file(args.output, output_format)
        print(f"Result saved to: {args.output}")
        return

    if output_format == OutputFormat.JSON:
        print(result.to_json(indent=2))
    elif output_format == OutputFormat.CSV:
        print('word,count')
        for word, count in sorted(result.word_counts.items(), key=lambda x: x[1], reverse=True):
            print(f'"{word}",{count}')
    else:
        print(f"\n{'='*60}")
        print("Word Count Results (Local Mode)")
        print(f"{'='*60}")
        print(f"\nTotal words: {result.total_words:,}")
        print(f"Unique words: {result.unique_words:,}")
        print(f"Execution time: {result.execution_time_ms:.2f} ms")

        if result.most_frequent_word:
            print(f"\nMost frequent word: {result.most_frequent_word[0]} ({result.most_frequent_word[1]} times)")

        print(f"\nTop {args.top_n} words:")
        for word, count in result.top_words:
            print(f" {word}: {count}")


def _run_analyze(args, wc):
    """Handle the ``analyze`` sub-command: print statistics for a text file."""
    with open(args.file, 'r', encoding='utf-8') as f:
        text = f.read()

    analysis = wc.analyze_text(text)

    if args.format == 'json':
        print(json.dumps(analysis, ensure_ascii=False, indent=2))
        return

    print(f"\n{'='*60}")
    print("Text Analysis Report")
    print(f"{'='*60}")
    print(f"\nFile: {args.file}")
    print("\nBasic Statistics:")
    print(f" Total words: {analysis['total_words']:,}")
    print(f" Unique words: {analysis['unique_words']:,}")
    print(f" Most frequent: {analysis['most_frequent_word']}")
    print(f" Average frequency: {analysis['avg_word_frequency']:.2f}")
    print("\nWord Length Statistics:")
    print(f" Min word length: {analysis['min_word_length']}")
    print(f" Max word length: {analysis['max_word_length']}")
    print(f" Average word length: {analysis['avg_word_length']:.2f}")
    print("\nWord Frequency Distribution:")
    for key, value in analysis['word_frequency_distribution'].items():
        print(f" {key}: {value}")
    print("\nTop 10 Words:")
    for word, count in analysis['top_10_words']:
        print(f" {word}: {count}")


def main():
    """Entry point when the module is run as a script.

    Supported commands:
    - mapper: run the Mapper over stdin
    - reducer: run the Reducer over stdin
    - local: count words locally (file or built-in sample text)
    - analyze: print detailed statistics for a text file

    Prints the help text and exits with status 1 when no command is given.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    wc = WordCountStreaming()
    stop_words = _resolve_stop_words(args, wc)

    if args.command == 'mapper':
        wc.run_mapper_from_stdin(
            case_sensitive=getattr(args, 'case_sensitive', False),
            min_word_length=getattr(args, 'min_length', 1),
            stop_words=stop_words
        )
    elif args.command == 'reducer':
        wc.run_reducer_from_stdin()
    elif args.command == 'local':
        _run_local(args, wc, stop_words, parser)
    elif args.command == 'analyze':
        _run_analyze(args, wc)


if __name__ == '__main__':
    main()
|