Fix #737: delivering of local activities causing unintended side effects, such as rollbacking changes
This commit is contained in:
parent
ff03909ca3
commit
0afa4f2e27
|
@ -183,70 +183,77 @@ class Router:
|
||||||
|
|
||||||
|
|
||||||
class InboxRouter(Router):
|
class InboxRouter(Router):
|
||||||
|
def get_matching_handlers(self, payload):
|
||||||
|
return [
|
||||||
|
handler for route, handler in self.routes if match_route(route, payload)
|
||||||
|
]
|
||||||
|
|
||||||
@transaction.atomic
|
@transaction.atomic
|
||||||
def dispatch(self, payload, context):
|
def dispatch(self, payload, context, call_handlers=True):
|
||||||
"""
|
"""
|
||||||
Receives an Activity payload and some context and trigger our
|
Receives an Activity payload and some context and trigger our
|
||||||
business logic
|
business logic.
|
||||||
|
|
||||||
|
call_handlers should be False when are delivering a local activity, because
|
||||||
|
we want only want to bind activities to their recipients, not reapply the changes.
|
||||||
"""
|
"""
|
||||||
from . import api_serializers
|
from . import api_serializers
|
||||||
from . import models
|
from . import models
|
||||||
|
|
||||||
for route, handler in self.routes:
|
handlers = self.get_matching_handlers(payload)
|
||||||
if match_route(route, payload):
|
for handler in handlers:
|
||||||
|
if call_handlers:
|
||||||
r = handler(payload, context=context)
|
r = handler(payload, context=context)
|
||||||
activity_obj = context.get("activity")
|
else:
|
||||||
if activity_obj and r:
|
r = None
|
||||||
# handler returned additional data we can use
|
activity_obj = context.get("activity")
|
||||||
# to update the activity target
|
if activity_obj and r:
|
||||||
for key, value in r.items():
|
# handler returned additional data we can use
|
||||||
setattr(activity_obj, key, value)
|
# to update the activity target
|
||||||
|
for key, value in r.items():
|
||||||
|
setattr(activity_obj, key, value)
|
||||||
|
|
||||||
update_fields = []
|
update_fields = []
|
||||||
for k in r.keys():
|
for k in r.keys():
|
||||||
if k in ["object", "target", "related_object"]:
|
if k in ["object", "target", "related_object"]:
|
||||||
update_fields += [
|
update_fields += [
|
||||||
"{}_id".format(k),
|
"{}_id".format(k),
|
||||||
"{}_content_type".format(k),
|
"{}_content_type".format(k),
|
||||||
]
|
]
|
||||||
else:
|
else:
|
||||||
update_fields.append(k)
|
update_fields.append(k)
|
||||||
activity_obj.save(update_fields=update_fields)
|
activity_obj.save(update_fields=update_fields)
|
||||||
|
|
||||||
if payload["type"] not in BROADCAST_TO_USER_ACTIVITIES:
|
if payload["type"] not in BROADCAST_TO_USER_ACTIVITIES:
|
||||||
return
|
|
||||||
|
|
||||||
inbox_items = context.get(
|
|
||||||
"inbox_items", models.InboxItem.objects.none()
|
|
||||||
)
|
|
||||||
inbox_items = (
|
|
||||||
inbox_items.select_related()
|
|
||||||
.select_related("actor__user")
|
|
||||||
.prefetch_related(
|
|
||||||
"activity__object",
|
|
||||||
"activity__target",
|
|
||||||
"activity__related_object",
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
for ii in inbox_items:
|
|
||||||
user = ii.actor.get_user()
|
|
||||||
if not user:
|
|
||||||
continue
|
|
||||||
group = "user.{}.inbox".format(user.pk)
|
|
||||||
channels.group_send(
|
|
||||||
group,
|
|
||||||
{
|
|
||||||
"type": "event.send",
|
|
||||||
"text": "",
|
|
||||||
"data": {
|
|
||||||
"type": "inbox.item_added",
|
|
||||||
"item": api_serializers.InboxItemSerializer(ii).data,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
inbox_items = context.get("inbox_items", models.InboxItem.objects.none())
|
||||||
|
inbox_items = (
|
||||||
|
inbox_items.select_related()
|
||||||
|
.select_related("actor__user")
|
||||||
|
.prefetch_related(
|
||||||
|
"activity__object", "activity__target", "activity__related_object"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
for ii in inbox_items:
|
||||||
|
user = ii.actor.get_user()
|
||||||
|
if not user:
|
||||||
|
continue
|
||||||
|
group = "user.{}.inbox".format(user.pk)
|
||||||
|
channels.group_send(
|
||||||
|
group,
|
||||||
|
{
|
||||||
|
"type": "event.send",
|
||||||
|
"text": "",
|
||||||
|
"data": {
|
||||||
|
"type": "inbox.item_added",
|
||||||
|
"item": api_serializers.InboxItemSerializer(ii).data,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
ACTOR_KEY_ROTATION_LOCK_CACHE_KEY = "federation:actor-key-rotation-lock:{}"
|
ACTOR_KEY_ROTATION_LOCK_CACHE_KEY = "federation:actor-key-rotation-lock:{}"
|
||||||
|
|
||||||
|
|
|
@ -73,7 +73,7 @@ def get_files(storage, *parts):
|
||||||
|
|
||||||
@celery.app.task(name="federation.dispatch_inbox")
|
@celery.app.task(name="federation.dispatch_inbox")
|
||||||
@celery.require_instance(models.Activity.objects.select_related(), "activity")
|
@celery.require_instance(models.Activity.objects.select_related(), "activity")
|
||||||
def dispatch_inbox(activity):
|
def dispatch_inbox(activity, call_handlers=True):
|
||||||
"""
|
"""
|
||||||
Given an activity instance, triggers our internal delivery logic (follow
|
Given an activity instance, triggers our internal delivery logic (follow
|
||||||
creation, etc.)
|
creation, etc.)
|
||||||
|
@ -86,6 +86,7 @@ def dispatch_inbox(activity):
|
||||||
"actor": activity.actor,
|
"actor": activity.actor,
|
||||||
"inbox_items": activity.inbox_items.filter(is_read=False),
|
"inbox_items": activity.inbox_items.filter(is_read=False),
|
||||||
},
|
},
|
||||||
|
call_handlers=call_handlers,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -98,7 +99,7 @@ def dispatch_outbox(activity):
|
||||||
inbox_items = activity.inbox_items.filter(is_read=False).select_related()
|
inbox_items = activity.inbox_items.filter(is_read=False).select_related()
|
||||||
|
|
||||||
if inbox_items.exists():
|
if inbox_items.exists():
|
||||||
dispatch_inbox.delay(activity_id=activity.pk)
|
dispatch_inbox.delay(activity_id=activity.pk, call_handlers=False)
|
||||||
|
|
||||||
if not preferences.get("federation__enabled"):
|
if not preferences.get("federation__enabled"):
|
||||||
# federation is disabled, we only deliver to local recipients
|
# federation is disabled, we only deliver to local recipients
|
||||||
|
|
|
@ -196,6 +196,16 @@ def test_inbox_routing(factories, mocker):
|
||||||
assert a.target == target
|
assert a.target == target
|
||||||
|
|
||||||
|
|
||||||
|
def test_inbox_routing_no_handler(factories, mocker):
|
||||||
|
router = activity.InboxRouter()
|
||||||
|
a = factories["federation.Activity"](type="Follow")
|
||||||
|
handler = mocker.Mock()
|
||||||
|
router.connect({"type": "Follow"}, handler)
|
||||||
|
|
||||||
|
router.dispatch({"type": "Follow"}, context={"activity": a}, call_handlers=False)
|
||||||
|
handler.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
def test_inbox_routing_send_to_channel(factories, mocker):
|
def test_inbox_routing_send_to_channel(factories, mocker):
|
||||||
group_send = mocker.patch("funkwhale_api.common.channels.group_send")
|
group_send = mocker.patch("funkwhale_api.common.channels.group_send")
|
||||||
a = factories["federation.Activity"](type="Follow")
|
a = factories["federation.Activity"](type="Follow")
|
||||||
|
|
|
@ -74,10 +74,12 @@ def test_handle_in(factories, mocker, now, queryset_equal_list):
|
||||||
a = factories["federation.Activity"](payload={"hello": "world"})
|
a = factories["federation.Activity"](payload={"hello": "world"})
|
||||||
ii1 = factories["federation.InboxItem"](activity=a, actor=r1)
|
ii1 = factories["federation.InboxItem"](activity=a, actor=r1)
|
||||||
ii2 = factories["federation.InboxItem"](activity=a, actor=r2)
|
ii2 = factories["federation.InboxItem"](activity=a, actor=r2)
|
||||||
tasks.dispatch_inbox(activity_id=a.pk)
|
tasks.dispatch_inbox(activity_id=a.pk, call_handlers=False)
|
||||||
|
|
||||||
mocked_dispatch.assert_called_once_with(
|
mocked_dispatch.assert_called_once_with(
|
||||||
a.payload, context={"actor": a.actor, "activity": a, "inbox_items": [ii1, ii2]}
|
a.payload,
|
||||||
|
context={"actor": a.actor, "activity": a, "inbox_items": [ii1, ii2]},
|
||||||
|
call_handlers=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -90,7 +92,7 @@ def test_dispatch_outbox(factories, mocker):
|
||||||
factories["federation.InboxItem"](activity=activity)
|
factories["federation.InboxItem"](activity=activity)
|
||||||
delivery = factories["federation.Delivery"](activity=activity)
|
delivery = factories["federation.Delivery"](activity=activity)
|
||||||
tasks.dispatch_outbox(activity_id=activity.pk)
|
tasks.dispatch_outbox(activity_id=activity.pk)
|
||||||
mocked_inbox.assert_called_once_with(activity_id=activity.pk)
|
mocked_inbox.assert_called_once_with(activity_id=activity.pk, call_handlers=False)
|
||||||
mocked_deliver_to_remote.assert_called_once_with(delivery_id=delivery.pk)
|
mocked_deliver_to_remote.assert_called_once_with(delivery_id=delivery.pk)
|
||||||
|
|
||||||
|
|
||||||
|
@ -104,7 +106,7 @@ def test_dispatch_outbox_disabled_federation(factories, mocker, preferences):
|
||||||
factories["federation.InboxItem"](activity=activity)
|
factories["federation.InboxItem"](activity=activity)
|
||||||
factories["federation.Delivery"](activity=activity)
|
factories["federation.Delivery"](activity=activity)
|
||||||
tasks.dispatch_outbox(activity_id=activity.pk)
|
tasks.dispatch_outbox(activity_id=activity.pk)
|
||||||
mocked_inbox.assert_called_once_with(activity_id=activity.pk)
|
mocked_inbox.assert_called_once_with(activity_id=activity.pk, call_handlers=False)
|
||||||
mocked_deliver_to_remote.assert_not_called()
|
mocked_deliver_to_remote.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed delivering of local activities causing unintended side effects, such as rollbacking changes (#737)
|
Loading…
Reference in New Issue