1592 lines
54 KiB
Python
1592 lines
54 KiB
Python
import datetime
|
|
import io
|
|
import os
|
|
import pathlib
|
|
import urllib.parse
|
|
import uuid
|
|
|
|
import magic
|
|
import pytest
|
|
from django.db.models import Count, Prefetch
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
|
|
from funkwhale_api.common import utils
|
|
from funkwhale_api.federation import api_serializers as federation_api_serializers
|
|
from funkwhale_api.federation import tasks as federation_tasks
|
|
from funkwhale_api.federation import utils as federation_utils
|
|
from funkwhale_api.music import licenses, models, serializers, tasks, views
|
|
from funkwhale_api.users import authentication as users_authentication
|
|
|
|
DATA_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
|
def test_artist_list_serializer(api_request, factories, logged_in_api_client):
|
|
tags = ["tag1", "tag2"]
|
|
track = factories["music.Upload"](
|
|
library__privacy_level="everyone",
|
|
import_status="finished",
|
|
track__album__artist_credit__artist__set_tags=tags,
|
|
).track
|
|
artist = track.artist_credit.all()[0].artist
|
|
request = api_request.get("/")
|
|
qs = artist.__class__.objects.with_albums().annotate(
|
|
_tracks_count=Count("artist_credit__tracks")
|
|
)
|
|
serializer = serializers.ArtistWithAlbumsSerializer(
|
|
qs, many=True, context={"request": request}
|
|
)
|
|
expected = {"count": 1, "next": None, "previous": None, "results": serializer.data}
|
|
for artist in serializer.data:
|
|
artist["tags"] = tags
|
|
|
|
url = reverse("api:v1:artists-list")
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert serializer.data[0]["tracks_count"] == 1
|
|
assert response.status_code == 200
|
|
assert response.data == expected
|
|
|
|
|
|
def test_album_list_serializer(api_request, factories, logged_in_api_client):
|
|
tags = ["tag1", "tag2"]
|
|
track = factories["music.Upload"](
|
|
library__privacy_level="everyone",
|
|
import_status="finished",
|
|
track__album__set_tags=tags,
|
|
).track
|
|
album = track.album
|
|
request = api_request.get("/")
|
|
|
|
tracks = models.Track.objects.all().prefetch_related("album")
|
|
tracks = tracks.annotate_playable_by_actor(None)
|
|
|
|
qs = album.__class__.objects.with_tracks_count().annotate_playable_by_actor(None)
|
|
qs = qs.prefetch_related(Prefetch("tracks", queryset=tracks))
|
|
serializer = serializers.AlbumSerializer(
|
|
qs, many=True, context={"request": request}
|
|
)
|
|
expected = {"count": 1, "next": None, "previous": None, "results": serializer.data}
|
|
for album in serializer.data:
|
|
album["tags"] = tags
|
|
url = reverse("api:v1:albums-list")
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 200
|
|
assert response.data["results"][0] == expected["results"][0]
|
|
|
|
|
|
def test_track_list_serializer(api_request, factories, logged_in_api_client):
|
|
tags = ["tag1", "tag2"]
|
|
track = factories["music.Upload"](
|
|
library__privacy_level="everyone",
|
|
import_status="finished",
|
|
track__set_tags=tags,
|
|
).track
|
|
request = api_request.get("/")
|
|
qs = track.__class__.objects.with_playable_uploads(None)
|
|
serializer = serializers.TrackSerializer(
|
|
qs, many=True, context={"request": request}
|
|
)
|
|
expected = {"count": 1, "next": None, "previous": None, "results": serializer.data}
|
|
for track in serializer.data:
|
|
track["tags"] = tags
|
|
url = reverse("api:v1:tracks-list")
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 200
|
|
assert response.data == expected
|
|
|
|
|
|
def test_track_list_filter_id(api_request, factories, logged_in_api_client):
|
|
track1 = factories["music.Track"]()
|
|
track2 = factories["music.Track"]()
|
|
factories["music.Track"]()
|
|
url = reverse("api:v1:tracks-list")
|
|
response = logged_in_api_client.get(url, {"id[]": [track1.id, track2.id]})
|
|
|
|
assert response.status_code == 200
|
|
assert response.data["count"] == 2
|
|
assert response.data["results"][0]["id"] == track2.id
|
|
assert response.data["results"][1]["id"] == track1.id
|
|
|
|
|
|
@pytest.mark.parametrize("param,expected", [("true", "full"), ("false", "empty")])
|
|
def test_artist_view_filter_playable(param, expected, factories, api_request):
|
|
artists = {
|
|
"empty": factories["music.Artist"](),
|
|
"full": factories["music.Upload"](
|
|
library__privacy_level="everyone", import_status="finished"
|
|
)
|
|
.track.artist_credit.all()[0]
|
|
.artist,
|
|
}
|
|
|
|
request = api_request.get("/", {"playable": param})
|
|
view = views.ArtistViewSet()
|
|
view.action_map = {"get": "list"}
|
|
expected = [artists[expected]]
|
|
view.request = view.initialize_request(request)
|
|
queryset = view.filter_queryset(view.get_queryset())
|
|
|
|
assert list(queryset) == expected
|
|
|
|
|
|
@pytest.mark.parametrize("param,expected", [("true", "full"), ("false", "empty")])
|
|
def test_album_view_filter_playable(param, expected, factories, api_request):
|
|
artists = {
|
|
"empty": factories["music.Album"](),
|
|
"full": factories["music.Upload"](
|
|
library__privacy_level="everyone", import_status="finished"
|
|
).track.album,
|
|
}
|
|
|
|
request = api_request.get("/", {"playable": param})
|
|
view = views.AlbumViewSet()
|
|
view.action_map = {"get": "list"}
|
|
expected = [artists[expected]]
|
|
view.request = view.initialize_request(request)
|
|
queryset = view.filter_queryset(view.get_queryset())
|
|
|
|
assert list(queryset) == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"param", [("I've Got"), ("Français"), ("I've Got Everything : Spoken Word Poetry")]
|
|
)
|
|
def test_album_view_filter_query(param, factories, api_request):
|
|
# Test both partial and full search.
|
|
factories["music.Album"](title="I've Got Nothing : Original Soundtrack")
|
|
factories["music.Album"](title="I've Got Cake : Remix")
|
|
factories["music.Album"](title="Français Et Tu")
|
|
factories["music.Album"](title="I've Got Everything : Spoken Word Poetry")
|
|
|
|
request = api_request.get("/", {"q": param})
|
|
view = views.AlbumViewSet()
|
|
view.action_map = {"get": "list"}
|
|
view.request = view.initialize_request(request)
|
|
queryset = view.filter_queryset(view.get_queryset())
|
|
|
|
# Loop through our "expected list", and assert some string finds against our param.
|
|
for val in list(queryset):
|
|
assert val.title.find(param) != -1
|
|
|
|
|
|
def test_can_serve_upload_as_remote_library(
|
|
factories, authenticated_actor, logged_in_api_client, settings, preferences
|
|
):
|
|
preferences["common__api_authentication_required"] = True
|
|
upload = factories["music.Upload"](
|
|
library__privacy_level="everyone", import_status="finished"
|
|
)
|
|
library_actor = upload.library.actor
|
|
factories["federation.Follow"](
|
|
approved=True, actor=authenticated_actor, target=library_actor
|
|
)
|
|
|
|
response = logged_in_api_client.get(upload.track.listen_url)
|
|
|
|
assert response.status_code == 200
|
|
assert response["X-Accel-Redirect"] == "{}{}".format(
|
|
settings.PROTECT_FILES_PATH,
|
|
views.strip_absolute_media_url(upload.audio_file.url),
|
|
)
|
|
|
|
|
|
def test_can_serve_upload_as_remote_library_deny_not_following(
|
|
factories, authenticated_actor, settings, api_client, preferences
|
|
):
|
|
preferences["common__api_authentication_required"] = True
|
|
upload = factories["music.Upload"](
|
|
import_status="finished", library__privacy_level="instance"
|
|
)
|
|
response = api_client.get(upload.track.listen_url)
|
|
|
|
assert response.status_code == 404
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"proxy,serve_path,expected",
|
|
[
|
|
("apache2", "/host/music", "/host/music/hello/world.mp3"),
|
|
("apache2", "/app/music", "/app/music/hello/world.mp3"),
|
|
("nginx", "/host/music", "/_protected/music/hello/world.mp3"),
|
|
("nginx", "/app/music", "/_protected/music/hello/world.mp3"),
|
|
],
|
|
)
|
|
def test_serve_file_in_place(
|
|
proxy, serve_path, expected, factories, api_client, preferences, settings
|
|
):
|
|
headers = {"apache2": "X-Sendfile", "nginx": "X-Accel-Redirect"}
|
|
preferences["common__api_authentication_required"] = False
|
|
settings.PROTECT_FILE_PATH = "/_protected/music"
|
|
settings.REVERSE_PROXY_TYPE = proxy
|
|
settings.MUSIC_DIRECTORY_PATH = "/app/music"
|
|
settings.MUSIC_DIRECTORY_SERVE_PATH = serve_path
|
|
upload = factories["music.Upload"](
|
|
in_place=True,
|
|
import_status="finished",
|
|
source="file:///app/music/hello/world.mp3",
|
|
library__privacy_level="everyone",
|
|
)
|
|
response = api_client.get(upload.track.listen_url)
|
|
|
|
assert response.status_code == 200
|
|
assert response[headers[proxy]] == expected
|
|
|
|
|
|
def test_serve_file_in_place_nginx_encode_url(
|
|
factories, api_client, preferences, settings
|
|
):
|
|
preferences["common__api_authentication_required"] = False
|
|
settings.PROTECT_FILE_PATH = "/_protected/music"
|
|
settings.REVERSE_PROXY_TYPE = "nginx"
|
|
settings.MUSIC_DIRECTORY_PATH = "/app/music"
|
|
settings.MUSIC_DIRECTORY_SERVE_PATH = "/app/music"
|
|
upload = factories["music.Upload"](
|
|
in_place=True,
|
|
import_status="finished",
|
|
source="file:///app/music/hello/world%?.mp3",
|
|
library__privacy_level="everyone",
|
|
)
|
|
response = api_client.get(upload.track.listen_url)
|
|
expected = "/_protected/music/hello/world%25%3F.mp3"
|
|
|
|
assert response.status_code == 200
|
|
assert response["X-Accel-Redirect"] == expected
|
|
|
|
|
|
def test_serve_s3_nginx_encode_url(mocker, settings):
|
|
settings.PROTECT_FILE_PATH = "/_protected/media"
|
|
settings.REVERSE_PROXY_TYPE = "nginx"
|
|
audio_file = mocker.Mock(url="https://s3.storage.example/path/to/mp3?aws=signature")
|
|
|
|
expected = (
|
|
b"/_protected/media/https://s3.storage.example/path/to/mp3%3Faws%3Dsignature"
|
|
)
|
|
|
|
assert views.get_file_path(audio_file) == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"proxy,serve_path,expected",
|
|
[
|
|
("apache2", "/host/music", "/host/music/hello/worldéà.mp3"),
|
|
("apache2", "/app/music", "/app/music/hello/worldéà.mp3"),
|
|
("nginx", "/host/music", "/_protected/music/hello/world%C3%A9%C3%A0.mp3"),
|
|
("nginx", "/app/music", "/_protected/music/hello/world%C3%A9%C3%A0.mp3"),
|
|
],
|
|
)
|
|
def test_serve_file_in_place_utf8(
|
|
proxy, serve_path, expected, factories, api_client, settings, preferences
|
|
):
|
|
preferences["common__api_authentication_required"] = False
|
|
settings.PROTECT_FILE_PATH = "/_protected/music"
|
|
settings.REVERSE_PROXY_TYPE = proxy
|
|
settings.MUSIC_DIRECTORY_PATH = "/app/music"
|
|
settings.MUSIC_DIRECTORY_SERVE_PATH = serve_path
|
|
path = views.get_file_path("/app/music/hello/worldéà.mp3")
|
|
|
|
assert path == expected.encode("utf-8")
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"proxy,serve_path,expected",
|
|
[
|
|
("apache2", "/host/music", "/host/media/tracks/hello/world.mp3"),
|
|
# apache with container not supported yet
|
|
# ('apache2', '/app/music', '/app/music/tracks/hello/world.mp3'),
|
|
("nginx", "/host/music", "/_protected/media/tracks/hello/world.mp3"),
|
|
("nginx", "/app/music", "/_protected/media/tracks/hello/world.mp3"),
|
|
],
|
|
)
|
|
def test_serve_file_media(
|
|
proxy, serve_path, expected, factories, api_client, settings, preferences
|
|
):
|
|
headers = {"apache2": "X-Sendfile", "nginx": "X-Accel-Redirect"}
|
|
preferences["common__api_authentication_required"] = False
|
|
settings.MEDIA_ROOT = "/host/media"
|
|
settings.PROTECT_FILE_PATH = "/_protected/music"
|
|
settings.REVERSE_PROXY_TYPE = proxy
|
|
settings.MUSIC_DIRECTORY_PATH = "/app/music"
|
|
settings.MUSIC_DIRECTORY_SERVE_PATH = serve_path
|
|
|
|
upload = factories["music.Upload"](
|
|
library__privacy_level="everyone", import_status="finished"
|
|
)
|
|
upload.__class__.objects.filter(pk=upload.pk).update(
|
|
audio_file="tracks/hello/world.mp3"
|
|
)
|
|
response = api_client.get(upload.track.listen_url)
|
|
|
|
assert response.status_code == 200
|
|
assert response[headers[proxy]] == expected
|
|
|
|
|
|
def test_can_proxy_remote_track(factories, settings, api_client, r_mock, preferences):
|
|
preferences["common__api_authentication_required"] = False
|
|
url = "https://file.test"
|
|
upload = factories["music.Upload"](
|
|
library__privacy_level="everyone",
|
|
audio_file="",
|
|
source=url,
|
|
import_status="finished",
|
|
)
|
|
|
|
r_mock.get(url, body=io.BytesIO(b"test"))
|
|
response = api_client.get(upload.track.listen_url)
|
|
upload.refresh_from_db()
|
|
|
|
assert response.status_code == 200
|
|
assert response["X-Accel-Redirect"] == "{}{}".format(
|
|
settings.PROTECT_FILES_PATH,
|
|
views.strip_absolute_media_url(upload.audio_file.url),
|
|
)
|
|
assert upload.audio_file.read() == b"test"
|
|
|
|
|
|
def test_serve_updates_access_date(factories, settings, api_client, preferences):
|
|
preferences["common__api_authentication_required"] = False
|
|
upload = factories["music.Upload"](
|
|
library__privacy_level="everyone", import_status="finished"
|
|
)
|
|
now = timezone.now()
|
|
assert upload.accessed_date is None
|
|
|
|
response = api_client.get(upload.track.listen_url)
|
|
upload.refresh_from_db()
|
|
|
|
assert response.status_code == 200
|
|
assert upload.accessed_date > now
|
|
|
|
|
|
def test_listen_no_track(factories, logged_in_api_client, mocker):
|
|
increment_downloads_count = mocker.patch(
|
|
"funkwhale_api.music.utils.increment_downloads_count"
|
|
)
|
|
|
|
url = reverse("api:v1:listen-detail", kwargs={"uuid": "noop"})
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 404
|
|
increment_downloads_count.call_count == 0
|
|
|
|
|
|
def test_listen_no_file(factories, logged_in_api_client):
|
|
track = factories["music.Track"]()
|
|
url = reverse("api:v1:listen-detail", kwargs={"uuid": track.uuid})
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_listen_no_available_file(factories, logged_in_api_client):
|
|
upload = factories["music.Upload"]()
|
|
url = reverse("api:v1:listen-detail", kwargs={"uuid": upload.track.uuid})
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_listen_correct_access(factories, logged_in_api_client, mocker):
|
|
increment_downloads_count = mocker.patch(
|
|
"funkwhale_api.music.utils.increment_downloads_count"
|
|
)
|
|
logged_in_api_client.user.create_actor()
|
|
upload = factories["music.Upload"](
|
|
library__actor=logged_in_api_client.user.actor,
|
|
library__privacy_level="me",
|
|
import_status="finished",
|
|
)
|
|
expected_filename = upload.track.full_name + ".ogg"
|
|
url = reverse("api:v1:listen-detail", kwargs={"uuid": upload.track.uuid})
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 200
|
|
assert response["Content-Disposition"] == "attachment; filename*=UTF-8''{}".format(
|
|
urllib.parse.quote(expected_filename)
|
|
)
|
|
|
|
increment_downloads_count.assert_called_once_with(
|
|
upload=upload,
|
|
user=logged_in_api_client.user,
|
|
wsgi_request=response.wsgi_request,
|
|
)
|
|
|
|
|
|
def test_listen_correct_access_download_false(factories, logged_in_api_client):
|
|
logged_in_api_client.user.create_actor()
|
|
upload = factories["music.Upload"](
|
|
library__actor=logged_in_api_client.user.actor,
|
|
library__privacy_level="me",
|
|
import_status="finished",
|
|
)
|
|
url = reverse("api:v1:listen-detail", kwargs={"uuid": upload.track.uuid})
|
|
response = logged_in_api_client.get(url, {"download": "false"})
|
|
|
|
assert response.status_code == 200
|
|
assert "Content-Disposition" not in response
|
|
|
|
|
|
def test_listen_explicit_file(factories, logged_in_api_client, mocker, settings):
|
|
mocked_serve = mocker.spy(views, "handle_serve")
|
|
upload1 = factories["music.Upload"](
|
|
library__privacy_level="everyone", import_status="finished"
|
|
)
|
|
upload2 = factories["music.Upload"](
|
|
library__privacy_level="everyone", track=upload1.track, import_status="finished"
|
|
)
|
|
url = reverse("api:v1:listen-detail", kwargs={"uuid": upload2.track.uuid})
|
|
response = logged_in_api_client.get(url, {"upload": upload2.uuid})
|
|
|
|
assert response.status_code == 200
|
|
mocked_serve.assert_called_once_with(
|
|
upload=upload2,
|
|
user=logged_in_api_client.user,
|
|
format=None,
|
|
max_bitrate=None,
|
|
proxy_media=settings.PROXY_MEDIA,
|
|
download=True,
|
|
wsgi_request=response.wsgi_request,
|
|
)
|
|
|
|
|
|
def test_stream(factories, logged_in_api_client, mocker, settings):
|
|
mocked_serve = mocker.spy(views, "handle_serve")
|
|
upload = factories["music.Upload"](
|
|
library__privacy_level="everyone", import_status="finished"
|
|
)
|
|
url = (
|
|
reverse("api:v1:stream-detail", kwargs={"uuid": str(upload.track.uuid)})
|
|
+ ".mp3"
|
|
)
|
|
assert url.endswith(f"/{upload.track.uuid}.mp3")
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 200
|
|
mocked_serve.assert_called_once_with(
|
|
upload=upload,
|
|
user=logged_in_api_client.user,
|
|
format="mp3",
|
|
download=False,
|
|
max_bitrate=None,
|
|
proxy_media=True,
|
|
wsgi_request=response.wsgi_request,
|
|
)
|
|
|
|
|
|
def test_listen_no_proxy(factories, logged_in_api_client, settings):
|
|
settings.PROXY_MEDIA = False
|
|
upload = factories["music.Upload"](
|
|
library__privacy_level="everyone", import_status="finished"
|
|
)
|
|
url = reverse("api:v1:listen-detail", kwargs={"uuid": upload.track.uuid})
|
|
response = logged_in_api_client.get(url, {"upload": upload.uuid})
|
|
|
|
assert response.status_code == 302
|
|
assert response["Location"] == upload.audio_file.url
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"mimetype,format,expected",
|
|
[
|
|
# already in proper format
|
|
("audio/mpeg", "mp3", False),
|
|
# empty mimetype / format
|
|
(None, "mp3", False),
|
|
("audio/mpeg", None, False),
|
|
# unsupported format
|
|
("audio/mpeg", "noop", False),
|
|
# should transcode
|
|
("audio/mpeg", "ogg", True),
|
|
],
|
|
)
|
|
def test_should_transcode(mimetype, format, expected, factories):
|
|
upload = models.Upload(mimetype=mimetype)
|
|
assert views.should_transcode(upload, format) is expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"bitrate,max_bitrate,expected",
|
|
[
|
|
# already in acceptable bitrate
|
|
(192000, 320000, False),
|
|
# No max bitrate specified
|
|
(192000, None, False),
|
|
# requested max below available
|
|
(192000, 128000, True),
|
|
],
|
|
)
|
|
def test_should_transcode_bitrate(bitrate, max_bitrate, expected, factories):
|
|
upload = models.Upload(mimetype="audio/mpeg", bitrate=bitrate)
|
|
assert views.should_transcode(upload, "mp3", max_bitrate=max_bitrate) is expected
|
|
|
|
|
|
@pytest.mark.parametrize("value", [True, False])
|
|
def test_should_transcode_according_to_preference(value, preferences, factories):
|
|
upload = models.Upload(mimetype="audio/ogg")
|
|
expected = value
|
|
preferences["music__transcoding_enabled"] = value
|
|
|
|
assert views.should_transcode(upload, "mp3") is expected
|
|
|
|
|
|
def test_handle_serve_create_mp3_version(factories, now, mocker):
|
|
mocker.patch("funkwhale_api.music.utils.increment_downloads_count")
|
|
user = factories["users.User"]()
|
|
upload = factories["music.Upload"](bitrate=42)
|
|
response = views.handle_serve(
|
|
upload=upload, user=user, format="mp3", wsgi_request=None
|
|
)
|
|
expected_filename = upload.track.full_name + ".mp3"
|
|
version = upload.versions.latest("id")
|
|
|
|
assert version.mimetype == "audio/mpeg"
|
|
assert version.accessed_date == now
|
|
assert version.bitrate == upload.bitrate
|
|
assert version.audio_file_path.endswith(".mp3")
|
|
assert version.size == version.audio_file.size
|
|
assert magic.from_buffer(version.audio_file.read(), mime=True) == "audio/mpeg"
|
|
assert response["Content-Disposition"] == "attachment; filename*=UTF-8''{}".format(
|
|
urllib.parse.quote(expected_filename)
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
|
|
def test_listen_transcode(factories, now, logged_in_api_client, mocker, settings):
|
|
upload = factories["music.Upload"](
|
|
import_status="finished", library__actor__user=logged_in_api_client.user
|
|
)
|
|
url = reverse("api:v1:listen-detail", kwargs={"uuid": upload.track.uuid})
|
|
handle_serve = mocker.spy(views, "handle_serve")
|
|
response = logged_in_api_client.get(url, {"to": "mp3"})
|
|
|
|
assert response.status_code == 200
|
|
|
|
handle_serve.assert_called_once_with(
|
|
upload=upload,
|
|
user=logged_in_api_client.user,
|
|
format="mp3",
|
|
max_bitrate=None,
|
|
proxy_media=settings.PROXY_MEDIA,
|
|
download=True,
|
|
wsgi_request=response.wsgi_request,
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"max_bitrate, expected",
|
|
[
|
|
("", None),
|
|
("", None),
|
|
("-1", None),
|
|
("128", 128000),
|
|
("320", 320000),
|
|
("460", 320000),
|
|
],
|
|
)
|
|
def test_listen_transcode_bitrate(
|
|
max_bitrate, expected, factories, now, logged_in_api_client, mocker, settings
|
|
):
|
|
upload = factories["music.Upload"](
|
|
import_status="finished", library__actor__user=logged_in_api_client.user
|
|
)
|
|
url = reverse("api:v1:listen-detail", kwargs={"uuid": upload.track.uuid})
|
|
handle_serve = mocker.spy(views, "handle_serve")
|
|
response = logged_in_api_client.get(url, {"max_bitrate": max_bitrate})
|
|
|
|
assert response.status_code == 200
|
|
|
|
handle_serve.assert_called_once_with(
|
|
upload=upload,
|
|
user=logged_in_api_client.user,
|
|
format=None,
|
|
max_bitrate=expected,
|
|
proxy_media=settings.PROXY_MEDIA,
|
|
download=True,
|
|
wsgi_request=response.wsgi_request,
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize("serve_path", [("/host/music",), ("/app/music",)])
|
|
def test_listen_transcode_in_place(
|
|
serve_path, factories, now, logged_in_api_client, mocker, settings
|
|
):
|
|
settings.MUSIC_DIRECTORY_PATH = "/app/music"
|
|
settings.MUSIC_DIRECTORY_SERVE_PATH = serve_path
|
|
upload = factories["music.Upload"](
|
|
import_status="finished",
|
|
library__actor__user=logged_in_api_client.user,
|
|
audio_file=None,
|
|
source="file://" + os.path.join(DATA_DIR, "test.ogg"),
|
|
)
|
|
|
|
assert upload.get_audio_segment()
|
|
|
|
url = reverse("api:v1:listen-detail", kwargs={"uuid": upload.track.uuid})
|
|
handle_serve = mocker.spy(views, "handle_serve")
|
|
response = logged_in_api_client.get(url, {"to": "mp3"})
|
|
|
|
assert response.status_code == 200
|
|
|
|
handle_serve.assert_called_once_with(
|
|
upload=upload,
|
|
user=logged_in_api_client.user,
|
|
format="mp3",
|
|
max_bitrate=None,
|
|
proxy_media=settings.PROXY_MEDIA,
|
|
download=True,
|
|
wsgi_request=response.wsgi_request,
|
|
)
|
|
|
|
|
|
def test_user_can_list_their_library(factories, logged_in_api_client):
|
|
actor = logged_in_api_client.user.create_actor()
|
|
library = factories["music.Library"](actor=actor)
|
|
factories["music.Library"](privacy_level="everyone")
|
|
|
|
url = reverse("api:v1:libraries-list")
|
|
response = logged_in_api_client.get(url, {"scope": "me"})
|
|
|
|
assert response.status_code == 200
|
|
assert response.data["count"] == 1
|
|
assert response.data["results"][0]["uuid"] == str(library.uuid)
|
|
|
|
|
|
def test_user_can_retrieve_another_user_library(factories, logged_in_api_client):
|
|
library = factories["music.Library"]()
|
|
|
|
url = reverse("api:v1:libraries-detail", kwargs={"uuid": library.uuid})
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 200
|
|
assert response.data["uuid"] == str(library.uuid)
|
|
|
|
|
|
def test_user_can_list_public_libraries(factories, api_client, preferences):
|
|
preferences["common__api_authentication_required"] = False
|
|
library = factories["music.Library"](privacy_level="everyone")
|
|
factories["music.Library"](privacy_level="me")
|
|
|
|
url = reverse("api:v1:libraries-list")
|
|
response = api_client.get(url)
|
|
|
|
assert response.status_code == 200
|
|
assert response.data["count"] == 1
|
|
assert response.data["results"][0]["uuid"] == str(library.uuid)
|
|
|
|
|
|
def test_library_list_excludes_channel_library(factories, logged_in_api_client):
|
|
actor = logged_in_api_client.user.create_actor()
|
|
factories["audio.Channel"](attributed_to=actor)
|
|
url = reverse("api:v1:libraries-list")
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 200
|
|
assert response.data["count"] == 0
|
|
|
|
|
|
def test_user_cannot_delete_other_actors_library(factories, logged_in_api_client):
|
|
logged_in_api_client.user.create_actor()
|
|
library = factories["music.Library"](privacy_level="everyone")
|
|
|
|
url = reverse("api:v1:libraries-detail", kwargs={"uuid": library.uuid})
|
|
response = logged_in_api_client.delete(url)
|
|
|
|
assert response.status_code == 405
|
|
|
|
|
|
def test_user_cannot_get_other_not_playable_uploads(factories, logged_in_api_client):
|
|
logged_in_api_client.user.create_actor()
|
|
upload = factories["music.Upload"](
|
|
import_status="finished", library__privacy_level="private"
|
|
)
|
|
|
|
url = reverse("api:v1:uploads-detail", kwargs={"uuid": upload.uuid})
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_user_can_get_retrieve_playable_uploads(factories, logged_in_api_client):
|
|
logged_in_api_client.user.create_actor()
|
|
upload = factories["music.Upload"](
|
|
import_status="finished", library__privacy_level="everyone"
|
|
)
|
|
|
|
url = reverse("api:v1:uploads-detail", kwargs={"uuid": upload.uuid})
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 200
|
|
assert response.data["uuid"] == str(upload.uuid)
|
|
|
|
|
|
def test_user_cannot_delete_other_actors_uploads(factories, logged_in_api_client):
|
|
logged_in_api_client.user.create_actor()
|
|
upload = factories["music.Upload"]()
|
|
|
|
url = reverse("api:v1:uploads-detail", kwargs={"uuid": upload.uuid})
|
|
response = logged_in_api_client.delete(url)
|
|
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_upload_delete_via_api_triggers_outbox(factories, mocker):
|
|
dispatch = mocker.patch("funkwhale_api.federation.routes.outbox.dispatch")
|
|
upload = factories["music.Upload"]()
|
|
view = views.UploadViewSet()
|
|
view.perform_destroy(upload)
|
|
dispatch.assert_called_once_with(
|
|
{"type": "Delete", "object": {"type": "Audio"}}, context={"uploads": [upload]}
|
|
)
|
|
|
|
|
|
def test_user_cannot_list_other_actors_uploads(factories, logged_in_api_client):
|
|
logged_in_api_client.user.create_actor()
|
|
factories["music.Upload"]()
|
|
|
|
url = reverse("api:v1:uploads-list")
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 200
|
|
assert response.data["count"] == 0
|
|
|
|
|
|
def test_user_can_create_upload(logged_in_api_client, factories, mocker, audio_file):
|
|
library = factories["music.Library"](actor__user=logged_in_api_client.user)
|
|
url = reverse("api:v1:uploads-list")
|
|
m = mocker.patch("funkwhale_api.common.utils.on_commit")
|
|
|
|
response = logged_in_api_client.post(
|
|
url,
|
|
{
|
|
"audio_file": audio_file,
|
|
"source": "upload://test",
|
|
"import_reference": "test",
|
|
"library": library.uuid,
|
|
"import_metadata": '{"title": "foo"}',
|
|
},
|
|
)
|
|
|
|
assert response.status_code == 201
|
|
|
|
upload = library.uploads.latest("id")
|
|
|
|
audio_file.seek(0)
|
|
assert upload.audio_file.read() == audio_file.read()
|
|
assert upload.source == "upload://test"
|
|
assert upload.import_reference == "test"
|
|
assert upload.import_status == "pending"
|
|
assert upload.import_metadata == {"title": "foo"}
|
|
assert upload.track is None
|
|
m.assert_called_once_with(tasks.process_upload.delay, upload_id=upload.pk)
|
|
|
|
|
|
def test_user_can_create_upload_in_channel(
|
|
logged_in_api_client, factories, mocker, audio_file
|
|
):
|
|
actor = logged_in_api_client.user.create_actor()
|
|
channel = factories["audio.Channel"](attributed_to=actor)
|
|
url = reverse("api:v1:uploads-list")
|
|
m = mocker.patch("funkwhale_api.common.utils.on_commit")
|
|
album = factories["music.Album"](artist_credit__artist=channel.artist)
|
|
response = logged_in_api_client.post(
|
|
url,
|
|
{
|
|
"audio_file": audio_file,
|
|
"source": "upload://test",
|
|
"import_reference": "test",
|
|
"channel": channel.uuid,
|
|
"import_metadata": '{"title": "foo", "album": ' + str(album.pk) + "}",
|
|
},
|
|
)
|
|
|
|
assert response.status_code == 201
|
|
|
|
upload = channel.library.uploads.latest("id")
|
|
|
|
assert upload.source == "upload://test"
|
|
assert upload.import_reference == "test"
|
|
assert upload.import_status == "pending"
|
|
assert upload.import_metadata == {"title": "foo", "album": album.pk}
|
|
assert upload.track is None
|
|
m.assert_called_once_with(tasks.process_upload.delay, upload_id=upload.pk)
|
|
|
|
|
|
def test_user_can_create_draft_upload(
|
|
logged_in_api_client, factories, mocker, audio_file
|
|
):
|
|
library = factories["music.Library"](actor__user=logged_in_api_client.user)
|
|
url = reverse("api:v1:uploads-list")
|
|
m = mocker.patch("funkwhale_api.common.utils.on_commit")
|
|
|
|
response = logged_in_api_client.post(
|
|
url,
|
|
{
|
|
"audio_file": audio_file,
|
|
"source": "upload://test",
|
|
"import_reference": "test",
|
|
"import_status": "draft",
|
|
"library": library.uuid,
|
|
},
|
|
)
|
|
|
|
assert response.status_code == 201
|
|
|
|
upload = library.uploads.latest("id")
|
|
|
|
audio_file.seek(0)
|
|
assert upload.audio_file.read() == audio_file.read()
|
|
assert upload.source == "upload://test"
|
|
assert upload.import_reference == "test"
|
|
assert upload.import_status == "draft"
|
|
assert upload.mimetype == "audio/ogg"
|
|
assert upload.track is None
|
|
m.assert_not_called()
|
|
|
|
|
|
def test_user_can_patch_draft_upload(
|
|
logged_in_api_client, factories, mocker, audio_file
|
|
):
|
|
actor = logged_in_api_client.user.create_actor()
|
|
library = factories["music.Library"](actor=actor)
|
|
upload = factories["music.Upload"](library__actor=actor, import_status="draft")
|
|
url = reverse("api:v1:uploads-detail", kwargs={"uuid": upload.uuid})
|
|
m = mocker.patch("funkwhale_api.common.utils.on_commit")
|
|
|
|
response = logged_in_api_client.patch(
|
|
url,
|
|
{
|
|
"audio_file": audio_file,
|
|
"source": "upload://test",
|
|
"import_reference": "test",
|
|
"library": library.uuid,
|
|
},
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
|
|
upload.refresh_from_db()
|
|
|
|
audio_file.seek(0)
|
|
assert upload.audio_file.read() == audio_file.read()
|
|
assert upload.source == "upload://test"
|
|
assert upload.import_reference == "test"
|
|
assert upload.import_status == "draft"
|
|
assert upload.library == library
|
|
m.assert_not_called()
|
|
|
|
|
|
@pytest.mark.parametrize("import_status", ["pending", "errored", "skipped", "finished"])
|
|
def test_user_cannot_patch_non_draft_upload(
|
|
import_status, logged_in_api_client, factories
|
|
):
|
|
actor = logged_in_api_client.user.create_actor()
|
|
upload = factories["music.Upload"](
|
|
library__actor=actor, import_status=import_status
|
|
)
|
|
url = reverse("api:v1:uploads-detail", kwargs={"uuid": upload.uuid})
|
|
response = logged_in_api_client.patch(url, {"import_reference": "test"})
|
|
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_user_can_patch_draft_upload_status_triggers_processing(
|
|
logged_in_api_client, factories, mocker
|
|
):
|
|
actor = logged_in_api_client.user.create_actor()
|
|
upload = factories["music.Upload"](library__actor=actor, import_status="draft")
|
|
url = reverse("api:v1:uploads-detail", kwargs={"uuid": upload.uuid})
|
|
m = mocker.patch("funkwhale_api.common.utils.on_commit")
|
|
|
|
response = logged_in_api_client.patch(url, {"import_status": "pending"})
|
|
|
|
upload.refresh_from_db()
|
|
|
|
assert response.status_code == 200
|
|
assert upload.import_status == "pending"
|
|
m.assert_called_once_with(tasks.process_upload.delay, upload_id=upload.pk)
|
|
|
|
|
|
@pytest.mark.parametrize("entity", ["artist", "album", "track"])
|
|
def test_can_get_libraries_for_music_entities(
|
|
factories, api_client, entity, preferences
|
|
):
|
|
preferences["common__api_authentication_required"] = False
|
|
upload = factories["music.Upload"](playable=True)
|
|
# another private library that should not appear
|
|
factories["music.Upload"](
|
|
import_status="finished", library__privacy_level="me", track=upload.track
|
|
).library
|
|
library = upload.library
|
|
setattr(library, "_uploads_count", 1)
|
|
data = {
|
|
"artist": upload.track.artist_credit.all()[0].artist,
|
|
"album": upload.track.album,
|
|
"track": upload.track,
|
|
}
|
|
# libraries in channel should be missing excluded
|
|
channel = factories["audio.Channel"](
|
|
artist=upload.track.artist_credit.all()[0].artist
|
|
)
|
|
factories["music.Upload"](
|
|
library=channel.library, playable=True, track=upload.track
|
|
)
|
|
|
|
url = reverse(f"api:v1:{entity}s-libraries", kwargs={"pk": data[entity].pk})
|
|
|
|
response = api_client.get(url)
|
|
expected = federation_api_serializers.LibrarySerializer(library).data
|
|
|
|
assert response.status_code == 200
|
|
assert response.data == {
|
|
"count": 1,
|
|
"next": None,
|
|
"previous": None,
|
|
"results": [expected],
|
|
}
|
|
|
|
|
|
def test_list_licenses(api_client, preferences, mocker):
|
|
licenses.load(licenses.LICENSES)
|
|
load = mocker.spy(licenses, "load")
|
|
preferences["common__api_authentication_required"] = False
|
|
|
|
expected = [
|
|
serializers.LicenseSerializer(l.conf).data
|
|
for l in models.License.objects.order_by("code")
|
|
]
|
|
url = reverse("api:v1:licenses-list")
|
|
|
|
response = api_client.get(url)
|
|
|
|
assert response.data["results"] == expected
|
|
load.assert_called_once_with(licenses.LICENSES)
|
|
|
|
|
|
def test_detail_license(api_client, preferences):
|
|
preferences["common__api_authentication_required"] = False
|
|
id = "cc-by-sa-4.0"
|
|
expected = serializers.LicenseSerializer(licenses.LICENSES_BY_ID[id]).data
|
|
|
|
url = reverse("api:v1:licenses-detail", kwargs={"pk": id})
|
|
|
|
response = api_client.get(url)
|
|
|
|
assert response.data == expected
|
|
|
|
|
|
def test_oembed_track(factories, no_api_auth, api_client, settings):
|
|
settings.FUNKWHALE_URL = "http://test"
|
|
settings.FUNKWHALE_EMBED_URL = "http://embed"
|
|
track = factories["music.Track"](album__with_cover=True)
|
|
url = reverse("api:v1:oembed")
|
|
track_url = f"https://test.com/library/tracks/{track.pk}"
|
|
iframe_src = f"http://embed?type=track&id={track.pk}"
|
|
expected = {
|
|
"version": "1.0",
|
|
"type": "rich",
|
|
"provider_name": settings.APP_NAME,
|
|
"provider_url": settings.FUNKWHALE_URL,
|
|
"height": 150,
|
|
"width": 600,
|
|
"title": f"{track.title} by {track.artist_credit.all()[0].artist.name}",
|
|
"description": track.full_name,
|
|
"thumbnail_url": federation_utils.full_url(
|
|
track.album.attachment_cover.file.crop["200x200"].url
|
|
),
|
|
"thumbnail_height": 200,
|
|
"thumbnail_width": 200,
|
|
"html": '<iframe width="600" height="150" scrolling="no" frameborder="no" src="{}"></iframe>'.format(
|
|
iframe_src
|
|
),
|
|
"author_name": track.artist_credit.all()[0].artist.name,
|
|
"author_url": federation_utils.full_url(
|
|
utils.spa_reverse(
|
|
"library_artist", kwargs={"pk": track.artist_credit.all()[0].artist.pk}
|
|
)
|
|
),
|
|
}
|
|
|
|
response = api_client.get(url, {"url": track_url, "format": "json"})
|
|
|
|
assert response.data == expected
|
|
|
|
|
|
def test_oembed_album(factories, no_api_auth, api_client, settings):
|
|
settings.FUNKWHALE_URL = "http://test"
|
|
settings.FUNKWHALE_EMBED_URL = "http://embed"
|
|
track = factories["music.Track"](album__with_cover=True)
|
|
album = track.album
|
|
url = reverse("api:v1:oembed")
|
|
album_url = f"https://test.com/library/albums/{album.pk}"
|
|
iframe_src = f"http://embed?type=album&id={album.pk}"
|
|
expected = {
|
|
"version": "1.0",
|
|
"type": "rich",
|
|
"provider_name": settings.APP_NAME,
|
|
"provider_url": settings.FUNKWHALE_URL,
|
|
"height": 400,
|
|
"width": 600,
|
|
"title": f"{album.title} by {album.artist_credit.all()[0].artist.name}",
|
|
"description": f"{album.title} by {album.artist_credit.all()[0].artist.name}",
|
|
"thumbnail_url": federation_utils.full_url(
|
|
album.attachment_cover.file.crop["200x200"].url
|
|
),
|
|
"thumbnail_height": 200,
|
|
"thumbnail_width": 200,
|
|
"html": '<iframe width="600" height="400" scrolling="no" frameborder="no" src="{}"></iframe>'.format(
|
|
iframe_src
|
|
),
|
|
"author_name": album.artist_credit.all()[0].artist.name,
|
|
"author_url": federation_utils.full_url(
|
|
utils.spa_reverse(
|
|
"library_artist", kwargs={"pk": album.artist_credit.all()[0].artist.pk}
|
|
)
|
|
),
|
|
}
|
|
|
|
response = api_client.get(url, {"url": album_url, "format": "json"})
|
|
|
|
assert response.data == expected
|
|
|
|
|
|
def test_oembed_artist(factories, no_api_auth, api_client, settings):
|
|
settings.FUNKWHALE_URL = "http://test"
|
|
settings.FUNKWHALE_EMBED_URL = "http://embed"
|
|
track = factories["music.Track"](album__with_cover=True)
|
|
album = track.album
|
|
artist = track.artist_credit.all()[0].artist
|
|
url = reverse("api:v1:oembed")
|
|
artist_url = f"https://test.com/library/artists/{artist.pk}"
|
|
iframe_src = f"http://embed?type=artist&id={artist.pk}"
|
|
expected = {
|
|
"version": "1.0",
|
|
"type": "rich",
|
|
"provider_name": settings.APP_NAME,
|
|
"provider_url": settings.FUNKWHALE_URL,
|
|
"height": 400,
|
|
"width": 600,
|
|
"title": artist.name,
|
|
"description": artist.name,
|
|
"thumbnail_url": federation_utils.full_url(
|
|
album.attachment_cover.file.crop["200x200"].url
|
|
),
|
|
"thumbnail_height": 200,
|
|
"thumbnail_width": 200,
|
|
"html": '<iframe width="600" height="400" scrolling="no" frameborder="no" src="{}"></iframe>'.format(
|
|
iframe_src
|
|
),
|
|
"author_name": artist.name,
|
|
"author_url": federation_utils.full_url(
|
|
utils.spa_reverse("library_artist", kwargs={"pk": artist.pk})
|
|
),
|
|
}
|
|
|
|
response = api_client.get(url, {"url": artist_url, "format": "json"})
|
|
|
|
assert response.data == expected
|
|
|
|
|
|
def test_oembed_playlist(factories, no_api_auth, api_client, settings):
|
|
settings.FUNKWHALE_URL = "http://test"
|
|
settings.FUNKWHALE_EMBED_URL = "http://embed"
|
|
playlist = factories["playlists.Playlist"](privacy_level="everyone")
|
|
track = factories["music.Upload"](
|
|
playable=True, track__album__with_cover=True
|
|
).track
|
|
playlist.insert_many([track])
|
|
url = reverse("api:v1:oembed")
|
|
playlist_url = f"https://test.com/library/playlists/{playlist.pk}"
|
|
iframe_src = f"http://embed?type=playlist&id={playlist.pk}"
|
|
expected = {
|
|
"version": "1.0",
|
|
"type": "rich",
|
|
"provider_name": settings.APP_NAME,
|
|
"provider_url": settings.FUNKWHALE_URL,
|
|
"height": 400,
|
|
"width": 600,
|
|
"title": playlist.name,
|
|
"description": playlist.name,
|
|
"thumbnail_url": federation_utils.full_url(
|
|
track.album.attachment_cover.file.crop["200x200"].url
|
|
),
|
|
"thumbnail_height": 200,
|
|
"thumbnail_width": 200,
|
|
"html": '<iframe width="600" height="400" scrolling="no" frameborder="no" src="{}"></iframe>'.format(
|
|
iframe_src
|
|
),
|
|
"author_name": playlist.name,
|
|
"author_url": federation_utils.full_url(
|
|
utils.spa_reverse("library_playlist", kwargs={"pk": playlist.pk})
|
|
),
|
|
}
|
|
|
|
response = api_client.get(url, {"url": playlist_url, "format": "json"})
|
|
|
|
assert response.data == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"factory_name, url_name",
|
|
[
|
|
("music.Artist", "api:v1:artists-detail"),
|
|
("music.Album", "api:v1:albums-detail"),
|
|
("music.Track", "api:v1:tracks-detail"),
|
|
],
|
|
)
|
|
def test_refresh_remote_entity_when_param_is_true(
|
|
factories,
|
|
factory_name,
|
|
url_name,
|
|
mocker,
|
|
logged_in_api_client,
|
|
queryset_equal_queries,
|
|
):
|
|
obj = factories[factory_name](mbid=None)
|
|
|
|
assert obj.is_local is False
|
|
|
|
new_mbid = uuid.uuid4()
|
|
|
|
def fake_refetch(obj, queryset):
|
|
obj.mbid = new_mbid
|
|
return obj
|
|
|
|
refetch_obj = mocker.patch.object(views, "refetch_obj", side_effect=fake_refetch)
|
|
url = reverse(url_name, kwargs={"pk": obj.pk})
|
|
response = logged_in_api_client.get(url, {"refresh": "true"})
|
|
|
|
assert response.status_code == 200
|
|
assert response.data["mbid"] == str(new_mbid)
|
|
assert refetch_obj.call_count == 1
|
|
assert refetch_obj.call_args[0][0] == obj
|
|
|
|
|
|
@pytest.mark.parametrize("param", ["false", "0", ""])
|
|
def test_refresh_remote_entity_no_param(
|
|
factories, param, mocker, logged_in_api_client, service_actor
|
|
):
|
|
obj = factories["music.Artist"](mbid=None)
|
|
|
|
assert obj.is_local is False
|
|
|
|
fetch_task = mocker.patch.object(federation_tasks, "fetch")
|
|
url = reverse("api:v1:artists-detail", kwargs={"pk": obj.pk})
|
|
response = logged_in_api_client.get(url, {"refresh": param})
|
|
|
|
assert response.status_code == 200
|
|
fetch_task.assert_not_called()
|
|
assert service_actor.fetches.count() == 0
|
|
|
|
|
|
def test_refetch_obj_not_local(mocker, factories, service_actor):
|
|
obj = factories["music.Artist"](local=True)
|
|
fetch_task = mocker.patch.object(federation_tasks, "fetch")
|
|
assert views.refetch_obj(obj, obj.__class__.objects.all()) == obj
|
|
fetch_task.assert_not_called()
|
|
assert service_actor.fetches.count() == 0
|
|
|
|
|
|
def test_refetch_obj_last_fetch_date_too_close(
|
|
mocker, factories, settings, service_actor
|
|
):
|
|
settings.FEDERATION_OBJECT_FETCH_DELAY = 300
|
|
obj = factories["music.Artist"]()
|
|
factories["federation.Fetch"](
|
|
object=obj,
|
|
creation_date=timezone.now()
|
|
- datetime.timedelta(minutes=settings.FEDERATION_OBJECT_FETCH_DELAY - 1),
|
|
)
|
|
fetch_task = mocker.patch.object(federation_tasks, "fetch")
|
|
assert views.refetch_obj(obj, obj.__class__.objects.all()) == obj
|
|
fetch_task.assert_not_called()
|
|
assert service_actor.fetches.count() == 0
|
|
|
|
|
|
def test_refetch_obj(mocker, factories, settings, service_actor):
|
|
settings.FEDERATION_OBJECT_FETCH_DELAY = 300
|
|
obj = factories["music.Artist"]()
|
|
factories["federation.Fetch"](
|
|
object=obj,
|
|
creation_date=timezone.now()
|
|
- datetime.timedelta(minutes=settings.FEDERATION_OBJECT_FETCH_DELAY + 1),
|
|
)
|
|
fetch_task = mocker.patch.object(federation_tasks, "fetch")
|
|
views.refetch_obj(obj, obj.__class__.objects.all())
|
|
fetch = obj.fetches.filter(actor=service_actor).order_by("-creation_date").first()
|
|
fetch_task.assert_called_once_with(fetch_id=fetch.pk)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"params, expected",
|
|
[({}, 0), ({"include_channels": "false"}, 0), ({"include_channels": "true"}, 1)],
|
|
)
|
|
def test_artist_list_exclude_channels(
|
|
params, expected, factories, logged_in_api_client
|
|
):
|
|
factories["audio.Channel"]()
|
|
|
|
url = reverse("api:v1:artists-list")
|
|
response = logged_in_api_client.get(url, params)
|
|
|
|
assert response.status_code == 200
|
|
assert response.data["count"] == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"params, expected",
|
|
[({}, 0), ({"include_channels": "false"}, 0), ({"include_channels": "true"}, 1)],
|
|
)
|
|
def test_album_list_exclude_channels(params, expected, factories, logged_in_api_client):
|
|
channel_artist = factories["audio.Channel"]().artist
|
|
factories["music.Album"](artist_credit__artist=channel_artist)
|
|
|
|
url = reverse("api:v1:albums-list")
|
|
response = logged_in_api_client.get(url, params)
|
|
|
|
assert response.status_code == 200
|
|
assert response.data["count"] == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"params, expected",
|
|
[({}, 0), ({"include_channels": "false"}, 0), ({"include_channels": "true"}, 1)],
|
|
)
|
|
def test_track_list_exclude_channels(params, expected, factories, logged_in_api_client):
|
|
channel_artist = factories["audio.Channel"]().artist
|
|
factories["music.Track"](artist_credit__artist=channel_artist)
|
|
|
|
url = reverse("api:v1:tracks-list")
|
|
response = logged_in_api_client.get(url, params)
|
|
|
|
assert response.status_code == 200
|
|
assert response.data["count"] == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"media_url, input, expected",
|
|
[
|
|
("https://domain/media/", "https://domain/media/file.mp3", "/media/file.mp3"),
|
|
(
|
|
"https://domain/media/",
|
|
"https://otherdomain/media/file.mp3",
|
|
"https://otherdomain/media/file.mp3",
|
|
),
|
|
("https://domain/media/", "/media/file.mp3", "/media/file.mp3"),
|
|
],
|
|
)
|
|
def test_strip_absolute_media_url(media_url, input, expected, settings):
|
|
settings.MEDIA_URL = media_url
|
|
assert views.strip_absolute_media_url(input) == expected
|
|
|
|
|
|
def test_get_upload_audio_metadata(logged_in_api_client, factories):
|
|
actor = logged_in_api_client.user.create_actor()
|
|
upload = factories["music.Upload"](library__actor=actor)
|
|
metadata = tasks.metadata.Metadata(upload.get_audio_file())
|
|
serializer = tasks.metadata.TrackMetadataSerializer(data=metadata)
|
|
url = reverse("api:v1:uploads-audio-file-metadata", kwargs={"uuid": upload.uuid})
|
|
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 200
|
|
assert serializer.is_valid(raise_exception=True) is True
|
|
assert response.data == serializer.validated_data
|
|
|
|
|
|
def test_search_get(logged_in_api_client, factories):
|
|
artist = factories["music.Artist"](name="Foo Fighters")
|
|
album = factories["music.Album"](title="Foo Bar")
|
|
track = factories["music.Track"](title="Foo Baz")
|
|
tag = factories["tags.Tag"](name="Foo")
|
|
|
|
factories["music.Track"]()
|
|
factories["tags.Tag"]()
|
|
|
|
url = reverse("api:v1:search")
|
|
expected = serializers.SearchResultSerializer(
|
|
{
|
|
"artists": [artist],
|
|
"albums": [album],
|
|
"tracks": [track],
|
|
"tags": [tag],
|
|
}
|
|
).data
|
|
|
|
response = logged_in_api_client.get(url, {"q": "foo"})
|
|
|
|
assert response.status_code == 200
|
|
assert response.data == expected
|
|
|
|
|
|
def test_search_get_fts_advanced(logged_in_api_client, factories):
|
|
artist1 = factories["music.Artist"](name="Foo Bighters")
|
|
artist2 = factories["music.Artist"](name="Bar Fighter")
|
|
factories["music.Artist"]()
|
|
|
|
url = reverse("api:v1:search")
|
|
expected = {
|
|
"artists": serializers.ArtistWithAlbumsSerializer(
|
|
[artist2, artist1], many=True
|
|
).data,
|
|
"albums": [],
|
|
"tracks": [],
|
|
"tags": [],
|
|
}
|
|
response = logged_in_api_client.get(url, {"q": '"foo | bar"'})
|
|
|
|
assert response.status_code == 200
|
|
assert response.data == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"route, factory_name",
|
|
[
|
|
("api:v1:artists-detail", "music.Artist"),
|
|
("api:v1:albums-detail", "music.Album"),
|
|
("api:v1:tracks-detail", "music.Track"),
|
|
],
|
|
)
|
|
def test_detail_includes_description_key(
|
|
route, factory_name, logged_in_api_client, factories
|
|
):
|
|
obj = factories[factory_name]()
|
|
url = reverse(route, kwargs={"pk": obj.pk})
|
|
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.data["description"] is None
|
|
|
|
|
|
def test_channel_owner_can_create_album(factories, logged_in_api_client):
|
|
actor = logged_in_api_client.user.create_actor()
|
|
channel = factories["audio.Channel"](attributed_to=actor, artist__with_cover=True)
|
|
attachment = factories["common.Attachment"](actor=actor)
|
|
ac = factories["music.ArtistCredit"](artist=channel.artist)
|
|
|
|
url = reverse("api:v1:albums-list")
|
|
|
|
data = {
|
|
"artist_credit": ac.pk,
|
|
"cover": attachment.uuid,
|
|
"title": "Hello world",
|
|
"release_date": "2019-01-02",
|
|
"tags": ["Hello", "World"],
|
|
"description": {"content_type": "text/markdown", "text": "hello world"},
|
|
}
|
|
|
|
response = logged_in_api_client.post(url, data, format="json")
|
|
|
|
assert response.status_code == 204
|
|
|
|
album = channel.artist.artist_credit.albums().get(title=data["title"])
|
|
assert (
|
|
response.data
|
|
== serializers.AlbumSerializer(album, context={"description": True}).data
|
|
)
|
|
assert album.attachment_cover == attachment
|
|
assert album.attributed_to == actor
|
|
assert album.release_date == datetime.date(2019, 1, 2)
|
|
assert album.get_tags() == ["Hello", "World"]
|
|
assert album.description.content_type == "text/markdown"
|
|
assert album.description.text == "hello world"
|
|
|
|
|
|
def test_channel_owner_can_delete_album(factories, logged_in_api_client, mocker):
|
|
dispatch = mocker.patch("funkwhale_api.federation.routes.outbox.dispatch")
|
|
actor = logged_in_api_client.user.create_actor()
|
|
channel = factories["audio.Channel"](attributed_to=actor)
|
|
album = factories["music.Album"](artist_credit__artist=channel.artist)
|
|
url = reverse("api:v1:albums-detail", kwargs={"pk": album.pk})
|
|
|
|
response = logged_in_api_client.delete(url)
|
|
|
|
assert response.status_code == 204
|
|
|
|
dispatch.assert_called_once_with(
|
|
{"type": "Delete", "object": {"type": "Album"}}, context={"album": album}
|
|
)
|
|
with pytest.raises(album.DoesNotExist):
|
|
album.refresh_from_db()
|
|
|
|
|
|
def test_other_user_cannot_create_album(factories, logged_in_api_client):
|
|
actor = logged_in_api_client.user.create_actor()
|
|
channel = factories["audio.Channel"]()
|
|
attachment = factories["common.Attachment"](actor=actor)
|
|
url = reverse("api:v1:albums-list")
|
|
|
|
data = {
|
|
"artist": channel.artist.pk,
|
|
"cover": attachment.uuid,
|
|
"title": "Hello world",
|
|
"release_date": "2019-01-02",
|
|
"tags": ["Hello", "World"],
|
|
"description": {"content_type": "text/markdown", "text": "hello world"},
|
|
}
|
|
|
|
response = logged_in_api_client.post(url, data, format="json")
|
|
|
|
assert response.status_code == 400
|
|
|
|
|
|
def test_other_user_cannot_delete_album(factories, logged_in_api_client):
|
|
logged_in_api_client.user.create_actor()
|
|
channel = factories["audio.Channel"]()
|
|
album = factories["music.Album"](artist_credit__artist=channel.artist)
|
|
url = reverse("api:v1:albums-detail", kwargs={"pk": album.pk})
|
|
|
|
response = logged_in_api_client.delete(url)
|
|
|
|
assert response.status_code == 404
|
|
album.refresh_from_db()
|
|
|
|
|
|
def test_channel_owner_can_delete_track(factories, logged_in_api_client, mocker):
|
|
dispatch = mocker.patch("funkwhale_api.federation.routes.outbox.dispatch")
|
|
actor = logged_in_api_client.user.create_actor()
|
|
channel = factories["audio.Channel"](attributed_to=actor)
|
|
track = factories["music.Track"](artist_credit__artist=channel.artist)
|
|
upload1 = factories["music.Upload"](track=track)
|
|
upload2 = factories["music.Upload"](track=track)
|
|
url = reverse("api:v1:tracks-detail", kwargs={"pk": track.pk})
|
|
|
|
response = logged_in_api_client.delete(url)
|
|
|
|
assert response.status_code == 204
|
|
dispatch.assert_called_once_with(
|
|
{"type": "Delete", "object": {"type": "Audio"}},
|
|
context={"uploads": [upload1, upload2]},
|
|
)
|
|
with pytest.raises(track.DoesNotExist):
|
|
track.refresh_from_db()
|
|
|
|
|
|
def test_other_user_cannot_delete_track(factories, logged_in_api_client):
|
|
logged_in_api_client.user.create_actor()
|
|
channel = factories["audio.Channel"]()
|
|
track = factories["music.Track"](artist_credit__artist=channel.artist)
|
|
url = reverse("api:v1:tracks-detail", kwargs={"pk": track.pk})
|
|
|
|
response = logged_in_api_client.delete(url)
|
|
|
|
assert response.status_code == 404
|
|
track.refresh_from_db()
|
|
|
|
|
|
def test_listen_to_track_with_scoped_token(factories, api_client):
|
|
user = factories["users.User"]()
|
|
token = users_authentication.generate_scoped_token(
|
|
user_id=user.pk, user_secret=user.secret_key, scopes=["read:libraries"]
|
|
)
|
|
upload = factories["music.Upload"](playable=True)
|
|
url = reverse("api:v1:listen-detail", kwargs={"uuid": upload.track.uuid})
|
|
response = api_client.get(url, {"token": token})
|
|
|
|
assert response.status_code == 200
|
|
|
|
|
|
def test_fs_import_get(factories, superuser_api_client, mocker, settings):
|
|
browse_dir = mocker.patch.object(
|
|
views.utils, "browse_dir", return_value={"hello": "world"}
|
|
)
|
|
url = reverse("api:v1:libraries-fs-import")
|
|
|
|
expected = {
|
|
"root": settings.MUSIC_DIRECTORY_PATH,
|
|
"path": "",
|
|
"content": {"hello": "world"},
|
|
"import": None,
|
|
}
|
|
response = superuser_api_client.get(url, {"path": ""})
|
|
|
|
assert response.status_code == 200
|
|
assert response.data == expected
|
|
browse_dir.assert_called_once_with(expected["root"], expected["path"])
|
|
|
|
|
|
def test_fs_import_post(
|
|
factories, superuser_api_client, cache, mocker, settings, tmpdir
|
|
):
|
|
actor = superuser_api_client.user.create_actor()
|
|
library = factories["music.Library"](actor=actor)
|
|
settings.MUSIC_DIRECTORY_PATH = tmpdir
|
|
(pathlib.Path(tmpdir) / "test").mkdir()
|
|
fs_import = mocker.patch(
|
|
"funkwhale_api.music.tasks.fs_import.delay", return_value={"hello": "world"}
|
|
)
|
|
url = reverse("api:v1:libraries-fs-import")
|
|
|
|
response = superuser_api_client.post(
|
|
url, {"path": "test", "library": library.uuid, "import_reference": "test"}
|
|
)
|
|
|
|
assert response.status_code == 201
|
|
fs_import.assert_called_once_with(
|
|
path="test",
|
|
library_id=library.pk,
|
|
import_reference="test",
|
|
prune=True,
|
|
outbox=False,
|
|
broadcast=False,
|
|
batch_size=1000,
|
|
verbosity=1,
|
|
)
|
|
assert cache.get("fs-import:status") == "pending"
|
|
|
|
|
|
def test_fs_import_post_already_running(
|
|
factories, superuser_api_client, cache, mocker, settings, tmpdir
|
|
):
|
|
url = reverse("api:v1:libraries-fs-import")
|
|
cache.set("fs-import:status", "pending")
|
|
|
|
response = superuser_api_client.post(url, {"path": "test"})
|
|
|
|
assert response.status_code == 400
|
|
|
|
|
|
def test_fs_import_cancel_already_running(
|
|
factories, superuser_api_client, cache, mocker, settings, tmpdir
|
|
):
|
|
url = reverse("api:v1:libraries-fs-import")
|
|
cache.set("fs-import:status", "pending")
|
|
|
|
response = superuser_api_client.delete(url)
|
|
|
|
assert response.status_code == 204
|
|
assert cache.get("fs-import:status") == "canceled"
|
|
|
|
|
|
def test_album_create_artist_credit(factories, logged_in_api_client):
|
|
artist = factories["music.Artist"]()
|
|
factories["audio.Channel"](artist=artist)
|
|
url = reverse("api:v1:albums-list")
|
|
response = logged_in_api_client.post(
|
|
url, {"artist": artist.pk, "title": "super album"}, format="json"
|
|
)
|
|
assert response.status_code == 204
|
|
|
|
|
|
def test_can_patch_upload_list(factories, logged_in_api_client):
|
|
url = reverse("api:v1:uploads-bulk-update")
|
|
actor = logged_in_api_client.user.create_actor()
|
|
upload = factories["music.Upload"](library__actor=actor)
|
|
upload2 = factories["music.Upload"](library__actor=actor)
|
|
factories["music.Library"](actor=actor, privacy_level="everyone")
|
|
|
|
response = logged_in_api_client.patch(
|
|
url,
|
|
[
|
|
{"uuid": upload.uuid, "privacy_level": "everyone"},
|
|
{"uuid": upload2.uuid, "privacy_level": "everyone"},
|
|
],
|
|
format="json",
|
|
)
|
|
upload.refresh_from_db()
|
|
upload2.refresh_from_db()
|
|
|
|
assert response.status_code == 200
|
|
assert upload.library.privacy_level == "everyone"
|