do not loose track that stayed in the cache
This commit is contained in:
parent
b95ef93d18
commit
d52b74a55e
|
@ -124,9 +124,13 @@ class SessionRadio(SimpleRadio):
|
||||||
self.session.add(picked_choices)
|
self.session.add(picked_choices)
|
||||||
return picked_choices
|
return picked_choices
|
||||||
|
|
||||||
def cache_batch_radio_track(self, quantity, **kwargs):
|
def cache_batch_radio_track(self, **kwargs):
|
||||||
BATCH_SIZE = 100
|
BATCH_SIZE = 100
|
||||||
|
# get cached RadioTracks if any
|
||||||
|
old_evaluated_radio_tracks = cache.get(f"radiosessiontracks{self.session.id}")
|
||||||
|
|
||||||
# get the queryset and apply filters
|
# get the queryset and apply filters
|
||||||
|
kwargs.update(self.get_queryset_kwargs())
|
||||||
queryset = self.get_queryset(**kwargs)
|
queryset = self.get_queryset(**kwargs)
|
||||||
queryset = self.filter_already_played_from_session(queryset)
|
queryset = self.filter_already_played_from_session(queryset)
|
||||||
if kwargs["filter_playable"] is True:
|
if kwargs["filter_playable"] is True:
|
||||||
|
@ -137,18 +141,24 @@ class SessionRadio(SimpleRadio):
|
||||||
|
|
||||||
# select a random batch of the qs
|
# select a random batch of the qs
|
||||||
sliced_queryset = queryset.order_by("?")[:BATCH_SIZE]
|
sliced_queryset = queryset.order_by("?")[:BATCH_SIZE]
|
||||||
if len(sliced_queryset) == 0:
|
if len(sliced_queryset) == 0 and not old_evaluated_radio_tracks:
|
||||||
raise ValueError("No more radio candidates")
|
raise ValueError("No more radio candidates")
|
||||||
# create the radio session tracks into db in bulk
|
|
||||||
radio_tracks = self.session.add(sliced_queryset)
|
|
||||||
|
|
||||||
# evaluate the queryset to save it in cache
|
if len(sliced_queryset) > 0:
|
||||||
evaluated_radio_tracks = [t for t in radio_tracks]
|
# create the radio session tracks into db in bulk
|
||||||
logger.debug(
|
radio_tracks = self.session.add(sliced_queryset)
|
||||||
f"Setting redis cache for radio generation with radio id {self.session.id}"
|
|
||||||
)
|
# evaluate the queryset to save it in cache
|
||||||
cache.set(f"radiosessiontracks{self.session.id}", evaluated_radio_tracks, 3600)
|
evaluated_radio_tracks = [t for t in radio_tracks]
|
||||||
cache.set(f"radioqueryset{self.session.id}", sliced_queryset, 3600)
|
if old_evaluated_radio_tracks is not None:
|
||||||
|
evaluated_radio_tracks.append(old_evaluated_radio_tracks)
|
||||||
|
logger.info(
|
||||||
|
f"Setting redis cache for radio generation with radio id {self.session.id}"
|
||||||
|
)
|
||||||
|
cache.set(
|
||||||
|
f"radiosessiontracks{self.session.id}", evaluated_radio_tracks, 3600
|
||||||
|
)
|
||||||
|
cache.set(f"radioqueryset{self.session.id}", sliced_queryset, 3600)
|
||||||
|
|
||||||
return sliced_queryset
|
return sliced_queryset
|
||||||
|
|
||||||
|
@ -163,38 +173,31 @@ class SessionRadio(SimpleRadio):
|
||||||
return queryset
|
return queryset
|
||||||
|
|
||||||
def get_choices_v2(self, quantity, **kwargs):
|
def get_choices_v2(self, quantity, **kwargs):
|
||||||
kwargs.update(self.get_queryset_kwargs())
|
|
||||||
if cached_radio_tracks := cache.get(f"radiosessiontracks{self.session.id}"):
|
if cached_radio_tracks := cache.get(f"radiosessiontracks{self.session.id}"):
|
||||||
logger.debug("Using redis cache for radio generation")
|
logger.info("Using redis cache for radio generation")
|
||||||
radio_tracks = cached_radio_tracks
|
radio_tracks = cached_radio_tracks
|
||||||
if len(radio_tracks) < quantity:
|
if len(radio_tracks) < quantity:
|
||||||
logger.debug(
|
logger.info(
|
||||||
"Not enough radio tracks in cache. Trying to generate new cache"
|
"Not enough radio tracks in cache. Trying to generate new cache"
|
||||||
)
|
)
|
||||||
sliced_queryset = self.cache_batch_radio_track(quantity, **kwargs)
|
sliced_queryset = self.cache_batch_radio_track(**kwargs)
|
||||||
sliced_queryset = cache.get(f"radioqueryset{self.session.id}")
|
sliced_queryset = cache.get(f"radioqueryset{self.session.id}")
|
||||||
else:
|
else:
|
||||||
sliced_queryset = self.cache_batch_radio_track(quantity, **kwargs)
|
sliced_queryset = self.cache_batch_radio_track(**kwargs)
|
||||||
|
|
||||||
return sliced_queryset
|
return sliced_queryset[:quantity]
|
||||||
|
|
||||||
def pick_v2(self, **kwargs):
|
def pick_v2(self, **kwargs):
|
||||||
return self.pick_many_v2(quantity=1, **kwargs)[0]
|
return self.pick_many_v2(quantity=1, **kwargs)[0]
|
||||||
|
|
||||||
def pick_many_v2(self, quantity, **kwargs):
|
def pick_many_v2(self, quantity, **kwargs):
|
||||||
if self.session:
|
if self.session:
|
||||||
sliced_queryset = self.get_choices_v2(quantity, **kwargs)
|
sliced_queryset = self.get_choices_v2(quantity=quantity, **kwargs)
|
||||||
evaluated_radio_tracks = cache.get(f"radiosessiontracks{self.session.id}")
|
|
||||||
batch = evaluated_radio_tracks[0:quantity]
|
|
||||||
for radiotrack in batch:
|
|
||||||
radiotrack.played = True
|
|
||||||
RadioSessionTrack.objects.bulk_update(batch, ["played"])
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.debug(
|
logger.info(
|
||||||
"No radio session. Can't track user playback. Won't cache queryset results"
|
"No radio session. Can't track user playback. Won't cache queryset results"
|
||||||
)
|
)
|
||||||
sliced_queryset = self.get_choices_v2(quantity, **kwargs)
|
sliced_queryset = self.get_choices_v2(quantity=quantity, **kwargs)
|
||||||
|
|
||||||
return sliced_queryset
|
return sliced_queryset
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ from rest_framework.decorators import action
|
||||||
from rest_framework.response import Response
|
from rest_framework.response import Response
|
||||||
|
|
||||||
from funkwhale_api.common import permissions as common_permissions
|
from funkwhale_api.common import permissions as common_permissions
|
||||||
|
from funkwhale_api.radios.models import RadioSessionTrack
|
||||||
from funkwhale_api.music import utils as music_utils
|
from funkwhale_api.music import utils as music_utils
|
||||||
from funkwhale_api.music.serializers import TrackSerializer
|
from funkwhale_api.music.serializers import TrackSerializer
|
||||||
from funkwhale_api.users.oauth import permissions as oauth_permissions
|
from funkwhale_api.users.oauth import permissions as oauth_permissions
|
||||||
|
@ -181,6 +182,7 @@ class RadioSessionTracksViewSet(mixins.CreateModelMixin, viewsets.GenericViewSet
|
||||||
if "count" in serializer.validated_data.keys()
|
if "count" in serializer.validated_data.keys()
|
||||||
else 1
|
else 1
|
||||||
)
|
)
|
||||||
|
# this is used for test purpose.
|
||||||
filter_playable = (
|
filter_playable = (
|
||||||
request.data["filter_playable"]
|
request.data["filter_playable"]
|
||||||
if "filter_playable" in request.data.keys()
|
if "filter_playable" in request.data.keys()
|
||||||
|
@ -205,14 +207,21 @@ class RadioSessionTracksViewSet(mixins.CreateModelMixin, viewsets.GenericViewSet
|
||||||
# self.perform_create(serializer)
|
# self.perform_create(serializer)
|
||||||
# dirty override here, since we use a different serializer for creation and detail
|
# dirty override here, since we use a different serializer for creation and detail
|
||||||
evaluated_radio_tracks = cache.get(f"radiosessiontracks{session.id}")
|
evaluated_radio_tracks = cache.get(f"radiosessiontracks{session.id}")
|
||||||
|
batch = evaluated_radio_tracks[:count]
|
||||||
|
|
||||||
serializer = self.serializer_class(
|
serializer = self.serializer_class(
|
||||||
data=evaluated_radio_tracks[:count],
|
data=batch,
|
||||||
context=self.get_serializer_context(),
|
context=self.get_serializer_context(),
|
||||||
many="true",
|
many="true",
|
||||||
)
|
)
|
||||||
serializer.is_valid()
|
serializer.is_valid()
|
||||||
headers = self.get_success_headers(serializer.data)
|
headers = self.get_success_headers(serializer.data)
|
||||||
|
|
||||||
|
# mark the RadioTracks has played
|
||||||
|
for radiotrack in batch:
|
||||||
|
radiotrack.played = True
|
||||||
|
RadioSessionTrack.objects.bulk_update(batch, ["played"])
|
||||||
|
|
||||||
# delete the tracks we send from the cache
|
# delete the tracks we send from the cache
|
||||||
new_cached_radiotracks = evaluated_radio_tracks[count:]
|
new_cached_radiotracks = evaluated_radio_tracks[count:]
|
||||||
cache.set(f"radiosessiontracks{session.id}", new_cached_radiotracks)
|
cache.set(f"radiosessiontracks{session.id}", new_cached_radiotracks)
|
||||||
|
|
|
@ -1,4 +1,6 @@
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
import logging
|
||||||
import random
|
import random
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
@ -523,14 +525,35 @@ def test_can_get_choices_for_custom_radio_v2(factories):
|
||||||
def test_can_cache_radio_track(factories):
|
def test_can_cache_radio_track(factories):
|
||||||
uploads = factories["music.Track"].create_batch(10)
|
uploads = factories["music.Track"].create_batch(10)
|
||||||
user = factories["users.User"]()
|
user = factories["users.User"]()
|
||||||
for t in Track.objects.all().playable_by(user.actor):
|
|
||||||
assert t in uploads
|
|
||||||
|
|
||||||
radio = radios.RandomRadio()
|
radio = radios.RandomRadio()
|
||||||
session = radio.start_session(user)
|
session = radio.start_session(user)
|
||||||
picked = session.radio.pick_many_v2(quantity=1, filter_playable=False)
|
picked = session.radio.pick_many_v2(quantity=1, filter_playable=False)
|
||||||
assert len(picked) == 10
|
assert len(picked) == 1
|
||||||
for t in cache.get(f"radioqueryset{session.id}"):
|
|
||||||
assert t in picked
|
|
||||||
for t in cache.get(f"radiosessiontracks{session.id}"):
|
for t in cache.get(f"radiosessiontracks{session.id}"):
|
||||||
assert t.track in uploads
|
assert t.track in uploads
|
||||||
|
|
||||||
|
|
||||||
|
def test_regenerate_cache_if_not_enought_tracks_in_it(
|
||||||
|
factories, caplog, logged_in_api_client
|
||||||
|
):
|
||||||
|
logger = logging.getLogger("funkwhale_api.radios.radios")
|
||||||
|
caplog.set_level(logging.INFO)
|
||||||
|
logger.addHandler(caplog.handler)
|
||||||
|
|
||||||
|
factories["music.Track"].create_batch(10)
|
||||||
|
user = factories["users.User"]()
|
||||||
|
url = reverse("api:v1:radios:sessions-list")
|
||||||
|
response = logged_in_api_client.post(url, {"radio_type": "random"})
|
||||||
|
session = models.RadioSession.objects.latest("id")
|
||||||
|
url = reverse("api:v2:radios:tracks-list")
|
||||||
|
logged_in_api_client.post(
|
||||||
|
url, {"session": session.pk, "count": 9, "filter_playable": False}
|
||||||
|
)
|
||||||
|
response = logged_in_api_client.post(
|
||||||
|
url, {"session": session.pk, "count": 10, "filter_playable": False}
|
||||||
|
)
|
||||||
|
pick = json.loads(response.content.decode("utf-8"))
|
||||||
|
assert (
|
||||||
|
"Not enough radio tracks in cache. Trying to generate new cache" in caplog.text
|
||||||
|
)
|
||||||
|
assert len(pick) == 1
|
||||||
|
|
Loading…
Reference in New Issue