123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- from typing import Any
- from apps.api.deps import CurrentUser, SessionDep, get_current_active_superuser
- from apps.common.security import get_password_hash, verify_password
- from apps.config import settings
- from apps.models import crud
- from apps.models.item import Item
- from apps.models.models import Message
- from apps.models.user import (UpdatePassword, User, UserCreate, UserCreateOpen,
- UserOut, UsersOut, UserUpdate, UserUpdateMe)
- from apps.utils.mail_util import generate_new_account_email, send_email
- from sqlmodel import col, delete, func, select
- from fastapi import APIRouter, Depends, HTTPException
- router = APIRouter()
@router.get(
    "/", dependencies=[Depends(get_current_active_superuser)], response_model=UsersOut
)
def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any:
    """
    Retrieve users.

    Returns one page of users together with the total number of users in
    the table. Access is guarded by the ``get_current_active_superuser``
    route dependency.

    Args:
        session: Database session used to run both queries.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows in the page.

    Returns:
        A ``UsersOut`` holding the page (``data``) and the total (``count``).
    """
    count_statement = select(func.count()).select_from(User)
    count = session.exec(count_statement).one()

    # OFFSET/LIMIT without ORDER BY lets the database hand rows back in any
    # order, so consecutive pages could overlap or skip users. Ordering by
    # the primary key keeps the pages stable.
    statement = select(User).order_by(col(User.id)).offset(skip).limit(limit)
    users = session.exec(statement).all()

    return UsersOut(data=users, count=count)
@router.post(
    "/",
    response_model=UserOut,
    dependencies=[Depends(get_current_active_superuser)],
)
def create_user(*, session: SessionDep, user_in: UserCreate) -> Any:
    """
    Create new user.

    Rejects the request when the email is already registered. When emails
    are enabled in the settings, a new-account email is sent to the address
    of the created user.

    Args:
        session: Database session.
        user_in: Payload describing the user to create.

    Returns:
        The newly created user.

    Raises:
        HTTPException: 400 if a user with this email already exists.
    """
    if crud.get_user_by_email(session=session, email=user_in.email):
        raise HTTPException(
            status_code=400,
            detail="The user with this email already exists in the system.",
        )

    new_user = crud.create_user(session=session, user_create=user_in)

    # Nothing to notify: either mailing is switched off or no address given.
    if not (settings.emails_enabled and user_in.email):
        return new_user

    email_data = generate_new_account_email(
        email_to=user_in.email,
        username=user_in.email,
        password=user_in.password,
    )
    send_email(
        email_to=user_in.email,
        subject=email_data.subject,
        html_content=email_data.html_content,
    )
    return new_user
@router.patch("/me", response_model=UserOut)
def update_user_me(
    *, session: SessionDep, user_in: UserUpdateMe, current_user: CurrentUser
) -> Any:
    """
    Update own user.

    Only the fields explicitly present in the request are applied. A new
    email is accepted only if no other user already owns it.

    Args:
        session: Database session.
        user_in: Fields to change on the caller's own account.
        current_user: The authenticated user making the request.

    Returns:
        The updated user, refreshed from the database.

    Raises:
        HTTPException: 409 if another user already has the requested email.
    """
    requested_email = user_in.email
    if requested_email:
        owner = crud.get_user_by_email(session=session, email=requested_email)
        # Finding the caller's own record is fine; only a different owner
        # is a conflict.
        if owner and owner.id != current_user.id:
            raise HTTPException(
                status_code=409, detail="User with this email already exists"
            )

    changes = user_in.model_dump(exclude_unset=True)
    current_user.sqlmodel_update(changes)

    session.add(current_user)
    session.commit()
    session.refresh(current_user)
    return current_user
@router.patch("/me/password", response_model=Message)
def update_password_me(
    *, session: SessionDep, body: UpdatePassword, current_user: CurrentUser
) -> Any:
    """
    Update own password.

    The current password must be supplied and verified, and the new
    password must differ from it.

    Args:
        session: Database session.
        body: Payload carrying ``current_password`` and ``new_password``.
        current_user: The authenticated user making the request.

    Returns:
        A ``Message`` confirming the update.

    Raises:
        HTTPException: 400 if the current password is wrong, or if the new
            password equals the current one.
    """
    password_ok = verify_password(body.current_password, current_user.hashed_password)
    if not password_ok:
        raise HTTPException(status_code=400, detail="Incorrect password")

    if body.current_password == body.new_password:
        raise HTTPException(
            status_code=400, detail="New password cannot be the same as the current one"
        )

    # Only the hash is stored on the user record.
    current_user.hashed_password = get_password_hash(body.new_password)
    session.add(current_user)
    session.commit()

    return Message(message="Password updated successfully")
