first commit
This commit is contained in:
0
backend/app/tests/__init__.py
Normal file
0
backend/app/tests/__init__.py
Normal file
0
backend/app/tests/api/__init__.py
Normal file
0
backend/app/tests/api/__init__.py
Normal file
0
backend/app/tests/api/routes/__init__.py
Normal file
0
backend/app/tests/api/routes/__init__.py
Normal file
164
backend/app/tests/api/routes/test_items.py
Normal file
164
backend/app/tests/api/routes/test_items.py
Normal file
@@ -0,0 +1,164 @@
|
||||
import uuid
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlmodel import Session
|
||||
|
||||
from app.core.config import settings
|
||||
from app.tests.utils.item import create_random_item
|
||||
|
||||
|
||||
def test_create_item(
|
||||
client: TestClient, superuser_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
data = {"title": "Foo", "description": "Fighters"}
|
||||
response = client.post(
|
||||
f"{settings.API_V1_STR}/items/",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
content = response.json()
|
||||
assert content["title"] == data["title"]
|
||||
assert content["description"] == data["description"]
|
||||
assert "id" in content
|
||||
assert "owner_id" in content
|
||||
|
||||
|
||||
def test_read_item(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
item = create_random_item(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/items/{item.id}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
content = response.json()
|
||||
assert content["title"] == item.title
|
||||
assert content["description"] == item.description
|
||||
assert content["id"] == str(item.id)
|
||||
assert content["owner_id"] == str(item.owner_id)
|
||||
|
||||
|
||||
def test_read_item_not_found(
|
||||
client: TestClient, superuser_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/items/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == 404
|
||||
content = response.json()
|
||||
assert content["detail"] == "Item not found"
|
||||
|
||||
|
||||
def test_read_item_not_enough_permissions(
|
||||
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
item = create_random_item(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/items/{item.id}",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == 400
|
||||
content = response.json()
|
||||
assert content["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_read_items(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
create_random_item(db)
|
||||
create_random_item(db)
|
||||
response = client.get(
|
||||
f"{settings.API_V1_STR}/items/",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
content = response.json()
|
||||
assert len(content["data"]) >= 2
|
||||
|
||||
|
||||
def test_update_item(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
item = create_random_item(db)
|
||||
data = {"title": "Updated title", "description": "Updated description"}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/items/{item.id}",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
content = response.json()
|
||||
assert content["title"] == data["title"]
|
||||
assert content["description"] == data["description"]
|
||||
assert content["id"] == str(item.id)
|
||||
assert content["owner_id"] == str(item.owner_id)
|
||||
|
||||
|
||||
def test_update_item_not_found(
|
||||
client: TestClient, superuser_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
data = {"title": "Updated title", "description": "Updated description"}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/items/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == 404
|
||||
content = response.json()
|
||||
assert content["detail"] == "Item not found"
|
||||
|
||||
|
||||
def test_update_item_not_enough_permissions(
|
||||
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
item = create_random_item(db)
|
||||
data = {"title": "Updated title", "description": "Updated description"}
|
||||
response = client.put(
|
||||
f"{settings.API_V1_STR}/items/{item.id}",
|
||||
headers=normal_user_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert response.status_code == 400
|
||||
content = response.json()
|
||||
assert content["detail"] == "Not enough permissions"
|
||||
|
||||
|
||||
def test_delete_item(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
item = create_random_item(db)
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/items/{item.id}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
content = response.json()
|
||||
assert content["message"] == "Item deleted successfully"
|
||||
|
||||
|
||||
def test_delete_item_not_found(
|
||||
client: TestClient, superuser_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/items/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert response.status_code == 404
|
||||
content = response.json()
|
||||
assert content["detail"] == "Item not found"
|
||||
|
||||
|
||||
def test_delete_item_not_enough_permissions(
|
||||
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
item = create_random_item(db)
|
||||
response = client.delete(
|
||||
f"{settings.API_V1_STR}/items/{item.id}",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert response.status_code == 400
|
||||
content = response.json()
|
||||
assert content["detail"] == "Not enough permissions"
|
104
backend/app/tests/api/routes/test_login.py
Normal file
104
backend/app/tests/api/routes/test_login.py
Normal file
@@ -0,0 +1,104 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlmodel import Session, select
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.security import verify_password
|
||||
from app.models import User
|
||||
from app.utils import generate_password_reset_token
|
||||
|
||||
|
||||
def test_get_access_token(client: TestClient) -> None:
|
||||
login_data = {
|
||||
"username": settings.FIRST_SUPERUSER,
|
||||
"password": settings.FIRST_SUPERUSER_PASSWORD,
|
||||
}
|
||||
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
|
||||
tokens = r.json()
|
||||
assert r.status_code == 200
|
||||
assert "access_token" in tokens
|
||||
assert tokens["access_token"]
|
||||
|
||||
|
||||
def test_get_access_token_incorrect_password(client: TestClient) -> None:
|
||||
login_data = {
|
||||
"username": settings.FIRST_SUPERUSER,
|
||||
"password": "incorrect",
|
||||
}
|
||||
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
|
||||
assert r.status_code == 400
|
||||
|
||||
|
||||
def test_use_access_token(
|
||||
client: TestClient, superuser_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
r = client.post(
|
||||
f"{settings.API_V1_STR}/login/test-token",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
result = r.json()
|
||||
assert r.status_code == 200
|
||||
assert "email" in result
|
||||
|
||||
|
||||
def test_recovery_password(
|
||||
client: TestClient, normal_user_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
with (
|
||||
patch("app.core.config.settings.SMTP_HOST", "smtp.example.com"),
|
||||
patch("app.core.config.settings.SMTP_USER", "admin@example.com"),
|
||||
):
|
||||
email = "test@example.com"
|
||||
r = client.post(
|
||||
f"{settings.API_V1_STR}/password-recovery/{email}",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert r.status_code == 200
|
||||
assert r.json() == {"message": "Password recovery email sent"}
|
||||
|
||||
|
||||
def test_recovery_password_user_not_exits(
|
||||
client: TestClient, normal_user_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
email = "jVgQr@example.com"
|
||||
r = client.post(
|
||||
f"{settings.API_V1_STR}/password-recovery/{email}",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert r.status_code == 404
|
||||
|
||||
|
||||
def test_reset_password(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
token = generate_password_reset_token(email=settings.FIRST_SUPERUSER)
|
||||
data = {"new_password": "changethis", "token": token}
|
||||
r = client.post(
|
||||
f"{settings.API_V1_STR}/reset-password/",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert r.status_code == 200
|
||||
assert r.json() == {"message": "Password updated successfully"}
|
||||
|
||||
user_query = select(User).where(User.email == settings.FIRST_SUPERUSER)
|
||||
user = db.exec(user_query).first()
|
||||
assert user
|
||||
assert verify_password(data["new_password"], user.hashed_password)
|
||||
|
||||
|
||||
def test_reset_password_invalid_token(
|
||||
client: TestClient, superuser_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
data = {"new_password": "changethis", "token": "invalid"}
|
||||
r = client.post(
|
||||
f"{settings.API_V1_STR}/reset-password/",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
response = r.json()
|
||||
|
||||
assert "detail" in response
|
||||
assert r.status_code == 400
|
||||
assert response["detail"] == "Invalid token"
|
486
backend/app/tests/api/routes/test_users.py
Normal file
486
backend/app/tests/api/routes/test_users.py
Normal file
@@ -0,0 +1,486 @@
|
||||
import uuid
|
||||
from unittest.mock import patch
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlmodel import Session, select
|
||||
|
||||
from app import crud
|
||||
from app.core.config import settings
|
||||
from app.core.security import verify_password
|
||||
from app.models import User, UserCreate
|
||||
from app.tests.utils.utils import random_email, random_lower_string
|
||||
|
||||
|
||||
def test_get_users_superuser_me(
|
||||
client: TestClient, superuser_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
r = client.get(f"{settings.API_V1_STR}/users/me", headers=superuser_token_headers)
|
||||
current_user = r.json()
|
||||
assert current_user
|
||||
assert current_user["is_active"] is True
|
||||
assert current_user["is_superuser"]
|
||||
assert current_user["email"] == settings.FIRST_SUPERUSER
|
||||
|
||||
|
||||
def test_get_users_normal_user_me(
|
||||
client: TestClient, normal_user_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
r = client.get(f"{settings.API_V1_STR}/users/me", headers=normal_user_token_headers)
|
||||
current_user = r.json()
|
||||
assert current_user
|
||||
assert current_user["is_active"] is True
|
||||
assert current_user["is_superuser"] is False
|
||||
assert current_user["email"] == settings.EMAIL_TEST_USER
|
||||
|
||||
|
||||
def test_create_user_new_email(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
with (
|
||||
patch("app.utils.send_email", return_value=None),
|
||||
patch("app.core.config.settings.SMTP_HOST", "smtp.example.com"),
|
||||
patch("app.core.config.settings.SMTP_USER", "admin@example.com"),
|
||||
):
|
||||
username = random_email()
|
||||
password = random_lower_string()
|
||||
data = {"email": username, "password": password}
|
||||
r = client.post(
|
||||
f"{settings.API_V1_STR}/users/",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert 200 <= r.status_code < 300
|
||||
created_user = r.json()
|
||||
user = crud.get_user_by_email(session=db, email=username)
|
||||
assert user
|
||||
assert user.email == created_user["email"]
|
||||
|
||||
|
||||
def test_get_existing_user(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
username = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=username, password=password)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
user_id = user.id
|
||||
r = client.get(
|
||||
f"{settings.API_V1_STR}/users/{user_id}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert 200 <= r.status_code < 300
|
||||
api_user = r.json()
|
||||
existing_user = crud.get_user_by_email(session=db, email=username)
|
||||
assert existing_user
|
||||
assert existing_user.email == api_user["email"]
|
||||
|
||||
|
||||
def test_get_existing_user_current_user(client: TestClient, db: Session) -> None:
|
||||
username = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=username, password=password)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
user_id = user.id
|
||||
|
||||
login_data = {
|
||||
"username": username,
|
||||
"password": password,
|
||||
}
|
||||
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
|
||||
tokens = r.json()
|
||||
a_token = tokens["access_token"]
|
||||
headers = {"Authorization": f"Bearer {a_token}"}
|
||||
|
||||
r = client.get(
|
||||
f"{settings.API_V1_STR}/users/{user_id}",
|
||||
headers=headers,
|
||||
)
|
||||
assert 200 <= r.status_code < 300
|
||||
api_user = r.json()
|
||||
existing_user = crud.get_user_by_email(session=db, email=username)
|
||||
assert existing_user
|
||||
assert existing_user.email == api_user["email"]
|
||||
|
||||
|
||||
def test_get_existing_user_permissions_error(
|
||||
client: TestClient, normal_user_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
r = client.get(
|
||||
f"{settings.API_V1_STR}/users/{uuid.uuid4()}",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert r.status_code == 403
|
||||
assert r.json() == {"detail": "The user doesn't have enough privileges"}
|
||||
|
||||
|
||||
def test_create_user_existing_username(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
username = random_email()
|
||||
# username = email
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=username, password=password)
|
||||
crud.create_user(session=db, user_create=user_in)
|
||||
data = {"email": username, "password": password}
|
||||
r = client.post(
|
||||
f"{settings.API_V1_STR}/users/",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
created_user = r.json()
|
||||
assert r.status_code == 400
|
||||
assert "_id" not in created_user
|
||||
|
||||
|
||||
def test_create_user_by_normal_user(
|
||||
client: TestClient, normal_user_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
username = random_email()
|
||||
password = random_lower_string()
|
||||
data = {"email": username, "password": password}
|
||||
r = client.post(
|
||||
f"{settings.API_V1_STR}/users/",
|
||||
headers=normal_user_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert r.status_code == 403
|
||||
|
||||
|
||||
def test_retrieve_users(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
username = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=username, password=password)
|
||||
crud.create_user(session=db, user_create=user_in)
|
||||
|
||||
username2 = random_email()
|
||||
password2 = random_lower_string()
|
||||
user_in2 = UserCreate(email=username2, password=password2)
|
||||
crud.create_user(session=db, user_create=user_in2)
|
||||
|
||||
r = client.get(f"{settings.API_V1_STR}/users/", headers=superuser_token_headers)
|
||||
all_users = r.json()
|
||||
|
||||
assert len(all_users["data"]) > 1
|
||||
assert "count" in all_users
|
||||
for item in all_users["data"]:
|
||||
assert "email" in item
|
||||
|
||||
|
||||
def test_update_user_me(
|
||||
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
full_name = "Updated Name"
|
||||
email = random_email()
|
||||
data = {"full_name": full_name, "email": email}
|
||||
r = client.patch(
|
||||
f"{settings.API_V1_STR}/users/me",
|
||||
headers=normal_user_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert r.status_code == 200
|
||||
updated_user = r.json()
|
||||
assert updated_user["email"] == email
|
||||
assert updated_user["full_name"] == full_name
|
||||
|
||||
user_query = select(User).where(User.email == email)
|
||||
user_db = db.exec(user_query).first()
|
||||
assert user_db
|
||||
assert user_db.email == email
|
||||
assert user_db.full_name == full_name
|
||||
|
||||
|
||||
def test_update_password_me(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
new_password = random_lower_string()
|
||||
data = {
|
||||
"current_password": settings.FIRST_SUPERUSER_PASSWORD,
|
||||
"new_password": new_password,
|
||||
}
|
||||
r = client.patch(
|
||||
f"{settings.API_V1_STR}/users/me/password",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert r.status_code == 200
|
||||
updated_user = r.json()
|
||||
assert updated_user["message"] == "Password updated successfully"
|
||||
|
||||
user_query = select(User).where(User.email == settings.FIRST_SUPERUSER)
|
||||
user_db = db.exec(user_query).first()
|
||||
assert user_db
|
||||
assert user_db.email == settings.FIRST_SUPERUSER
|
||||
assert verify_password(new_password, user_db.hashed_password)
|
||||
|
||||
# Revert to the old password to keep consistency in test
|
||||
old_data = {
|
||||
"current_password": new_password,
|
||||
"new_password": settings.FIRST_SUPERUSER_PASSWORD,
|
||||
}
|
||||
r = client.patch(
|
||||
f"{settings.API_V1_STR}/users/me/password",
|
||||
headers=superuser_token_headers,
|
||||
json=old_data,
|
||||
)
|
||||
db.refresh(user_db)
|
||||
|
||||
assert r.status_code == 200
|
||||
assert verify_password(settings.FIRST_SUPERUSER_PASSWORD, user_db.hashed_password)
|
||||
|
||||
|
||||
def test_update_password_me_incorrect_password(
|
||||
client: TestClient, superuser_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
new_password = random_lower_string()
|
||||
data = {"current_password": new_password, "new_password": new_password}
|
||||
r = client.patch(
|
||||
f"{settings.API_V1_STR}/users/me/password",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert r.status_code == 400
|
||||
updated_user = r.json()
|
||||
assert updated_user["detail"] == "Incorrect password"
|
||||
|
||||
|
||||
def test_update_user_me_email_exists(
|
||||
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
username = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=username, password=password)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
|
||||
data = {"email": user.email}
|
||||
r = client.patch(
|
||||
f"{settings.API_V1_STR}/users/me",
|
||||
headers=normal_user_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert r.status_code == 409
|
||||
assert r.json()["detail"] == "User with this email already exists"
|
||||
|
||||
|
||||
def test_update_password_me_same_password_error(
|
||||
client: TestClient, superuser_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
data = {
|
||||
"current_password": settings.FIRST_SUPERUSER_PASSWORD,
|
||||
"new_password": settings.FIRST_SUPERUSER_PASSWORD,
|
||||
}
|
||||
r = client.patch(
|
||||
f"{settings.API_V1_STR}/users/me/password",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert r.status_code == 400
|
||||
updated_user = r.json()
|
||||
assert (
|
||||
updated_user["detail"] == "New password cannot be the same as the current one"
|
||||
)
|
||||
|
||||
|
||||
def test_register_user(client: TestClient, db: Session) -> None:
|
||||
username = random_email()
|
||||
password = random_lower_string()
|
||||
full_name = random_lower_string()
|
||||
data = {"email": username, "password": password, "full_name": full_name}
|
||||
r = client.post(
|
||||
f"{settings.API_V1_STR}/users/signup",
|
||||
json=data,
|
||||
)
|
||||
assert r.status_code == 200
|
||||
created_user = r.json()
|
||||
assert created_user["email"] == username
|
||||
assert created_user["full_name"] == full_name
|
||||
|
||||
user_query = select(User).where(User.email == username)
|
||||
user_db = db.exec(user_query).first()
|
||||
assert user_db
|
||||
assert user_db.email == username
|
||||
assert user_db.full_name == full_name
|
||||
assert verify_password(password, user_db.hashed_password)
|
||||
|
||||
|
||||
def test_register_user_already_exists_error(client: TestClient) -> None:
|
||||
password = random_lower_string()
|
||||
full_name = random_lower_string()
|
||||
data = {
|
||||
"email": settings.FIRST_SUPERUSER,
|
||||
"password": password,
|
||||
"full_name": full_name,
|
||||
}
|
||||
r = client.post(
|
||||
f"{settings.API_V1_STR}/users/signup",
|
||||
json=data,
|
||||
)
|
||||
assert r.status_code == 400
|
||||
assert r.json()["detail"] == "The user with this email already exists in the system"
|
||||
|
||||
|
||||
def test_update_user(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
username = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=username, password=password)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
|
||||
data = {"full_name": "Updated_full_name"}
|
||||
r = client.patch(
|
||||
f"{settings.API_V1_STR}/users/{user.id}",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert r.status_code == 200
|
||||
updated_user = r.json()
|
||||
|
||||
assert updated_user["full_name"] == "Updated_full_name"
|
||||
|
||||
user_query = select(User).where(User.email == username)
|
||||
user_db = db.exec(user_query).first()
|
||||
db.refresh(user_db)
|
||||
assert user_db
|
||||
assert user_db.full_name == "Updated_full_name"
|
||||
|
||||
|
||||
def test_update_user_not_exists(
|
||||
client: TestClient, superuser_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
data = {"full_name": "Updated_full_name"}
|
||||
r = client.patch(
|
||||
f"{settings.API_V1_STR}/users/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert r.status_code == 404
|
||||
assert r.json()["detail"] == "The user with this id does not exist in the system"
|
||||
|
||||
|
||||
def test_update_user_email_exists(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
username = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=username, password=password)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
|
||||
username2 = random_email()
|
||||
password2 = random_lower_string()
|
||||
user_in2 = UserCreate(email=username2, password=password2)
|
||||
user2 = crud.create_user(session=db, user_create=user_in2)
|
||||
|
||||
data = {"email": user2.email}
|
||||
r = client.patch(
|
||||
f"{settings.API_V1_STR}/users/{user.id}",
|
||||
headers=superuser_token_headers,
|
||||
json=data,
|
||||
)
|
||||
assert r.status_code == 409
|
||||
assert r.json()["detail"] == "User with this email already exists"
|
||||
|
||||
|
||||
def test_delete_user_me(client: TestClient, db: Session) -> None:
|
||||
username = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=username, password=password)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
user_id = user.id
|
||||
|
||||
login_data = {
|
||||
"username": username,
|
||||
"password": password,
|
||||
}
|
||||
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
|
||||
tokens = r.json()
|
||||
a_token = tokens["access_token"]
|
||||
headers = {"Authorization": f"Bearer {a_token}"}
|
||||
|
||||
r = client.delete(
|
||||
f"{settings.API_V1_STR}/users/me",
|
||||
headers=headers,
|
||||
)
|
||||
assert r.status_code == 200
|
||||
deleted_user = r.json()
|
||||
assert deleted_user["message"] == "User deleted successfully"
|
||||
result = db.exec(select(User).where(User.id == user_id)).first()
|
||||
assert result is None
|
||||
|
||||
user_query = select(User).where(User.id == user_id)
|
||||
user_db = db.execute(user_query).first()
|
||||
assert user_db is None
|
||||
|
||||
|
||||
def test_delete_user_me_as_superuser(
|
||||
client: TestClient, superuser_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
r = client.delete(
|
||||
f"{settings.API_V1_STR}/users/me",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert r.status_code == 403
|
||||
response = r.json()
|
||||
assert response["detail"] == "Super users are not allowed to delete themselves"
|
||||
|
||||
|
||||
def test_delete_user_super_user(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
username = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=username, password=password)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
user_id = user.id
|
||||
r = client.delete(
|
||||
f"{settings.API_V1_STR}/users/{user_id}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert r.status_code == 200
|
||||
deleted_user = r.json()
|
||||
assert deleted_user["message"] == "User deleted successfully"
|
||||
result = db.exec(select(User).where(User.id == user_id)).first()
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_delete_user_not_found(
|
||||
client: TestClient, superuser_token_headers: dict[str, str]
|
||||
) -> None:
|
||||
r = client.delete(
|
||||
f"{settings.API_V1_STR}/users/{uuid.uuid4()}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert r.status_code == 404
|
||||
assert r.json()["detail"] == "User not found"
|
||||
|
||||
|
||||
def test_delete_user_current_super_user_error(
|
||||
client: TestClient, superuser_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
super_user = crud.get_user_by_email(session=db, email=settings.FIRST_SUPERUSER)
|
||||
assert super_user
|
||||
user_id = super_user.id
|
||||
|
||||
r = client.delete(
|
||||
f"{settings.API_V1_STR}/users/{user_id}",
|
||||
headers=superuser_token_headers,
|
||||
)
|
||||
assert r.status_code == 403
|
||||
assert r.json()["detail"] == "Super users are not allowed to delete themselves"
|
||||
|
||||
|
||||
def test_delete_user_without_privileges(
|
||||
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
|
||||
) -> None:
|
||||
username = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=username, password=password)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
|
||||
r = client.delete(
|
||||
f"{settings.API_V1_STR}/users/{user.id}",
|
||||
headers=normal_user_token_headers,
|
||||
)
|
||||
assert r.status_code == 403
|
||||
assert r.json()["detail"] == "The user doesn't have enough privileges"
|
42
backend/app/tests/conftest.py
Normal file
42
backend/app/tests/conftest.py
Normal file
@@ -0,0 +1,42 @@
|
||||
from collections.abc import Generator
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlmodel import Session, delete
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.db import engine, init_db
|
||||
from app.main import app
|
||||
from app.models import Item, User
|
||||
from app.tests.utils.user import authentication_token_from_email
|
||||
from app.tests.utils.utils import get_superuser_token_headers
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def db() -> Generator[Session, None, None]:
|
||||
with Session(engine) as session:
|
||||
init_db(session)
|
||||
yield session
|
||||
statement = delete(Item)
|
||||
session.execute(statement)
|
||||
statement = delete(User)
|
||||
session.execute(statement)
|
||||
session.commit()
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def client() -> Generator[TestClient, None, None]:
|
||||
with TestClient(app) as c:
|
||||
yield c
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def superuser_token_headers(client: TestClient) -> dict[str, str]:
|
||||
return get_superuser_token_headers(client)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def normal_user_token_headers(client: TestClient, db: Session) -> dict[str, str]:
|
||||
return authentication_token_from_email(
|
||||
client=client, email=settings.EMAIL_TEST_USER, db=db
|
||||
)
|
0
backend/app/tests/crud/__init__.py
Normal file
0
backend/app/tests/crud/__init__.py
Normal file
91
backend/app/tests/crud/test_user.py
Normal file
91
backend/app/tests/crud/test_user.py
Normal file
@@ -0,0 +1,91 @@
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
from sqlmodel import Session
|
||||
|
||||
from app import crud
|
||||
from app.core.security import verify_password
|
||||
from app.models import User, UserCreate, UserUpdate
|
||||
from app.tests.utils.utils import random_email, random_lower_string
|
||||
|
||||
|
||||
def test_create_user(db: Session) -> None:
|
||||
email = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=email, password=password)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
assert user.email == email
|
||||
assert hasattr(user, "hashed_password")
|
||||
|
||||
|
||||
def test_authenticate_user(db: Session) -> None:
|
||||
email = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=email, password=password)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
authenticated_user = crud.authenticate(session=db, email=email, password=password)
|
||||
assert authenticated_user
|
||||
assert user.email == authenticated_user.email
|
||||
|
||||
|
||||
def test_not_authenticate_user(db: Session) -> None:
|
||||
email = random_email()
|
||||
password = random_lower_string()
|
||||
user = crud.authenticate(session=db, email=email, password=password)
|
||||
assert user is None
|
||||
|
||||
|
||||
def test_check_if_user_is_active(db: Session) -> None:
|
||||
email = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=email, password=password)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
assert user.is_active is True
|
||||
|
||||
|
||||
def test_check_if_user_is_active_inactive(db: Session) -> None:
|
||||
email = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=email, password=password, disabled=True)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
assert user.is_active
|
||||
|
||||
|
||||
def test_check_if_user_is_superuser(db: Session) -> None:
|
||||
email = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=email, password=password, is_superuser=True)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
assert user.is_superuser is True
|
||||
|
||||
|
||||
def test_check_if_user_is_superuser_normal_user(db: Session) -> None:
|
||||
username = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=username, password=password)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
assert user.is_superuser is False
|
||||
|
||||
|
||||
def test_get_user(db: Session) -> None:
|
||||
password = random_lower_string()
|
||||
username = random_email()
|
||||
user_in = UserCreate(email=username, password=password, is_superuser=True)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
user_2 = db.get(User, user.id)
|
||||
assert user_2
|
||||
assert user.email == user_2.email
|
||||
assert jsonable_encoder(user) == jsonable_encoder(user_2)
|
||||
|
||||
|
||||
def test_update_user(db: Session) -> None:
|
||||
password = random_lower_string()
|
||||
email = random_email()
|
||||
user_in = UserCreate(email=email, password=password, is_superuser=True)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
new_password = random_lower_string()
|
||||
user_in_update = UserUpdate(password=new_password, is_superuser=True)
|
||||
if user.id is not None:
|
||||
crud.update_user(session=db, db_user=user, user_in=user_in_update)
|
||||
user_2 = db.get(User, user.id)
|
||||
assert user_2
|
||||
assert user.email == user_2.email
|
||||
assert verify_password(new_password, user_2.hashed_password)
|
0
backend/app/tests/scripts/__init__.py
Normal file
0
backend/app/tests/scripts/__init__.py
Normal file
33
backend/app/tests/scripts/test_backend_pre_start.py
Normal file
33
backend/app/tests/scripts/test_backend_pre_start.py
Normal file
@@ -0,0 +1,33 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from sqlmodel import select
|
||||
|
||||
from app.backend_pre_start import init, logger
|
||||
|
||||
|
||||
def test_init_successful_connection() -> None:
|
||||
engine_mock = MagicMock()
|
||||
|
||||
session_mock = MagicMock()
|
||||
exec_mock = MagicMock(return_value=True)
|
||||
session_mock.configure_mock(**{"exec.return_value": exec_mock})
|
||||
|
||||
with (
|
||||
patch("sqlmodel.Session", return_value=session_mock),
|
||||
patch.object(logger, "info"),
|
||||
patch.object(logger, "error"),
|
||||
patch.object(logger, "warn"),
|
||||
):
|
||||
try:
|
||||
init(engine_mock)
|
||||
connection_successful = True
|
||||
except Exception:
|
||||
connection_successful = False
|
||||
|
||||
assert (
|
||||
connection_successful
|
||||
), "The database connection should be successful and not raise an exception."
|
||||
|
||||
assert session_mock.exec.called_once_with(
|
||||
select(1)
|
||||
), "The session should execute a select statement once."
|
33
backend/app/tests/scripts/test_test_pre_start.py
Normal file
33
backend/app/tests/scripts/test_test_pre_start.py
Normal file
@@ -0,0 +1,33 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from sqlmodel import select
|
||||
|
||||
from app.tests_pre_start import init, logger
|
||||
|
||||
|
||||
def test_init_successful_connection() -> None:
|
||||
engine_mock = MagicMock()
|
||||
|
||||
session_mock = MagicMock()
|
||||
exec_mock = MagicMock(return_value=True)
|
||||
session_mock.configure_mock(**{"exec.return_value": exec_mock})
|
||||
|
||||
with (
|
||||
patch("sqlmodel.Session", return_value=session_mock),
|
||||
patch.object(logger, "info"),
|
||||
patch.object(logger, "error"),
|
||||
patch.object(logger, "warn"),
|
||||
):
|
||||
try:
|
||||
init(engine_mock)
|
||||
connection_successful = True
|
||||
except Exception:
|
||||
connection_successful = False
|
||||
|
||||
assert (
|
||||
connection_successful
|
||||
), "The database connection should be successful and not raise an exception."
|
||||
|
||||
assert session_mock.exec.called_once_with(
|
||||
select(1)
|
||||
), "The session should execute a select statement once."
|
0
backend/app/tests/utils/__init__.py
Normal file
0
backend/app/tests/utils/__init__.py
Normal file
16
backend/app/tests/utils/item.py
Normal file
16
backend/app/tests/utils/item.py
Normal file
@@ -0,0 +1,16 @@
|
||||
from sqlmodel import Session
|
||||
|
||||
from app import crud
|
||||
from app.models import Item, ItemCreate
|
||||
from app.tests.utils.user import create_random_user
|
||||
from app.tests.utils.utils import random_lower_string
|
||||
|
||||
|
||||
def create_random_item(db: Session) -> Item:
|
||||
user = create_random_user(db)
|
||||
owner_id = user.id
|
||||
assert owner_id is not None
|
||||
title = random_lower_string()
|
||||
description = random_lower_string()
|
||||
item_in = ItemCreate(title=title, description=description)
|
||||
return crud.create_item(session=db, item_in=item_in, owner_id=owner_id)
|
49
backend/app/tests/utils/user.py
Normal file
49
backend/app/tests/utils/user.py
Normal file
@@ -0,0 +1,49 @@
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlmodel import Session
|
||||
|
||||
from app import crud
|
||||
from app.core.config import settings
|
||||
from app.models import User, UserCreate, UserUpdate
|
||||
from app.tests.utils.utils import random_email, random_lower_string
|
||||
|
||||
|
||||
def user_authentication_headers(
|
||||
*, client: TestClient, email: str, password: str
|
||||
) -> dict[str, str]:
|
||||
data = {"username": email, "password": password}
|
||||
|
||||
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=data)
|
||||
response = r.json()
|
||||
auth_token = response["access_token"]
|
||||
headers = {"Authorization": f"Bearer {auth_token}"}
|
||||
return headers
|
||||
|
||||
|
||||
def create_random_user(db: Session) -> User:
|
||||
email = random_email()
|
||||
password = random_lower_string()
|
||||
user_in = UserCreate(email=email, password=password)
|
||||
user = crud.create_user(session=db, user_create=user_in)
|
||||
return user
|
||||
|
||||
|
||||
def authentication_token_from_email(
|
||||
*, client: TestClient, email: str, db: Session
|
||||
) -> dict[str, str]:
|
||||
"""
|
||||
Return a valid token for the user with given email.
|
||||
|
||||
If the user doesn't exist it is created first.
|
||||
"""
|
||||
password = random_lower_string()
|
||||
user = crud.get_user_by_email(session=db, email=email)
|
||||
if not user:
|
||||
user_in_create = UserCreate(email=email, password=password)
|
||||
user = crud.create_user(session=db, user_create=user_in_create)
|
||||
else:
|
||||
user_in_update = UserUpdate(password=password)
|
||||
if not user.id:
|
||||
raise Exception("User id not set")
|
||||
user = crud.update_user(session=db, db_user=user, user_in=user_in_update)
|
||||
|
||||
return user_authentication_headers(client=client, email=email, password=password)
|
26
backend/app/tests/utils/utils.py
Normal file
26
backend/app/tests/utils/utils.py
Normal file
@@ -0,0 +1,26 @@
|
||||
import random
|
||||
import string
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from app.core.config import settings
|
||||
|
||||
|
||||
def random_lower_string() -> str:
|
||||
return "".join(random.choices(string.ascii_lowercase, k=32))
|
||||
|
||||
|
||||
def random_email() -> str:
|
||||
return f"{random_lower_string()}@{random_lower_string()}.com"
|
||||
|
||||
|
||||
def get_superuser_token_headers(client: TestClient) -> dict[str, str]:
|
||||
login_data = {
|
||||
"username": settings.FIRST_SUPERUSER,
|
||||
"password": settings.FIRST_SUPERUSER_PASSWORD,
|
||||
}
|
||||
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
|
||||
tokens = r.json()
|
||||
a_token = tokens["access_token"]
|
||||
headers = {"Authorization": f"Bearer {a_token}"}
|
||||
return headers
|
Reference in New Issue
Block a user