backend_and_cms/backend/app/utils.py

264 lines
7.4 KiB
Python

import os
import logging
import boto3
from botocore.exceptions import ClientError
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from uuid import uuid4
from botocore.exceptions import ClientError
from PIL import Image, ImageTk
import emails # type: ignore
import jwt
from jinja2 import Template
from jwt.exceptions import InvalidTokenError
import filetype
from app.core.config import settings
# Local directory name under which saved pictures are written.
static = "static"

# Cloudflare R2 endpoint, derived from the account id held in settings.
ConnectionUrl = f"https://{settings.AccountID}.r2.cloudflarestorage.com"

# Module-level S3-compatible client used by the upload/delete helpers in this file.
r2 = boto3.client(
    service_name="s3",
    endpoint_url=ConnectionUrl,
    aws_access_key_id=settings.access_key_id,
    aws_secret_access_key=settings.secret_access_key,
)
@dataclass
class EmailData:
    """A rendered email: the HTML body together with its subject line."""

    # Rendered HTML body of the message.
    html_content: str
    # Subject line of the message.
    subject: str
def render_email_template(*, template_name: str, context: dict[str, Any]) -> str:
    """Render a built email template to an HTML string.

    Args:
        template_name: File name of the template inside the
            ``email-templates/build`` directory next to this module.
        context: Values made available to the Jinja2 template.

    Returns:
        The rendered HTML.

    Raises:
        OSError: If the template file cannot be read (e.g. it does not exist).
    """
    template_path = Path(__file__).parent / "email-templates" / "build" / template_name
    # Decode explicitly so rendering does not depend on the host's locale encoding.
    template_str = template_path.read_text(encoding="utf-8")
    # Context values (emails, usernames, passwords) are user-supplied and end up
    # inside HTML, so let Jinja2 escape them instead of emitting them verbatim.
    return Template(template_str, autoescape=True).render(context)
def send_email(
    *,
    email_to: str,
    subject: str = "",
    html_content: str = "",
) -> None:
    """Send an HTML email using the SMTP settings from the app configuration.

    Args:
        email_to: Recipient address.
        subject: Subject line of the message.
        html_content: HTML body of the message.

    Raises:
        AssertionError: If ``settings.emails_enabled`` is falsy.
    """
    # Raised explicitly rather than via `assert` so the guard is not stripped
    # under `python -O`; the exception type is kept for existing callers.
    if not settings.emails_enabled:
        raise AssertionError("no provided configuration for email variables")

    message = emails.Message(
        subject=subject,
        html=html_content,
        mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
    )

    smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
    # TLS takes precedence; SSL is only enabled when TLS is not configured.
    if settings.SMTP_TLS:
        smtp_options["tls"] = True
    elif settings.SMTP_SSL:
        smtp_options["ssl"] = True
    if settings.SMTP_USER:
        smtp_options["user"] = settings.SMTP_USER
    if settings.SMTP_PASSWORD:
        smtp_options["password"] = settings.SMTP_PASSWORD

    response = message.send(to=email_to, smtp=smtp_options)
    logging.info("send email result: %s", response)
def generate_test_email(email_to: str) -> EmailData:
    """Build the test email (subject and rendered HTML) addressed to *email_to*."""
    name = settings.PROJECT_NAME
    template_context = {"project_name": settings.PROJECT_NAME, "email": email_to}
    body = render_email_template(
        template_name="test_email.html",
        context=template_context,
    )
    return EmailData(html_content=body, subject=f"{name} - Test email")
def generate_reset_password_email(email_to: str, email: str, token: str) -> EmailData:
    """Build the password-recovery email.

    Args:
        email_to: Address passed to the template as ``email``.
        email: Account identifier shown in the subject and passed to the
            template as ``username``.
        token: Reset token appended to the reset link as the ``token`` query
            parameter.

    Returns:
        The subject and rendered HTML body.
    """
    name = settings.PROJECT_NAME
    reset_link = f"{settings.server_host}/reset-password?token={token}"
    template_context = {
        "project_name": settings.PROJECT_NAME,
        "username": email,
        "email": email_to,
        "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
        "link": reset_link,
    }
    body = render_email_template(
        template_name="reset_password.html",
        context=template_context,
    )
    return EmailData(
        html_content=body,
        subject=f"{name} - Password recovery for user {email}",
    )
