624 lines
20 KiB
Python
624 lines
20 KiB
Python
import datetime
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import uuid
|
|
|
|
import arrow
|
|
import markdown
|
|
from django.conf import settings
|
|
from django.core.files import File
|
|
from django.core.files.base import ContentFile
|
|
from django.db import models
|
|
from django.db.models.signals import post_save
|
|
from django.dispatch import receiver
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from taggit.managers import TaggableManager
|
|
from versatileimagefield.fields import VersatileImageField
|
|
|
|
from funkwhale_api import downloader, musicbrainz
|
|
from funkwhale_api.federation import utils as federation_utils
|
|
|
|
from . import importers, metadata, utils
|
|
|
|
|
|
class APIModelMixin(models.Model):
    """Abstract base for models that can be loaded from the MusicBrainz API.

    The methods below read ``cls.api``, ``cls.musicbrainz_model`` and
    ``cls.musicbrainz_mapping``; this mixin does not declare them, so each
    concrete subclass has to provide them.
    """

    mbid = models.UUIDField(unique=True, db_index=True, null=True, blank=True)
    uuid = models.UUIDField(unique=True, db_index=True, default=uuid.uuid4)
    # "includes" forwarded to the API client when a single entity is fetched.
    api_includes = []
    creation_date = models.DateTimeField(default=timezone.now)
    # Hooks forwarded to importers.load() by create_from_api().
    import_hooks = []

    class Meta:
        abstract = True
        ordering = ["-creation_date"]

    @classmethod
    def get_or_create_from_api(cls, mbid):
        """Return ``(instance, created)`` for ``mbid``.

        An existing row with this ``mbid`` is returned with ``created=False``;
        otherwise the instance is built through ``create_from_api``.
        """
        try:
            existing = cls.objects.get(mbid=mbid)
        except cls.DoesNotExist:
            return cls.create_from_api(id=mbid), True
        return existing, False

    def get_api_data(self):
        """Fetch this entity from the API and return its payload entry."""
        response = self.__class__.api.get(id=self.mbid, includes=self.api_includes)
        return response[self.musicbrainz_model]

    @classmethod
    def create_from_api(cls, **kwargs):
        """Fetch raw data from the API and hand it to ``importers.load``.

        With a truthy ``id`` keyword the entity is fetched directly; otherwise
        all keywords are forwarded to ``cls.api.search`` and the first result
        is used (an empty result list therefore raises ``IndexError``).
        """
        if kwargs.get("id"):
            response = cls.api.get(id=kwargs["id"], includes=cls.api_includes)
            raw_data = response[cls.musicbrainz_model]
        else:
            results = cls.api.search(**kwargs)
            raw_data = results["{0}-list".format(cls.musicbrainz_model)][0]
        cleaned_data = cls.clean_musicbrainz_data(raw_data)
        return importers.load(cls, cleaned_data, raw_data, cls.import_hooks)

    @classmethod
    def clean_musicbrainz_data(cls, data):
        """Translate an API payload into a dict of cleaned key/value pairs.

        Every item of ``data`` goes through ``Mapping.from_musicbrainz``;
        items for which that call raises ``KeyError`` are skipped.
        """
        mapping = importers.Mapping(cls.musicbrainz_mapping)
        cleaned_data = {}
        for key, value in data.items():
            try:
                cleaned_key, cleaned_value = mapping.from_musicbrainz(key, value)
            except KeyError:
                continue
            cleaned_data[cleaned_key] = cleaned_value
        return cleaned_data

    @property
    def musicbrainz_url(self):
        """Public musicbrainz.org URL of this entity, or ``None`` without mbid."""
        if not self.mbid:
            return None
        return "https://musicbrainz.org/{}/{}".format(
            self.musicbrainz_model, self.mbid
        )
|
|
|
|
|
|
class ArtistQuerySet(models.QuerySet):
    """Query helpers for Artist rows."""

    def with_albums_count(self):
        """Annotate each artist with ``_albums_count``, a count of its albums."""
        albums_total = models.Count("albums")
        return self.annotate(_albums_count=albums_total)

    def with_albums(self):
        """Prefetch each artist's albums using ``Album.objects.with_tracks_count()``."""
        albums_queryset = Album.objects.with_tracks_count()
        albums_prefetch = models.Prefetch("albums", queryset=albums_queryset)
        return self.prefetch_related(albums_prefetch)
