12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- from collections.abc import Generator
- from typing import Annotated
- from apps.common import security
- from apps.config import settings
- from apps.extensions.db import engine
- from apps.models.models import TokenPayload
- from apps.models.user import User
- from jose import JWTError, jwt
- from pydantic import ValidationError, EmailStr, Field, field_validator
- from sqlmodel import Session
- from fastapi import Depends, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer
- reusable_oauth2 = OAuth2PasswordBearer(
- tokenUrl=f"{settings.API_V1_STR}/login/access-token"
- )
- def get_db() -> Generator[Session, None, None]:
- """
- 返回数据库对象
- """
- with Session(engine) as session:
- yield session
- SessionDep = Annotated[Session, Depends(get_db)]
- TokenDep = Annotated[str, Depends(reusable_oauth2)]
- def get_current_user(session: SessionDep, token: TokenDep) -> User:
- try:
- payload = jwt.decode(
- token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
- )
- token_data = TokenPayload(**payload)
- except (JWTError, ValidationError):
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Could not validate credentials",
- )
- user = session.get(User, token_data.sub)
- if not user:
- raise HTTPException(status_code=404, detail="User not found")
- if not user.is_active:
- raise HTTPException(status_code=400, detail="Inactive user")
- return user
- # 在使用 CurrentUser 依赖时,会调用 get_current_user 函数来获取当前用户对象,并将其作为 User 类型的参数传递给依赖项
- CurrentUser = Annotated[User, Depends(get_current_user)]
- def get_current_active_superuser(current_user: CurrentUser) -> User:
- if not current_user.is_superuser:
- raise HTTPException(
- status_code=400, detail="The user doesn't have enough privileges"
- )
- return current_user
|