from typing import Any from apps.api.deps import CurrentUser, SessionDep, get_current_active_superuser from apps.common.security import get_password_hash, verify_password from apps.config import settings from apps.models import crud from apps.models.item import Item from apps.models.models import Message from apps.models.user import (UpdatePassword, User, UserCreate, UserCreateOpen, UserOut, UsersOut, UserUpdate, UserUpdateMe) from apps.utils.mail_util import generate_new_account_email, send_email from sqlmodel import col, delete, func, select from fastapi import APIRouter, Depends, HTTPException router = APIRouter() @router.get( "/", dependencies=[Depends(get_current_active_superuser)], response_model=UsersOut ) def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any: """ Retrieve users. """ count_statement = select(func.count()).select_from(User) count = session.exec(count_statement).one() statement = select(User).offset(skip).limit(limit) users = session.exec(statement).all() return UsersOut(data=users, count=count) @router.post( "/", dependencies=[Depends(get_current_active_superuser)], response_model=UserOut ) def create_user(*, session: SessionDep, user_in: UserCreate) -> Any: """ Create new user. """ user = crud.get_user_by_email(session=session, email=user_in.email) if user: raise HTTPException( status_code=400, detail="The user with this email already exists in the system.", ) user = crud.create_user(session=session, user_create=user_in) if settings.emails_enabled and user_in.email: email_data = generate_new_account_email( email_to=user_in.email, username=user_in.email, password=user_in.password ) send_email( email_to=user_in.email, subject=email_data.subject, html_content=email_data.html_content, ) return user @router.patch("/me", response_model=UserOut) def update_user_me( *, session: SessionDep, user_in: UserUpdateMe, current_user: CurrentUser ) -> Any: """ Update own user. """ if user_in.email: existing_user = crud.get_user_by_email(session=session, email=user_in.email) if existing_user and existing_user.id != current_user.id: raise HTTPException( status_code=409, detail="User with this email already exists" ) user_data = user_in.model_dump(exclude_unset=True) current_user.sqlmodel_update(user_data) session.add(current_user) session.commit() session.refresh(current_user) return current_user @router.patch("/me/password", response_model=Message) def update_password_me( *, session: SessionDep, body: UpdatePassword, current_user: CurrentUser ) -> Any: """ Update own password. """ if not verify_password(body.current_password, current_user.hashed_password): raise HTTPException(status_code=400, detail="Incorrect password") if body.current_password == body.new_password: raise HTTPException( status_code=400, detail="New password cannot be the same as the current one" ) hashed_password = get_password_hash(body.new_password) current_user.hashed_password = hashed_password session.add(current_user) session.commit() return Message(message="Password updated successfully") @router.get("/me", response_model=UserOut) def read_user_me(session: SessionDep, current_user: CurrentUser) -> Any: """ Get current user. """ return current_user @router.post("/open", response_model=UserOut) def create_user_open(session: SessionDep, user_in: UserCreateOpen) -> Any: """ Create new user without the need to be logged in. """ if not settings.USERS_OPEN_REGISTRATION: raise HTTPException( status_code=403, detail="Open user registration is forbidden on this server", ) user = crud.get_user_by_email(session=session, email=user_in.email) if user: raise HTTPException( status_code=400, detail="The user with this email already exists in the system", ) user_create = UserCreate.from_orm(user_in) user = crud.create_user(session=session, user_create=user_create) return user @router.get("/{user_id}", response_model=UserOut) def read_user_by_id( user_id: int, session: SessionDep, current_user: CurrentUser ) -> Any: """ Get a specific user by id. """ user = session.get(User, user_id) if user == current_user: return user if not current_user.is_superuser: raise HTTPException( status_code=403, detail="The user doesn't have enough privileges", ) return user @router.patch( "/{user_id}", dependencies=[Depends(get_current_active_superuser)], response_model=UserOut, ) def update_user( *, session: SessionDep, user_id: int, user_in: UserUpdate, ) -> Any: """ Update a user. """ db_user = session.get(User, user_id) if not db_user: raise HTTPException( status_code=404, detail="The user with this id does not exist in the system", ) if user_in.email: existing_user = crud.get_user_by_email(session=session, email=user_in.email) if existing_user and existing_user.id != user_id: raise HTTPException( status_code=409, detail="User with this email already exists" ) db_user = crud.update_user(session=session, db_user=db_user, user_in=user_in) return db_user @router.delete("/{user_id}") def delete_user( session: SessionDep, current_user: CurrentUser, user_id: int ) -> Message: """ Delete a user. """ user = session.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") elif user != current_user and not current_user.is_superuser: raise HTTPException( status_code=403, detail="The user doesn't have enough privileges" ) elif user == current_user and current_user.is_superuser: raise HTTPException( status_code=403, detail="Super users are not allowed to delete themselves" ) statement = delete(Item).where(col(Item.owner_id) == user_id) session.exec(statement) # type: ignore session.delete(user) session.commit() return Message(message="User deleted successfully")