from apps.api.deps import get_current_active_superuser from apps.models.models import Message from apps.utils.mail_util import generate_test_email, send_email from pydantic.networks import EmailStr from fastapi import APIRouter, BackgroundTasks, Depends from fastapi.templating import Jinja2Templates router = APIRouter() templates = Jinja2Templates(directory="./coronavirus/templates") @router.post( "/test-email/", dependencies=[Depends(get_current_active_superuser)], status_code=201, ) def test_email(email_to: EmailStr) -> Message: """ Test emails. """ email_data = generate_test_email(email_to=email_to) send_email( email_to=email_to, subject=email_data.subject, html_content=email_data.html_content, ) return Message(message="Test email sent") @router.get("/test2") async def test(): return { "code": 200, "message": "hello world", "data": [{"name": "张三", "sex": "男", "age": 19}], } @router.get("/city/{city}") async def get_city(city: str, query_string: str | None = None): return {"city": city, "query_string": query_string} @router.post("/background_tasks") async def run_bg_task(framework: str, background_tasks: BackgroundTasks): """ :param framework: 被调用的后台任务函数的参数 :param background_tasks: FastAPI.BackgroundTasks :return: """ background_tasks.add_task(bg_task, framework) return {"message": "任务已在后台运行"} def bg_task(framework: str): with open("README.md", mode="a") as f: f.write(f"## {framework} 框架精讲")