backend_and_cms/backend/app/utils.py

208 lines
6.5 KiB
Python
Raw Normal View History

2024-09-17 12:11:39 +08:00
import os
import logging
2024-10-07 22:50:33 +08:00
import boto3
from botocore.exceptions import ClientError
2024-09-17 12:11:39 +08:00
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from uuid import uuid4
from PIL import Image, ImageTk
import emails # type: ignore
import jwt
from jinja2 import Template
from jwt.exceptions import InvalidTokenError
import filetype
from app.core.config import settings
static = 'static'
@dataclass
class EmailData:
html_content: str
subject: str
def render_email_template(*, template_name: str, context: dict[str, Any]) -> str:
template_str = (
Path(__file__).parent / "email-templates" / "build" / template_name
).read_text()
html_content = Template(template_str).render(context)
return html_content
def send_email(
*,
email_to: str,
subject: str = "",
html_content: str = "",
) -> None:
assert settings.emails_enabled, "no provided configuration for email variables"
message = emails.Message(
subject=subject,
html=html_content,
mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
)
smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
if settings.SMTP_TLS:
smtp_options["tls"] = True
elif settings.SMTP_SSL:
smtp_options["ssl"] = True
if settings.SMTP_USER:
smtp_options["user"] = settings.SMTP_USER
if settings.SMTP_PASSWORD:
smtp_options["password"] = settings.SMTP_PASSWORD
response = message.send(to=email_to, smtp=smtp_options)
logging.info(f"send email result: {response}")
def generate_test_email(email_to: str) -> EmailData:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - Test email"
html_content = render_email_template(
template_name="test_email.html",
context={"project_name": settings.PROJECT_NAME, "email": email_to},
)
return EmailData(html_content=html_content, subject=subject)
def generate_reset_password_email(email_to: str, email: str, token: str) -> EmailData:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - Password recovery for user {email}"
link = f"{settings.server_host}/reset-password?token={token}"
html_content = render_email_template(
template_name="reset_password.html",
context={
"project_name": settings.PROJECT_NAME,
"username": email,
"email": email_to,
"valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
"link": link,
},
)
return EmailData(html_content=html_content, subject=subject)
def generate_new_account_email(
email_to: str, username: str, password: str
) -> EmailData:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - New account for user {username}"
html_content = render_email_template(
template_name="new_account.html",
context={
"project_name": settings.PROJECT_NAME,
"username": username,
"password": password,
"email": email_to,
"link": settings.server_host,
},
)
return EmailData(html_content=html_content, subject=subject)
def generate_password_reset_token(email: str) -> str:
delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
now = datetime.now(timezone.utc)
expires = now + delta
exp = expires.timestamp()
encoded_jwt = jwt.encode(
{"exp": exp, "nbf": now, "sub": email},
settings.SECRET_KEY,
algorithm="HS256",
)
return encoded_jwt
def verify_password_reset_token(token: str) -> str | None:
try:
decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
return str(decoded_token["sub"])
except InvalidTokenError:
return None
from fastapi import HTTPException, status
from typing import IO
import filetype
def validate_file_size_type(file: IO):
FILE_SIZE = 5097152 # 2MB
accepted_file_types = ["image/png", "image/jpeg", "image/jpg", "image/heic", "image/heif", "image/heics", "png",
"jpeg", "jpg", "heic", "heif", "heics"
]
file_info = filetype.guess(file.file)
if file_info is None:
raise HTTPException(
status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
detail="Unable to determine file type",
)
detected_content_type = file_info.extension.lower()
if (
file.content_type not in accepted_file_types
or detected_content_type not in accepted_file_types
):
raise HTTPException(
status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
detail="Unsupported file type",
)
real_file_size = 0
for chunk in file.file:
real_file_size += len(chunk)
if real_file_size > FILE_SIZE:
raise HTTPException(status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, detail="Too large")
2024-10-07 22:50:33 +08:00
# async def save_picture(file, folderName: str = '', fileName: str = None):
# randon_uid = str(uuid4())
# _, f_ext = os.path.splitext(file.filename)
2024-09-17 12:11:39 +08:00
2024-10-07 22:50:33 +08:00
# picture_name = (randon_uid if fileName==None else fileName.lower().replace(' ', '')) + f_ext
2024-09-17 12:11:39 +08:00
2024-10-07 22:50:33 +08:00
# path = os.path.join(static,folderName)
# if not os.path.exists(path):
# os.makedirs(path)
2024-09-17 12:11:39 +08:00
2024-10-07 22:50:33 +08:00
# picture_path = os.path.join(path,picture_name)
2024-09-17 12:11:39 +08:00
2024-10-07 22:50:33 +08:00
# #output_size = (125,125)
# img = Image.open(file.file)
2024-09-17 12:11:39 +08:00
2024-10-07 22:50:33 +08:00
# #img.thumbnail(output_size)
# img.save(picture_path)
2024-09-17 12:11:39 +08:00
2024-10-07 22:50:33 +08:00
# return f'{static}/{folderName}/{picture_name}'
async def save_picture(file, folderName: str = "", fileName: str = None):
random_uid = str(uuid4())
_, f_ext = os.path.splitext(file.filename)
picture_name = (fileName.lower().replace(" ", "") if fileName else random_uid) + f_ext
r2 = boto3.client(
"s3",
endpoint_url=f"https://{settings.AccountID}.r2.cloudflarestorage.com",
aws_access_key_id=settings.access_key_id,
aws_secret_access_key=settings.secret_access_key,
)
img = Image.open(file.file)
img_byte_arr = io.BytesIO()
img.save(img_byte_arr, format='PNG')
img_byte_arr = img_byte_arr.getvalue()
r2.put_object(Bucket=settings.bucket_name, Key=f"{folderName}/{picture_name}", Body=img_byte_arr)
r2_url = f"https://{settings.AccountID}.r2.cloudflarestorage.com/{settings.bucket_name}/{folderName}/{picture_name}"
return r2_url
2024-09-17 12:11:39 +08:00
async def del_picture(picture_path):
try:
os.remove(picture_path)
except Exception as e:
print('Error: ', e)
return False
return True