110 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			110 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
	
| import unicodedata
 | |
| import re
 | |
| from django.conf import settings
 | |
| 
 | |
| from funkwhale_api.common import session
 | |
| from funkwhale_api.moderation import models as moderation_models
 | |
| 
 | |
| from . import exceptions
 | |
| from . import signing
 | |
| 
 | |
| 
 | |
| def full_url(path):
 | |
|     """
 | |
|     Given a relative path, return a full url usable for federation purpose
 | |
|     """
 | |
|     if path.startswith("http://") or path.startswith("https://"):
 | |
|         return path
 | |
|     root = settings.FUNKWHALE_URL
 | |
|     if path.startswith("/") and root.endswith("/"):
 | |
|         return root + path[1:]
 | |
|     elif not path.startswith("/") and not root.endswith("/"):
 | |
|         return root + "/" + path
 | |
|     else:
 | |
|         return root + path
 | |
| 
 | |
| 
 | |
| def clean_wsgi_headers(raw_headers):
 | |
|     """
 | |
|     Convert WSGI headers from CONTENT_TYPE to Content-Type notation
 | |
|     """
 | |
|     cleaned = {}
 | |
|     non_prefixed = ["content_type", "content_length"]
 | |
|     for raw_header, value in raw_headers.items():
 | |
|         h = raw_header.lower()
 | |
|         if not h.startswith("http_") and h not in non_prefixed:
 | |
|             continue
 | |
| 
 | |
|         words = h.replace("http_", "", 1).split("_")
 | |
|         cleaned_header = "-".join([w.capitalize() for w in words])
 | |
|         cleaned[cleaned_header] = value
 | |
| 
 | |
|     return cleaned
 | |
| 
 | |
| 
 | |
| def slugify_username(username):
 | |
|     """
 | |
|     Given a username such as "hello M. world", returns a username
 | |
|     suitable for federation purpose (hello_M_world).
 | |
| 
 | |
|     Preserves the original case.
 | |
| 
 | |
|     Code is borrowed from django's slugify function.
 | |
|     """
 | |
| 
 | |
|     value = str(username)
 | |
|     value = (
 | |
|         unicodedata.normalize("NFKD", value).encode("ascii", "ignore").decode("ascii")
 | |
|     )
 | |
|     value = re.sub(r"[^\w\s-]", "", value).strip()
 | |
|     return re.sub(r"[-\s]+", "_", value)
 | |
| 
 | |
| 
 | |
| def retrieve_ap_object(
 | |
|     fid, actor=None, serializer_class=None, queryset=None, apply_instance_policies=True
 | |
| ):
 | |
|     from . import activity
 | |
| 
 | |
|     policies = moderation_models.InstancePolicy.objects.active().filter(block_all=True)
 | |
|     if apply_instance_policies and policies.matching_url(fid):
 | |
|         raise exceptions.BlockedActorOrDomain()
 | |
|     if queryset:
 | |
|         try:
 | |
|             # queryset can also be a Model class
 | |
|             existing = queryset.filter(fid=fid).first()
 | |
|         except AttributeError:
 | |
|             existing = queryset.objects.filter(fid=fid).first()
 | |
|         if existing:
 | |
|             return existing
 | |
| 
 | |
|     auth = (
 | |
|         None if not actor else signing.get_auth(actor.private_key, actor.private_key_id)
 | |
|     )
 | |
|     response = session.get_session().get(
 | |
|         fid,
 | |
|         auth=auth,
 | |
|         timeout=5,
 | |
|         verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL,
 | |
|         headers={
 | |
|             "Accept": "application/activity+json",
 | |
|             "Content-Type": "application/activity+json",
 | |
|         },
 | |
|     )
 | |
|     response.raise_for_status()
 | |
|     data = response.json()
 | |
| 
 | |
|     # we match against moderation policies here again, because the FID of the returned
 | |
|     # object may not be the same as the URL used to access it
 | |
|     try:
 | |
|         id = data["id"]
 | |
|     except KeyError:
 | |
|         pass
 | |
|     else:
 | |
|         if apply_instance_policies and activity.should_reject(id=id, payload=data):
 | |
|             raise exceptions.BlockedActorOrDomain()
 | |
|     if not serializer_class:
 | |
|         return data
 | |
|     serializer = serializer_class(data=data)
 | |
|     serializer.is_valid(raise_exception=True)
 | |
|     return serializer.save()
 |