272 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			272 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
| import os
 | |
| 
 | |
| import pytest
 | |
| 
 | |
| from funkwhale_api.common import utils as common_utils
 | |
| from funkwhale_api.music.management.commands import (
 | |
|     check_inplace_files,
 | |
|     create_playlist_from_folder_structure,
 | |
|     fix_uploads,
 | |
|     prune_library,
 | |
|     prune_non_mbid_content,
 | |
| )
 | |
| from funkwhale_api.playlists import models as playlist_models
 | |
| 
 | |
# Absolute path of the directory containing this test module; the tests below
# build fixture paths (e.g. "test.mp3", "test.ogg") relative to it.
DATA_DIR = os.path.split(os.path.abspath(__file__))[0]
 | |
| 
 | |
| 
 | |
def test_fix_uploads_bitrate_length(factories, mocker):
    """fix_file_data fills bitrate/duration only on uploads that lack them."""
    complete = factories["music.Upload"](bitrate=1, duration=2)
    incomplete = factories["music.Upload"](bitrate=None, duration=None)
    command = fix_uploads.Command()

    # Audio inspection is mocked so the expected values are deterministic.
    mocker.patch(
        "funkwhale_api.music.utils.get_audio_file_data",
        return_value={"bitrate": 42, "length": 43},
    )

    command.fix_file_data(dry_run=False, batch_size=100)

    for upload in (complete, incomplete):
        upload.refresh_from_db()

    # Already populated: must be left untouched.
    assert (complete.bitrate, complete.duration) == (1, 2)

    # Previously empty: now carries the mocked bitrate and length.
    assert (incomplete.bitrate, incomplete.duration) == (42, 43)
 | |
| 
 | |
| 
 | |
def test_fix_uploads_size(factories, mocker):
    """fix_file_size only computes a size for uploads whose size is NULL."""
    sized = factories["music.Upload"]()
    unsized = factories["music.Upload"]()

    # Write the sizes with queryset updates, directly at the database level.
    uploads = type(sized).objects
    uploads.filter(pk=sized.pk).update(size=1)
    uploads.filter(pk=unsized.pk).update(size=None)

    command = fix_uploads.Command()
    mocker.patch("funkwhale_api.music.models.Upload.get_file_size", return_value=2)

    command.fix_file_size(dry_run=False, batch_size=100)

    for upload in (sized, unsized):
        upload.refresh_from_db()

    # The existing size is preserved...
    assert sized.size == 1

    # ...while the missing one is filled from the mocked get_file_size().
    assert unsized.size == 2
 | |
| 
 | |
| 
 | |
def test_fix_uploads_mimetype(factories, mocker):
    """fix_mimetypes replaces a placeholder mimetype but keeps a real one."""
    mp3_path = os.path.join(DATA_DIR, "test.mp3")
    ogg_path = os.path.join(DATA_DIR, "test.ogg")
    make_upload = factories["music.Upload"]

    placeholder = make_upload(
        audio_file__from_path=mp3_path,
        source=f"file://{mp3_path}",
        mimetype="application/x-empty",
    )
    # This one already has a mimetype set, so it must not be updated.
    already_typed = make_upload(
        audio_file__from_path=ogg_path,
        source=f"file://{ogg_path}",
        mimetype="audio/something",
    )

    fix_uploads.Command().fix_mimetypes(dry_run=False, batch_size=100)

    placeholder.refresh_from_db()
    already_typed.refresh_from_db()

    assert placeholder.mimetype == "audio/mpeg"
    assert already_typed.mimetype == "audio/something"
 | |
| 
 | |
| 
 | |
def test_fix_uploads_checksum(factories, mocker):
    """fix_file_checksum hashes only the uploads that have no checksum yet."""
    hashed = factories["music.Upload"]()
    unhashed = factories["music.Upload"]()

    # Write the checksums with queryset updates, directly at the database level.
    uploads = type(hashed).objects
    uploads.filter(pk=hashed.pk).update(checksum="test")
    uploads.filter(pk=unhashed.pk).update(checksum=None)

    command = fix_uploads.Command()
    command.fix_file_checksum(dry_run=False, batch_size=100)

    for upload in (hashed, unhashed):
        upload.refresh_from_db()

    # An existing checksum is kept as-is.
    assert hashed.checksum == "test"

    # A missing checksum is computed from the audio file content.
    expected = common_utils.get_file_hash(unhashed.audio_file)
    assert unhashed.checksum == expected
 | |
| 
 | |
| 
 | |
def test_prune_library_dry_run(factories):
    """With dry_run enabled, prune_library must not delete any track."""
    tracks = [factories["music.Track"](), factories["music.Track"]()]

    command = prune_library.Command()
    command.handle(
        prune_artists=True,
        prune_albums=True,
        prune_tracks=True,
        exclude_favorites=False,
        exclude_listenings=False,
        exclude_playlists=False,
        dry_run=True,
    )

    # refresh_from_db() raises DoesNotExist for a deleted row, so simply
    # reloading every track proves nothing was pruned.
    for track in tracks:
        track.refresh_from_db()
 | |
| 
 | |
| 
 | |
