264 lines
7.4 KiB
Python
264 lines
7.4 KiB
Python
import os
|
|
import logging
|
|
import boto3
|
|
from botocore.exceptions import ClientError
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from uuid import uuid4
|
|
from botocore.exceptions import ClientError
|
|
from PIL import Image, ImageTk
|
|
import emails # type: ignore
|
|
import jwt
|
|
from jinja2 import Template
|
|
from jwt.exceptions import InvalidTokenError
|
|
import filetype
|
|
from app.core.config import settings
|
|
|
|
static = "static"
|
|
|
|
ConnectionUrl = f"https://{settings.AccountID}.r2.cloudflarestorage.com"
|
|
|
|
r2 = boto3.client(
|
|
"s3",
|
|
endpoint_url=ConnectionUrl,
|
|
aws_access_key_id=settings.access_key_id,
|
|
aws_secret_access_key=settings.secret_access_key,
|
|
)
|
|
|
|
@dataclass
|
|
class EmailData:
|
|
html_content: str
|
|
subject: str
|
|
|
|
|
|
def render_email_template(*, template_name: str, context: dict[str, Any]) -> str:
|
|
template_str = (
|
|
Path(__file__).parent / "email-templates" / "build" / template_name
|
|
).read_text()
|
|
html_content = Template(template_str).render(context)
|
|
return html_content
|
|
|
|
|
|
def send_email(
|
|
*,
|
|
email_to: str,
|
|
subject: str = "",
|
|
html_content: str = "",
|
|
) -> None:
|
|
assert settings.emails_enabled, "no provided configuration for email variables"
|
|
message = emails.Message(
|
|
subject=subject,
|
|
html=html_content,
|
|
mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
|
|
)
|
|
smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
|
|
if settings.SMTP_TLS:
|
|
smtp_options["tls"] = True
|
|
elif settings.SMTP_SSL:
|
|
smtp_options["ssl"] = True
|
|
if settings.SMTP_USER:
|
|
smtp_options["user"] = settings.SMTP_USER
|
|
if settings.SMTP_PASSWORD:
|
|
smtp_options["password"] = settings.SMTP_PASSWORD
|
|
response = message.send(to=email_to, smtp=smtp_options)
|
|
logging.info(f"send email result: {response}")
|
|
|
|
|
|
def generate_test_email(email_to: str) -> EmailData:
|
|
project_name = settings.PROJECT_NAME
|
|
subject = f"{project_name} - Test email"
|
|
html_content = render_email_template(
|
|
template_name="test_email.html",
|
|
context={"project_name": settings.PROJECT_NAME, "email": email_to},
|
|
)
|
|
return EmailData(html_content=html_content, subject=subject)
|
|
|
|
|
|
def generate_reset_password_email(email_to: str, email: str, token: str) -> EmailData:
|
|
project_name = settings.PROJECT_NAME
|
|
subject = f"{project_name} - Password recovery for user {email}"
|
|
link = f"{settings.server_host}/reset-password?token={token}"
|
|
html_content = render_email_template(
|
|
template_name="reset_password.html",
|
|
context={
|
|
"project_name": settings.PROJECT_NAME,
|
|
"username": email,
|
|
"email": email_to,
|
|
"valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
|
|
"link": link,
|
|
},
|
|
)
|
|
return EmailData(html_content=html_content, subject=subject)
|
|
|
|
|
|
def generate_new_account_email(
|
|
email_to: str, username: str, password: str
|
|
) -> EmailData:
|
|
project_name = settings.PROJECT_NAME
|
|
subject = f"{project_name} - New account for user {username}"
|
|
html_content = render_email_template(
|
|
template_name="new_account.html",
|
|
context={
|
|
"project_name": settings.PROJECT_NAME,
|
|
"username": username,
|
|
"password": password,
|
|
"email": email_to,
|
|
"link": settings.server_host,
|
|
},
|
|
)
|
|
return EmailData(html_content=html_content, subject=subject)
|
|
|
|
|
|
def generate_password_reset_token(email: str) -> str:
|
|
delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
|
|
now = datetime.now(timezone.utc)
|
|
expires = now + delta
|
|
exp = expires.timestamp()
|
|
encoded_jwt = jwt.encode(
|
|
{"exp": exp, "nbf": now, "sub": email},
|
|
settings.SECRET_KEY,
|
|
algorithm="HS256",
|
|
)
|
|
return encoded_jwt
|
|
|
|
|
|
def verify_password_reset_token(token: str) -> str | None:
|
|
try:
|
|
decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
|
|
return str(decoded_token["sub"])
|
|
except InvalidTokenError:
|
|
return None
|
|
|
|
|
|
from fastapi import HTTPException, status
|
|
from typing import IO
|
|
import filetype
|
|
|
|
|
|
def validate_file_size_type(file: IO):
|
|
FILE_SIZE = 5097152 # 2MB
|
|
|
|
accepted_file_types = [
|
|
"image/png",
|
|
"image/jpeg",
|
|
"image/jpg",
|
|
"image/heic",
|
|
"image/heif",
|
|
"image/heics",
|
|
"png",
|
|
"jpeg",
|
|
"jpg",
|
|
"heic",
|
|
"heif",
|
|
"heics",
|
|
]
|
|
file_info = filetype.guess(file.file)
|
|
if file_info is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
|
|
detail="Unable to determine file type",
|
|
)
|
|
|
|
detected_content_type = file_info.extension.lower()
|
|
|
|
if (
|
|
file.content_type not in accepted_file_types
|
|
or detected_content_type not in accepted_file_types
|
|
):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
|
|
detail="Unsupported file type",
|
|
)
|
|
|
|
real_file_size = 0
|
|
for chunk in file.file:
|
|
real_file_size += len(chunk)
|
|
if real_file_size > FILE_SIZE:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, detail="Too large"
|
|
)
|
|
|
|
|
|
# async def save_picture(file, folderName: str = '', fileName: str = None):
|
|
# randon_uid = str(uuid4())
|
|
# _, f_ext = os.path.splitext(file.filename)
|
|
|
|
# picture_name = (randon_uid if fileName==None else fileName.lower().replace(' ', '')) + f_ext
|
|
|
|
# path = os.path.join(static,folderName)
|
|
# if not os.path.exists(path):
|
|
# os.makedirs(path)
|
|
|
|
# picture_path = os.path.join(path,picture_name)
|
|
|
|
# #output_size = (125,125)
|
|
# img = Image.open(file.file)
|
|
|
|
# #img.thumbnail(output_size)
|
|
# img.save(picture_path)
|
|
|
|
# return f'{static}/{folderName}/{picture_name}'
|
|
|
|
|
|
def upload_file(file_name, bucket_name, object_name=None):
|
|
|
|
|
|
if object_name is None:
|
|
object_name = file_name
|
|
|
|
try:
|
|
r2.upload_file(file_name, bucket_name, object_name)
|
|
except ClientError as e:
|
|
print(f"An error occurred: {e}")
|
|
return False
|
|
return True
|
|
|
|
def delete_file(file_name, bucket_name, object_name=None):
|
|
if object_name is None:
|
|
object_name = file_name
|
|
try:
|
|
r2.delete_object(Bucket=bucket_name, Key=file_name)
|
|
|
|
except ClientError as e:
|
|
print(f"An error occurred: {e}")
|
|
return False
|
|
return True
|
|
async def save_picture(file, folder_name: str = "", file_name: str = None):
|
|
|
|
randon_uid = str(uuid4())
|
|
_, f_ext = os.path.splitext(file.filename)
|
|
|
|
picture_name = (
|
|
randon_uid if file_name is None else file_name.lower().replace(" ", "")
|
|
) + f_ext
|
|
|
|
path = os.path.join(static, folder_name)
|
|
if not os.path.exists(path):
|
|
os.makedirs(path)
|
|
|
|
picture_path = os.path.join(path, picture_name)
|
|
|
|
# output_size = (125,125)
|
|
img = Image.open(file.file)
|
|
|
|
# img.thumbnail(output_size)
|
|
img.save(picture_path)
|
|
|
|
if upload_file(picture_path, "images"):
|
|
print(f"File {picture_path} uploaded successfully to images")
|
|
else:
|
|
print(f"File upload failed")
|
|
|
|
return f"{static}/{folder_name}/{picture_name}"
|
|
|
|
async def del_picture(picture_path):
|
|
try:
|
|
os.remove(picture_path)
|
|
delete_file(picture_path, "images")
|
|
except Exception as e:
|
|
print("Error: ", e)
|
|
return False
|
|
return True
|