Merge branch 'federation-signing-requests' into 'develop'

Federation signing requests

See merge request funkwhale/funkwhale!106
This commit is contained in:
Eliot Berriot 2018-03-26 20:14:28 +00:00
commit 85ce220f29
7 changed files with 217 additions and 2 deletions

View File

@ -89,6 +89,7 @@ LOCAL_APPS = (
'funkwhale_api.music',
'funkwhale_api.requests',
'funkwhale_api.favorites',
'funkwhale_api.federation',
'funkwhale_api.radios',
'funkwhale_api.history',
'funkwhale_api.playlists',

View File

View File

@ -0,0 +1,30 @@
import factory
import requests
import requests_http_signature
from funkwhale_api.factories import registry
from . import signing
registry.register(signing.get_key_pair, name='federation.KeyPair')
@registry.register(name='federation.SignatureAuth')
class SignatureAuthFactory(factory.Factory):
algorithm = 'rsa-sha256'
key = factory.LazyFunction(lambda: signing.get_key_pair()[0])
key_id = factory.Faker('url')
class Meta:
model = requests_http_signature.HTTPSignatureAuth
@registry.register(name='federation.SignedRequest')
class SignedRequestFactory(factory.Factory):
url = factory.Faker('url')
method = 'get'
auth = factory.SubFactory(SignatureAuthFactory)
class Meta:
model = requests.Request

View File

@ -0,0 +1,56 @@
import requests
import requests_http_signature
from cryptography.hazmat.primitives import serialization as crypto_serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend as crypto_default_backend
def get_key_pair(size=2048):
key = rsa.generate_private_key(
backend=crypto_default_backend(),
public_exponent=65537,
key_size=size
)
private_key = key.private_bytes(
crypto_serialization.Encoding.PEM,
crypto_serialization.PrivateFormat.PKCS8,
crypto_serialization.NoEncryption())
public_key = key.public_key().public_bytes(
crypto_serialization.Encoding.PEM,
crypto_serialization.PublicFormat.PKCS1
)
return private_key, public_key
def verify(request, public_key):
return requests_http_signature.HTTPSignatureAuth.verify(
request,
key_resolver=lambda **kwargs: public_key
)
def verify_django(django_request, public_key):
"""
Given a django WSGI request, create an underlying requests.PreparedRequest
instance we can verify
"""
headers = django_request.META.get('headers', {}).copy()
for h, v in list(headers.items()):
# we include lower-cased version of the headers for compatibility
# with requests_http_signature
headers[h.lower()] = v
try:
signature = headers['authorization']
except KeyError:
raise exceptions.MissingSignature
request = requests.Request(
method=django_request.method,
url='http://noop',
data=django_request.body,
headers=headers)
prepared_request = request.prepare()
return verify(request, public_key)

View File

@ -60,3 +60,5 @@ channels_redis>=2.1,<2.2
django-cacheops>=4,<4.1
daphne==2.0.4
cryptography>=2,<3
requests-http-signature==0.0.3

View File

@ -32,7 +32,11 @@ def cache():
def factories(db):
from funkwhale_api import factories
for v in factories.registry.values():
v._meta.strategy = factory.CREATE_STRATEGY
try:
v._meta.strategy = factory.CREATE_STRATEGY
except AttributeError:
# probably not a class based factory
pass
yield factories.registry
@ -40,7 +44,11 @@ def factories(db):
def nodb_factories():
from funkwhale_api import factories
for v in factories.registry.values():
v._meta.strategy = factory.BUILD_STRATEGY
try:
v._meta.strategy = factory.BUILD_STRATEGY
except AttributeError:
# probably not a class based factory
pass
yield factories.registry

View File

@ -0,0 +1,118 @@
import cryptography.exceptions
import io
import pytest
import requests_http_signature
from funkwhale_api.federation import signing
def test_can_sign_and_verify_request(factories):
private, public = factories['federation.KeyPair']()
auth = factories['federation.SignatureAuth'](key=private)
request = factories['federation.SignedRequest'](
auth=auth
)
prepared_request = request.prepare()
assert 'date' in prepared_request.headers
assert 'authorization' in prepared_request.headers
assert prepared_request.headers['authorization'].startswith('Signature')
assert signing.verify(prepared_request, public) is None
def test_can_sign_and_verify_request_digest(factories):
private, public = factories['federation.KeyPair']()
auth = factories['federation.SignatureAuth'](key=private)
request = factories['federation.SignedRequest'](
auth=auth,
method='post',
data=b'hello=world'
)
prepared_request = request.prepare()
assert 'date' in prepared_request.headers
assert 'digest' in prepared_request.headers
assert 'authorization' in prepared_request.headers
assert prepared_request.headers['authorization'].startswith('Signature')
assert signing.verify(prepared_request, public) is None
def test_verify_fails_with_wrong_key(factories):
wrong_private, wrong_public = factories['federation.KeyPair']()
request = factories['federation.SignedRequest']()
prepared_request = request.prepare()
with pytest.raises(cryptography.exceptions.InvalidSignature):
signing.verify(prepared_request, wrong_public)
def test_can_verify_django_request(factories, api_request):
private_key, public_key = signing.get_key_pair()
signed_request = factories['federation.SignedRequest'](
auth__key=private_key
)
prepared = signed_request.prepare()
django_request = api_request.get(
'/',
headers={
'Date': prepared.headers['date'],
'Authorization': prepared.headers['authorization'],
}
)
assert signing.verify_django(django_request, public_key) is None
def test_can_verify_django_request_digest(factories, api_request):
private_key, public_key = signing.get_key_pair()
signed_request = factories['federation.SignedRequest'](
auth__key=private_key,
method='post',
data=b'hello=world'
)
prepared = signed_request.prepare()
django_request = api_request.post(
'/',
headers={
'Date': prepared.headers['date'],
'Digest': prepared.headers['digest'],
'Authorization': prepared.headers['authorization'],
}
)
assert signing.verify_django(django_request, public_key) is None
def test_can_verify_django_request_digest_failure(factories, api_request):
private_key, public_key = signing.get_key_pair()
signed_request = factories['federation.SignedRequest'](
auth__key=private_key,
method='post',
data=b'hello=world'
)
prepared = signed_request.prepare()
django_request = api_request.post(
'/',
headers={
'Date': prepared.headers['date'],
'Digest': prepared.headers['digest'] + 'noop',
'Authorization': prepared.headers['authorization'],
}
)
with pytest.raises(cryptography.exceptions.InvalidSignature):
signing.verify_django(django_request, public_key)
def test_can_verify_django_request_failure(factories, api_request):
private_key, public_key = signing.get_key_pair()
signed_request = factories['federation.SignedRequest'](
auth__key=private_key
)
prepared = signed_request.prepare()
django_request = api_request.get(
'/',
headers={
'Date': 'Wrong',
'Authorization': prepared.headers['authorization'],
}
)
with pytest.raises(cryptography.exceptions.InvalidSignature):
signing.verify_django(django_request, public_key)