import logging
from typing import Dict, List, Optional
from datetime import datetime

from sqlalchemy.exc import IntegrityError

from models.product import Product, PriceHistory, get_session


class DBUtils:
    """Thin data-access helper around one SQLAlchemy session.

    Each instance obtains a single session from ``get_session()`` and reuses
    it for every call. All public methods are best-effort: failures are
    logged and reported through the return value (``None``, ``[]`` or ``0``)
    instead of being raised.

    The instance can be used as a context manager so the session is closed
    deterministically::

        with DBUtils() as db:
            db.add_product({...})
    """

    # Escape character used for LIKE patterns. "/" is used rather than a
    # backslash because a backslash is also a string-literal escape on some
    # backends, which makes the rendered ESCAPE clause ambiguous.
    _LIKE_ESCAPE = '/'

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.session = get_session()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never suppress an exception raised inside the ``with`` body.
        return False

    def __del__(self):
        # Finalizers may run during interpreter shutdown, when the objects
        # the session depends on are already gone; never raise from here.
        try:
            self.close()
        except Exception:
            pass

    def close(self) -> None:
        """Close the underlying session if one was created."""
        # __init__ may have failed before ``session`` was assigned.
        session = getattr(self, 'session', None)
        if session is not None:
            session.close()

    def _rollback(self) -> None:
        """Roll back the current transaction without raising.

        Called from every error handler so that a failed statement does not
        leave the shared session inside an aborted transaction, which would
        make subsequent calls fail as well.
        """
        try:
            self.session.rollback()
        except Exception as e:
            self.logger.error(f"rollback failed: {e}")

    @classmethod
    def _escape_like(cls, value) -> str:
        """Escape LIKE wildcards in *value* so it is matched literally."""
        esc = cls._LIKE_ESCAPE
        text = str(value)
        # The escape character itself must be doubled first, otherwise the
        # escapes added for "%" and "_" would be escaped again.
        text = text.replace(esc, esc + esc)
        return text.replace('%', esc + '%').replace('_', esc + '_')

    def add_product(self, product_data: Dict) -> Optional[Product]:
        """Insert a product, or update it when it already exists.

        A product is looked up by the ``product_id`` and ``platform`` entries
        of *product_data*. When a row is found, every entry whose key is an
        attribute of the row and whose value is not ``None`` is copied onto
        it and ``update_time`` is refreshed; otherwise a new ``Product`` is
        built from ``**product_data``.

        :param product_data: product field values keyed by attribute name
        :return: the stored ``Product``, or ``None`` on failure
        """
        try:
            existing = self.session.query(Product).filter(
                Product.product_id == product_data.get('product_id'),
                Product.platform == product_data.get('platform')
            ).first()

            if existing:
                for key, value in product_data.items():
                    # ``None`` values are skipped so a partial record never
                    # erases data that is already stored.
                    if hasattr(existing, key) and value is not None:
                        setattr(existing, key, value)
                existing.update_time = datetime.now()
                self.session.commit()
                self.logger.info(f"更新商品信息: {product_data.get('product_id')}")
                return existing

            product = Product(**product_data)
            self.session.add(product)
            self.session.commit()
            self.logger.info(f"添加新商品: {product_data.get('product_id')}")
            return product
        except IntegrityError as e:
            # Reached when the commit violates a database constraint, e.g.
            # the row was inserted concurrently after the lookup above.
            self._rollback()
            self.logger.warning(f"商品已存在,跳过: {product_data.get('product_id')} - {e}")
            return None
        except Exception as e:
            self._rollback()
            self.logger.error(f"添加商品失败: {e}")
            return None

    def add_products_batch(self, products_data: List[Dict]) -> int:
        """Add or update several products, one transaction per product.

        :param products_data: list of product dicts; ``None`` is treated as
            an empty list
        :return: number of products stored successfully
        """
        return sum(
            1 for product_data in (products_data or [])
            if self.add_product(product_data)
        )

    def add_price_history(self, price_data: Dict) -> Optional[PriceHistory]:
        """Insert one price-history record built from ``**price_data``.

        :param price_data: price field values keyed by attribute name
        :return: the stored ``PriceHistory``, or ``None`` on failure
        """
        try:
            price_history = PriceHistory(**price_data)
            self.session.add(price_history)
            self.session.commit()
            self.logger.debug(f"添加价格历史: product_id={price_data.get('product_id')}, price={price_data.get('price')}")
            return price_history
        except Exception as e:
            self._rollback()
            self.logger.error(f"添加价格历史失败: {e}")
            return None

    def add_price_history_batch(self, prices_data: List[Dict]) -> int:
        """Insert several price-history records, one transaction per record.

        :param prices_data: list of price dicts; ``None`` is treated as an
            empty list
        :return: number of records stored successfully
        """
        return sum(
            1 for price_data in (prices_data or [])
            if self.add_price_history(price_data)
        )

    def get_product_by_id(self, product_id: str, platform: str) -> Optional[Product]:
        """Fetch a single product by product id and platform.

        :param product_id: product identifier
        :param platform: platform the product belongs to
        :return: the matching ``Product``, or ``None`` when there is no match
            or the query fails
        """
        try:
            return self.session.query(Product).filter(
                Product.product_id == product_id,
                Product.platform == platform
            ).first()
        except Exception as e:
            self._rollback()
            self.logger.error(f"获取商品失败: {e}")
            return None

    def get_products_by_platform(self, platform: str, limit: int = 100) -> List[Dict]:
        """List the most recently updated products of one platform.

        :param platform: platform to filter on
        :param limit: maximum number of rows returned
        :return: ``to_dict()`` of each product, newest ``update_time`` first;
            ``[]`` when the query fails
        """
        try:
            products = self.session.query(Product).filter(
                Product.platform == platform
            ).order_by(Product.update_time.desc()).limit(limit).all()
            return [p.to_dict() for p in products]
        except Exception as e:
            self._rollback()
            self.logger.error(f"获取商品列表失败: {e}")
            return []

    def get_product_count(self, platform: Optional[str] = None) -> int:
        """Count products, optionally restricted to one platform.

        :param platform: platform to filter on; a falsy value counts all
            products
        :return: number of matching products, ``0`` when the query fails
        """
        try:
            query = self.session.query(Product)
            if platform:
                query = query.filter(Product.platform == platform)
            return query.count()
        except Exception as e:
            self._rollback()
            self.logger.error(f"获取商品数量失败: {e}")
            return 0

    def get_price_history(self, product_id: str, platform: str, limit: int = 30) -> List[Dict]:
        """List the most recent price records of one product.

        :param product_id: product identifier
        :param platform: platform the product belongs to
        :param limit: maximum number of rows returned
        :return: ``to_dict()`` of each record, newest ``crawl_time`` first;
            ``[]`` when the query fails
        """
        try:
            history = self.session.query(PriceHistory).filter(
                PriceHistory.product_id == product_id,
                PriceHistory.platform == platform
            ).order_by(PriceHistory.crawl_time.desc()).limit(limit).all()
            return [h.to_dict() for h in history]
        except Exception as e:
            self._rollback()
            self.logger.error(f"获取价格历史失败: {e}")
            return []

    def get_all_products(self, limit: int = 100, offset: int = 0) -> List[Dict]:
        """List products across all platforms, one page at a time.

        :param limit: maximum number of rows returned
        :param offset: number of rows skipped before the first returned row
        :return: ``to_dict()`` of each product, newest ``update_time`` first;
            ``[]`` when the query fails
        """
        try:
            products = self.session.query(Product).order_by(
                Product.update_time.desc()
            ).offset(offset).limit(limit).all()
            return [p.to_dict() for p in products]
        except Exception as e:
            self._rollback()
            self.logger.error(f"获取所有商品失败: {e}")
            return []

    def search_products(self, keyword: str, platform: Optional[str] = None, limit: int = 50) -> List[Dict]:
        """Search products whose name contains *keyword*.

        The keyword is matched literally: ``%`` and ``_`` inside it are
        escaped and do not act as LIKE wildcards.

        :param keyword: text that must appear in the product name
        :param platform: platform to filter on; a falsy value searches all
            platforms
        :param limit: maximum number of rows returned
        :return: ``to_dict()`` of each product, newest ``update_time`` first;
            ``[]`` when the query fails
        """
        try:
            pattern = f'%{self._escape_like(keyword)}%'
            query = self.session.query(Product).filter(
                Product.name.like(pattern, escape=self._LIKE_ESCAPE)
            )
            if platform:
                query = query.filter(Product.platform == platform)
            products = query.order_by(Product.update_time.desc()).limit(limit).all()
            return [p.to_dict() for p in products]
        except Exception as e:
            self._rollback()
            self.logger.error(f"搜索商品失败: {e}")
            return []