160 lines
5.2 KiB
Python
160 lines
5.2 KiB
Python
import datetime
|
|
import logging
|
|
import os
|
|
|
|
from django.conf import settings
|
|
from django.db.models import Q, F
|
|
from django.utils import timezone
|
|
from dynamic_preferences.registries import global_preferences_registry
|
|
from requests.exceptions import RequestException
|
|
|
|
from funkwhale_api.common import session
|
|
from funkwhale_api.music import models as music_models
|
|
from funkwhale_api.taskapp import celery
|
|
|
|
from . import models, signing
|
|
from . import routes
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@celery.app.task(name="federation.clean_music_cache")
|
|
def clean_music_cache():
|
|
preferences = global_preferences_registry.manager()
|
|
delay = preferences["federation__music_cache_duration"]
|
|
if delay < 1:
|
|
return # cache clearing disabled
|
|
limit = timezone.now() - datetime.timedelta(minutes=delay)
|
|
|
|
candidates = (
|
|
music_models.TrackFile.objects.filter(
|
|
Q(audio_file__isnull=False)
|
|
& (Q(accessed_date__lt=limit) | Q(accessed_date=None))
|
|
)
|
|
.local(False)
|
|
.exclude(audio_file="")
|
|
.only("audio_file", "id")
|
|
.order_by("id")
|
|
)
|
|
for tf in candidates:
|
|
tf.audio_file.delete()
|
|
|
|
# we also delete orphaned files, if any
|
|
storage = models.LibraryTrack._meta.get_field("audio_file").storage
|
|
files = get_files(storage, "federation_cache/tracks")
|
|
existing = music_models.TrackFile.objects.filter(audio_file__in=files)
|
|
missing = set(files) - set(existing.values_list("audio_file", flat=True))
|
|
for m in missing:
|
|
storage.delete(m)
|
|
|
|
|
|
def get_files(storage, *parts):
|
|
"""
|
|
This is a recursive function that return all files available
|
|
in a given directory using django's storage.
|
|
"""
|
|
if not parts:
|
|
raise ValueError("Missing path")
|
|
|
|
dirs, files = storage.listdir(os.path.join(*parts))
|
|
for dir in dirs:
|
|
files += get_files(storage, *(list(parts) + [dir]))
|
|
return [os.path.join(parts[-1], path) for path in files]
|
|
|
|
|
|
@celery.app.task(name="federation.dispatch_inbox")
|
|
@celery.require_instance(models.Activity.objects.select_related(), "activity")
|
|
def dispatch_inbox(activity):
|
|
"""
|
|
Given an activity instance, triggers our internal delivery logic (follow
|
|
creation, etc.)
|
|
"""
|
|
|
|
try:
|
|
routes.inbox.dispatch(
|
|
activity.payload,
|
|
context={
|
|
"actor": activity.actor,
|
|
"inbox_items": list(activity.inbox_items.local().select_related()),
|
|
},
|
|
)
|
|
except Exception:
|
|
activity.inbox_items.local().update(
|
|
delivery_attempts=F("delivery_attempts") + 1,
|
|
last_delivery_date=timezone.now(),
|
|
)
|
|
raise
|
|
else:
|
|
activity.inbox_items.local().update(
|
|
delivery_attempts=F("delivery_attempts") + 1,
|
|
last_delivery_date=timezone.now(),
|
|
is_delivered=True,
|
|
)
|
|
|
|
|
|
@celery.app.task(name="federation.dispatch_outbox")
|
|
@celery.require_instance(models.Activity.objects.select_related(), "activity")
|
|
def dispatch_outbox(activity):
|
|
"""
|
|
Deliver a local activity to its recipients
|
|
"""
|
|
inbox_items = activity.inbox_items.all().select_related("actor")
|
|
local_recipients_items = [ii for ii in inbox_items if ii.actor.is_local]
|
|
if local_recipients_items:
|
|
dispatch_inbox.delay(activity_id=activity.pk)
|
|
remote_recipients_items = [ii for ii in inbox_items if not ii.actor.is_local]
|
|
|
|
shared_inbox_urls = {
|
|
ii.actor.shared_inbox_url
|
|
for ii in remote_recipients_items
|
|
if ii.actor.shared_inbox_url
|
|
}
|
|
inbox_urls = {
|
|
ii.actor.inbox_url
|
|
for ii in remote_recipients_items
|
|
if not ii.actor.shared_inbox_url
|
|
}
|
|
for url in shared_inbox_urls:
|
|
deliver_to_remote_inbox.delay(activity_id=activity.pk, shared_inbox_url=url)
|
|
|
|
for url in inbox_urls:
|
|
deliver_to_remote_inbox.delay(activity_id=activity.pk, inbox_url=url)
|
|
|
|
|
|
@celery.app.task(
|
|
name="federation.deliver_to_remote_inbox",
|
|
autoretry_for=[RequestException],
|
|
retry_backoff=30,
|
|
max_retries=5,
|
|
)
|
|
@celery.require_instance(models.Activity.objects.select_related(), "activity")
|
|
def deliver_to_remote_inbox(activity, inbox_url=None, shared_inbox_url=None):
|
|
url = inbox_url or shared_inbox_url
|
|
actor = activity.actor
|
|
inbox_items = activity.inbox_items.filter(is_delivered=False)
|
|
if inbox_url:
|
|
inbox_items = inbox_items.filter(actor__inbox_url=inbox_url)
|
|
else:
|
|
inbox_items = inbox_items.filter(actor__shared_inbox_url=shared_inbox_url)
|
|
logger.info("Preparing activity delivery to %s", url)
|
|
auth = signing.get_auth(actor.private_key, actor.private_key_id)
|
|
try:
|
|
response = session.get_session().post(
|
|
auth=auth,
|
|
json=activity.payload,
|
|
url=url,
|
|
timeout=5,
|
|
verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL,
|
|
headers={"Content-Type": "application/activity+json"},
|
|
)
|
|
logger.debug("Remote answered with %s", response.status_code)
|
|
response.raise_for_status()
|
|
except Exception:
|
|
inbox_items.update(
|
|
last_delivery_date=timezone.now(),
|
|
delivery_attempts=F("delivery_attempts") + 1,
|
|
)
|
|
raise
|
|
else:
|
|
inbox_items.update(last_delivery_date=timezone.now(), is_delivered=True)
|