Move additional handling of `MessagesManager#delete` to executor

This commit is contained in:
Chris Eager 2022-11-02 10:51:44 -05:00 committed by Chris Eager
parent c6a79ca176
commit e0178fa0ea
10 changed files with 30 additions and 17 deletions

View File

@ -469,9 +469,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, Clock.systemUTC(),
keyspaceNotificationDispatchExecutor, messageDeletionAsyncExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager);
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster, config.getReportMessageConfiguration().getCounterTtl());
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager);
UsernameGenerator usernameGenerator = new UsernameGenerator(config.getUsername());
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster,
config.getReportMessageConfiguration().getCounterTtl());
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager,
messageDeletionAsyncExecutor);
UsernameGenerator usernameGenerator = new UsernameGenerator(config.getUsername());
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
deletedAccountsLockDynamoDbClient, config.getDynamoDbTables().getDeletedAccountsLock().getTableName());
AccountsManager accountsManager = new AccountsManager(accounts, phoneNumberIdentifiers, cacheCluster,

View File

@ -30,6 +30,8 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@ -60,6 +62,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
private final String tableName;
private final Duration timeToLive;
private final ExecutorService messageDeletionExecutor;
private final Scheduler messageDeletionScheduler;
private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class);
@ -72,6 +75,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
this.timeToLive = timeToLive;
this.messageDeletionExecutor = messageDeletionExecutor;
this.messageDeletionScheduler = Schedulers.fromExecutor(messageDeletionExecutor);
}
public void store(final List<MessageProtos.Envelope> messages, final UUID destinationAccountUuid, final long destinationDeviceId) {
@ -176,6 +180,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
return null;
})
.map(Optional::ofNullable)
.subscribeOn(messageDeletionScheduler)
.last(Optional.empty()) // if the flux is empty, last() will throw without a default
.toFuture();
}

View File

@ -15,6 +15,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@ -44,14 +45,17 @@ public class MessagesManager {
private final MessagesDynamoDb messagesDynamoDb;
private final MessagesCache messagesCache;
private final ReportMessageManager reportMessageManager;
private final ExecutorService messageDeletionExecutor;
public MessagesManager(
final MessagesDynamoDb messagesDynamoDb,
final MessagesCache messagesCache,
final ReportMessageManager reportMessageManager) {
final ReportMessageManager reportMessageManager,
final ExecutorService messageDeletionExecutor) {
this.messagesDynamoDb = messagesDynamoDb;
this.messagesCache = messagesCache;
this.reportMessageManager = reportMessageManager;
this.messageDeletionExecutor = messageDeletionExecutor;
}
public void insert(UUID destinationUuid, long destinationDevice, Envelope message) {
@ -111,7 +115,7 @@ public class MessagesManager {
public CompletableFuture<Optional<Envelope>> delete(UUID destinationUuid, long destinationDeviceId, UUID guid,
@Nullable Long serverTimestamp) {
return messagesCache.remove(destinationUuid, destinationDeviceId, guid)
.thenCompose(removed -> {
.thenComposeAsync(removed -> {
if (removed.isPresent()) {
cacheHitByGuidMeter.mark();
@ -126,7 +130,7 @@ public class MessagesManager {
return messagesDynamoDb.deleteMessage(destinationUuid, destinationDeviceId, guid, serverTimestamp);
}
});
}, messageDeletionExecutor);
}
/**

View File

@ -127,7 +127,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private final Random random = new Random();
private final boolean useReactive;
private Scheduler reactiveScheduler;
private final Scheduler reactiveScheduler;
private enum StoredMessageState {
EMPTY,

View File

@ -186,7 +186,7 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster,
configuration.getReportMessageConfiguration().getCounterTtl());
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache,
reportMessageManager);
reportMessageManager, messageDeletionExecutor);
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
deletedAccountsLockDynamoDbClient,
configuration.getDynamoDbTables().getDeletedAccountsLock().getTableName());

View File

@ -186,9 +186,9 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
configuration.getDynamoDbTables().getReportMessage().getTableName(),
configuration.getReportMessageConfiguration().getReportTtl());
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster,
configuration.getReportMessageConfiguration().getCounterTtl());
configuration.getReportMessageConfiguration().getCounterTtl());
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache,
reportMessageManager);
reportMessageManager, messageDeletionExecutor);
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
deletedAccountsLockDynamoDbClient,
configuration.getDynamoDbTables().getDeletedAccountsLock().getTableName());

View File

@ -187,9 +187,9 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
configuration.getDynamoDbTables().getReportMessage().getTableName(),
configuration.getReportMessageConfiguration().getReportTtl());
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster,
configuration.getReportMessageConfiguration().getCounterTtl());
configuration.getReportMessageConfiguration().getCounterTtl());
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache,
reportMessageManager);
reportMessageManager, messageDeletionExecutor);
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
deletedAccountsLockDynamoDbClient,
configuration.getDynamoDbTables().getDeletedAccountsLock().getTableName());

View File

@ -77,7 +77,8 @@ class MessagePersisterIntegrationTest {
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
REDIS_CLUSTER_EXTENSION.getRedisCluster(), Clock.systemUTC(), notificationExecutorService,
messageDeletionExecutorService);
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class));
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class),
messageDeletionExecutorService);
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
dynamicConfigurationManager, PERSIST_DELAY);

View File

@ -12,6 +12,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import java.util.UUID;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
@ -22,7 +23,7 @@ class MessagesManagerTest {
private final ReportMessageManager reportMessageManager = mock(ReportMessageManager.class);
private final MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache,
reportMessageManager);
reportMessageManager, Executors.newSingleThreadExecutor());
@Test
void insert() {

View File

@ -122,7 +122,7 @@ class WebSocketConnectionIntegrationTest {
final boolean useReactive) {
final WebSocketConnection webSocketConnection = new WebSocketConnection(
mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService),
new AuthenticatedAccount(() -> new Pair<>(account, device)),
device,
webSocketClient,
@ -207,7 +207,7 @@ class WebSocketConnectionIntegrationTest {
void testProcessStoredMessagesClientClosed(final boolean useReactive) {
final WebSocketConnection webSocketConnection = new WebSocketConnection(
mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService),
new AuthenticatedAccount(() -> new Pair<>(account, device)),
device,
webSocketClient,
@ -273,7 +273,7 @@ class WebSocketConnectionIntegrationTest {
void testProcessStoredMessagesSendFutureTimeout(final boolean useReactive) {
final WebSocketConnection webSocketConnection = new WebSocketConnection(
mock(ReceiptSender.class),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager),
new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService),
new AuthenticatedAccount(() -> new Pair<>(account, device)),
device,
webSocketClient,