398 lines
13 KiB
Python
398 lines
13 KiB
Python
import datetime
|
|
import os
|
|
import pathlib
|
|
import pytest
|
|
|
|
from django.utils import timezone
|
|
|
|
from funkwhale_api.federation import jsonld
|
|
from funkwhale_api.federation import models
|
|
from funkwhale_api.federation import serializers
|
|
from funkwhale_api.federation import tasks
|
|
from funkwhale_api.federation import utils
|
|
|
|
|
|
def test_clean_federation_music_cache_if_no_listen(preferences, factories):
|
|
preferences["federation__music_cache_duration"] = 60
|
|
remote_library = factories["music.Library"]()
|
|
upload1 = factories["music.Upload"](
|
|
library=remote_library, accessed_date=timezone.now()
|
|
)
|
|
upload2 = factories["music.Upload"](
|
|
library=remote_library,
|
|
accessed_date=timezone.now() - datetime.timedelta(minutes=61),
|
|
)
|
|
upload3 = factories["music.Upload"](library=remote_library, accessed_date=None)
|
|
# local upload, should not be cleaned
|
|
upload4 = factories["music.Upload"](library__actor__local=True, accessed_date=None)
|
|
|
|
path1 = upload1.audio_file_path
|
|
path2 = upload2.audio_file_path
|
|
path3 = upload3.audio_file_path
|
|
path4 = upload4.audio_file_path
|
|
|
|
tasks.clean_music_cache()
|
|
|
|
upload1.refresh_from_db()
|
|
upload2.refresh_from_db()
|
|
upload3.refresh_from_db()
|
|
upload4.refresh_from_db()
|
|
|
|
assert bool(upload1.audio_file) is True
|
|
assert bool(upload2.audio_file) is False
|
|
assert bool(upload3.audio_file) is False
|
|
assert bool(upload4.audio_file) is True
|
|
assert os.path.exists(path1) is True
|
|
assert os.path.exists(path2) is False
|
|
assert os.path.exists(path3) is False
|
|
assert os.path.exists(path4) is True
|
|
|
|
|
|
def test_clean_federation_music_cache_orphaned(settings, preferences, factories):
|
|
preferences["federation__music_cache_duration"] = 60
|
|
path = os.path.join(settings.MEDIA_ROOT, "federation_cache", "tracks")
|
|
keep_path = os.path.join(os.path.join(path, "1a", "b2"), "keep.ogg")
|
|
remove_path = os.path.join(os.path.join(path, "c3", "d4"), "remove.ogg")
|
|
os.makedirs(os.path.dirname(keep_path), exist_ok=True)
|
|
os.makedirs(os.path.dirname(remove_path), exist_ok=True)
|
|
pathlib.Path(keep_path).touch()
|
|
pathlib.Path(remove_path).touch()
|
|
upload = factories["music.Upload"](
|
|
accessed_date=timezone.now(), audio_file__path=keep_path
|
|
)
|
|
|
|
tasks.clean_music_cache()
|
|
|
|
upload.refresh_from_db()
|
|
|
|
assert bool(upload.audio_file) is True
|
|
assert os.path.exists(upload.audio_file_path) is True
|
|
assert os.path.exists(remove_path) is False
|
|
|
|
|
|
def test_handle_in(factories, mocker, now, queryset_equal_list):
|
|
mocked_dispatch = mocker.patch("funkwhale_api.federation.routes.inbox.dispatch")
|
|
|
|
r1 = factories["users.User"](with_actor=True).actor
|
|
r2 = factories["users.User"](with_actor=True).actor
|
|
a = factories["federation.Activity"](payload={"hello": "world"})
|
|
ii1 = factories["federation.InboxItem"](activity=a, actor=r1)
|
|
ii2 = factories["federation.InboxItem"](activity=a, actor=r2)
|
|
tasks.dispatch_inbox(activity_id=a.pk, call_handlers=False)
|
|
|
|
mocked_dispatch.assert_called_once_with(
|
|
a.payload,
|
|
context={"actor": a.actor, "activity": a, "inbox_items": [ii1, ii2]},
|
|
call_handlers=False,
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"type, call_handlers", [("Noop", False), ("Update", False), ("Follow", True)]
|
|
)
|
|
def test_dispatch_outbox(factories, mocker, type, call_handlers):
|
|
mocked_inbox = mocker.patch("funkwhale_api.federation.tasks.dispatch_inbox.delay")
|
|
mocked_deliver_to_remote = mocker.patch(
|
|
"funkwhale_api.federation.tasks.deliver_to_remote.delay"
|
|
)
|
|
activity = factories["federation.Activity"](actor__local=True, type=type)
|
|
factories["federation.InboxItem"](activity=activity)
|
|
delivery = factories["federation.Delivery"](activity=activity)
|
|
tasks.dispatch_outbox(activity_id=activity.pk)
|
|
mocked_inbox.assert_called_once_with(
|
|
activity_id=activity.pk, call_handlers=call_handlers
|
|
)
|
|
mocked_deliver_to_remote.assert_called_once_with(delivery_id=delivery.pk)
|
|
|
|
|
|
def test_dispatch_outbox_disabled_federation(factories, mocker, preferences):
|
|
preferences["federation__enabled"] = False
|
|
mocked_inbox = mocker.patch("funkwhale_api.federation.tasks.dispatch_inbox.delay")
|
|
mocked_deliver_to_remote = mocker.patch(
|
|
"funkwhale_api.federation.tasks.deliver_to_remote.delay"
|
|
)
|
|
activity = factories["federation.Activity"](actor__local=True)
|
|
factories["federation.InboxItem"](activity=activity)
|
|
factories["federation.Delivery"](activity=activity)
|
|
tasks.dispatch_outbox(activity_id=activity.pk)
|
|
mocked_inbox.assert_called_once_with(activity_id=activity.pk, call_handlers=False)
|
|
mocked_deliver_to_remote.assert_not_called()
|
|
|
|
|
|
def test_deliver_to_remote_success_mark_as_delivered(factories, r_mock, now):
|
|
delivery = factories["federation.Delivery"]()
|
|
r_mock.post(delivery.inbox_url)
|
|
tasks.deliver_to_remote(delivery_id=delivery.pk)
|
|
|
|
delivery.refresh_from_db()
|
|
|
|
request = r_mock.request_history[0]
|
|
assert delivery.is_delivered is True
|
|
assert delivery.attempts == 1
|
|
assert delivery.last_attempt_date == now
|
|
assert r_mock.called is True
|
|
assert r_mock.call_count == 1
|
|
assert request.url == delivery.inbox_url
|
|
assert request.headers["content-type"] == "application/activity+json"
|
|
assert request.json() == delivery.activity.payload
|
|
|
|
|
|
def test_deliver_to_remote_error(factories, r_mock, now):
|
|
delivery = factories["federation.Delivery"]()
|
|
r_mock.post(delivery.inbox_url, status_code=404)
|
|
|
|
with pytest.raises(tasks.RequestException):
|
|
tasks.deliver_to_remote(delivery_id=delivery.pk)
|
|
|
|
delivery.refresh_from_db()
|
|
|
|
assert delivery.is_delivered is False
|
|
assert delivery.attempts == 1
|
|
assert delivery.last_attempt_date == now
|
|
|
|
|
|
def test_fetch_nodeinfo(factories, r_mock, now):
|
|
wellknown_url = "https://test.test/.well-known/nodeinfo"
|
|
nodeinfo_url = "https://test.test/nodeinfo"
|
|
|
|
r_mock.get(
|
|
wellknown_url,
|
|
json={
|
|
"links": [
|
|
{
|
|
"rel": "http://nodeinfo.diaspora.software/ns/schema/2.0",
|
|
"href": "https://test.test/nodeinfo",
|
|
}
|
|
]
|
|
},
|
|
)
|
|
r_mock.get(nodeinfo_url, json={"hello": "world"})
|
|
|
|
assert tasks.fetch_nodeinfo("test.test") == {"hello": "world"}
|
|
|
|
|
|
def test_update_domain_nodeinfo(factories, mocker, now, service_actor):
|
|
domain = factories["federation.Domain"](nodeinfo_fetch_date=None)
|
|
actor = factories["federation.Actor"](fid="https://actor.id")
|
|
retrieve_ap_object = mocker.spy(utils, "retrieve_ap_object")
|
|
|
|
mocker.patch.object(
|
|
tasks,
|
|
"fetch_nodeinfo",
|
|
return_value={"hello": "world", "metadata": {"actorId": "https://actor.id"}},
|
|
)
|
|
|
|
assert domain.nodeinfo == {}
|
|
assert domain.nodeinfo_fetch_date is None
|
|
assert domain.service_actor is None
|
|
|
|
tasks.update_domain_nodeinfo(domain_name=domain.name)
|
|
|
|
domain.refresh_from_db()
|
|
|
|
assert domain.nodeinfo_fetch_date == now
|
|
assert domain.nodeinfo == {
|
|
"status": "ok",
|
|
"payload": {"hello": "world", "metadata": {"actorId": "https://actor.id"}},
|
|
}
|
|
assert domain.service_actor == actor
|
|
|
|
retrieve_ap_object.assert_called_once_with(
|
|
"https://actor.id",
|
|
actor=service_actor,
|
|
queryset=models.Actor,
|
|
serializer_class=serializers.ActorSerializer,
|
|
)
|
|
|
|
|
|
def test_update_domain_nodeinfo_error(factories, r_mock, now):
|
|
domain = factories["federation.Domain"](nodeinfo_fetch_date=None)
|
|
wellknown_url = "https://{}/.well-known/nodeinfo".format(domain.name)
|
|
|
|
r_mock.get(wellknown_url, status_code=500)
|
|
|
|
tasks.update_domain_nodeinfo(domain_name=domain.name)
|
|
|
|
domain.refresh_from_db()
|
|
|
|
assert domain.nodeinfo_fetch_date == now
|
|
assert domain.nodeinfo == {
|
|
"status": "error",
|
|
"error": "500 Server Error: None for url: {}".format(wellknown_url),
|
|
}
|
|
|
|
|
|
def test_refresh_nodeinfo_known_nodes(settings, factories, mocker, now):
|
|
settings.NODEINFO_REFRESH_DELAY = 666
|
|
|
|
refreshed = [
|
|
factories["federation.Domain"](nodeinfo_fetch_date=None),
|
|
factories["federation.Domain"](
|
|
nodeinfo_fetch_date=now
|
|
- datetime.timedelta(seconds=settings.NODEINFO_REFRESH_DELAY + 1)
|
|
),
|
|
]
|
|
factories["federation.Domain"](
|
|
nodeinfo_fetch_date=now
|
|
- datetime.timedelta(seconds=settings.NODEINFO_REFRESH_DELAY - 1)
|
|
)
|
|
|
|
update_domain_nodeinfo = mocker.patch.object(tasks.update_domain_nodeinfo, "delay")
|
|
|
|
tasks.refresh_nodeinfo_known_nodes()
|
|
|
|
assert update_domain_nodeinfo.call_count == len(refreshed)
|
|
|
|
for d in refreshed:
|
|
update_domain_nodeinfo.assert_any_call(domain_name=d.name)
|
|
|
|
|
|
def test_handle_purge_actors(factories, mocker):
|
|
to_purge = factories["federation.Actor"]()
|
|
keeped = [
|
|
factories["music.Upload"](),
|
|
factories["federation.Activity"](),
|
|
factories["federation.InboxItem"](),
|
|
factories["federation.Follow"](),
|
|
factories["federation.LibraryFollow"](),
|
|
]
|
|
|
|
library = factories["music.Library"](actor=to_purge)
|
|
deleted = [
|
|
library,
|
|
factories["music.Upload"](library=library),
|
|
factories["federation.Activity"](actor=to_purge),
|
|
factories["federation.InboxItem"](actor=to_purge),
|
|
factories["federation.Follow"](actor=to_purge),
|
|
factories["federation.LibraryFollow"](actor=to_purge),
|
|
]
|
|
|
|
tasks.handle_purge_actors([to_purge.pk])
|
|
|
|
for k in keeped:
|
|
# this should not be deleted
|
|
k.refresh_from_db()
|
|
|
|
for d in deleted:
|
|
with pytest.raises(d.__class__.DoesNotExist):
|
|
d.refresh_from_db()
|
|
|
|
|
|
def test_handle_purge_actors_restrict_media(factories, mocker):
|
|
to_purge = factories["federation.Actor"]()
|
|
keeped = [
|
|
factories["music.Upload"](),
|
|
factories["federation.Activity"](),
|
|
factories["federation.InboxItem"](),
|
|
factories["federation.Follow"](),
|
|
factories["federation.LibraryFollow"](),
|
|
factories["federation.Activity"](actor=to_purge),
|
|
factories["federation.InboxItem"](actor=to_purge),
|
|
factories["federation.Follow"](actor=to_purge),
|
|
]
|
|
|
|
library = factories["music.Library"](actor=to_purge)
|
|
deleted = [
|
|
library,
|
|
factories["music.Upload"](library=library),
|
|
factories["federation.LibraryFollow"](actor=to_purge),
|
|
]
|
|
|
|
tasks.handle_purge_actors([to_purge.pk], only=["media"])
|
|
|
|
for k in keeped:
|
|
# this should not be deleted
|
|
k.refresh_from_db()
|
|
|
|
for d in deleted:
|
|
with pytest.raises(d.__class__.DoesNotExist):
|
|
d.refresh_from_db()
|
|
|
|
|
|
def test_purge_actors(factories, mocker):
|
|
handle_purge_actors = mocker.spy(tasks, "handle_purge_actors")
|
|
factories["federation.Actor"]()
|
|
to_delete = factories["federation.Actor"]()
|
|
to_delete_domain = factories["federation.Actor"]()
|
|
tasks.purge_actors(
|
|
ids=[to_delete.pk], domains=[to_delete_domain.domain.name], only=["hello"]
|
|
)
|
|
|
|
handle_purge_actors.assert_called_once_with(
|
|
ids=[to_delete.pk, to_delete_domain.pk], only=["hello"]
|
|
)
|
|
|
|
|
|
def test_rotate_actor_key(factories, settings, mocker):
|
|
actor = factories["federation.Actor"](local=True)
|
|
get_key_pair = mocker.patch(
|
|
"funkwhale_api.federation.keys.get_key_pair",
|
|
return_value=(b"private", b"public"),
|
|
)
|
|
|
|
tasks.rotate_actor_key(actor_id=actor.pk)
|
|
|
|
actor.refresh_from_db()
|
|
|
|
get_key_pair.assert_called_once_with()
|
|
|
|
assert actor.public_key == "public"
|
|
assert actor.private_key == "private"
|
|
|
|
|
|
def test_fetch_skipped(factories, r_mock):
|
|
url = "https://fetch.object"
|
|
fetch = factories["federation.Fetch"](url=url)
|
|
payload = {"@context": jsonld.get_default_context(), "type": "Unhandled"}
|
|
r_mock.get(url, json=payload)
|
|
|
|
tasks.fetch(fetch_id=fetch.pk)
|
|
|
|
fetch.refresh_from_db()
|
|
|
|
assert fetch.status == "skipped"
|
|
assert fetch.detail["reason"] == "unhandled_type"
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"r_mock_args, expected_error_code",
|
|
[
|
|
({"json": {"type": "Unhandled"}}, "invalid_jsonld"),
|
|
({"json": {"@context": jsonld.get_default_context()}}, "invalid_jsonld"),
|
|
({"text": "invalidjson"}, "invalid_json"),
|
|
({"status_code": 404}, "http"),
|
|
({"status_code": 500}, "http"),
|
|
],
|
|
)
|
|
def test_fetch_errored(factories, r_mock_args, expected_error_code, r_mock):
|
|
url = "https://fetch.object"
|
|
fetch = factories["federation.Fetch"](url=url)
|
|
r_mock.get(url, **r_mock_args)
|
|
|
|
tasks.fetch(fetch_id=fetch.pk)
|
|
|
|
fetch.refresh_from_db()
|
|
|
|
assert fetch.status == "errored"
|
|
assert fetch.detail["error_code"] == expected_error_code
|
|
|
|
|
|
def test_fetch_success(factories, r_mock, mocker):
|
|
artist = factories["music.Artist"]()
|
|
fetch = factories["federation.Fetch"](url=artist.fid)
|
|
payload = serializers.ArtistSerializer(artist).data
|
|
init = mocker.spy(serializers.ArtistSerializer, "__init__")
|
|
save = mocker.spy(serializers.ArtistSerializer, "save")
|
|
|
|
r_mock.get(artist.fid, json=payload)
|
|
|
|
tasks.fetch(fetch_id=fetch.pk)
|
|
|
|
fetch.refresh_from_db()
|
|
payload["@context"].append("https://funkwhale.audio/ns")
|
|
assert fetch.status == "finished"
|
|
assert init.call_count == 1
|
|
assert init.call_args[0][1] == artist
|
|
assert init.call_args[1]["data"] == payload
|
|
assert save.call_count == 1
|