124 lines
4.0 KiB
Python
124 lines
4.0 KiB
Python
from django.db import transaction
|
|
|
|
from rest_framework.decorators import action
|
|
from rest_framework import exceptions
|
|
from rest_framework import mixins
|
|
from rest_framework import permissions
|
|
from rest_framework import response
|
|
from rest_framework import viewsets
|
|
|
|
from . import filters
|
|
from . import models
|
|
from . import mutations
|
|
from . import serializers
|
|
from . import signals
|
|
from . import tasks
|
|
from . import utils
|
|
|
|
|
|
class SkipFilterForGetObject:
    """Mixin that bypasses ``filter_queryset`` while ``get_object`` runs.

    A flag named ``_skip_filters`` is stored on ``self.request`` for the
    duration of ``get_object``; while it is truthy, ``filter_queryset``
    returns the queryset untouched instead of delegating to the parent class.

    The mixin must be placed before a class that provides ``get_object`` and
    ``filter_queryset`` in the MRO, and ``self.request`` must be set.
    """

    def get_object(self, *args, **kwargs):
        """Return the parent's ``get_object`` result with filtering disabled.

        The flag is restored to its previous value once the lookup finishes
        (successfully or not), so that later ``filter_queryset`` calls made
        during the same request are filtered normally again.
        """
        request = self.request
        previous = getattr(request, "_skip_filters", False)
        request._skip_filters = True
        try:
            return super().get_object(*args, **kwargs)
        finally:
            # Do not leak the bypass outside of the object lookup.
            request._skip_filters = previous

    def filter_queryset(self, queryset):
        """Return ``queryset`` unfiltered while the skip flag is set.

        Otherwise defer to the parent class' ``filter_queryset``.
        """
        if getattr(self.request, "_skip_filters", False):
            return queryset
        return super().filter_queryset(queryset)
|
|
|
|
|
|
class MutationViewSet(
    mixins.ListModelMixin,
    mixins.RetrieveModelMixin,
    mixins.DestroyModelMixin,
    viewsets.GenericViewSet,
):
    """List, retrieve and delete mutations, plus ``approve``/``reject`` actions.

    Objects are looked up by their ``uuid``. Only authenticated users may
    access the endpoints; deleting, approving and rejecting are additionally
    guarded by the checks documented on each method.
    """

    lookup_field = "uuid"
    # Mutations without a target are excluded; results are ordered from the
    # most recently created to the oldest.
    queryset = (
        models.Mutation.objects.all()
        .exclude(target_id=None)
        .order_by("-creation_date")
        .select_related("created_by", "approved_by")
        .prefetch_related("target")
    )
    serializer_class = serializers.APIMutationSerializer
    permission_classes = [permissions.IsAuthenticated]
    ordering_fields = ("creation_date",)
    filterset_class = filters.MutationFilter

    def _can_approve(self, instance, actor):
        """Return whether ``actor`` holds the "approve" permission on ``instance``.

        The decision is delegated to ``mutations.registry.has_perm`` using the
        mutation type and its target.
        """
        return mutations.registry.has_perm(
            perm="approve", type=instance.type, obj=instance.target, actor=actor
        )

    def perform_destroy(self, instance):
        """Delete ``instance`` if it is not applied and the actor is allowed to.

        Raises:
            PermissionDenied: if the mutation is already applied, or if the
                requesting actor is neither its creator nor allowed to
                approve it.
        """
        if instance.is_applied:
            raise exceptions.PermissionDenied("You cannot delete an applied mutation")

        actor = self.request.user.actor
        is_owner = actor == instance.created_by

        # Short-circuit: the registry is only consulted for non-owners.
        if not (is_owner or self._can_approve(instance, actor)):
            raise exceptions.PermissionDenied()

        return super().perform_destroy(instance)

    def _review(self, approved):
        """Record an approval decision on the mutation targeted by the request.

        Shared implementation of the ``approve`` and ``reject`` actions; it is
        expected to be called from within their ``transaction.atomic`` block.

        Args:
            approved: ``True`` to approve the mutation, ``False`` to reject it.

        Returns:
            A 403 response with an ``error`` key if the mutation is already
            applied, otherwise an empty 200 response.

        Raises:
            PermissionDenied: if the requesting actor lacks the "approve"
                permission for this mutation.
        """
        instance = self.get_object()
        if instance.is_applied:
            return response.Response(
                {"error": "This mutation was already applied"}, status=403
            )

        actor = self.request.user.actor
        if not self._can_approve(instance, actor):
            raise exceptions.PermissionDenied()

        previous_is_approved = instance.is_approved
        # The reviewing actor is stored in ``approved_by`` for both outcomes.
        instance.approved_by = actor
        instance.is_approved = approved
        instance.save(update_fields=["approved_by", "is_approved"])

        # NOTE(review): utils.on_commit presumably defers these calls until
        # the surrounding transaction commits -- confirm against utils.
        if approved:
            utils.on_commit(tasks.apply_mutation.delay, mutation_id=instance.id)
        utils.on_commit(
            signals.mutation_updated.send,
            sender=None,
            mutation=instance,
            old_is_approved=previous_is_approved,
            new_is_approved=instance.is_approved,
        )
        return response.Response({}, status=200)

    @action(detail=True, methods=["post"])
    @transaction.atomic
    def approve(self, request, *args, **kwargs):
        """Approve the mutation and schedule ``tasks.apply_mutation`` for it.

        See ``_review`` for the returned responses and raised errors.
        """
        return self._review(approved=True)

    @action(detail=True, methods=["post"])
    @transaction.atomic
    def reject(self, request, *args, **kwargs):
        """Reject the mutation; no apply task is scheduled.

        See ``_review`` for the returned responses and raised errors.
        """
        return self._review(approved=False)
|