from __future__ import absolute_import import functools import os from celery import Celery from django.apps import AppConfig from django.conf import settings if not settings.configured: # set the default Django settings module for the 'celery' program. os.environ.setdefault( "DJANGO_SETTINGS_MODULE", "config.settings.local" ) # pragma: no cover app = Celery("funkwhale_api") class CeleryConfig(AppConfig): name = "funkwhale_api.taskapp" verbose_name = "Celery Config" def ready(self): # Using a string here means the worker will not have to # pickle the object when using Windows. app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, force=True) def require_instance(model_or_qs, parameter_name, id_kwarg_name=None): def decorator(function): @functools.wraps(function) def inner(*args, **kwargs): kw = id_kwarg_name or "_".join([parameter_name, "id"]) pk = kwargs.pop(kw) try: instance = model_or_qs.get(pk=pk) except AttributeError: instance = model_or_qs.objects.get(pk=pk) kwargs[parameter_name] = instance return function(*args, **kwargs) return inner return decorator