From 3608c5bfb0962f4ccff4a3c040575fa995b66ee1 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Mon, 26 Jul 2021 12:56:33 -0400 Subject: [PATCH] Wait for outstanding requests to be resolved before shutting down the directory queue. --- .../textsecuregcm/WhisperServerService.java | 1 + .../textsecuregcm/sqs/DirectoryQueue.java | 32 ++++++++++++++++- .../textsecuregcm/sqs/DirectoryQueueTest.java | 35 +++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index f24454af8..4c2759eb3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -515,6 +515,7 @@ public class WhisperServerService extends Application queueUrls; private final SqsAsyncClient sqs; + private final Set outstandingRequests = Collections.newSetFromMap(new IdentityHashMap<>()); + private enum UpdateAction { ADD("add"), DELETE("delete"); @@ -73,6 +79,21 @@ public class DirectoryQueue { this.sqs = sqs; } + @Override + public void start() throws Exception { + } + + @Override + public void stop() throws Exception { + synchronized (outstandingRequests) { + while (!outstandingRequests.isEmpty()) { + outstandingRequests.wait(); + } + } + + sqs.close(); + } + public boolean isDiscoverable(final Account account) { return account.isEnabled() && account.isDiscoverableByPhoneNumber(); } @@ -101,6 +122,10 @@ public class DirectoryQueue { )) .build(); + synchronized (outstandingRequests) { + outstandingRequests.add(request); + } + sqs.sendMessage(request).whenComplete((response, cause) -> { try { if (cause instanceof SdkServiceException) { @@ -113,6 +138,11 @@ public class DirectoryQueue { logger.warn("sqs unexpected error", cause); } } finally { + synchronized (outstandingRequests) { + outstandingRequests.remove(request); + outstandingRequests.notifyAll(); + } + timerContext.close(); } }); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueueTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueueTest.java index 8a371e7c0..2405fe69e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueueTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueueTest.java @@ -6,6 +6,8 @@ package org.whispersystems.textsecuregcm.sqs; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -15,6 +17,8 @@ import static org.mockito.Mockito.when; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -89,4 +93,35 @@ public class DirectoryQueueTest { sendMessageRequest.messageAttributes().get("action")); } } + + @Test + void testStop() { + final CompletableFuture sendMessageFuture = new CompletableFuture<>(); + when(sqsAsyncClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendMessageFuture); + + final DirectoryQueue directoryQueue = new DirectoryQueue(List.of("sqs://test"), sqsAsyncClient); + + final Account account = mock(Account.class); + when(account.getNumber()).thenReturn("+18005556543"); + when(account.getUuid()).thenReturn(UUID.randomUUID()); + when(account.isEnabled()).thenReturn(true); + when(account.isDiscoverableByPhoneNumber()).thenReturn(true); + + directoryQueue.refreshAccount(account); + + final CompletableFuture stopFuture = CompletableFuture.supplyAsync(() -> { + try { + directoryQueue.stop(); + return true; + } catch (final Exception e) { + return false; + } + }); + + assertThrows(TimeoutException.class, () -> stopFuture.get(1, TimeUnit.SECONDS), + "Directory queue should not finish shutting down until all outstanding requests are resolved"); + + sendMessageFuture.complete(SendMessageResponse.builder().build()); + assertTrue(stopFuture.join()); + } }