diff --git a/CHANGELOG.md b/CHANGELOG.md index 508d7eb..270a43b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,19 +7,57 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.1.10] - 2024-12-30 + +### Added +- **Permissions System**: Complete RBAC (Role-Based Access Control) implementation + - `Role`, `Group`, `UserGroup` models for role management + - `RoleGrant` for role-based permission templates + - `Grant` model for user-specific permissions with context support + - Action expansion system for permission wildcards + - PostgreSQL GIN indexes for efficient permission queries + - Group-specific role grants for fine-grained control +- **Oxiliere Multi-Tenant Enhancements**: + - `BaseTenant` and `BaseTenantUser` abstract models + - Soft delete support with `delete_tenant()` and `restore()` methods + - Tenant status management with `TenantStatus` enum + - User management with `add_user()` and `remove_user()` methods + - Tenant signals: `tenant_user_added`, `tenant_user_removed` + - Schema name generation utilities + - System tenant protection + - Authorization middleware and checks +- **JWT Enhancements**: + - Extended JWT authentication with permission support + - Token user model for JWT-based user representation + - Improved JWT models and schemas +- **Model Improvements**: + - New field types in `oxutils.models.fields` + - Enhanced base model mixins + - Better timestamp and tracking support +- **User Management**: + - Enhanced user models with tenant integration + - User permission management + ### Changed -- **BREAKING**: Removed `django-cid` dependency in favor of `django-structlog`'s built-in request_id - - `request_id` is now automatically generated by `django-structlog.middlewares.RequestMiddleware` - - Removed `cid.middleware.CidMiddleware` from `AUDIT_MIDDLEWARE` - - Removed `cid.apps.CidAppConfig` from `UTILS_APPS` - - Auditlog now uses `request_id` from django-structlog via `oxutils.audit.utils.get_request_id()` - - The `cid` field in auditlog entries now contains the `request_id` for correlation +- **Test Suite Reorganization**: Tests restructured by module with isolated settings + - `tests/common/` for common tests + - `tests/oxiliere/` for multi-tenant tests + - `tests/permissions/` for permission tests + - Each module has its own `settings.py` for isolation + - Improved test documentation in `tests/README.md` +- **Middleware**: Enhanced tenant middleware with better error handling +- **Permissions**: Improved permission controllers and schemas +- **Dependencies**: Updated dependency versions in `pyproject.toml` + +### Fixed +- Permission grant override now preserves actions in expanded form +- Improved error handling in tenant operations ### Migration Guide -If you were using `django-cid` directly: -- Replace `from cid.locals import get_cid` with `from oxutils.audit.utils import get_request_id` -- The `request_id` is now available in structlog context: `structlog.contextvars.get_contextvars().get('request_id')` -- Auditlog entries still use the `cid` field but it now contains the `request_id` from django-structlog +If upgrading from 0.1.9: +- Review new permission models and run migrations +- Update tenant models to inherit from `BaseTenant` and `BaseTenantUser` +- Update test imports if using test utilities ## [0.1.3] - 2024-12-08 @@ -84,6 +122,7 @@ If you were using `django-cid` directly: - django-auditlog, django-structlog - PyJWT, jwcrypto, cryptography -[Unreleased]: https://github.com/oxiliere/oxutils/compare/v0.1.3...HEAD +[Unreleased]: https://github.com/oxiliere/oxutils/compare/v0.1.10...HEAD +[0.1.10]: https://github.com/oxiliere/oxutils/compare/v0.1.9...v0.1.10 [0.1.3]: https://github.com/oxiliere/oxutils/compare/v0.1.0...v0.1.3 [0.1.0]: https://github.com/oxiliere/oxutils/releases/tag/v0.1.0 diff --git a/docs/permissions.md b/docs/permissions.md index 2a48a29..e369f23 100644 --- a/docs/permissions.md +++ b/docs/permissions.md @@ -64,7 +64,6 @@ Actions have dependencies that are automatically expanded: - `r`: Read - `w`: Write (implies `r`) - `d`: Delete (implies `w`, `r`) -- `x`: Execute (implies `r`) Example: Granting `['w']` automatically gives `['r', 'w']` diff --git a/pyproject.toml b/pyproject.toml index a081761..ff28978 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "oxutils" -version = "0.1.9" +version = "0.1.10" description = "Production-ready utilities for Django applications in the Oxiliere ecosystem" readme = "README.md" license = "Apache-2.0" diff --git a/src/oxutils/__init__.py b/src/oxutils/__init__.py index fe17e6b..33687bb 100644 --- a/src/oxutils/__init__.py +++ b/src/oxutils/__init__.py @@ -11,7 +11,7 @@ - Permission management """ -__version__ = "0.1.9" +__version__ = "0.1.10" from oxutils.settings import oxi_settings from oxutils.conf import UTILS_APPS, AUDIT_MIDDLEWARE diff --git a/src/oxutils/jwt/auth.py b/src/oxutils/jwt/auth.py index 7f1329d..f7e64d1 100644 --- a/src/oxutils/jwt/auth.py +++ b/src/oxutils/jwt/auth.py @@ -1,17 +1,24 @@ import os -from typing import Dict, Any, Optional, Type +from typing import Dict, Any, Optional, Type, Tuple from django.utils.translation import gettext_lazy as _ from django.http import HttpRequest from django.contrib.auth.models import AbstractUser +from django.contrib.auth import ( + authenticate as django_authenticate, + login as django_login, + get_user_model +) from jwcrypto import jwk from django.core.exceptions import ImproperlyConfigured +from ninja.security.base import AuthBase from ninja_jwt.authentication import ( JWTBaseAuthentication, JWTStatelessUserAuthentication ) from ninja.security import ( APIKeyCookie, + HttpBasicAuth, ) from ninja_jwt.exceptions import InvalidToken from ninja_jwt.settings import api_settings @@ -81,7 +88,6 @@ def jwt_authenticate(self, request: HttpRequest, token: str) -> AbstractUser: return token_user - class JWTAuth(AuthMixin, JWTStatelessUserAuthentication): pass @@ -110,5 +116,89 @@ def get_user(self, validated_token: Any) -> Type[AbstractUser]: return api_settings.TOKEN_USER_CLASS(validated_token) +def authenticate_by_x_session_token(token: str) -> Optional[Tuple]: + """ + Copied from allauth.headless.internal.sessionkit, to "select_related" + """ + from allauth.headless import app_settings + + + session = app_settings.TOKEN_STRATEGY.lookup_session(token) + if not session: + return None + user_id_str = session.get(SESSION_KEY) + if user_id_str: + meta_pk = get_user_model()._meta.pk + if meta_pk: + user_id = meta_pk.to_python(user_id_str) + user = get_user_model().objects.filter(pk=user_id).first() + if user and user.is_active: + return (user, session) + return None + + +class XSessionTokenAuth(AuthBase): + """ + This security class uses the X-Session-Token that django-allauth + is using for authentication purposes. + """ + + openapi_type: str = "apiKey" + + def __call__(self, request: HttpRequest): + token = self.get_session_token(request) + if token: + user_session = authenticate_by_x_session_token(token) + if user_session: + return user_session[0] + return None + + def get_session_token(self, request: HttpRequest) -> Optional[str]: + """ + Returns the session token for the given request, by looking up the + ``X-Session-Token`` header. Override this if you want to extract the token + from e.g. the ``Authorization`` header. + """ + if request.session.session_key: + return request.session.session_key + + return request.headers.get("X-Session-Token") + + +class BasicAuth(HttpBasicAuth): + def authenticate(self, request: HttpRequest, username: str, password: str) -> Optional[Any]: + user = django_authenticate(email=username, password=password) + if user and user.is_active: + django_login(request, user) + return user + return None + + +class BasicNoPasswordAuth(HttpBasicAuth): + def authenticate(self, request: HttpRequest, username: str, password: str) -> Optional[Any]: + try: + user = get_user_model().objects.get(email=username) + if user and user.is_active: + django_login(request, user) + return user + return None + except Exception as e: + return None + +x_session_token_auth = XSessionTokenAuth() +basic_auth = BasicAuth() +basic_no_password_auth = BasicNoPasswordAuth() jwt_auth = JWTAuth() jwt_cookie_auth = JWTCookieAuth() + + + + +def get_auth_handlers(auths: List[AuthBase] = []) -> List[AuthBase]: + """Auth handler switcher based on settings.DEBUG""" + from django.conf import settings + + if settings.DEBUG: + return auths + + return [jwt_auth, jwt_cookie_auth] diff --git a/src/oxutils/jwt/models.py b/src/oxutils/jwt/models.py index 0e8ef86..7b69a46 100644 --- a/src/oxutils/jwt/models.py +++ b/src/oxutils/jwt/models.py @@ -36,6 +36,14 @@ def __str__(self): def pk(self): return self.id + @property + def is_active(self): + return self.status == 'active' + + @property + def is_deleted(self): + return self.status == 'deleted' + @classmethod def for_token(cls, token): try: @@ -64,10 +72,6 @@ def oxi_id(self): # for compatibility with the User model return self.id - @property - def role(self): - return self.token.get('role', None) - @cached_property def token_created_at(self): return self.token.get('cat', None) diff --git a/src/oxutils/models/base.py b/src/oxutils/models/base.py index dd28071..a1802b9 100644 --- a/src/oxutils/models/base.py +++ b/src/oxutils/models/base.py @@ -1,6 +1,24 @@ import uuid +import time from django.db import models +from django.db import transaction from django.conf import settings +from oxutils.models.fields import MaskedBackupField + + + +try: + from safedelete.models import SafeDeleteModel + from safedelete.models import SOFT_DELETE_CASCADE + from safedelete.signals import post_undelete +except ImportError: + from django.dispatch import Signal + post_undelete = Signal() + SOFT_DELETE_CASCADE = 2 + + class SafeDeleteModel(models.Model): + def __new__(cls, *args, **kwargs): + raise ImportError("django-safedelete is not installed, please install it to use SafeDeleteModel") class UUIDPrimaryKeyMixin(models.Model): @@ -114,3 +132,87 @@ class BaseModelMixin(UUIDPrimaryKeyMixin, TimestampMixin, ActiveMixin): """ class Meta: abstract = True + + +class SafeDeleteModelMixin(SafeDeleteModel): + _safedelete_policy = SOFT_DELETE_CASCADE + mask_fields = [] + + _masked_backup = MaskedBackupField(default=dict, editable=False) + + class Meta: + abstract = True + + @transaction.atomic + def delete(self, *args, **kwargs): + backup = {} + + for field_name in self.mask_fields: + field = self._meta.get_field(field_name) + old_value = getattr(self, field_name) + + if old_value is None: + continue + + backup[field_name] = old_value + masked = self._mask_value(field, old_value) + setattr(self, field_name, masked) + + if backup: + self._masked_backup = backup + self.save(update_fields=[*backup.keys(), "_masked_backup"]) + + return super().delete(*args, **kwargs) + + def _mask_value(self, field: models.Field, old_value): + uid = uuid.uuid4().hex + ts = int(time.time()) + + if isinstance(field, models.EmailField): + return f"{ts}.{uid}.deleted@invalid.local" + + if isinstance(field, models.URLField): + return f"https://deleted.invalid/{ts}/{uid}" + + if isinstance(field, models.SlugField): + return f"deleted-{ts}-{uid}" + + if isinstance(field, models.CharField): + return f"__deleted__{ts}__{uid}" + + if isinstance(field, models.IntegerField): + return None # souvent OK, sinon adapte + + # fallback générique + return f"deleted-{ts}-{uid}" + + @transaction.atomic + def restore_masked_fields(self): + if not self._masked_backup: + return + + for field_name, old_value in self._masked_backup.items(): + field = self._meta.get_field(field_name) + + # vérification collision + qs = self.__class__._default_manager.filter( + **{field_name: old_value} + ).exclude(pk=self.pk) + + if qs.exists(): + raise ValueError( + f"Collision détectée lors de la restauration du champ '{field_name}'" + ) + + setattr(self, field_name, old_value) + + self._masked_backup = {} + self.save() + + +def _restore_masked_fields(sender, instance, **kwargs): + if isinstance(instance, SafeDeleteModelMixin): + instance.restore_masked_fields() + + +post_undelete.connect(_restore_masked_fields) diff --git a/src/oxutils/models/fields.py b/src/oxutils/models/fields.py new file mode 100644 index 0000000..32cb294 --- /dev/null +++ b/src/oxutils/models/fields.py @@ -0,0 +1,79 @@ +"""# settings.py + +FIELD_MASKING_CRYPTO_ENABLED = True # switch global + +# optionnel (recommandé) +FIELD_MASKING_KEY = env("FIELD_MASKING_KEY", default=None) + +""" + +import json +import base64 +import hashlib +from django.utils.functional import cached_property +from django.conf import settings +from django.db import models + + + + + +def get_field_masking_fernet(): + from cryptography.fernet import Fernet + + if not hasattr(settings, "FIELD_MASKING_CRYPTO_ENABLED") or not settings.FIELD_MASKING_CRYPTO_ENABLED: + return None + + if not hasattr(settings, "FIELD_MASKING_KEY") or settings.FIELD_MASKING_KEY: + return Fernet(settings.FIELD_MASKING_KEY) + + # fallback contrôlé + digest = hashlib.sha256( + (settings.SECRET_KEY + ":field-masking:v1").encode() + ).digest() + + key = base64.urlsafe_b64encode(digest) + return Fernet(key) + + + +class MaskedBackupField(models.TextField): + """ + JSONField avec chiffrement optionnel + """ + + @cached_property + def fernet(self): + return get_field_masking_fernet() + + def get_prep_value(self, value): + if value in (None, ""): + return None + + raw = json.dumps(value).encode() + + if not self.fernet: + return raw.decode() + + return self.fernet.encrypt(raw).decode() + + def from_db_value(self, value, expression, connection): + return self.to_python(value) + + def to_python(self, value): + if isinstance(value, dict): + return value + + if value in (None, ""): + return {} + + try: + if self.fernet: + decrypted = self.fernet.decrypt(value.encode()) + return json.loads(decrypted.decode()) + + return json.loads(value) + + except Exception: + # sécurité : ne jamais casser un fetch DB + return {} diff --git a/src/oxutils/oxiliere/apps.py b/src/oxutils/oxiliere/apps.py index ff484f8..721c690 100644 --- a/src/oxutils/oxiliere/apps.py +++ b/src/oxutils/oxiliere/apps.py @@ -6,4 +6,9 @@ class OxiliereConfig(AppConfig): name = 'oxutils.oxiliere' def ready(self): - import oxutils.oxiliere.caches + import oxutils.oxiliere.checks + + try: + import oxutils.oxiliere.caches + except LookupError: + pass diff --git a/src/oxutils/oxiliere/authorization.py b/src/oxutils/oxiliere/authorization.py new file mode 100644 index 0000000..ccd5298 --- /dev/null +++ b/src/oxutils/oxiliere/authorization.py @@ -0,0 +1,45 @@ +from django.conf import settings +from django.db import transaction +from oxutils.permissions.actions import ACTIONS +from oxutils.permissions.models import Grant, Group, UserGroup +from oxutils.oxiliere.utils import get_tenant_user_model +from oxutils.oxiliere.models import BaseTenant + + +@transaction.atomic +def grant_manager_access_to_owners(tenant: BaseTenant): + tenant_user_model = get_tenant_user_model() + tenant_users = tenant_user_model.objects.select_related("user").filter(tenant=tenant, is_owner=True) + + access_scope = getattr(settings, 'ACCESS_MANAGER_SCOPE') + access_group = getattr(settings, 'ACCESS_MANAGER_GROUP') + + if access_group: + try: + group = Group.objects.get(slug=access_group) + except Group.DoesNotExist: + group = None + + bulk_grant = [] + for tenant_user in tenant_users: + if group: + user_group, _ = UserGroup.objects.get_or_create( + user=tenant_user.user, + group=group, + ) + else: + user_group = None + + bulk_grant.append( + Grant( + user=tenant_user.user, + scope=access_scope, + role=None, + actions=ACTIONS, + context={}, + user_group=user_group, + created_by=None, + ) + ) + + Grant.objects.bulk_create(bulk_grant) diff --git a/src/oxutils/oxiliere/checks.py b/src/oxutils/oxiliere/checks.py new file mode 100644 index 0000000..b575018 --- /dev/null +++ b/src/oxutils/oxiliere/checks.py @@ -0,0 +1,31 @@ +""" +Check TENANT_MODEL & TENANT_USER_MODEL +""" +from django.conf import settings +from django.core.checks import Error, register, Tags + + +@register(Tags.models) +def check_tenant_settings(app_configs, **kwargs): + """Check that TENANT_MODEL and TENANT_USER_MODEL are defined in settings.""" + errors = [] + + if not hasattr(settings, 'TENANT_MODEL'): + errors.append( + Error( + 'TENANT_MODEL is not defined in settings', + hint='Add TENANT_MODEL = "app_label.ModelName" to your settings', + id='oxiliere.E001', + ) + ) + + if not hasattr(settings, 'TENANT_USER_MODEL'): + errors.append( + Error( + 'TENANT_USER_MODEL is not defined in settings', + hint='Add TENANT_USER_MODEL = "app_label.ModelName" to your settings', + id='oxiliere.E002', + ) + ) + + return errors \ No newline at end of file diff --git a/src/oxutils/oxiliere/exceptions.py b/src/oxutils/oxiliere/exceptions.py new file mode 100644 index 0000000..0f40fb7 --- /dev/null +++ b/src/oxutils/oxiliere/exceptions.py @@ -0,0 +1,16 @@ +# exceptions + +class InactiveError(Exception): + pass + + +class ExistsError(Exception): + pass + + +class DeleteError(Exception): + pass + + +class SchemaError(Exception): + pass diff --git a/src/oxutils/oxiliere/management/commands/grant_tenant_owners.py b/src/oxutils/oxiliere/management/commands/grant_tenant_owners.py new file mode 100644 index 0000000..f5bd34a --- /dev/null +++ b/src/oxutils/oxiliere/management/commands/grant_tenant_owners.py @@ -0,0 +1,19 @@ +from django.core.management.base import BaseCommand +from django.db import connection +from django_tenants.management.commands import InteractiveTenantOption +from oxutils.oxiliere.authorization import grant_manager_access_to_owners + + +class Command(InteractiveTenantOption, BaseCommand): + help = "Wrapper around django commands for use with an individual tenant" + + def add_arguments(self, parser): + super().add_arguments(parser) + + def handle(self, *args, **options): + tenant = self.get_tenant_from_options_or_interactive(**options) + connection.set_tenant(tenant) + options.pop('schema_name', None) + + grant_manager_access_to_owners(tenant) + self.stdout.write(self.style.SUCCESS('Successfully granted manager access to owners')) diff --git a/src/oxutils/oxiliere/middleware.py b/src/oxutils/oxiliere/middleware.py index f97f99a..9c0ba8f 100644 --- a/src/oxutils/oxiliere/middleware.py +++ b/src/oxutils/oxiliere/middleware.py @@ -83,6 +83,9 @@ def process_request(self, request): from django.http import HttpResponseBadRequest return HttpResponseBadRequest('Missing X-Organization-ID header') + if tenant.is_deleted or not tenant.is_active: + return self.no_tenant_found(request, oxi_id) + request.tenant = tenant set_current_tenant_schema_name(tenant.schema_name) connection.set_tenant(request.tenant) diff --git a/src/oxutils/oxiliere/models.py b/src/oxutils/oxiliere/models.py index 4a19013..5f03f30 100644 --- a/src/oxutils/oxiliere/models.py +++ b/src/oxutils/oxiliere/models.py @@ -1,25 +1,44 @@ +import time +import uuid +import structlog from django.db import models +from django.utils import timezone +from django.contrib.auth.models import AbstractBaseUser from django.conf import settings from django.utils.translation import gettext_lazy as _ from django_tenants.models import TenantMixin from oxutils.models import ( - TimestampMixin, BaseModelMixin, - UUIDPrimaryKeyMixin, ) from oxutils.oxiliere.enums import TenantStatus +from oxutils.oxiliere.exceptions import DeleteError +from oxutils.oxiliere.signals import ( + tenant_user_removed, + tenant_user_added, +) +from oxutils.oxiliere.utils import ( + is_system_tenant, + generate_schema_name, +) +logger = structlog.get_logger(__name__) +class TenantQuerySet(models.QuerySet): + def active(self): + return self.filter(is_deleted=False) -tenant_model = getattr(settings, 'TENANT_MODEL', 'oxiliere.Tenant') -tenant_user_model = getattr(settings, 'TENANT_USER_MODEL', 'oxiliere.TenantUser') + def deleted(self): + return self.filter(is_deleted=True) +class TenantManager(models.Manager): + def get_queryset(self): + return TenantQuerySet(self.model, using=self._db).active() -class BaseTenant(TenantMixin, UUIDPrimaryKeyMixin, TimestampMixin): +class BaseTenant(TenantMixin, BaseModelMixin): name = models.CharField(max_length=100) - oxi_id = models.CharField(unique=True) + oxi_id = models.CharField(unique=True, max_length=25) subscription_plan = models.CharField(max_length=255, null=True, blank=True) subscription_status = models.CharField(max_length=255, null=True, blank=True) subscription_end_date = models.DateTimeField(null=True, blank=True) @@ -29,24 +48,126 @@ class BaseTenant(TenantMixin, UUIDPrimaryKeyMixin, TimestampMixin): default=TenantStatus.ACTIVE ) + # soft delete + is_deleted = models.BooleanField(default=False) + deleted_at = models.DateTimeField(null=True, blank=True) + + suffix = models.CharField(max_length=8, editable=False) + # default true, schema will be automatically created and synced when it is saved auto_create_schema = True + # Schema will be automatically deleted when related tenant is deleted + auto_drop_schema = True + + objects = models.Manager() + active = TenantManager() + + + def __str__(self): + return self.name + + def save(self, *args, **kwargs): + if self._state.adding: + self.suffix = uuid.uuid4().hex[:8] + self.schema_name = generate_schema_name(self.oxi_id, self.suffix) + super().save(*args, **kwargs) + + def delete(self, *args, force_drop: bool = False, **kwargs) -> None: + """Override deleting of Tenant object. + + Args: + force_drop (bool): If True, forces the deletion of the object. Defaults to False. + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + """ + if force_drop: + super().delete(force_drop, *args, **kwargs) + else: + logger.warning("Tenant deletion is not allowed. Use delete_tenant to delete the tenant.") + raise DeleteError(_("Tenant deletion is not allowed. Use delete_tenant to delete the tenant.")) + + def delete_tenant(self) -> None: + """Mark tenant for deletion.""" + + if self.is_deleted: + return + + # Prevent public tenant schema from being deleted + if is_system_tenant(self): + logger.warning("Cannot delete public tenant schema.") + raise ValueError(_("Cannot delete public tenant schema")) + + time_string = str(int(time.time())) + new_id = f"{time_string}-deleted-{self.oxi_id}" + + self.oxi_id = new_id + self.deleted_at = timezone.now() + self.is_deleted = True + self.is_active = False + self.status = TenantStatus.DELETED + + self.save(update_fields=[ + 'oxi_id', 'deleted_at', 'is_deleted', 'is_active', 'status' + ]) + + def restore(self): + if not self.is_deleted: + return + + oxi_id = self.oxi_id.split("-deleted-")[1] + self.oxi_id = oxi_id + self.is_deleted = False + self.deleted_at = None + self.is_active = True + self.status = TenantStatus.ACTIVE + self.save(update_fields=["oxi_id", "is_deleted", "deleted_at", "is_active", "status"]) + + + def add_user(self, user: AbstractBaseUser, is_owner: bool = False, is_admin: bool = False): + """Add user to tenant.""" + + if self.users.filter(user=user).exists(): + logger.warning("User is already a member of this tenant.") + raise ValueError(_("User is already a member of this tenant.")) + + self.users.create(user=user, is_owner=is_owner, is_admin=is_admin) + tenant_user_added.send(sender=self.__class__, tenant=self, user=user) + + + def remove_user(self, user: AbstractBaseUser): + """Remove user from tenant.""" + + if not self.users.filter(user=user).exists(): + logger.warning("User is not a member of this tenant.") + raise ValueError("User is not a member of this tenant.") + + self.users.filter(user=user).delete() + logger.info("User removed from tenant.") + tenant_user_removed.send(sender=self.__class__, tenant=self, user=user) + class Meta: abstract = True verbose_name = _('Tenant') verbose_name_plural = _('Tenants') indexes = [ - models.Index(fields=['oxi_id']) + models.Index(fields=['schema_name']), + models.Index(fields=['oxi_id']), + models.Index(fields=['is_deleted']), + models.Index(fields=['oxi_id', 'is_deleted']) ] class BaseTenantUser(BaseModelMixin): tenant = models.ForeignKey( - tenant_model, on_delete=models.CASCADE + settings.TENANT_MODEL, + on_delete=models.CASCADE, + related_name='users' ) user = models.ForeignKey( - settings.AUTH_USER_MODEL, on_delete=models.CASCADE + settings.AUTH_USER_MODEL, + on_delete=models.CASCADE, + related_name='tenants' ) is_owner = models.BooleanField(default=False) is_admin = models.BooleanField(default=False) @@ -69,13 +190,3 @@ class Meta: indexes = [ models.Index(fields=['tenant', 'user']) ] - - -class Tenant(BaseTenant): - class Meta(BaseTenant.Meta): - abstract = not tenant_model == 'oxiliere.Tenant' - - -class TenantUser(BaseTenantUser): - class Meta(BaseTenantUser.Meta): - abstract = not tenant_user_model == 'oxiliere.TenantUser' diff --git a/src/oxutils/oxiliere/permissions.py b/src/oxutils/oxiliere/permissions.py index 0212c1e..a48211c 100644 --- a/src/oxutils/oxiliere/permissions.py +++ b/src/oxutils/oxiliere/permissions.py @@ -1,9 +1,10 @@ from ninja_extra.permissions import BasePermission -from oxutils.oxiliere.models import TenantUser +from oxutils.oxiliere.utils import get_tenant_user_model from oxutils.constants import OXILIERE_SERVICE_TOKEN from oxutils.jwt.tokens import OxilierServiceToken + class TenantPermission(BasePermission): """ Vérifie que l'utilisateur a accès au tenant actuel. @@ -17,7 +18,7 @@ def has_permission(self, request, **kwargs): return False # Vérifier que l'utilisateur a accès à ce tenant - return TenantUser.objects.filter( + return get_tenant_user_model().objects.filter( tenant__pk=request.tenant.pk, user__pk=request.user.pk ).exists() @@ -34,7 +35,7 @@ def has_permission(self, request, **kwargs): if not hasattr(request, 'tenant'): return False - return TenantUser.objects.filter( + return get_tenant_user_model().objects.filter( tenant__pk=request.tenant.pk, user__pk=request.user.pk, is_owner=True @@ -52,7 +53,7 @@ def has_permission(self, request, **kwargs): if not hasattr(request, 'tenant'): return False - return TenantUser.objects.filter( + return get_tenant_user_model().objects.filter( tenant__pk=request.tenant.pk, user__pk=request.user.pk, is_admin=True @@ -71,7 +72,7 @@ def has_permission(self, request, **kwargs): if not hasattr(request, 'tenant'): return False - return TenantUser.objects.filter( + return get_tenant_user_model().objects.filter( tenant__pk=request.tenant.pk, user__pk=request.user.pk ).exists() diff --git a/src/oxutils/oxiliere/schemas.py b/src/oxutils/oxiliere/schemas.py index 90f1b53..8a6c693 100644 --- a/src/oxutils/oxiliere/schemas.py +++ b/src/oxutils/oxiliere/schemas.py @@ -4,8 +4,10 @@ from django.db import transaction from django.contrib.auth import get_user_model from django_tenants.utils import get_tenant_model -from oxutils.oxiliere.models import TenantUser -from oxutils.oxiliere.utils import oxid_to_schema_name +from oxutils.oxiliere.utils import ( + get_tenant_user_model, +) +from oxutils.oxiliere.authorization import grant_manager_access_to_owners import structlog logger = structlog.get_logger(__name__) @@ -22,6 +24,8 @@ class TenantSchema(Schema): class TenantOwnerSchema(Schema): oxi_id: UUID + first_name: Optional[str] = None + last_name: Optional[str] = None email: str @@ -34,6 +38,7 @@ class CreateTenantSchema(Schema): def create_tenant(self): UserModel = get_user_model() TenantModel = get_tenant_model() + TenantUserModel = get_tenant_user_model() if TenantModel.objects.filter(oxi_id=self.tenant.oxi_id).exists(): logger.info("tenant_exists", oxi_id=self.tenant.oxi_id) @@ -44,24 +49,27 @@ def create_tenant(self): defaults={ 'id': self.owner.oxi_id, 'email': self.owner.email, + 'first_name': self.owner.first_name, + 'last_name': self.owner.last_name } ) tenant = TenantModel.objects.create( name=self.tenant.name, - schema_name=oxid_to_schema_name(self.tenant.oxi_id), + schema_name=self.tenant.oxi_id, oxi_id=self.tenant.oxi_id, subscription_plan=self.tenant.subscription_plan, subscription_status=self.tenant.subscription_status, subscription_end_date=self.tenant.subscription_end_date, ) - TenantUser.objects.create( + TenantUserModel.objects.create( tenant=tenant, user=user, is_owner=True, is_admin=True, ) + grant_manager_access_to_owners(tenant) logger.info("tenant_created", oxi_id=self.tenant.oxi_id) return tenant diff --git a/src/oxutils/oxiliere/signals.py b/src/oxutils/oxiliere/signals.py new file mode 100644 index 0000000..89151b6 --- /dev/null +++ b/src/oxutils/oxiliere/signals.py @@ -0,0 +1,5 @@ +from django.dispatch import Signal + + +tenant_user_removed = Signal() +tenant_user_added = Signal() diff --git a/src/oxutils/oxiliere/utils.py b/src/oxutils/oxiliere/utils.py index d2c6dc0..d336d1d 100644 --- a/src/oxutils/oxiliere/utils.py +++ b/src/oxutils/oxiliere/utils.py @@ -1,4 +1,5 @@ from typing import Any +import uuid from django.apps import apps from django.conf import settings from .constants import OXI_SYSTEM_TENANT @@ -68,6 +69,13 @@ def oxid_to_schema_name(oxid: str) -> str: return schema_name +def generate_schema_name(oxi_id: str, suffix: str = None) -> str: + cleaned = oxid_to_schema_name(oxi_id) + if suffix: + return f"{cleaned}_{suffix}" + return f"{cleaned}_{uuid.uuid4().hex[:8]}" + + def update_tenant_user(oxi_org_id: str, oxi_user_id: str, data: dict): if not data or isinstance(data, dict) == False: return if not oxi_org_id or not oxi_user_id: return diff --git a/src/oxutils/permissions/actions.py b/src/oxutils/permissions/actions.py index 50ceb01..8cc9d3c 100644 --- a/src/oxutils/permissions/actions.py +++ b/src/oxutils/permissions/actions.py @@ -1,5 +1,14 @@ # actions.py +READ = "r" +WRITE = "w" +DELETE = "d" +UPDATE = "u" +APPROVE = "a" + +ACTIONS = [READ, WRITE, DELETE, UPDATE, APPROVE] + + ACTION_HIERARCHY = { "r": set(), # read "w": {"r"}, # write ⇒ read @@ -9,9 +18,6 @@ } -VALID_ACTIONS = list(ACTION_HIERARCHY.keys()) - - def collapse_actions(actions: list[str]) -> set[str]: """ ['d','w','r'] -> {'d'} diff --git a/src/oxutils/permissions/constants.py b/src/oxutils/permissions/constants.py new file mode 100644 index 0000000..e69de29 diff --git a/src/oxutils/permissions/controllers.py b/src/oxutils/permissions/controllers.py index 389ddff..785eccc 100644 --- a/src/oxutils/permissions/controllers.py +++ b/src/oxutils/permissions/controllers.py @@ -1,4 +1,5 @@ from typing import List, Optional +from django.conf import settings from django.http import HttpRequest from ninja_extra import ( api_controller, @@ -32,6 +33,10 @@ class PermissionController(ControllerBase): """ service = PermissionService() + @http_get('/scopes', response=List[str]) + def list_scopes(self): + return getattr(settings, 'ACCESS_SCOPES', []) + @http_get("/roles", response=PaginatedResponseSchema[schemas.RoleSchema]) @paginate(PageNumberPaginationExtra, page_size=20) def list_roles(self): diff --git a/src/oxutils/permissions/migrations/0002_alter_grant_role.py b/src/oxutils/permissions/migrations/0002_alter_grant_role.py new file mode 100644 index 0000000..800c5ce --- /dev/null +++ b/src/oxutils/permissions/migrations/0002_alter_grant_role.py @@ -0,0 +1,19 @@ +# Generated by Django 5.2.9 on 2025-12-29 09:04 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('permissions', '0001_initial'), + ] + + operations = [ + migrations.AlterField( + model_name='grant', + name='role', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='user_grants', to='permissions.role'), + ), + ] diff --git a/src/oxutils/permissions/migrations/0003_alter_grant_options_alter_group_options_and_more.py b/src/oxutils/permissions/migrations/0003_alter_grant_options_alter_group_options_and_more.py new file mode 100644 index 0000000..c4b8fd2 --- /dev/null +++ b/src/oxutils/permissions/migrations/0003_alter_grant_options_alter_group_options_and_more.py @@ -0,0 +1,33 @@ +# Generated by Django 5.2.9 on 2025-12-29 13:28 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('permissions', '0002_alter_grant_role'), + ] + + operations = [ + migrations.AlterModelOptions( + name='grant', + options={'ordering': ['scope']}, + ), + migrations.AlterModelOptions( + name='group', + options={'ordering': ['slug']}, + ), + migrations.AlterModelOptions( + name='role', + options={'ordering': ['slug']}, + ), + migrations.AlterModelOptions( + name='rolegrant', + options={'ordering': ['role__slug', 'group__slug']}, + ), + migrations.AddIndex( + model_name='group', + index=models.Index(fields=['slug'], name='permissions_slug_ed0901_idx'), + ), + ] diff --git a/src/oxutils/permissions/models.py b/src/oxutils/permissions/models.py index cb70d51..e70f14e 100644 --- a/src/oxutils/permissions/models.py +++ b/src/oxutils/permissions/models.py @@ -22,6 +22,7 @@ class Meta: indexes = [ models.Index(fields=["slug"]), ] + ordering = ["slug"] class Group(TimestampMixin): @@ -35,6 +36,12 @@ class Group(TimestampMixin): def __str__(self): return self.slug + class Meta: + indexes = [ + models.Index(fields=["slug"]), + ] + ordering = ["slug"] + class UserGroup(TimestampMixin): """ @@ -93,6 +100,7 @@ class Meta: models.Index(fields=["group"]), models.Index(fields=["role", "group"]), ] + ordering = ["role__slug", "group__slug"] def __str__(self): group_str = f"[{self.group.slug}]" if self.group else "" @@ -119,7 +127,7 @@ class Grant(TimestampMixin): Role, null=True, blank=True, - related_name="grants", + related_name="user_grants", on_delete=models.SET_NULL, ) @@ -157,6 +165,7 @@ class Meta: GinIndex(fields=["actions"]), GinIndex(fields=["context"]), ] + ordering = ["scope"] def __str__(self): return f"{self.user} {self.scope} {self.actions}" diff --git a/src/oxutils/permissions/schemas.py b/src/oxutils/permissions/schemas.py index b074d35..23b2ebe 100644 --- a/src/oxutils/permissions/schemas.py +++ b/src/oxutils/permissions/schemas.py @@ -3,7 +3,7 @@ from ninja import Schema from pydantic import field_validator -from .actions import VALID_ACTIONS +from .actions import ACTIONS def validate_actions_list(actions: list[str]) -> list[str]: @@ -19,11 +19,11 @@ def validate_actions_list(actions: list[str]) -> list[str]: Raises: ValueError: Si des actions invalides sont présentes """ - invalid_actions = [a for a in actions if a not in VALID_ACTIONS] + invalid_actions = [a for a in actions if a not in ACTIONS] if invalid_actions: raise ValueError( f"Actions invalides: {invalid_actions}. " - f"Actions valides: {VALID_ACTIONS}" + f"Actions valides: {ACTIONS}" ) return actions diff --git a/src/oxutils/users/migrations/0002_alter_user_first_name_alter_user_last_name.py b/src/oxutils/users/migrations/0002_alter_user_first_name_alter_user_last_name.py new file mode 100644 index 0000000..499bf31 --- /dev/null +++ b/src/oxutils/users/migrations/0002_alter_user_first_name_alter_user_last_name.py @@ -0,0 +1,23 @@ +# Generated by Django 5.2.9 on 2025-12-29 13:28 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('users', '0001_initial'), + ] + + operations = [ + migrations.AlterField( + model_name='user', + name='first_name', + field=models.CharField(blank=True, max_length=255, null=True), + ), + migrations.AlterField( + model_name='user', + name='last_name', + field=models.CharField(blank=True, max_length=255, null=True), + ), + ] diff --git a/src/oxutils/users/models.py b/src/oxutils/users/models.py index 366551a..290e262 100644 --- a/src/oxutils/users/models.py +++ b/src/oxutils/users/models.py @@ -57,6 +57,8 @@ class User(AbstractUser, SafeDeleteModel, BaseModelMixin): oxi_id = models.UUIDField(unique=True) # id venant de auth.oxi.com email = models.EmailField(unique=True) + first_name = models.CharField(max_length=255, blank=True, null=True) + last_name = models.CharField(max_length=255, blank=True, null=True) is_active = models.BooleanField(default=True) subscription_plan = models.CharField(max_length=255, null=True, blank=True) subscription_status = models.CharField(max_length=255, null=True, blank=True) diff --git a/tests/README.md b/tests/README.md index d9cbcf9..ab14987 100644 --- a/tests/README.md +++ b/tests/README.md @@ -1,6 +1,51 @@ # OxUtils Tests -**126 tests - 100% passing ✅** +Structure organisée par module avec settings isolés. + +## Structure + +``` +tests/ +├── oxiliere/ +│ ├── settings.py # Settings avec tenant models +│ ├── test_oxiliere.py +│ └── test_permissions.py +├── permissions/ +│ ├── settings.py # Settings pour permissions +│ └── test_permissions.py +├── common/ +│ ├── settings.py # Settings communs +│ └── test_*.py # Tous les autres tests +└── conftest.py # Configuration pytest principale +``` + +## Exécution des tests + +Pour exécuter les tests d'un module spécifique, utiliser l'option `--ds` : + +```bash +# Tests permissions +pytest tests/permissions/ --ds=tests.permissions.settings + +# Tests oxiliere +pytest tests/oxiliere/ --ds=tests.oxiliere.settings + +# Tests common +pytest tests/common/ --ds=tests.common.settings +``` + +Ou définir la variable d'environnement : + +```bash +# Tests permissions +DJANGO_SETTINGS_MODULE=tests.permissions.settings pytest tests/permissions/ + +# Tests oxiliere +DJANGO_SETTINGS_MODULE=tests.oxiliere.settings pytest tests/oxiliere/ + +# Tests common +DJANGO_SETTINGS_MODULE=tests.common.settings pytest tests/common/ +``` ## Coverage diff --git a/tests/common/__init__.py b/tests/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_audit_models.py b/tests/common/test_audit_models.py similarity index 100% rename from tests/test_audit_models.py rename to tests/common/test_audit_models.py diff --git a/tests/test_conf.py b/tests/common/test_conf.py similarity index 100% rename from tests/test_conf.py rename to tests/common/test_conf.py diff --git a/tests/test_context_processors.py b/tests/common/test_context_processors.py similarity index 100% rename from tests/test_context_processors.py rename to tests/common/test_context_processors.py diff --git a/tests/test_currency.py b/tests/common/test_currency.py similarity index 100% rename from tests/test_currency.py rename to tests/common/test_currency.py diff --git a/tests/test_enums.py b/tests/common/test_enums.py similarity index 100% rename from tests/test_enums.py rename to tests/common/test_enums.py diff --git a/tests/test_exceptions.py b/tests/common/test_exceptions.py similarity index 100% rename from tests/test_exceptions.py rename to tests/common/test_exceptions.py diff --git a/tests/test_functions.py b/tests/common/test_functions.py similarity index 100% rename from tests/test_functions.py rename to tests/common/test_functions.py diff --git a/tests/test_jwt_tokens.py b/tests/common/test_jwt_tokens.py similarity index 100% rename from tests/test_jwt_tokens.py rename to tests/common/test_jwt_tokens.py diff --git a/tests/test_mixins.py b/tests/common/test_mixins.py similarity index 100% rename from tests/test_mixins.py rename to tests/common/test_mixins.py diff --git a/tests/test_pdf.py b/tests/common/test_pdf.py similarity index 100% rename from tests/test_pdf.py rename to tests/common/test_pdf.py diff --git a/tests/test_s3.py b/tests/common/test_s3.py similarity index 100% rename from tests/test_s3.py rename to tests/common/test_s3.py diff --git a/tests/common/test_safedelete_and_masking.py b/tests/common/test_safedelete_and_masking.py new file mode 100644 index 0000000..4fd6b96 --- /dev/null +++ b/tests/common/test_safedelete_and_masking.py @@ -0,0 +1,416 @@ +""" +Tests for SafeDeleteModelMixin and MaskedBackupField. +""" +import pytest +import time +from unittest.mock import patch, MagicMock +from django.db import models +from django.test import TestCase, override_settings +from django.conf import settings + + +try: + from oxutils.models.base import SafeDeleteModelMixin + from oxutils.models.fields import MaskedBackupField, get_field_masking_fernet + from safedelete.models import SafeDeleteModel + SAFEDELETE_AVAILABLE = True +except ImportError: + SAFEDELETE_AVAILABLE = False + + +@pytest.mark.skipif(not SAFEDELETE_AVAILABLE, reason="django-safedelete not available") +class TestMaskedBackupField: + """Test MaskedBackupField functionality.""" + + def test_field_initialization(self): + """Test MaskedBackupField can be initialized.""" + field = MaskedBackupField() + assert isinstance(field, models.TextField) + + @override_settings(FIELD_MASKING_CRYPTO_ENABLED=False) + def test_field_without_encryption(self): + """Test MaskedBackupField without encryption.""" + field = MaskedBackupField() + + # Test data + test_data = {"email": "test@example.com", "name": "John Doe"} + + # Prepare value for database + prepared = field.get_prep_value(test_data) + assert prepared is not None + assert isinstance(prepared, str) + + # Convert back from database + result = field.to_python(prepared) + assert result == test_data + + @override_settings(FIELD_MASKING_CRYPTO_ENABLED=False) + def test_field_with_none_value(self): + """Test MaskedBackupField with None value.""" + field = MaskedBackupField() + + prepared = field.get_prep_value(None) + assert prepared is None + + result = field.to_python(None) + assert result == {} + + @override_settings(FIELD_MASKING_CRYPTO_ENABLED=False) + def test_field_with_empty_string(self): + """Test MaskedBackupField with empty string.""" + field = MaskedBackupField() + + prepared = field.get_prep_value("") + assert prepared is None + + result = field.to_python("") + assert result == {} + + @override_settings(FIELD_MASKING_CRYPTO_ENABLED=False) + def test_field_with_dict_input(self): + """Test MaskedBackupField to_python with dict input.""" + field = MaskedBackupField() + + test_data = {"key": "value"} + result = field.to_python(test_data) + assert result == test_data + + @override_settings(FIELD_MASKING_CRYPTO_ENABLED=False) + def test_field_with_invalid_json(self): + """Test MaskedBackupField handles invalid JSON gracefully.""" + field = MaskedBackupField() + + # Invalid JSON should return empty dict + result = field.to_python("invalid json {") + assert result == {} + + @override_settings(FIELD_MASKING_CRYPTO_ENABLED=True, SECRET_KEY="test-secret-key") + def test_field_with_encryption_fallback(self): + """Test MaskedBackupField with encryption using SECRET_KEY fallback.""" + field = MaskedBackupField() + + test_data = {"sensitive": "data"} + + # Prepare value (should be encrypted) + prepared = field.get_prep_value(test_data) + assert prepared is not None + assert isinstance(prepared, str) + # Encrypted data should be different from plain JSON + import json + assert prepared != json.dumps(test_data) + + # Convert back (should decrypt) + result = field.to_python(prepared) + assert result == test_data + + +@pytest.mark.skipif(not SAFEDELETE_AVAILABLE, reason="django-safedelete not available") +class TestGetFieldMaskingFernet: + """Test get_field_masking_fernet function.""" + + @override_settings(FIELD_MASKING_CRYPTO_ENABLED=False) + def test_disabled_encryption(self): + """Test when encryption is disabled.""" + fernet = get_field_masking_fernet() + assert fernet is None + + @override_settings(FIELD_MASKING_CRYPTO_ENABLED=True, FIELD_MASKING_KEY="LCPN2bFN2NHA6XCZscpv8JctYJQ2FTfuVKIunFUchnE=") + def test_with_custom_key(self): + """Test with custom FIELD_MASKING_KEY.""" + from cryptography.fernet import Fernet + + fernet = get_field_masking_fernet() + assert fernet is not None + assert isinstance(fernet, Fernet) + + def test_with_secret_key_fallback(self): + """Test fallback to SECRET_KEY.""" + # Skip if FIELD_MASKING_KEY is already defined in settings + if hasattr(settings, 'FIELD_MASKING_KEY'): + pytest.skip("FIELD_MASKING_KEY is defined in settings, cannot test fallback") + + +@pytest.mark.skipif(not SAFEDELETE_AVAILABLE, reason="django-safedelete not available") +class TestSafeDeleteModelMixin(TestCase): + """Test SafeDeleteModelMixin functionality.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + + # Create a test model + class TestModel(SafeDeleteModelMixin): + email = models.EmailField(unique=True) + username = models.CharField(max_length=100, unique=True) + slug = models.SlugField(unique=True) + url = models.URLField() + bio = models.CharField(max_length=255) + age = models.IntegerField(null=True) + + mask_fields = ['email', 'username', 'slug', 'url', 'bio', 'age'] + + class Meta: + app_label = 'test' + + cls.TestModel = TestModel + + def test_mixin_has_masked_backup_field(self): + """Test SafeDeleteModelMixin has _masked_backup field.""" + assert hasattr(self.TestModel, '_masked_backup') + + def test_mixin_has_mask_fields_attribute(self): + """Test SafeDeleteModelMixin has mask_fields attribute.""" + assert hasattr(self.TestModel, 'mask_fields') + assert isinstance(self.TestModel.mask_fields, list) + + def test_mask_value_email_field(self): + """Test _mask_value for EmailField.""" + instance = self.TestModel() + field = self.TestModel._meta.get_field('email') + + masked = instance._mask_value(field, "test@example.com") + + assert masked.endswith(".deleted@invalid.local") + assert "@" in masked + + def test_mask_value_url_field(self): + """Test _mask_value for URLField.""" + instance = self.TestModel() + field = self.TestModel._meta.get_field('url') + + masked = instance._mask_value(field, "https://example.com") + + assert masked.startswith("https://deleted.invalid/") + + def test_mask_value_slug_field(self): + """Test _mask_value for SlugField.""" + instance = self.TestModel() + field = self.TestModel._meta.get_field('slug') + + masked = instance._mask_value(field, "my-slug") + + assert masked.startswith("deleted-") + assert len(masked) > len("deleted-") + + def test_mask_value_char_field(self): + """Test _mask_value for CharField.""" + instance = self.TestModel() + field = self.TestModel._meta.get_field('bio') + + masked = instance._mask_value(field, "My bio") + + assert masked.startswith("__deleted__") + + def test_mask_value_integer_field(self): + """Test _mask_value for IntegerField.""" + instance = self.TestModel() + field = self.TestModel._meta.get_field('age') + + masked = instance._mask_value(field, 25) + + assert masked is None + + def test_mask_value_generates_unique_values(self): + """Test _mask_value generates unique values each time.""" + instance = self.TestModel() + field = self.TestModel._meta.get_field('email') + + masked1 = instance._mask_value(field, "test@example.com") + time.sleep(0.01) # Small delay to ensure different timestamp + masked2 = instance._mask_value(field, "test@example.com") + + # Should be different due to UUID + assert masked1 != masked2 + + @patch.object(SafeDeleteModelMixin, 'save') + @patch.object(SafeDeleteModel, 'delete') + def test_delete_masks_fields(self, mock_super_delete, mock_save): + """Test delete method masks specified fields.""" + instance = self.TestModel( + email="test@example.com", + username="testuser", + slug="test-slug", + url="https://example.com", + bio="Test bio", + age=25 + ) + instance.pk = 1 + + # Mock the _meta.get_field to return actual fields + original_email = instance.email + + instance.delete() + + # Check that fields were masked + assert instance.email != original_email + assert instance.email.endswith(".deleted@invalid.local") + + # Check that backup was created + assert instance._masked_backup != {} + assert 'email' in instance._masked_backup + assert instance._masked_backup['email'] == original_email + + # Check that save was called + mock_save.assert_called_once() + + # Check that super().delete() was called + mock_super_delete.assert_called_once() + + @patch.object(SafeDeleteModelMixin, 'save') + def test_restore_masked_fields(self, mock_save): + """Test restore_masked_fields method.""" + instance = self.TestModel( + email="masked@invalid.local", + username="masked-user" + ) + instance.pk = 1 + instance._masked_backup = { + 'email': 'original@example.com', + 'username': 'originaluser' + } + + # Mock the queryset to return no conflicts + with patch.object(self.TestModel._default_manager, 'filter') as mock_filter: + mock_qs = MagicMock() + mock_qs.exclude.return_value.exists.return_value = False + mock_filter.return_value = mock_qs + + instance.restore_masked_fields() + + # Check that fields were restored + assert instance.email == 'original@example.com' + assert instance.username == 'originaluser' + + # Check that backup was cleared + assert instance._masked_backup == {} + + # Check that save was called + mock_save.assert_called_once() + + def test_restore_masked_fields_with_collision(self): + """Test restore_masked_fields raises error on collision.""" + instance = self.TestModel( + email="masked@invalid.local" + ) + instance.pk = 1 + instance._masked_backup = { + 'email': 'original@example.com' + } + + # Mock the queryset to return a conflict + with patch.object(self.TestModel._default_manager, 'filter') as mock_filter: + mock_qs = MagicMock() + mock_qs.exclude.return_value.exists.return_value = True + mock_filter.return_value = mock_qs + + with pytest.raises(ValueError) as exc_info: + instance.restore_masked_fields() + + assert "Collision détectée" in str(exc_info.value) + + def test_restore_masked_fields_with_empty_backup(self): + """Test restore_masked_fields does nothing with empty backup.""" + instance = self.TestModel(email="test@example.com") + instance._masked_backup = {} + + # Should not raise any error + instance.restore_masked_fields() + + # Email should remain unchanged + assert instance.email == "test@example.com" + + @patch.object(SafeDeleteModelMixin, 'save') + @patch.object(SafeDeleteModel, 'delete') + def test_delete_skips_none_values(self, mock_super_delete, mock_save): + """Test delete method skips None values.""" + instance = self.TestModel( + email="test@example.com", + username="testuser", + slug="test-slug", + url="https://example.com", + bio="Test bio", + age=None # None value + ) + instance.pk = 1 + + instance.delete() + + # age should not be in backup since it was None + assert 'age' not in instance._masked_backup + + # Other fields should be in backup + assert 'email' in instance._masked_backup + + +@pytest.mark.skipif(not SAFEDELETE_AVAILABLE, reason="django-safedelete not available") +class TestSafeDeleteSignalIntegration: + """Test signal integration for SafeDeleteModelMixin.""" + + def test_post_undelete_signal_connected(self): + """Test that post_undelete signal is connected.""" + from oxutils.models.base import _restore_masked_fields, post_undelete + + # Check that the signal handler is connected + receivers = post_undelete.receivers + assert len(receivers) > 0 + + @patch.object(SafeDeleteModelMixin, 'restore_masked_fields') + def test_signal_calls_restore_on_undelete(self, mock_restore): + """Test that undelete signal calls restore_masked_fields.""" + from oxutils.models.base import _restore_masked_fields + + # Create a mock instance + class TestModel(SafeDeleteModelMixin): + class Meta: + app_label = 'test' + + instance = TestModel() + + # Call the signal handler directly + _restore_masked_fields(sender=TestModel, instance=instance) + + # Check that restore_masked_fields was called + mock_restore.assert_called_once() + + +@pytest.mark.skipif(not SAFEDELETE_AVAILABLE, reason="django-safedelete not available") +class TestSafeDeleteModelMixinEdgeCases: + """Test edge cases for SafeDeleteModelMixin.""" + + @pytest.mark.django_db + def test_empty_mask_fields(self): + """Test SafeDeleteModelMixin with empty mask_fields.""" + class TestModel(SafeDeleteModelMixin): + email = models.EmailField() + mask_fields = [] # Empty list + + class Meta: + app_label = 'test' + + instance = TestModel(email="test@example.com") + instance.pk = 1 + + with patch.object(SafeDeleteModel, 'delete'): + with patch.object(SafeDeleteModelMixin, 'save'): + instance.delete() + + # No fields should be masked + assert instance._masked_backup == {} + + def test_nonexistent_field_in_mask_fields(self): + """Test SafeDeleteModelMixin with nonexistent field in mask_fields.""" + class TestModel(SafeDeleteModelMixin): + email = models.EmailField() + mask_fields = ['nonexistent_field'] + + class Meta: + app_label = 'test' + + instance = TestModel(email="test@example.com") + instance.pk = 1 + + # Should raise an error when trying to get nonexistent field + with pytest.raises(Exception): + with patch.object(SafeDeleteModel, 'delete'): + with patch.object(SafeDeleteModelMixin, 'save'): + instance.delete() diff --git a/tests/test_settings.py b/tests/common/test_settings.py similarity index 100% rename from tests/test_settings.py rename to tests/common/test_settings.py diff --git a/tests/conftest.py b/tests/conftest.py index b384095..1543f8a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,46 +3,13 @@ """ import os import pytest -from django.conf import settings -from django.test import RequestFactory from unittest.mock import Mock, MagicMock -# Configure Django settings for tests -def pytest_configure(): - """Configure Django settings for pytest.""" - if not settings.configured: - settings.configure( - DEBUG=True, - DATABASES={ - 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': ':memory:', - } - }, - INSTALLED_APPS=[ - 'django.contrib.contenttypes', - 'django.contrib.auth', - 'django_structlog', - 'auditlog', - 'django_celery_results', - ], - MIDDLEWARE=[ - 'auditlog.middleware.AuditlogMiddleware', - 'django_structlog.middlewares.RequestMiddleware', - ], - SECRET_KEY='test-secret-key', - USE_TZ=True, - ROOT_URLCONF='', - ) - - import django - django.setup() - - @pytest.fixture def request_factory(): """Provide Django RequestFactory.""" + from django.test import RequestFactory return RequestFactory() diff --git a/tests/oxiliere/__init__.py b/tests/oxiliere/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_oxiliere.py b/tests/oxiliere/test_oxiliere.py similarity index 97% rename from tests/test_oxiliere.py rename to tests/oxiliere/test_oxiliere.py index 819e4cf..01d892f 100644 --- a/tests/test_oxiliere.py +++ b/tests/oxiliere/test_oxiliere.py @@ -149,6 +149,8 @@ def test_successful_tenant_switch(self, mock_connection): mock_tenant = Mock() mock_tenant.oxi_id = 'acme-corp' mock_tenant.schema_name = 'tenant_acmecorp' + mock_tenant.is_deleted = False + mock_tenant.is_active = True mock_connection.set_schema_to_public = Mock() mock_connection.tenant_model = Mock() @@ -158,7 +160,8 @@ def test_successful_tenant_switch(self, mock_connection): with patch.object(self.middleware, 'get_tenant', return_value=mock_tenant): with patch.object(self.middleware, 'setup_url_routing'): - self.middleware.process_request(request) + with patch('oxutils.oxiliere.middleware.set_current_tenant_schema_name'): + self.middleware.process_request(request) assert request.tenant == mock_tenant mock_connection.set_tenant.assert_called_once_with(mock_tenant) diff --git a/tests/test_oxiliere_permissions.py b/tests/oxiliere/test_permissions.py similarity index 80% rename from tests/test_oxiliere_permissions.py rename to tests/oxiliere/test_permissions.py index fd6ef9a..9099492 100644 --- a/tests/test_oxiliere_permissions.py +++ b/tests/oxiliere/test_permissions.py @@ -33,13 +33,15 @@ def test_permission_authenticated_user_with_access(self): mock_request.user = mock_user mock_request.tenant = mock_tenant - with patch('oxutils.oxiliere.permissions.TenantUser') as mock_tenant_user: - mock_tenant_user.objects.filter.return_value.exists.return_value = True + with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: + mock_tenant_user_model = Mock() + mock_tenant_user_model.objects.filter.return_value.exists.return_value = True + mock_get_model.return_value = mock_tenant_user_model result = permission.has_permission(mock_request) assert result is True - mock_tenant_user.objects.filter.assert_called_once() + mock_tenant_user_model.objects.filter.assert_called_once() def test_permission_authenticated_user_without_access(self): """Test permission denied for authenticated user without tenant access.""" @@ -56,8 +58,10 @@ def test_permission_authenticated_user_without_access(self): mock_request.user = mock_user mock_request.tenant = mock_tenant - with patch('oxutils.oxiliere.permissions.TenantUser') as mock_tenant_user: - mock_tenant_user.objects.filter.return_value.exists.return_value = False + with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: + mock_tenant_user_model = Mock() + mock_tenant_user_model.objects.filter.return_value.exists.return_value = False + mock_get_model.return_value = mock_tenant_user_model result = permission.has_permission(mock_request) @@ -118,14 +122,16 @@ def test_permission_owner_user(self): mock_request.user = mock_user mock_request.tenant = mock_tenant - with patch('oxutils.oxiliere.permissions.TenantUser') as mock_tenant_user: - mock_tenant_user.objects.filter.return_value.exists.return_value = True + with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: + mock_tenant_user_model = Mock() + mock_tenant_user_model.objects.filter.return_value.exists.return_value = True + mock_get_model.return_value = mock_tenant_user_model result = permission.has_permission(mock_request) assert result is True # Verify is_owner=True was in filter - call_kwargs = mock_tenant_user.objects.filter.call_args[1] + call_kwargs = mock_tenant_user_model.objects.filter.call_args[1] assert call_kwargs['is_owner'] is True def test_permission_non_owner_user(self): @@ -143,8 +149,10 @@ def test_permission_non_owner_user(self): mock_request.user = mock_user mock_request.tenant = mock_tenant - with patch('oxutils.oxiliere.permissions.TenantUser') as mock_tenant_user: - mock_tenant_user.objects.filter.return_value.exists.return_value = False + with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: + mock_tenant_user_model = Mock() + mock_tenant_user_model.objects.filter.return_value.exists.return_value = False + mock_get_model.return_value = mock_tenant_user_model result = permission.has_permission(mock_request) @@ -180,14 +188,16 @@ def test_permission_admin_user(self): mock_request.user = mock_user mock_request.tenant = mock_tenant - with patch('oxutils.oxiliere.permissions.TenantUser') as mock_tenant_user: - mock_tenant_user.objects.filter.return_value.exists.return_value = True + with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: + mock_tenant_user_model = Mock() + mock_tenant_user_model.objects.filter.return_value.exists.return_value = True + mock_get_model.return_value = mock_tenant_user_model result = permission.has_permission(mock_request) assert result is True # Verify is_admin=True was in filter - call_kwargs = mock_tenant_user.objects.filter.call_args[1] + call_kwargs = mock_tenant_user_model.objects.filter.call_args[1] assert call_kwargs['is_admin'] is True def test_permission_non_admin_user(self): @@ -205,8 +215,10 @@ def test_permission_non_admin_user(self): mock_request.user = mock_user mock_request.tenant = mock_tenant - with patch('oxutils.oxiliere.permissions.TenantUser') as mock_tenant_user: - mock_tenant_user.objects.filter.return_value.exists.return_value = False + with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: + mock_tenant_user_model = Mock() + mock_tenant_user_model.objects.filter.return_value.exists.return_value = False + mock_get_model.return_value = mock_tenant_user_model result = permission.has_permission(mock_request) @@ -231,8 +243,10 @@ def test_permission_tenant_member(self): mock_request.user = mock_user mock_request.tenant = mock_tenant - with patch('oxutils.oxiliere.permissions.TenantUser') as mock_tenant_user: - mock_tenant_user.objects.filter.return_value.exists.return_value = True + with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: + mock_tenant_user_model = Mock() + mock_tenant_user_model.objects.filter.return_value.exists.return_value = True + mock_get_model.return_value = mock_tenant_user_model result = permission.has_permission(mock_request) @@ -253,8 +267,10 @@ def test_permission_non_member(self): mock_request.user = mock_user mock_request.tenant = mock_tenant - with patch('oxutils.oxiliere.permissions.TenantUser') as mock_tenant_user: - mock_tenant_user.objects.filter.return_value.exists.return_value = False + with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: + mock_tenant_user_model = Mock() + mock_tenant_user_model.objects.filter.return_value.exists.return_value = False + mock_get_model.return_value = mock_tenant_user_model result = permission.has_permission(mock_request) @@ -381,7 +397,9 @@ def test_multiple_permissions_hierarchy(self): tenant_perm = TenantPermission() owner_perm = TenantOwnerPermission() - with patch('oxutils.oxiliere.permissions.TenantUser') as mock_tenant_user: + with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: + mock_tenant_user_model = Mock() + # User is member but not owner def filter_side_effect(**kwargs): mock_qs = Mock() @@ -391,7 +409,8 @@ def filter_side_effect(**kwargs): mock_qs.exists.return_value = True return mock_qs - mock_tenant_user.objects.filter.side_effect = filter_side_effect + mock_tenant_user_model.objects.filter.side_effect = filter_side_effect + mock_get_model.return_value = mock_tenant_user_model # Should pass tenant permission assert tenant_perm.has_permission(mock_request) is True @@ -432,8 +451,10 @@ def test_kwargs_passed_to_permission(self): mock_request.user = mock_user mock_request.tenant = mock_tenant - with patch('oxutils.oxiliere.permissions.TenantUser') as mock_tenant_user: - mock_tenant_user.objects.filter.return_value.exists.return_value = True + with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: + mock_tenant_user_model = Mock() + mock_tenant_user_model.objects.filter.return_value.exists.return_value = True + mock_get_model.return_value = mock_tenant_user_model # Should work with kwargs result = permission.has_permission(mock_request, view=Mock(), extra_param='test') diff --git a/tests/permissions/__init__.py b/tests/permissions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_permissions.py b/tests/permissions/test_permissions.py similarity index 100% rename from tests/test_permissions.py rename to tests/permissions/test_permissions.py diff --git a/tests/settings.py b/tests/settings.py index a2c9d09..6e7a565 100644 --- a/tests/settings.py +++ b/tests/settings.py @@ -3,40 +3,23 @@ """ import os -# Build paths BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) -# Set required environment variables for S3 storage (used by audit models) -os.environ.setdefault('OXI_SERVICE_NAME', 'test-service') -os.environ.setdefault('OXI_USE_LOG_S3', 'True') -os.environ.setdefault('OXI_USE_PRIVATE_S3', 'True') -os.environ.setdefault('OXI_USE_PRIVATE_S3_AS_LOG', 'True') -os.environ.setdefault('OXI_PRIVATE_S3_STORAGE_BUCKET_NAME', 'test-bucket') -os.environ.setdefault('OXI_PRIVATE_S3_ACCESS_KEY_ID', 'test-key') -os.environ.setdefault('OXI_PRIVATE_S3_SECRET_ACCESS_KEY', 'test-secret') -os.environ.setdefault('OXI_PRIVATE_S3_CUSTOM_DOMAIN', 'test.example.com') - -# SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = 'test-secret-key-not-for-production' - -# SECURITY WARNING: don't run with debug turned on in production! DEBUG = True - ALLOWED_HOSTS = ['*'] -# Application definition INSTALLED_APPS = [ 'django.contrib.contenttypes', 'django.contrib.auth', 'django_structlog', 'auditlog', 'django_celery_results', + 'cacheops', 'oxutils.audit', 'oxutils.currency', 'oxutils.users', - 'cacheops', 'oxutils.permissions', - 'oxutils.oxiliere', ] MIDDLEWARE = [ @@ -46,7 +29,19 @@ 'django.middleware.common.CommonMiddleware', ] -ROOT_URLCONF = '' +DATABASES = { + 'default': { + 'ENGINE': 'django.db.backends.postgresql', + 'NAME': 'oxutils_test', + 'USER': os.environ.get('POSTGRES_USER', 'postgres'), + 'PASSWORD': os.environ.get('POSTGRES_PASSWORD', 'postgres'), + 'HOST': os.environ.get('POSTGRES_HOST', 'localhost'), + 'PORT': os.environ.get('POSTGRES_PORT', '5432'), + 'TEST': { + 'NAME': 'oxutils_test_db', + } + } +} TEMPLATES = [ { @@ -63,53 +58,38 @@ }, ] -# Database -DATABASES = { - 'default': { - 'ENGINE': 'django.db.backends.postgresql', - 'NAME': 'oxutils_test', - 'USER': os.environ.get('POSTGRES_USER', 'postgres'), - 'PASSWORD': os.environ.get('POSTGRES_PASSWORD', 'postgres'), - 'HOST': os.environ.get('POSTGRES_HOST', 'localhost'), - 'PORT': os.environ.get('POSTGRES_PORT', '5432'), - 'TEST': { - 'NAME': 'oxutils_test_db', - } - } -} - -# Password validation AUTH_PASSWORD_VALIDATORS = [] -# Internationalization LANGUAGE_CODE = 'en-us' TIME_ZONE = 'UTC' USE_I18N = True USE_TZ = True -# Static files (CSS, JavaScript, Images) STATIC_URL = '/static/' - -# Default primary key field type DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField' - -# OxUtils settings for tests -OXI_SERVICE_NAME = 'test-service' -OXI_LOG_ACCESS = False -OXI_RETENTION_DELAY = 7 - -# Django-tenants settings -TENANT_MODEL = 'oxiliere.Tenant' -TENANT_USER_MODEL = 'oxiliere.TenantUser' - +ROOT_URLCONF = '' CACHEOPS = { "*.*": {'ops': {}, 'timeout': 0} } +os.environ.setdefault('OXI_SERVICE_NAME', 'test-service') +os.environ.setdefault('OXI_USE_LOG_S3', 'True') +os.environ.setdefault('OXI_USE_PRIVATE_S3', 'True') +os.environ.setdefault('OXI_USE_PRIVATE_S3_AS_LOG', 'True') +os.environ.setdefault('OXI_PRIVATE_S3_STORAGE_BUCKET_NAME', 'test-bucket') +os.environ.setdefault('OXI_PRIVATE_S3_ACCESS_KEY_ID', 'test-key') +os.environ.setdefault('OXI_PRIVATE_S3_SECRET_ACCESS_KEY', 'test-secret') +os.environ.setdefault('OXI_PRIVATE_S3_CUSTOM_DOMAIN', 'test.example.com') + +OXI_SERVICE_NAME = 'test-service' +OXI_LOG_ACCESS = False +OXI_RETENTION_DELAY = 7 + # Permissions settings ACCESS_MANAGER_SCOPE = 'access' ACCESS_MANAGER_GROUP = 'manager' ACCESS_MANAGER_CONTEXT = {} ACCESS_SCOPES = ['access', 'articles', 'users', 'comments'] CACHE_CHECK_PERMISSION = False +FIELD_MASKING_KEY = 'LCPN2bFN2NHA6XCZscpv8JctYJQ2FTfuVKIunFUchnE=' diff --git a/tests/testapp/__init__.py b/tests/testapp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/testapp/models.py b/tests/testapp/models.py new file mode 100644 index 0000000..2731e03 --- /dev/null +++ b/tests/testapp/models.py @@ -0,0 +1,20 @@ +""" +Test models for oxutils tests. +""" +from oxutils.oxiliere.models import BaseTenant, BaseTenantUser + + +class Tenant(BaseTenant): + """Test Tenant model.""" + + class Meta(BaseTenant.Meta): + abstract = False + app_label = 'testapp' + + +class TenantUser(BaseTenantUser): + """Test TenantUser model.""" + + class Meta(BaseTenantUser.Meta): + abstract = False + app_label = 'testapp' diff --git a/uv.lock b/uv.lock index ee5c7a2..5a9aa68 100644 --- a/uv.lock +++ b/uv.lock @@ -1144,7 +1144,7 @@ wheels = [ [[package]] name = "oxutils" -version = "0.1.9" +version = "0.1.10" source = { editable = "." } dependencies = [ { name = "boto3" },