helpers.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. import re
  2. import time
  3. import random
  4. import logging
  5. from functools import wraps
  6. from typing import Any, Callable, TypeVar, Tuple
  7. T = TypeVar('T')
  8. def retry(times: int = 3, delay: float = 2.0, backoff: float = 2.0, exceptions: Tuple = (Exception,)) -> Callable:
  9. """
  10. 重试装饰器
  11. :param times: 重试次数
  12. :param delay: 初始延迟时间(秒)
  13. :param backoff: 延迟倍数
  14. :param exceptions: 需要捕获的异常类型
  15. """
  16. def decorator(func: Callable[..., T]) -> Callable[..., T]:
  17. @wraps(func)
  18. def wrapper(*args, **kwargs) -> T:
  19. logger = logging.getLogger(__name__)
  20. _delay = delay
  21. for attempt in range(times):
  22. try:
  23. return func(*args, **kwargs)
  24. except exceptions as e:
  25. logger.warning(f"函数 {func.__name__} 执行失败 (尝试 {attempt + 1}/{times}): {e}")
  26. if attempt < times - 1:
  27. logger.info(f"等待 {_delay} 秒后重试...")
  28. time.sleep(_delay)
  29. _delay *= backoff
  30. else:
  31. logger.error(f"函数 {func.__name__} 已达到最大重试次数 {times}")
  32. raise
  33. return wrapper
  34. return decorator
  35. def random_delay(min_seconds: float = 1.0, max_seconds: float = 3.0) -> float:
  36. """
  37. 随机延迟
  38. :param min_seconds: 最小延迟时间
  39. :param max_seconds: 最大延迟时间
  40. :return: 实际延迟时间
  41. """
  42. delay = random.uniform(min_seconds, max_seconds)
  43. time.sleep(delay)
  44. return delay
  45. def clean_price(price_str: str) -> float:
  46. """
  47. 清洗价格字符串,转换为浮点数
  48. :param price_str: 价格字符串,如 "¥123.45", "123.45元", "1,234.56"
  49. :return: 价格浮点数,解析失败返回 0.0
  50. """
  51. if not price_str:
  52. return 0.0
  53. price_str = str(price_str).strip()
  54. price_str = price_str.replace('¥', '').replace('¥', '').replace('元', '')
  55. price_str = price_str.replace(',', '').replace(',', '')
  56. match = re.search(r'(\d+\.?\d*)', price_str)
  57. if match:
  58. try:
  59. return float(match.group(1))
  60. except (ValueError, TypeError):
  61. return 0.0
  62. return 0.0
  63. def extract_numbers(text: str) -> list:
  64. """
  65. 从文本中提取所有数字
  66. :param text: 输入文本
  67. :return: 数字列表
  68. """
  69. if not text:
  70. return []
  71. return [float(num) if '.' in num else int(num) for num in re.findall(r'\d+\.?\d*', str(text))]
  72. def parse_sales_volume(sales_str: str) -> int:
  73. """
  74. 解析销量字符串
  75. :param sales_str: 销量字符串,如 "1.5万+", "1000+", "500"
  76. :return: 销量整数
  77. """
  78. if not sales_str:
  79. return 0
  80. sales_str = str(sales_str).strip().lower()
  81. multiplier = 1
  82. if '万' in sales_str or 'w' in sales_str:
  83. multiplier = 10000
  84. sales_str = sales_str.replace('万', '').replace('w', '')
  85. sales_str = sales_str.replace('+', '').replace('人', '').replace('付款', '')
  86. numbers = extract_numbers(sales_str)
  87. if numbers:
  88. return int(numbers[0] * multiplier)
  89. return 0
  90. def format_price(price: float, currency: str = 'CNY') -> str:
  91. """
  92. 格式化价格显示
  93. :param price: 价格数值
  94. :param currency: 货币单位
  95. :return: 格式化后的价格字符串
  96. """
  97. if currency == 'CNY':
  98. return f"¥{price:.2f}"
  99. elif currency == 'USD':
  100. return f"${price:.2f}"
  101. else:
  102. return f"{price:.2f} {currency}"
  103. def is_valid_url(url: str) -> bool:
  104. """
  105. 检查URL是否有效
  106. :param url: URL字符串
  107. :return: 是否有效
  108. """
  109. if not url:
  110. return False
  111. pattern = re.compile(
  112. r'^https?://'
  113. r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
  114. r'localhost|'
  115. r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
  116. r'(?::\d+)?'
  117. r'(?:/?|[/?]\S+)$', re.IGNORECASE)
  118. return bool(pattern.match(str(url)))
  119. def truncate_text(text: str, max_length: int, suffix: str = '...') -> str:
  120. """
  121. 截断文本
  122. :param text: 原始文本
  123. :param max_length: 最大长度
  124. :param suffix: 截断后缀
  125. :return: 截断后的文本
  126. """
  127. if not text:
  128. return ''
  129. text = str(text)
  130. if len(text) <= max_length:
  131. return text
  132. return text[:max_length - len(suffix)] + suffix
  133. def remove_html_tags(text: str) -> str:
  134. """
  135. 移除HTML标签
  136. :param text: 包含HTML标签的文本
  137. :return: 纯文本
  138. """
  139. if not text:
  140. return ''
  141. clean = re.compile('<.*?>')
  142. return re.sub(clean, '', str(text))
  143. def normalize_whitespace(text: str) -> str:
  144. """
  145. 规范化空白字符
  146. :param text: 原始文本
  147. :return: 规范化后的文本
  148. """
  149. if not text:
  150. return ''
  151. return ' '.join(str(text).split())