272 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			272 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
import os
 | 
						|
 | 
						|
import pytest
 | 
						|
 | 
						|
from funkwhale_api.common import utils as common_utils
 | 
						|
from funkwhale_api.music.management.commands import (
 | 
						|
    check_inplace_files,
 | 
						|
    create_playlist_from_folder_structure,
 | 
						|
    fix_uploads,
 | 
						|
    prune_library,
 | 
						|
    prune_non_mbid_content,
 | 
						|
)
 | 
						|
from funkwhale_api.playlists import models as playlist_models
 | 
						|
 | 
						|
DATA_DIR = os.path.dirname(os.path.abspath(__file__))
 | 
						|
 | 
						|
 | 
						|
def test_fix_uploads_bitrate_length(factories, mocker):
 | 
						|
    upload1 = factories["music.Upload"](bitrate=1, duration=2)
 | 
						|
    upload2 = factories["music.Upload"](bitrate=None, duration=None)
 | 
						|
    c = fix_uploads.Command()
 | 
						|
 | 
						|
    mocker.patch(
 | 
						|
        "funkwhale_api.music.utils.get_audio_file_data",
 | 
						|
        return_value={"bitrate": 42, "length": 43},
 | 
						|
    )
 | 
						|
 | 
						|
    c.fix_file_data(dry_run=False, batch_size=100)
 | 
						|
 | 
						|
    upload1.refresh_from_db()
 | 
						|
    upload2.refresh_from_db()
 | 
						|
 | 
						|
    # not updated
 | 
						|
    assert upload1.bitrate == 1
 | 
						|
    assert upload1.duration == 2
 | 
						|
 | 
						|
    # updated
 | 
						|
    assert upload2.bitrate == 42
 | 
						|
    assert upload2.duration == 43
 | 
						|
 | 
						|
 | 
						|
def test_fix_uploads_size(factories, mocker):
 | 
						|
    upload1 = factories["music.Upload"]()
 | 
						|
    upload2 = factories["music.Upload"]()
 | 
						|
    upload1.__class__.objects.filter(pk=upload1.pk).update(size=1)
 | 
						|
    upload2.__class__.objects.filter(pk=upload2.pk).update(size=None)
 | 
						|
    c = fix_uploads.Command()
 | 
						|
 | 
						|
    mocker.patch("funkwhale_api.music.models.Upload.get_file_size", return_value=2)
 | 
						|
 | 
						|
    c.fix_file_size(dry_run=False, batch_size=100)
 | 
						|
 | 
						|
    upload1.refresh_from_db()
 | 
						|
    upload2.refresh_from_db()
 | 
						|
 | 
						|
    # not updated
 | 
						|
    assert upload1.size == 1
 | 
						|
 | 
						|
    # updated
 | 
						|
    assert upload2.size == 2
 | 
						|
 | 
						|
 | 
						|
def test_fix_uploads_mimetype(factories, mocker):
 | 
						|
    mp3_path = os.path.join(DATA_DIR, "test.mp3")
 | 
						|
    ogg_path = os.path.join(DATA_DIR, "test.ogg")
 | 
						|
    upload1 = factories["music.Upload"](
 | 
						|
        audio_file__from_path=mp3_path,
 | 
						|
        source=f"file://{mp3_path}",
 | 
						|
        mimetype="application/x-empty",
 | 
						|
    )
 | 
						|
 | 
						|
    # this one already has a mimetype set, to it should not be updated
 | 
						|
    upload2 = factories["music.Upload"](
 | 
						|
        audio_file__from_path=ogg_path,
 | 
						|
        source=f"file://{ogg_path}",
 | 
						|
        mimetype="audio/something",
 | 
						|
    )
 | 
						|
    c = fix_uploads.Command()
 | 
						|
    c.fix_mimetypes(dry_run=False, batch_size=100)
 | 
						|
 | 
						|
    upload1.refresh_from_db()
 | 
						|
    upload2.refresh_from_db()
 | 
						|
 | 
						|
    assert upload1.mimetype == "audio/mpeg"
 | 
						|
    assert upload2.mimetype == "audio/something"
 | 
						|
 | 
						|
 | 
						|
