import re
import time
import random
import logging
from functools import wraps
from typing import Any, Callable, TypeVar, Tuple

T = TypeVar('T')

# Patterns are compiled once at import time instead of on every call.
_PRICE_RE = re.compile(r'(\d+\.?\d*)')
# A fractional part is only consumed when digits follow the dot, so a
# sentence-ending period ("5.") is not mistaken for a decimal point.
_NUMBER_RE = re.compile(r'\d+(?:\.\d+)?')
# "[^>]*" (unlike ".*?") also matches tags that span several lines.
_HTML_TAG_RE = re.compile(r'<[^>]*>')
_URL_RE = re.compile(
    r'^https?://'
    # Domain labels followed by a TLD of up to 63 letters (DNS label limit).
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,63}\.?|'
    r'localhost|'
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
    r'(?::\d+)?'
    r'(?:/?|[/?]\S+)$',
    re.IGNORECASE)
# Characters deleted before parsing a price: yen/yuan signs (half- and
# full-width), the "yuan" character, and half-/full-width thousands commas.
_PRICE_NOISE = str.maketrans('', '', '\u00a5\uffe5元,\uff0c')


def retry(times: int = 3, delay: float = 2.0, backoff: float = 2.0,
          exceptions: Tuple = (Exception,)) -> Callable:
    """
    Decorator that re-invokes a function when it raises.

    :param times: total number of attempts (values below 1 are treated as 1,
        so the wrapped function is always called at least once)
    :param delay: initial wait between attempts, in seconds
    :param backoff: factor the wait is multiplied by after each failure
    :param exceptions: exception types that trigger a retry
    :return: a decorator; the decorated function re-raises the last
        exception once all attempts are used up
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        # Not every callable has __name__ (e.g. functools.partial).
        func_name = getattr(func, '__name__', repr(func))

        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            logger = logging.getLogger(__name__)
            # Guard against times <= 0, which would otherwise skip the call
            # entirely and silently return None.
            attempts = max(1, times)
            _delay = delay
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    logger.warning(f"函数 {func_name} 执行失败 (尝试 {attempt + 1}/{attempts}): {e}")
                    if attempt < attempts - 1:
                        logger.info(f"等待 {_delay} 秒后重试...")
                        time.sleep(_delay)
                        _delay *= backoff
                    else:
                        logger.error(f"函数 {func_name} 已达到最大重试次数 {attempts}")
                        raise
        return wrapper
    return decorator


def random_delay(min_seconds: float = 1.0, max_seconds: float = 3.0) -> float:
    """
    Sleep for a random duration.

    :param min_seconds: lower bound of the delay
    :param max_seconds: upper bound of the delay
    :return: the number of seconds actually slept
    """
    delay = random.uniform(min_seconds, max_seconds)
    time.sleep(delay)
    return delay


def clean_price(price_str: str) -> float:
    """
    Convert a price string to a float.

    :param price_str: price text such as "¥123.45", "123.45元", "1,234.56"
    :return: the first number found in the text, or 0.0 when the input is
        empty or contains no number
    """
    if not price_str:
        return 0.0

    # Drop currency markers and thousands separators (ASCII and full-width)
    # so "1,234.56" parses as one number rather than stopping at the comma.
    cleaned = str(price_str).strip().translate(_PRICE_NOISE)

    match = _PRICE_RE.search(cleaned)
    if match:
        try:
            return float(match.group(1))
        except (ValueError, TypeError):
            return 0.0
    return 0.0


def extract_numbers(text: str) -> list:
    """
    Extract every number from a piece of text.

    :param text: input text
    :return: list of numbers in order of appearance; values written with a
        decimal part are floats, all others are ints
    """
    if not text:
        return []
    return [float(num) if '.' in num else int(num)
            for num in _NUMBER_RE.findall(str(text))]


def parse_sales_volume(sales_str: str) -> int:
    """
    Parse a sales-volume string into an integer.

    :param sales_str: sales text such as "1.5万+", "1000+", "500"
    :return: the sales count, or 0 when nothing can be parsed
    """
    if not sales_str:
        return 0

    sales_str = str(sales_str).strip().lower()

    # "万" / "w" mean "ten thousand".
    multiplier = 1
    if '万' in sales_str or 'w' in sales_str:
        multiplier = 10000
        sales_str = sales_str.replace('万', '').replace('w', '')

    sales_str = sales_str.replace('+', '').replace('人', '').replace('付款', '')

    numbers = extract_numbers(sales_str)
    if numbers:
        # round() rather than int(): binary float error can leave the product
        # just below the intended integer (0.57 * 10000 == 5699.999...),
        # which int() would truncate to the wrong value.
        return round(numbers[0] * multiplier)
    return 0


def format_price(price: float, currency: str = 'CNY') -> str:
    """
    Format a price for display with two decimal places.

    :param price: numeric price
    :param currency: currency code; 'CNY' and 'USD' get a symbol prefix,
        any other code is appended after the amount
    :return: formatted price string
    """
    if currency == 'CNY':
        return f"¥{price:.2f}"
    elif currency == 'USD':
        return f"${price:.2f}"
    else:
        return f"{price:.2f} {currency}"


def is_valid_url(url: str) -> bool:
    """
    Check whether a string looks like a valid http(s) URL.

    :param url: URL string
    :return: True when the URL has an http/https scheme and a domain name,
        "localhost" or dotted-quad host, optionally followed by a port and
        a path/query; False otherwise
    """
    if not url:
        return False
    return bool(_URL_RE.match(str(url)))


def truncate_text(text: str, max_length: int, suffix: str = '...') -> str:
    """
    Truncate text to at most ``max_length`` characters.

    :param text: original text
    :param max_length: maximum length of the result, suffix included
    :param suffix: marker appended when the text is cut
    :return: the text unchanged if it already fits, otherwise a shortened
        version ending in ``suffix``; when ``max_length`` is too small to
        hold the suffix the text is hard-cut without it
    """
    if not text:
        return ''

    text = str(text)
    if max_length <= 0:
        return ''
    if len(text) <= max_length:
        return text
    if len(suffix) > max_length:
        # No room for the suffix; a negative slice bound here would produce
        # a result longer than max_length.
        return text[:max_length]
    return text[:max_length - len(suffix)] + suffix


def remove_html_tags(text: str) -> str:
    """
    Strip HTML tags from text.

    :param text: text that may contain HTML tags
    :return: the text with every ``<...>`` span removed
    """
    if not text:
        return ''
    return _HTML_TAG_RE.sub('', str(text))


def normalize_whitespace(text: str) -> str:
    """
    Collapse runs of whitespace into single spaces and trim both ends.

    :param text: original text
    :return: normalized text
    """
    if not text:
        return ''
    return ' '.join(str(text).split())