|
|
|
|
|
|
class Artist(APIModelMixin):
    """An artist, backed by the MusicBrainz ``artist`` entity."""

    name = models.CharField(max_length=255)

    musicbrainz_model = "artist"
    musicbrainz_mapping = {
        "mbid": {"musicbrainz_field_name": "id"},
        "name": {"musicbrainz_field_name": "name"},
    }
    api = musicbrainz.api.artists
    objects = ArtistQuerySet.as_manager()

    def __str__(self):
        return self.name

    @property
    def tags(self):
        """Set of every tag exposed by this artist's albums."""
        return {tag for album in self.albums.all() for tag in album.tags}

    @classmethod
    def get_or_create_from_name(cls, name, **kwargs):
        """Return ``(artist, created)``, matching ``name`` case-insensitively.

        ``kwargs`` plus the given ``name`` are used as creation defaults.
        """
        kwargs["name"] = name
        return cls.objects.get_or_create(name__iexact=name, defaults=kwargs)
|
|
|
|
|
|
def import_artist(v):
    """Return the Artist for the first entry of an artist-credit list.

    ``v`` is indexed as ``v[0]["artist"]["id"]``; the artist is looked up by
    that id and created from the API when it is not stored yet.
    """
    artist, _created = Artist.get_or_create_from_api(mbid=v[0]["artist"]["id"])
    return artist
|
|
|
|
|
|
def parse_date(v):
    """Convert a date string into a ``datetime.date``.

    A 4-character value is treated as a bare year and mapped to January 1st
    of that year; any other value is parsed with ``arrow.get``.
    """
    is_year_only = len(v) == 4
    if is_year_only:
        return datetime.date(int(v), 1, 1)
    return arrow.get(v).date()
|
|
|
|
|
|
def import_tracks(instance, cleaned_data, raw_data):
    """Load one Track per entry of the release's first medium.

    Only ``raw_data["medium-list"][0]`` is read. Each track is attached to
    ``instance`` as its album and takes its position from the track entry.
    ``cleaned_data`` is accepted but not used here.
    """
    first_medium = raw_data["medium-list"][0]
    for entry in first_medium["track-list"]:
        fields = Track.clean_musicbrainz_data(entry["recording"])
        fields["album"] = instance
        fields["position"] = int(entry["position"])
        importers.load(Track, fields, entry, Track.import_hooks)
|
|
|
|
|
|
class AlbumQuerySet(models.QuerySet):
    """Query helpers for Album rows."""

    def with_tracks_count(self):
        """Annotate each album with ``_tracks_count``, a count of its tracks."""
        tracks_total = models.Count("tracks")
        return self.annotate(_tracks_count=tracks_total)
