CLI for importing files with user libraries

This commit is contained in:
Eliot Berriot 2018-09-22 15:47:17 +00:00
parent 616f459eb7
commit 3e49b2057a
9 changed files with 187 additions and 139 deletions

View File

@ -341,7 +341,7 @@ class UploadActionSerializer(common_serializers.ActionSerializer):
pks = list(qs.values_list("id", flat=True)) pks = list(qs.values_list("id", flat=True))
qs.update(import_status="pending") qs.update(import_status="pending")
for pk in pks: for pk in pks:
common_utils.on_commit(tasks.import_upload.delay, upload_id=pk) common_utils.on_commit(tasks.process_upload.delay, upload_id=pk)
class TagSerializer(serializers.ModelSerializer): class TagSerializer(serializers.ModelSerializer):

View File

@ -225,7 +225,7 @@ def scan_library_page(library_scan, page_url):
if upload.import_status == "pending" and not upload.track: if upload.import_status == "pending" and not upload.track:
# this track is not matched to any musicbrainz or other musical # this track is not matched to any musicbrainz or other musical
# metadata # metadata
import_upload.delay(upload_id=upload.pk) process_upload.delay(upload_id=upload.pk)
uploads.append(upload) uploads.append(upload)
library_scan.processed_files = F("processed_files") + len(uploads) library_scan.processed_files = F("processed_files") + len(uploads)
@ -249,7 +249,10 @@ def getter(data, *keys):
return return
v = data v = data
for k in keys: for k in keys:
v = v.get(k) try:
v = v[k]
except KeyError:
return
return v return v
@ -274,14 +277,14 @@ def fail_import(upload, error_code):
) )
@celery.app.task(name="music.import_upload") @celery.app.task(name="music.process_upload")
@celery.require_instance( @celery.require_instance(
models.Upload.objects.filter(import_status="pending").select_related( models.Upload.objects.filter(import_status="pending").select_related(
"library__actor__user" "library__actor__user"
), ),
"upload", "upload",
) )
def import_upload(upload): def process_upload(upload):
data = upload.import_metadata or {} data = upload.import_metadata or {}
old_status = upload.import_status old_status = upload.import_status
try: try:

View File

