| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- import logging
- from typing import Dict, List, Optional
- from datetime import datetime
- from sqlalchemy.exc import IntegrityError
- from models.product import Product, PriceHistory, get_session
class DBUtils:
    """Persistence helper that wraps a single SQLAlchemy session.

    Every public method handles its own errors: the failure is logged through
    ``self.logger``, the session is rolled back so it stays usable, and a
    neutral value (``None``, ``0`` or ``[]``) is returned instead of raising.

    The session obtained from ``get_session()`` lives as long as the
    instance.  Call :meth:`close`, or use the instance in a ``with`` block,
    to release it deterministically; ``__del__`` is only a last resort.
    """

    # Escape character used in LIKE patterns.  "/" is the same default that
    # SQLAlchemy's own ``autoescape`` option uses.
    _LIKE_ESCAPE = "/"

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.session = get_session()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never suppress an exception raised inside the ``with`` body.
        return False

    def __del__(self):
        # The instance may be only partially initialised (get_session() can
        # fail inside __init__), and a finalizer must never raise.
        session = getattr(self, 'session', None)
        if session is None:
            return
        try:
            session.close()
        except Exception:
            pass

    def close(self) -> None:
        """Close the underlying session if the instance has one."""
        session = getattr(self, 'session', None)
        if session is not None:
            session.close()

    def _rollback(self) -> None:
        """Roll the session back, logging (not raising) if that fails too."""
        try:
            self.session.rollback()
        except Exception as rollback_error:
            self.logger.error(f"回滚会话失败: {rollback_error}")

    @classmethod
    def _escape_like(cls, keyword: str) -> str:
        """Escape LIKE wildcards so *keyword* is matched literally.

        The escape character itself is doubled first so that the escapes
        added for ``%`` and ``_`` are not escaped a second time.
        """
        esc = cls._LIKE_ESCAPE
        return (
            keyword.replace(esc, esc + esc)
            .replace('%', esc + '%')
            .replace('_', esc + '_')
        )

    def add_product(self, product_data: Dict) -> Optional["Product"]:
        """Insert a product, or update the stored row with the same key.

        A stored row is treated as the same product when both ``product_id``
        and ``platform`` match the values in *product_data*.  On update, only
        keys that name an existing attribute and carry a non-None value are
        copied, and ``update_time`` is set to the current local time.

        :param product_data: mapping of product fields
        :return: the persisted Product, or None if the operation failed
        """
        try:
            existing = (
                self.session.query(Product)
                .filter(
                    Product.product_id == product_data.get('product_id'),
                    Product.platform == product_data.get('platform'),
                )
                .first()
            )
            if existing:
                for key, value in product_data.items():
                    if hasattr(existing, key) and value is not None:
                        setattr(existing, key, value)
                existing.update_time = datetime.now()
                self.session.commit()
                self.logger.info(f"更新商品信息: {product_data.get('product_id')}")
                return existing

            product = Product(**product_data)
            self.session.add(product)
            self.session.commit()
            self.logger.info(f"添加新商品: {product_data.get('product_id')}")
            return product
        except IntegrityError as e:
            # A constraint rejected the row; logged as a skip, not an error.
            self._rollback()
            self.logger.warning(f"商品已存在,跳过: {product_data.get('product_id')} - {e}")
            return None
        except Exception as e:
            self._rollback()
            self.logger.error(f"添加商品失败: {e}")
            return None

    def add_products_batch(self, products_data: List[Dict]) -> int:
        """Add or update several products, one commit per item.

        Items are processed independently through :meth:`add_product`, so a
        failing item does not prevent the remaining ones from being stored.

        :param products_data: list of product field mappings (None is
            treated as an empty list)
        :return: number of items that were stored successfully
        """
        return sum(1 for item in (products_data or []) if self.add_product(item))

    def add_price_history(self, price_data: Dict) -> Optional["PriceHistory"]:
        """Insert one price-history record.

        :param price_data: mapping of price-history fields
        :return: the persisted PriceHistory, or None if the insert failed
        """
        try:
            price_history = PriceHistory(**price_data)
            self.session.add(price_history)
            self.session.commit()
            self.logger.debug(f"添加价格历史: product_id={price_data.get('product_id')}, price={price_data.get('price')}")
            return price_history
        except Exception as e:
            self._rollback()
            self.logger.error(f"添加价格历史失败: {e}")
            return None

    def add_price_history_batch(self, prices_data: List[Dict]) -> int:
        """Insert several price-history records, one commit per item.

        :param prices_data: list of price-history field mappings (None is
            treated as an empty list)
        :return: number of records that were stored successfully
        """
        return sum(1 for item in (prices_data or []) if self.add_price_history(item))

    def get_product_by_id(self, product_id: str, platform: str) -> Optional["Product"]:
        """Fetch the product matching *product_id* on *platform*.

        :param product_id: product identifier
        :param platform: platform the product belongs to
        :return: the first matching Product, or None if there is no match or
            the query failed
        """
        try:
            return (
                self.session.query(Product)
                .filter(
                    Product.product_id == product_id,
                    Product.platform == platform,
                )
                .first()
            )
        except Exception as e:
            # Roll back so a failed query does not poison later calls.
            self._rollback()
            self.logger.error(f"获取商品失败: {e}")
            return None

    def get_products_by_platform(self, platform: str, limit: int = 100) -> List[Dict]:
        """List products of one platform, most recently updated first.

        :param platform: platform to filter by
        :param limit: maximum number of rows to return
        :return: list of ``to_dict()`` results; empty if the query failed
        """
        try:
            products = (
                self.session.query(Product)
                .filter(Product.platform == platform)
                .order_by(Product.update_time.desc())
                .limit(limit)
                .all()
            )
            return [p.to_dict() for p in products]
        except Exception as e:
            self._rollback()
            self.logger.error(f"获取商品列表失败: {e}")
            return []

    def get_product_count(self, platform: Optional[str] = None) -> int:
        """Count products, optionally restricted to one platform.

        :param platform: platform to filter by; a falsy value counts all
        :return: number of matching products, or 0 if the query failed
        """
        try:
            query = self.session.query(Product)
            if platform:
                query = query.filter(Product.platform == platform)
            return query.count()
        except Exception as e:
            self._rollback()
            self.logger.error(f"获取商品数量失败: {e}")
            return 0

    def get_price_history(self, product_id: str, platform: str, limit: int = 30) -> List[Dict]:
        """List price-history records of a product, newest crawl first.

        :param product_id: product identifier
        :param platform: platform the product belongs to
        :param limit: maximum number of rows to return
        :return: list of ``to_dict()`` results; empty if the query failed
        """
        try:
            history = (
                self.session.query(PriceHistory)
                .filter(
                    PriceHistory.product_id == product_id,
                    PriceHistory.platform == platform,
                )
                .order_by(PriceHistory.crawl_time.desc())
                .limit(limit)
                .all()
            )
            return [h.to_dict() for h in history]
        except Exception as e:
            self._rollback()
            self.logger.error(f"获取价格历史失败: {e}")
            return []

    def get_all_products(self, limit: int = 100, offset: int = 0) -> List[Dict]:
        """List products across all platforms, most recently updated first.

        :param limit: maximum number of rows to return
        :param offset: number of rows to skip before the first returned row
        :return: list of ``to_dict()`` results; empty if the query failed
        """
        try:
            products = (
                self.session.query(Product)
                .order_by(Product.update_time.desc())
                .offset(offset)
                .limit(limit)
                .all()
            )
            return [p.to_dict() for p in products]
        except Exception as e:
            self._rollback()
            self.logger.error(f"获取所有商品失败: {e}")
            return []

    def search_products(self, keyword: str, platform: Optional[str] = None, limit: int = 50) -> List[Dict]:
        """Search products whose name contains *keyword*.

        The keyword is matched literally: ``%`` and ``_`` inside it are
        escaped and do not act as LIKE wildcards.  An empty or None keyword
        matches every name.

        :param keyword: text to look for in the product name
        :param platform: platform to filter by; a falsy value searches all
        :param limit: maximum number of rows to return
        :return: list of ``to_dict()`` results, most recently updated first;
            empty if the query failed
        """
        try:
            text = '' if keyword is None else str(keyword)
            pattern = f'%{self._escape_like(text)}%'
            query = self.session.query(Product).filter(
                Product.name.like(pattern, escape=self._LIKE_ESCAPE)
            )
            if platform:
                query = query.filter(Product.platform == platform)
            products = query.order_by(Product.update_time.desc()).limit(limit).all()
            return [p.to_dict() for p in products]
        except Exception as e:
            self._rollback()
            self.logger.error(f"搜索商品失败: {e}")
            return []
|