|
|
|
|
|
|
class Album(APIModelMixin):
    """An album, backed by the MusicBrainz ``release`` entity."""

    title = models.CharField(max_length=255)
    artist = models.ForeignKey(Artist, related_name="albums", on_delete=models.CASCADE)
    release_date = models.DateField(null=True)
    release_group_id = models.UUIDField(null=True, blank=True)
    cover = VersatileImageField(
        upload_to="albums/covers/%Y/%m/%d", null=True, blank=True
    )
    TYPE_CHOICES = (("album", "Album"),)
    type = models.CharField(choices=TYPE_CHOICES, max_length=30, default="album")

    api_includes = ["artist-credits", "recordings", "media", "release-groups"]
    api = musicbrainz.api.releases
    musicbrainz_model = "release"
    # Model attribute -> API field name, plus an optional value converter.
    musicbrainz_mapping = {
        "mbid": {"musicbrainz_field_name": "id"},
        "position": {
            "musicbrainz_field_name": "release-list",
            "converter": lambda v: int(v[0]["medium-list"][0]["position"]),
        },
        "release_group_id": {
            "musicbrainz_field_name": "release-group",
            "converter": lambda v: v["id"],
        },
        "title": {"musicbrainz_field_name": "title"},
        "release_date": {"musicbrainz_field_name": "date", "converter": parse_date},
        "type": {"musicbrainz_field_name": "type", "converter": lambda v: v.lower()},
        "artist": {
            "musicbrainz_field_name": "artist-credit",
            "converter": import_artist,
        },
    }
    objects = AlbumQuerySet.as_manager()

    def get_image(self, data=None):
        """Save a cover image on this album and return the stored file.

        With truthy ``data``, its ``"content"`` is saved under a name built
        from ``self.uuid`` and an extension derived from ``data["mimetype"]``
        (``jpg`` for unknown mimetypes). Without it, the front image is
        requested from ``musicbrainz.api.images`` for ``self.mbid`` and saved
        as ``<mbid>.jpg``.
        """
        if not data:
            image_data = musicbrainz.api.images.get_front(str(self.mbid))
            self.cover.save("{0}.jpg".format(self.mbid), ContentFile(image_data))
            return self.cover.file

        content = ContentFile(data["content"])
        known_extensions = {
            "image/jpeg": "jpg",
            "image/png": "png",
            "image/gif": "gif",
        }
        extension = known_extensions.get(data["mimetype"], "jpg")
        self.cover.save("{}.{}".format(self.uuid, extension), content)
        return self.cover.file

    def __str__(self):
        return self.title

    @property
    def tags(self):
        """Set of every tag attached to this album's tracks."""
        return {tag for track in self.tracks.all() for tag in track.tags.all()}

    @classmethod
    def get_or_create_from_title(cls, title, **kwargs):
        """Return ``(album, created)``, matching ``title`` case-insensitively.

        ``kwargs`` plus the given ``title`` are used as creation defaults.
        """
        kwargs["title"] = title
        return cls.objects.get_or_create(title__iexact=title, defaults=kwargs)
|
|
|
|
|
|
def import_tags(instance, cleaned_data, raw_data):
    """Add the sufficiently-counted tags of ``raw_data`` to ``instance.tags``.

    Entries of ``raw_data["tag-list"]`` (missing key means no tags) are kept
    when their ``"count"`` converts to an integer of at least 2; entries whose
    count is not a valid integer string are ignored. ``cleaned_data`` is
    accepted but not used here.
    """
    MINIMUM_COUNT = 2

    def _counted_enough(tag_data):
        # A count that int() rejects with ValueError disqualifies the tag.
        try:
            return int(tag_data["count"]) >= MINIMUM_COUNT
        except ValueError:
            return False

    tags_to_add = [
        tag_data["name"]
        for tag_data in raw_data.get("tag-list", [])
        if _counted_enough(tag_data)
    ]
    instance.tags.add(*tags_to_add)
|
|
|
|
|
|
def import_album(v):
    """Return the Album for the first entry of a release list.

    ``v`` is indexed as ``v[0]["id"]``; the album is looked up by that id and
    created from the API when it is not stored yet.
    """
    album, _created = Album.get_or_create_from_api(mbid=v[0]["id"])
    return album
|
|
|
|
|
|
def link_recordings(instance, cleaned_data, raw_data):
    """Point every stored Track listed in the work's recordings at ``instance``.

    The ``"target"`` of each ``raw_data["recording-relation-list"]`` entry is
    matched against ``Track.mbid``. ``cleaned_data`` is accepted but not used.
    """
    recording_ids = [
        relation["target"] for relation in raw_data["recording-relation-list"]
    ]
    matching_tracks = Track.objects.filter(mbid__in=recording_ids)
    matching_tracks.update(work=instance)
|
|
|
|
|
|
def import_lyrics(instance, cleaned_data, raw_data):
    """Return the Lyrics row for the work's first "lyrics" URL relation.

    Returns ``None`` when ``raw_data`` has no ``"url-relation-list"``, when no
    relation has type ``"lyrics"``, or when an expected key is missing.
    Otherwise the Lyrics object for ``(instance, url)`` is fetched or created.
    ``cleaned_data`` is accepted but not used here.
    """
    try:
        lyrics_relations = [
            relation
            for relation in raw_data["url-relation-list"]
            if relation["type"] == "lyrics"
        ]
        url = lyrics_relations[0]["target"]
    except (IndexError, KeyError):
        return None

    lyrics, _created = Lyrics.objects.get_or_create(work=instance, url=url)
    return lyrics