@ -350,7 +350,7 @@ class UploadViewSet(
def perform_create(self, serializer): def perform_create(self, serializer):
upload = serializer.save() upload = serializer.save()
common_utils.on_commit(tasks.import_upload.delay, upload_id=upload.pk) common_utils.on_commit(tasks.process_upload.delay, upload_id=upload.pk)
@transaction.atomic @transaction.atomic
def perform_destroy(self, instance): def perform_destroy(self, instance):

View File

@ -1,18 +1,29 @@
import glob import glob
import os import os
import urllib.parse
from django.conf import settings from django.conf import settings
from django.core.files import File from django.core.files import File
from django.core.management.base import BaseCommand, CommandError from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone
from funkwhale_api.music import models, tasks from funkwhale_api.music import models, tasks
from funkwhale_api.users.models import User
class Command(BaseCommand): class Command(BaseCommand):
help = "Import audio files mathinc given glob pattern" help = "Import audio files mathinc given glob pattern"
def add_arguments(self, parser): def add_arguments(self, parser):
parser.add_argument(
"library_id",
type=str,
help=(
"A local library identifier where the files should be imported. "
"You can use the full uuid such as e29c5be9-6da3-4d92-b40b-4970edd3ee4b "
"or only a small portion of it, starting from the beginning, such as "
"e29c5be9"
),
)
parser.add_argument("path", nargs="+", type=str) parser.add_argument("path", nargs="+", type=str)
parser.add_argument( parser.add_argument(
"--recursive", "--recursive",
@ -29,7 +40,7 @@ class Command(BaseCommand):
parser.add_argument( parser.add_argument(
"--async", "--async",
action="store_true", action="store_true",
dest="async", dest="async_",
default=False, default=False,
help="Will launch celery tasks for each file to import instead of doing it synchronously and block the CLI", help="Will launch celery tasks for each file to import instead of doing it synchronously and block the CLI",
) )
@ -66,6 +77,17 @@ class Command(BaseCommand):
"with their newest version." "with their newest version."
), ),
) )
parser.add_argument(
"--reference",
action="store",
dest="reference",
default=None,
help=(
"A custom reference for the import. Leave this empty to have a random "
"reference being generated for you."
),
)
parser.add_argument( parser.add_argument(
"--noinput", "--noinput",
"--no-input", "--no-input",
@ -77,14 +99,22 @@ class Command(BaseCommand):
def handle(self, *args, **options): def handle(self, *args, **options):
glob_kwargs = {} glob_kwargs = {}
matching = [] matching = []
try:
library = models.Library.objects.select_related("actor__user").get(
uuid__startswith=options["library_id"]
)
except models.Library.DoesNotExist:
raise CommandError("Invalid library id")
if not library.actor.is_local:
raise CommandError("Library {} is not a local library".format(library.uuid))
if options["recursive"]: if options["recursive"]:
glob_kwargs["recursive"] = True glob_kwargs["recursive"] = True
try: for import_path in options["path"]:
for import_path in options["path"]: matching += glob.glob(import_path, **glob_kwargs)
matching += glob.glob(import_path, **glob_kwargs) raw_matching = sorted(list(set(matching)))
raw_matching = sorted(list(set(matching)))
except TypeError:
raise Exception("You need Python 3.5 to use the --recursive flag")
matching = [] matching = []
for m in raw_matching: for m in raw_matching:
@ -128,28 +158,12 @@ class Command(BaseCommand):
if not matching: if not matching:
raise CommandError("No file matching pattern, aborting") raise CommandError("No file matching pattern, aborting")
user = None
if options["username"]:
try:
user = User.objects.get(username=options["username"])
except User.DoesNotExist:
raise CommandError("Invalid username")
else:
# we bind the import to the first registered superuser
try:
user = User.objects.filter(is_superuser=True).order_by("pk").first()
assert user is not None
except AssertionError:
raise CommandError(
"No superuser available, please provide a --username"
)
if options["replace"]: if options["replace"]:
filtered = {"initial": matching, "skipped": [], "new": matching} filtered = {"initial": matching, "skipped": [], "new": matching}
message = "- {} files to be replaced" message = "- {} files to be replaced"
import_paths = matching import_paths = matching
else: else:
filtered = self.filter_matching(matching) filtered = self.filter_matching(matching, library)
message = "- {} files already found in database" message = "- {} files already found in database"
import_paths = filtered["new"] import_paths = filtered["new"]
@ -179,10 +193,26 @@ class Command(BaseCommand):
) )
if input("".join(message)) != "yes": if input("".join(message)) != "yes":
raise CommandError("Import cancelled.") raise CommandError("Import cancelled.")
reference = options["reference"] or "cli-{}".format(timezone.now().isoformat())
batch, errors = self.do_import(import_paths, user=user, options=options) import_url = "{}://{}/content/libraries/{}/upload?{}"
import_url = import_url.format(
settings.FUNKWHALE_PROTOCOL,
settings.FUNKWHALE_HOSTNAME,
str(library.uuid),
urllib.parse.urlencode([("import", reference)]),
)
self.stdout.write(
"For details, please refer to import refrence '{}' or URL {}".format(
reference, import_url
)
)
errors = self.do_import(
import_paths, library=library, reference=reference, options=options
)
message = "Successfully imported {} tracks" message = "Successfully imported {} tracks"
if options["async"]: if options["async_"]:
message = "Successfully launched import for {} tracks" message = "Successfully launched import for {} tracks"
self.stdout.write(message.format(len(import_paths))) self.stdout.write(message.format(len(import_paths)))
@ -191,15 +221,18 @@ class Command(BaseCommand):
for path, error in errors: for path, error in errors:
self.stderr.write("- {}: {}".format(path, error)) self.stderr.write("- {}: {}".format(path, error))
self.stdout.write( self.stdout.write(
"For details, please refer to import batch #{}".format(batch.pk) "For details, please refer to import refrence '{}' or URL {}".format(
reference, import_url
)
) )
def filter_matching(self, matching): def filter_matching(self, matching, library):
sources = ["file://{}".format(p) for p in matching] sources = ["file://{}".format(p) for p in matching]
# we skip reimport for path that are already found # we skip reimport for path that are already found
# as a Upload.source # as a Upload.source
existing = models.Upload.objects.filter(source__in=sources) existing = library.uploads.filter(source__in=sources, import_status="finished")
existing = existing.values_list("source", flat=True) existing = existing.values_list("source", flat=True)
existing = set([p.replace("file://", "", 1) for p in existing]) existing = set([p.replace("file://", "", 1) for p in existing])
skipped = set(matching) & existing skipped = set(matching) & existing
@ -210,20 +243,25 @@ class Command(BaseCommand):
} }
return result return result
def do_import(self, paths, user, options): def do_import(self, paths, library, reference, options):
message = "{i}/{total} Importing {path}..." message = "{i}/{total} Importing {path}..."
if options["async"]: if options["async_"]:
message = "{i}/{total} Launching import for {path}..." message = "{i}/{total} Launching import for {path}..."
# we create an import batch binded to the user # we create an upload binded to the library
async_ = options["async"] async_ = options["async_"]
import_handler = tasks.import_job_run.delay if async_ else tasks.import_job_run
batch = user.imports.create(source="shell")
errors = [] errors = []
for i, path in list(enumerate(paths)): for i, path in list(enumerate(paths)):
try: try:
self.stdout.write(message.format(path=path, i=i + 1, total=len(paths))) self.stdout.write(message.format(path=path, i=i + 1, total=len(paths)))
self.import_file(path, batch, import_handler, options) self.create_upload(
path,
reference,
library,
async_,
options["replace"],
options["in_place"],
)
except Exception as e: except Exception as e:
if options["exit_on_failure"]: if options["exit_on_failure"]:
raise raise
@ -232,16 +270,18 @@ class Command(BaseCommand):
) )
self.stderr.write(m) self.stderr.write(m)
errors.append((path, "{} {}".format(e.__class__.__name__, e))) errors.append((path, "{} {}".format(e.__class__.__name__, e)))
return batch, errors return errors
def import_file(self, path, batch, import_handler, options): def create_upload(self, path, reference, library, async_, replace, in_place):
job = batch.jobs.create( import_handler = tasks.process_upload.delay if async_ else tasks.process_upload
source="file://" + path, replace_if_duplicate=options["replace"] upload = models.Upload(library=library, import_reference=reference)
) upload.source = "file://" + path
if not options["in_place"]: upload.import_metadata = {"replace": replace}
if not in_place:
name = os.path.basename(path) name = os.path.basename(path)
with open(path, "rb") as f: with open(path, "rb") as f:
job.audio_file.save(name, File(f)) upload.audio_file.save(name, File(f), save=False)
job.save() upload.save()
import_handler(import_job_id=job.pk, use_acoustid=False)
import_handler(upload_id=upload.pk)

