diff --git a/api/config/settings/common.py b/api/config/settings/common.py index 2daec55ca..c4a7e1ef8 100644 --- a/api/config/settings/common.py +++ b/api/config/settings/common.py @@ -146,6 +146,8 @@ FEDERATION_ACTOR_FETCH_DELAY = env.int("FEDERATION_ACTOR_FETCH_DELAY", default=6 FEDERATION_SERVICE_ACTOR_USERNAME = env( "FEDERATION_SERVICE_ACTOR_USERNAME", default="service" ) +# How many pages to fetch when crawling outboxes and third-party collections +FEDERATION_COLLECTION_MAX_PAGES = env.int("FEDERATION_COLLECTION_MAX_PAGES", default=5) ALLOWED_HOSTS = env.list("DJANGO_ALLOWED_HOSTS", default=[]) + [FUNKWHALE_HOSTNAME] # APP CONFIGURATION diff --git a/api/funkwhale_api/audio/models.py b/api/funkwhale_api/audio/models.py index 37800962c..15e40f4d9 100644 --- a/api/funkwhale_api/audio/models.py +++ b/api/funkwhale_api/audio/models.py @@ -1,6 +1,7 @@ import uuid +from django.contrib.contenttypes.fields import GenericRelation from django.contrib.postgres.fields import JSONField from django.core.serializers.json import DjangoJSONEncoder from django.db import models @@ -67,6 +68,11 @@ class Channel(models.Model): default=empty_dict, max_length=50000, encoder=DjangoJSONEncoder, blank=True ) + fetches = GenericRelation( + "federation.Fetch", + content_type_field="object_content_type", + object_id_field="object_id", + ) objects = ChannelQuerySet.as_manager() @property @@ -74,6 +80,10 @@ class Channel(models.Model): if not self.is_external_rss: return self.actor.fid + @property + def is_local(self): + return self.actor.is_local + @property def is_external_rss(self): return self.actor.preferred_username.startswith("rssfeed-") diff --git a/api/funkwhale_api/audio/views.py b/api/funkwhale_api/audio/views.py index bce39d54c..4f32ad63c 100644 --- a/api/funkwhale_api/audio/views.py +++ b/api/funkwhale_api/audio/views.py @@ -110,6 +110,15 @@ class ChannelViewSet( else: return super().list(request, *args, **kwargs) + def get_object(self): + obj = super().get_object() + if ( + self.action == "retrieve" + and self.request.GET.get("refresh", "").lower() == "true" + ): + obj = music_views.refetch_obj(obj, self.get_queryset()) + return obj + @decorators.action( detail=True, methods=["post"], diff --git a/api/funkwhale_api/federation/serializers.py b/api/funkwhale_api/federation/serializers.py index f4adbfe12..313f225f7 100644 --- a/api/funkwhale_api/federation/serializers.py +++ b/api/funkwhale_api/federation/serializers.py @@ -19,7 +19,7 @@ from funkwhale_api.music import models as music_models from funkwhale_api.music import tasks as music_tasks from funkwhale_api.tags import models as tags_models -from . import activity, actors, contexts, jsonld, models, tasks, utils +from . import activity, actors, contexts, jsonld, models, utils logger = logging.getLogger(__name__) @@ -391,6 +391,8 @@ class ActorSerializer(jsonld.JsonLdSerializer): domain = urllib.parse.urlparse(kwargs["fid"]).netloc domain, domain_created = models.Domain.objects.get_or_create(pk=domain) if domain_created and not domain.is_local: + from . import tasks + # first time we see the domain, we trigger nodeinfo fetching tasks.update_domain_nodeinfo(domain_name=domain.name) @@ -896,8 +898,6 @@ def get_additional_fields(data): PAGINATED_COLLECTION_JSONLD_MAPPING = { "totalItems": jsonld.first_val(contexts.AS.totalItems), - "actor": jsonld.first_id(contexts.AS.actor), - "attributedTo": jsonld.first_id(contexts.AS.attributedTo), "first": jsonld.first_id(contexts.AS.first), "last": jsonld.first_id(contexts.AS.last), "partOf": jsonld.first_id(contexts.AS.partOf), @@ -905,10 +905,10 @@ PAGINATED_COLLECTION_JSONLD_MAPPING = { class PaginatedCollectionSerializer(jsonld.JsonLdSerializer): - type = serializers.ChoiceField(choices=[contexts.AS.Collection]) + type = serializers.ChoiceField( + choices=[contexts.AS.Collection, contexts.AS.OrderedCollection] + ) totalItems = serializers.IntegerField(min_value=0) - actor = serializers.URLField(max_length=500, required=False) - attributedTo = serializers.URLField(max_length=500, required=False) id = serializers.URLField(max_length=500) first = serializers.URLField(max_length=500) last = serializers.URLField(max_length=500) @@ -916,18 +916,6 @@ class PaginatedCollectionSerializer(jsonld.JsonLdSerializer): class Meta: jsonld_mapping = PAGINATED_COLLECTION_JSONLD_MAPPING - def validate(self, validated_data): - d = super().validate(validated_data) - actor = d.get("actor") - attributed_to = d.get("attributedTo") - if not actor and not attributed_to: - raise serializers.ValidationError( - "You need to provide at least actor or attributedTo" - ) - - d["attributedTo"] = attributed_to or actor - return d - def to_representation(self, conf): paginator = Paginator(conf["items"], conf.get("page_size", 20)) first = common_utils.set_query_parameter(conf["id"], page=1) @@ -954,6 +942,8 @@ class LibrarySerializer(PaginatedCollectionSerializer): type = serializers.ChoiceField( choices=[contexts.AS.Collection, contexts.FW.Library] ) + actor = serializers.URLField(max_length=500, required=False) + attributedTo = serializers.URLField(max_length=500, required=False) name = serializers.CharField() summary = serializers.CharField(allow_blank=True, allow_null=True, required=False) followers = serializers.URLField(max_length=500) @@ -976,9 +966,23 @@ class LibrarySerializer(PaginatedCollectionSerializer): "summary": jsonld.first_val(contexts.AS.summary), "audience": jsonld.first_id(contexts.AS.audience), "followers": jsonld.first_id(contexts.AS.followers), + "actor": jsonld.first_id(contexts.AS.actor), + "attributedTo": jsonld.first_id(contexts.AS.attributedTo), }, ) + def validate(self, validated_data): + d = super().validate(validated_data) + actor = d.get("actor") + attributed_to = d.get("attributedTo") + if not actor and not attributed_to: + raise serializers.ValidationError( + "You need to provide at least actor or attributedTo" + ) + + d["attributedTo"] = attributed_to or actor + return d + def to_representation(self, library): conf = { "id": library.fid, @@ -1934,7 +1938,15 @@ class ChannelUploadSerializer(jsonld.JsonLdSerializer): return self.update_or_create(validated_data) -class ChannelCreateUploadSerializer(serializers.Serializer): +class ChannelCreateUploadSerializer(jsonld.JsonLdSerializer): + type = serializers.ChoiceField(choices=[contexts.AS.Create]) + object = serializers.DictField() + + class Meta: + jsonld_mapping = { + "object": jsonld.first_obj(contexts.AS.object), + } + def to_representation(self, upload): return { "@context": jsonld.get_default_context(), @@ -1944,3 +1956,13 @@ class ChannelCreateUploadSerializer(serializers.Serializer): upload, context={"include_ap_context": False} ).data, } + + def validate(self, validated_data): + serializer = ChannelUploadSerializer( + data=validated_data["object"], context=self.context, jsonld_expand=False + ) + serializer.is_valid(raise_exception=True) + return {"audio_serializer": serializer} + + def save(self, **kwargs): + return self.validated_data["audio_serializer"].save(**kwargs) diff --git a/api/funkwhale_api/federation/tasks.py b/api/funkwhale_api/federation/tasks.py index 1a43970ef..b84e4c5f9 100644 --- a/api/funkwhale_api/federation/tasks.py +++ b/api/funkwhale_api/federation/tasks.py @@ -21,6 +21,7 @@ from funkwhale_api.moderation import mrf from funkwhale_api.music import models as music_models from funkwhale_api.taskapp import celery +from . import activity from . import actors from . import jsonld from . import keys @@ -399,8 +400,24 @@ def fetch(fetch_obj): # special case for channels # when obj is an actor, we check if the actor has a channel associated with it # if it is the case, we consider the fetch obj to be a channel instead + # and also trigger a fetch on the channel outbox if isinstance(obj, models.Actor) and obj.get_channel(): obj = obj.get_channel() + if obj.actor.outbox_url: + # first page fetch is synchronous, so that at least some data is available + # in the UI after subscription + result = fetch_collection( + obj.actor.outbox_url, channel_id=obj.pk, max_pages=1, + ) + if result.get("next_page"): + # additional pages are fetched in the background + result = fetch_collection.delay( + result["next_page"], + channel_id=obj.pk, + max_pages=settings.FEDERATION_COLLECTION_MAX_PAGES - 1, + is_page=True, + ) + fetch_obj.object = obj fetch_obj.status = "finished" fetch_obj.fetch_date = timezone.now() @@ -469,3 +486,106 @@ def remove_actor(actor): actor.name = None actor.summary = None actor.save(update_fields=["type", "name", "summary"]) + + +COLLECTION_ACTIVITY_SERIALIZERS = [ + ( + {"type": "Create", "object.type": "Audio"}, + serializers.ChannelCreateUploadSerializer, + ) +] + + +def match_serializer(payload, conf): + return [ + serializer_class + for route, serializer_class in conf + if activity.match_route(route, payload) + ] + + +@celery.app.task(name="federation.fetch_collection") +@celery.require_instance( + audio_models.Channel.objects.all(), "channel", allow_null=True, +) +def fetch_collection(url, max_pages, channel, is_page=False): + actor = actors.get_service_actor() + results = { + "items": [], + "skipped": 0, + "errored": 0, + "seen": 0, + "total": 0, + } + if is_page: + # starting immediatly from a page, no need to fetch the wrapping collection + logger.debug("Fetch collection page immediatly at %s", url) + results["next_page"] = url + else: + logger.debug("Fetching collection object at %s", url) + collection = utils.retrieve_ap_object( + url, + actor=actor, + serializer_class=serializers.PaginatedCollectionSerializer, + ) + results["next_page"] = collection["first"] + results["total"] = collection.get("totalItems") + + seen_pages = 0 + context = {} + if channel: + context["channel"] = channel + + for i in range(max_pages): + page_url = results["next_page"] + logger.debug("Handling page %s on max %s, at %s", i + 1, max_pages, page_url) + page = utils.retrieve_ap_object(page_url, actor=actor, serializer_class=None,) + try: + items = page["orderedItems"] + except KeyError: + try: + items = page["items"] + except KeyError: + logger.error("Invalid collection page at %s", page_url) + break + + for item in items: + results["seen"] += 1 + + matching_serializer = match_serializer( + item, COLLECTION_ACTIVITY_SERIALIZERS + ) + if not matching_serializer: + results["skipped"] += 1 + logger.debug("Skipping unhandled activity %s", item.get("type")) + continue + + s = matching_serializer[0](data=item, context=context) + if not s.is_valid(): + logger.warn("Skipping invalid activity: %s", s.errors) + results["errored"] += 1 + continue + + results["items"].append(s.save()) + + seen_pages += 1 + results["next_page"] = page.get("next", None) or None + if not results["next_page"]: + logger.debug("No more pages to fetch") + break + + logger.info( + "Finished fetch of collection pages at %s. Results:\n" + " Total in collection: %s\n" + " Seen: %s\n" + " Handled: %s\n" + " Skipped: %s\n" + " Errored: %s", + url, + results.get("total"), + results["seen"], + len(results["items"]), + results["skipped"], + results["errored"], + ) + return results diff --git a/api/funkwhale_api/federation/utils.py b/api/funkwhale_api/federation/utils.py index 7f2f346e1..58da7cbe6 100644 --- a/api/funkwhale_api/federation/utils.py +++ b/api/funkwhale_api/federation/utils.py @@ -107,7 +107,10 @@ def retrieve_ap_object( return data serializer = serializer_class(data=data, context={"fetch_actor": actor}) serializer.is_valid(raise_exception=True) - return serializer.save() + try: + return serializer.save() + except NotImplementedError: + return serializer.validated_data def get_domain_query_from_url(domain, url_field="fid"): diff --git a/api/funkwhale_api/taskapp/celery.py b/api/funkwhale_api/taskapp/celery.py index e86cf0eae..fa827c2dd 100644 --- a/api/funkwhale_api/taskapp/celery.py +++ b/api/funkwhale_api/taskapp/celery.py @@ -37,16 +37,22 @@ class CeleryConfig(AppConfig): app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, force=True) -def require_instance(model_or_qs, parameter_name, id_kwarg_name=None): +def require_instance(model_or_qs, parameter_name, id_kwarg_name=None, allow_null=False): def decorator(function): @functools.wraps(function) def inner(*args, **kwargs): kw = id_kwarg_name or "_".join([parameter_name, "id"]) - pk = kwargs.pop(kw) try: - instance = model_or_qs.get(pk=pk) - except AttributeError: - instance = model_or_qs.objects.get(pk=pk) + pk = kwargs.pop(kw) + except KeyError: + if not allow_null: + raise + instance = None + else: + try: + instance = model_or_qs.get(pk=pk) + except AttributeError: + instance = model_or_qs.objects.get(pk=pk) kwargs[parameter_name] = instance return function(*args, **kwargs) diff --git a/api/tests/audio/test_views.py b/api/tests/audio/test_views.py index 49b7eb07a..63cb2ebf9 100644 --- a/api/tests/audio/test_views.py +++ b/api/tests/audio/test_views.py @@ -420,3 +420,18 @@ def test_subscribe_to_rss_creates_channel(factories, logged_in_api_client, mocke assert response.data["channel"]["uuid"] == channel.uuid get_channel_from_rss_url.assert_called_once_with(rss_url) + + +def test_refresh_channel_when_param_is_true( + factories, mocker, logged_in_api_client, queryset_equal_queries, +): + obj = factories["audio.Channel"]() + refetch_obj = mocker.patch( + "funkwhale_api.music.views.refetch_obj", return_value=obj + ) + url = reverse("api:v1:channels-detail", kwargs={"composite": obj.uuid}) + response = logged_in_api_client.get(url, {"refresh": "true"}) + + assert response.status_code == 200 + assert refetch_obj.call_count == 1 + assert refetch_obj.call_args[0][0] == obj diff --git a/api/tests/federation/test_serializers.py b/api/tests/federation/test_serializers.py index 0c7166728..bb66c9fc8 100644 --- a/api/tests/federation/test_serializers.py +++ b/api/tests/federation/test_serializers.py @@ -418,7 +418,6 @@ def test_paginated_collection_serializer_validation(): assert serializer.is_valid(raise_exception=True) is True assert serializer.validated_data["totalItems"] == 5 assert serializer.validated_data["id"] == data["id"] - assert serializer.validated_data["attributedTo"] == data["actor"] def test_collection_page_serializer_validation(): diff --git a/api/tests/federation/test_tasks.py b/api/tests/federation/test_tasks.py index 433435efb..ce4736503 100644 --- a/api/tests/federation/test_tasks.py +++ b/api/tests/federation/test_tasks.py @@ -491,10 +491,16 @@ def test_fetch_url(factory_name, serializer_class, factories, r_mock, mocker): assert save.call_count == 1 -def test_fetch_channel_actor_returns_channel(factories, r_mock): +def test_fetch_channel_actor_returns_channel_and_fetch_outbox( + factories, r_mock, settings, mocker +): obj = factories["audio.Channel"]() fetch = factories["federation.Fetch"](url=obj.actor.fid) payload = serializers.ActorSerializer(obj.actor).data + fetch_collection = mocker.patch.object( + tasks, "fetch_collection", return_value={"next_page": "http://outbox.url/page2"} + ) + fetch_collection_delayed = mocker.patch.object(tasks.fetch_collection, "delay") r_mock.get(obj.fid, json=payload) @@ -504,6 +510,15 @@ def test_fetch_channel_actor_returns_channel(factories, r_mock): assert fetch.status == "finished" assert fetch.object == obj + fetch_collection.assert_called_once_with( + obj.actor.outbox_url, channel_id=obj.pk, max_pages=1, + ) + fetch_collection_delayed.assert_called_once_with( + "http://outbox.url/page2", + max_pages=settings.FEDERATION_COLLECTION_MAX_PAGES - 1, + is_page=True, + channel_id=obj.pk, + ) def test_fetch_honor_instance_policy_domain(factories): @@ -563,3 +578,71 @@ def test_fetch_honor_instance_policy_different_url_and_id(r_mock, factories): assert fetch.status == "errored" assert fetch.detail["error_code"] == "blocked" + + +def test_fetch_collection(mocker, r_mock): + class DummySerializer(serializers.serializers.Serializer): + def validate(self, validated_data): + validated_data = self.initial_data + if "id" not in validated_data["object"]: + raise serializers.serializers.ValidationError() + return validated_data + + def save(self): + return self.initial_data + + mocker.patch.object( + tasks, + "COLLECTION_ACTIVITY_SERIALIZERS", + [({"type": "Create", "object.type": "Audio"}, DummySerializer)], + ) + payloads = { + "outbox": { + "id": "https://actor.url/outbox", + "@context": jsonld.get_default_context(), + "type": "OrderedCollection", + "totalItems": 27094, + "first": "https://actor.url/outbox?page=1", + "last": "https://actor.url/outbox?page=3", + }, + "page1": { + "@context": jsonld.get_default_context(), + "type": "OrderedCollectionPage", + "next": "https://actor.url/outbox?page=2", + "orderedItems": [ + {"type": "Unhandled"}, + {"type": "Unhandled"}, + { + "type": "Create", + "object": {"type": "Audio", "id": "https://actor.url/audio1"}, + }, + ], + }, + "page2": { + "@context": jsonld.get_default_context(), + "type": "OrderedCollectionPage", + "next": "https://actor.url/outbox?page=3", + "orderedItems": [ + {"type": "Unhandled"}, + { + "type": "Create", + "object": {"type": "Audio", "id": "https://actor.url/audio2"}, + }, + {"type": "Unhandled"}, + {"type": "Create", "object": {"type": "Audio"}}, + ], + }, + } + r_mock.get(payloads["outbox"]["id"], json=payloads["outbox"]) + r_mock.get(payloads["outbox"]["first"], json=payloads["page1"]) + r_mock.get(payloads["page1"]["next"], json=payloads["page2"]) + result = tasks.fetch_collection(payloads["outbox"]["id"], max_pages=2,) + assert result["items"] == [ + payloads["page1"]["orderedItems"][2], + payloads["page2"]["orderedItems"][1], + ] + assert result["skipped"] == 4 + assert result["errored"] == 1 + assert result["seen"] == 7 + assert result["total"] == 27094 + assert result["next_page"] == payloads["page2"]["next"] diff --git a/front/src/views/channels/DetailBase.vue b/front/src/views/channels/DetailBase.vue index 4e5dabc23..2445f1091 100644 --- a/front/src/views/channels/DetailBase.vue +++ b/front/src/views/channels/DetailBase.vue @@ -272,7 +272,7 @@ export default { this.showEditModal = false this.edit.isLoading = false this.isLoading = true - let channelPromise = axios.get(`channels/${this.id}`).then(response => { + let channelPromise = axios.get(`channels/${this.id}`, {params: {refresh: 'true'}}).then(response => { self.object = response.data if ((self.id == response.data.uuid) && response.data.actor) { // replace with the pretty channel url if possible