def test_fix_uploads_checksum(factories, mocker):
 | 
						|
    upload1 = factories["music.Upload"]()
 | 
						|
    upload2 = factories["music.Upload"]()
 | 
						|
    upload1.__class__.objects.filter(pk=upload1.pk).update(checksum="test")
 | 
						|
    upload2.__class__.objects.filter(pk=upload2.pk).update(checksum=None)
 | 
						|
    c = fix_uploads.Command()
 | 
						|
 | 
						|
    c.fix_file_checksum(dry_run=False, batch_size=100)
 | 
						|
 | 
						|
    upload1.refresh_from_db()
 | 
						|
    upload2.refresh_from_db()
 | 
						|
 | 
						|
    # not updated
 | 
						|
    assert upload1.checksum == "test"
 | 
						|
 | 
						|
    # updated
 | 
						|
    assert upload2.checksum == common_utils.get_file_hash(upload2.audio_file)
 | 
						|
 | 
						|
 | 
						|
def test_prune_library_dry_run(factories):
 | 
						|
    prunable = factories["music.Track"]()
 | 
						|
    not_prunable = factories["music.Track"]()
 | 
						|
    c = prune_library.Command()
 | 
						|
    options = {
 | 
						|
        "prune_artists": True,
 | 
						|
        "prune_albums": True,
 | 
						|
        "prune_tracks": True,
 | 
						|
        "exclude_favorites": False,
 | 
						|
        "exclude_listenings": False,
 | 
						|
        "exclude_playlists": False,
 | 
						|
        "dry_run": True,
 | 
						|
    }
 | 
						|
    c.handle(**options)
 | 
						|
 | 
						|
    for t in [prunable, not_prunable]:
 | 
						|
        # nothing pruned, because dry run
 | 
						|
        t.refresh_from_db()
 | 
						|
 | 
						|
 | 
						|
def test_prune_library(factories, mocker):
 | 
						|
    prunable_track = factories["music.Track"]()
 | 
						|
    not_prunable_track = factories["music.Track"]()
 | 
						|
    prunable_tracks = prunable_track.__class__.objects.filter(pk=prunable_track.pk)
 | 
						|
    get_prunable_tracks = mocker.patch(
 | 
						|
        "funkwhale_api.music.tasks.get_prunable_tracks", return_value=prunable_tracks
 | 
						|
    )
 | 
						|
 | 
						|
    prunable_album = factories["music.Album"]()
 | 
						|
    not_prunable_album = factories["music.Album"]()
 | 
						|
    prunable_albums = prunable_album.__class__.objects.filter(pk=prunable_album.pk)
 | 
						|
    get_prunable_albums = mocker.patch(
 | 
						|
        "funkwhale_api.music.tasks.get_prunable_albums", return_value=prunable_albums
 | 
						|
    )
 | 
						|
 | 
						|
    prunable_artist = factories["music.Artist"]()
 | 
						|
    not_prunable_artist = factories["music.Artist"]()
 | 
						|
    prunable_artists = prunable_artist.__class__.objects.filter(pk=prunable_artist.pk)
 | 
						|
    get_prunable_artists = mocker.patch(
 | 
						|
        "funkwhale_api.music.tasks.get_prunable_artists", return_value=prunable_artists
 | 
						|
    )
 | 
						|
 | 
						|
    c = prune_library.Command()
 | 
						|
    options = {
 | 
						|
        "exclude_favorites": mocker.Mock(),
 | 
						|
        "exclude_listenings": mocker.Mock(),
 | 
						|
        "exclude_playlists": mocker.Mock(),
 | 
						|
        "prune_artists": True,
 | 
						|
        "prune_albums": True,
 | 
						|
        "prune_tracks": True,
 | 
						|
        "dry_run": False,
 | 
						|
    }
 | 
						|
    c.handle(**options)
 | 
						|
 | 
						|
    get_prunable_tracks.assert_called_once_with(
 | 
						|
        exclude_favorites=options["exclude_favorites"],
 | 
						|
        exclude_listenings=options["exclude_listenings"],
 | 
						|
        exclude_playlists=options["exclude_playlists"],
 | 
						|
    )
 | 
						|
    get_prunable_albums.assert_called_once()
 | 
						|
    get_prunable_artists.assert_called_once()
 | 
						|
 | 
						|
    with pytest.raises(prunable_track.DoesNotExist):
 | 
						|
        prunable_track.refresh_from_db()
 | 
						|
 | 
						|
    with pytest.raises(prunable_album.DoesNotExist):
 | 
						|
        prunable_album.refresh_from_db()
 | 
						|
 | 
						|
    with pytest.raises(prunable_artist.DoesNotExist):
 | 
						|
        prunable_artist.refresh_from_db()
 | 
						|
 | 
						|
    for o in [not_prunable_track, not_prunable_album, not_prunable_artist]:
 | 
						|
        o.refresh_from_db()
 | 
						|
 | 
						|
 | 
						|
