"""FastAPI dependencies: database session, bearer token and current user."""

from collections.abc import Generator
from typing import Annotated

from apps.common import security
from apps.config import settings
from apps.extensions.db import engine
from apps.models.models import TokenPayload
from apps.models.user import User
from jose import JWTError, jwt
from pydantic import ValidationError, EmailStr, Field, field_validator
from sqlmodel import Session

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

# Bearer-token extractor; the token URL is built from the configured API prefix.
reusable_oauth2 = OAuth2PasswordBearer(
    tokenUrl=f"{settings.API_V1_STR}/login/access-token"
)


def get_db() -> Generator[Session, None, None]:
    """Yield a database session bound to the module-level ``engine``.

    The session is opened in a ``with`` block, so it is closed when the
    generator is resumed after the ``yield`` or is closed by the caller.
    """
    with Session(engine) as session:
        yield session


# Annotated aliases so route signatures can declare dependencies by type.
SessionDep = Annotated[Session, Depends(get_db)]
TokenDep = Annotated[str, Depends(reusable_oauth2)]


def get_current_user(session: SessionDep, token: TokenDep) -> User:
    """Resolve the ``User`` identified by a JWT bearer token.

    The token is decoded with ``settings.SECRET_KEY`` and
    ``security.ALGORITHM``, its claims are validated by ``TokenPayload``,
    and the user is loaded by primary key from the ``sub`` claim.

    Args:
        session: Database session used to look up the user.
        token: Encoded JWT taken from the request.

    Returns:
        The active ``User`` the token refers to.

    Raises:
        HTTPException: 403 if the token cannot be decoded or its payload
            fails validation; 404 if no user matches the ``sub`` claim;
            400 if the user exists but is not active.
    """
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
        )
        token_data = TokenPayload(**payload)
    except (JWTError, ValidationError) as err:
        # Chain the original error so the root cause stays in tracebacks/logs.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        ) from err

    user = session.get(User, token_data.sub)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
        )
    return user


# When the CurrentUser dependency is used, get_current_user is called to obtain
# the current user object, which is then passed to the dependent as a
# User-typed parameter.
CurrentUser = Annotated[User, Depends(get_current_user)]


def get_current_active_superuser(current_user: CurrentUser) -> User:
    """Return the current user only if it is a superuser.

    Args:
        current_user: User resolved by the ``CurrentUser`` dependency.

    Returns:
        The same ``current_user`` object.

    Raises:
        HTTPException: 400 if ``current_user.is_superuser`` is falsy.
    """
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="The user doesn't have enough privileges",
        )
    return current_user