View File

@ -255,7 +255,7 @@ def test_manage_upload_action_relaunch_import(factories, mocker):
for obj in to_relaunch: for obj in to_relaunch:
obj.refresh_from_db() obj.refresh_from_db()
assert obj.import_status == "pending" assert obj.import_status == "pending"
m.assert_any_call(tasks.import_upload.delay, upload_id=obj.pk) m.assert_any_call(tasks.process_upload.delay, upload_id=obj.pk)
finished.refresh_from_db() finished.refresh_from_db()
assert finished.import_status == "finished" assert finished.import_status == "finished"

View File

@ -97,7 +97,7 @@ def test_upload_import_mbid(now, factories, temp_signal, mocker):
) )
with temp_signal(signals.upload_import_status_updated) as handler: with temp_signal(signals.upload_import_status_updated) as handler:
tasks.import_upload(upload_id=upload.pk) tasks.process_upload(upload_id=upload.pk)
upload.refresh_from_db() upload.refresh_from_db()
@ -126,7 +126,7 @@ def test_upload_import_get_audio_data(factories, mocker):
track=None, import_metadata={"track": {"mbid": track.mbid}} track=None, import_metadata={"track": {"mbid": track.mbid}}
) )
tasks.import_upload(upload_id=upload.pk) tasks.process_upload(upload_id=upload.pk)
upload.refresh_from_db() upload.refresh_from_db()
assert upload.size == 23 assert upload.size == 23
@ -150,7 +150,7 @@ def test_upload_import_skip_existing_track_in_own_library(factories, temp_signal
import_metadata={"track": {"mbid": track.mbid}}, import_metadata={"track": {"mbid": track.mbid}},
) )
with temp_signal(signals.upload_import_status_updated) as handler: with temp_signal(signals.upload_import_status_updated) as handler:
tasks.import_upload(upload_id=duplicate.pk) tasks.process_upload(upload_id=duplicate.pk)
duplicate.refresh_from_db() duplicate.refresh_from_db()
@ -175,7 +175,7 @@ def test_upload_import_track_uuid(now, factories):
track=None, import_metadata={"track": {"uuid": track.uuid}} track=None, import_metadata={"track": {"uuid": track.uuid}}
) )
tasks.import_upload(upload_id=upload.pk) tasks.process_upload(upload_id=upload.pk)
upload.refresh_from_db() upload.refresh_from_db()
@ -189,7 +189,7 @@ def test_upload_import_error(factories, now, temp_signal):
import_metadata={"track": {"uuid": uuid.uuid4()}} import_metadata={"track": {"uuid": uuid.uuid4()}}
) )
with temp_signal(signals.upload_import_status_updated) as handler: with temp_signal(signals.upload_import_status_updated) as handler:
tasks.import_upload(upload_id=upload.pk) tasks.process_upload(upload_id=upload.pk)
upload.refresh_from_db() upload.refresh_from_db()
assert upload.import_status == "errored" assert upload.import_status == "errored"
@ -211,7 +211,7 @@ def test_upload_import_updates_cover_if_no_cover(factories, mocker, now):
upload = factories["music.Upload"]( upload = factories["music.Upload"](
track=None, import_metadata={"track": {"uuid": track.uuid}} track=None, import_metadata={"track": {"uuid": track.uuid}}
) )
tasks.import_upload(upload_id=upload.pk) tasks.process_upload(upload_id=upload.pk)
mocked_update.assert_called_once_with(album, upload) mocked_update.assert_called_once_with(album, upload)

