import os import logging import boto3 from botocore.exceptions import ClientError from dataclasses import dataclass from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any from uuid import uuid4 from botocore.exceptions import ClientError from PIL import Image, ImageTk import emails # type: ignore import jwt from jinja2 import Template from jwt.exceptions import InvalidTokenError import filetype from app.core.config import settings static = "static" @dataclass class EmailData: html_content: str subject: str def render_email_template(*, template_name: str, context: dict[str, Any]) -> str: template_str = ( Path(__file__).parent / "email-templates" / "build" / template_name ).read_text() html_content = Template(template_str).render(context) return html_content def send_email( *, email_to: str, subject: str = "", html_content: str = "", ) -> None: assert settings.emails_enabled, "no provided configuration for email variables" message = emails.Message( subject=subject, html=html_content, mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL), ) smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT} if settings.SMTP_TLS: smtp_options["tls"] = True elif settings.SMTP_SSL: smtp_options["ssl"] = True if settings.SMTP_USER: smtp_options["user"] = settings.SMTP_USER if settings.SMTP_PASSWORD: smtp_options["password"] = settings.SMTP_PASSWORD response = message.send(to=email_to, smtp=smtp_options) logging.info(f"send email result: {response}") def generate_test_email(email_to: str) -> EmailData: project_name = settings.PROJECT_NAME subject = f"{project_name} - Test email" html_content = render_email_template( template_name="test_email.html", context={"project_name": settings.PROJECT_NAME, "email": email_to}, ) return EmailData(html_content=html_content, subject=subject) def generate_reset_password_email(email_to: str, email: str, token: str) -> EmailData: project_name = settings.PROJECT_NAME subject = f"{project_name} - Password recovery for user {email}" link = f"{settings.server_host}/reset-password?token={token}" html_content = render_email_template( template_name="reset_password.html", context={ "project_name": settings.PROJECT_NAME, "username": email, "email": email_to, "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS, "link": link, }, ) return EmailData(html_content=html_content, subject=subject) def generate_new_account_email( email_to: str, username: str, password: str ) -> EmailData: project_name = settings.PROJECT_NAME subject = f"{project_name} - New account for user {username}" html_content = render_email_template( template_name="new_account.html", context={ "project_name": settings.PROJECT_NAME, "username": username, "password": password, "email": email_to, "link": settings.server_host, }, ) return EmailData(html_content=html_content, subject=subject) def generate_password_reset_token(email: str) -> str: delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS) now = datetime.now(timezone.utc) expires = now + delta exp = expires.timestamp() encoded_jwt = jwt.encode( {"exp": exp, "nbf": now, "sub": email}, settings.SECRET_KEY, algorithm="HS256", ) return encoded_jwt def verify_password_reset_token(token: str) -> str | None: try: decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"]) return str(decoded_token["sub"]) except InvalidTokenError: return None from fastapi import HTTPException, status from typing import IO import filetype def validate_file_size_type(file: IO): FILE_SIZE = 5097152 # 2MB accepted_file_types = [ "image/png", "image/jpeg", "image/jpg", "image/heic", "image/heif", "image/heics", "png", "jpeg", "jpg", "heic", "heif", "heics", ] file_info = filetype.guess(file.file) if file_info is None: raise HTTPException( status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, detail="Unable to determine file type", ) detected_content_type = file_info.extension.lower() if ( file.content_type not in accepted_file_types or detected_content_type not in accepted_file_types ): raise HTTPException( status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, detail="Unsupported file type", ) real_file_size = 0 for chunk in file.file: real_file_size += len(chunk) if real_file_size > FILE_SIZE: raise HTTPException( status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, detail="Too large" ) # async def save_picture(file, folderName: str = '', fileName: str = None): # randon_uid = str(uuid4()) # _, f_ext = os.path.splitext(file.filename) # picture_name = (randon_uid if fileName==None else fileName.lower().replace(' ', '')) + f_ext # path = os.path.join(static,folderName) # if not os.path.exists(path): # os.makedirs(path) # picture_path = os.path.join(path,picture_name) # #output_size = (125,125) # img = Image.open(file.file) # #img.thumbnail(output_size) # img.save(picture_path) # return f'{static}/{folderName}/{picture_name}' async def upload_file(file_name, bucket_name, object_name=None): ConnectionUrl = f"https://{settings.AccountID}.r2.cloudflarestorage.com" r2 = boto3.client( "s3", endpoint_url=ConnectionUrl, aws_access_key_id=settings.access_key_id, aws_secret_access_key=settings.secret_access_key, ) if object_name is None: object_name = file_name try: r2.upload_file(file_name, bucket_name, object_name) except ClientError as e: print(f"An error occurred: {e}") return False return True async def save_picture(file, folder_name: str = "", file_name: str = None): import io r2 = boto3.client( "s3", endpoint_url=f"https://{settings.AccountID}.r2.cloudflarestorage.com", aws_access_key_id=settings.access_key_id, aws_secret_access_key=settings.secret_access_key, ) randon_uid = str(uuid4()) _, f_ext = os.path.splitext(file.filename) picture_name = ( randon_uid if file_name is None else file_name.lower().replace(" ", "") ) + f_ext path = os.path.join(static, folder_name) if not os.path.exists(path): os.makedirs(path) picture_path = os.path.join(path, picture_name) # output_size = (125,125) img = Image.open(file.file) # img.thumbnail(output_size) img.save(picture_path) ConnectionUrl = f"https://{settings.AccountID}.r2.cloudflarestorage.com" S3Connect = boto3.client( "s3", endpoint_url=ConnectionUrl, aws_access_key_id=settings.access_key_id, aws_secret_access_key=settings.secret_access_key, ) if upload_file(picture_path, "images"): print(f"File {picture_path} uploaded successfully to images") else: print(f"File upload failed") return f"{static}/{folder_name}/{picture_name}" async def del_picture(picture_path): try: os.remove(picture_path) except Exception as e: print("Error: ", e) return False return True