Make `MessagesCache#clear` methods asynchronous

This commit is contained in:
Jon Chambers 2023-08-08 10:34:58 -04:00 committed by Jon Chambers
parent 4d8c4d6693
commit 5caa951c61
3 changed files with 17 additions and 20 deletions

View File

@ -329,21 +329,24 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
});
}
public void clear(final UUID destinationUuid) {
// TODO Remove null check in a fully UUID-based world
if (destinationUuid != null) {
for (int i = 1; i < Device.MAXIMUM_DEVICE_ID; i++) {
clear(destinationUuid, i);
}
public CompletableFuture<Void> clear(final UUID destinationUuid) {
final CompletableFuture<?>[] clearFutures = new CompletableFuture[Device.MAXIMUM_DEVICE_ID];
for (int deviceId = 0; deviceId < Device.MAXIMUM_DEVICE_ID; deviceId++) {
clearFutures[deviceId] = clear(destinationUuid, deviceId);
}
return CompletableFuture.allOf(clearFutures);
}
public void clear(final UUID destinationUuid, final long deviceId) {
clearQueueTimer.record(() ->
removeQueueScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, deviceId),
public CompletableFuture<Void> clear(final UUID destinationUuid, final long deviceId) {
final Timer.Sample sample = Timer.start();
return removeQueueScript.executeBinaryAsync(List.of(getMessageQueueKey(destinationUuid, deviceId),
getMessageQueueMetadataKey(destinationUuid, deviceId),
getQueueIndexKey(destinationUuid, deviceId)),
Collections.emptyList()));
Collections.emptyList())
.thenRun(() -> sample.stop(clearQueueTimer));
}
int getNextSlotToPersist() {

View File

@ -103,12 +103,12 @@ public class MessagesManager {
}
public void clear(UUID destinationUuid) {
messagesCache.clear(destinationUuid);
messagesCache.clear(destinationUuid).join();
messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid);
}
public void clear(UUID destinationUuid, long deviceId) {
messagesCache.clear(destinationUuid, deviceId);
messagesCache.clear(destinationUuid, deviceId).join();
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, deviceId);
}

View File

@ -320,7 +320,7 @@ class MessagesCacheTest {
}
}
messagesCache.clear(DESTINATION_UUID, DESTINATION_DEVICE_ID);
messagesCache.clear(DESTINATION_UUID, DESTINATION_DEVICE_ID).join();
assertEquals(Collections.emptyList(), get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
assertEquals(messageCount, get(DESTINATION_UUID, DESTINATION_DEVICE_ID + 1, messageCount).size());
@ -340,18 +340,12 @@ class MessagesCacheTest {
}
}
messagesCache.clear(DESTINATION_UUID);
messagesCache.clear(DESTINATION_UUID).join();
assertEquals(Collections.emptyList(), get(DESTINATION_UUID, DESTINATION_DEVICE_ID, messageCount));
assertEquals(Collections.emptyList(), get(DESTINATION_UUID, DESTINATION_DEVICE_ID + 1, messageCount));
}
@Test
void testClearNullUuid() {
// We're happy as long as this doesn't throw an exception
messagesCache.clear(null);
}
@Test
void testGetAccountFromQueueName() {
assertEquals(DESTINATION_UUID,