View File

@ -407,7 +407,7 @@ def test_user_can_create_upload(logged_in_api_client, factories, mocker, audio_f
assert upload.source == "upload://test" assert upload.source == "upload://test"
assert upload.import_reference == "test" assert upload.import_reference == "test"
assert upload.track is None assert upload.track is None
m.assert_called_once_with(tasks.import_upload.delay, upload_id=upload.pk) m.assert_called_once_with(tasks.process_upload.delay, upload_id=upload.pk)
def test_user_can_list_own_library_follows(factories, logged_in_api_client): def test_user_can_list_own_library_follows(factories, logged_in_api_client):

View File

@ -4,121 +4,124 @@ import pytest
from django.core.management import call_command from django.core.management import call_command
from django.core.management.base import CommandError from django.core.management.base import CommandError
from funkwhale_api.music.models import ImportJob
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "uploads") DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "files")
@pytest.mark.skip("XXX : wip") def test_management_command_requires_a_valid_library_id(factories):
def test_management_command_requires_a_valid_username(factories, mocker):
path = os.path.join(DATA_DIR, "dummy_file.ogg") path = os.path.join(DATA_DIR, "dummy_file.ogg")
factories["users.User"](username="me")
mocker.patch( with pytest.raises(CommandError) as e:
"funkwhale_api.providers.audiofile.management.commands.import_files.Command.do_import", # noqa call_command("import_files", "wrong_id", path, interactive=False)
return_value=(mocker.MagicMock(), []), assert "Invalid library id" in str(e)
)
with pytest.raises(CommandError):
call_command("import_files", path, username="not_me", interactive=False)
call_command("import_files", path, username="me", interactive=False)
def test_in_place_import_only_from_music_dir(factories, settings): def test_in_place_import_only_from_music_dir(factories, settings):
factories["users.User"](username="me") library = factories["music.Library"](actor__local=True)
settings.MUSIC_DIRECTORY_PATH = "/nope" settings.MUSIC_DIRECTORY_PATH = "/nope"
path = os.path.join(DATA_DIR, "dummy_file.ogg") path = os.path.join(DATA_DIR, "dummy_file.ogg")
with pytest.raises(CommandError): with pytest.raises(CommandError) as e:
call_command( call_command(
"import_files", path, in_place=True, username="me", interactive=False "import_files", str(library.uuid), path, in_place=True, interactive=False
) )
assert "Importing in-place only works if importing" in str(e)
@pytest.mark.skip("XXX : wip")
def test_import_with_multiple_argument(factories, mocker): def test_import_with_multiple_argument(factories, mocker):
factories["users.User"](username="me") library = factories["music.Library"](actor__local=True)
path1 = os.path.join(DATA_DIR, "dummy_file.ogg") path1 = os.path.join(DATA_DIR, "dummy_file.ogg")
path2 = os.path.join(DATA_DIR, "utf8-éà◌.ogg") path2 = os.path.join(DATA_DIR, "utf8-éà◌.ogg")
mocked_filter = mocker.patch( mocked_filter = mocker.patch(
"funkwhale_api.providers.audiofile.management.commands.import_files.Command.filter_matching", "funkwhale_api.providers.audiofile.management.commands.import_files.Command.filter_matching",
return_value=({"new": [], "skipped": []}), return_value=({"new": [], "skipped": []}),
) )
call_command("import_files", path1, path2, username="me", interactive=False) call_command("import_files", str(library.uuid), path1, path2, interactive=False)
mocked_filter.assert_called_once_with([path1, path2]) mocked_filter.assert_called_once_with([path1, path2], library)
@pytest.mark.parametrize(
"path",
[os.path.join(DATA_DIR, "dummy_file.ogg"), os.path.join(DATA_DIR, "utf8-éà◌.ogg")],
)
def test_import_files_stores_proper_data(factories, mocker, now, path):
mocked_process = mocker.patch("funkwhale_api.music.tasks.process_upload")
library = factories["music.Library"](actor__local=True)
call_command(
"import_files", str(library.uuid), path, async_=False, interactive=False
)
upload = library.uploads.last()
assert upload.import_reference == "cli-{}".format(now.isoformat())
assert upload.import_status == "pending"
assert upload.source == "file://{}".format(path)
mocked_process.assert_called_once_with(upload_id=upload.pk)
@pytest.mark.skip("Refactoring in progress")
def test_import_with_replace_flag(factories, mocker): def test_import_with_replace_flag(factories, mocker):
factories["users.User"](username="me") library = factories["music.Library"](actor__local=True)
path = os.path.join(DATA_DIR, "dummy_file.ogg") path = os.path.join(DATA_DIR, "dummy_file.ogg")
mocked_job_run = mocker.patch("funkwhale_api.music.tasks.import_job_run") mocked_process = mocker.patch("funkwhale_api.music.tasks.process_upload")
call_command("import_files", path, username="me", replace=True, interactive=False) call_command(
created_job = ImportJob.objects.latest("id") "import_files", str(library.uuid), path, replace=True, interactive=False
)
upload = library.uploads.last()
assert created_job.replace_if_duplicate is True assert upload.import_metadata["replace"] is True
mocked_job_run.assert_called_once_with(
import_job_id=created_job.id, use_acoustid=False mocked_process.assert_called_once_with(upload_id=upload.pk)
def test_import_with_custom_reference(factories, mocker):
library = factories["music.Library"](actor__local=True)
path = os.path.join(DATA_DIR, "dummy_file.ogg")
mocked_process = mocker.patch("funkwhale_api.music.tasks.process_upload")
call_command(
"import_files",
str(library.uuid),
path,
reference="test",
replace=True,
interactive=False,
)
upload = library.uploads.last()
assert upload.import_reference == "test"
mocked_process.assert_called_once_with(upload_id=upload.pk)
def test_import_files_skip_if_path_already_imported(factories, mocker):
library = factories["music.Library"](actor__local=True)
path = os.path.join(DATA_DIR, "dummy_file.ogg")
# existing one with same source
factories["music.Upload"](
library=library, import_status="finished", source="file://{}".format(path)
) )
call_command(
@pytest.mark.skip("Refactoring in progress") "import_files", str(library.uuid), path, async=False, interactive=False
def test_import_files_creates_a_batch_and_job(factories, mocker): )
m = mocker.patch("funkwhale_api.music.tasks.import_job_run") assert library.uploads.count() == 1
user = factories["users.User"](username="me")
path = os.path.join(DATA_DIR, "dummy_file.ogg")
call_command("import_files", path, username="me", async=False, interactive=False)
batch = user.imports.latest("id")
assert batch.source == "shell"
assert batch.jobs.count() == 1
job = batch.jobs.first()
assert job.status == "pending"
with open(path, "rb") as f:
assert job.audio_file.read() == f.read()
assert job.source == "file://" + path
m.assert_called_once_with(import_job_id=job.pk, use_acoustid=False)
@pytest.mark.skip("XXX : wip")
def test_import_files_skip_if_path_already_imported(factories, mocker):
user = factories["users.User"](username="me")
path = os.path.join(DATA_DIR, "dummy_file.ogg")
factories["music.Upload"](source="file://{}".format(path))
call_command("import_files", path, username="me", async=False, interactive=False)
assert user.imports.count() == 0
@pytest.mark.skip("Refactoring in progress")
def test_import_files_works_with_utf8_file_name(factories, mocker):
m = mocker.patch("funkwhale_api.music.tasks.import_job_run")
user = factories["users.User"](username="me")
path = os.path.join(DATA_DIR, "utf8-éà◌.ogg")
call_command("import_files", path, username="me", async=False, interactive=False)
batch = user.imports.latest("id")
job = batch.jobs.first()
m.assert_called_once_with(import_job_id=job.pk, use_acoustid=False)
@pytest.mark.skip("Refactoring in progress")
def test_import_files_in_place(factories, mocker, settings): def test_import_files_in_place(factories, mocker, settings):
settings.MUSIC_DIRECTORY_PATH = DATA_DIR settings.MUSIC_DIRECTORY_PATH = DATA_DIR
m = mocker.patch("funkwhale_api.music.tasks.import_job_run") mocked_process = mocker.patch("funkwhale_api.music.tasks.process_upload")
user = factories["users.User"](username="me") library = factories["music.Library"](actor__local=True)
path = os.path.join(DATA_DIR, "utf8-éà◌.ogg") path = os.path.join(DATA_DIR, "utf8-éà◌.ogg")
call_command( call_command(
"import_files", "import_files",
str(library.uuid),
path, path,
username="me", async_=False,
async=False,
in_place=True, in_place=True,
interactive=False, interactive=False,
) )
batch = user.imports.latest("id") upload = library.uploads.last()
job = batch.jobs.first() assert bool(upload.audio_file) is False
assert bool(job.audio_file) is False mocked_process.assert_called_once_with(upload_id=upload.pk)
m.assert_called_once_with(import_job_id=job.pk, use_acoustid=False)
def test_storage_rename_utf_8_files(factories): def test_storage_rename_utf_8_files(factories):

