272 lines
8.1 KiB
Python
272 lines
8.1 KiB
Python
import os
|
|
|
|
import pytest
|
|
|
|
from funkwhale_api.common import utils as common_utils
|
|
from funkwhale_api.music.management.commands import (
|
|
check_inplace_files,
|
|
create_playlist_from_folder_structure,
|
|
fix_uploads,
|
|
prune_library,
|
|
prune_non_mbid_content,
|
|
)
|
|
from funkwhale_api.playlists import models as playlist_models
|
|
|
|
DATA_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
|
def test_fix_uploads_bitrate_length(factories, mocker):
|
|
upload1 = factories["music.Upload"](bitrate=1, duration=2)
|
|
upload2 = factories["music.Upload"](bitrate=None, duration=None)
|
|
c = fix_uploads.Command()
|
|
|
|
mocker.patch(
|
|
"funkwhale_api.music.utils.get_audio_file_data",
|
|
return_value={"bitrate": 42, "length": 43},
|
|
)
|
|
|
|
c.fix_file_data(dry_run=False, batch_size=100)
|
|
|
|
upload1.refresh_from_db()
|
|
upload2.refresh_from_db()
|
|
|
|
# not updated
|
|
assert upload1.bitrate == 1
|
|
assert upload1.duration == 2
|
|
|
|
# updated
|
|
assert upload2.bitrate == 42
|
|
assert upload2.duration == 43
|
|
|
|
|
|
def test_fix_uploads_size(factories, mocker):
|
|
upload1 = factories["music.Upload"]()
|
|
upload2 = factories["music.Upload"]()
|
|
upload1.__class__.objects.filter(pk=upload1.pk).update(size=1)
|
|
upload2.__class__.objects.filter(pk=upload2.pk).update(size=None)
|
|
c = fix_uploads.Command()
|
|
|
|
mocker.patch("funkwhale_api.music.models.Upload.get_file_size", return_value=2)
|
|
|
|
c.fix_file_size(dry_run=False, batch_size=100)
|
|
|
|
upload1.refresh_from_db()
|
|
upload2.refresh_from_db()
|
|
|
|
# not updated
|
|
assert upload1.size == 1
|
|
|
|
# updated
|
|
assert upload2.size == 2
|
|
|
|
|
|
def test_fix_uploads_mimetype(factories, mocker):
|
|
mp3_path = os.path.join(DATA_DIR, "test.mp3")
|
|
ogg_path = os.path.join(DATA_DIR, "test.ogg")
|
|
upload1 = factories["music.Upload"](
|
|
audio_file__from_path=mp3_path,
|
|
source=f"file://{mp3_path}",
|
|
mimetype="application/x-empty",
|
|
)
|
|
|
|
# this one already has a mimetype set, to it should not be updated
|
|
upload2 = factories["music.Upload"](
|
|
audio_file__from_path=ogg_path,
|
|
source=f"file://{ogg_path}",
|
|
mimetype="audio/something",
|
|
)
|
|
c = fix_uploads.Command()
|
|
c.fix_mimetypes(dry_run=False, batch_size=100)
|
|
|
|
upload1.refresh_from_db()
|
|
upload2.refresh_from_db()
|
|
|
|
assert upload1.mimetype == "audio/mpeg"
|
|
assert upload2.mimetype == "audio/something"
|
|
|
|
|
|
def test_fix_uploads_checksum(factories, mocker):
|
|
upload1 = factories["music.Upload"]()
|
|
upload2 = factories["music.Upload"]()
|
|
upload1.__class__.objects.filter(pk=upload1.pk).update(checksum="test")
|
|
upload2.__class__.objects.filter(pk=upload2.pk).update(checksum=None)
|
|
c = fix_uploads.Command()
|
|
|
|
c.fix_file_checksum(dry_run=False, batch_size=100)
|
|
|
|
upload1.refresh_from_db()
|
|
upload2.refresh_from_db()
|
|
|
|
# not updated
|
|
assert upload1.checksum == "test"
|
|
|
|
# updated
|
|
assert upload2.checksum == common_utils.get_file_hash(upload2.audio_file)
|
|
|
|
|
|
def test_prune_library_dry_run(factories):
|
|
prunable = factories["music.Track"]()
|
|
not_prunable = factories["music.Track"]()
|
|
c = prune_library.Command()
|
|
options = {
|
|
"prune_artists": True,
|
|
"prune_albums": True,
|
|
"prune_tracks": True,
|
|
"exclude_favorites": False,
|
|
"exclude_listenings": False,
|
|
"exclude_playlists": False,
|
|
"dry_run": True,
|
|
}
|
|
c.handle(**options)
|
|
|
|
for t in [prunable, not_prunable]:
|
|
# nothing pruned, because dry run
|
|
t.refresh_from_db()
|
|
|
|
|
|
def test_prune_library(factories, mocker):
|
|
prunable_track = factories["music.Track"]()
|
|
not_prunable_track = factories["music.Track"]()
|
|
prunable_tracks = prunable_track.__class__.objects.filter(pk=prunable_track.pk)
|
|
get_prunable_tracks = mocker.patch(
|
|
"funkwhale_api.music.tasks.get_prunable_tracks", return_value=prunable_tracks
|
|
)
|
|
|
|
prunable_album = factories["music.Album"]()
|
|
not_prunable_album = factories["music.Album"]()
|
|
prunable_albums = prunable_album.__class__.objects.filter(pk=prunable_album.pk)
|
|
get_prunable_albums = mocker.patch(
|
|
"funkwhale_api.music.tasks.get_prunable_albums", return_value=prunable_albums
|
|
)
|
|
|
|
prunable_artist = factories["music.Artist"]()
|
|
not_prunable_artist = factories["music.Artist"]()
|
|
prunable_artists = prunable_artist.__class__.objects.filter(pk=prunable_artist.pk)
|
|
get_prunable_artists = mocker.patch(
|
|
"funkwhale_api.music.tasks.get_prunable_artists", return_value=prunable_artists
|
|
)
|
|
|
|
c = prune_library.Command()
|
|
options = {
|
|
"exclude_favorites": mocker.Mock(),
|
|
"exclude_listenings": mocker.Mock(),
|
|
"exclude_playlists": mocker.Mock(),
|
|
"prune_artists": True,
|
|
"prune_albums": True,
|
|
"prune_tracks": True,
|
|
"dry_run": False,
|
|
}
|
|
c.handle(**options)
|
|
|
|
get_prunable_tracks.assert_called_once_with(
|
|
exclude_favorites=options["exclude_favorites"],
|
|
exclude_listenings=options["exclude_listenings"],
|
|
exclude_playlists=options["exclude_playlists"],
|
|
)
|
|
get_prunable_albums.assert_called_once()
|
|
get_prunable_artists.assert_called_once()
|
|
|
|
with pytest.raises(prunable_track.DoesNotExist):
|
|
prunable_track.refresh_from_db()
|
|
|
|
with pytest.raises(prunable_album.DoesNotExist):
|
|
prunable_album.refresh_from_db()
|
|
|
|
with pytest.raises(prunable_artist.DoesNotExist):
|
|
prunable_artist.refresh_from_db()
|
|
|
|
for o in [not_prunable_track, not_prunable_album, not_prunable_artist]:
|
|
o.refresh_from_db()
|
|
|
|
|
|
def test_check_inplace_files_dry_run(factories, tmpfile):
|
|
prunable = factories["music.Upload"](source="file:///notfound", audio_file=None)
|
|
not_prunable = factories["music.Upload"](
|
|
source=f"file://{tmpfile.name}", audio_file=None
|
|
)
|
|
c = check_inplace_files.Command()
|
|
c.handle(dry_run=True)
|
|
|
|
for u in [prunable, not_prunable]:
|
|
# nothing pruned, because dry run
|
|
u.refresh_from_db()
|
|
|
|
|
|
def test_check_inplace_files_no_dry_run(factories, tmpfile):
|
|
prunable = factories["music.Upload"](source="file:///notfound", audio_file=None)
|
|
not_prunable = [
|
|
factories["music.Upload"](source=f"file://{tmpfile.name}", audio_file=None),
|
|
factories["music.Upload"](source="upload://"),
|
|
factories["music.Upload"](source="https://"),
|
|
]
|
|
c = check_inplace_files.Command()
|
|
c.handle(dry_run=False)
|
|
|
|
with pytest.raises(prunable.DoesNotExist):
|
|
prunable.refresh_from_db()
|
|
|
|
for u in not_prunable:
|
|
u.refresh_from_db()
|
|
|
|
|
|
def test_prune_non_mbid_content(factories):
|
|
prunable = factories["music.Track"](mbid=None)
|
|
|
|
track = factories["music.Track"](mbid=None)
|
|
factories["playlists.PlaylistTrack"](track=track)
|
|
not_prunable = [factories["music.Track"](), track]
|
|
c = prune_non_mbid_content.Command()
|
|
options = {
|
|
"include_playlist_content": False,
|
|
"include_listened_content": False,
|
|
"include_favorited_content": True,
|
|
"no_dry_run": True,
|
|
}
|
|
c.handle(**options)
|
|
|
|
with pytest.raises(prunable.DoesNotExist):
|
|
prunable.refresh_from_db()
|
|
|
|
for t in not_prunable:
|
|
t.refresh_from_db()
|
|
|
|
track = factories["music.Track"](mbid=None)
|
|
factories["playlists.PlaylistTrack"](track=track)
|
|
prunable = [factories["music.Track"](mbid=None), track]
|
|
|
|
not_prunable = [factories["music.Track"]()]
|
|
options = {
|
|
"include_playlist_content": True,
|
|
"include_listened_content": False,
|
|
"include_favorited_content": False,
|
|
"no_dry_run": True,
|
|
}
|
|
c.handle(**options)
|
|
|
|
for t in prunable:
|
|
with pytest.raises(t.DoesNotExist):
|
|
t.refresh_from_db()
|
|
|
|
for t in not_prunable:
|
|
t.refresh_from_db()
|
|
|
|
|
|
def test_create_playlist_from_folder_structure(factories, tmp_path):
|
|
user = factories["users.User"](with_actor=True)
|
|
c = create_playlist_from_folder_structure.Command()
|
|
options = {
|
|
"dir_name": DATA_DIR,
|
|
"user_name": user.username,
|
|
"privacy_level": "me",
|
|
"yes": True,
|
|
"no_dry_run": True,
|
|
"only_mbid": False,
|
|
}
|
|
c.handle(**options)
|
|
|
|
assert (
|
|
playlist_models.Playlist.objects.all()
|
|
.filter(name="test_directory_playlist")
|
|
.exists()
|
|
)
|