@router.get("/me", response_model=UserOut)
def read_user_me(session: SessionDep, current_user: CurrentUser) -> Any:
    """
    Get current user.

    Args:
        session: Database session (not used by this handler).
        current_user: The authenticated user making the request.

    Returns:
        The authenticated user, unchanged.
    """
    return current_user
@router.post("/open", response_model=UserOut)
def create_user_open(session: SessionDep, user_in: UserCreateOpen) -> Any:
    """
    Create new user without the need to be logged in.

    Only available when ``settings.USERS_OPEN_REGISTRATION`` is enabled.

    Args:
        session: Database session.
        user_in: Self-registration payload.

    Returns:
        The newly created user.

    Raises:
        HTTPException: 403 if open registration is disabled; 400 if a user
            with this email already exists.
    """
    if not settings.USERS_OPEN_REGISTRATION:
        raise HTTPException(
            status_code=403,
            detail="Open user registration is forbidden on this server",
        )

    if crud.get_user_by_email(session=session, email=user_in.email):
        raise HTTPException(
            status_code=400,
            detail="The user with this email already exists in the system",
        )

    # ``from_orm`` is the deprecated pydantic-v1-era API; ``model_validate``
    # is its replacement and matches the ``model_dump``/``sqlmodel_update``
    # calls used by the other handlers in this module.
    user_create = UserCreate.model_validate(user_in)
    return crud.create_user(session=session, user_create=user_create)
@router.get("/{user_id}", response_model=UserOut)
def read_user_by_id(
    user_id: int, session: SessionDep, current_user: CurrentUser
) -> Any:
    """
    Get a specific user by id.

    Any user may read their own record; reading another user's record
    requires superuser privileges.

    Args:
        user_id: Primary key of the user to fetch.
        session: Database session.
        current_user: The authenticated user making the request.

    Returns:
        The requested user.

    Raises:
        HTTPException: 403 if the caller is not a superuser and asks for a
            record other than their own; 404 if no user has this id.
    """
    user = session.get(User, user_id)
    if user == current_user:
        return user

    # The privilege check stays ahead of the existence check so that
    # non-superusers get the same 403 whether or not the id exists.
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=403,
            detail="The user doesn't have enough privileges",
        )

    # Without this guard a missing id returned None, which cannot be
    # serialized as UserOut and surfaced as a server error instead of a 404.
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")

    return user
@router.patch(
    "/{user_id}",
    response_model=UserOut,
    dependencies=[Depends(get_current_active_superuser)],
)
def update_user(
    *,
    session: SessionDep,
    user_id: int,
    user_in: UserUpdate,
) -> Any:
    """
    Update a user.

    Access is guarded by the ``get_current_active_superuser`` route
    dependency. A new email is accepted only if no other user owns it.

    Args:
        session: Database session.
        user_id: Primary key of the user to update.
        user_in: Fields to change.

    Returns:
        The user as returned by ``crud.update_user``.

    Raises:
        HTTPException: 404 if no user has this id; 409 if another user
            already has the requested email.
    """
    target = session.get(User, user_id)
    if not target:
        raise HTTPException(
            status_code=404,
            detail="The user with this id does not exist in the system",
        )

    if user_in.email:
        owner = crud.get_user_by_email(session=session, email=user_in.email)
        # The address may already belong to the user being edited.
        if owner and owner.id != user_id:
            raise HTTPException(
                status_code=409, detail="User with this email already exists"
            )

    return crud.update_user(session=session, db_user=target, user_in=user_in)
@router.delete("/{user_id}")
def delete_user(
    session: SessionDep, current_user: CurrentUser, user_id: int
) -> Message:
    """
    Delete a user.

    Regular users may delete only themselves. Superusers may delete any
    user except themselves. Items owned by the user are deleted first.

    Args:
        session: Database session.
        current_user: The authenticated user making the request.
        user_id: Primary key of the user to delete.

    Returns:
        A ``Message`` confirming the deletion.

    Raises:
        HTTPException: 404 if no user has this id; 403 if a regular user
            targets someone else, or a superuser targets themselves.
    """
    user = session.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    if current_user.is_superuser:
        if user == current_user:
            raise HTTPException(
                status_code=403, detail="Super users are not allowed to delete themselves"
            )
    elif user != current_user:
        raise HTTPException(
            status_code=403, detail="The user doesn't have enough privileges"
        )

    # Remove the user's items before the user row itself.
    purge_items = delete(Item).where(col(Item.owner_id) == user_id)
    session.exec(purge_items)  # type: ignore
    session.delete(user)
    session.commit()

    return Message(message="User deleted successfully")
|