|
|
|
|
|
|
class Work(APIModelMixin):
    """A work, backed by the MusicBrainz ``work`` entity."""

    language = models.CharField(max_length=20)
    nature = models.CharField(max_length=50)
    title = models.CharField(max_length=255)

    api = musicbrainz.api.works
    api_includes = ["url-rels", "recording-rels"]
    musicbrainz_model = "work"
    musicbrainz_mapping = {
        "mbid": {"musicbrainz_field_name": "id"},
        "title": {"musicbrainz_field_name": "title"},
        "language": {"musicbrainz_field_name": "language"},
        "nature": {"musicbrainz_field_name": "type", "converter": lambda v: v.lower()},
    }
    import_hooks = [import_lyrics, link_recordings]

    def fetch_lyrics(self):
        """Return this work's first Lyrics row, importing one if none exists.

        When no row is stored, the work is re-fetched with its URL relations
        and passed to ``import_lyrics``, whose result (possibly ``None``) is
        returned.
        """
        stored = self.lyrics.first()
        if stored:
            return stored
        work_data = self.api.get(self.mbid, includes=["url-rels"])["work"]
        return import_lyrics(self, {}, work_data)
|
|
|
|
|
|
class Lyrics(models.Model):
    """Lyrics text for a Work, identified by the URL it comes from."""

    uuid = models.UUIDField(unique=True, db_index=True, default=uuid.uuid4)
    work = models.ForeignKey(
        Work, related_name="lyrics", null=True, blank=True, on_delete=models.CASCADE
    )
    url = models.URLField(unique=True)
    content = models.TextField(null=True, blank=True)

    @property
    def content_rendered(self):
        """``content`` rendered through ``markdown.markdown``.

        NOTE(review): ``content`` is nullable and is passed through as-is;
        confirm callers only use this once content has been populated.
        """
        render_options = {
            "safe_mode": True,
            "enable_attributes": False,
            "extensions": ["markdown.extensions.nl2br"],
        }
        return markdown.markdown(self.content, **render_options)
|
|
|
|
|
|
class TrackQuerySet(models.QuerySet):
    """Query helpers for Track rows."""

    def for_nested_serialization(self):
        """Join the album, album artist and artist, and prefetch the files."""
        queryset = self.select_related()
        queryset = queryset.select_related("album__artist", "artist")
        return queryset.prefetch_related("files")
|
|
|
|
|
|
def get_artist(release_list):
    """Return the Artist credited on the first release of ``release_list``.

    The artist id is read from
    ``release_list[0]["artist-credits"][0]["artists"]["id"]`` and the artist
    is created from the API when it is not stored yet.

    NOTE(review): ``import_artist`` reads ``"artist"`` from an
    ``"artist-credit"`` field, whereas this reads ``"artist-credits"`` /
    ``"artists"`` — confirm which key names the API payload really uses.
    """
    artist, _created = Artist.get_or_create_from_api(
        mbid=release_list[0]["artist-credits"][0]["artists"]["id"]
    )
    return artist
