| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- import re
- import time
- import random
- import logging
- from functools import wraps
- from typing import Any, Callable, TypeVar, Tuple
- T = TypeVar('T')
def retry(times: int = 3, delay: float = 2.0, backoff: float = 2.0, exceptions: Tuple = (Exception,)) -> Callable:
    """
    Build a decorator that re-runs the wrapped callable when it raises.

    :param times: total number of attempts; values below 1 are treated as 1 so
        the wrapped callable is always invoked at least once
    :param delay: seconds to wait before the second attempt
    :param backoff: factor the wait is multiplied by after every failed attempt
    :param exceptions: exception types that trigger a retry; anything else
        propagates immediately
    :return: a decorator; the decorated function returns the first successful
        result or re-raises the exception from the final attempt
    """
    # A non-positive count would otherwise skip the loop entirely and silently
    # return None without ever calling the wrapped function.
    attempts = max(1, times)

    def decorator(func: "Callable[..., T]") -> "Callable[..., T]":
        # Not every callable has __name__ (e.g. functools.partial); a failure
        # while building the log line must not mask the real exception.
        label = getattr(func, '__name__', repr(func))

        @wraps(func)
        def wrapper(*args, **kwargs) -> "T":
            logger = logging.getLogger(__name__)
            wait = delay
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    logger.warning("函数 %s 执行失败 (尝试 %s/%s): %s", label, attempt, attempts, exc)
                    if attempt == attempts:
                        logger.error("函数 %s 已达到最大重试次数 %s", label, attempts)
                        # Bare raise keeps the original traceback intact.
                        raise
                    logger.info("等待 %s 秒后重试...", wait)
                    time.sleep(wait)
                    wait *= backoff

        return wrapper

    return decorator
def random_delay(min_seconds: float = 1.0, max_seconds: float = 3.0) -> float:
    """
    Sleep for a randomly chosen duration and report how long the pause was.

    :param min_seconds: one bound of the delay range, in seconds
    :param max_seconds: the other bound of the delay range, in seconds
    :return: the number of seconds actually slept (never negative)
    """
    # time.sleep() raises ValueError for negative durations, so a negative
    # sample is clamped to "no pause" instead of crashing the caller.
    pause = max(0.0, random.uniform(min_seconds, max_seconds))
    time.sleep(pause)
    return pause
def clean_price(price_str: str) -> float:
    """
    Convert a price string into a float.

    Currency markers and thousands separators are stripped in both their
    half-width and full-width forms before the first number is extracted.

    :param price_str: price text such as "¥123.45", "123.45元" or "1,234.56"
    :return: the parsed price, or 0.0 when the input is empty or has no number
    """
    if not price_str:
        return 0.0

    cleaned = str(price_str).strip()

    # U+00A5 / U+FFE5 are the half- and full-width yen signs; U+FF0C is the
    # full-width comma. A separator left in place would cut the number short
    # (e.g. "1,234" would parse as 1).
    for token in ('\u00a5', '\uffe5', '元', ',', '\uff0c'):
        cleaned = cleaned.replace(token, '')

    found = re.search(r'\d+\.?\d*', cleaned)
    if found is None:
        return 0.0
    try:
        return float(found.group(0))
    except (ValueError, TypeError):
        # Defensive: keep the documented "0.0 on failure" contract.
        return 0.0
def extract_numbers(text: str) -> list:
    """
    Extract every unsigned number that appears in a piece of text.

    :param text: input text (converted with str() before scanning)
    :return: numbers in order of appearance; values written with a decimal
        part are floats, all others are ints. Empty input gives [].
    """
    if not text:
        return []
    # The fractional part must contain digits, so a sentence-ending period
    # ("total 5.") is not swallowed and the value stays an int.
    tokens = re.findall(r'\d+(?:\.\d+)?', str(text))
    return [float(tok) if '.' in tok else int(tok) for tok in tokens]
