257 lines
		
	
	
		
			8.7 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			257 lines
		
	
	
		
			8.7 KiB
		
	
	
	
		
			Python
		
	
	
	
| import pytest
 | |
| 
 | |
| from funkwhale_api.common import scripts
 | |
| from funkwhale_api.common.management.commands import script
 | |
| from funkwhale_api.federation import models as federation_models
 | |
| from funkwhale_api.music import models as music_models
 | |
| 
 | |
| 
 | |
| @pytest.fixture
 | |
| def command():
 | |
|     return script.Command()
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize(
 | |
|     "script_name",
 | |
|     [
 | |
|         "django_permissions_to_user_permissions",
 | |
|         "test",
 | |
|         "delete_pre_017_federated_uploads",
 | |
|     ],
 | |
| )
 | |
| def test_script_command_list(command, script_name, mocker):
 | |
|     mocked = mocker.patch(f"funkwhale_api.common.scripts.{script_name}.main")
 | |
| 
 | |
|     command.handle(script_name=script_name, interactive=False)
 | |
| 
 | |
|     mocked.assert_called_once_with(command, script_name=script_name, interactive=False)
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize(
 | |
|     "open_api,expected_visibility", [(True, "everyone"), (False, "instance")]
 | |
| )
 | |
| def test_migrate_to_user_libraries_create_libraries(
 | |
|     factories, open_api, expected_visibility, stdout
 | |
| ):
 | |
|     user1 = factories["users.User"](with_actor=True)
 | |
|     user2 = factories["users.User"](with_actor=True)
 | |
| 
 | |
|     result = scripts.migrate_to_user_libraries.create_libraries(open_api, stdout)
 | |
| 
 | |
|     user1_library = user1.actor.libraries.get(
 | |
|         name="default", privacy_level=expected_visibility
 | |
|     )
 | |
|     user2_library = user2.actor.libraries.get(
 | |
|         name="default", privacy_level=expected_visibility
 | |
|     )
 | |
| 
 | |
|     assert result == {user1.pk: user1_library.pk, user2.pk: user2_library.pk}
 | |
| 
 | |
| 
 | |
| def test_migrate_to_user_libraries_update_uploads(factories, stdout):
 | |
|     user1 = factories["users.User"](with_actor=True)
 | |
|     user2 = factories["users.User"](with_actor=True)
 | |
| 
 | |
|     library1 = factories["music.Library"](actor=user1.actor)
 | |
|     library2 = factories["music.Library"](actor=user2.actor)
 | |
| 
 | |
|     upload1 = factories["music.Upload"]()
 | |
|     upload2 = factories["music.Upload"]()
 | |
| 
 | |
|     # we delete libraries
 | |
|     upload1.library = None
 | |
|     upload2.library = None
 | |
|     upload1.save()
 | |
|     upload2.save()
 | |
| 
 | |
|     factories["music.ImportJob"](batch__submitted_by=user1, upload=upload1)
 | |
|     factories["music.ImportJob"](batch__submitted_by=user2, upload=upload2)
 | |
| 
 | |
|     libraries_by_user = {user1.pk: library1.pk, user2.pk: library2.pk}
 | |
| 
 | |
|     scripts.migrate_to_user_libraries.update_uploads(libraries_by_user, stdout)
 | |
| 
 | |
|     upload1.refresh_from_db()
 | |
|     upload2.refresh_from_db()
 | |
| 
 | |
|     assert upload1.library == library1
 | |
|     assert upload1.import_status == "finished"
 | |
|     assert upload2.library == library2
 | |
|     assert upload2.import_status == "finished"
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize(
 | |
|     "open_api,expected_visibility", [(True, "everyone"), (False, "instance")]
 | |
| )
 | |
| def test_migrate_to_user_libraries_without_jobs(
 | |
|     factories, open_api, expected_visibility, stdout
 | |
| ):
 | |
|     superuser = factories["users.User"](is_superuser=True, with_actor=True)
 | |
|     upload1 = factories["music.Upload"]()
 | |
|     upload2 = factories["music.Upload"]()
 | |
