import collections
import datetime
import logging
import os

from django.utils import timezone
from django.db import transaction
from django.db.models import F, Q
from django.dispatch import receiver

from musicbrainzngs import ResponseError
from requests.exceptions import RequestException

from funkwhale_api.common import channels, preferences
from funkwhale_api.federation import routes
from funkwhale_api.federation import library as lb
from funkwhale_api.taskapp import celery

from . import licenses
from . import lyrics as lyrics_utils
from . import models
from . import metadata
from . import signals
from . import serializers

logger = logging.getLogger(__name__)


def update_album_cover(album, source=None, cover_data=None, replace=False):
    if album.cover and not replace:
        return
    if cover_data:
        return album.get_image(data=cover_data)

    if source and source.startswith("file://"):
        # let's look for a cover in the same directory
        path = os.path.dirname(source.replace("file://", "", 1))
        logger.info("[Album %s] scanning covers from %s", album.pk, path)
        cover = get_cover_from_fs(path)
        if cover:
            return album.get_image(data=cover)
    if album.mbid:
        try:
            logger.info(
                "[Album %s] Fetching cover from musicbrainz release %s",
                album.pk,
                str(album.mbid),
            )
            return album.get_image()
        except ResponseError as exc:
            logger.warning(
                "[Album %s] cannot fetch cover from musicbrainz: %s", album.pk, str(exc)
            )


IMAGE_TYPES = [("jpg", "image/jpeg"), ("png", "image/png")]


def get_cover_from_fs(dir_path):
    if os.path.exists(dir_path):
        for e, m in IMAGE_TYPES:
            cover_path = os.path.join(dir_path, "cover.{}".format(e))
            if not os.path.exists(cover_path):
                logger.debug("Cover %s does not exist", cover_path)
                continue
            with open(cover_path, "rb") as c:
                logger.info("Found cover at %s", cover_path)
                return {"mimetype": m, "content": c.read()}


@celery.app.task(name="Lyrics.fetch_content")
@celery.require_instance(models.Lyrics, "lyrics")
def fetch_content(lyrics):
    html = lyrics_utils._get_html(lyrics.url)
    content = lyrics_utils.extract_content(html)
    cleaned_content = lyrics_utils.clean_content(content)
    lyrics.content = cleaned_content
    lyrics.save(update_fields=["content"])


@celery.app.task(name="music.start_library_scan")
@celery.require_instance(
    models.LibraryScan.objects.select_related().filter(status="pending"), "library_scan"
)
def start_library_scan(library_scan):
    try:
        data = lb.get_library_data(library_scan.library.fid, actor=library_scan.actor)
    except Exception:
        library_scan.status = "errored"
        library_scan.save(update_fields=["status", "modification_date"])
        raise
    library_scan.modification_date = timezone.now()
    library_scan.status = "scanning"
    library_scan.total_files = data["totalItems"]
    library_scan.save(update_fields=["status", "modification_date", "total_files"])
    scan_library_page.delay(library_scan_id=library_scan.pk, page_url=data["first"])
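
# Illustrative usage (an assumption mirroring the .delay() call above): the
# task is enqueued with the instance's primary key, which
# @celery.require_instance resolves back to a pending LibraryScan:
#
#   start_library_scan.delay(library_scan_id=scan.pk)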


@celery.app.task(
    name="music.scan_library_page",
    retry_backoff=60,
    max_retries=5,
    autoretry_for=[RequestException],
)
@celery.require_instance(
    models.LibraryScan.objects.select_related().filter(status="scanning"),
    "library_scan",
)
def scan_library_page(library_scan, page_url):
    data = lb.get_library_page(library_scan.library, page_url, library_scan.actor)
    uploads = []

    for item_serializer in data["items"]:
        upload = item_serializer.save(library=library_scan.library)
        uploads.append(upload)

    library_scan.processed_files = F("processed_files") + len(uploads)
    library_scan.modification_date = timezone.now()
    update_fields = ["modification_date", "processed_files"]

    next_page = data.get("next")
    fetch_next = next_page and next_page != page_url

    if not fetch_next:
        update_fields.append("status")
        library_scan.status = "finished"

    library_scan.save(update_fields=update_fields)

    if fetch_next:
        scan_library_page.delay(library_scan_id=library_scan.pk, page_url=next_page)


def getter(data, *keys, default=None):
    if not data:
        return default
    v = data
    for k in keys:
        try:
            v = v[k]
        except KeyError:
            return default

    return v
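
# Illustrative examples (hypothetical data): getter() walks nested dicts and
# falls back to the default on any missing key or empty input:
#
#   getter({"funkwhale": {"config": {"broadcast": False}}},
#          "funkwhale", "config", "broadcast", default=True)     # -> False
#   getter({}, "funkwhale", "config", "broadcast", default=True)  # -> True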


class UploadImportError(ValueError):
    def __init__(self, code):
        self.code = code
        super().__init__(code)


def fail_import(upload, error_code):
    old_status = upload.import_status
    upload.import_status = "errored"
    upload.import_details = {"error_code": error_code}
    upload.import_date = timezone.now()
    upload.save(update_fields=["import_details", "import_status", "import_date"])

    broadcast = getter(
        upload.import_metadata, "funkwhale", "config", "broadcast", default=True
    )
    if broadcast:
        signals.upload_import_status_updated.send(
            old_status=old_status,
            new_status=upload.import_status,
            upload=upload,
            sender=None,
        )
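
# Illustrative flow: import steps raise UploadImportError with a
# machine-readable code (e.g. UploadImportError(code="track_uuid_not_found")
# in get_track_from_import_metadata below), and process_upload converts it
# into fail_import(upload, e.code) so the error lands in import_details.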


@celery.app.task(name="music.process_upload")
@celery.require_instance(
    models.Upload.objects.filter(import_status="pending").select_related(
        "library__actor__user"
    ),
    "upload",
)
def process_upload(upload):
    import_metadata = upload.import_metadata or {}
    old_status = upload.import_status
    audio_file = upload.get_audio_file()
    try:
        additional_data = {}
        if not audio_file:
            # we can only rely on user provided data
            final_metadata = import_metadata
        else:
            # we use user provided data and data from the file itself
            m = metadata.Metadata(audio_file)
            file_metadata = m.all()
            final_metadata = collections.ChainMap(
                additional_data, import_metadata, file_metadata
            )
            additional_data["cover_data"] = m.get_picture("cover_front")
        additional_data["upload_source"] = upload.source
        track = get_track_from_import_metadata(final_metadata)
    except UploadImportError as e:
        return fail_import(upload, e.code)
    except Exception:
        fail_import(upload, "unknown_error")
        raise

    # under some situations, we want to skip the import
    # (for instance if the user already owns the files)
    owned_duplicates = get_owned_duplicates(upload, track)
    upload.track = track

    if owned_duplicates:
        upload.import_status = "skipped"
        upload.import_details = {
            "code": "already_imported_in_owned_libraries",
            "duplicates": list(owned_duplicates),
        }
        upload.import_date = timezone.now()
        upload.save(
            update_fields=["import_details", "import_status", "import_date", "track"]
        )
        signals.upload_import_status_updated.send(
            old_status=old_status,
            new_status=upload.import_status,
            upload=upload,
            sender=None,
        )
        return

    # all is good, let's finalize the import
    audio_data = upload.get_audio_data()
    if audio_data:
        upload.duration = audio_data["duration"]
        upload.size = audio_data["size"]
        upload.bitrate = audio_data["bitrate"]
    upload.import_status = "finished"
    upload.import_date = timezone.now()
    upload.save(
        update_fields=[
            "track",
            "import_status",
            "import_date",
            "size",
            "duration",
            "bitrate",
        ]
    )
    broadcast = getter(
        import_metadata, "funkwhale", "config", "broadcast", default=True
    )
    if broadcast:
        signals.upload_import_status_updated.send(
            old_status=old_status,
            new_status=upload.import_status,
            upload=upload,
            sender=None,
        )
    dispatch_outbox = getter(
        import_metadata, "funkwhale", "config", "dispatch_outbox", default=True
    )
    if dispatch_outbox:
        routes.outbox.dispatch(
            {"type": "Create", "object": {"type": "Audio"}}, context={"upload": upload}
        )


def federation_audio_track_to_metadata(payload):
    """
    Given a valid payload as returned by
    federation.serializers.TrackSerializer.validated_data, returns a correct
    metadata payload for use with get_track_from_import_metadata.
    """
    musicbrainz_recordingid = payload.get("musicbrainzId")
    musicbrainz_artistid = payload["artists"][0].get("musicbrainzId")
    musicbrainz_albumartistid = payload["album"]["artists"][0].get("musicbrainzId")
    musicbrainz_albumid = payload["album"].get("musicbrainzId")

    new_data = {
        "title": payload["name"],
        "album": payload["album"]["name"],
        "track_number": payload["position"],
        "disc_number": payload.get("disc"),
        "artist": payload["artists"][0]["name"],
        "album_artist": payload["album"]["artists"][0]["name"],
        "date": payload["album"].get("released"),
        "license": payload.get("license"),
        "copyright": payload.get("copyright"),
        # musicbrainz
        "musicbrainz_recordingid": str(musicbrainz_recordingid)
        if musicbrainz_recordingid
        else None,
        "musicbrainz_artistid": str(musicbrainz_artistid)
        if musicbrainz_artistid
        else None,
        "musicbrainz_albumartistid": str(musicbrainz_albumartistid)
        if musicbrainz_albumartistid
        else None,
        "musicbrainz_albumid": str(musicbrainz_albumid)
        if musicbrainz_albumid
        else None,
        # federation
        "fid": payload["id"],
        "artist_fid": payload["artists"][0]["id"],
        "album_artist_fid": payload["album"]["artists"][0]["id"],
        "album_fid": payload["album"]["id"],
        "fdate": payload["published"],
        "album_fdate": payload["album"]["published"],
        "album_artist_fdate": payload["album"]["artists"][0]["published"],
        "artist_fdate": payload["artists"][0]["published"],
    }
    cover = payload["album"].get("cover")
    if cover:
        new_data["cover_data"] = {"mimetype": cover["mediaType"], "url": cover["href"]}
    return new_data
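
# Illustrative payload fragment (hypothetical values, reconstructed from the
# accesses above); keys read with [] are required, keys read via .get() are
# optional:
#
#   {
#       "id": "https://example.com/federation/music/tracks/<uuid>",
#       "name": "Track title",
#       "position": 1,
#       "published": ...,  # a datetime from the validated serializer data
#       "artists": [{"id": ..., "name": "Artist", "published": ...}],
#       "album": {
#           "id": ...,
#           "name": "Album",
#           "published": ...,
#           "artists": [{"id": ..., "name": "Artist", "published": ...}],
#       },
#   }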


def get_owned_duplicates(upload, track):
    """
    Ensure we skip duplicate tracks to avoid wasting user/instance storage.
    """
    owned_libraries = upload.library.actor.libraries.all()
    return (
        models.Upload.objects.filter(
            track__isnull=False, library__in=owned_libraries, track=track
        )
        .exclude(pk=upload.pk)
        .values_list("uuid", flat=True)
    )


def get_best_candidate_or_create(model, query, defaults, sort_fields):
    """
    Like queryset.get_or_create(), but does not crash when several objects
    match the query; the best candidate (per sort_candidates) is returned
    instead.
    """
    candidates = model.objects.filter(query)
    if candidates:
        return sort_candidates(candidates, sort_fields)[0], False

    return model.objects.create(**defaults), True
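
# Illustrative usage (hypothetical values), mirroring the lookups in
# get_track_from_import_metadata below:
#
#   artist, created = get_best_candidate_or_create(
#       models.Artist,
#       Q(name__iexact="Some Artist") | Q(mbid="some-mbid"),
#       defaults={"name": "Some Artist", "mbid": "some-mbid", "fid": None},
#       sort_fields=["mbid", "fid"],
#   )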


def sort_candidates(candidates, important_fields):
    """
    Given a list of objects and a list of fields, return those objects
    sorted by score, highest score first.

    Score is higher for objects that have a non-empty attribute
    that is also present in important fields::

        artist1 = Artist(mbid=None, fid=None)
        artist2 = Artist(mbid="something", fid=None)

        # artist2 has a mbid, so is sorted first
        assert sort_candidates([artist1, artist2], ['mbid'])[0] == artist2

    Only supports string fields.
    """

    # map each field to a score; because of sorted(), fields that sort later
    # alphabetically get a higher weight (e.g. "mbid" outranks "fid")
    fields_scores = {f: i + 1 for i, f in enumerate(sorted(important_fields))}
    candidates_with_scores = []
    for candidate in candidates:
        current_score = 0
        for field, score in fields_scores.items():
            v = getattr(candidate, field, "")
            if v:
                current_score += score

        candidates_with_scores.append((candidate, current_score))

    return [c for c, s in reversed(sorted(candidates_with_scores, key=lambda v: v[1]))]


@transaction.atomic
def get_track_from_import_metadata(data):
    track_uuid = getter(data, "funkwhale", "track", "uuid")

    if track_uuid:
        # easy case: we have a reference to a uuid of a track that
        # already exists in our database
        try:
            track = models.Track.objects.get(uuid=track_uuid)
        except models.Track.DoesNotExist:
            raise UploadImportError(code="track_uuid_not_found")

        if not track.album.cover:
            update_album_cover(
                track.album,
                source=data.get("upload_source"),
                cover_data=data.get("cover_data"),
            )
        return track

    from_activity_id = data.get("from_activity_id", None)
    track_mbid = data.get("musicbrainz_recordingid", None)
    album_mbid = data.get("musicbrainz_albumid", None)
    track_fid = getter(data, "fid")

    query = None

    if album_mbid and track_mbid:
        query = Q(mbid=track_mbid, album__mbid=album_mbid)

    if track_fid:
        query = query | Q(fid=track_fid) if query else Q(fid=track_fid)

    if query:
        # second easy case: we have a (track_mbid, album_mbid) pair or
        # a federation uuid we can check on
        try:
            return sort_candidates(models.Track.objects.filter(query), ["mbid", "fid"])[
                0
            ]
        except IndexError:
            pass

    # get / create artist and album artist
    artist_mbid = data.get("musicbrainz_artistid", None)
    artist_fid = data.get("artist_fid", None)
    artist_name = data["artist"]
    query = Q(name__iexact=artist_name)
    if artist_mbid:
        query |= Q(mbid=artist_mbid)
    if artist_fid:
        query |= Q(fid=artist_fid)
    defaults = {
        "name": artist_name,
        "mbid": artist_mbid,
        "fid": artist_fid,
        "from_activity_id": from_activity_id,
    }
    if data.get("artist_fdate"):
        defaults["creation_date"] = data.get("artist_fdate")

    artist = get_best_candidate_or_create(
        models.Artist, query, defaults=defaults, sort_fields=["mbid", "fid"]
    )[0]

    album_artist_name = data.get("album_artist") or artist_name
    if album_artist_name == artist_name:
        album_artist = artist
    else:
        query = Q(name__iexact=album_artist_name)
        album_artist_mbid = data.get("musicbrainz_albumartistid", None)
        album_artist_fid = data.get("album_artist_fid", None)
        if album_artist_mbid:
            query |= Q(mbid=album_artist_mbid)
        if album_artist_fid:
            query |= Q(fid=album_artist_fid)
        defaults = {
            "name": album_artist_name,
            "mbid": album_artist_mbid,
            "fid": album_artist_fid,
            "from_activity_id": from_activity_id,
        }
        if data.get("album_artist_fdate"):
            defaults["creation_date"] = data.get("album_artist_fdate")

        album_artist = get_best_candidate_or_create(
            models.Artist, query, defaults=defaults, sort_fields=["mbid", "fid"]
        )[0]

    # get / create album
    album_title = data["album"]
    album_fid = data.get("album_fid", None)
    query = Q(title__iexact=album_title, artist=album_artist)
    if album_mbid:
        query |= Q(mbid=album_mbid)
    if album_fid:
        query |= Q(fid=album_fid)
    defaults = {
        "title": album_title,
        "artist": album_artist,
        "mbid": album_mbid,
        "release_date": data.get("date"),
        "fid": album_fid,
        "from_activity_id": from_activity_id,
    }
    if data.get("album_fdate"):
        defaults["creation_date"] = data.get("album_fdate")

    album = get_best_candidate_or_create(
        models.Album, query, defaults=defaults, sort_fields=["mbid", "fid"]
    )[0]
    if not album.cover:
        update_album_cover(
            album, source=data.get("upload_source"), cover_data=data.get("cover_data")
        )

    # get / create track
    track_title = data["title"]
    track_number = data.get("track_number", 1)
    query = Q(title__iexact=track_title, artist=artist, album=album)
    if track_mbid:
        query |= Q(mbid=track_mbid)
    if track_fid:
        query |= Q(fid=track_fid)
    defaults = {
        "title": track_title,
        "album": album,
        "mbid": track_mbid,
        "artist": artist,
        "position": track_number,
        "disc_number": data.get("disc_number"),
        "fid": track_fid,
        "from_activity_id": from_activity_id,
        "license": licenses.match(data.get("license"), data.get("copyright")),
        "copyright": data.get("copyright"),
    }
    if data.get("fdate"):
        defaults["creation_date"] = data.get("fdate")

    track = get_best_candidate_or_create(
        models.Track, query, defaults=defaults, sort_fields=["mbid", "fid"]
    )[0]

    return track


@receiver(signals.upload_import_status_updated)
def broadcast_import_status_update_to_owner(old_status, new_status, upload, **kwargs):
    user = upload.library.actor.get_user()
    if not user:
        return

    group = "user.{}.imports".format(user.pk)
    channels.group_send(
        group,
        {
            "type": "event.send",
            "text": "",
            "data": {
                "type": "import.status_updated",
                "upload": serializers.UploadForOwnerSerializer(upload).data,
                "old_status": old_status,
                "new_status": new_status,
            },
        },
    )


@celery.app.task(name="music.clean_transcoding_cache")
def clean_transcoding_cache():
    delay = preferences.get("music__transcoding_cache_duration")
    if delay < 1:
        return  # cache clearing disabled
    limit = timezone.now() - datetime.timedelta(minutes=delay)
    candidates = (
        models.UploadVersion.objects.filter(
            Q(accessed_date__lt=limit) | Q(accessed_date=None)
        )
        .only("audio_file", "id")
        .order_by("id")
    )
    return candidates.delete()