442 lines
14 KiB
Python
442 lines
14 KiB
Python
import json
|
|
import pytest
|
|
|
|
from django.urls import reverse
|
|
|
|
from funkwhale_api.users import models
|
|
from funkwhale_api.users.oauth import serializers
|
|
|
|
|
|
def test_apps_post(api_client, db):
|
|
url = reverse("api:v1:oauth:apps-list")
|
|
data = {
|
|
"name": "Test app",
|
|
"redirect_uris": "http://test.app",
|
|
"scopes": "read write:profile",
|
|
}
|
|
response = api_client.post(url, data)
|
|
|
|
assert response.status_code == 201
|
|
|
|
app = models.Application.objects.get(name=data["name"])
|
|
|
|
assert app.client_type == models.Application.CLIENT_CONFIDENTIAL
|
|
assert app.authorization_grant_type == models.Application.GRANT_AUTHORIZATION_CODE
|
|
assert app.redirect_uris == data["redirect_uris"]
|
|
assert response.data == serializers.CreateApplicationSerializer(app).data
|
|
assert app.scope == "read write:profile"
|
|
assert app.user is None
|
|
|
|
|
|
def test_apps_post_logged_in_user(logged_in_api_client, db):
|
|
url = reverse("api:v1:oauth:apps-list")
|
|
data = {
|
|
"name": "Test app",
|
|
"redirect_uris": "http://test.app",
|
|
"scopes": "read write:profile",
|
|
}
|
|
response = logged_in_api_client.post(url, data)
|
|
|
|
assert response.status_code == 201
|
|
|
|
app = models.Application.objects.get(name=data["name"])
|
|
|
|
assert app.client_type == models.Application.CLIENT_CONFIDENTIAL
|
|
assert app.authorization_grant_type == models.Application.GRANT_AUTHORIZATION_CODE
|
|
assert app.redirect_uris == data["redirect_uris"]
|
|
assert response.data == serializers.CreateApplicationSerializer(app).data
|
|
assert app.scope == "read write:profile"
|
|
assert app.user == logged_in_api_client.user
|
|
assert app.token is not None
|
|
assert response.data["token"] == app.token
|
|
|
|
|
|
def test_apps_list_anonymous(api_client, db):
|
|
url = reverse("api:v1:oauth:apps-list")
|
|
response = api_client.get(url)
|
|
|
|
assert response.status_code == 401
|
|
|
|
|
|
def test_apps_list_logged_in(factories, logged_in_api_client, db):
|
|
app = factories["users.Application"](user=logged_in_api_client.user)
|
|
factories["users.Application"]()
|
|
url = reverse("api:v1:oauth:apps-list")
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 200
|
|
assert response.data["results"] == [serializers.ApplicationSerializer(app).data]
|
|
|
|
|
|
def test_apps_delete_not_owner(factories, logged_in_api_client, db):
|
|
app = factories["users.Application"]()
|
|
url = reverse("api:v1:oauth:apps-detail", kwargs={"client_id": app.client_id})
|
|
response = logged_in_api_client.delete(url)
|
|
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_apps_delete_owner(factories, logged_in_api_client, db):
|
|
app = factories["users.Application"](user=logged_in_api_client.user)
|
|
url = reverse("api:v1:oauth:apps-detail", kwargs={"client_id": app.client_id})
|
|
response = logged_in_api_client.delete(url)
|
|
|
|
assert response.status_code == 204
|
|
|
|
with pytest.raises(app.DoesNotExist):
|
|
app.refresh_from_db()
|
|
|
|
|
|
def test_apps_update_not_owner(factories, logged_in_api_client, db):
|
|
app = factories["users.Application"]()
|
|
url = reverse("api:v1:oauth:apps-detail", kwargs={"client_id": app.client_id})
|
|
response = logged_in_api_client.patch(url, {"name": "Hello"})
|
|
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_apps_update_owner(factories, logged_in_api_client, db):
|
|
app = factories["users.Application"](user=logged_in_api_client.user)
|
|
url = reverse("api:v1:oauth:apps-detail", kwargs={"client_id": app.client_id})
|
|
response = logged_in_api_client.patch(url, {"name": "Hello"})
|
|
|
|
assert response.status_code == 200
|
|
app.refresh_from_db()
|
|
|
|
assert app.name == "Hello"
|
|
|
|
|
|
def test_apps_get(preferences, logged_in_api_client, factories):
|
|
app = factories["users.Application"]()
|
|
url = reverse("api:v1:oauth:apps-detail", kwargs={"client_id": app.client_id})
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 200
|
|
assert response.data == serializers.ApplicationSerializer(app).data
|
|
|
|
|
|
def test_apps_get_owner(preferences, logged_in_api_client, factories):
|
|
app = factories["users.Application"](user=logged_in_api_client.user)
|
|
url = reverse("api:v1:oauth:apps-detail", kwargs={"client_id": app.client_id})
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 200
|
|
assert response.data == serializers.CreateApplicationSerializer(app).data
|
|
assert response.data["token"] == app.token
|
|
|
|
|
|
def test_apps_refresh_token(preferences, logged_in_api_client, factories):
|
|
app = factories["users.Application"](user=logged_in_api_client.user)
|
|
old_token = app.token
|
|
url = reverse(
|
|
"api:v1:oauth:apps-refresh_token", kwargs={"client_id": app.client_id}
|
|
)
|
|
response = logged_in_api_client.post(url)
|
|
|
|
app.refresh_from_db()
|
|
assert response.status_code == 200
|
|
assert response.data == serializers.CreateApplicationSerializer(app).data
|
|
assert app.token != old_token
|
|
|
|
|
|
def test_apps_refresh_token_not_owner(preferences, logged_in_api_client, factories):
|
|
app = factories["users.Application"]()
|
|
url = reverse(
|
|
"api:v1:oauth:apps-refresh_token", kwargs={"client_id": app.client_id}
|
|
)
|
|
response = logged_in_api_client.post(url)
|
|
|
|
assert response.status_code == 404
|
|
|
|
|
|
def test_authorize_view_post(logged_in_client, factories):
|
|
app = factories["users.Application"]()
|
|
url = reverse("api:v1:oauth:authorize")
|
|
response = logged_in_client.post(
|
|
url,
|
|
{
|
|
"allow": True,
|
|
"redirect_uri": app.redirect_uris,
|
|
"client_id": app.client_id,
|
|
"state": "hello",
|
|
"response_type": "code",
|
|
"scope": "read",
|
|
},
|
|
)
|
|
grant = models.Grant.objects.get(application=app)
|
|
assert response.status_code == 302
|
|
assert response["Location"] == "{}?code={}&state={}".format(
|
|
app.redirect_uris, grant.code, "hello"
|
|
)
|
|
|
|
|
|
def test_authorize_view_post_ajax_no_redirect(logged_in_client, factories):
|
|
app = factories["users.Application"]()
|
|
url = reverse("api:v1:oauth:authorize")
|
|
response = logged_in_client.post(
|
|
url,
|
|
{
|
|
"allow": True,
|
|
"redirect_uri": app.redirect_uris,
|
|
"client_id": app.client_id,
|
|
"state": "hello",
|
|
"response_type": "code",
|
|
"scope": "read",
|
|
},
|
|
HTTP_X_REQUESTED_WITH="XMLHttpRequest",
|
|
)
|
|
assert response.status_code == 200
|
|
grant = models.Grant.objects.get(application=app)
|
|
assert json.loads(response.content.decode()) == {
|
|
"redirect_uri": "{}?code={}&state={}".format(
|
|
app.redirect_uris, grant.code, "hello"
|
|
),
|
|
"code": grant.code,
|
|
}
|
|
|
|
|
|
def test_authorize_view_post_ajax_oob(logged_in_client, factories):
|
|
app = factories["users.Application"](redirect_uris="urn:ietf:wg:oauth:2.0:oob")
|
|
url = reverse("api:v1:oauth:authorize")
|
|
response = logged_in_client.post(
|
|
url,
|
|
{
|
|
"allow": True,
|
|
"redirect_uri": app.redirect_uris,
|
|
"client_id": app.client_id,
|
|
"state": "hello",
|
|
"response_type": "code",
|
|
"scope": "read",
|
|
},
|
|
HTTP_X_REQUESTED_WITH="XMLHttpRequest",
|
|
)
|
|
assert response.status_code == 200
|
|
grant = models.Grant.objects.get(application=app)
|
|
assert json.loads(response.content.decode()) == {
|
|
"redirect_uri": "{}?code={}&state={}".format(
|
|
app.redirect_uris, grant.code, "hello"
|
|
),
|
|
"code": grant.code,
|
|
}
|
|
|
|
|
|
def test_authorize_view_invalid_form(logged_in_client, factories):
|
|
url = reverse("api:v1:oauth:authorize")
|
|
response = logged_in_client.post(
|
|
url,
|
|
{
|
|
"allow": True,
|
|
"redirect_uri": "",
|
|
"client_id": "Noop",
|
|
"state": "hello",
|
|
"response_type": "code",
|
|
"scope": "read",
|
|
},
|
|
)
|
|
|
|
assert response.status_code == 400
|
|
assert json.loads(response.content.decode()) == {
|
|
"redirect_uri": ["This field is required."]
|
|
}
|
|
|
|
|
|
def test_authorize_view_invalid_redirect_url(logged_in_client, factories):
|
|
app = factories["users.Application"]()
|
|
url = reverse("api:v1:oauth:authorize")
|
|
response = logged_in_client.post(
|
|
url,
|
|
{
|
|
"allow": True,
|
|
"redirect_uri": "http://wrong.url",
|
|
"client_id": app.client_id,
|
|
"state": "hello",
|
|
"response_type": "code",
|
|
"scope": "read",
|
|
},
|
|
)
|
|
|
|
assert response.status_code == 400
|
|
assert json.loads(response.content.decode()) == {
|
|
"detail": "Mismatching redirect URI."
|
|
}
|
|
|
|
|
|
def test_authorize_view_invalid_oauth(logged_in_client, factories):
|
|
app = factories["users.Application"]()
|
|
url = reverse("api:v1:oauth:authorize")
|
|
response = logged_in_client.post(
|
|
url,
|
|
{
|
|
"allow": True,
|
|
"redirect_uri": app.redirect_uris,
|
|
"client_id": "wrong_id",
|
|
"state": "hello",
|
|
"response_type": "code",
|
|
"scope": "read",
|
|
},
|
|
)
|
|
|
|
assert response.status_code == 400
|
|
assert json.loads(response.content.decode()) == {
|
|
"non_field_errors": ["Invalid application"]
|
|
}
|
|
|
|
|
|
def test_authorize_view_anonymous(client, factories):
|
|
url = reverse("api:v1:oauth:authorize")
|
|
response = client.post(url, {})
|
|
|
|
assert response.status_code == 401
|
|
|
|
|
|
def test_token_view_post(api_client, factories):
|
|
grant = factories["users.Grant"]()
|
|
app = grant.application
|
|
url = reverse("api:v1:oauth:token")
|
|
|
|
# The Client Secret is hashed on save, so we need to set it manually to something
|
|
_client_secret = "random_langer_code_bla_bla"
|
|
app.client_secret = _client_secret
|
|
app.save()
|
|
|
|
response = api_client.post(
|
|
url,
|
|
{
|
|
"redirect_uri": app.redirect_uris,
|
|
"client_id": app.client_id,
|
|
"client_secret": _client_secret,
|
|
"grant_type": "authorization_code",
|
|
"code": grant.code,
|
|
},
|
|
)
|
|
payload = json.loads(response.content.decode())
|
|
|
|
assert "access_token" in payload
|
|
assert "refresh_token" in payload
|
|
assert payload["expires_in"] == 36000
|
|
assert payload["scope"] == grant.scope
|
|
assert payload["token_type"] == "Bearer"
|
|
assert response.status_code == 200
|
|
|
|
with pytest.raises(grant.DoesNotExist):
|
|
grant.refresh_from_db()
|
|
|
|
token = payload["access_token"]
|
|
|
|
# Now check we can use the token for auth
|
|
response = api_client.get(
|
|
reverse("api:v1:users:users-me"), HTTP_AUTHORIZATION="Bearer {}".format(token)
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
|
|
def test_revoke_view_post(logged_in_client, factories):
|
|
token = factories["users.AccessToken"]()
|
|
url = reverse("api:v1:oauth:revoke")
|
|
|
|
# The Client Secret is hashed on save, so we need to set it manually to something
|
|
_client_secret = "random_langer_code_bla_bla"
|
|
token.application.client_secret = _client_secret
|
|
token.application.save()
|
|
|
|
response = logged_in_client.post(
|
|
url,
|
|
{
|
|
"token": token.token,
|
|
"client_id": token.application.client_id,
|
|
"client_secret": _client_secret,
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
with pytest.raises(token.DoesNotExist):
|
|
token.refresh_from_db()
|
|
|
|
|
|
def test_grants_list(factories, logged_in_api_client):
|
|
token = factories["users.AccessToken"](user=logged_in_api_client.user)
|
|
refresh_token = factories["users.RefreshToken"](user=logged_in_api_client.user)
|
|
factories["users.AccessToken"]()
|
|
url = reverse("api:v1:oauth:grants-list")
|
|
expected = [
|
|
serializers.ApplicationSerializer(refresh_token.application).data,
|
|
serializers.ApplicationSerializer(token.application).data,
|
|
]
|
|
|
|
response = logged_in_api_client.get(url)
|
|
|
|
assert response.status_code == 200
|
|
assert response.data == expected
|
|
|
|
|
|
def test_grant_delete(factories, logged_in_api_client, mocker, now):
|
|
token = factories["users.AccessToken"](user=logged_in_api_client.user)
|
|
refresh_token = factories["users.RefreshToken"](
|
|
user=logged_in_api_client.user, application=token.application
|
|
)
|
|
grant = factories["users.Grant"](
|
|
user=logged_in_api_client.user, application=token.application
|
|
)
|
|
revoke_token = mocker.spy(token.__class__, "revoke")
|
|
revoke_refresh = mocker.spy(refresh_token.__class__, "revoke")
|
|
to_keep = [
|
|
factories["users.AccessToken"](application=token.application),
|
|
factories["users.RefreshToken"](application=token.application),
|
|
factories["users.Grant"](application=token.application),
|
|
]
|
|
url = reverse(
|
|
"api:v1:oauth:grants-detail", kwargs={"client_id": token.application.client_id}
|
|
)
|
|
|
|
response = logged_in_api_client.delete(url)
|
|
|
|
assert response.status_code == 204
|
|
|
|
revoke_token.assert_called_once()
|
|
revoke_refresh.assert_called_once()
|
|
|
|
with pytest.raises(token.DoesNotExist):
|
|
token.refresh_from_db()
|
|
|
|
with pytest.raises(grant.DoesNotExist):
|
|
grant.refresh_from_db()
|
|
|
|
refresh_token.refresh_from_db()
|
|
assert refresh_token.revoked == now
|
|
|
|
for t in to_keep:
|
|
t.refresh_from_db()
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"setting_value, verified_email, expected_status_code",
|
|
[
|
|
("mandatory", False, 401),
|
|
("mandatory", True, 200),
|
|
("optional", True, 200),
|
|
("optional", False, 200),
|
|
],
|
|
)
|
|
def test_token_auth(
|
|
setting_value,
|
|
verified_email,
|
|
expected_status_code,
|
|
api_client,
|
|
factories,
|
|
settings,
|
|
mailoutbox,
|
|
):
|
|
sent_emails = len(mailoutbox)
|
|
user = factories["users.User"](verified_email=verified_email)
|
|
token = factories["users.AccessToken"](user=user)
|
|
settings.ACCOUNT_EMAIL_VERIFICATION = setting_value
|
|
response = api_client.get(
|
|
reverse("api:v1:users:users-me"),
|
|
HTTP_AUTHORIZATION="Bearer {}".format(token.token),
|
|
)
|
|
assert response.status_code == expected_status_code
|
|
|
|
if expected_status_code != 200:
|
|
# confirmation e-mail should have been sent again
|
|
assert len(mailoutbox) == sent_emails + 1
|