|     upload3 = factories["music.Upload"](audio_file=None)
 | |
| 
 | |
|     # we delete libraries
 | |
|     upload1.library = None
 | |
|     upload2.library = None
 | |
|     upload3.library = None
 | |
|     upload1.save()
 | |
|     upload2.save()
 | |
|     upload3.save()
 | |
| 
 | |
|     factories["music.ImportJob"](upload=upload2)
 | |
|     scripts.migrate_to_user_libraries.update_orphan_uploads(open_api, stdout)
 | |
| 
 | |
|     upload1.refresh_from_db()
 | |
|     upload2.refresh_from_db()
 | |
|     upload3.refresh_from_db()
 | |
| 
 | |
|     superuser_library = superuser.actor.libraries.get(
 | |
|         name="default", privacy_level=expected_visibility
 | |
|     )
 | |
|     assert upload1.library == superuser_library
 | |
|     assert upload1.import_status == "finished"
 | |
|     # left untouched because they don't match filters
 | |
|     assert upload2.library is None
 | |
|     assert upload3.library is None
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize(
 | |
|     "model,args,path",
 | |
|     [
 | |
|         ("music.Upload", {"library__actor__local": True}, "/federation/music/uploads/"),
 | |
|         ("music.Artist", {}, "/federation/music/artists/"),
 | |
|         ("music.Album", {}, "/federation/music/albums/"),
 | |
|         ("music.Track", {}, "/federation/music/tracks/"),
 | |
|     ],
 | |
| )
 | |
| def test_migrate_to_user_libraries_generate_fids(
 | |
|     factories, args, model, path, settings, stdout
 | |
| ):
 | |
|     template = "{}{}{}"
 | |
| 
 | |
|     objects = factories[model].create_batch(5, fid=None, **args)
 | |
|     klass = factories[model]._meta.model
 | |
| 
 | |
|     # we leave a fid on the first one, and set the others to None
 | |
|     existing_fid = objects[0].fid
 | |
|     base_path = existing_fid.replace(str(objects[0].uuid), "")
 | |
|     klass.objects.filter(pk__in=[o.pk for o in objects[1:]]).update(fid=None)
 | |
| 
 | |
|     scripts.migrate_to_user_libraries.set_fid(klass.objects.all(), path, stdout)
 | |
| 
 | |
|     for i, o in enumerate(objects):
 | |
|         o.refresh_from_db()
 | |
|         if i == 0:
 | |
|             assert o.fid == existing_fid
 | |
|         else:
 | |
|             assert o.fid == template.format(settings.FUNKWHALE_URL, path, o.uuid)
 | |
|             # we also ensure the path we insert match the one that is generated
 | |
|             # by the app on objects creation, as a safe guard for typos
 | |
|             assert base_path == o.fid.replace(str(o.uuid), "")
 | |
| 
 | |
| 
 | |
| def test_migrate_to_user_libraries_update_actors_shared_inbox_url(factories, stdout):
 | |
|     local = factories["federation.Actor"](local=True, shared_inbox_url=None)
 | |
|     remote = factories["federation.Actor"](local=False, shared_inbox_url=None)
 | |
|     expected = federation_models.get_shared_inbox_url()
 | |
|     scripts.migrate_to_user_libraries.update_shared_inbox_url(stdout)
 | |
| 
 | |
|     local.refresh_from_db()
 | |
|     remote.refresh_from_db()
 | |
| 
 | |
|     assert local.shared_inbox_url == expected
 | |
|     assert remote.shared_inbox_url is None
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize("part", ["following", "followers"])
 | |
| def test_migrate_to_user_libraries_generate_actor_urls(
 | |
|     factories, part, settings, stdout
 | |
| ):
 | |
|     field = f"{part}_url"
 | |
|     ok = factories["users.User"]().create_actor()
 | |
|     local = factories["federation.Actor"](local=True, **{field: None})
 | |
|     remote = factories["federation.Actor"](local=False, **{field: None})
 | |
| 
 | |
|     assert getattr(local, field) is None
 | |
