import datetime import json import logging import os import uuid from urllib.parse import urlparse import requests from django.conf import settings from django.contrib.contenttypes.models import ContentType from django.core.cache import cache from django.db import transaction from django.db.models import F, Q from django.db.models.deletion import Collector from django.utils import timezone from dynamic_preferences.registries import global_preferences_registry from requests.exceptions import RequestException from config import plugins from funkwhale_api import musicbrainz from funkwhale_api.audio import models as audio_models from funkwhale_api.common import models as common_models from funkwhale_api.common import preferences, session from funkwhale_api.common import utils as common_utils from funkwhale_api.moderation import mrf from funkwhale_api.music import models as music_models from funkwhale_api.playlists import models as playlists_models from funkwhale_api.taskapp import celery from . import ( activity, actors, exceptions, jsonld, keys, models, routes, serializers, signing, utils, webfinger, ) logger = logging.getLogger(__name__) @celery.app.task(name="federation.clean_music_cache") def clean_music_cache(): preferences = global_preferences_registry.manager() delay = preferences["federation__music_cache_duration"] if delay < 1: return # cache clearing disabled limit = timezone.now() - datetime.timedelta(minutes=delay) candidates = ( music_models.Upload.objects.filter( Q(audio_file__isnull=False) & (Q(accessed_date__lt=limit) | Q(accessed_date=None)), # library__actor__user=None, ) .local(False) .exclude(audio_file="") .filter(Q(source__startswith="http://") | Q(source__startswith="https://")) .only("audio_file", "id") .order_by("id") ) for upload in candidates: upload.audio_file.delete() # we also delete orphaned files, if any storage = models.LibraryTrack._meta.get_field("audio_file").storage files = get_files(storage, "federation_cache/tracks") existing = music_models.Upload.objects.filter(audio_file__in=files) missing = set(files) - set(existing.values_list("audio_file", flat=True)) for m in missing: storage.delete(m) def get_files(storage, *parts): """ This is a recursive function that return all files available in a given directory using django's storage. """ if not parts: raise ValueError("Missing path") try: dirs, files = storage.listdir(os.path.join(*parts)) except FileNotFoundError: return [] for dir in dirs: files += get_files(storage, *(list(parts) + [dir])) return [os.path.join(parts[-1], path) for path in files] @celery.app.task(name="federation.dispatch_inbox") @celery.require_instance(models.Activity.objects.select_related(), "activity") def dispatch_inbox(activity, call_handlers=True): """ Given an activity instance, triggers our internal delivery logic (follow creation, etc.) """ routes.inbox.dispatch( activity.payload, context={ "activity": activity, "actor": activity.actor, "inbox_items": activity.inbox_items.filter(is_read=False).order_by("id"), }, call_handlers=call_handlers, ) @celery.app.task(name="federation.dispatch_outbox") @celery.require_instance(models.Activity.objects.select_related(), "activity") def dispatch_outbox(activity): """ Deliver a local activity to its recipients, both locally and remotely """ inbox_items = activity.inbox_items.filter(is_read=False).select_related() if inbox_items.exists(): call_handlers = activity.type in ["Follow"] dispatch_inbox.delay(activity_id=activity.pk, call_handlers=call_handlers) if not preferences.get("federation__enabled"): # federation is disabled, we only deliver to local recipients return deliveries = activity.deliveries.filter(is_delivered=False) for id in deliveries.values_list("pk", flat=True): deliver_to_remote.delay(delivery_id=id) @celery.app.task( name="federation.deliver_to_remote_inbox", autoretry_for=[RequestException], retry_backoff=30, max_retries=5, ) @celery.require_instance( models.Delivery.objects.filter(is_delivered=False).select_related( "activity__actor" ), "delivery", ) def deliver_to_remote(delivery): if not preferences.get("federation__enabled"): # federation is disabled, we only deliver to local recipients return actor = delivery.activity.actor logger.info("Preparing activity delivery to %s", delivery.inbox_url) auth = signing.get_auth(actor.private_key, actor.private_key_id) try: response = session.get_session().post( auth=auth, json=delivery.activity.payload, url=delivery.inbox_url, headers={"Content-Type": "application/activity+json"}, ) logger.debug("Remote answered with %s", response.status_code) response.raise_for_status() except Exception: delivery.last_attempt_date = timezone.now() delivery.attempts = F("attempts") + 1 delivery.save(update_fields=["last_attempt_date", "attempts"]) raise else: delivery.last_attempt_date = timezone.now() delivery.attempts = F("attempts") + 1 delivery.is_delivered = True delivery.save(update_fields=["last_attempt_date", "attempts", "is_delivered"]) def fetch_nodeinfo(domain_name): s = session.get_session() wellknown_url = f"https://{domain_name}/.well-known/nodeinfo" response = s.get(url=wellknown_url) response.raise_for_status() serializer = serializers.NodeInfoSerializer(data=response.json()) serializer.is_valid(raise_exception=True) nodeinfo_url = None for link in serializer.validated_data["links"]: if link["rel"] == "http://nodeinfo.diaspora.software/ns/schema/2.0": nodeinfo_url = link["href"] break response = s.get(url=nodeinfo_url) response.raise_for_status() return response.json() @celery.app.task(name="federation.update_domain_nodeinfo") @celery.require_instance( models.Domain.objects.external(), "domain", id_kwarg_name="domain_name" ) def update_domain_nodeinfo(domain): now = timezone.now() try: nodeinfo = {"status": "ok", "payload": fetch_nodeinfo(domain.name)} except ( requests.RequestException, serializers.serializers.ValidationError, ValueError, ) as e: nodeinfo = {"status": "error", "error": str(e)} service_actor_id = common_utils.recursive_getattr( nodeinfo, "payload.metadata.actorId", permissive=True ) try: domain.service_actor = ( utils.retrieve_ap_object( service_actor_id, actor=None, queryset=models.Actor, serializer_class=serializers.ActorSerializer, ) if service_actor_id else None ) except ( serializers.serializers.ValidationError, RequestException, exceptions.BlockedActorOrDomain, ) as e: logger.warning( "Cannot fetch system actor for domain %s: %s", domain.name, str(e) ) domain.nodeinfo_fetch_date = now domain.nodeinfo = nodeinfo domain.save(update_fields=["nodeinfo", "nodeinfo_fetch_date", "service_actor"]) @celery.app.task(name="federation.refresh_nodeinfo_known_nodes") def refresh_nodeinfo_known_nodes(): """ Trigger a node info refresh on all nodes that weren't refreshed since settings.NODEINFO_REFRESH_DELAY """ limit = timezone.now() - datetime.timedelta(seconds=settings.NODEINFO_REFRESH_DELAY) candidates = ( models.Domain.objects.external() .exclude(nodeinfo_fetch_date__gte=limit) .filter(nodeinfo__software__name="Funkwhale") ) names = candidates.values_list("name", flat=True) logger.info("Launching periodic nodeinfo refresh on %s domains", len(names)) for domain_name in names: update_domain_nodeinfo.delay(domain_name=domain_name) def delete_qs(qs): label = qs.model._meta.label result = qs.delete() related = sum(result[1].values()) logger.info( "Purged %s %s objects (and %s related entities)", result[0], label, related ) def handle_purge_actors(ids, only=[]): """ Empty only means we purge everything Otherwise, we purge only the requested bits: media """ # purge follows (received emitted) if not only: delete_qs(models.LibraryFollow.objects.filter(target__actor_id__in=ids)) delete_qs(models.Follow.objects.filter(actor_id__in=ids)) # purge audio content if not only or "media" in only: delete_qs(common_models.Attachment.objects.filter(actor__in=ids)) delete_qs(models.LibraryFollow.objects.filter(actor_id__in=ids)) delete_qs(models.Follow.objects.filter(target_id__in=ids)) delete_qs(audio_models.Channel.objects.filter(attributed_to__in=ids)) delete_qs(audio_models.Channel.objects.filter(actor__in=ids)) delete_qs(music_models.Upload.objects.filter(library__actor_id__in=ids)) delete_qs(music_models.Library.objects.filter(actor_id__in=ids)) # purge remaining activities / deliveries if not only: delete_qs(models.InboxItem.objects.filter(actor_id__in=ids)) delete_qs(models.Activity.objects.filter(actor_id__in=ids)) @celery.app.task(name="federation.purge_actors") def purge_actors(ids=[], domains=[], only=[]): actors = models.Actor.objects.filter( Q(id__in=ids) | Q(domain_id__in=domains) ).order_by("id") found_ids = list(actors.values_list("id", flat=True)) logger.info("Starting purging %s accounts", len(found_ids)) handle_purge_actors(ids=found_ids, only=only) @celery.app.task(name="federation.rotate_actor_key") @celery.require_instance(models.Actor.objects.local(), "actor") def rotate_actor_key(actor): pair = keys.get_key_pair() actor.private_key = pair[0].decode() actor.public_key = pair[1].decode() actor.save(update_fields=["private_key", "public_key"]) @celery.app.task(name="federation.fetch") @transaction.atomic @celery.require_instance( models.Fetch.objects.filter(status="pending").select_related("actor"), "fetch_obj", "fetch_id", ) def fetch(fetch_obj): def error(code, **kwargs): fetch_obj.status = "errored" fetch_obj.fetch_date = timezone.now() fetch_obj.detail = {"error_code": code} fetch_obj.detail.update(kwargs) fetch_obj.save(update_fields=["fetch_date", "status", "detail"]) url = fetch_obj.url mrf_check_url = url if not mrf_check_url.startswith("webfinger://"): payload, updated = mrf.inbox.apply({"id": mrf_check_url}) if not payload: return error("blocked", message="Blocked by MRF") actor = fetch_obj.actor if settings.FEDERATION_AUTHENTIFY_FETCHES: auth = signing.get_auth(actor.private_key, actor.private_key_id) else: auth = None try: if url.startswith("webfinger://"): # we first grab the corresponding webfinger representation # to get the ActivityPub actor ID webfinger_data = webfinger.get_resource( "acct:" + url.replace("webfinger://", "") ) url = webfinger.get_ap_url(webfinger_data["links"]) if not url: return error("webfinger", message="Invalid or missing webfinger data") payload, updated = mrf.inbox.apply({"id": url}) if not payload: return error("blocked", message="Blocked by MRF") response = session.get_session().get( auth=auth, url=url, headers={"Accept": "application/activity+json"}, ) logger.debug("Remote answered with %s: %s", response.status_code, response.text) response.raise_for_status() except requests.exceptions.HTTPError as e: return error( "http", status_code=e.response.status_code if e.response else None, message=e.response.text, ) except requests.exceptions.Timeout: return error("timeout") except requests.exceptions.ConnectionError as e: return error("connection", message=str(e)) except requests.RequestException as e: return error("request", message=str(e)) except Exception as e: return error("unhandled", message=str(e)) try: payload = response.json() except json.decoder.JSONDecodeError: # we attempt to extract a that points # to an activity pub resource, if possible, and retry with this URL alternate_url = utils.find_alternate(response.text) if alternate_url: fetch_obj.url = alternate_url fetch_obj.save(update_fields=["url"]) return fetch(fetch_id=fetch_obj.pk) return error("invalid_json") payload, updated = mrf.inbox.apply(payload) if not payload: return error("blocked", message="Blocked by MRF") try: doc = jsonld.expand(payload) except ValueError: return error("invalid_jsonld") try: type = doc.get("@type", [])[0] except IndexError: return error("missing_jsonld_type") try: serializer_classes = fetch_obj.serializers[type] model = serializer_classes[0].Meta.model except (KeyError, AttributeError): fetch_obj.status = "skipped" fetch_obj.fetch_date = timezone.now() fetch_obj.detail = {"reason": "unhandled_type", "type": type} return fetch_obj.save(update_fields=["fetch_date", "status", "detail"]) try: id = doc.get("@id") except IndexError: existing = None else: existing = model.objects.filter(fid=id).first() serializer = None for serializer_class in serializer_classes: serializer = serializer_class( existing, data=payload, context={"fetch_actor": actor} ) if not serializer.is_valid(): continue else: break if serializer.errors: return error("validation", validation_errors=serializer.errors) try: obj = serializer.save() except Exception as e: error("save", message=str(e)) raise # special case for channels # when obj is an actor, we check if the actor has a channel associated with it # if it is the case, we consider the fetch obj to be a channel instead # and also trigger a fetch on the channel outbox if isinstance(obj, models.Actor) and obj.get_channel(): obj = obj.get_channel() if obj.actor.outbox_url: try: # first page fetch is synchronous, so that at least some data is available # in the UI after subscription result = fetch_collection( obj.actor.outbox_url, channel_id=obj.pk, max_pages=1, ) except Exception: logger.exception( "Error while fetching actor outbox: %s", obj.actor.outbox_url ) else: if result.get("next_page"): # additional pages are fetched in the background result = fetch_collection.delay( result["next_page"], channel_id=obj.pk, max_pages=settings.FEDERATION_COLLECTION_MAX_PAGES - 1, is_page=True, ) fetch_obj.object = obj fetch_obj.status = "finished" fetch_obj.fetch_date = timezone.now() return fetch_obj.save( update_fields=["fetch_date", "status", "object_id", "object_content_type"] ) def trigger_third_party_upload_hook(fetch): if fetch.status == "finished" and fetch.object: if fetch.object_content_type == ContentType.objects.get_for_model( music_models.Track ): if not music_models.Track.objects.filter(pk=fetch.object.pk).playable_by( fetch.actor ): plugins.trigger_hook( plugins.TRIGGER_THIRD_PARTY_UPLOAD, track=fetch.object, ) if fetch.object_content_type == ContentType.objects.get_for_model( music_models.Album ): for track in fetch.object.tracks.all(): if not music_models.Track.objects.filter( pk=fetch.object.pk ).playable_by(fetch.actor): plugins.trigger_hook( plugins.TRIGGER_THIRD_PARTY_UPLOAD, track=track, ) def musicbrainz_type_handler(fetch): url = fetch.url path_parts = urlparse(url).path.strip("/").split("/") type_ = path_parts[0] + "s" mbid = path_parts[1] try: uuid.UUID(mbid) except ValueError: raise ValueError(f"could no get mbid from url {url}") return type_, mbid def musicbrainz_metadata_handler(type_, id): def replace_hyphens_in_keys(obj): if isinstance(obj, dict): return { k.replace("-", "_"): replace_hyphens_in_keys(v) for k, v in obj.items() } elif isinstance(obj, list): return [replace_hyphens_in_keys(item) for item in obj] else: return obj if type_ == "recordings": includes = ["tags", "artists", "releases"] elif type_ == "releases": includes = ["tags", "artists", "recordings"] result = replace_hyphens_in_keys( getattr(musicbrainz.api, type_).get(id=id, includes=includes) ) existing = ( music_models.Track.objects.filter(mbid=id).first() if music_models.Track.objects.filter(mbid=id).exists() else None ) return result, existing type_and_id_from_third_party = {"musicbrainz.org": musicbrainz_type_handler} metadata_from_third_party_ = {"musicbrainz.org": musicbrainz_metadata_handler} @celery.app.task(name="third_party_fetch") @transaction.atomic @celery.require_instance( models.Fetch.objects.filter(status="pending").select_related("actor"), "fetch_obj", "fetch_id", ) def third_party_fetch(fetch_obj): def error(code, **kwargs): fetch_obj.status = "errored" fetch_obj.fetch_date = timezone.now() fetch_obj.detail = {"error_code": code} fetch_obj.detail.update(kwargs) fetch_obj.save(update_fields=["fetch_date", "status", "detail"]) def check_url(url): if not url.startswith("webfinger://"): payload, updated = mrf.inbox.apply({"id": url}) if not payload: return error("blocked", message="Blocked by MRF") parsed_url = urlparse(url) service = parsed_url.netloc if service not in fetch_obj.supported_services: return error("invalid_url", message=f"Unsupported domain {service}") return service url = fetch_obj.url actor = fetch_obj.actor service = check_url(url) try: type_, id = type_and_id_from_third_party[service](fetch_obj) logger.debug("Parsed URL %s into type %s and id %s", url, type_, id) except ValueError as e: return error("url_parse_error", message=str(e)) try: result, existing = metadata_from_third_party_[service](type_, id) logger.debug( f"Remote answered with {result} and we found {existing} in database" ) except requests.exceptions.HTTPError as e: return error( "http", status_code=e.response.status_code if e.response else None, message=e.response.text, ) except requests.exceptions.Timeout: return error("timeout") except requests.exceptions.ConnectionError as e: return error("connection", message=str(e)) except requests.RequestException as e: return error("request", message=str(e)) except Exception as e: return error("unhandled", message=str(e)) try: serializer_classes = fetch_obj.serializers.get(type_) except (KeyError, AttributeError): fetch_obj.status = "skipped" fetch_obj.fetch_date = timezone.now() fetch_obj.detail = {"reason": "unhandled_type", "type": type_} return fetch_obj.save(update_fields=["fetch_date", "status", "detail"]) serializer = None for serializer_class in serializer_classes: serializer = serializer_class( existing, data=result, context={"fetch_actor": actor} ) if not serializer.is_valid(): continue else: break if serializer.errors: return error("validation", validation_errors=serializer.errors) try: obj = serializer.save() except Exception as e: error("save", message=str(e)) raise fetch_obj.object = obj fetch_obj.status = "finished" fetch_obj.fetch_date = timezone.now() trigger_third_party_upload_hook(fetch_obj) fetch_obj.save( update_fields=["fetch_date", "status", "object_id", "object_content_type"] ) return fetch_obj class PreserveSomeDataCollector(Collector): """ We need to delete everything related to an actor. Well… Almost everything. But definitely not the Delete Activity we send to announce the actor is deleted. """ def __init__(self, *args, **kwargs): self.creation_date = timezone.now() super().__init__(*args, **kwargs) def related_objects(self, related, *args, **kwargs): qs = super().related_objects(related, *args, **kwargs) # We can only exclude the actions if these fields are available, most likely its a # model.Activity than if hasattr(related, "type") and hasattr(related, "creation_date"): # exclude the delete activity can be broadcasted properly qs = qs.exclude(type="Delete", creation_date__gte=self.creation_date) return qs @celery.app.task(name="federation.remove_actor") @transaction.atomic @celery.require_instance( models.Actor.objects.all(), "actor", ) def remove_actor(actor): # Then we broadcast the info over federation. We do this *before* deleting objects # associated with the actor, otherwise follows are removed and we don't know where # to broadcast logger.info("Broadcasting deletion to federation…") collector = PreserveSomeDataCollector(using="default") routes.outbox.dispatch( {"type": "Delete", "object": {"type": actor.type}}, context={"actor": actor} ) # then we delete any object associated with the actor object, but *not* the actor # itself. We keep it for auditability and sending the Delete ActivityPub message logger.info( "Prepare deletion of objects associated with account %s…", actor.preferred_username, ) collector.collect([actor]) for model, instances in collector.data.items(): if issubclass(model, actor.__class__): # we skip deletion of the actor itself continue to_delete = model.objects.filter(pk__in=[instance.pk for instance in instances]) logger.info( "Deleting %s objects associated with account %s…", len(instances), actor.preferred_username, ) to_delete.delete() # Finally, we update the actor itself and mark it as removed logger.info("Marking actor as Tombsone…") actor.type = "Tombstone" actor.name = None actor.summary = None actor.save(update_fields=["type", "name", "summary"]) COLLECTION_ACTIVITY_SERIALIZERS = [ ( {"type": "Create", "object.type": "Audio"}, serializers.ChannelCreateUploadSerializer, ) ] def match_serializer(payload, conf): return [ serializer_class for route, serializer_class in conf if activity.match_route(route, payload) ] @celery.app.task(name="federation.fetch_collection") @celery.require_instance( audio_models.Channel.objects.all(), "channel", allow_null=True, ) def fetch_collection(url, max_pages, channel, is_page=False): actor = actors.get_service_actor() results = { "items": [], "skipped": 0, "errored": 0, "seen": 0, "total": 0, } if is_page: # starting immediately from a page, no need to fetch the wrapping collection logger.debug("Fetch collection page immediately at %s", url) results["next_page"] = url else: logger.debug("Fetching collection object at %s", url) collection = utils.retrieve_ap_object( url, actor=actor, serializer_class=serializers.PaginatedCollectionSerializer, ) results["next_page"] = collection["first"] results["total"] = collection.get("totalItems") seen_pages = 0 context = {} if channel: context["channel"] = channel for i in range(max_pages): page_url = results["next_page"] logger.debug("Handling page %s on max %s, at %s", i + 1, max_pages, page_url) page = utils.retrieve_ap_object( page_url, actor=actor, serializer_class=None, ) try: items = page["orderedItems"] except KeyError: try: items = page["items"] except KeyError: logger.error("Invalid collection page at %s", page_url) break for item in items: results["seen"] += 1 matching_serializer = match_serializer( item, COLLECTION_ACTIVITY_SERIALIZERS ) if not matching_serializer: results["skipped"] += 1 logger.debug("Skipping unhandled activity %s", item.get("type")) continue s = matching_serializer[0](data=item, context=context) if not s.is_valid(): logger.warn("Skipping invalid activity: %s", s.errors) results["errored"] += 1 continue results["items"].append(s.save()) seen_pages += 1 results["next_page"] = page.get("next", None) or None if not results["next_page"]: logger.debug("No more pages to fetch") break logger.info( "Finished fetch of collection pages at %s. Results:\n" " Total in collection: %s\n" " Seen: %s\n" " Handled: %s\n" " Skipped: %s\n" " Errored: %s", url, results.get("total"), results["seen"], len(results["items"]), results["skipped"], results["errored"], ) return results @celery.app.task(name="federation.check_all_remote_instance_availability") def check_all_remote_instance_availability(): domains = models.Domain.objects.all().prefetch_related() for domain in domains: if domain.name == settings.FUNKWHALE_HOSTNAME: # No need to check the instance itself: Its always reachable domain.reachable = True domain.last_successful_contact = timezone.now() else: check_single_remote_instance_availability(domain) @celery.app.task(name="federation.check_single_remote_instance_availability") def check_single_remote_instance_availability(domain): try: nodeinfo = fetch_nodeinfo(domain.name) except Exception as e: logger.info( f"Domain {domain.name} could not be reached because of the following error : {e}. \ Setting domain as unreachable." ) domain.reachable = False domain.save() return domain.reachable if "version" in nodeinfo.keys(): domain.reachable = True domain.last_successful_contact = timezone.now() domain.save() return domain.reachable else: logger.info( f"Domain {domain.name} is not reachable at the moment. Setting domain as unreachable." ) domain.reachable = False domain.save() return domain.reachable @celery.app.task(name="federation.trigger_playlist_ap_update") def trigger_playlist_ap_update(playlist): for playlist_uuid in cache.get("playlists_for_ap_update"): routes.outbox.dispatch( {"type": "Update", "object": {"type": "Playlist"}}, context={ "playlist": playlists_models.Playlist.objects.get(uuid=playlist_uuid) }, )