Merge branch '144-import-perf' into 'develop'
Resolve "Make the transaction logic less dumb in file importer". Closes #144. See merge request funkwhale/funkwhale!149.
This commit is contained in:
commit
3d6c12437e
|
@ -1,5 +1,6 @@
|
|||
import mutagen
|
||||
from django import forms
|
||||
import arrow
|
||||
import mutagen
|
||||
|
||||
NODEFAULT = object()
|
||||
|
||||
|
@ -50,6 +51,13 @@ def convert_track_number(v):
|
|||
except (ValueError, AttributeError, IndexError):
|
||||
pass
|
||||
|
||||
|
||||
# Maps a metadata key to the Django form field used to validate and convert
# its raw tag value. Every MusicBrainz identifier gets its own UUIDField
# instance.
VALIDATION = {
    mbid_key: forms.UUIDField()
    for mbid_key in (
        'musicbrainz_artistid',
        'musicbrainz_albumid',
        'musicbrainz_recordingid',
    )
}
|
||||
|
||||
CONF = {
|
||||
'OggVorbis': {
|
||||
'getter': lambda f, k: f[k][0],
|
||||
|
@ -146,4 +154,7 @@ class Metadata(object):
|
|||
converter = field_conf.get('to_application')
|
||||
if converter:
|
||||
v = converter(v)
|
||||
field = VALIDATION.get(key)
|
||||
if field:
|
||||
v = field.to_python(v)
|
||||
return v
|
||||
|
|
|
@ -507,6 +507,8 @@ class ImportBatch(models.Model):
|
|||
def update_status(self):
|
||||
old_status = self.status
|
||||
self.status = utils.compute_status(self.jobs.all())
|
||||
if self.status == old_status:
|
||||
return
|
||||
self.save(update_fields=['status'])
|
||||
if self.status != old_status and self.status == 'finished':
|
||||
from . import tasks
|
||||
|
|
|
@ -53,10 +53,11 @@ def guess_mimetype(f):
|
|||
|
||||
|
||||
def compute_status(jobs):
    """Return the aggregate status for a collection of import jobs.

    ``jobs`` must expose the queryset-style chain
    ``order_by().values_list('status', flat=True).distinct()``.

    Precedence is ``'errored'`` > ``'pending'`` > ``'finished'``: a single
    errored job marks the whole collection as errored, otherwise a single
    pending job marks it as pending. An empty collection is ``'finished'``.
    """
    # Only the distinct status values are fetched instead of iterating over
    # every job. The bare order_by() drops any ordering so it cannot
    # interfere with distinct().
    statuses = set(
        jobs.order_by().values_list('status', flat=True).distinct())
    if 'errored' in statuses:
        return 'errored'
    if 'pending' in statuses:
        return 'pending'
    return 'finished'
|
||||
|
|
|
@ -3,9 +3,8 @@ import os
|
|||
|
||||
from django.core.files import File
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from django.db import transaction
|
||||
|
||||
from funkwhale_api.common import utils
|
||||
from funkwhale_api.music import models
|
||||
from funkwhale_api.music import tasks
|
||||
from funkwhale_api.users.models import User
|
||||
|
||||
|
@ -62,13 +61,10 @@ class Command(BaseCommand):
|
|||
if options['recursive']:
|
||||
glob_kwargs['recursive'] = True
|
||||
try:
|
||||
matching = glob.glob(options['path'], **glob_kwargs)
|
||||
matching = sorted(glob.glob(options['path'], **glob_kwargs))
|
||||
except TypeError:
|
||||
raise Exception('You need Python 3.5 to use the --recursive flag')
|
||||
|
||||
self.stdout.write('This will import {} files matching this pattern: {}'.format(
|
||||
len(matching), options['path']))
|
||||
|
||||
if not matching:
|
||||
raise CommandError('No file matching pattern, aborting')
|
||||
|
||||
|
@ -86,6 +82,20 @@ class Command(BaseCommand):
|
|||
except AssertionError:
|
||||
raise CommandError(
|
||||
'No superuser available, please provide a --username')
|
||||
|
||||
filtered = self.filter_matching(matching, options)
|
||||
self.stdout.write('Import summary:')
|
||||
self.stdout.write('- {} files found matching this pattern: {}'.format(
|
||||
len(matching), options['path']))
|
||||
self.stdout.write('- {} files already found in database'.format(
|
||||
len(filtered['skipped'])))
|
||||
self.stdout.write('- {} new files'.format(
|
||||
len(filtered['new'])))
|
||||
|
||||
if len(filtered['new']) == 0:
|
||||
self.stdout.write('Nothing new to import, exiting')
|
||||
return
|
||||
|
||||
if options['interactive']:
|
||||
message = (
|
||||
'Are you sure you want to do this?\n\n'
|
||||
|
@ -94,27 +104,52 @@ class Command(BaseCommand):
|
|||
if input(''.join(message)) != 'yes':
|
||||
raise CommandError("Import cancelled.")
|
||||
|
||||
batch = self.do_import(matching, user=user, options=options)
|
||||
batch, errors = self.do_import(
|
||||
filtered['new'], user=user, options=options)
|
||||
message = 'Successfully imported {} tracks'
|
||||
if options['async']:
|
||||
message = 'Successfully launched import for {} tracks'
|
||||
self.stdout.write(message.format(len(matching)))
|
||||
|
||||
self.stdout.write(message.format(len(filtered['new'])))
|
||||
if len(errors) > 0:
|
||||
self.stderr.write(
|
||||
'{} tracks could not be imported:'.format(len(errors)))
|
||||
|
||||
for path, error in errors:
|
||||
self.stderr.write('- {}: {}'.format(path, error))
|
||||
self.stdout.write(
|
||||
"For details, please refer to import batch #{}".format(batch.pk))
|
||||
|
||||
@transaction.atomic
|
||||
def do_import(self, matching, user, options):
|
||||
message = 'Importing {}...'
|
||||
def filter_matching(self, matching, options):
    """Split candidate paths into already-imported and new ones.

    A path counts as already imported when a ``TrackFile`` exists whose
    ``source`` equals ``'file://' + path``.

    :param matching: list of filesystem paths selected for import.
    :param options: command options (not used by this method).
    :returns: dict with three keys: ``'initial'`` (``matching`` as given),
        ``'skipped'`` (sorted paths already present in the database) and
        ``'new'`` (sorted, de-duplicated paths still to import).
    """
    candidates = set(matching)
    sources = ['file://{}'.format(path) for path in matching]
    # We skip re-import for paths that are already stored
    # as a TrackFile.source.
    known_sources = models.TrackFile.objects.filter(
        source__in=sources).values_list('source', flat=True)
    # Strip only the leading scheme so the values compare against raw paths.
    existing = {
        source.replace('file://', '', 1) for source in known_sources}
    skipped = candidates & existing
    return {
        'initial': matching,
        'skipped': sorted(skipped),
        'new': sorted(candidates - skipped),
    }
|
||||
|
||||
def do_import(self, paths, user, options):
|
||||
message = '{i}/{total} Importing {path}...'
|
||||
if options['async']:
|
||||
message = 'Launching import for {}...'
|
||||
message = '{i}/{total} Launching import for {path}...'
|
||||
|
||||
# we create an import batch bound to the user
|
||||
batch = user.imports.create(source='shell')
|
||||
async = options['async']
|
||||
import_handler = tasks.import_job_run.delay if async else tasks.import_job_run
|
||||
for path in matching:
|
||||
batch = user.imports.create(source='shell')
|
||||
total = len(paths)
|
||||
errors = []
|
||||
for i, path in list(enumerate(paths)):
|
||||
try:
|
||||
self.stdout.write(message.format(path))
|
||||
self.stdout.write(
|
||||
message.format(path=path, i=i+1, total=len(paths)))
|
||||
self.import_file(path, batch, import_handler, options)
|
||||
except Exception as e:
|
||||
if options['exit_on_failure']:
|
||||
|
@ -122,7 +157,8 @@ class Command(BaseCommand):
|
|||
m = 'Error while importing {}: {} {}'.format(
|
||||
path, e.__class__.__name__, e)
|
||||
self.stderr.write(m)
|
||||
return batch
|
||||
errors.append((path, '{} {}'.format(e.__class__.__name__, e)))
|
||||
return batch, errors
|
||||
|
||||
def import_file(self, path, batch, import_handler, options):
|
||||
job = batch.jobs.create(
|
||||
|
@ -133,7 +169,6 @@ class Command(BaseCommand):
|
|||
job.audio_file.save(name, File(f))
|
||||
|
||||
job.save()
|
||||
utils.on_commit(
|
||||
import_handler,
|
||||
import_handler(
|
||||
import_job_id=job.pk,
|
||||
use_acoustid=not options['no_acoustid'])
|
||||
|
|
|
@ -2,12 +2,14 @@ import acoustid
|
|||
import os
|
||||
import datetime
|
||||
from django.core.files import File
|
||||
from django.db import transaction
|
||||
|
||||
from funkwhale_api.taskapp import celery
|
||||
from funkwhale_api.providers.acoustid import get_acoustid_client
|
||||
from funkwhale_api.music import models, metadata
|
||||
|
||||
|
||||
@transaction.atomic
|
||||
def import_track_data_from_path(path):
|
||||
data = metadata.Metadata(path)
|
||||
artist = models.Artist.objects.get_or_create(
|
||||
|
@ -45,6 +47,7 @@ def import_track_data_from_path(path):
|
|||
def import_metadata_with_musicbrainz(path):
|
||||
pass
|
||||
|
||||
|
||||
@celery.app.task(name='audiofile.from_path')
|
||||
def from_path(path):
|
||||
acoustid_track_id = None
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import datetime
|
||||
import os
|
||||
import pytest
|
||||
import uuid
|
||||
|
||||
from funkwhale_api.music import metadata
|
||||
|
||||
|
@ -13,9 +14,9 @@ DATA_DIR = os.path.dirname(os.path.abspath(__file__))
|
|||
('album', 'Peer Gynt Suite no. 1, op. 46'),
|
||||
('date', datetime.date(2012, 8, 15)),
|
||||
('track_number', 1),
|
||||
('musicbrainz_albumid', 'a766da8b-8336-47aa-a3ee-371cc41ccc75'),
|
||||
('musicbrainz_recordingid', 'bd21ac48-46d8-4e78-925f-d9cc2a294656'),
|
||||
('musicbrainz_artistid', '013c8e5b-d72a-4cd3-8dee-6c64d6125823'),
|
||||
('musicbrainz_albumid', uuid.UUID('a766da8b-8336-47aa-a3ee-371cc41ccc75')),
|
||||
('musicbrainz_recordingid', uuid.UUID('bd21ac48-46d8-4e78-925f-d9cc2a294656')),
|
||||
('musicbrainz_artistid', uuid.UUID('013c8e5b-d72a-4cd3-8dee-6c64d6125823')),
|
||||
])
|
||||
def test_can_get_metadata_from_ogg_file(field, value):
|
||||
path = os.path.join(DATA_DIR, 'test.ogg')
|
||||
|
@ -30,9 +31,9 @@ def test_can_get_metadata_from_ogg_file(field, value):
|
|||
('album', 'You Can\'t Stop Da Funk'),
|
||||
('date', datetime.date(2006, 2, 7)),
|
||||
('track_number', 1),
|
||||
('musicbrainz_albumid', 'ce40cdb1-a562-4fd8-a269-9269f98d4124'),
|
||||
('musicbrainz_recordingid', 'f269d497-1cc0-4ae4-a0c4-157ec7d73fcb'),
|
||||
('musicbrainz_artistid', '9c6bddde-6228-4d9f-ad0d-03f6fcb19e13'),
|
||||
('musicbrainz_albumid', uuid.UUID('ce40cdb1-a562-4fd8-a269-9269f98d4124')),
|
||||
('musicbrainz_recordingid', uuid.UUID('f269d497-1cc0-4ae4-a0c4-157ec7d73fcb')),
|
||||
('musicbrainz_artistid', uuid.UUID('9c6bddde-6228-4d9f-ad0d-03f6fcb19e13')),
|
||||
])
|
||||
def test_can_get_metadata_from_id3_mp3_file(field, value):
|
||||
path = os.path.join(DATA_DIR, 'test.mp3')
|
||||
|
|
|
@ -2,6 +2,8 @@ import pytest
|
|||
import acoustid
|
||||
import datetime
|
||||
import os
|
||||
import uuid
|
||||
|
||||
from django.core.management import call_command
|
||||
from django.core.management.base import CommandError
|
||||
|
||||
|
@ -15,7 +17,8 @@ DATA_DIR = os.path.join(
|
|||
|
||||
|
||||
def test_can_create_track_from_file_metadata(db, mocker):
|
||||
mocker.patch('acoustid.match', side_effect=acoustid.WebServiceError('test'))
|
||||
mocker.patch(
|
||||
'acoustid.match', side_effect=acoustid.WebServiceError('test'))
|
||||
metadata = {
|
||||
'artist': ['Test artist'],
|
||||
'album': ['Test album'],
|
||||
|
@ -35,33 +38,35 @@ def test_can_create_track_from_file_metadata(db, mocker):
|
|||
os.path.join(DATA_DIR, 'dummy_file.ogg'))
|
||||
|
||||
assert track.title == metadata['title'][0]
|
||||
assert track.mbid == metadata['musicbrainz_trackid'][0]
|
||||
assert track.mbid == uuid.UUID(metadata['musicbrainz_trackid'][0])
|
||||
assert track.position == 4
|
||||
assert track.album.title == metadata['album'][0]
|
||||
assert track.album.mbid == metadata['musicbrainz_albumid'][0]
|
||||
assert track.album.mbid == uuid.UUID(metadata['musicbrainz_albumid'][0])
|
||||
assert track.album.release_date == datetime.date(2012, 8, 15)
|
||||
assert track.artist.name == metadata['artist'][0]
|
||||
assert track.artist.mbid == metadata['musicbrainz_artistid'][0]
|
||||
assert track.artist.mbid == uuid.UUID(metadata['musicbrainz_artistid'][0])
|
||||
|
||||
|
||||
def test_management_command_requires_a_valid_username(factories, mocker):
|
||||
path = os.path.join(DATA_DIR, 'dummy_file.ogg')
|
||||
user = factories['users.User'](username='me')
|
||||
mocker.patch('funkwhale_api.providers.audiofile.management.commands.import_files.Command.do_import') # NOQA
|
||||
mocker.patch(
|
||||
'funkwhale_api.providers.audiofile.management.commands.import_files.Command.do_import', # noqa
|
||||
return_value=(mocker.MagicMock(), []))
|
||||
with pytest.raises(CommandError):
|
||||
call_command('import_files', path, username='not_me', interactive=False)
|
||||
call_command('import_files', path, username='me', interactive=False)
|
||||
|
||||
|
||||
def test_import_files_creates_a_batch_and_job(factories, mocker):
|
||||
m = mocker.patch('funkwhale_api.common.utils.on_commit')
|
||||
m = mocker.patch('funkwhale_api.music.tasks.import_job_run')
|
||||
user = factories['users.User'](username='me')
|
||||
path = os.path.join(DATA_DIR, 'dummy_file.ogg')
|
||||
call_command(
|
||||
'import_files',
|
||||
path,
|
||||
username='me',
|
||||
async=True,
|
||||
async=False,
|
||||
interactive=False)
|
||||
|
||||
batch = user.imports.latest('id')
|
||||
|
@ -76,45 +81,58 @@ def test_import_files_creates_a_batch_and_job(factories, mocker):
|
|||
|
||||
assert job.source == 'file://' + path
|
||||
m.assert_called_once_with(
|
||||
music_tasks.import_job_run.delay,
|
||||
import_job_id=job.pk,
|
||||
use_acoustid=True)
|
||||
|
||||
|
||||
def test_import_files_skip_acoustid(factories, mocker):
|
||||
m = mocker.patch('funkwhale_api.common.utils.on_commit')
|
||||
m = mocker.patch('funkwhale_api.music.tasks.import_job_run')
|
||||
user = factories['users.User'](username='me')
|
||||
path = os.path.join(DATA_DIR, 'dummy_file.ogg')
|
||||
call_command(
|
||||
'import_files',
|
||||
path,
|
||||
username='me',
|
||||
async=True,
|
||||
async=False,
|
||||
no_acoustid=True,
|
||||
interactive=False)
|
||||
batch = user.imports.latest('id')
|
||||
job = batch.jobs.first()
|
||||
m.assert_called_once_with(
|
||||
music_tasks.import_job_run.delay,
|
||||
import_job_id=job.pk,
|
||||
use_acoustid=False)
|
||||
|
||||
|
||||
def test_import_files_skip_if_path_already_imported(factories, mocker):
    """A path already stored as a TrackFile.source is not imported again."""
    user = factories['users.User'](username='me')
    path = os.path.join(DATA_DIR, 'dummy_file.ogg')
    # Built only for its side effect (presumably a persisted row): a
    # TrackFile whose source matches the path handed to the command below.
    factories['music.TrackFile'](source='file://{}'.format(path))

    # ``async`` is a reserved keyword since Python 3.7, so the option is
    # passed through a dict instead of as a literal keyword argument.
    call_command(
        'import_files',
        path,
        username='me',
        no_acoustid=True,
        interactive=False,
        **{'async': False})
    # No import batch must have been created for the user.
    assert user.imports.count() == 0
|
||||
|
||||
|
||||
def test_import_files_works_with_utf8_file_name(factories, mocker):
|
||||
m = mocker.patch('funkwhale_api.common.utils.on_commit')
|
||||
m = mocker.patch('funkwhale_api.music.tasks.import_job_run')
|
||||
user = factories['users.User'](username='me')
|
||||
path = os.path.join(DATA_DIR, 'utf8-éà◌.ogg')
|
||||
call_command(
|
||||
'import_files',
|
||||
path,
|
||||
username='me',
|
||||
async=True,
|
||||
async=False,
|
||||
no_acoustid=True,
|
||||
interactive=False)
|
||||
batch = user.imports.latest('id')
|
||||
job = batch.jobs.first()
|
||||
m.assert_called_once_with(
|
||||
music_tasks.import_job_run.delay,
|
||||
import_job_id=job.pk,
|
||||
use_acoustid=False)
|
||||
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Better file import performance and error handling (#144)
|
4
dev.yml
4
dev.yml
|
@ -65,7 +65,7 @@ services:
|
|||
- "CACHE_URL=redis://redis:6379/0"
|
||||
volumes:
|
||||
- ./api:/app
|
||||
- ./data/music:/music
|
||||
- "${MUSIC_DIRECTORY-./data/music}:/music"
|
||||
networks:
|
||||
- internal
|
||||
api:
|
||||
|
@ -78,7 +78,7 @@ services:
|
|||
command: python /app/manage.py runserver 0.0.0.0:12081
|
||||
volumes:
|
||||
- ./api:/app
|
||||
- ./data/music:/music
|
||||
- "${MUSIC_DIRECTORY-./data/music}:/music"
|
||||
environment:
|
||||
- "FUNKWHALE_HOSTNAME=${FUNKWHALE_HOSTNAME-localhost}"
|
||||
- "FUNKWHALE_HOSTNAME_SUFFIX=funkwhale.test"
|
||||
|
|
Loading…
Reference in New Issue