|
|
|
|
|
|
class Track(APIModelMixin):
    """A track, backed by the MusicBrainz ``recording`` entity."""

    title = models.CharField(max_length=255)
    artist = models.ForeignKey(Artist, related_name="tracks", on_delete=models.CASCADE)
    position = models.PositiveIntegerField(null=True, blank=True)
    album = models.ForeignKey(
        Album, related_name="tracks", null=True, blank=True, on_delete=models.CASCADE
    )
    work = models.ForeignKey(
        Work, related_name="tracks", null=True, blank=True, on_delete=models.CASCADE
    )

    musicbrainz_model = "recording"
    api = musicbrainz.api.recordings
    api_includes = ["artist-credits", "releases", "media", "tags", "work-rels"]
    musicbrainz_mapping = {
        "mbid": {"musicbrainz_field_name": "id"},
        "title": {"musicbrainz_field_name": "title"},
        "artist": {
            # we use the artist from the release to avoid #237
            "musicbrainz_field_name": "release-list",
            "converter": get_artist,
        },
        "album": {"musicbrainz_field_name": "release-list", "converter": import_album},
    }
    import_hooks = [import_tags]
    objects = TrackQuerySet.as_manager()
    tags = TaggableManager(blank=True)

    class Meta:
        ordering = ["album", "position"]

    def __str__(self):
        return self.title

    def save(self, **kwargs):
        """Save the track, defaulting a missing artist to the album's artist."""
        try:
            # Reading the relation raises DoesNotExist when no artist is set.
            self.artist
        except Artist.DoesNotExist:
            self.artist = self.album.artist
        super().save(**kwargs)

    def get_work(self):
        """Return the linked Work, or look one up through the API.

        Without a stored work, the recording's work relations are fetched and
        the first related work is fetched or created. Returns ``None`` when
        the API payload has no work relation. The result is not assigned to
        ``self.work`` here.
        """
        if self.work:
            return self.work
        payload = self.api.get(self.mbid, includes=["work-rels"])
        try:
            work_data = payload["recording"]["work-relation-list"][0]["work"]
        except (IndexError, KeyError):
            return None
        return Work.get_or_create_from_api(mbid=work_data["id"])[0]

    def get_lyrics_url(self):
        """URL of the lyrics API endpoint for this track."""
        return reverse("api:v1:tracks-lyrics", kwargs={"pk": self.pk})

    @property
    def full_name(self):
        """``artist - album - title``, or ``artist - title`` on AttributeError."""
        try:
            pieces = (self.artist.name, self.album.title, self.title)
        except AttributeError:
            # Raised e.g. when self.album is None.
            return "{} - {}".format(self.artist.name, self.title)
        return "{} - {} - {}".format(*pieces)

    def get_activity_url(self):
        """MusicBrainz recording URL when an mbid exists, else a local URL."""
        if not self.mbid:
            return settings.FUNKWHALE_URL + "/tracks/{}".format(self.pk)
        return "https://musicbrainz.org/recording/{}".format(self.mbid)

    @classmethod
    def get_or_create_from_title(cls, title, **kwargs):
        """Return ``(track, created)``, matching ``title`` case-insensitively.

        ``kwargs`` plus the given ``title`` are used as creation defaults.
        """
        kwargs["title"] = title
        return cls.objects.get_or_create(title__iexact=title, defaults=kwargs)

    @classmethod
    def get_or_create_from_release(cls, release_mbid, mbid):
        """Return ``(track, created)`` for recording ``mbid`` on a release.

        An already stored track is returned untouched with ``created=False``.
        Otherwise the album is fetched or created, the release is requested
        from the API, and the track entry whose recording id equals ``mbid``
        is searched across all media.

        Raises:
            ValueError: when no track of the release matches ``mbid``.
        """
        release_mbid = str(release_mbid)
        mbid = str(mbid)

        existing = cls.objects.filter(mbid=mbid)[:2]
        try:
            return cls.objects.get(mbid=mbid), False
        except cls.DoesNotExist:
            pass

        album = Album.get_or_create_from_api(release_mbid)[0]
        payload = musicbrainz.client.api.releases.get(
            str(album.mbid), includes=Album.api_includes
        )
        candidates = [
            entry
            for medium in payload["release"]["medium-list"]
            for entry in medium["track-list"]
        ]
        track_data = next(
            (entry for entry in candidates if entry["recording"]["id"] == mbid),
            None,
        )
        if not track_data:
            raise ValueError("No track found matching this ID")

        return cls.objects.update_or_create(
            mbid=mbid,
            defaults={
                "position": int(track_data["position"]),
                "title": track_data["recording"]["title"],
                "album": album,
                "artist": album.artist,
            },
        )