def generate_new_account_email(
    email_to: str, username: str, password: str
) -> EmailData:
    """Build the new-account email.

    Args:
        email_to: Address passed to the template as ``email``.
        username: Account name shown in the subject and in the template.
        password: Password passed to the template.

    Returns:
        The subject and rendered HTML body.
    """
    name = settings.PROJECT_NAME
    template_context = {
        "project_name": settings.PROJECT_NAME,
        "username": username,
        "password": password,
        "email": email_to,
        "link": settings.server_host,
    }
    body = render_email_template(
        template_name="new_account.html",
        context=template_context,
    )
    return EmailData(
        html_content=body,
        subject=f"{name} - New account for user {username}",
    )
def generate_password_reset_token(email: str) -> str:
    """Create a signed HS256 JWT used to authorise a password reset for *email*.

    The token carries ``sub`` (the email), ``nbf`` (the current UTC time) and
    ``exp`` (now plus ``settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS``).
    """
    lifetime = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + lifetime
    claims = {
        # Expiry is supplied as a POSIX timestamp, not-before as a datetime.
        "exp": expires_at.timestamp(),
        "nbf": issued_at,
        "sub": email,
    }
    return jwt.encode(claims, settings.SECRET_KEY, algorithm="HS256")
def verify_password_reset_token(token: str) -> str | None:
    """Return the subject of a password-reset token, or ``None`` if it is unusable.

    Args:
        token: Encoded JWT as produced by ``generate_password_reset_token``.

    Returns:
        The ``sub`` claim as a string, or ``None`` when the token fails
        validation or carries no ``sub`` claim.
    """
    try:
        decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
    except InvalidTokenError:
        return None
    # A validly signed token without a subject is still unusable here; report
    # it as invalid instead of letting a KeyError escape to the caller.
    subject = decoded_token.get("sub")
    if subject is None:
        return None
    return str(subject)
from fastapi import HTTPException, status
from typing import IO
import filetype
def _rewind_upload_stream(stream) -> None:
    """Move *stream* back to its start, ignoring streams that cannot seek."""
    try:
        stream.seek(0)
    except (AttributeError, OSError):
        # Non-seekable stream: nothing to rewind, keep going best-effort.
        pass


def validate_file_size_type(file: IO):
    """Reject uploads that are not an accepted image type or are too large.

    Args:
        file: Upload object exposing ``.file`` (a binary stream) and
            ``.content_type`` (the client-declared type).
            NOTE(review): the ``IO`` annotation does not describe these
            attributes; this looks like a FastAPI ``UploadFile`` — confirm.

    Raises:
        HTTPException: 415 if the type cannot be detected or either the
            declared or the detected type is not accepted; 413 if the content
            exceeds the size limit.
    """
    # NOTE(review): this is ~4.86 MiB, although the previous comment said
    # "2MB" (2 MiB would be 2097152) — confirm the intended limit.
    max_file_size = 5097152  # bytes
    # Declared MIME types and detected extensions are checked against the
    # same set, as before.
    accepted_file_types = frozenset(
        {
            "image/png",
            "image/jpeg",
            "image/jpg",
            "image/heic",
            "image/heif",
            "image/heics",
            "png",
            "jpeg",
            "jpg",
            "heic",
            "heif",
            "heics",
        }
    )
    stream = file.file

    file_info = filetype.guess(stream)
    if file_info is None:
        raise HTTPException(
            status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
            detail="Unable to determine file type",
        )

    detected_extension = file_info.extension.lower()
    if (
        file.content_type not in accepted_file_types
        or detected_extension not in accepted_file_types
    ):
        raise HTTPException(
            status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
            detail="Unsupported file type",
        )

    # Count from the beginning in bounded chunks; iterating a binary stream
    # line by line can pull an entire newline-free file into memory at once.
    _rewind_upload_stream(stream)
    try:
        real_file_size = 0
        while chunk := stream.read(64 * 1024):
            real_file_size += len(chunk)
            if real_file_size > max_file_size:
                raise HTTPException(
                    status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                    detail="Too large",
                )
    finally:
        # Leave the stream at the start so the caller can still read the upload.
        _rewind_upload_stream(stream)
