Merge branch '170-outbox-fetch' into 'develop'

See #170: fetch channel outbox on discovery/detail

See merge request funkwhale/funkwhale!1073
Eliot Berriot 2020-03-31 16:26:44 +02:00
commit 97e07f6abd
11 changed files with 297 additions and 28 deletions

View File

@@ -146,6 +146,8 @@ FEDERATION_ACTOR_FETCH_DELAY = env.int("FEDERATION_ACTOR_FETCH_DELAY", default=6
 FEDERATION_SERVICE_ACTOR_USERNAME = env(
     "FEDERATION_SERVICE_ACTOR_USERNAME", default="service"
 )
+# How many pages to fetch when crawling outboxes and third-party collections
+FEDERATION_COLLECTION_MAX_PAGES = env.int("FEDERATION_COLLECTION_MAX_PAGES", default=5)
 ALLOWED_HOSTS = env.list("DJANGO_ALLOWED_HOSTS", default=[]) + [FUNKWHALE_HOSTNAME]
 
 # APP CONFIGURATION

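The new setting is read from the environment like the others in this file. A minimal sketch of how an instance admin could raise the crawl depth in their .env file (the variable name comes from the diff above; the value 10 is illustrative):

    # crawl up to 10 pages per remote collection instead of the default 5
    FEDERATION_COLLECTION_MAX_PAGES=10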
View File

@ -1,6 +1,7 @@
import uuid import uuid
from django.contrib.contenttypes.fields import GenericRelation
from django.contrib.postgres.fields import JSONField from django.contrib.postgres.fields import JSONField
from django.core.serializers.json import DjangoJSONEncoder from django.core.serializers.json import DjangoJSONEncoder
from django.db import models from django.db import models
@ -67,6 +68,11 @@ class Channel(models.Model):
default=empty_dict, max_length=50000, encoder=DjangoJSONEncoder, blank=True default=empty_dict, max_length=50000, encoder=DjangoJSONEncoder, blank=True
) )
fetches = GenericRelation(
"federation.Fetch",
content_type_field="object_content_type",
object_id_field="object_id",
)
objects = ChannelQuerySet.as_manager() objects = ChannelQuerySet.as_manager()
@property @property
@ -74,6 +80,10 @@ class Channel(models.Model):
if not self.is_external_rss: if not self.is_external_rss:
return self.actor.fid return self.actor.fid
@property
def is_local(self):
return self.actor.is_local
@property @property
def is_external_rss(self): def is_external_rss(self):
return self.actor.preferred_username.startswith("rssfeed-") return self.actor.preferred_username.startswith("rssfeed-")

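The GenericRelation makes fetch history queryable from the channel side. A hedged sketch of what it enables (Fetch's fetch_date field is referenced later in this diff; the query itself is illustrative):

    # latest fetch recorded against a channel, newest first
    channel = Channel.objects.first()
    latest_fetch = channel.fetches.order_by("-fetch_date").first()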
View File

@@ -110,6 +110,15 @@ class ChannelViewSet(
         else:
             return super().list(request, *args, **kwargs)
 
+    def get_object(self):
+        obj = super().get_object()
+        if (
+            self.action == "retrieve"
+            and self.request.GET.get("refresh", "").lower() == "true"
+        ):
+            obj = music_views.refetch_obj(obj, self.get_queryset())
+        return obj
+
     @decorators.action(
         detail=True,
         methods=["post"],

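With get_object patched this way, a detail request can force a refetch of a remote channel. A minimal sketch of the call from a client's side, assuming a hypothetical instance URL and channel UUID:

    import requests

    # ?refresh=true triggers music_views.refetch_obj before the object is returned
    response = requests.get(
        "https://funkwhale.example/api/v1/channels/0f84d6e8-0000-0000-0000-000000000000",
        params={"refresh": "true"},
    )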
View File

@@ -19,7 +19,7 @@ from funkwhale_api.music import models as music_models
 from funkwhale_api.music import tasks as music_tasks
 from funkwhale_api.tags import models as tags_models
 
-from . import activity, actors, contexts, jsonld, models, tasks, utils
+from . import activity, actors, contexts, jsonld, models, utils
 
 logger = logging.getLogger(__name__)
@@ -391,6 +391,8 @@ class ActorSerializer(jsonld.JsonLdSerializer):
         domain = urllib.parse.urlparse(kwargs["fid"]).netloc
         domain, domain_created = models.Domain.objects.get_or_create(pk=domain)
         if domain_created and not domain.is_local:
+            from . import tasks
+
             # first time we see the domain, we trigger nodeinfo fetching
             tasks.update_domain_nodeinfo(domain_name=domain.name)
@@ -896,8 +898,6 @@ def get_additional_fields(data):
 PAGINATED_COLLECTION_JSONLD_MAPPING = {
     "totalItems": jsonld.first_val(contexts.AS.totalItems),
-    "actor": jsonld.first_id(contexts.AS.actor),
-    "attributedTo": jsonld.first_id(contexts.AS.attributedTo),
     "first": jsonld.first_id(contexts.AS.first),
     "last": jsonld.first_id(contexts.AS.last),
     "partOf": jsonld.first_id(contexts.AS.partOf),
@@ -905,10 +905,10 @@ PAGINATED_COLLECTION_JSONLD_MAPPING = {
 class PaginatedCollectionSerializer(jsonld.JsonLdSerializer):
-    type = serializers.ChoiceField(choices=[contexts.AS.Collection])
+    type = serializers.ChoiceField(
+        choices=[contexts.AS.Collection, contexts.AS.OrderedCollection]
+    )
     totalItems = serializers.IntegerField(min_value=0)
-    actor = serializers.URLField(max_length=500, required=False)
-    attributedTo = serializers.URLField(max_length=500, required=False)
     id = serializers.URLField(max_length=500)
     first = serializers.URLField(max_length=500)
     last = serializers.URLField(max_length=500)
@@ -916,18 +916,6 @@ class PaginatedCollectionSerializer(jsonld.JsonLdSerializer):
     class Meta:
         jsonld_mapping = PAGINATED_COLLECTION_JSONLD_MAPPING
 
-    def validate(self, validated_data):
-        d = super().validate(validated_data)
-        actor = d.get("actor")
-        attributed_to = d.get("attributedTo")
-        if not actor and not attributed_to:
-            raise serializers.ValidationError(
-                "You need to provide at least actor or attributedTo"
-            )
-        d["attributedTo"] = attributed_to or actor
-        return d
-
     def to_representation(self, conf):
         paginator = Paginator(conf["items"], conf.get("page_size", 20))
         first = common_utils.set_query_parameter(conf["id"], page=1)
@@ -954,6 +942,8 @@ class LibrarySerializer(PaginatedCollectionSerializer):
     type = serializers.ChoiceField(
         choices=[contexts.AS.Collection, contexts.FW.Library]
     )
+    actor = serializers.URLField(max_length=500, required=False)
+    attributedTo = serializers.URLField(max_length=500, required=False)
     name = serializers.CharField()
     summary = serializers.CharField(allow_blank=True, allow_null=True, required=False)
     followers = serializers.URLField(max_length=500)
@@ -976,9 +966,23 @@ class LibrarySerializer(PaginatedCollectionSerializer):
             "summary": jsonld.first_val(contexts.AS.summary),
             "audience": jsonld.first_id(contexts.AS.audience),
             "followers": jsonld.first_id(contexts.AS.followers),
+            "actor": jsonld.first_id(contexts.AS.actor),
+            "attributedTo": jsonld.first_id(contexts.AS.attributedTo),
         },
     )
 
+    def validate(self, validated_data):
+        d = super().validate(validated_data)
+        actor = d.get("actor")
+        attributed_to = d.get("attributedTo")
+        if not actor and not attributed_to:
+            raise serializers.ValidationError(
+                "You need to provide at least actor or attributedTo"
+            )
+        d["attributedTo"] = attributed_to or actor
+        return d
+
     def to_representation(self, library):
         conf = {
             "id": library.fid,
@@ -1934,7 +1938,15 @@ class ChannelUploadSerializer(jsonld.JsonLdSerializer):
         return self.update_or_create(validated_data)
 
 
-class ChannelCreateUploadSerializer(serializers.Serializer):
+class ChannelCreateUploadSerializer(jsonld.JsonLdSerializer):
+    type = serializers.ChoiceField(choices=[contexts.AS.Create])
+    object = serializers.DictField()
+
+    class Meta:
+        jsonld_mapping = {
+            "object": jsonld.first_obj(contexts.AS.object),
+        }
+
     def to_representation(self, upload):
         return {
             "@context": jsonld.get_default_context(),
@@ -1944,3 +1956,13 @@ class ChannelCreateUploadSerializer(serializers.Serializer):
                 upload, context={"include_ap_context": False}
             ).data,
         }
+
+    def validate(self, validated_data):
+        serializer = ChannelUploadSerializer(
+            data=validated_data["object"], context=self.context, jsonld_expand=False
+        )
+        serializer.is_valid(raise_exception=True)
+        return {"audio_serializer": serializer}
+
+    def save(self, **kwargs):
+        return self.validated_data["audio_serializer"].save(**kwargs)

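ChannelCreateUploadSerializer now validates the wrapping Create activity itself and hands the nested object to ChannelUploadSerializer. A hedged sketch of the delegation flow (the payload is truncated for illustration; a real Audio object carries many more fields, so this exact dict would fail the nested validation):

    data = {
        "@context": jsonld.get_default_context(),
        "type": "Create",
        "object": {"type": "Audio", "id": "https://channel.example/uploads/1"},
    }
    s = ChannelCreateUploadSerializer(data=data, context={"channel": channel})
    if s.is_valid():       # validate() runs ChannelUploadSerializer on data["object"]
        upload = s.save()  # save() is forwarded to the nested audio serializer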
View File

@@ -21,6 +21,7 @@ from funkwhale_api.moderation import mrf
 from funkwhale_api.music import models as music_models
 from funkwhale_api.taskapp import celery
 
+from . import activity
 from . import actors
 from . import jsonld
 from . import keys
@@ -399,8 +400,24 @@ def fetch(fetch_obj):
     # special case for channels
     # when obj is an actor, we check if the actor has a channel associated with it
     # if it is the case, we consider the fetch obj to be a channel instead
+    # and also trigger a fetch on the channel outbox
     if isinstance(obj, models.Actor) and obj.get_channel():
         obj = obj.get_channel()
+        if obj.actor.outbox_url:
+            # first page fetch is synchronous, so that at least some data is available
+            # in the UI after subscription
+            result = fetch_collection(
+                obj.actor.outbox_url, channel_id=obj.pk, max_pages=1,
+            )
+            if result.get("next_page"):
+                # additional pages are fetched in the background
+                result = fetch_collection.delay(
+                    result["next_page"],
+                    channel_id=obj.pk,
+                    max_pages=settings.FEDERATION_COLLECTION_MAX_PAGES - 1,
+                    is_page=True,
+                )
 
     fetch_obj.object = obj
     fetch_obj.status = "finished"
     fetch_obj.fetch_date = timezone.now()
@@ -469,3 +486,106 @@ def remove_actor(actor):
     actor.name = None
     actor.summary = None
     actor.save(update_fields=["type", "name", "summary"])
+
+
+COLLECTION_ACTIVITY_SERIALIZERS = [
+    (
+        {"type": "Create", "object.type": "Audio"},
+        serializers.ChannelCreateUploadSerializer,
+    )
+]
+
+
+def match_serializer(payload, conf):
+    return [
+        serializer_class
+        for route, serializer_class in conf
+        if activity.match_route(route, payload)
+    ]
+
+
+@celery.app.task(name="federation.fetch_collection")
+@celery.require_instance(
+    audio_models.Channel.objects.all(), "channel", allow_null=True,
+)
+def fetch_collection(url, max_pages, channel, is_page=False):
+    actor = actors.get_service_actor()
+    results = {
+        "items": [],
+        "skipped": 0,
+        "errored": 0,
+        "seen": 0,
+        "total": 0,
+    }
+    if is_page:
+        # starting immediately from a page, no need to fetch the wrapping collection
+        logger.debug("Fetch collection page immediately at %s", url)
+        results["next_page"] = url
+    else:
+        logger.debug("Fetching collection object at %s", url)
+        collection = utils.retrieve_ap_object(
+            url,
+            actor=actor,
+            serializer_class=serializers.PaginatedCollectionSerializer,
+        )
+        results["next_page"] = collection["first"]
+        results["total"] = collection.get("totalItems")
+
+    seen_pages = 0
+    context = {}
+    if channel:
+        context["channel"] = channel
+    for i in range(max_pages):
+        page_url = results["next_page"]
+        logger.debug("Handling page %s of max %s, at %s", i + 1, max_pages, page_url)
+        page = utils.retrieve_ap_object(page_url, actor=actor, serializer_class=None,)
+        try:
+            items = page["orderedItems"]
+        except KeyError:
+            try:
+                items = page["items"]
+            except KeyError:
+                logger.error("Invalid collection page at %s", page_url)
+                break
+        for item in items:
+            results["seen"] += 1
+            matching_serializer = match_serializer(
+                item, COLLECTION_ACTIVITY_SERIALIZERS
+            )
+            if not matching_serializer:
+                results["skipped"] += 1
+                logger.debug("Skipping unhandled activity %s", item.get("type"))
+                continue
+            s = matching_serializer[0](data=item, context=context)
+            if not s.is_valid():
+                logger.warn("Skipping invalid activity: %s", s.errors)
+                results["errored"] += 1
+                continue
+            results["items"].append(s.save())
+        seen_pages += 1
+        results["next_page"] = page.get("next", None) or None
+        if not results["next_page"]:
+            logger.debug("No more pages to fetch")
+            break
+
+    logger.info(
+        "Finished fetch of collection pages at %s. Results:\n"
+        "  Total in collection: %s\n"
+        "  Seen: %s\n"
+        "  Handled: %s\n"
+        "  Skipped: %s\n"
+        "  Errored: %s",
+        url,
+        results.get("total"),
+        results["seen"],
+        len(results["items"]),
+        results["skipped"],
+        results["errored"],
+    )
+    return results

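match_serializer filters the route table through activity.match_route, so only Create activities wrapping an Audio object are handled; everything else counts as "skipped". A small sketch of the dispatch under that assumption (payloads are illustrative):

    create_audio = {"type": "Create", "object": {"type": "Audio"}}
    like = {"type": "Like"}

    match_serializer(create_audio, COLLECTION_ACTIVITY_SERIALIZERS)
    # -> [serializers.ChannelCreateUploadSerializer]
    match_serializer(like, COLLECTION_ACTIVITY_SERIALIZERS)
    # -> [] (the item is counted as "skipped" in fetch_collection)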
View File

@@ -107,7 +107,10 @@ def retrieve_ap_object(
         return data
     serializer = serializer_class(data=data, context={"fetch_actor": actor})
     serializer.is_valid(raise_exception=True)
-    return serializer.save()
+    try:
+        return serializer.save()
+    except NotImplementedError:
+        return serializer.validated_data
 
 
 def get_domain_query_from_url(domain, url_field="fid"):

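The fallback exists because a DRF serializer that defines neither create() nor update() raises NotImplementedError from save(); PaginatedCollectionSerializer is now such a serializer, so fetch_collection gets the validated payload back instead of a saved object. A sketch of that call path (URL is illustrative):

    collection = retrieve_ap_object(
        "https://actor.example/outbox",
        actor=actors.get_service_actor(),
        serializer_class=serializers.PaginatedCollectionSerializer,
    )
    collection["first"]  # URL of the first page, as consumed by fetch_collection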
View File

@@ -37,16 +37,22 @@ class CeleryConfig(AppConfig):
         app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, force=True)
 
 
-def require_instance(model_or_qs, parameter_name, id_kwarg_name=None):
+def require_instance(model_or_qs, parameter_name, id_kwarg_name=None, allow_null=False):
     def decorator(function):
         @functools.wraps(function)
         def inner(*args, **kwargs):
             kw = id_kwarg_name or "_".join([parameter_name, "id"])
-            pk = kwargs.pop(kw)
             try:
-                instance = model_or_qs.get(pk=pk)
-            except AttributeError:
-                instance = model_or_qs.objects.get(pk=pk)
+                pk = kwargs.pop(kw)
+            except KeyError:
+                if not allow_null:
+                    raise
+                instance = None
+            else:
+                try:
+                    instance = model_or_qs.get(pk=pk)
+                except AttributeError:
+                    instance = model_or_qs.objects.get(pk=pk)
             kwargs[parameter_name] = instance
             return function(*args, **kwargs)

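With allow_null=True, omitting the id kwarg no longer raises KeyError; the decorated task simply receives None. A minimal sketch of both call styles for the fetch_collection task defined earlier (the pk 42 is illustrative):

    fetch_collection.delay(url, max_pages=1, channel_id=42)  # channel resolved from the queryset
    fetch_collection.delay(url, max_pages=1)                 # channel is None (allow_null=True)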
View File

@@ -420,3 +420,18 @@ def test_subscribe_to_rss_creates_channel(factories, logged_in_api_client, mocke
     assert response.data["channel"]["uuid"] == channel.uuid
     get_channel_from_rss_url.assert_called_once_with(rss_url)
+
+
+def test_refresh_channel_when_param_is_true(
+    factories, mocker, logged_in_api_client, queryset_equal_queries,
+):
+    obj = factories["audio.Channel"]()
+    refetch_obj = mocker.patch(
+        "funkwhale_api.music.views.refetch_obj", return_value=obj
+    )
+    url = reverse("api:v1:channels-detail", kwargs={"composite": obj.uuid})
+    response = logged_in_api_client.get(url, {"refresh": "true"})
+
+    assert response.status_code == 200
+    assert refetch_obj.call_count == 1
+    assert refetch_obj.call_args[0][0] == obj

View File

@@ -418,7 +418,6 @@ def test_paginated_collection_serializer_validation():
     assert serializer.is_valid(raise_exception=True) is True
     assert serializer.validated_data["totalItems"] == 5
     assert serializer.validated_data["id"] == data["id"]
-    assert serializer.validated_data["attributedTo"] == data["actor"]
 
 
 def test_collection_page_serializer_validation():

View File

@@ -491,10 +491,16 @@ def test_fetch_url(factory_name, serializer_class, factories, r_mock, mocker):
     assert save.call_count == 1
 
 
-def test_fetch_channel_actor_returns_channel(factories, r_mock):
+def test_fetch_channel_actor_returns_channel_and_fetch_outbox(
+    factories, r_mock, settings, mocker
+):
     obj = factories["audio.Channel"]()
     fetch = factories["federation.Fetch"](url=obj.actor.fid)
     payload = serializers.ActorSerializer(obj.actor).data
+    fetch_collection = mocker.patch.object(
+        tasks, "fetch_collection", return_value={"next_page": "http://outbox.url/page2"}
+    )
+    fetch_collection_delayed = mocker.patch.object(tasks.fetch_collection, "delay")
 
     r_mock.get(obj.fid, json=payload)
@@ -504,6 +510,15 @@ def test_fetch_channel_actor_returns_channel_and_fetch_outbox(
     assert fetch.status == "finished"
     assert fetch.object == obj
+    fetch_collection.assert_called_once_with(
+        obj.actor.outbox_url, channel_id=obj.pk, max_pages=1,
+    )
+    fetch_collection_delayed.assert_called_once_with(
+        "http://outbox.url/page2",
+        max_pages=settings.FEDERATION_COLLECTION_MAX_PAGES - 1,
+        is_page=True,
+        channel_id=obj.pk,
+    )
 
 
 def test_fetch_honor_instance_policy_domain(factories):
@@ -563,3 +578,71 @@ def test_fetch_honor_instance_policy_different_url_and_id(r_mock, factories):
     assert fetch.status == "errored"
     assert fetch.detail["error_code"] == "blocked"
+
+
+def test_fetch_collection(mocker, r_mock):
+    class DummySerializer(serializers.serializers.Serializer):
+        def validate(self, validated_data):
+            validated_data = self.initial_data
+            if "id" not in validated_data["object"]:
+                raise serializers.serializers.ValidationError()
+            return validated_data
+
+        def save(self):
+            return self.initial_data
+
+    mocker.patch.object(
+        tasks,
+        "COLLECTION_ACTIVITY_SERIALIZERS",
+        [({"type": "Create", "object.type": "Audio"}, DummySerializer)],
+    )
+    payloads = {
+        "outbox": {
+            "id": "https://actor.url/outbox",
+            "@context": jsonld.get_default_context(),
+            "type": "OrderedCollection",
+            "totalItems": 27094,
+            "first": "https://actor.url/outbox?page=1",
+            "last": "https://actor.url/outbox?page=3",
+        },
+        "page1": {
+            "@context": jsonld.get_default_context(),
+            "type": "OrderedCollectionPage",
+            "next": "https://actor.url/outbox?page=2",
+            "orderedItems": [
+                {"type": "Unhandled"},
+                {"type": "Unhandled"},
+                {
+                    "type": "Create",
+                    "object": {"type": "Audio", "id": "https://actor.url/audio1"},
+                },
+            ],
+        },
+        "page2": {
+            "@context": jsonld.get_default_context(),
+            "type": "OrderedCollectionPage",
+            "next": "https://actor.url/outbox?page=3",
+            "orderedItems": [
+                {"type": "Unhandled"},
+                {
+                    "type": "Create",
+                    "object": {"type": "Audio", "id": "https://actor.url/audio2"},
+                },
+                {"type": "Unhandled"},
+                {"type": "Create", "object": {"type": "Audio"}},
+            ],
+        },
+    }
+    r_mock.get(payloads["outbox"]["id"], json=payloads["outbox"])
+    r_mock.get(payloads["outbox"]["first"], json=payloads["page1"])
+    r_mock.get(payloads["page1"]["next"], json=payloads["page2"])
+
+    result = tasks.fetch_collection(payloads["outbox"]["id"], max_pages=2,)
+
+    assert result["items"] == [
+        payloads["page1"]["orderedItems"][2],
+        payloads["page2"]["orderedItems"][1],
+    ]
+    assert result["skipped"] == 4
+    assert result["errored"] == 1
+    assert result["seen"] == 7
+    assert result["total"] == 27094
+    assert result["next_page"] == payloads["page2"]["next"]

View File

@@ -272,7 +272,7 @@ export default {
       this.showEditModal = false
       this.edit.isLoading = false
      this.isLoading = true
-      let channelPromise = axios.get(`channels/${this.id}`).then(response => {
+      let channelPromise = axios.get(`channels/${this.id}`, {params: {refresh: 'true'}}).then(response => {
        self.object = response.data
        if ((self.id == response.data.uuid) && response.data.actor) {
          // replace with the pretty channel url if possible