|
|
|
|
|
|
class TrackFile(models.Model):
    """An audio file for a Track.

    The audio can live in ``audio_file``, at a ``file://`` path held in
    ``source``, or on the linked federation ``library_track``.
    """

    uuid = models.UUIDField(unique=True, db_index=True, default=uuid.uuid4)
    track = models.ForeignKey(Track, related_name="files", on_delete=models.CASCADE)
    audio_file = models.FileField(upload_to="tracks/%Y/%m/%d", max_length=255)
    source = models.URLField(null=True, blank=True, max_length=500)
    creation_date = models.DateTimeField(default=timezone.now)
    modification_date = models.DateTimeField(auto_now=True)
    accessed_date = models.DateTimeField(null=True, blank=True)
    duration = models.IntegerField(null=True, blank=True)
    size = models.IntegerField(null=True, blank=True)
    bitrate = models.IntegerField(null=True, blank=True)
    acoustid_track_id = models.UUIDField(null=True, blank=True)
    mimetype = models.CharField(null=True, blank=True, max_length=200)

    library_track = models.OneToOneField(
        "federation.LibraryTrack",
        related_name="local_track_file",
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )

    def download_file(self):
        """Download ``source`` into ``audio_file`` and return that field file.

        The download goes to a temporary directory. That directory is removed
        and the downloaded file handle is closed even when the download or
        the save raises.
        """
        tmp_dir = tempfile.mkdtemp()
        try:
            data = downloader.download(self.source, target_directory=tmp_dir)
            self.duration = data.get("duration", None)
            audio_path = data["audio_file_path"]
            with open(audio_path, "rb") as downloaded:
                self.audio_file.save(os.path.basename(audio_path), File(downloaded))
        finally:
            shutil.rmtree(tmp_dir)
        return self.audio_file

    def get_federation_url(self):
        """Full federation URL identifying this file by its uuid."""
        return federation_utils.full_url("/federation/music/file/{}".format(self.uuid))

    @property
    def path(self):
        """URL of the API endpoint serving this file."""
        return reverse("api:v1:trackfiles-serve", kwargs={"pk": self.pk})

    @property
    def filename(self):
        """``<track full name>.<extension>``."""
        return "{}.{}".format(self.track.full_name, self.extension)

    @property
    def extension(self):
        """Extension of ``audio_file`` without its dot, or ``None`` if unset."""
        if not self.audio_file:
            return None
        return os.path.splitext(self.audio_file.name)[-1].replace(".", "", 1)

    def _local_source_path(self):
        """Return the filesystem path behind a ``file://`` source, else ``None``."""
        # source is nullable, so it must be checked before calling str methods.
        if self.source and self.source.startswith("file://"):
            return self.source.replace("file://", "", 1)
        return None

    def get_file_size(self):
        """Size in bytes from the first available location, else ``None``.

        Locations are tried in order: ``audio_file``, a ``file://`` source,
        then the library track's audio file.
        """
        if self.audio_file:
            return self.audio_file.size

        local_path = self._local_source_path()
        if local_path is not None:
            return os.path.getsize(local_path)

        if self.library_track and self.library_track.audio_file:
            return self.library_track.audio_file.size
        return None

    def get_audio_file(self):
        """Open and return the audio from the first available location.

        Locations are tried in the same order as ``get_file_size``. Returns
        ``None`` when none is available. Closing the result is the caller's
        job.
        """
        if self.audio_file:
            return self.audio_file.open()

        local_path = self._local_source_path()
        if local_path is not None:
            return open(local_path, "rb")

        if self.library_track and self.library_track.audio_file:
            return self.library_track.audio_file.open()
        return None

    def set_audio_data(self):
        """Fill ``duration``, ``bitrate`` and ``size`` on this instance.

        Values come from the audio file when one can be opened, otherwise
        from the library track's metadata. Nothing is saved here.
        """
        audio_file = self.get_audio_file()
        if audio_file:
            with audio_file as f:
                audio_data = utils.get_audio_file_data(f)
            if not audio_data:
                return
            self.duration = int(audio_data["length"])
            self.bitrate = audio_data["bitrate"]
            self.size = self.get_file_size()
        else:
            lt = self.library_track
            if lt:
                self.duration = lt.get_metadata("length")
                self.size = lt.get_metadata("size")
                self.bitrate = lt.get_metadata("bitrate")

    def save(self, **kwargs):
        """Save the row, guessing ``mimetype`` from ``audio_file`` if unset."""
        if not self.mimetype and self.audio_file:
            self.mimetype = utils.guess_mimetype(self.audio_file)
        return super().save(**kwargs)

    def get_metadata(self):
        """Return ``metadata.Metadata`` for the audio, or ``None`` without audio."""
        audio_file = self.get_audio_file()
        if not audio_file:
            return None
        return metadata.Metadata(audio_file)