def test_prune_library(factories, mocker):
    """prune_library deletes exactly what the get_prunable_* helpers return."""
    # For each model: one object reported as prunable by the mocked helper,
    # and one that is not reported and therefore has to survive.
    doomed_track = factories["music.Track"]()
    kept_track = factories["music.Track"]()
    get_prunable_tracks = mocker.patch(
        "funkwhale_api.music.tasks.get_prunable_tracks",
        return_value=type(doomed_track).objects.filter(pk=doomed_track.pk),
    )

    doomed_album = factories["music.Album"]()
    kept_album = factories["music.Album"]()
    get_prunable_albums = mocker.patch(
        "funkwhale_api.music.tasks.get_prunable_albums",
        return_value=type(doomed_album).objects.filter(pk=doomed_album.pk),
    )

    doomed_artist = factories["music.Artist"]()
    kept_artist = factories["music.Artist"]()
    get_prunable_artists = mocker.patch(
        "funkwhale_api.music.tasks.get_prunable_artists",
        return_value=type(doomed_artist).objects.filter(pk=doomed_artist.pk),
    )

    # Sentinel mocks let us check the exclusion flags are forwarded untouched.
    exclude_favorites = mocker.Mock()
    exclude_listenings = mocker.Mock()
    exclude_playlists = mocker.Mock()

    command = prune_library.Command()
    command.handle(
        exclude_favorites=exclude_favorites,
        exclude_listenings=exclude_listenings,
        exclude_playlists=exclude_playlists,
        prune_artists=True,
        prune_albums=True,
        prune_tracks=True,
        dry_run=False,
    )

    get_prunable_tracks.assert_called_once_with(
        exclude_favorites=exclude_favorites,
        exclude_listenings=exclude_listenings,
        exclude_playlists=exclude_playlists,
    )
    get_prunable_albums.assert_called_once()
    get_prunable_artists.assert_called_once()

    # Everything reported as prunable is gone...
    for pruned in (doomed_track, doomed_album, doomed_artist):
        with pytest.raises(pruned.DoesNotExist):
            pruned.refresh_from_db()

    # ...and everything else can still be reloaded.
    for survivor in (kept_track, kept_album, kept_artist):
        survivor.refresh_from_db()
 | |
| 
 | |
| 
 | |
def test_check_inplace_files_dry_run(factories, tmpfile):
    """In dry-run mode check_inplace_files deletes no upload at all."""
    make_upload = factories["music.Upload"]
    uploads = [
        # source points to a path that presumably does not exist
        make_upload(source="file:///notfound", audio_file=None),
        # source points to the file provided by the tmpfile fixture
        make_upload(source=f"file://{tmpfile.name}", audio_file=None),
    ]

    check_inplace_files.Command().handle(dry_run=True)

    # Reloading would raise DoesNotExist if a row had been removed.
    for upload in uploads:
        upload.refresh_from_db()
 | |
| 
 | |
| 
 | |
def test_check_inplace_files_no_dry_run(factories, tmpfile):
    """Without dry run, only the upload with the "/notfound" source is removed."""
    make_upload = factories["music.Upload"]

    missing = make_upload(source="file:///notfound", audio_file=None)
    survivors = [
        make_upload(source=f"file://{tmpfile.name}", audio_file=None),
        # non "file://" sources are expected to be left alone
        make_upload(source="upload://"),
        make_upload(source="https://"),
    ]

    command = check_inplace_files.Command()
    command.handle(dry_run=False)

    with pytest.raises(missing.DoesNotExist):
        missing.refresh_from_db()

    for upload in survivors:
        upload.refresh_from_db()
 | |
| 
 | |
| 
 | |
def test_prune_non_mbid_content(factories):
    """prune_non_mbid_content honours the include_playlist_content switch.

    First pass: playlist content is excluded, so an mbid-less track that sits
    in a playlist survives while a plain mbid-less track is deleted.
    Second pass: playlist content is included, so both kinds are deleted.
    In both passes a track created with the factory's default mbid is kept.
    """
    make_track = factories["music.Track"]
    make_playlist_track = factories["playlists.PlaylistTrack"]
    command = prune_non_mbid_content.Command()

    # --- pass 1: playlist content is protected ---
    orphan = make_track(mbid=None)
    in_playlist = make_track(mbid=None)
    make_playlist_track(track=in_playlist)
    survivors = [make_track(), in_playlist]

    command.handle(
        include_playlist_content=False,
        include_listened_content=False,
        include_favorited_content=True,
        no_dry_run=True,
    )

    with pytest.raises(orphan.DoesNotExist):
        orphan.refresh_from_db()
    for track in survivors:
        track.refresh_from_db()

    # --- pass 2: playlist content is pruned as well ---
    in_playlist = make_track(mbid=None)
    make_playlist_track(track=in_playlist)
    doomed = [make_track(mbid=None), in_playlist]
    survivors = [make_track()]

    command.handle(
        include_playlist_content=True,
        include_listened_content=False,
        include_favorited_content=False,
        no_dry_run=True,
    )

    for track in doomed:
        with pytest.raises(track.DoesNotExist):
            track.refresh_from_db()
    for track in survivors:
        track.refresh_from_db()
 | |
| 
 | |
| 
 | |
def test_create_playlist_from_folder_structure(factories, tmp_path):
    """Running the command on DATA_DIR creates "test_directory_playlist"."""
    owner = factories["users.User"](with_actor=True)

    command = create_playlist_from_folder_structure.Command()
    command.handle(
        dir_name=DATA_DIR,
        user_name=owner.username,
        privacy_level="me",
        yes=True,
        no_dry_run=True,
        only_mbid=False,
    )

    # NOTE(review): the playlist name presumably mirrors a folder of the same
    # name inside DATA_DIR - confirm against the fixture tree.
    created = playlist_models.Playlist.objects.filter(name="test_directory_playlist")
    assert created.exists()
 |