|     expected = "{}/federation/actors/{}/{}".format(
 | |
|         settings.FUNKWHALE_URL, local.preferred_username, part
 | |
|     )
 | |
|     ok_url = getattr(ok, field)
 | |
| 
 | |
|     scripts.migrate_to_user_libraries.generate_actor_urls(part, stdout)
 | |
| 
 | |
|     ok.refresh_from_db()
 | |
|     local.refresh_from_db()
 | |
|     remote.refresh_from_db()
 | |
| 
 | |
|     # unchanged
 | |
|     assert getattr(ok, field) == ok_url
 | |
|     assert getattr(remote, field) is None
 | |
| 
 | |
|     assert getattr(local, field) == expected
 | |
|     assert expected.replace(local.preferred_username, "") == ok_url.replace(
 | |
|         ok.preferred_username, ""
 | |
|     )
 | |
| 
 | |
| 
 | |
| def test_migrate_to_users_libraries_command(
 | |
|     preferences, mocker, db, command, queryset_equal_queries
 | |
| ):
 | |
|     preferences["common__api_authentication_required"] = False
 | |
|     open_api = not preferences["common__api_authentication_required"]
 | |
|     create_libraries = mocker.patch.object(
 | |
|         scripts.migrate_to_user_libraries,
 | |
|         "create_libraries",
 | |
|         return_value={"hello": "world"},
 | |
|     )
 | |
|     update_uploads = mocker.patch.object(
 | |
|         scripts.migrate_to_user_libraries, "update_uploads"
 | |
|     )
 | |
|     update_orphan_uploads = mocker.patch.object(
 | |
|         scripts.migrate_to_user_libraries, "update_orphan_uploads"
 | |
|     )
 | |
|     set_fid = mocker.patch.object(scripts.migrate_to_user_libraries, "set_fid")
 | |
|     update_shared_inbox_url = mocker.patch.object(
 | |
|         scripts.migrate_to_user_libraries, "update_shared_inbox_url"
 | |
|     )
 | |
|     generate_actor_urls = mocker.patch.object(
 | |
|         scripts.migrate_to_user_libraries, "generate_actor_urls"
 | |
|     )
 | |
| 
 | |
|     scripts.migrate_to_user_libraries.main(command)
 | |
| 
 | |
|     create_libraries.assert_called_once_with(open_api, command.stdout)
 | |
|     update_uploads.assert_called_once_with({"hello": "world"}, command.stdout)
 | |
|     update_orphan_uploads.assert_called_once_with(open_api, command.stdout)
 | |
|     set_fid_params = [
 | |
|         (
 | |
|             music_models.Upload.objects.exclude(library__actor__user=None),
 | |
|             "/federation/music/uploads/",
 | |
|         ),
 | |
|         (music_models.Artist.objects.all(), "/federation/music/artists/"),
 | |
|         (music_models.Album.objects.all(), "/federation/music/albums/"),
 | |
|         (music_models.Track.objects.all(), "/federation/music/tracks/"),
 | |
|     ]
 | |
|     for qs, path in set_fid_params:
 | |
|         set_fid.assert_any_call(qs, path, command.stdout)
 | |
|     update_shared_inbox_url.assert_called_once_with(command.stdout)
 | |
|     # generate_actor_urls(part, stdout):
 | |
| 
 | |
|     for part in ["followers", "following"]:
 | |
|         generate_actor_urls.assert_any_call(part, command.stdout)
 | |
| 
 | |
| 
 | |
| def test_delete_pre_017_federated_uploads(factories, command):
 | |
|     to_delete = factories["music.Upload"](
 | |
|         source="https://test.com/federation/music/file/1"
 | |
|     )
 | |
|     to_keep = factories["music.Upload"](source="https://hello.world")
 | |
| 
 | |
|     scripts.delete_pre_017_federated_uploads.main(command)
 | |
| 
 | |
|     to_keep.refresh_from_db()
 | |
| 
 | |
|     with pytest.raises(to_delete.__class__.DoesNotExist):
 | |
|         to_delete.refresh_from_db()
 |