def parse_sales_volume(sales_str: str) -> int:
    """
    Parse a sales-volume label into an integer count.

    :param sales_str: sales text such as "1.5万+", "1000+", "500" or "2w"
    :return: the sales count; 0 when the input is empty or contains no number.
        A "万" or "w" anywhere in the text scales the value by 10000, and any
        remaining fraction is truncated.
    """
    # Local import keeps the block self-contained without touching file imports.
    from decimal import Decimal

    if not sales_str:
        return 0

    normalized = str(sales_str).strip().lower()
    multiplier = 10000 if ('万' in normalized or 'w' in normalized) else 1

    # Take the first number as written; decorations such as "+", "人" and
    # "付款" never need stripping, and not deleting characters avoids gluing
    # two separate digit runs together.
    found = re.search(r'\d+(?:\.\d+)?', normalized)
    if found is None:
        return 0

    # Decimal arithmetic is exact, so "0.57万" yields 5700 rather than the
    # 5699 produced by truncating the binary float 5699.999999999999.
    return int(Decimal(found.group(0)) * multiplier)
def format_price(price: float, currency: str = 'CNY') -> str:
    """
    Render a price with two decimal places and a currency marker.

    :param price: numeric price
    :param currency: currency code; 'CNY' and 'USD' get a leading symbol,
        any other value is appended after the amount
    :return: the formatted price string
    """
    amount = f"{price:.2f}"
    if currency == 'CNY':
        return f"¥{amount}"
    if currency == 'USD':
        return f"${amount}"
    return f"{amount} {currency}"
# Compiled once at import time instead of on every call.
_URL_PATTERN = re.compile(
    r'^https?://'
    # Domain name: dot-separated labels followed by a TLD. TLDs may be up to
    # 63 letters (e.g. ".technology") or punycode ("xn--...").
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,63}|XN--[A-Z0-9]{1,59})\.?|'
    r'localhost|'
    # Dotted-quad IPv4 (octet ranges are not validated).
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
    r'(?::\d+)?'
    # \Z instead of $ so a trailing newline does not pass validation.
    r'(?:/?|[/?]\S+)\Z',
    re.IGNORECASE,
)


def is_valid_url(url: str) -> bool:
    """
    Check whether a string looks like a well-formed http(s) URL.

    :param url: candidate URL (converted with str() before matching)
    :return: True when the scheme is http/https and the host is a domain name,
        "localhost" or a dotted-quad IPv4 address, optionally followed by a
        port and a path/query; False otherwise, including for empty input
    """
    if not url:
        return False
    return _URL_PATTERN.match(str(url)) is not None
def truncate_text(text: str, max_length: int, suffix: str = '...') -> str:
    """
    Shorten text so the result never exceeds max_length characters.

    :param text: original text (converted with str() before measuring)
    :param max_length: maximum length of the returned string
    :param suffix: marker appended when the text is cut
    :return: the text unchanged when it already fits; otherwise a prefix of the
        text followed by suffix. When max_length is too small to hold the
        suffix, the text is hard-cut to max_length characters (empty string
        for max_length <= 0). Empty input gives ''.
    """
    if not text:
        return ''
    text = str(text)
    if len(text) <= max_length:
        return text

    room = max_length - len(suffix)
    if room < 0:
        # A negative slice bound would count from the end of the string and
        # return something longer than max_length; cut without the suffix.
        return text[:max(max_length, 0)]
    return text[:room] + suffix
# "[^>]*" (unlike ".*?") also matches newlines, so tags whose attributes are
# wrapped across several lines are removed too. Compiled once at import time.
_HTML_TAG_PATTERN = re.compile(r'<[^>]*>')


def remove_html_tags(text: str) -> str:
    """
    Strip HTML tags from a piece of text.

    Only the tags themselves are removed: text between tags is kept as-is and
    character entities are not decoded.

    :param text: text that may contain HTML tags
    :return: the text with every "<...>" span removed; '' for empty input
    """
    if not text:
        return ''
    return _HTML_TAG_PATTERN.sub('', str(text))
def normalize_whitespace(text: str) -> str:
    """
    Collapse every run of whitespace into a single space.

    :param text: original text
    :return: the text with leading/trailing whitespace removed and inner
        whitespace runs replaced by one space; '' for empty input
    """
    if text:
        words = str(text).split()
        return ' '.join(words)
    return ''
|