# async def save_picture(file, folderName: str = '', fileName: str = None):
# randon_uid = str(uuid4())
# _, f_ext = os.path.splitext(file.filename)
# picture_name = (randon_uid if fileName==None else fileName.lower().replace(' ', '')) + f_ext
# path = os.path.join(static,folderName)
# if not os.path.exists(path):
# os.makedirs(path)
# picture_path = os.path.join(path,picture_name)
# #output_size = (125,125)
# img = Image.open(file.file)
# #img.thumbnail(output_size)
# img.save(picture_path)
# return f'{static}/{folderName}/{picture_name}'
def upload_file(file_name, bucket_name, object_name=None):
    """Upload a local file to a bucket through the module-level ``r2`` client.

    Args:
        file_name: Path of the local file to upload.
        bucket_name: Destination bucket.
        object_name: Object key to store the file under; defaults to
            ``file_name``.

    Returns:
        ``True`` if the upload succeeded, ``False`` if the service reported
        an error.
    """
    if object_name is None:
        object_name = file_name
    # boto3's managed transfer re-raises client errors as S3UploadFailedError,
    # so catching ClientError alone would miss the usual failure path.
    try:
        r2.upload_file(file_name, bucket_name, object_name)
    except (ClientError, boto3.exceptions.S3UploadFailedError):
        logging.exception(
            "An error occurred while uploading %s to bucket %s as %s",
            file_name,
            bucket_name,
            object_name,
        )
        return False
    return True
def delete_file(file_name, bucket_name, object_name=None):
    """Delete an object from a bucket through the module-level ``r2`` client.

    Args:
        file_name: Name used as the object key when ``object_name`` is omitted.
        bucket_name: Bucket holding the object.
        object_name: Object key to delete; defaults to ``file_name``.

    Returns:
        ``True`` if the delete call succeeded, ``False`` if the service
        reported a client error.
    """
    if object_name is None:
        object_name = file_name
    try:
        # Use the resolved key; previously an explicit object_name was ignored.
        r2.delete_object(Bucket=bucket_name, Key=object_name)
    except ClientError:
        logging.exception(
            "An error occurred while deleting %s from bucket %s",
            object_name,
            bucket_name,
        )
        return False
    return True
async def save_picture(file, folder_name: str = "", file_name: str | None = None):
    """Save an uploaded image locally and upload it to the "images" bucket.

    Args:
        file: Upload object exposing ``.filename`` and ``.file``.
        folder_name: Sub-directory of the static directory to write into.
        file_name: Optional base name (lower-cased, spaces removed); a random
            UUID is used when omitted. The extension always comes from
            ``file.filename``.

    Returns:
        Slash-separated relative path of the saved picture. The same string
        is used as the object key for the upload, so it can be passed back
        to delete the picture later.
    """
    _, extension = os.path.splitext(file.filename)
    base_name = (
        str(uuid4()) if file_name is None else file_name.lower().replace(" ", "")
    )
    picture_name = base_name + extension

    target_dir = os.path.join(static, folder_name)
    # exist_ok avoids the check-then-create race between concurrent uploads.
    os.makedirs(target_dir, exist_ok=True)
    picture_path = os.path.join(target_dir, picture_name)

    # Close the image once written so decoder resources are released.
    with Image.open(file.file) as img:
        img.save(picture_path)

    # One normalised, slash-separated path serves as both the object key and
    # the return value; previously an empty folder_name returned
    # "static//name" while the object was stored under "static/name".
    object_key = Path(picture_path).as_posix()
    if upload_file(picture_path, "images", object_key):
        logging.info("File %s uploaded successfully to images", picture_path)
    else:
        logging.error("File upload failed")
    return object_key
async def del_picture(picture_path):
    """Remove a picture from local disk and from the "images" bucket.

    Both removals are always attempted and errors are logged, not raised.

    Args:
        picture_path: Local path of the picture, also used as its object key.

    Returns:
        ``True`` if the remote delete succeeded and the local file was removed
        or already absent; ``False`` otherwise.
    """
    local_removed = True
    try:
        os.remove(picture_path)
    except FileNotFoundError:
        # Already gone locally; the remote copy may still need deleting.
        logging.info("Local picture %s was already absent", picture_path)
    except Exception:
        # Best-effort by design: record the failure and carry on.
        logging.exception("Error removing local picture %s", picture_path)
        local_removed = False

    try:
        remote_removed = delete_file(picture_path, "images")
    except Exception:
        # delete_file only handles service client errors; keep this function
        # non-raising for anything else (e.g. connection problems).
        logging.exception("Error deleting remote picture %s", picture_path)
        remote_removed = False

    return local_removed and remote_removed