def test_check_inplace_files_dry_run(factories, tmpfile):
 | 
						|
    prunable = factories["music.Upload"](source="file:///notfound", audio_file=None)
 | 
						|
    not_prunable = factories["music.Upload"](
 | 
						|
        source=f"file://{tmpfile.name}", audio_file=None
 | 
						|
    )
 | 
						|
    c = check_inplace_files.Command()
 | 
						|
    c.handle(dry_run=True)
 | 
						|
 | 
						|
    for u in [prunable, not_prunable]:
 | 
						|
        # nothing pruned, because dry run
 | 
						|
        u.refresh_from_db()
 | 
						|
 | 
						|
 | 
						|
def test_check_inplace_files_no_dry_run(factories, tmpfile):
 | 
						|
    prunable = factories["music.Upload"](source="file:///notfound", audio_file=None)
 | 
						|
    not_prunable = [
 | 
						|
        factories["music.Upload"](source=f"file://{tmpfile.name}", audio_file=None),
 | 
						|
        factories["music.Upload"](source="upload://"),
 | 
						|
        factories["music.Upload"](source="https://"),
 | 
						|
    ]
 | 
						|
    c = check_inplace_files.Command()
 | 
						|
    c.handle(dry_run=False)
 | 
						|
 | 
						|
    with pytest.raises(prunable.DoesNotExist):
 | 
						|
        prunable.refresh_from_db()
 | 
						|
 | 
						|
    for u in not_prunable:
 | 
						|
        u.refresh_from_db()
 | 
						|
 | 
						|
 | 
						|
def test_prune_non_mbid_content(factories):
 | 
						|
    prunable = factories["music.Track"](mbid=None)
 | 
						|
 | 
						|
    track = factories["music.Track"](mbid=None)
 | 
						|
    factories["playlists.PlaylistTrack"](track=track)
 | 
						|
    not_prunable = [factories["music.Track"](), track]
 | 
						|
    c = prune_non_mbid_content.Command()
 | 
						|
    options = {
 | 
						|
        "include_playlist_content": False,
 | 
						|
        "include_listened_content": False,
 | 
						|
        "include_favorited_content": True,
 | 
						|
        "no_dry_run": True,
 | 
						|
    }
 | 
						|
    c.handle(**options)
 | 
						|
 | 
						|
    with pytest.raises(prunable.DoesNotExist):
 | 
						|
        prunable.refresh_from_db()
 | 
						|
 | 
						|
    for t in not_prunable:
 | 
						|
        t.refresh_from_db()
 | 
						|
 | 
						|
    track = factories["music.Track"](mbid=None)
 | 
						|
    factories["playlists.PlaylistTrack"](track=track)
 | 
						|
    prunable = [factories["music.Track"](mbid=None), track]
 | 
						|
 | 
						|
    not_prunable = [factories["music.Track"]()]
 | 
						|
    options = {
 | 
						|
        "include_playlist_content": True,
 | 
						|
        "include_listened_content": False,
 | 
						|
        "include_favorited_content": False,
 | 
						|
        "no_dry_run": True,
 | 
						|
    }
 | 
						|
    c.handle(**options)
 | 
						|
 | 
						|
    for t in prunable:
 | 
						|
        with pytest.raises(t.DoesNotExist):
 | 
						|
            t.refresh_from_db()
 | 
						|
 | 
						|
    for t in not_prunable:
 | 
						|
        t.refresh_from_db()
 | 
						|
 | 
						|
 | 
						|
def test_create_playlist_from_folder_structure(factories, tmp_path):
 | 
						|
    user = factories["users.User"]()
 | 
						|
    c = create_playlist_from_folder_structure.Command()
 | 
						|
    options = {
 | 
						|
        "dir_name": DATA_DIR,
 | 
						|
        "user_name": user.username,
 | 
						|
        "privacy_level": "me",
 | 
						|
        "yes": True,
 | 
						|
        "no_dry_run": True,
 | 
						|
        "only_mbid": False,
 | 
						|
    }
 | 
						|
    c.handle(**options)
 | 
						|
 | 
						|
    assert (
 | 
						|
        playlist_models.Playlist.objects.all()
 | 
						|
        .filter(name="test_directory_playlist")
 | 
						|
        .exists()
 | 
						|
    )
 |