Merge branch 'import-trust-source' into 'develop'
Import trust source Closes #361 and #367 See merge request funkwhale/funkwhale!414
This commit is contained in:
commit
6af73c4d39
|
@ -4,7 +4,6 @@ import urllib.parse
|
|||
|
||||
from django.core.exceptions import ObjectDoesNotExist
|
||||
from django.core.paginator import Paginator
|
||||
from django.db.models import F, Q
|
||||
from rest_framework import serializers
|
||||
|
||||
from funkwhale_api.common import utils as funkwhale_utils
|
||||
|
@ -21,6 +20,31 @@ AP_CONTEXT = [
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LinkSerializer(serializers.Serializer):
|
||||
type = serializers.ChoiceField(choices=["Link"])
|
||||
href = serializers.URLField(max_length=500)
|
||||
mediaType = serializers.CharField()
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.allowed_mimetypes = kwargs.pop("allowed_mimetypes", [])
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def validate_mediaType(self, v):
|
||||
if not self.allowed_mimetypes:
|
||||
# no restrictions
|
||||
return v
|
||||
for mt in self.allowed_mimetypes:
|
||||
if mt.endswith("/*"):
|
||||
if v.startswith(mt.replace("*", "")):
|
||||
return v
|
||||
else:
|
||||
if v == mt:
|
||||
return v
|
||||
raise serializers.ValidationError(
|
||||
"Invalid mimetype {}. Allowed: {}".format(v, self.allowed_mimetypes)
|
||||
)
|
||||
|
||||
|
||||
class ActorSerializer(serializers.Serializer):
|
||||
id = serializers.URLField(max_length=500)
|
||||
outbox = serializers.URLField(max_length=500)
|
||||
|
@ -626,32 +650,8 @@ class MusicEntitySerializer(serializers.Serializer):
|
|||
musicbrainzId = serializers.UUIDField(allow_null=True, required=False)
|
||||
name = serializers.CharField(max_length=1000)
|
||||
|
||||
def create(self, validated_data):
|
||||
mbid = validated_data.get("musicbrainzId")
|
||||
candidates = self.model.objects.filter(
|
||||
Q(mbid=mbid) | Q(fid=validated_data["id"])
|
||||
).order_by(F("fid").desc(nulls_last=True))
|
||||
|
||||
existing = candidates.first()
|
||||
if existing:
|
||||
return existing
|
||||
|
||||
# nothing matching in our database, let's create a new object
|
||||
return self.model.objects.create(**self.get_create_data(validated_data))
|
||||
|
||||
def get_create_data(self, validated_data):
|
||||
return {
|
||||
"mbid": validated_data.get("musicbrainzId"),
|
||||
"fid": validated_data["id"],
|
||||
"name": validated_data["name"],
|
||||
"creation_date": validated_data["published"],
|
||||
"from_activity": self.context.get("activity"),
|
||||
}
|
||||
|
||||
|
||||
class ArtistSerializer(MusicEntitySerializer):
|
||||
model = music_models.Artist
|
||||
|
||||
def to_representation(self, instance):
|
||||
d = {
|
||||
"type": "Artist",
|
||||
|
@ -667,9 +667,11 @@ class ArtistSerializer(MusicEntitySerializer):
|
|||
|
||||
|
||||
class AlbumSerializer(MusicEntitySerializer):
|
||||
model = music_models.Album
|
||||
released = serializers.DateField(allow_null=True, required=False)
|
||||
artists = serializers.ListField(child=ArtistSerializer(), min_length=1)
|
||||
cover = LinkSerializer(
|
||||
allowed_mimetypes=["image/*"], allow_null=True, required=False
|
||||
)
|
||||
|
||||
def to_representation(self, instance):
|
||||
d = {
|
||||
|
@ -688,7 +690,12 @@ class AlbumSerializer(MusicEntitySerializer):
|
|||
],
|
||||
}
|
||||
if instance.cover:
|
||||
d["cover"] = {"type": "Image", "url": utils.full_url(instance.cover.url)}
|
||||
d["cover"] = {
|
||||
"type": "Link",
|
||||
"href": utils.full_url(instance.cover.url),
|
||||
"mediaType": mimetypes.guess_type(instance.cover.path)[0]
|
||||
or "image/jpeg",
|
||||
}
|
||||
if self.context.get("include_ap_context", self.parent is None):
|
||||
d["@context"] = AP_CONTEXT
|
||||
return d
|
||||
|
@ -711,7 +718,6 @@ class AlbumSerializer(MusicEntitySerializer):
|
|||
|
||||
|
||||
class TrackSerializer(MusicEntitySerializer):
|
||||
model = music_models.Track
|
||||
position = serializers.IntegerField(min_value=0, allow_null=True, required=False)
|
||||
artists = serializers.ListField(child=ArtistSerializer(), min_length=1)
|
||||
album = AlbumSerializer()
|
||||
|
@ -738,32 +744,22 @@ class TrackSerializer(MusicEntitySerializer):
|
|||
d["@context"] = AP_CONTEXT
|
||||
return d
|
||||
|
||||
def get_create_data(self, validated_data):
|
||||
artist_data = validated_data["artists"][0]
|
||||
artist = ArtistSerializer(
|
||||
context={"activity": self.context.get("activity")}
|
||||
).create(artist_data)
|
||||
album = AlbumSerializer(
|
||||
context={"activity": self.context.get("activity")}
|
||||
).create(validated_data["album"])
|
||||
def create(self, validated_data):
|
||||
from funkwhale_api.music import tasks as music_tasks
|
||||
|
||||
return {
|
||||
"mbid": validated_data.get("musicbrainzId"),
|
||||
"fid": validated_data["id"],
|
||||
"title": validated_data["name"],
|
||||
"position": validated_data.get("position"),
|
||||
"creation_date": validated_data["published"],
|
||||
"artist": artist,
|
||||
"album": album,
|
||||
"from_activity": self.context.get("activity"),
|
||||
}
|
||||
metadata = music_tasks.federation_audio_track_to_metadata(validated_data)
|
||||
from_activity = self.context.get("activity")
|
||||
if from_activity:
|
||||
metadata["from_activity_id"] = from_activity.pk
|
||||
track = music_tasks.get_track_from_import_metadata(metadata)
|
||||
return track
|
||||
|
||||
|
||||
class UploadSerializer(serializers.Serializer):
|
||||
type = serializers.ChoiceField(choices=["Audio"])
|
||||
id = serializers.URLField(max_length=500)
|
||||
library = serializers.URLField(max_length=500)
|
||||
url = serializers.JSONField()
|
||||
url = LinkSerializer(allowed_mimetypes=["audio/*"])
|
||||
published = serializers.DateTimeField()
|
||||
updated = serializers.DateTimeField(required=False, allow_null=True)
|
||||
bitrate = serializers.IntegerField(min_value=0)
|
||||
|
|
|
@ -93,9 +93,9 @@ def convert_track_number(v):
|
|||
class FirstUUIDField(forms.UUIDField):
|
||||
def to_python(self, value):
|
||||
try:
|
||||
# sometimes, Picard leaves to uuids in the field, separated
|
||||
# by a slash
|
||||
value = value.split("/")[0]
|
||||
# sometimes, Picard leaves two uuids in the field, separated
|
||||
# by a slash or a ;
|
||||
value = value.split(";")[0].split("/")[0].strip()
|
||||
except (AttributeError, IndexError, TypeError):
|
||||
pass
|
||||
|
||||
|
@ -107,10 +107,18 @@ def get_date(value):
|
|||
return datetime.date(parsed.year, parsed.month, parsed.day)
|
||||
|
||||
|
||||
def split_and_return_first(separator):
|
||||
def inner(v):
|
||||
return v.split(separator)[0].strip()
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
VALIDATION = {
|
||||
"musicbrainz_artistid": FirstUUIDField(),
|
||||
"musicbrainz_albumid": FirstUUIDField(),
|
||||
"musicbrainz_recordingid": FirstUUIDField(),
|
||||
"musicbrainz_albumartistid": FirstUUIDField(),
|
||||
}
|
||||
|
||||
CONF = {
|
||||
|
@ -123,10 +131,15 @@ CONF = {
|
|||
},
|
||||
"title": {},
|
||||
"artist": {},
|
||||
"album_artist": {
|
||||
"field": "albumartist",
|
||||
"to_application": split_and_return_first(";"),
|
||||
},
|
||||
"album": {},
|
||||
"date": {"field": "date", "to_application": get_date},
|
||||
"musicbrainz_albumid": {},
|
||||
"musicbrainz_artistid": {},
|
||||
"musicbrainz_albumartistid": {},
|
||||
"musicbrainz_recordingid": {"field": "musicbrainz_trackid"},
|
||||
},
|
||||
},
|
||||
|
@ -139,10 +152,15 @@ CONF = {
|
|||
},
|
||||
"title": {},
|
||||
"artist": {},
|
||||
"album_artist": {
|
||||
"field": "albumartist",
|
||||
"to_application": split_and_return_first(";"),
|
||||
},
|
||||
"album": {},
|
||||
"date": {"field": "date", "to_application": get_date},
|
||||
"musicbrainz_albumid": {},
|
||||
"musicbrainz_artistid": {},
|
||||
"musicbrainz_albumartistid": {},
|
||||
"musicbrainz_recordingid": {"field": "musicbrainz_trackid"},
|
||||
},
|
||||
},
|
||||
|
@ -155,10 +173,12 @@ CONF = {
|
|||
},
|
||||
"title": {},
|
||||
"artist": {},
|
||||
"album_artist": {"field": "albumartist"},
|
||||
"album": {},
|
||||
"date": {"field": "date", "to_application": get_date},
|
||||
"musicbrainz_albumid": {"field": "MusicBrainz Album Id"},
|
||||
"musicbrainz_artistid": {"field": "MusicBrainz Artist Id"},
|
||||
"musicbrainz_albumartistid": {"field": "MusicBrainz Album Artist Id"},
|
||||
"musicbrainz_recordingid": {"field": "MusicBrainz Track Id"},
|
||||
},
|
||||
},
|
||||
|
@ -169,10 +189,12 @@ CONF = {
|
|||
"track_number": {"field": "TRCK", "to_application": convert_track_number},
|
||||
"title": {"field": "TIT2"},
|
||||
"artist": {"field": "TPE1"},
|
||||
"album_artist": {"field": "TPE2"},
|
||||
"album": {"field": "TALB"},
|
||||
"date": {"field": "TDRC", "to_application": get_date},
|
||||
"musicbrainz_albumid": {"field": "MusicBrainz Album Id"},
|
||||
"musicbrainz_artistid": {"field": "MusicBrainz Artist Id"},
|
||||
"musicbrainz_albumartistid": {"field": "MusicBrainz Album Artist Id"},
|
||||
"musicbrainz_recordingid": {
|
||||
"field": "UFID",
|
||||
"getter": get_mp3_recording_id,
|
||||
|
@ -190,10 +212,12 @@ CONF = {
|
|||
},
|
||||
"title": {},
|
||||
"artist": {},
|
||||
"album_artist": {"field": "albumartist"},
|
||||
"album": {},
|
||||
"date": {"field": "date", "to_application": get_date},
|
||||
"musicbrainz_albumid": {},
|
||||
"musicbrainz_artistid": {},
|
||||
"musicbrainz_albumartistid": {},
|
||||
"musicbrainz_recordingid": {"field": "musicbrainz_trackid"},
|
||||
"test": {},
|
||||
"pictures": {},
|
||||
|
@ -201,6 +225,19 @@ CONF = {
|
|||
},
|
||||
}
|
||||
|
||||
ALL_FIELDS = [
|
||||
"track_number",
|
||||
"title",
|
||||
"artist",
|
||||
"album_artist",
|
||||
"album",
|
||||
"date",
|
||||
"musicbrainz_albumid",
|
||||
"musicbrainz_artistid",
|
||||
"musicbrainz_albumartistid",
|
||||
"musicbrainz_recordingid",
|
||||
]
|
||||
|
||||
|
||||
class Metadata(object):
|
||||
def __init__(self, path):
|
||||
|
@ -238,6 +275,20 @@ class Metadata(object):
|
|||
v = field.to_python(v)
|
||||
return v
|
||||
|
||||
def all(self):
|
||||
"""
|
||||
Return a dict containing all metadata of the file
|
||||
"""
|
||||
|
||||
data = {}
|
||||
for field in ALL_FIELDS:
|
||||
try:
|
||||
data[field] = self.get(field, None)
|
||||
except (TagNotFound, forms.ValidationError):
|
||||
data[field] = None
|
||||
|
||||
return data
|
||||
|
||||
def get_picture(self, picture_type="cover_front"):
|
||||
ptype = getattr(mutagen.id3.PictureType, picture_type.upper())
|
||||
try:
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
import uuid
|
||||
|
@ -21,11 +22,14 @@ from versatileimagefield.image_warmer import VersatileImageFieldWarmer
|
|||
|
||||
from funkwhale_api import musicbrainz
|
||||
from funkwhale_api.common import fields
|
||||
from funkwhale_api.common import session
|
||||
from funkwhale_api.common import utils as common_utils
|
||||
from funkwhale_api.federation import models as federation_models
|
||||
from funkwhale_api.federation import utils as federation_utils
|
||||
from . import importers, metadata, utils
|
||||
|
||||
logger = logging.getLogger(__file__)
|
||||
|
||||
|
||||
def empty_dict():
|
||||
return {}
|
||||
|
@ -240,14 +244,35 @@ class Album(APIModelMixin):
|
|||
|
||||
def get_image(self, data=None):
|
||||
if data:
|
||||
f = ContentFile(data["content"])
|
||||
extensions = {"image/jpeg": "jpg", "image/png": "png", "image/gif": "gif"}
|
||||
extension = extensions.get(data["mimetype"], "jpg")
|
||||
self.cover.save("{}.{}".format(self.uuid, extension), f)
|
||||
else:
|
||||
if data.get("content"):
|
||||
# we have to cover itself
|
||||
f = ContentFile(data["content"])
|
||||
elif data.get("url"):
|
||||
# we can fetch from a url
|
||||
try:
|
||||
response = session.get_session().get(
|
||||
data.get("url"),
|
||||
timeout=3,
|
||||
verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception as e:
|
||||
logger.warn(
|
||||
"Cannot download cover at url %s: %s", data.get("url"), e
|
||||
)
|
||||
return
|
||||
else:
|
||||
f = ContentFile(response.content)
|
||||
self.cover.save("{}.{}".format(self.uuid, extension), f, save=False)
|
||||
self.save(update_fields=["cover"])
|
||||
return self.cover.file
|
||||
if self.mbid:
|
||||
image_data = musicbrainz.api.images.get_front(str(self.mbid))
|
||||
f = ContentFile(image_data)
|
||||
self.cover.save("{0}.jpg".format(self.mbid), f)
|
||||
self.cover.save("{0}.jpg".format(self.mbid), f, save=False)
|
||||
self.save(update_fields=["cover"])
|
||||
return self.cover.file
|
||||
|
||||
def __str__(self):
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
import collections
|
||||
import logging
|
||||
import os
|
||||
|
||||
from django.utils import timezone
|
||||
from django.db import transaction
|
||||
from django.db.models import F
|
||||
from django.db.models import F, Q
|
||||
from django.dispatch import receiver
|
||||
|
||||
from musicbrainzngs import ResponseError
|
||||
|
@ -14,7 +15,6 @@ from funkwhale_api.common import preferences
|
|||
from funkwhale_api.federation import activity, actors, routes
|
||||
from funkwhale_api.federation import library as lb
|
||||
from funkwhale_api.federation import library as federation_serializers
|
||||
from funkwhale_api.providers.acoustid import get_acoustid_client
|
||||
from funkwhale_api.taskapp import celery
|
||||
|
||||
from . import lyrics as lyrics_utils
|
||||
|
@ -26,102 +26,32 @@ from . import serializers
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@celery.app.task(name="acoustid.set_on_upload")
|
||||
@celery.require_instance(models.Upload, "upload")
|
||||
def set_acoustid_on_upload(upload):
|
||||
client = get_acoustid_client()
|
||||
result = client.get_best_match(upload.audio_file.path)
|
||||
|
||||
def update(id):
|
||||
upload.acoustid_track_id = id
|
||||
upload.save(update_fields=["acoustid_track_id"])
|
||||
return id
|
||||
|
||||
if result:
|
||||
return update(result["id"])
|
||||
|
||||
|
||||
def import_track_from_remote(metadata):
|
||||
try:
|
||||
track_mbid = metadata["recording"]["musicbrainz_id"]
|
||||
assert track_mbid # for null/empty values
|
||||
except (KeyError, AssertionError):
|
||||
pass
|
||||
else:
|
||||
return models.Track.get_or_create_from_api(mbid=track_mbid)[0]
|
||||
|
||||
try:
|
||||
album_mbid = metadata["release"]["musicbrainz_id"]
|
||||
assert album_mbid # for null/empty values
|
||||
except (KeyError, AssertionError):
|
||||
pass
|
||||
else:
|
||||
album, _ = models.Album.get_or_create_from_api(mbid=album_mbid)
|
||||
return models.Track.get_or_create_from_title(
|
||||
metadata["title"], artist=album.artist, album=album
|
||||
)[0]
|
||||
|
||||
try:
|
||||
artist_mbid = metadata["artist"]["musicbrainz_id"]
|
||||
assert artist_mbid # for null/empty values
|
||||
except (KeyError, AssertionError):
|
||||
pass
|
||||
else:
|
||||
artist, _ = models.Artist.get_or_create_from_api(mbid=artist_mbid)
|
||||
album, _ = models.Album.get_or_create_from_title(
|
||||
metadata["album_title"], artist=artist
|
||||
)
|
||||
return models.Track.get_or_create_from_title(
|
||||
metadata["title"], artist=artist, album=album
|
||||
)[0]
|
||||
|
||||
# worst case scenario, we have absolutely no way to link to a
|
||||
# musicbrainz resource, we rely on the name/titles
|
||||
artist, _ = models.Artist.get_or_create_from_name(metadata["artist_name"])
|
||||
album, _ = models.Album.get_or_create_from_title(
|
||||
metadata["album_title"], artist=artist
|
||||
)
|
||||
return models.Track.get_or_create_from_title(
|
||||
metadata["title"], artist=artist, album=album
|
||||
)[0]
|
||||
|
||||
|
||||
def update_album_cover(album, upload, replace=False):
|
||||
def update_album_cover(album, source=None, cover_data=None, replace=False):
|
||||
if album.cover and not replace:
|
||||
return
|
||||
|
||||
if upload:
|
||||
# maybe the file has a cover embedded?
|
||||
if cover_data:
|
||||
return album.get_image(data=cover_data)
|
||||
|
||||
if source and source.startswith("file://"):
|
||||
# let's look for a cover in the same directory
|
||||
path = os.path.dirname(source.replace("file://", "", 1))
|
||||
logger.info("[Album %s] scanning covers from %s", album.pk, path)
|
||||
cover = get_cover_from_fs(path)
|
||||
if cover:
|
||||
return album.get_image(data=cover)
|
||||
if album.mbid:
|
||||
try:
|
||||
metadata = upload.get_metadata()
|
||||
except FileNotFoundError:
|
||||
metadata = None
|
||||
if metadata:
|
||||
cover = metadata.get_picture("cover_front")
|
||||
if cover:
|
||||
# best case scenario, cover is embedded in the track
|
||||
logger.info("[Album %s] Using cover embedded in file", album.pk)
|
||||
return album.get_image(data=cover)
|
||||
if upload.source and upload.source.startswith("file://"):
|
||||
# let's look for a cover in the same directory
|
||||
path = os.path.dirname(upload.source.replace("file://", "", 1))
|
||||
logger.info("[Album %s] scanning covers from %s", album.pk, path)
|
||||
cover = get_cover_from_fs(path)
|
||||
if cover:
|
||||
return album.get_image(data=cover)
|
||||
if not album.mbid:
|
||||
return
|
||||
try:
|
||||
logger.info(
|
||||
"[Album %s] Fetching cover from musicbrainz release %s",
|
||||
album.pk,
|
||||
str(album.mbid),
|
||||
)
|
||||
return album.get_image()
|
||||
except ResponseError as exc:
|
||||
logger.warning(
|
||||
"[Album %s] cannot fetch cover from musicbrainz: %s", album.pk, str(exc)
|
||||
)
|
||||
logger.info(
|
||||
"[Album %s] Fetching cover from musicbrainz release %s",
|
||||
album.pk,
|
||||
str(album.mbid),
|
||||
)
|
||||
return album.get_image()
|
||||
except ResponseError as exc:
|
||||
logger.warning(
|
||||
"[Album %s] cannot fetch cover from musicbrainz: %s", album.pk, str(exc)
|
||||
)
|
||||
|
||||
|
||||
IMAGE_TYPES = [("jpg", "image/jpeg"), ("png", "image/png")]
|
||||
|
@ -244,15 +174,15 @@ def scan_library_page(library_scan, page_url):
|
|||
scan_library_page.delay(library_scan_id=library_scan.pk, page_url=next_page)
|
||||
|
||||
|
||||
def getter(data, *keys):
|
||||
def getter(data, *keys, default=None):
|
||||
if not data:
|
||||
return
|
||||
return default
|
||||
v = data
|
||||
for k in keys:
|
||||
try:
|
||||
v = v[k]
|
||||
except KeyError:
|
||||
return
|
||||
return default
|
||||
|
||||
return v
|
||||
|
||||
|
@ -269,12 +199,17 @@ def fail_import(upload, error_code):
|
|||
upload.import_details = {"error_code": error_code}
|
||||
upload.import_date = timezone.now()
|
||||
upload.save(update_fields=["import_details", "import_status", "import_date"])
|
||||
signals.upload_import_status_updated.send(
|
||||
old_status=old_status,
|
||||
new_status=upload.import_status,
|
||||
upload=upload,
|
||||
sender=None,
|
||||
|
||||
broadcast = getter(
|
||||
upload.import_metadata, "funkwhale", "config", "broadcast", default=True
|
||||
)
|
||||
if broadcast:
|
||||
signals.upload_import_status_updated.send(
|
||||
old_status=old_status,
|
||||
new_status=upload.import_status,
|
||||
upload=upload,
|
||||
sender=None,
|
||||
)
|
||||
|
||||
|
||||
@celery.app.task(name="music.process_upload")
|
||||
|
@ -285,22 +220,29 @@ def fail_import(upload, error_code):
|
|||
"upload",
|
||||
)
|
||||
def process_upload(upload):
|
||||
data = upload.import_metadata or {}
|
||||
import_metadata = upload.import_metadata or {}
|
||||
old_status = upload.import_status
|
||||
audio_file = upload.get_audio_file()
|
||||
try:
|
||||
track = get_track_from_import_metadata(upload.import_metadata or {})
|
||||
if not track and upload.audio_file:
|
||||
# easy ways did not work. Now we have to be smart and use
|
||||
# metadata from the file itself if any
|
||||
track = import_track_data_from_file(upload.audio_file.file, hints=data)
|
||||
if not track and upload.metadata:
|
||||
# we can try to import using federation metadata
|
||||
track = import_track_from_remote(upload.metadata)
|
||||
additional_data = {}
|
||||
if not audio_file:
|
||||
# we can only rely on user proveded data
|
||||
final_metadata = import_metadata
|
||||
else:
|
||||
# we use user provided data and data from the file itself
|
||||
m = metadata.Metadata(audio_file)
|
||||
file_metadata = m.all()
|
||||
final_metadata = collections.ChainMap(
|
||||
additional_data, import_metadata, file_metadata
|
||||
)
|
||||
additional_data["cover_data"] = m.get_picture("cover_front")
|
||||
additional_data["upload_source"] = upload.source
|
||||
track = get_track_from_import_metadata(final_metadata)
|
||||
except UploadImportError as e:
|
||||
return fail_import(upload, e.code)
|
||||
except Exception:
|
||||
fail_import(upload, "unknown_error")
|
||||
raise
|
||||
return fail_import(upload, "unknown_error")
|
||||
|
||||
# under some situations, we want to skip the import (
|
||||
# for instance if the user already owns the files)
|
||||
owned_duplicates = get_owned_duplicates(upload, track)
|
||||
|
@ -342,33 +284,69 @@ def process_upload(upload):
|
|||
"bitrate",
|
||||
]
|
||||
)
|
||||
signals.upload_import_status_updated.send(
|
||||
old_status=old_status,
|
||||
new_status=upload.import_status,
|
||||
upload=upload,
|
||||
sender=None,
|
||||
broadcast = getter(
|
||||
import_metadata, "funkwhale", "config", "broadcast", default=True
|
||||
)
|
||||
routes.outbox.dispatch(
|
||||
{"type": "Create", "object": {"type": "Audio"}}, context={"upload": upload}
|
||||
if broadcast:
|
||||
signals.upload_import_status_updated.send(
|
||||
old_status=old_status,
|
||||
new_status=upload.import_status,
|
||||
upload=upload,
|
||||
sender=None,
|
||||
)
|
||||
dispatch_outbox = getter(
|
||||
import_metadata, "funkwhale", "config", "dispatch_outbox", default=True
|
||||
)
|
||||
if not track.album.cover:
|
||||
update_album_cover(track.album, upload)
|
||||
if dispatch_outbox:
|
||||
routes.outbox.dispatch(
|
||||
{"type": "Create", "object": {"type": "Audio"}}, context={"upload": upload}
|
||||
)
|
||||
|
||||
|
||||
def get_track_from_import_metadata(data):
|
||||
track_mbid = getter(data, "track", "mbid")
|
||||
track_uuid = getter(data, "track", "uuid")
|
||||
def federation_audio_track_to_metadata(payload):
|
||||
"""
|
||||
Given a valid payload as returned by federation.serializers.TrackSerializer.validated_data,
|
||||
returns a correct metadata payload for use with get_track_from_import_metadata.
|
||||
"""
|
||||
musicbrainz_recordingid = payload.get("musicbrainzId")
|
||||
musicbrainz_artistid = payload["artists"][0].get("musicbrainzId")
|
||||
musicbrainz_albumartistid = payload["album"]["artists"][0].get("musicbrainzId")
|
||||
musicbrainz_albumid = payload["album"].get("musicbrainzId")
|
||||
|
||||
if track_mbid:
|
||||
# easiest case: there is a MBID provided in the import_metadata
|
||||
return models.Track.get_or_create_from_api(mbid=track_mbid)[0]
|
||||
if track_uuid:
|
||||
# another easy case, we have a reference to a uuid of a track that
|
||||
# already exists in our database
|
||||
try:
|
||||
return models.Track.objects.get(uuid=track_uuid)
|
||||
except models.Track.DoesNotExist:
|
||||
raise UploadImportError(code="track_uuid_not_found")
|
||||
new_data = {
|
||||
"title": payload["name"],
|
||||
"album": payload["album"]["name"],
|
||||
"track_number": payload["position"],
|
||||
"artist": payload["artists"][0]["name"],
|
||||
"album_artist": payload["album"]["artists"][0]["name"],
|
||||
"date": payload["album"].get("released"),
|
||||
# musicbrainz
|
||||
"musicbrainz_recordingid": str(musicbrainz_recordingid)
|
||||
if musicbrainz_recordingid
|
||||
else None,
|
||||
"musicbrainz_artistid": str(musicbrainz_artistid)
|
||||
if musicbrainz_artistid
|
||||
else None,
|
||||
"musicbrainz_albumartistid": str(musicbrainz_albumartistid)
|
||||
if musicbrainz_albumartistid
|
||||
else None,
|
||||
"musicbrainz_albumid": str(musicbrainz_albumid)
|
||||
if musicbrainz_albumid
|
||||
else None,
|
||||
# federation
|
||||
"fid": payload["id"],
|
||||
"artist_fid": payload["artists"][0]["id"],
|
||||
"album_artist_fid": payload["album"]["artists"][0]["id"],
|
||||
"album_fid": payload["album"]["id"],
|
||||
"fdate": payload["published"],
|
||||
"album_fdate": payload["album"]["published"],
|
||||
"album_artist_fdate": payload["album"]["artists"][0]["published"],
|
||||
"artist_fdate": payload["artists"][0]["published"],
|
||||
}
|
||||
cover = payload["album"].get("cover")
|
||||
if cover:
|
||||
new_data["cover_data"] = {"mimetype": cover["mediaType"], "url": cover["href"]}
|
||||
return new_data
|
||||
|
||||
|
||||
def get_owned_duplicates(upload, track):
|
||||
|
@ -385,45 +363,191 @@ def get_owned_duplicates(upload, track):
|
|||
)
|
||||
|
||||
|
||||
def get_best_candidate_or_create(model, query, defaults, sort_fields):
|
||||
"""
|
||||
Like queryset.get_or_create() but does not crash if multiple objects
|
||||
are returned on the get() call
|
||||
"""
|
||||
candidates = model.objects.filter(query)
|
||||
if candidates:
|
||||
|
||||
return sort_candidates(candidates, sort_fields)[0], False
|
||||
|
||||
return model.objects.create(**defaults), True
|
||||
|
||||
|
||||
def sort_candidates(candidates, important_fields):
|
||||
"""
|
||||
Given a list of objects and a list of fields,
|
||||
will return a sorted list of those objects by score.
|
||||
|
||||
Score is higher for objects that have a non-empty attribute
|
||||
that is also present in important fields::
|
||||
|
||||
artist1 = Artist(mbid=None, fid=None)
|
||||
artist2 = Artist(mbid="something", fid=None)
|
||||
|
||||
# artist2 has a mbid, so is sorted first
|
||||
assert sort_candidates([artist1, artist2], ['mbid'])[0] == artist2
|
||||
|
||||
Only supports string fields.
|
||||
"""
|
||||
|
||||
# map each fields to its score, giving a higher score to first fields
|
||||
fields_scores = {f: i + 1 for i, f in enumerate(sorted(important_fields))}
|
||||
candidates_with_scores = []
|
||||
for candidate in candidates:
|
||||
current_score = 0
|
||||
for field, score in fields_scores.items():
|
||||
v = getattr(candidate, field, "")
|
||||
if v:
|
||||
current_score += score
|
||||
|
||||
candidates_with_scores.append((candidate, current_score))
|
||||
|
||||
return [c for c, s in reversed(sorted(candidates_with_scores, key=lambda v: v[1]))]
|
||||
|
||||
|
||||
@transaction.atomic
|
||||
def import_track_data_from_file(file, hints={}):
|
||||
data = metadata.Metadata(file)
|
||||
album = None
|
||||
def get_track_from_import_metadata(data):
|
||||
track_uuid = getter(data, "funkwhale", "track", "uuid")
|
||||
|
||||
if track_uuid:
|
||||
# easy case, we have a reference to a uuid of a track that
|
||||
# already exists in our database
|
||||
try:
|
||||
track = models.Track.objects.get(uuid=track_uuid)
|
||||
except models.Track.DoesNotExist:
|
||||
raise UploadImportError(code="track_uuid_not_found")
|
||||
|
||||
if not track.album.cover:
|
||||
update_album_cover(
|
||||
track.album,
|
||||
source=data.get("upload_source"),
|
||||
cover_data=data.get("cover_data"),
|
||||
)
|
||||
return track
|
||||
|
||||
from_activity_id = data.get("from_activity_id", None)
|
||||
track_mbid = data.get("musicbrainz_recordingid", None)
|
||||
album_mbid = data.get("musicbrainz_albumid", None)
|
||||
track_fid = getter(data, "fid")
|
||||
|
||||
query = None
|
||||
|
||||
if album_mbid and track_mbid:
|
||||
# to gain performance and avoid additional mb lookups,
|
||||
# we import from the release data, which is already cached
|
||||
return models.Track.get_or_create_from_release(album_mbid, track_mbid)[0]
|
||||
elif track_mbid:
|
||||
return models.Track.get_or_create_from_api(track_mbid)[0]
|
||||
elif album_mbid:
|
||||
album = models.Album.get_or_create_from_api(album_mbid)[0]
|
||||
query = Q(mbid=track_mbid, album__mbid=album_mbid)
|
||||
|
||||
artist = album.artist if album else None
|
||||
if track_fid:
|
||||
query = query | Q(fid=track_fid) if query else Q(fid=track_fid)
|
||||
|
||||
if query:
|
||||
# second easy case: we have a (track_mbid, album_mbid) pair or
|
||||
# a federation uuid we can check on
|
||||
try:
|
||||
return sort_candidates(models.Track.objects.filter(query), ["mbid", "fid"])[
|
||||
0
|
||||
]
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
# get / create artist and album artist
|
||||
artist_mbid = data.get("musicbrainz_artistid", None)
|
||||
if not artist:
|
||||
if artist_mbid:
|
||||
artist = models.Artist.get_or_create_from_api(artist_mbid)[0]
|
||||
else:
|
||||
artist = models.Artist.objects.get_or_create(
|
||||
name__iexact=data.get("artist"), defaults={"name": data.get("artist")}
|
||||
)[0]
|
||||
artist_fid = data.get("artist_fid", None)
|
||||
artist_name = data["artist"]
|
||||
query = Q(name__iexact=artist_name)
|
||||
if artist_mbid:
|
||||
query |= Q(mbid=artist_mbid)
|
||||
if artist_fid:
|
||||
query |= Q(fid=artist_fid)
|
||||
defaults = {
|
||||
"name": artist_name,
|
||||
"mbid": artist_mbid,
|
||||
"fid": artist_fid,
|
||||
"from_activity_id": from_activity_id,
|
||||
}
|
||||
if data.get("artist_fdate"):
|
||||
defaults["creation_date"] = data.get("artist_fdate")
|
||||
|
||||
release_date = data.get("date", default=None)
|
||||
if not album:
|
||||
album = models.Album.objects.get_or_create(
|
||||
title__iexact=data.get("album"),
|
||||
artist=artist,
|
||||
defaults={"title": data.get("album"), "release_date": release_date},
|
||||
)[0]
|
||||
position = data.get("track_number", default=None)
|
||||
track = models.Track.objects.get_or_create(
|
||||
title__iexact=data.get("title"),
|
||||
album=album,
|
||||
defaults={"title": data.get("title"), "position": position},
|
||||
artist = get_best_candidate_or_create(
|
||||
models.Artist, query, defaults=defaults, sort_fields=["mbid", "fid"]
|
||||
)[0]
|
||||
|
||||
album_artist_name = data.get("album_artist", artist_name)
|
||||
if album_artist_name == artist_name:
|
||||
album_artist = artist
|
||||
else:
|
||||
query = Q(name__iexact=album_artist_name)
|
||||
album_artist_mbid = data.get("musicbrainz_albumartistid", None)
|
||||
album_artist_fid = data.get("album_artist_fid", None)
|
||||
if album_artist_mbid:
|
||||
query |= Q(mbid=album_artist_mbid)
|
||||
if album_artist_fid:
|
||||
query |= Q(fid=album_artist_fid)
|
||||
defaults = {
|
||||
"name": album_artist_name,
|
||||
"mbid": album_artist_mbid,
|
||||
"fid": album_artist_fid,
|
||||
"from_activity_id": from_activity_id,
|
||||
}
|
||||
if data.get("album_artist_fdate"):
|
||||
defaults["creation_date"] = data.get("album_artist_fdate")
|
||||
|
||||
album_artist = get_best_candidate_or_create(
|
||||
models.Artist, query, defaults=defaults, sort_fields=["mbid", "fid"]
|
||||
)[0]
|
||||
|
||||
# get / create album
|
||||
album_title = data["album"]
|
||||
album_fid = data.get("album_fid", None)
|
||||
query = Q(title__iexact=album_title, artist=album_artist)
|
||||
if album_mbid:
|
||||
query |= Q(mbid=album_mbid)
|
||||
if album_fid:
|
||||
query |= Q(fid=album_fid)
|
||||
defaults = {
|
||||
"title": album_title,
|
||||
"artist": album_artist,
|
||||
"mbid": album_mbid,
|
||||
"release_date": data.get("date"),
|
||||
"fid": album_fid,
|
||||
"from_activity_id": from_activity_id,
|
||||
}
|
||||
if data.get("album_fdate"):
|
||||
defaults["creation_date"] = data.get("album_fdate")
|
||||
|
||||
album = get_best_candidate_or_create(
|
||||
models.Album, query, defaults=defaults, sort_fields=["mbid", "fid"]
|
||||
)[0]
|
||||
if not album.cover:
|
||||
update_album_cover(
|
||||
album, source=data.get("upload_source"), cover_data=data.get("cover_data")
|
||||
)
|
||||
|
||||
# get / create track
|
||||
track_title = data["title"]
|
||||
track_number = data.get("track_number", 1)
|
||||
query = Q(title__iexact=track_title, artist=artist, album=album)
|
||||
if track_mbid:
|
||||
query |= Q(mbid=track_mbid)
|
||||
if track_fid:
|
||||
query |= Q(fid=track_fid)
|
||||
defaults = {
|
||||
"title": track_title,
|
||||
"album": album,
|
||||
"mbid": track_mbid,
|
||||
"artist": artist,
|
||||
"position": track_number,
|
||||
"fid": track_fid,
|
||||
"from_activity_id": from_activity_id,
|
||||
}
|
||||
if data.get("fdate"):
|
||||
defaults["creation_date"] = data.get("fdate")
|
||||
|
||||
track = get_best_candidate_or_create(
|
||||
models.Track, query, defaults=defaults, sort_fields=["mbid", "fid"]
|
||||
)[0]
|
||||
|
||||
return track
|
||||
|
||||
|
||||
|
@ -432,6 +556,7 @@ def broadcast_import_status_update_to_owner(old_status, new_status, upload, **kw
|
|||
user = upload.library.actor.get_user()
|
||||
if not user:
|
||||
return
|
||||
|
||||
group = "user.{}.imports".format(user.pk)
|
||||
channels.group_send(
|
||||
group,
|
||||
|
|
|
@ -77,6 +77,29 @@ class Command(BaseCommand):
|
|||
"with their newest version."
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
"--outbox",
|
||||
action="store_true",
|
||||
dest="outbox",
|
||||
default=False,
|
||||
help=(
|
||||
"Use this flag to notify library followers of newly imported files. "
|
||||
"You'll likely want to keep this disabled for CLI imports, especially if"
|
||||
"you plan to import hundreds or thousands of files, as it will cause a lot "
|
||||
"of overhead on your server and on servers you are federating with."
|
||||
),
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--broadcast",
|
||||
action="store_true",
|
||||
dest="broadcast",
|
||||
default=False,
|
||||
help=(
|
||||
"Use this flag to enable realtime updates about the import in the UI. "
|
||||
"This causes some overhead, so it's disabled by default."
|
||||
),
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--reference",
|
||||
|
@ -261,6 +284,8 @@ class Command(BaseCommand):
|
|||
async_,
|
||||
options["replace"],
|
||||
options["in_place"],
|
||||
options["outbox"],
|
||||
options["broadcast"],
|
||||
)
|
||||
except Exception as e:
|
||||
if options["exit_on_failure"]:
|
||||
|
@ -272,11 +297,29 @@ class Command(BaseCommand):
|
|||
errors.append((path, "{} {}".format(e.__class__.__name__, e)))
|
||||
return errors
|
||||
|
||||
def create_upload(self, path, reference, library, async_, replace, in_place):
|
||||
def create_upload(
|
||||
self,
|
||||
path,
|
||||
reference,
|
||||
library,
|
||||
async_,
|
||||
replace,
|
||||
in_place,
|
||||
dispatch_outbox,
|
||||
broadcast,
|
||||
):
|
||||
import_handler = tasks.process_upload.delay if async_ else tasks.process_upload
|
||||
upload = models.Upload(library=library, import_reference=reference)
|
||||
upload.source = "file://" + path
|
||||
upload.import_metadata = {"replace": replace}
|
||||
upload.import_metadata = {
|
||||
"funkwhale": {
|
||||
"config": {
|
||||
"replace": replace,
|
||||
"dispatch_outbox": dispatch_outbox,
|
||||
"broadcast": broadcast,
|
||||
}
|
||||
}
|
||||
}
|
||||
if not in_place:
|
||||
name = os.path.basename(path)
|
||||
with open(path, "rb") as f:
|
||||
|
|
|
@ -10,3 +10,4 @@ django-debug-toolbar>=1.9,<1.10
|
|||
# improved REPL
|
||||
ipdb==0.8.1
|
||||
black
|
||||
profiling
|
||||
|
|
|
@ -11,7 +11,7 @@ import uuid
|
|||
from faker.providers import internet as internet_provider
|
||||
import factory
|
||||
import pytest
|
||||
import requests_mock
|
||||
|
||||
from django.contrib.auth.models import AnonymousUser
|
||||
from django.core.cache import cache as django_cache
|
||||
from django.core.files import uploadedfile
|
||||
|
@ -271,14 +271,13 @@ def media_root(settings):
|
|||
shutil.rmtree(tmp_dir)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def r_mock():
|
||||
@pytest.fixture(autouse=True)
|
||||
def r_mock(requests_mock):
|
||||
"""
|
||||
Returns a requests_mock.mock() object you can use to mock HTTP calls made
|
||||
using python-requests
|
||||
"""
|
||||
with requests_mock.mock() as m:
|
||||
yield m
|
||||
yield requests_mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import io
|
||||
import pytest
|
||||
import uuid
|
||||
|
||||
|
@ -588,42 +589,6 @@ def test_music_library_serializer_from_private(factories, mocker):
|
|||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"model,serializer_class",
|
||||
[
|
||||
("music.Artist", serializers.ArtistSerializer),
|
||||
("music.Album", serializers.AlbumSerializer),
|
||||
("music.Track", serializers.TrackSerializer),
|
||||
],
|
||||
)
|
||||
def test_music_entity_serializer_create_existing_mbid(
|
||||
model, serializer_class, factories
|
||||
):
|
||||
entity = factories[model]()
|
||||
data = {"musicbrainzId": str(entity.mbid), "id": "https://noop"}
|
||||
serializer = serializer_class()
|
||||
|
||||
assert serializer.create(data) == entity
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"model,serializer_class",
|
||||
[
|
||||
("music.Artist", serializers.ArtistSerializer),
|
||||
("music.Album", serializers.AlbumSerializer),
|
||||
("music.Track", serializers.TrackSerializer),
|
||||
],
|
||||
)
|
||||
def test_music_entity_serializer_create_existing_fid(
|
||||
model, serializer_class, factories
|
||||
):
|
||||
entity = factories[model](fid="https://entity.url")
|
||||
data = {"musicbrainzId": None, "id": "https://entity.url"}
|
||||
serializer = serializer_class()
|
||||
|
||||
assert serializer.create(data) == entity
|
||||
|
||||
|
||||
def test_activity_pub_artist_serializer_to_ap(factories):
|
||||
artist = factories["music.Artist"]()
|
||||
expected = {
|
||||
|
@ -639,30 +604,6 @@ def test_activity_pub_artist_serializer_to_ap(factories):
|
|||
assert serializer.data == expected
|
||||
|
||||
|
||||
def test_activity_pub_artist_serializer_from_ap(factories):
|
||||
activity = factories["federation.Activity"]()
|
||||
|
||||
published = timezone.now()
|
||||
data = {
|
||||
"type": "Artist",
|
||||
"id": "http://hello.artist",
|
||||
"name": "John Smith",
|
||||
"musicbrainzId": str(uuid.uuid4()),
|
||||
"published": published.isoformat(),
|
||||
}
|
||||
serializer = serializers.ArtistSerializer(data=data, context={"activity": activity})
|
||||
|
||||
assert serializer.is_valid(raise_exception=True)
|
||||
|
||||
artist = serializer.save()
|
||||
|
||||
assert artist.from_activity == activity
|
||||
assert artist.name == data["name"]
|
||||
assert artist.fid == data["id"]
|
||||
assert str(artist.mbid) == data["musicbrainzId"]
|
||||
assert artist.creation_date == published
|
||||
|
||||
|
||||
def test_activity_pub_album_serializer_to_ap(factories):
|
||||
album = factories["music.Album"]()
|
||||
|
||||
|
@ -671,7 +612,11 @@ def test_activity_pub_album_serializer_to_ap(factories):
|
|||
"type": "Album",
|
||||
"id": album.fid,
|
||||
"name": album.title,
|
||||
"cover": {"type": "Image", "url": utils.full_url(album.cover.url)},
|
||||
"cover": {
|
||||
"type": "Link",
|
||||
"mediaType": "image/jpeg",
|
||||
"href": utils.full_url(album.cover.url),
|
||||
},
|
||||
"musicbrainzId": album.mbid,
|
||||
"published": album.creation_date.isoformat(),
|
||||
"released": album.release_date.isoformat(),
|
||||
|
@ -686,49 +631,6 @@ def test_activity_pub_album_serializer_to_ap(factories):
|
|||
assert serializer.data == expected
|
||||
|
||||
|
||||
def test_activity_pub_album_serializer_from_ap(factories):
|
||||
activity = factories["federation.Activity"]()
|
||||
|
||||
published = timezone.now()
|
||||
released = timezone.now().date()
|
||||
data = {
|
||||
"type": "Album",
|
||||
"id": "http://hello.album",
|
||||
"name": "Purple album",
|
||||
"musicbrainzId": str(uuid.uuid4()),
|
||||
"published": published.isoformat(),
|
||||
"released": released.isoformat(),
|
||||
"artists": [
|
||||
{
|
||||
"type": "Artist",
|
||||
"id": "http://hello.artist",
|
||||
"name": "John Smith",
|
||||
"musicbrainzId": str(uuid.uuid4()),
|
||||
"published": published.isoformat(),
|
||||
}
|
||||
],
|
||||
}
|
||||
serializer = serializers.AlbumSerializer(data=data, context={"activity": activity})
|
||||
|
||||
assert serializer.is_valid(raise_exception=True)
|
||||
|
||||
album = serializer.save()
|
||||
artist = album.artist
|
||||
|
||||
assert album.from_activity == activity
|
||||
assert album.title == data["name"]
|
||||
assert album.fid == data["id"]
|
||||
assert str(album.mbid) == data["musicbrainzId"]
|
||||
assert album.creation_date == published
|
||||
assert album.release_date == released
|
||||
|
||||
assert artist.from_activity == activity
|
||||
assert artist.name == data["artists"][0]["name"]
|
||||
assert artist.fid == data["artists"][0]["id"]
|
||||
assert str(artist.mbid) == data["artists"][0]["musicbrainzId"]
|
||||
assert artist.creation_date == published
|
||||
|
||||
|
||||
def test_activity_pub_track_serializer_to_ap(factories):
|
||||
track = factories["music.Track"]()
|
||||
expected = {
|
||||
|
@ -753,7 +655,7 @@ def test_activity_pub_track_serializer_to_ap(factories):
|
|||
assert serializer.data == expected
|
||||
|
||||
|
||||
def test_activity_pub_track_serializer_from_ap(factories):
|
||||
def test_activity_pub_track_serializer_from_ap(factories, r_mock):
|
||||
activity = factories["federation.Activity"]()
|
||||
published = timezone.now()
|
||||
released = timezone.now().date()
|
||||
|
@ -771,6 +673,11 @@ def test_activity_pub_track_serializer_from_ap(factories):
|
|||
"musicbrainzId": str(uuid.uuid4()),
|
||||
"published": published.isoformat(),
|
||||
"released": released.isoformat(),
|
||||
"cover": {
|
||||
"type": "Link",
|
||||
"href": "https://cover.image/test.png",
|
||||
"mediaType": "image/png",
|
||||
},
|
||||
"artists": [
|
||||
{
|
||||
"type": "Artist",
|
||||
|
@ -791,12 +698,14 @@ def test_activity_pub_track_serializer_from_ap(factories):
|
|||
}
|
||||
],
|
||||
}
|
||||
r_mock.get(data["album"]["cover"]["href"], body=io.BytesIO(b"coucou"))
|
||||
serializer = serializers.TrackSerializer(data=data, context={"activity": activity})
|
||||
assert serializer.is_valid(raise_exception=True)
|
||||
|
||||
track = serializer.save()
|
||||
album = track.album
|
||||
artist = track.artist
|
||||
album_artist = track.album.artist
|
||||
|
||||
assert track.from_activity == activity
|
||||
assert track.fid == data["id"]
|
||||
|
@ -806,7 +715,8 @@ def test_activity_pub_track_serializer_from_ap(factories):
|
|||
assert str(track.mbid) == data["musicbrainzId"]
|
||||
|
||||
assert album.from_activity == activity
|
||||
|
||||
assert album.cover.read() == b"coucou"
|
||||
assert album.cover.path.endswith(".png")
|
||||
assert album.title == data["album"]["name"]
|
||||
assert album.fid == data["album"]["id"]
|
||||
assert str(album.mbid) == data["album"]["musicbrainzId"]
|
||||
|
@ -819,6 +729,12 @@ def test_activity_pub_track_serializer_from_ap(factories):
|
|||
assert str(artist.mbid) == data["artists"][0]["musicbrainzId"]
|
||||
assert artist.creation_date == published
|
||||
|
||||
assert album_artist.from_activity == activity
|
||||
assert album_artist.name == data["album"]["artists"][0]["name"]
|
||||
assert album_artist.fid == data["album"]["artists"][0]["id"]
|
||||
assert str(album_artist.mbid) == data["album"]["artists"][0]["musicbrainzId"]
|
||||
assert album_artist.creation_date == published
|
||||
|
||||
|
||||
def test_activity_pub_upload_serializer_from_ap(factories, mocker):
|
||||
activity = factories["federation.Activity"]()
|
||||
|
|
Binary file not shown.
|
@ -9,24 +9,24 @@ from funkwhale_api.music import metadata
|
|||
DATA_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"field,value",
|
||||
[
|
||||
("title", "Peer Gynt Suite no. 1, op. 46: I. Morning"),
|
||||
("artist", "Edvard Grieg"),
|
||||
("album", "Peer Gynt Suite no. 1, op. 46"),
|
||||
("date", datetime.date(2012, 8, 15)),
|
||||
("track_number", 1),
|
||||
("musicbrainz_albumid", uuid.UUID("a766da8b-8336-47aa-a3ee-371cc41ccc75")),
|
||||
("musicbrainz_recordingid", uuid.UUID("bd21ac48-46d8-4e78-925f-d9cc2a294656")),
|
||||
("musicbrainz_artistid", uuid.UUID("013c8e5b-d72a-4cd3-8dee-6c64d6125823")),
|
||||
],
|
||||
)
|
||||
def test_can_get_metadata_from_opus_file(field, value):
|
||||
path = os.path.join(DATA_DIR, "test.opus")
|
||||
def test_get_all_metadata_at_once():
|
||||
path = os.path.join(DATA_DIR, "test.ogg")
|
||||
data = metadata.Metadata(path)
|
||||
|
||||
assert data.get(field) == value
|
||||
expected = {
|
||||
"title": "Peer Gynt Suite no. 1, op. 46: I. Morning",
|
||||
"artist": "Edvard Grieg",
|
||||
"album_artist": "Edvard Grieg",
|
||||
"album": "Peer Gynt Suite no. 1, op. 46",
|
||||
"date": datetime.date(2012, 8, 15),
|
||||
"track_number": 1,
|
||||
"musicbrainz_albumid": uuid.UUID("a766da8b-8336-47aa-a3ee-371cc41ccc75"),
|
||||
"musicbrainz_recordingid": uuid.UUID("bd21ac48-46d8-4e78-925f-d9cc2a294656"),
|
||||
"musicbrainz_artistid": uuid.UUID("013c8e5b-d72a-4cd3-8dee-6c64d6125823"),
|
||||
"musicbrainz_albumartistid": uuid.UUID("013c8e5b-d72a-4cd3-8dee-6c64d6125823"),
|
||||
}
|
||||
|
||||
assert data.all() == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -34,12 +34,17 @@ def test_can_get_metadata_from_opus_file(field, value):
|
|||
[
|
||||
("title", "Peer Gynt Suite no. 1, op. 46: I. Morning"),
|
||||
("artist", "Edvard Grieg"),
|
||||
("album_artist", "Edvard Grieg"),
|
||||
("album", "Peer Gynt Suite no. 1, op. 46"),
|
||||
("date", datetime.date(2012, 8, 15)),
|
||||
("track_number", 1),
|
||||
("musicbrainz_albumid", uuid.UUID("a766da8b-8336-47aa-a3ee-371cc41ccc75")),
|
||||
("musicbrainz_recordingid", uuid.UUID("bd21ac48-46d8-4e78-925f-d9cc2a294656")),
|
||||
("musicbrainz_artistid", uuid.UUID("013c8e5b-d72a-4cd3-8dee-6c64d6125823")),
|
||||
(
|
||||
"musicbrainz_albumartistid",
|
||||
uuid.UUID("013c8e5b-d72a-4cd3-8dee-6c64d6125823"),
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_can_get_metadata_from_ogg_file(field, value):
|
||||
|
@ -49,17 +54,47 @@ def test_can_get_metadata_from_ogg_file(field, value):
|
|||
assert data.get(field) == value
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"field,value",
|
||||
[
|
||||
("title", "Peer Gynt Suite no. 1, op. 46: I. Morning"),
|
||||
("artist", "Edvard Grieg"),
|
||||
("album_artist", "Edvard Grieg"),
|
||||
("album", "Peer Gynt Suite no. 1, op. 46"),
|
||||
("date", datetime.date(2012, 8, 15)),
|
||||
("track_number", 1),
|
||||
("musicbrainz_albumid", uuid.UUID("a766da8b-8336-47aa-a3ee-371cc41ccc75")),
|
||||
("musicbrainz_recordingid", uuid.UUID("bd21ac48-46d8-4e78-925f-d9cc2a294656")),
|
||||
("musicbrainz_artistid", uuid.UUID("013c8e5b-d72a-4cd3-8dee-6c64d6125823")),
|
||||
(
|
||||
"musicbrainz_albumartistid",
|
||||
uuid.UUID("013c8e5b-d72a-4cd3-8dee-6c64d6125823"),
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_can_get_metadata_from_opus_file(field, value):
|
||||
path = os.path.join(DATA_DIR, "test.opus")
|
||||
data = metadata.Metadata(path)
|
||||
|
||||
assert data.get(field) == value
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"field,value",
|
||||
[
|
||||
("title", "Drei Kreuze (dass wir hier sind)"),
|
||||
("artist", "Die Toten Hosen"),
|
||||
("album_artist", "Die Toten Hosen"),
|
||||
("album", "Ballast der Republik"),
|
||||
("date", datetime.date(2012, 5, 4)),
|
||||
("track_number", 1),
|
||||
("musicbrainz_albumid", uuid.UUID("1f0441ad-e609-446d-b355-809c445773cf")),
|
||||
("musicbrainz_recordingid", uuid.UUID("124d0150-8627-46bc-bc14-789a3bc960c8")),
|
||||
("musicbrainz_artistid", uuid.UUID("c3bc80a6-1f4a-4e17-8cf0-6b1efe8302f1")),
|
||||
(
|
||||
"musicbrainz_albumartistid",
|
||||
uuid.UUID("c3bc80a6-1f4a-4e17-8cf0-6b1efe8302f1"),
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_can_get_metadata_from_ogg_theora_file(field, value):
|
||||
|
@ -73,13 +108,18 @@ def test_can_get_metadata_from_ogg_theora_file(field, value):
|
|||
"field,value",
|
||||
[
|
||||
("title", "Bend"),
|
||||
("artist", "Bindrpilot"),
|
||||
("artist", "Binärpilot"),
|
||||
("album_artist", "Binärpilot"),
|
||||
("album", "You Can't Stop Da Funk"),
|
||||
("date", datetime.date(2006, 2, 7)),
|
||||
("track_number", 2),
|
||||
("musicbrainz_albumid", uuid.UUID("ce40cdb1-a562-4fd8-a269-9269f98d4124")),
|
||||
("musicbrainz_recordingid", uuid.UUID("f269d497-1cc0-4ae4-a0c4-157ec7d73fcb")),
|
||||
("musicbrainz_artistid", uuid.UUID("9c6bddde-6228-4d9f-ad0d-03f6fcb19e13")),
|
||||
(
|
||||
"musicbrainz_albumartistid",
|
||||
uuid.UUID("9c6bddde-6228-4d9f-ad0d-03f6fcb19e13"),
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_can_get_metadata_from_id3_mp3_file(field, value):
|
||||
|
@ -108,12 +148,17 @@ def test_can_get_pictures(name):
|
|||
[
|
||||
("title", "999,999"),
|
||||
("artist", "Nine Inch Nails"),
|
||||
("album_artist", "Nine Inch Nails"),
|
||||
("album", "The Slip"),
|
||||
("date", datetime.date(2008, 5, 5)),
|
||||
("track_number", 1),
|
||||
("musicbrainz_albumid", uuid.UUID("12b57d46-a192-499e-a91f-7da66790a1c1")),
|
||||
("musicbrainz_recordingid", uuid.UUID("30f3f33e-8d0c-4e69-8539-cbd701d18f28")),
|
||||
("musicbrainz_artistid", uuid.UUID("b7ffd2af-418f-4be2-bdd1-22f8b48613da")),
|
||||
(
|
||||
"musicbrainz_albumartistid",
|
||||
uuid.UUID("b7ffd2af-418f-4be2-bdd1-22f8b48613da"),
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_can_get_metadata_from_flac_file(field, value):
|
||||
|
@ -133,7 +178,12 @@ def test_can_get_metadata_from_flac_file_not_crash_if_empty():
|
|||
|
||||
@pytest.mark.parametrize(
|
||||
"field_name",
|
||||
["musicbrainz_artistid", "musicbrainz_albumid", "musicbrainz_recordingid"],
|
||||
[
|
||||
"musicbrainz_artistid",
|
||||
"musicbrainz_albumid",
|
||||
"musicbrainz_recordingid",
|
||||
"musicbrainz_albumartistid",
|
||||
],
|
||||
)
|
||||
def test_mbid_clean_keeps_only_first(field_name):
|
||||
u1 = str(uuid.uuid4())
|
||||
|
|
|
@ -1,12 +1,14 @@
|
|||
import datetime
|
||||
import io
|
||||
import os
|
||||
import pytest
|
||||
import uuid
|
||||
|
||||
from django.core.paginator import Paginator
|
||||
from django.utils import timezone
|
||||
|
||||
from funkwhale_api.federation import serializers as federation_serializers
|
||||
from funkwhale_api.music import signals, tasks
|
||||
from funkwhale_api.music import metadata, signals, tasks
|
||||
|
||||
DATA_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
@ -16,84 +18,163 @@ DATA_DIR = os.path.dirname(os.path.abspath(__file__))
|
|||
|
||||
def test_can_create_track_from_file_metadata_no_mbid(db, mocker):
|
||||
metadata = {
|
||||
"artist": ["Test artist"],
|
||||
"album": ["Test album"],
|
||||
"title": ["Test track"],
|
||||
"TRACKNUMBER": ["4"],
|
||||
"date": ["2012-08-15"],
|
||||
"title": "Test track",
|
||||
"artist": "Test artist",
|
||||
"album": "Test album",
|
||||
"date": datetime.date(2012, 8, 15),
|
||||
"track_number": 4,
|
||||
}
|
||||
mocker.patch("mutagen.File", return_value=metadata)
|
||||
mocker.patch(
|
||||
"funkwhale_api.music.metadata.Metadata.get_file_type", return_value="OggVorbis"
|
||||
)
|
||||
track = tasks.import_track_data_from_file(os.path.join(DATA_DIR, "dummy_file.ogg"))
|
||||
mocker.patch("funkwhale_api.music.metadata.Metadata.all", return_value=metadata)
|
||||
|
||||
assert track.title == metadata["title"][0]
|
||||
track = tasks.get_track_from_import_metadata(metadata)
|
||||
|
||||
assert track.title == metadata["title"]
|
||||
assert track.mbid is None
|
||||
assert track.position == 4
|
||||
assert track.album.title == metadata["album"][0]
|
||||
assert track.album.title == metadata["album"]
|
||||
assert track.album.mbid is None
|
||||
assert track.album.release_date == datetime.date(2012, 8, 15)
|
||||
assert track.artist.name == metadata["artist"][0]
|
||||
assert track.artist.name == metadata["artist"]
|
||||
assert track.artist.mbid is None
|
||||
|
||||
|
||||
def test_can_create_track_from_file_metadata_mbid(factories, mocker):
|
||||
album = factories["music.Album"]()
|
||||
artist = factories["music.Artist"]()
|
||||
mocker.patch(
|
||||
"funkwhale_api.music.models.Album.get_or_create_from_api",
|
||||
return_value=(album, True),
|
||||
)
|
||||
|
||||
album_data = {
|
||||
"release": {
|
||||
"id": album.mbid,
|
||||
"medium-list": [
|
||||
{
|
||||
"track-list": [
|
||||
{
|
||||
"id": "03baca8b-855a-3c05-8f3d-d3235287d84d",
|
||||
"position": "4",
|
||||
"number": "4",
|
||||
"recording": {
|
||||
"id": "2109e376-132b-40ad-b993-2bb6812e19d4",
|
||||
"title": "Teen Age Riot",
|
||||
"artist-credit": [
|
||||
{"artist": {"id": artist.mbid, "name": artist.name}}
|
||||
],
|
||||
},
|
||||
}
|
||||
],
|
||||
"track-count": 1,
|
||||
}
|
||||
],
|
||||
}
|
||||
}
|
||||
mocker.patch("funkwhale_api.musicbrainz.api.releases.get", return_value=album_data)
|
||||
track_data = album_data["release"]["medium-list"][0]["track-list"][0]
|
||||
metadata = {
|
||||
"musicbrainz_albumid": [album.mbid],
|
||||
"musicbrainz_trackid": [track_data["recording"]["id"]],
|
||||
"title": "Test track",
|
||||
"artist": "Test artist",
|
||||
"album_artist": "Test album artist",
|
||||
"album": "Test album",
|
||||
"date": datetime.date(2012, 8, 15),
|
||||
"track_number": 4,
|
||||
"musicbrainz_albumid": "ce40cdb1-a562-4fd8-a269-9269f98d4124",
|
||||
"musicbrainz_recordingid": "f269d497-1cc0-4ae4-a0c4-157ec7d73fcb",
|
||||
"musicbrainz_artistid": "9c6bddde-6228-4d9f-ad0d-03f6fcb19e13",
|
||||
"musicbrainz_albumartistid": "9c6bddde-6478-4d9f-ad0d-03f6fcb19e13",
|
||||
}
|
||||
mocker.patch("mutagen.File", return_value=metadata)
|
||||
mocker.patch(
|
||||
"funkwhale_api.music.metadata.Metadata.get_file_type", return_value="OggVorbis"
|
||||
)
|
||||
track = tasks.import_track_data_from_file(os.path.join(DATA_DIR, "dummy_file.ogg"))
|
||||
|
||||
assert track.title == track_data["recording"]["title"]
|
||||
assert track.mbid == track_data["recording"]["id"]
|
||||
mocker.patch("funkwhale_api.music.metadata.Metadata.all", return_value=metadata)
|
||||
|
||||
track = tasks.get_track_from_import_metadata(metadata)
|
||||
|
||||
assert track.title == metadata["title"]
|
||||
assert track.mbid == metadata["musicbrainz_recordingid"]
|
||||
assert track.position == 4
|
||||
assert track.album.title == metadata["album"]
|
||||
assert track.album.mbid == metadata["musicbrainz_albumid"]
|
||||
assert track.album.artist.mbid == metadata["musicbrainz_albumartistid"]
|
||||
assert track.album.artist.name == metadata["album_artist"]
|
||||
assert track.album.release_date == datetime.date(2012, 8, 15)
|
||||
assert track.artist.name == metadata["artist"]
|
||||
assert track.artist.mbid == metadata["musicbrainz_artistid"]
|
||||
|
||||
|
||||
def test_can_create_track_from_file_metadata_mbid_existing_album_artist(
|
||||
factories, mocker
|
||||
):
|
||||
artist = factories["music.Artist"]()
|
||||
album = factories["music.Album"]()
|
||||
metadata = {
|
||||
"artist": "",
|
||||
"album": "",
|
||||
"title": "Hello",
|
||||
"track_number": 4,
|
||||
"musicbrainz_albumid": album.mbid,
|
||||
"musicbrainz_recordingid": "f269d497-1cc0-4ae4-a0c4-157ec7d73fcb",
|
||||
"musicbrainz_artistid": artist.mbid,
|
||||
"musicbrainz_albumartistid": album.artist.mbid,
|
||||
}
|
||||
|
||||
mocker.patch("funkwhale_api.music.metadata.Metadata.all", return_value=metadata)
|
||||
|
||||
track = tasks.get_track_from_import_metadata(metadata)
|
||||
|
||||
assert track.title == metadata["title"]
|
||||
assert track.mbid == metadata["musicbrainz_recordingid"]
|
||||
assert track.position == 4
|
||||
assert track.album == album
|
||||
assert track.artist == artist
|
||||
|
||||
|
||||
def test_upload_import_mbid(now, factories, temp_signal, mocker):
|
||||
def test_can_create_track_from_file_metadata_fid_existing_album_artist(
|
||||
factories, mocker
|
||||
):
|
||||
artist = factories["music.Artist"]()
|
||||
album = factories["music.Album"]()
|
||||
metadata = {
|
||||
"artist": "",
|
||||
"album": "",
|
||||
"title": "Hello",
|
||||
"track_number": 4,
|
||||
"fid": "https://hello",
|
||||
"album_fid": album.fid,
|
||||
"artist_fid": artist.fid,
|
||||
"album_artist_fid": album.artist.fid,
|
||||
}
|
||||
|
||||
mocker.patch("funkwhale_api.music.metadata.Metadata.all", return_value=metadata)
|
||||
|
||||
track = tasks.get_track_from_import_metadata(metadata)
|
||||
|
||||
assert track.title == metadata["title"]
|
||||
assert track.fid == metadata["fid"]
|
||||
assert track.position == 4
|
||||
assert track.album == album
|
||||
assert track.artist == artist
|
||||
|
||||
|
||||
def test_can_create_track_from_file_metadata_federation(factories, mocker, r_mock):
|
||||
metadata = {
|
||||
"artist": "Artist",
|
||||
"album": "Album",
|
||||
"album_artist": "Album artist",
|
||||
"title": "Hello",
|
||||
"track_number": 4,
|
||||
"fid": "https://hello",
|
||||
"album_fid": "https://album.fid",
|
||||
"artist_fid": "https://artist.fid",
|
||||
"album_artist_fid": "https://album.artist.fid",
|
||||
"fdate": timezone.now(),
|
||||
"album_fdate": timezone.now(),
|
||||
"album_artist_fdate": timezone.now(),
|
||||
"artist_fdate": timezone.now(),
|
||||
"cover_data": {"url": "https://cover/hello.png", "mimetype": "image/png"},
|
||||
}
|
||||
r_mock.get(metadata["cover_data"]["url"], body=io.BytesIO(b"coucou"))
|
||||
mocker.patch("funkwhale_api.music.metadata.Metadata.all", return_value=metadata)
|
||||
|
||||
track = tasks.get_track_from_import_metadata(metadata)
|
||||
|
||||
assert track.title == metadata["title"]
|
||||
assert track.fid == metadata["fid"]
|
||||
assert track.creation_date == metadata["fdate"]
|
||||
assert track.position == 4
|
||||
assert track.album.cover.read() == b"coucou"
|
||||
assert track.album.cover.path.endswith(".png")
|
||||
assert track.album.fid == metadata["album_fid"]
|
||||
assert track.album.title == metadata["album"]
|
||||
assert track.album.creation_date == metadata["album_fdate"]
|
||||
assert track.album.artist.fid == metadata["album_artist_fid"]
|
||||
assert track.album.artist.name == metadata["album_artist"]
|
||||
assert track.album.artist.creation_date == metadata["album_artist_fdate"]
|
||||
assert track.artist.fid == metadata["artist_fid"]
|
||||
assert track.artist.name == metadata["artist"]
|
||||
assert track.artist.creation_date == metadata["artist_fdate"]
|
||||
|
||||
|
||||
def test_sort_candidates(factories):
|
||||
artist1 = factories["music.Artist"].build(fid=None, mbid=None)
|
||||
artist2 = factories["music.Artist"].build(fid=None)
|
||||
artist3 = factories["music.Artist"].build(mbid=None)
|
||||
result = tasks.sort_candidates([artist1, artist2, artist3], ["mbid", "fid"])
|
||||
|
||||
assert result == [artist2, artist3, artist1]
|
||||
|
||||
|
||||
def test_upload_import(now, factories, temp_signal, mocker):
|
||||
outbox = mocker.patch("funkwhale_api.federation.routes.outbox.dispatch")
|
||||
track = factories["music.Track"]()
|
||||
upload = factories["music.Upload"](
|
||||
track=None, import_metadata={"track": {"mbid": track.mbid}}
|
||||
track=None, import_metadata={"funkwhale": {"track": {"uuid": str(track.uuid)}}}
|
||||
)
|
||||
|
||||
with temp_signal(signals.upload_import_status_updated) as handler:
|
||||
|
@ -123,7 +204,29 @@ def test_upload_import_get_audio_data(factories, mocker):
|
|||
)
|
||||
track = factories["music.Track"]()
|
||||
upload = factories["music.Upload"](
|
||||
track=None, import_metadata={"track": {"mbid": track.mbid}}
|
||||
track=None, import_metadata={"funkwhale": {"track": {"uuid": track.uuid}}}
|
||||
)
|
||||
|
||||
tasks.process_upload(upload_id=upload.pk)
|
||||
|
||||
upload.refresh_from_db()
|
||||
assert upload.size == 23
|
||||
assert upload.duration == 42
|
||||
assert upload.bitrate == 66
|
||||
|
||||
|
||||
def test_upload_import_in_place(factories, mocker):
|
||||
mocker.patch(
|
||||
"funkwhale_api.music.models.Upload.get_audio_data",
|
||||
return_value={"size": 23, "duration": 42, "bitrate": 66},
|
||||
)
|
||||
track = factories["music.Track"]()
|
||||
path = os.path.join(DATA_DIR, "test.ogg")
|
||||
upload = factories["music.Upload"](
|
||||
track=None,
|
||||
audio_file=None,
|
||||
source="file://{}".format(path),
|
||||
import_metadata={"funkwhale": {"track": {"uuid": track.uuid}}},
|
||||
)
|
||||
|
||||
tasks.process_upload(upload_id=upload.pk)
|
||||
|
@ -141,13 +244,13 @@ def test_upload_import_skip_existing_track_in_own_library(factories, temp_signal
|
|||
track=track,
|
||||
import_status="finished",
|
||||
library=library,
|
||||
import_metadata={"track": {"mbid": track.mbid}},
|
||||
import_metadata={"funkwhale": {"track": {"uuid": track.mbid}}},
|
||||
)
|
||||
duplicate = factories["music.Upload"](
|
||||
track=track,
|
||||
import_status="pending",
|
||||
library=library,
|
||||
import_metadata={"track": {"mbid": track.mbid}},
|
||||
import_metadata={"funkwhale": {"track": {"uuid": track.uuid}}},
|
||||
)
|
||||
with temp_signal(signals.upload_import_status_updated) as handler:
|
||||
tasks.process_upload(upload_id=duplicate.pk)
|
||||
|
@ -172,7 +275,7 @@ def test_upload_import_skip_existing_track_in_own_library(factories, temp_signal
|
|||
def test_upload_import_track_uuid(now, factories):
|
||||
track = factories["music.Track"]()
|
||||
upload = factories["music.Upload"](
|
||||
track=None, import_metadata={"track": {"uuid": track.uuid}}
|
||||
track=None, import_metadata={"funkwhale": {"track": {"uuid": track.uuid}}}
|
||||
)
|
||||
|
||||
tasks.process_upload(upload_id=upload.pk)
|
||||
|
@ -184,9 +287,43 @@ def test_upload_import_track_uuid(now, factories):
|
|||
assert upload.import_date == now
|
||||
|
||||
|
||||
def test_upload_import_skip_federation(now, factories, mocker):
|
||||
outbox = mocker.patch("funkwhale_api.federation.routes.outbox.dispatch")
|
||||
track = factories["music.Track"]()
|
||||
upload = factories["music.Upload"](
|
||||
track=None,
|
||||
import_metadata={
|
||||
"funkwhale": {
|
||||
"track": {"uuid": track.uuid},
|
||||
"config": {"dispatch_outbox": False},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
tasks.process_upload(upload_id=upload.pk)
|
||||
|
||||
outbox.assert_not_called()
|
||||
|
||||
|
||||
def test_upload_import_skip_broadcast(now, factories, mocker):
|
||||
group_send = mocker.patch("funkwhale_api.common.channels.group_send")
|
||||
track = factories["music.Track"]()
|
||||
upload = factories["music.Upload"](
|
||||
library__actor__local=True,
|
||||
track=None,
|
||||
import_metadata={
|
||||
"funkwhale": {"track": {"uuid": track.uuid}, "config": {"broadcast": False}}
|
||||
},
|
||||
)
|
||||
|
||||
tasks.process_upload(upload_id=upload.pk)
|
||||
|
||||
group_send.assert_not_called()
|
||||
|
||||
|
||||
def test_upload_import_error(factories, now, temp_signal):
|
||||
upload = factories["music.Upload"](
|
||||
import_metadata={"track": {"uuid": uuid.uuid4()}}
|
||||
import_metadata={"funkwhale": {"track": {"uuid": uuid.uuid4()}}}
|
||||
)
|
||||
with temp_signal(signals.upload_import_status_updated) as handler:
|
||||
tasks.process_upload(upload_id=upload.pk)
|
||||
|
@ -209,32 +346,26 @@ def test_upload_import_updates_cover_if_no_cover(factories, mocker, now):
|
|||
album = factories["music.Album"](cover="")
|
||||
track = factories["music.Track"](album=album)
|
||||
upload = factories["music.Upload"](
|
||||
track=None, import_metadata={"track": {"uuid": track.uuid}}
|
||||
track=None, import_metadata={"funkwhale": {"track": {"uuid": track.uuid}}}
|
||||
)
|
||||
tasks.process_upload(upload_id=upload.pk)
|
||||
mocked_update.assert_called_once_with(album, upload)
|
||||
mocked_update.assert_called_once_with(album, source=None, cover_data=None)
|
||||
|
||||
|
||||
def test_update_album_cover_mbid(factories, mocker):
|
||||
album = factories["music.Album"](cover="")
|
||||
|
||||
mocked_get = mocker.patch("funkwhale_api.music.models.Album.get_image")
|
||||
tasks.update_album_cover(album=album, upload=None)
|
||||
tasks.update_album_cover(album=album)
|
||||
|
||||
mocked_get.assert_called_once_with()
|
||||
|
||||
|
||||
def test_update_album_cover_file_data(factories, mocker):
|
||||
album = factories["music.Album"](cover="", mbid=None)
|
||||
upload = factories["music.Upload"](track__album=album)
|
||||
|
||||
mocked_get = mocker.patch("funkwhale_api.music.models.Album.get_image")
|
||||
mocker.patch(
|
||||
"funkwhale_api.music.metadata.Metadata.get_picture",
|
||||
return_value={"hello": "world"},
|
||||
)
|
||||
tasks.update_album_cover(album=album, upload=upload)
|
||||
upload.get_metadata()
|
||||
tasks.update_album_cover(album=album, cover_data={"hello": "world"})
|
||||
mocked_get.assert_called_once_with(data={"hello": "world"})
|
||||
|
||||
|
||||
|
@ -245,19 +376,87 @@ def test_update_album_cover_file_cover_separate_file(ext, mimetype, factories, m
|
|||
with open(image_path, "rb") as f:
|
||||
image_content = f.read()
|
||||
album = factories["music.Album"](cover="", mbid=None)
|
||||
upload = factories["music.Upload"](
|
||||
track__album=album, source="file://" + image_path
|
||||
)
|
||||
|
||||
mocked_get = mocker.patch("funkwhale_api.music.models.Album.get_image")
|
||||
mocker.patch("funkwhale_api.music.metadata.Metadata.get_picture", return_value=None)
|
||||
tasks.update_album_cover(album=album, upload=upload)
|
||||
upload.get_metadata()
|
||||
tasks.update_album_cover(album=album, source="file://" + image_path)
|
||||
mocked_get.assert_called_once_with(
|
||||
data={"mimetype": mimetype, "content": image_content}
|
||||
)
|
||||
|
||||
|
||||
def test_federation_audio_track_to_metadata(now):
|
||||
published = now
|
||||
released = now.date()
|
||||
payload = {
|
||||
"type": "Track",
|
||||
"id": "http://hello.track",
|
||||
"musicbrainzId": str(uuid.uuid4()),
|
||||
"name": "Black in back",
|
||||
"position": 5,
|
||||
"published": published.isoformat(),
|
||||
"album": {
|
||||
"published": published.isoformat(),
|
||||
"type": "Album",
|
||||
"id": "http://hello.album",
|
||||
"name": "Purple album",
|
||||
"musicbrainzId": str(uuid.uuid4()),
|
||||
"released": released.isoformat(),
|
||||
"artists": [
|
||||
{
|
||||
"type": "Artist",
|
||||
"published": published.isoformat(),
|
||||
"id": "http://hello.artist",
|
||||
"name": "John Smith",
|
||||
"musicbrainzId": str(uuid.uuid4()),
|
||||
}
|
||||
],
|
||||
},
|
||||
"artists": [
|
||||
{
|
||||
"published": published.isoformat(),
|
||||
"type": "Artist",
|
||||
"id": "http://hello.trackartist",
|
||||
"name": "Bob Smith",
|
||||
"musicbrainzId": str(uuid.uuid4()),
|
||||
}
|
||||
],
|
||||
}
|
||||
serializer = federation_serializers.TrackSerializer(data=payload)
|
||||
serializer.is_valid(raise_exception=True)
|
||||
expected = {
|
||||
"artist": payload["artists"][0]["name"],
|
||||
"album": payload["album"]["name"],
|
||||
"album_artist": payload["album"]["artists"][0]["name"],
|
||||
"title": payload["name"],
|
||||
"date": released,
|
||||
"track_number": payload["position"],
|
||||
# musicbrainz
|
||||
"musicbrainz_albumid": payload["album"]["musicbrainzId"],
|
||||
"musicbrainz_recordingid": payload["musicbrainzId"],
|
||||
"musicbrainz_artistid": payload["artists"][0]["musicbrainzId"],
|
||||
"musicbrainz_albumartistid": payload["album"]["artists"][0]["musicbrainzId"],
|
||||
# federation
|
||||
"fid": payload["id"],
|
||||
"album_fid": payload["album"]["id"],
|
||||
"artist_fid": payload["artists"][0]["id"],
|
||||
"album_artist_fid": payload["album"]["artists"][0]["id"],
|
||||
"fdate": serializer.validated_data["published"],
|
||||
"artist_fdate": serializer.validated_data["artists"][0]["published"],
|
||||
"album_artist_fdate": serializer.validated_data["album"]["artists"][0][
|
||||
"published"
|
||||
],
|
||||
"album_fdate": serializer.validated_data["album"]["published"],
|
||||
}
|
||||
|
||||
result = tasks.federation_audio_track_to_metadata(serializer.validated_data)
|
||||
assert result == expected
|
||||
|
||||
# ensure we never forget to test a mandatory field
|
||||
for k in metadata.ALL_FIELDS:
|
||||
assert k in result
|
||||
|
||||
|
||||
def test_scan_library_fetches_page_and_calls_scan_page(now, mocker, factories, r_mock):
|
||||
scan = factories["music.LibraryScan"]()
|
||||
collection_conf = {
|
||||
|
|
|
@ -54,6 +54,39 @@ def test_import_files_stores_proper_data(factories, mocker, now, path):
|
|||
assert upload.import_reference == "cli-{}".format(now.isoformat())
|
||||
assert upload.import_status == "pending"
|
||||
assert upload.source == "file://{}".format(path)
|
||||
assert upload.import_metadata == {
|
||||
"funkwhale": {
|
||||
"config": {"replace": False, "dispatch_outbox": False, "broadcast": False}
|
||||
}
|
||||
}
|
||||
|
||||
mocked_process.assert_called_once_with(upload_id=upload.pk)
|
||||
|
||||
|
||||
def test_import_with_outbox_flag(factories, mocker):
|
||||
library = factories["music.Library"](actor__local=True)
|
||||
path = os.path.join(DATA_DIR, "dummy_file.ogg")
|
||||
mocked_process = mocker.patch("funkwhale_api.music.tasks.process_upload")
|
||||
call_command(
|
||||
"import_files", str(library.uuid), path, outbox=True, interactive=False
|
||||
)
|
||||
upload = library.uploads.last()
|
||||
|
||||
assert upload.import_metadata["funkwhale"]["config"]["dispatch_outbox"] is True
|
||||
|
||||
mocked_process.assert_called_once_with(upload_id=upload.pk)
|
||||
|
||||
|
||||
def test_import_with_broadcast_flag(factories, mocker):
|
||||
library = factories["music.Library"](actor__local=True)
|
||||
path = os.path.join(DATA_DIR, "dummy_file.ogg")
|
||||
mocked_process = mocker.patch("funkwhale_api.music.tasks.process_upload")
|
||||
call_command(
|
||||
"import_files", str(library.uuid), path, broadcast=True, interactive=False
|
||||
)
|
||||
upload = library.uploads.last()
|
||||
|
||||
assert upload.import_metadata["funkwhale"]["config"]["broadcast"] is True
|
||||
|
||||
mocked_process.assert_called_once_with(upload_id=upload.pk)
|
||||
|
||||
|
@ -67,7 +100,7 @@ def test_import_with_replace_flag(factories, mocker):
|
|||
)
|
||||
upload = library.uploads.last()
|
||||
|
||||
assert upload.import_metadata["replace"] is True
|
||||
assert upload.import_metadata["funkwhale"]["config"]["replace"] is True
|
||||
|
||||
mocked_process.assert_called_once_with(upload_id=upload.pk)
|
||||
|
||||
|
|
1
dev.yml
1
dev.yml
|
@ -30,6 +30,7 @@ services:
|
|||
- .env.dev
|
||||
- .env
|
||||
image: postgres
|
||||
command: postgres -c log_min_duration_statement=0
|
||||
volumes:
|
||||
- "./data/${COMPOSE_PROJECT_NAME-node1}/postgres:/var/lib/postgresql/data"
|
||||
networks:
|
||||
|
|
|
@ -282,6 +282,7 @@ export default {
|
|||
'search.tokens': {
|
||||
handler (newValue) {
|
||||
this.search.query = compileTokens(newValue)
|
||||
this.page = 1
|
||||
this.fetchData()
|
||||
},
|
||||
deep: true
|
||||
|
@ -290,6 +291,9 @@ export default {
|
|||
this.page = 1
|
||||
this.fetchData()
|
||||
},
|
||||
page: function () {
|
||||
this.fetchData()
|
||||
},
|
||||
ordering: function () {
|
||||
this.page = 1
|
||||
this.fetchData()
|
||||
|
|
Loading…
Reference in New Issue