backend_and_cms/backend/app/utils.py

264 lines
7.4 KiB
Python
Raw Permalink Normal View History

2024-09-17 12:11:39 +08:00
import os
import logging
2024-10-07 22:50:33 +08:00
import boto3
from botocore.exceptions import ClientError
2024-09-17 12:11:39 +08:00
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from uuid import uuid4
2024-10-08 00:23:12 +08:00
from botocore.exceptions import ClientError
2024-09-17 12:11:39 +08:00
from PIL import Image, ImageTk
import emails # type: ignore
import jwt
from jinja2 import Template
from jwt.exceptions import InvalidTokenError
import filetype
from app.core.config import settings
2024-10-08 00:23:12 +08:00
static = "static"
2024-10-08 10:22:05 +08:00
ConnectionUrl = f"https://{settings.AccountID}.r2.cloudflarestorage.com"
r2 = boto3.client(
"s3",
endpoint_url=ConnectionUrl,
aws_access_key_id=settings.access_key_id,
aws_secret_access_key=settings.secret_access_key,
)
2024-10-08 00:23:12 +08:00
2024-09-17 12:11:39 +08:00
@dataclass
class EmailData:
html_content: str
subject: str
def render_email_template(*, template_name: str, context: dict[str, Any]) -> str:
template_str = (
Path(__file__).parent / "email-templates" / "build" / template_name
).read_text()
html_content = Template(template_str).render(context)
return html_content
def send_email(
*,
email_to: str,
subject: str = "",
html_content: str = "",
) -> None:
assert settings.emails_enabled, "no provided configuration for email variables"
message = emails.Message(
subject=subject,
html=html_content,
mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
)
smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
if settings.SMTP_TLS:
smtp_options["tls"] = True
elif settings.SMTP_SSL:
smtp_options["ssl"] = True
if settings.SMTP_USER:
smtp_options["user"] = settings.SMTP_USER
if settings.SMTP_PASSWORD:
smtp_options["password"] = settings.SMTP_PASSWORD
response = message.send(to=email_to, smtp=smtp_options)
logging.info(f"send email result: {response}")
def generate_test_email(email_to: str) -> EmailData:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - Test email"
html_content = render_email_template(
template_name="test_email.html",
context={"project_name": settings.PROJECT_NAME, "email": email_to},
)
return EmailData(html_content=html_content, subject=subject)
def generate_reset_password_email(email_to: str, email: str, token: str) -> EmailData:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - Password recovery for user {email}"
link = f"{settings.server_host}/reset-password?token={token}"
html_content = render_email_template(
template_name="reset_password.html",
context={
"project_name": settings.PROJECT_NAME,
"username": email,
"email": email_to,
"valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
"link": link,
},
)
return EmailData(html_content=html_content, subject=subject)
def generate_new_account_email(
email_to: str, username: str, password: str
) -> EmailData:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - New account for user {username}"
html_content = render_email_template(
template_name="new_account.html",
context={
"project_name": settings.PROJECT_NAME,
"username": username,
"password": password,
"email": email_to,
"link": settings.server_host,
},
)
return EmailData(html_content=html_content, subject=subject)
def generate_password_reset_token(email: str) -> str:
delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
now = datetime.now(timezone.utc)
expires = now + delta
exp = expires.timestamp()
encoded_jwt = jwt.encode(
{"exp": exp, "nbf": now, "sub": email},
settings.SECRET_KEY,
algorithm="HS256",
)
return encoded_jwt
def verify_password_reset_token(token: str) -> str | None:
try:
decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
return str(decoded_token["sub"])
except InvalidTokenError:
return None
2024-10-08 00:23:12 +08:00
2024-09-17 12:11:39 +08:00
from fastapi import HTTPException, status
from typing import IO
import filetype
def validate_file_size_type(file: IO):
2024-10-08 00:23:12 +08:00
FILE_SIZE = 5097152 # 2MB
2024-09-17 12:11:39 +08:00
2024-10-08 00:23:12 +08:00
accepted_file_types = [
"image/png",
"image/jpeg",
"image/jpg",
"image/heic",
"image/heif",
"image/heics",
"png",
"jpeg",
"jpg",
"heic",
"heif",
"heics",
]
2024-09-17 12:11:39 +08:00
file_info = filetype.guess(file.file)
if file_info is None:
raise HTTPException(
status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
detail="Unable to determine file type",
)
detected_content_type = file_info.extension.lower()
if (
file.content_type not in accepted_file_types
or detected_content_type not in accepted_file_types
):
raise HTTPException(
status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
detail="Unsupported file type",
)
real_file_size = 0
for chunk in file.file:
real_file_size += len(chunk)
if real_file_size > FILE_SIZE:
2024-10-08 00:23:12 +08:00
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, detail="Too large"
)
2024-10-07 22:50:33 +08:00
# async def save_picture(file, folderName: str = '', fileName: str = None):
# randon_uid = str(uuid4())
# _, f_ext = os.path.splitext(file.filename)
2024-10-08 00:23:12 +08:00
# picture_name = (randon_uid if fileName==None else fileName.lower().replace(' ', '')) + f_ext
2024-09-17 12:11:39 +08:00
2024-10-07 22:50:33 +08:00
# path = os.path.join(static,folderName)
# if not os.path.exists(path):
# os.makedirs(path)
2024-10-08 00:23:12 +08:00
2024-10-07 22:50:33 +08:00
# picture_path = os.path.join(path,picture_name)
2024-09-17 12:11:39 +08:00
2024-10-07 22:50:33 +08:00
# #output_size = (125,125)
# img = Image.open(file.file)
2024-10-08 00:23:12 +08:00
2024-10-07 22:50:33 +08:00
# #img.thumbnail(output_size)
# img.save(picture_path)
2024-10-08 00:23:12 +08:00
2024-10-07 22:50:33 +08:00
# return f'{static}/{folderName}/{picture_name}'
2024-10-08 01:20:44 +08:00
def upload_file(file_name, bucket_name, object_name=None):
2024-10-08 10:22:05 +08:00
2024-10-08 00:23:12 +08:00
if object_name is None:
object_name = file_name
try:
r2.upload_file(file_name, bucket_name, object_name)
except ClientError as e:
print(f"An error occurred: {e}")
return False
return True
2024-10-08 10:22:05 +08:00
def delete_file(file_name, bucket_name, object_name=None):
if object_name is None:
object_name = file_name
try:
2024-10-08 10:45:22 +08:00
r2.delete_object(Bucket=bucket_name, Key=file_name)
2024-10-08 10:22:05 +08:00
except ClientError as e:
print(f"An error occurred: {e}")
return False
2024-10-08 10:58:19 +08:00
return True
2024-10-08 00:23:12 +08:00
async def save_picture(file, folder_name: str = "", file_name: str = None):
2024-10-07 22:50:33 +08:00
2024-10-08 00:23:12 +08:00
randon_uid = str(uuid4())
_, f_ext = os.path.splitext(file.filename)
picture_name = (
randon_uid if file_name is None else file_name.lower().replace(" ", "")
) + f_ext
2024-10-07 22:50:33 +08:00
2024-10-08 00:23:12 +08:00
path = os.path.join(static, folder_name)
if not os.path.exists(path):
os.makedirs(path)
2024-10-07 22:50:33 +08:00
2024-10-08 00:23:12 +08:00
picture_path = os.path.join(path, picture_name)
2024-10-07 22:50:33 +08:00
2024-10-08 00:23:12 +08:00
# output_size = (125,125)
img = Image.open(file.file)
# img.thumbnail(output_size)
img.save(picture_path)
2024-10-08 01:03:08 +08:00
2024-10-08 00:23:12 +08:00
if upload_file(picture_path, "images"):
print(f"File {picture_path} uploaded successfully to images")
else:
print(f"File upload failed")
return f"{static}/{folder_name}/{picture_name}"
2024-09-17 12:11:39 +08:00
async def del_picture(picture_path):
try:
os.remove(picture_path)
2024-10-08 10:22:05 +08:00
delete_file(picture_path, "images")
2024-09-17 12:11:39 +08:00
except Exception as e:
2024-10-08 00:23:12 +08:00
print("Error: ", e)
2024-09-17 12:11:39 +08:00
return False
2024-10-08 00:23:12 +08:00
return True