funkwhale/api/funkwhale_api/federation/activity.py

633 lines
21 KiB
Python

import logging
import urllib.parse
import uuid
from django.conf import settings
from django.core.cache import cache
from django.db import IntegrityError, transaction
from django.db.models import Q
from funkwhale_api.common import channels
from funkwhale_api.common import utils as funkwhale_utils
from . import contexts
recursive_getattr = funkwhale_utils.recursive_getattr
logger = logging.getLogger(__name__)
PUBLIC_ADDRESS = contexts.AS.Public
ACTIVITY_TYPES = [
"Accept",
"Add",
"Announce",
"Arrive",
"Block",
"Create",
"Delete",
"Dislike",
"Flag",
"Follow",
"Ignore",
"Invite",
"Join",
"Leave",
"Like",
"Listen",
"Move",
"Offer",
"Question",
"Reject",
"Read",
"Remove",
"TentativeReject",
"TentativeAccept",
"Travel",
"Undo",
"Update",
"View",
]
FUNKWHALE_OBJECT_TYPES = [
("Domain", "Domain"),
("Artist", "Artist"),
("Album", "Album"),
("Track", "Track"),
("Library", "Library"),
]
OBJECT_TYPES = (
[
"Application",
"Article",
"Audio",
"Collection",
"Document",
"Event",
"Group",
"Image",
"Note",
"Object",
"OrderedCollection",
"Organization",
"Page",
"Person",
"Place",
"Profile",
"Relationship",
"Service",
"Tombstone",
"Video",
]
+ ACTIVITY_TYPES
+ FUNKWHALE_OBJECT_TYPES
)
BROADCAST_TO_USER_ACTIVITIES = ["Follow", "Accept"]
def should_reject(fid, actor_id=None, payload={}):
if fid is None and actor_id is None:
return False
from funkwhale_api.moderation import models as moderation_models
policies = moderation_models.InstancePolicy.objects.active()
media_types = ["Audio", "Artist", "Album", "Track", "Library", "Image"]
relevant_values = [
recursive_getattr(payload, "type", permissive=True),
recursive_getattr(payload, "object.type", permissive=True),
recursive_getattr(payload, "target.type", permissive=True),
]
# if one of the payload types match our internal media types, then
# we apply policies that reject media
if set(media_types) & set(relevant_values):
policy_type = Q(block_all=True) | Q(reject_media=True)
else:
policy_type = Q(block_all=True)
if fid:
query = policies.matching_url_query(fid) & policy_type
if fid and actor_id:
query |= policies.matching_url_query(actor_id) & policy_type
elif actor_id:
query = policies.matching_url_query(actor_id) & policy_type
return policies.filter(query).exists()
@transaction.atomic
def receive(activity, on_behalf_of, inbox_actor=None):
"""
Receive an activity, find his recipients and save it to the database before dispatching it
"""
from funkwhale_api.moderation import mrf
from . import models, serializers, tasks
from .routes import inbox
logger.debug(
"[federation] Received activity from %s : %s", on_behalf_of.fid, activity
)
# we ensure the activity has the bare minimum structure before storing
# it in our database
serializer = serializers.BaseActivitySerializer(
data=activity,
context={
"actor": on_behalf_of,
"local_recipients": True,
"recipients": [inbox_actor] if inbox_actor else [],
},
)
serializer.is_valid(raise_exception=True)
payload, updated = mrf.inbox.apply(activity, sender_id=on_behalf_of.fid)
if not payload:
logger.info(
"[federation] Discarding activity due to mrf %s",
serializer.validated_data.get("id"),
)
return
if not inbox.get_matching_handlers(payload):
# discard unhandlable activity
logger.debug(
"[federation] No matching route found for activity, discarding: %s", payload
)
return
try:
copy = serializer.save(payload=payload, type=payload["type"])
except IntegrityError:
logger.warning(
"[federation] Discarding already delivered activity %s",
serializer.validated_data.get("id"),
)
return
local_to_recipients = get_actors_from_audience(
serializer.validated_data.get("to", [])
)
local_to_recipients = local_to_recipients.local()
local_to_recipients = local_to_recipients.values_list("pk", flat=True)
local_to_recipients = list(local_to_recipients)
if inbox_actor:
local_to_recipients.append(inbox_actor.pk)
local_cc_recipients = get_actors_from_audience(
serializer.validated_data.get("cc", [])
)
local_cc_recipients = local_cc_recipients.local()
local_cc_recipients = local_cc_recipients.values_list("pk", flat=True)
inbox_items = []
for recipients, type in [(local_to_recipients, "to"), (local_cc_recipients, "cc")]:
for r in recipients:
inbox_items.append(models.InboxItem(actor_id=r, type=type, activity=copy))
models.InboxItem.objects.bulk_create(inbox_items)
# at this point, we have the activity in database. Even if we crash, it's
# okay, as we can retry later
funkwhale_utils.on_commit(tasks.dispatch_inbox.delay, activity_id=copy.pk)
return copy
class Router:
def __init__(self):
self.routes = []
def connect(self, route, handler):
self.routes.append((route, handler))
def register(self, route):
def decorator(handler):
self.connect(route, handler)
return handler
return decorator
class InboxRouter(Router):
def get_matching_handlers(self, payload):
return [
handler for route, handler in self.routes if match_route(route, payload)
]
@transaction.atomic
def dispatch(self, payload, context, call_handlers=True):
"""
Receives an Activity payload and some context and trigger our
business logic.
call_handlers should be False when are delivering a local activity, because
we want only want to bind activities to their recipients, not reapply the changes.
"""
from . import api_serializers, models
logger.debug(
f"[federation] Inbox dispatch payload : {payload} with context : {context}"
)
handlers = self.get_matching_handlers(payload)
for handler in handlers:
if call_handlers:
r = handler(payload, context=context)
else:
r = None
activity_obj = context.get("activity")
if activity_obj and r:
# handler returned additional data we can use
# to update the activity target
for key, value in r.items():
setattr(activity_obj, key, value)
update_fields = []
for k in r.keys():
if k in ["object", "target", "related_object"]:
update_fields += [
f"{k}_id",
f"{k}_content_type",
]
else:
update_fields.append(k)
activity_obj.save(update_fields=update_fields)
if payload["type"] not in BROADCAST_TO_USER_ACTIVITIES:
return
inbox_items = context.get("inbox_items", models.InboxItem.objects.none())
inbox_items = (
inbox_items.select_related()
.select_related("actor__user")
.prefetch_related(
"activity__object", "activity__target", "activity__related_object"
)
)
for ii in inbox_items:
user = ii.actor.get_user()
if not user:
continue
group = f"user.{user.pk}.inbox"
channels.group_send(
group,
{
"type": "event.send",
"text": "",
"data": {
"type": "inbox.item_added",
"item": api_serializers.InboxItemSerializer(ii).data,
},
},
)
return
ACTOR_KEY_ROTATION_LOCK_CACHE_KEY = "federation:actor-key-rotation-lock:{}"
def should_rotate_actor_key(actor_id):
lock = cache.get(ACTOR_KEY_ROTATION_LOCK_CACHE_KEY.format(actor_id))
return lock is None
def schedule_key_rotation(actor_id, delay):
from . import tasks
cache.set(ACTOR_KEY_ROTATION_LOCK_CACHE_KEY.format(actor_id), True, timeout=delay)
tasks.rotate_actor_key.apply_async(kwargs={"actor_id": actor_id}, countdown=delay)
def activity_pass_user_privacy_level(context, routing):
TYPE_FOLLOW_USER_PRIVACY_LEVEL = ["Listen", "Like", "Create"]
TYPE_IGNORE_USER_PRIVACY_LEVEL = ["Delete", "Accept", "Follow"]
MUSIC_OBJECT_TYPE = ["Audio", "Track", "Album", "Artist"]
actor = context.get("actor", False)
type = routing.get("type", False)
object_type = routing.get("object", {}).get("type", None)
if not actor:
logger.warning(
"No actor provided in activity context : \
we cannot follow actor.privacy_level, activity will be sent by default."
)
# We do not consider music metadata has private
if object_type in MUSIC_OBJECT_TYPE:
return True
if type:
if type in TYPE_IGNORE_USER_PRIVACY_LEVEL:
return True
if type in TYPE_FOLLOW_USER_PRIVACY_LEVEL and actor and actor.is_local:
if actor.user.privacy_level in [
"me",
"instance",
]:
return False
return True
return True
def activity_pass_object_privacy_level(context, routing):
MUSIC_OBJECT_TYPE = ["Audio", "Track", "Album", "Artist"]
# we only support playlist federation for now
object = context.get("playlist", False)
obj_privacy_level = object.privacy_level if object else None
object_type = routing.get("object", {}).get("type", None)
# We do not consider music metadata has private
if object_type in MUSIC_OBJECT_TYPE:
return True
if object and obj_privacy_level and obj_privacy_level in ["me", "instance"]:
return False
return True
class OutboxRouter(Router):
@transaction.atomic
def dispatch(self, routing, context):
"""
Receives a routing payload and some business objects in the context
and may yield data that should be persisted in the Activity model
for further delivery.
"""
from funkwhale_api.common import preferences
from . import models, tasks
logger.debug(f"[federation] Outbox dispatch context : {context}")
allow_list_enabled = preferences.get("moderation__allow_list_enabled")
allowed_domains = None
if allow_list_enabled:
allowed_domains = set(
models.Domain.objects.filter(allowed=True).values_list(
"name", flat=True
)
)
if activity_pass_user_privacy_level(context, routing) is False:
logger.info(
"[federation] Discarding outbox dispatch due to user privacy_level"
)
return
if activity_pass_object_privacy_level(context, routing) is False:
logger.info(
"[federation] Discarding outbox dispatch due to object privacy_level"
)
return
for route, handler in self.routes:
if not match_route(route, routing):
continue
activities_data = []
for e in handler(context):
# a route can yield zero, one or more activity payloads
if e:
activities_data.append(e)
deletions = [
a["actor"].id
for a in activities_data
if a["payload"]["type"] == "Delete"
]
for actor_id in deletions:
# we way need to triggers a blind key rotation
if should_rotate_actor_key(actor_id):
schedule_key_rotation(actor_id, settings.ACTOR_KEY_ROTATION_DELAY)
inbox_items_by_activity_uuid = {}
deliveries_by_activity_uuid = {}
prepared_activities = []
for activity_data in activities_data:
activity_data["payload"]["actor"] = activity_data["actor"].fid
to = activity_data["payload"].pop("to", [])
cc = activity_data["payload"].pop("cc", [])
a = models.Activity(**activity_data)
a.uuid = uuid.uuid4()
(
to_inbox_items,
to_deliveries,
new_to,
) = prepare_deliveries_and_inbox_items(
to, "to", allowed_domains=allowed_domains
)
(
cc_inbox_items,
cc_deliveries,
new_cc,
) = prepare_deliveries_and_inbox_items(
cc, "cc", allowed_domains=allowed_domains
)
if not any(
[to_inbox_items, to_deliveries, cc_inbox_items, cc_deliveries]
):
continue
deliveries_by_activity_uuid[str(a.uuid)] = to_deliveries + cc_deliveries
inbox_items_by_activity_uuid[str(a.uuid)] = (
to_inbox_items + cc_inbox_items
)
if new_to:
a.payload["to"] = new_to
if new_cc:
a.payload["cc"] = new_cc
prepared_activities.append(a)
activities = models.Activity.objects.bulk_create(prepared_activities)
for activity in activities:
if str(activity.uuid) in deliveries_by_activity_uuid:
for obj in deliveries_by_activity_uuid[str(a.uuid)]:
obj.activity = activity
if str(activity.uuid) in inbox_items_by_activity_uuid:
for obj in inbox_items_by_activity_uuid[str(a.uuid)]:
obj.activity = activity
# create all deliveries and items, in bulk
models.Delivery.objects.bulk_create(
[
obj
for collection in deliveries_by_activity_uuid.values()
for obj in collection
]
)
models.InboxItem.objects.bulk_create(
[
obj
for collection in inbox_items_by_activity_uuid.values()
for obj in collection
]
)
for a in activities:
logger.info(f"[federation] OUtbox sending activity : {a.pk}")
funkwhale_utils.on_commit(tasks.dispatch_outbox.delay, activity_id=a.pk)
return activities
def match_route(route, payload):
for key, value in route.items():
payload_value = recursive_getattr(payload, key, permissive=True)
if isinstance(value, list):
if payload_value not in value:
return False
elif payload_value != value:
return False
return True
def is_allowed_url(url, allowed_domains):
return (
allowed_domains is None
or urllib.parse.urlparse(url).hostname in allowed_domains
)
def prepare_deliveries_and_inbox_items(recipient_list, type, allowed_domains=None):
"""
Given a list of recipients (
either actor instances, public addresses, a dictionary with a "type" and "target"
keys for followers collections)
returns a list of deliveries, alist of inbox_items and a list
of urls to persist in the activity in place of the initial recipient list.
"""
from . import models
if allowed_domains is not None:
allowed_domains = set(allowed_domains)
allowed_domains.add(settings.FEDERATION_HOSTNAME)
local_recipients = set()
remote_inbox_urls = set()
urls = []
for r in recipient_list:
if isinstance(r, models.Actor):
if r.is_local:
local_recipients.add(r)
else:
remote_inbox_urls.add(r.shared_inbox_url or r.inbox_url)
urls.append(r.fid)
elif r == PUBLIC_ADDRESS:
urls.append(r)
elif isinstance(r, dict) and r["type"] == "followers":
received_follows = (
r["target"]
.received_follows.filter(approved=True)
.select_related("actor__user")
)
for follow in received_follows:
actor = follow.actor
if actor.is_local:
local_recipients.add(actor)
else:
remote_inbox_urls.add(actor.shared_inbox_url or actor.inbox_url)
urls.append(r["target"].followers_url)
elif isinstance(r, dict) and r["type"] == "actor_inbox":
actor = r["actor"]
urls.append(actor.fid)
if actor.is_local:
local_recipients.add(actor)
else:
remote_inbox_urls.add(actor.inbox_url)
elif isinstance(r, dict) and r["type"] == "instances_with_followers":
# we want to broadcast the activity to other instances service actors
# when we have at least one follower from this instance
follows = (
models.LibraryFollow.objects.filter(approved=True)
.exclude(actor__domain_id=settings.FEDERATION_HOSTNAME)
.exclude(actor__domain=None)
.union(
models.Follow.objects.filter(approved=True)
.exclude(actor__domain_id=settings.FEDERATION_HOSTNAME)
.exclude(actor__domain=None)
)
)
followed_domains = list(follows.values_list("actor__domain_id", flat=True))
actors = models.Actor.objects.filter(
managed_domains__name__in=followed_domains
)
values = actors.values("shared_inbox_url", "inbox_url", "domain_id")
handled_domains = set()
for v in values:
remote_inbox_urls.add(v["shared_inbox_url"] or v["inbox_url"])
handled_domains.add(v["domain_id"])
if len(handled_domains) >= len(followed_domains):
continue
# for all remaining domains (probably non-funkwhale instances, with no
# service actors), we also pick the latest known actor per domain and send the message
# there instead
remaining_domains = models.Domain.objects.exclude(name__in=handled_domains)
remaining_domains = remaining_domains.filter(name__in=followed_domains)
actors = models.Actor.objects.filter(domain__in=remaining_domains)
actors = (
actors.order_by("domain_id", "-last_fetch_date")
.distinct("domain_id")
.values("shared_inbox_url", "inbox_url")
)
for v in actors:
remote_inbox_urls.add(v["shared_inbox_url"] or v["inbox_url"])
deliveries = [
models.Delivery(inbox_url=url)
for url in remote_inbox_urls
if is_allowed_url(url, allowed_domains)
]
urls = [url for url in urls if is_allowed_url(url, allowed_domains)]
inbox_items = [
models.InboxItem(actor=actor, type=type) for actor in local_recipients
]
return inbox_items, deliveries, urls
def get_actors_from_audience(urls):
"""
Given a list of urls such as [
"https://hello.world/@bob/followers",
"https://eldritch.cafe/@alice/followers",
"https://funkwhale.demo/libraries/uuid/followers",
]
Returns a queryset of actors that are member of the collections
listed in the given urls. The urls may contain urls referring
to an actor, an actor followers collection or an library followers
collection.
Urls that don't match anything are simply discarded
"""
from . import models
queries = {"followed": None, "actors": []}
for url in urls:
if url == PUBLIC_ADDRESS:
continue
queries["actors"].append(url)
queries["followed"] = funkwhale_utils.join_queries_or(
queries["followed"], Q(target__followers_url=url)
)
final_query = None
if queries["actors"]:
final_query = funkwhale_utils.join_queries_or(
final_query, Q(fid__in=queries["actors"])
)
if queries["followed"]:
actor_follows = models.Follow.objects.filter(queries["followed"], approved=True)
final_query = funkwhale_utils.join_queries_or(
final_query, Q(pk__in=actor_follows.values_list("actor", flat=True))
)
if not final_query:
return models.Actor.objects.none()
return models.Actor.objects.filter(final_query)