|
|
|
|
|
|
# (stored value, display label) pairs used by the status fields of
# ImportBatch and ImportJob; each label is the capitalized stored value.
IMPORT_STATUS_CHOICES = tuple(
    (status, status.capitalize())
    for status in ("pending", "finished", "errored", "skipped")
)
|
|
|
|
|
|
class ImportBatch(models.Model):
    """A group of import jobs, optionally tied to a user and a request."""

    uuid = models.UUIDField(unique=True, db_index=True, default=uuid.uuid4)
    IMPORT_BATCH_SOURCES = [
        ("api", "api"),
        ("shell", "shell"),
        ("federation", "federation"),
    ]
    source = models.CharField(
        max_length=30, default="api", choices=IMPORT_BATCH_SOURCES
    )
    creation_date = models.DateTimeField(default=timezone.now)
    submitted_by = models.ForeignKey(
        "users.User",
        related_name="imports",
        null=True,
        blank=True,
        on_delete=models.CASCADE,
    )
    status = models.CharField(
        choices=IMPORT_STATUS_CHOICES, default="pending", max_length=30
    )
    import_request = models.ForeignKey(
        "requests.ImportRequest",
        related_name="import_batches",
        null=True,
        blank=True,
        on_delete=models.CASCADE,
    )

    class Meta:
        ordering = ["-creation_date"]

    def __str__(self):
        return str(self.pk)

    def update_status(self):
        """Recompute ``status`` from the jobs and persist it when it changed.

        When the new status is ``"finished"``, the follower notification task
        is queued for this batch.
        """
        previous_status = self.status
        self.status = utils.compute_status(self.jobs.all())
        if self.status == previous_status:
            return

        self.save(update_fields=["status"])

        if self.status == "finished" and self.status != previous_status:
            # Imported here rather than at module level.
            from . import tasks

            tasks.import_batch_notify_followers.delay(import_batch_id=self.pk)

    def get_federation_url(self):
        """Full federation URL identifying this batch by its uuid."""
        batch_path = "/federation/music/import/batch/{}".format(self.uuid)
        return federation_utils.full_url(batch_path)
|
|
|
|
|
|
class ImportJob(models.Model):
    """One item of an ImportBatch, optionally linked to its TrackFile."""

    uuid = models.UUIDField(unique=True, db_index=True, default=uuid.uuid4)
    batch = models.ForeignKey(
        ImportBatch, related_name="jobs", on_delete=models.CASCADE
    )
    track_file = models.ForeignKey(
        TrackFile, related_name="jobs", null=True, blank=True, on_delete=models.CASCADE
    )
    source = models.CharField(max_length=500)
    mbid = models.UUIDField(editable=False, null=True, blank=True)

    status = models.CharField(
        choices=IMPORT_STATUS_CHOICES, default="pending", max_length=30
    )
    audio_file = models.FileField(
        upload_to="imports/%Y/%m/%d", max_length=255, null=True, blank=True
    )

    # Unlike the other relations, deleting the library track keeps the job
    # and only clears this reference.
    library_track = models.ForeignKey(
        "federation.LibraryTrack",
        related_name="import_jobs",
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )

    class Meta:
        ordering = ("id",)
|
|
|
|
|
|
@receiver(post_save, sender=ImportJob)
def update_batch_status(sender, instance, **kwargs):
    """After an ImportJob is saved, refresh the status of its batch."""
    parent_batch = instance.batch
    parent_batch.update_status()
|
|
|
|
|
|
@receiver(post_save, sender=ImportBatch)
def update_request_status(sender, instance, created, **kwargs):
    """After an ImportBatch is saved, advance its import request's status.

    Nothing happens when the batch has no import request, or when an
    existing batch is saved without ``"status"`` in ``update_fields``.
    Otherwise a pending batch moves a pending request to ``"accepted"``, and
    a finished batch moves an accepted request to ``"imported"``.
    """
    update_fields = kwargs.get("update_fields", []) or []
    import_request = instance.import_request
    if not import_request:
        return

    if not (created or "status" in update_fields):
        return

    # (batch status, current request status) -> new request status
    transitions = {
        # an import has started, so the request counts as accepted
        ("pending", "pending"): "accepted",
        # the import is over, so the request counts as imported
        ("finished", "accepted"): "imported",
    }
    new_status = transitions.get((instance.status, import_request.status))
    if new_status is None:
        return

    import_request.status = new_status
    return import_request.save(update_fields=["status"])
|