View File

@ -48,6 +48,8 @@ u.set_password("demo")
u.subsonic_api_token = "demo" u.subsonic_api_token = "demo"
u.save() u.save()
library = actor.libraries.create(name='Demo library', privacy_level='everyone')
from funkwhale_api.common import preferences from funkwhale_api.common import preferences
manager = preferences.global_preferences_registry.manager() manager = preferences.global_preferences_registry.manager()
@ -61,7 +63,7 @@ paths = [
"$music_path/**/*.flac", "$music_path/**/*.flac",
] ]
print(paths) print(paths)
call_command("import_files", *paths, username="demo", recursive=True, interactive=False) call_command("import_files", str(library.uuid), *paths, username="demo", recursive=True, interactive=False)
print('Creating some dummy data...') print('Creating some dummy data...')
@ -73,7 +75,7 @@ from funkwhale_api.favorites.factories import TrackFavorite as TrackFavoriteFact
from funkwhale_api.users.factories import UserFactory from funkwhale_api.users.factories import UserFactory
from funkwhale_api.playlists.factories import PlaylistFactory from funkwhale_api.playlists.factories import PlaylistFactory
users = UserFactory.create_batch(size=15, privacy_level="everyone") users = UserFactory.create_batch(size=15, privacy_level="everyone", with_actor=True)
available_tracks = list(Track.objects.all()) available_tracks = list(Track.objects.all())
available_albums = list(Album.objects.all()) available_albums = list(Album.objects.all())