From 13e346d4ebd88941eb97597997775584c4c8023f Mon Sep 17 00:00:00 2001 From: Chris Eager <79161849+eager-signal@users.noreply.github.com> Date: Thu, 2 Dec 2021 15:32:42 -0700 Subject: [PATCH] Distinguish local vs remote in `ClientPresenceManager#disconnectPresence` --- .../WebsocketRefreshRequestEventListener.java | 15 +- .../push/ClientPresenceManager.java | 40 +- .../storage/AccountsManager.java | 2 +- ...blementRefreshRequirementProviderTest.java | 16 +- .../push/ClientPresenceManagerTest.java | 472 ++++++++++-------- .../redis/RedisClusterExtension.java | 4 +- ...ntsManagerChangeNumberIntegrationTest.java | 2 +- .../controllers/DeviceControllerTest.java | 130 ++--- 8 files changed, 388 insertions(+), 293 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/auth/WebsocketRefreshRequestEventListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/auth/WebsocketRefreshRequestEventListener.java index e61708d9a..9fbb84fab 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/auth/WebsocketRefreshRequestEventListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/auth/WebsocketRefreshRequestEventListener.java @@ -5,10 +5,14 @@ package org.whispersystems.textsecuregcm.auth; -import java.util.Arrays; -import java.util.concurrent.atomic.AtomicInteger; +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; + import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import javax.ws.rs.container.ResourceInfo; +import javax.ws.rs.core.Context; import org.glassfish.jersey.server.monitoring.RequestEvent; import org.glassfish.jersey.server.monitoring.RequestEvent.Type; import org.glassfish.jersey.server.monitoring.RequestEventListener; @@ -16,11 +20,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; -import javax.ws.rs.container.ResourceInfo; -import javax.ws.rs.core.Context; - -import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; - public class WebsocketRefreshRequestEventListener implements RequestEventListener { private final ClientPresenceManager clientPresenceManager; @@ -60,7 +59,7 @@ public class WebsocketRefreshRequestEventListener implements RequestEventListene .forEach(pair -> { try { displacedDevices.incrementAndGet(); - clientPresenceManager.displacePresence(pair.first(), pair.second()); + clientPresenceManager.disconnectPresence(pair.first(), pair.second()); } catch (final Exception e) { logger.error("Could not displace device presence", e); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java index 5fdffb715..47359650d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java @@ -171,8 +171,16 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter connection.sync().del(presenceKey)); } private void displacePresence(final String presenceKey) { @@ -268,18 +276,22 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter { - try { - displacePresence(channel.substring("__keyspace@0__:".length())); - remoteDisplacementMeter.mark(); - } catch (final Exception e) { - log.warn("Error displacing presence", e); - } - }); + if (channel.startsWith("__keyspace@0__:presence::{")) { + if ("set".equals(message) || "del".equals(message)) { + // for "set", another process has overwritten this presence key, which means the client has connected to another host. + // for "del", another process has indicated the client should be disconnected + + // At this point, we're on a Lettuce IO thread and need to dispatch to a separate thread before making + // synchronous Lettuce calls to avoid deadlocking. + keyspaceNotificationExecutorService.execute(() -> { + try { + displacePresence(channel.substring("__keyspace@0__:".length())); + remoteDisplacementMeter.mark(); + } catch (final Exception e) { + log.warn("Error displacing presence", e); + } + }); + } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index 4d0acac38..46d03e493 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -552,7 +552,7 @@ public class AccountsManager { RedisOperation.unchecked(() -> account.getDevices().forEach(device -> - clientPresenceManager.displacePresence(account.getUuid(), device.getId()))); + clientPresenceManager.disconnectPresence(account.getUuid(), device.getId()))); } private String getAccountMapKey(String key) { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/auth/AuthEnablementRefreshRequirementProviderTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/auth/AuthEnablementRefreshRequirementProviderTest.java index 378989180..65efc2760 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/auth/AuthEnablementRefreshRequirementProviderTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/auth/AuthEnablementRefreshRequirementProviderTest.java @@ -190,12 +190,12 @@ class AuthEnablementRefreshRequirementProviderTest { assertAll( initialEnabled.keySet().stream() .map(deviceId -> () -> verify(clientPresenceManager, times(expectDisplacedPresence ? 1 : 0)) - .displacePresence(account.getUuid(), deviceId))); + .disconnectPresence(account.getUuid(), deviceId))); assertAll( finalEnabled.keySet().stream() .map(deviceId -> () -> verify(clientPresenceManager, times(expectDisplacedPresence ? 1 : 0)) - .displacePresence(account.getUuid(), deviceId))); + .disconnectPresence(account.getUuid(), deviceId))); } static Stream testDeviceEnabledChanged() { @@ -227,9 +227,9 @@ class AuthEnablementRefreshRequirementProviderTest { assertEquals(initialDeviceCount + addedDeviceNames.size(), account.getDevices().size()); - verify(clientPresenceManager).displacePresence(account.getUuid(), 1); - verify(clientPresenceManager).displacePresence(account.getUuid(), 2); - verify(clientPresenceManager).displacePresence(account.getUuid(), 3); + verify(clientPresenceManager).disconnectPresence(account.getUuid(), 1); + verify(clientPresenceManager).disconnectPresence(account.getUuid(), 2); + verify(clientPresenceManager).disconnectPresence(account.getUuid(), 3); } @ParameterizedTest @@ -260,7 +260,7 @@ class AuthEnablementRefreshRequirementProviderTest { assertEquals(200, response.getStatus()); initialDeviceIds.forEach(deviceId -> - verify(clientPresenceManager).displacePresence(account.getUuid(), deviceId)); + verify(clientPresenceManager).disconnectPresence(account.getUuid(), deviceId)); verifyNoMoreInteractions(clientPresenceManager); } @@ -285,8 +285,8 @@ class AuthEnablementRefreshRequirementProviderTest { assertTrue(account.getDevice(deletedDeviceId).isEmpty()); - initialDeviceIds.forEach(deviceId -> verify(clientPresenceManager).displacePresence(account.getUuid(), deviceId)); - verify(clientPresenceManager).displacePresence(account.getUuid(), deletedDeviceId); + initialDeviceIds.forEach(deviceId -> verify(clientPresenceManager).disconnectPresence(account.getUuid(), deviceId)); + verify(clientPresenceManager).disconnectPresence(account.getUuid(), deletedDeviceId); verifyNoMoreInteractions(clientPresenceManager); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java index c4d34c31b..5eb0b5dd6 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ClientPresenceManagerTest.java @@ -5,236 +5,314 @@ package org.whispersystems.textsecuregcm.push; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; +import java.time.Duration; import java.util.List; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; +import java.util.function.Function; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; -public class ClientPresenceManagerTest extends AbstractRedisClusterTest { +class ClientPresenceManagerTest { - private ScheduledExecutorService presenceRenewalExecutorService; - private ClientPresenceManager clientPresenceManager; + @RegisterExtension + static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); - private static final DisplacedPresenceListener NO_OP = () -> {}; + private ScheduledExecutorService presenceRenewalExecutorService; + private ClientPresenceManager clientPresenceManager; - @Override - @Before - public void setUp() throws Exception { - super.setUp(); + private static final DisplacedPresenceListener NO_OP = () -> { + }; - getRedisCluster().useCluster(connection -> { - connection.sync().flushall(); - connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$z"); - }); + @BeforeEach + void setUp() throws Exception { - presenceRenewalExecutorService = Executors.newSingleThreadScheduledExecutor(); - clientPresenceManager = new ClientPresenceManager(getRedisCluster(), presenceRenewalExecutorService, presenceRenewalExecutorService); + REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> { + connection.sync().flushall(); + connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz"); + }); + + presenceRenewalExecutorService = Executors.newSingleThreadScheduledExecutor(); + clientPresenceManager = new ClientPresenceManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), + presenceRenewalExecutorService, + presenceRenewalExecutorService); + } + + @AfterEach + public void tearDown() throws Exception { + presenceRenewalExecutorService.shutdown(); + presenceRenewalExecutorService.awaitTermination(1, TimeUnit.MINUTES); + + clientPresenceManager.stop(); + } + + @Test + void testIsPresent() { + final UUID accountUuid = UUID.randomUUID(); + final long deviceId = 1; + + assertFalse(clientPresenceManager.isPresent(accountUuid, deviceId)); + + clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); + assertTrue(clientPresenceManager.isPresent(accountUuid, deviceId)); + } + + @Test + void testIsLocallyPresent() { + final UUID accountUuid = UUID.randomUUID(); + final long deviceId = 1; + + assertFalse(clientPresenceManager.isLocallyPresent(accountUuid, deviceId)); + + clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); + REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> connection.sync().flushall()); + + assertTrue(clientPresenceManager.isLocallyPresent(accountUuid, deviceId)); + } + + @Test + void testLocalDisplacement() { + final UUID accountUuid = UUID.randomUUID(); + final long deviceId = 1; + + final AtomicInteger displacementCounter = new AtomicInteger(0); + final DisplacedPresenceListener displacementListener = displacementCounter::incrementAndGet; + + clientPresenceManager.setPresent(accountUuid, deviceId, displacementListener); + + assertEquals(0, displacementCounter.get()); + + clientPresenceManager.setPresent(accountUuid, deviceId, displacementListener); + + assertEquals(1, displacementCounter.get()); + } + + @Test + void testRemoteDisplacement() { + final UUID accountUuid = UUID.randomUUID(); + final long deviceId = 1; + + final CompletableFuture displaced = new CompletableFuture<>(); + + clientPresenceManager.start(); + + clientPresenceManager.setPresent(accountUuid, deviceId, () -> displaced.complete(null)); + + REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster( + connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), + UUID.randomUUID().toString())); + + assertTimeoutPreemptively(Duration.ofSeconds(10), displaced::join); + } + + @Test + void testRemoteDisplacementAfterTopologyChange() { + final UUID accountUuid = UUID.randomUUID(); + final long deviceId = 1; + + final CompletableFuture displaced = new CompletableFuture<>(); + + clientPresenceManager.start(); + + clientPresenceManager.setPresent(accountUuid, deviceId, () -> displaced.complete(null)); + + clientPresenceManager.getPubSubConnection() + .usePubSubConnection(connection -> connection.getResources().eventBus() + .publish(new ClusterTopologyChangedEvent(List.of(), List.of()))); + + REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster( + connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), + UUID.randomUUID().toString())); + + assertTimeoutPreemptively(Duration.ofSeconds(10), displaced::join); + } + + @Test + void testClearPresence() { + final UUID accountUuid = UUID.randomUUID(); + final long deviceId = 1; + + assertFalse(clientPresenceManager.isPresent(accountUuid, deviceId)); + + clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); + assertTrue(clientPresenceManager.clearPresence(accountUuid, deviceId)); + + clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); + REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster( + connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), + UUID.randomUUID().toString())); + + assertFalse(clientPresenceManager.clearPresence(accountUuid, deviceId)); + } + + @Test + void testPruneMissingPeers() { + final String presentPeerId = UUID.randomUUID().toString(); + final String missingPeerId = UUID.randomUUID().toString(); + + REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> { + connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId); + connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId); + }); + + for (int i = 0; i < 10; i++) { + addClientPresence(presentPeerId); + addClientPresence(missingPeerId); } - @Override - @After - public void tearDown() throws Exception { - super.tearDown(); + clientPresenceManager.getPubSubConnection().usePubSubConnection( + connection -> connection.sync().upstream().commands() + .subscribe(ClientPresenceManager.getManagerPresenceChannel(presentPeerId))); + clientPresenceManager.pruneMissingPeers(); - presenceRenewalExecutorService.shutdown(); - presenceRenewalExecutorService.awaitTermination(1, TimeUnit.MINUTES); + assertEquals(1, (long) REDIS_CLUSTER_EXTENSION.getRedisCluster().withCluster( + connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(presentPeerId)))); + assertTrue(REDIS_CLUSTER_EXTENSION.getRedisCluster().withCluster( + (Function, Boolean>) connection -> connection.sync() + .sismember(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId))); + + assertEquals(0, (long) REDIS_CLUSTER_EXTENSION.getRedisCluster().withCluster( + connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(missingPeerId)))); + assertFalse(REDIS_CLUSTER_EXTENSION.getRedisCluster().withCluster( + (Function, Boolean>) connection -> connection.sync() + .sismember(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId))); + } + + private void addClientPresence(final String managerId) { + final String clientPresenceKey = ClientPresenceManager.getPresenceKey(UUID.randomUUID(), 7); + + REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> { + connection.sync().set(clientPresenceKey, managerId); + connection.sync().sadd(ClientPresenceManager.getConnectedClientSetKey(managerId), clientPresenceKey); + }); + } + + @Test + void testClearAllOnStop() { + final int localAccounts = 10; + final UUID[] localUuids = new UUID[localAccounts]; + final long[] localDeviceIds = new long[localAccounts]; + + for (int i = 0; i < localAccounts; i++) { + localUuids[i] = UUID.randomUUID(); + localDeviceIds[i] = i; + + clientPresenceManager.setPresent(localUuids[i], localDeviceIds[i], NO_OP); + } + + final UUID displacedAccountUuid = UUID.randomUUID(); + final long displacedAccountDeviceId = 7; + + clientPresenceManager.setPresent(displacedAccountUuid, displacedAccountDeviceId, NO_OP); + REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> connection.sync() + .set(ClientPresenceManager.getPresenceKey(displacedAccountUuid, displacedAccountDeviceId), + UUID.randomUUID().toString())); + + clientPresenceManager.stop(); + + for (int i = 0; i < localAccounts; i++) { + localUuids[i] = UUID.randomUUID(); + localDeviceIds[i] = i; + + assertFalse(clientPresenceManager.isPresent(localUuids[i], localDeviceIds[i])); + } + + assertTrue(clientPresenceManager.isPresent(displacedAccountUuid, displacedAccountDeviceId)); + } + + @Nested + class MultiServerTest { + + private ClientPresenceManager server1; + private ClientPresenceManager server2; + + @BeforeEach + void setup() throws Exception { + + REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> { + connection.sync().flushall(); + connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz"); + }); + + final ScheduledExecutorService scheduledExecutorService1 = mock(ScheduledExecutorService.class); + final ExecutorService keyspaceNotificationExecutorService1 = Executors.newSingleThreadExecutor(); + server1 = new ClientPresenceManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), + scheduledExecutorService1, keyspaceNotificationExecutorService1); + + final ScheduledExecutorService scheduledExecutorService2 = mock(ScheduledExecutorService.class); + final ExecutorService keyspaceNotificationExecutorService2 = Executors.newSingleThreadExecutor(); + server2 = new ClientPresenceManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), + scheduledExecutorService2, keyspaceNotificationExecutorService2); + + server1.start(); + server2.start(); + } + + @AfterEach + void teardown() { + server2.stop(); + server1.stop(); } @Test - public void testIsPresent() { - final UUID accountUuid = UUID.randomUUID(); - final long deviceId = 1; + void testSetPresentRemotely() { + final UUID uuid1 = UUID.randomUUID(); + final long deviceId = 1L; - assertFalse(clientPresenceManager.isPresent(accountUuid, deviceId)); + final CompletableFuture displaced = new CompletableFuture<>(); + final DisplacedPresenceListener listener1 = () -> displaced.complete(null); + server1.setPresent(uuid1, deviceId, listener1); - clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); - assertTrue(clientPresenceManager.isPresent(accountUuid, deviceId)); + server2.setPresent(uuid1, deviceId, () -> { + }); + + assertTimeoutPreemptively(Duration.ofSeconds(10), displaced::join); } @Test - public void testIsLocallyPresent() { - final UUID accountUuid = UUID.randomUUID(); - final long deviceId = 1; + void testDisconnectPresenceLocally() { + final UUID uuid1 = UUID.randomUUID(); + final long deviceId = 1L; - assertFalse(clientPresenceManager.isLocallyPresent(accountUuid, deviceId)); + final CompletableFuture displaced = new CompletableFuture<>(); + final DisplacedPresenceListener listener1 = () -> displaced.complete(null); + server1.setPresent(uuid1, deviceId, listener1); - clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); - getRedisCluster().useCluster(connection -> connection.sync().flushall()); + server1.disconnectPresence(uuid1, deviceId); - assertTrue(clientPresenceManager.isLocallyPresent(accountUuid, deviceId)); + assertTimeoutPreemptively(Duration.ofSeconds(10), displaced::join); } @Test - public void testLocalDisplacement() { - final UUID accountUuid = UUID.randomUUID(); - final long deviceId = 1; + void testDisconnectPresenceRemotely() { + final UUID uuid1 = UUID.randomUUID(); + final long deviceId = 1L; - final AtomicInteger displacementCounter = new AtomicInteger(0); - final DisplacedPresenceListener displacementListener = displacementCounter::incrementAndGet; + final CompletableFuture displaced = new CompletableFuture<>(); + final DisplacedPresenceListener listener1 = () -> displaced.complete(null); + server1.setPresent(uuid1, deviceId, listener1); - clientPresenceManager.setPresent(accountUuid, deviceId, displacementListener); + server2.disconnectPresence(uuid1, deviceId); - assertEquals(0, displacementCounter.get()); - - clientPresenceManager.setPresent(accountUuid, deviceId, displacementListener); - - assertEquals(1, displacementCounter.get()); - } - - @Test(timeout = 10_000) - public void testRemoteDisplacement() throws InterruptedException { - final UUID accountUuid = UUID.randomUUID(); - final long deviceId = 1; - - final AtomicBoolean displaced = new AtomicBoolean(false); - - clientPresenceManager.start(); - - try { - clientPresenceManager.setPresent(accountUuid, deviceId, () -> { - synchronized (displaced) { - displaced.set(true); - displaced.notifyAll(); - } - }); - - getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), - UUID.randomUUID().toString())); - - synchronized (displaced) { - while (!displaced.get()) { - displaced.wait(); - } - } - } finally { - clientPresenceManager.stop(); - } - } - - @Test(timeout = 10_000) - public void testRemoteDisplacementAfterTopologyChange() throws InterruptedException { - final UUID accountUuid = UUID.randomUUID(); - final long deviceId = 1; - - final AtomicBoolean displaced = new AtomicBoolean(false); - - clientPresenceManager.start(); - - try { - clientPresenceManager.setPresent(accountUuid, deviceId, () -> { - synchronized (displaced) { - displaced.set(true); - displaced.notifyAll(); - } - }); - - clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.getResources().eventBus().publish(new ClusterTopologyChangedEvent(List.of(), List.of()))); - - getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), - UUID.randomUUID().toString())); - - synchronized (displaced) { - while (!displaced.get()) { - displaced.wait(); - } - } - } finally { - clientPresenceManager.stop(); - } - } - - @Test - public void testClearPresence() { - final UUID accountUuid = UUID.randomUUID(); - final long deviceId = 1; - - assertFalse(clientPresenceManager.isPresent(accountUuid, deviceId)); - - clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); - assertTrue(clientPresenceManager.clearPresence(accountUuid, deviceId)); - - clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP); - getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId), - UUID.randomUUID().toString())); - - assertFalse(clientPresenceManager.clearPresence(accountUuid, deviceId)); - } - - @Test - public void testPruneMissingPeers() { - final String presentPeerId = UUID.randomUUID().toString(); - final String missingPeerId = UUID.randomUUID().toString(); - - getRedisCluster().useCluster(connection -> { - connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId); - connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId); - }); - - for (int i = 0; i < 10; i++) { - addClientPresence(presentPeerId); - addClientPresence(missingPeerId); - } - - clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.sync().upstream().commands().subscribe(ClientPresenceManager.getManagerPresenceChannel(presentPeerId))); - clientPresenceManager.pruneMissingPeers(); - - assertEquals(1, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(presentPeerId)))); - assertTrue(getRedisCluster().withCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId))); - - assertEquals(0, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(missingPeerId)))); - assertFalse(getRedisCluster().withCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId))); - } - - private void addClientPresence(final String managerId) { - final String clientPresenceKey = ClientPresenceManager.getPresenceKey(UUID.randomUUID(), 7); - - getRedisCluster().useCluster(connection -> { - connection.sync().set(clientPresenceKey, managerId); - connection.sync().sadd(ClientPresenceManager.getConnectedClientSetKey(managerId), clientPresenceKey); - }); - } - - @Test - public void testClearAllOnStop() { - final int localAccounts = 10; - final UUID[] localUuids = new UUID[localAccounts]; - final long[] localDeviceIds = new long[localAccounts]; - - for (int i = 0; i < localAccounts; i++) { - localUuids[i] = UUID.randomUUID(); - localDeviceIds[i] = i; - - clientPresenceManager.setPresent(localUuids[i], localDeviceIds[i], NO_OP); - } - - final UUID displacedAccountUuid = UUID.randomUUID(); - final long displacedAccountDeviceId = 7; - - clientPresenceManager.setPresent(displacedAccountUuid, displacedAccountDeviceId, NO_OP); - getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(displacedAccountUuid, displacedAccountDeviceId), - UUID.randomUUID().toString())); - - clientPresenceManager.stop(); - - for (int i = 0; i < localAccounts; i++) { - localUuids[i] = UUID.randomUUID(); - localDeviceIds[i] = i; - - assertFalse(clientPresenceManager.isPresent(localUuids[i], localDeviceIds[i])); - } - - assertTrue(clientPresenceManager.isPresent(displacedAccountUuid, displacedAccountDeviceId)); + assertTimeoutPreemptively(Duration.ofSeconds(10), displaced::join); } + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java index bcf3e17dc..6cc961fc9 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java @@ -135,7 +135,7 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb try { final StatefulRedisConnection connection = meetClient.connect(); - final RedisCommands commands = connection.sync(); + final RedisCommands commands = connection.sync(); for (int i = 1; i < nodes.length; i++) { commands.clusterMeet("127.0.0.1", nodes[i].ports().get(0)); @@ -148,7 +148,7 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb for (int i = 0; i < nodes.length; i++) { final int startInclusive = i * slotsPerNode; - final int endExclusive = i == nodes.length - 1 ? SlotHash.SLOT_COUNT : (i + 1) * slotsPerNode; + final int endExclusive = i == nodes.length - 1 ? SlotHash.SLOT_COUNT : (i + 1) * slotsPerNode; final RedisClient assignSlotClient = RedisClient.create(RedisURI.create("127.0.0.1", nodes[i].ports().get(0))); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java index 6025f852d..8c6c04cb3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountsManagerChangeNumberIntegrationTest.java @@ -265,7 +265,7 @@ class AccountsManagerChangeNumberIntegrationTest { assertEquals(secondNumber, accountsManager.getByAccountIdentifier(originalUuid).map(Account::getNumber).orElseThrow()); - verify(clientPresenceManager).displacePresence(existingAccountUuid, Device.MASTER_ID); + verify(clientPresenceManager).disconnectPresence(existingAccountUuid, Device.MASTER_ID); assertEquals(Optional.of(existingAccountUuid), deletedAccounts.findUuid(originalNumber)); assertEquals(Optional.empty(), deletedAccounts.findUuid(secondNumber)); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DeviceControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DeviceControllerTest.java index 484760381..84d0876ca 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DeviceControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DeviceControllerTest.java @@ -61,15 +61,16 @@ import org.whispersystems.textsecuregcm.util.VerificationCode; @ExtendWith(DropwizardExtensionsSupport.class) class DeviceControllerTest { + @Path("/v1/devices") static class DumbVerificationDeviceController extends DeviceController { + public DumbVerificationDeviceController(StoredVerificationCodeManager pendingDevices, - AccountsManager accounts, - MessagesManager messages, - Keys keys, - RateLimiters rateLimiters, - Map deviceConfiguration) - { + AccountsManager accounts, + MessagesManager messages, + Keys keys, + RateLimiters rateLimiters, + Map deviceConfiguration) { super(pendingDevices, accounts, messages, keys, rateLimiters, deviceConfiguration); } @@ -80,17 +81,17 @@ class DeviceControllerTest { } private static StoredVerificationCodeManager pendingDevicesManager = mock(StoredVerificationCodeManager.class); - private static AccountsManager accountsManager = mock(AccountsManager.class ); - private static MessagesManager messagesManager = mock(MessagesManager.class); - private static Keys keys = mock(Keys.class); - private static RateLimiters rateLimiters = mock(RateLimiters.class ); - private static RateLimiter rateLimiter = mock(RateLimiter.class ); - private static Account account = mock(Account.class ); - private static Account maxedAccount = mock(Account.class); - private static Device masterDevice = mock(Device.class); + private static AccountsManager accountsManager = mock(AccountsManager.class); + private static MessagesManager messagesManager = mock(MessagesManager.class); + private static Keys keys = mock(Keys.class); + private static RateLimiters rateLimiters = mock(RateLimiters.class); + private static RateLimiter rateLimiter = mock(RateLimiter.class); + private static Account account = mock(Account.class); + private static Account maxedAccount = mock(Account.class); + private static Device masterDevice = mock(Device.class); private static ClientPresenceManager clientPresenceManager = mock(ClientPresenceManager.class); - private static Map deviceConfiguration = new HashMap<>(); + private static Map deviceConfiguration = new HashMap<>(); private static final ResourceExtension resources = ResourceExtension.builder() .addProvider(AuthHelper.getAuthFilter()) @@ -162,27 +163,27 @@ class DeviceControllerTest { when(AuthHelper.VALID_ACCOUNT.getDevices()).thenReturn(Set.of(existingDevice)); VerificationCode deviceCode = resources.getJerseyTest() - .target("/v1/devices/provisioning/code") - .request() - .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_UUID, AuthHelper.VALID_PASSWORD)) - .get(VerificationCode.class); + .target("/v1/devices/provisioning/code") + .request() + .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_UUID, AuthHelper.VALID_PASSWORD)) + .get(VerificationCode.class); assertThat(deviceCode).isEqualTo(new VerificationCode(5678901)); DeviceResponse response = resources.getJerseyTest() - .target("/v1/devices/5678901") - .request() - .header("Authorization", AuthHelper.getProvisioningAuthHeader(AuthHelper.VALID_NUMBER, "password1")) - .put(Entity.entity(new AccountAttributes(false, 1234, null, - null, true, null), - MediaType.APPLICATION_JSON_TYPE), - DeviceResponse.class); + .target("/v1/devices/5678901") + .request() + .header("Authorization", AuthHelper.getProvisioningAuthHeader(AuthHelper.VALID_NUMBER, "password1")) + .put(Entity.entity(new AccountAttributes(false, 1234, null, + null, true, null), + MediaType.APPLICATION_JSON_TYPE), + DeviceResponse.class); assertThat(response.getDeviceId()).isEqualTo(42L); verify(pendingDevicesManager).remove(AuthHelper.VALID_NUMBER); verify(messagesManager).clear(eq(AuthHelper.VALID_UUID), eq(42L)); - verify(clientPresenceManager).displacePresence(AuthHelper.VALID_UUID, Device.MASTER_ID); + verify(clientPresenceManager).disconnectPresence(AuthHelper.VALID_UUID, Device.MASTER_ID); } @Test @@ -201,30 +202,30 @@ class DeviceControllerTest { @Test void disabledDeviceRegisterTest() { Response response = resources.getJerseyTest() - .target("/v1/devices/provisioning/code") - .request() - .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.DISABLED_UUID, AuthHelper.DISABLED_PASSWORD)) - .get(); + .target("/v1/devices/provisioning/code") + .request() + .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.DISABLED_UUID, AuthHelper.DISABLED_PASSWORD)) + .get(); - assertThat(response.getStatus()).isEqualTo(401); + assertThat(response.getStatus()).isEqualTo(401); } @Test void invalidDeviceRegisterTest() { VerificationCode deviceCode = resources.getJerseyTest() - .target("/v1/devices/provisioning/code") - .request() - .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_UUID, AuthHelper.VALID_PASSWORD)) - .get(VerificationCode.class); + .target("/v1/devices/provisioning/code") + .request() + .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_UUID, AuthHelper.VALID_PASSWORD)) + .get(VerificationCode.class); assertThat(deviceCode).isEqualTo(new VerificationCode(5678901)); Response response = resources.getJerseyTest() - .target("/v1/devices/5678902") - .request() - .header("Authorization", AuthHelper.getProvisioningAuthHeader(AuthHelper.VALID_NUMBER, "password1")) - .put(Entity.entity(new AccountAttributes(false, 1234, null, null, true, null), - MediaType.APPLICATION_JSON_TYPE)); + .target("/v1/devices/5678902") + .request() + .header("Authorization", AuthHelper.getProvisioningAuthHeader(AuthHelper.VALID_NUMBER, "password1")) + .put(Entity.entity(new AccountAttributes(false, 1234, null, null, true, null), + MediaType.APPLICATION_JSON_TYPE)); assertThat(response.getStatus()).isEqualTo(403); @@ -234,11 +235,12 @@ class DeviceControllerTest { @Test void oldDeviceRegisterTest() { Response response = resources.getJerseyTest() - .target("/v1/devices/1112223") - .request() - .header("Authorization", AuthHelper.getProvisioningAuthHeader(AuthHelper.VALID_NUMBER_TWO, AuthHelper.VALID_PASSWORD_TWO)) - .put(Entity.entity(new AccountAttributes(false, 1234, null, null, true, null), - MediaType.APPLICATION_JSON_TYPE)); + .target("/v1/devices/1112223") + .request() + .header("Authorization", + AuthHelper.getProvisioningAuthHeader(AuthHelper.VALID_NUMBER_TWO, AuthHelper.VALID_PASSWORD_TWO)) + .put(Entity.entity(new AccountAttributes(false, 1234, null, null, true, null), + MediaType.APPLICATION_JSON_TYPE)); assertThat(response.getStatus()).isEqualTo(403); @@ -248,10 +250,10 @@ class DeviceControllerTest { @Test void maxDevicesTest() { Response response = resources.getJerseyTest() - .target("/v1/devices/provisioning/code") - .request() - .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_UUID_TWO, AuthHelper.VALID_PASSWORD_TWO)) - .get(); + .target("/v1/devices/provisioning/code") + .request() + .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_UUID_TWO, AuthHelper.VALID_PASSWORD_TWO)) + .get(); assertEquals(411, response.getStatus()); verifyNoMoreInteractions(messagesManager); @@ -260,11 +262,13 @@ class DeviceControllerTest { @Test void longNameTest() { Response response = resources.getJerseyTest() - .target("/v1/devices/5678901") - .request() - .header("Authorization", AuthHelper.getProvisioningAuthHeader(AuthHelper.VALID_NUMBER, "password1")) - .put(Entity.entity(new AccountAttributes(false, 1234, "this is a really long name that is longer than 80 characters it's so long that it's even longer than 204 characters. that's a lot of characters. we're talking lots and lots and lots of characters. 12345678", null, true, null), - MediaType.APPLICATION_JSON_TYPE)); + .target("/v1/devices/5678901") + .request() + .header("Authorization", AuthHelper.getProvisioningAuthHeader(AuthHelper.VALID_NUMBER, "password1")) + .put(Entity.entity(new AccountAttributes(false, 1234, + "this is a really long name that is longer than 80 characters it's so long that it's even longer than 204 characters. that's a lot of characters. we're talking lots and lots and lots of characters. 12345678", + null, true, null), + MediaType.APPLICATION_JSON_TYPE)); assertEquals(response.getStatus(), 422); verifyNoMoreInteractions(messagesManager); @@ -272,15 +276,17 @@ class DeviceControllerTest { @ParameterizedTest @MethodSource - void deviceDowngradeCapabilitiesTest(final String userAgent, final boolean gv2, final boolean gv2_2, final boolean gv2_3, final int expectedStatus) { - DeviceCapabilities deviceCapabilities = new DeviceCapabilities(gv2, gv2_2, gv2_3, true, false, true, true, true, true); + void deviceDowngradeCapabilitiesTest(final String userAgent, final boolean gv2, final boolean gv2_2, + final boolean gv2_3, final int expectedStatus) { + DeviceCapabilities deviceCapabilities = new DeviceCapabilities(gv2, gv2_2, gv2_3, true, false, true, true, true, + true); AccountAttributes accountAttributes = new AccountAttributes(false, 1234, null, null, true, deviceCapabilities); Response response = resources.getJerseyTest() - .target("/v1/devices/5678901") - .request() - .header("Authorization", AuthHelper.getProvisioningAuthHeader(AuthHelper.VALID_NUMBER, "password1")) - .header("User-Agent", userAgent) - .put(Entity.entity(accountAttributes, MediaType.APPLICATION_JSON_TYPE)); + .target("/v1/devices/5678901") + .request() + .header("Authorization", AuthHelper.getProvisioningAuthHeader(AuthHelper.VALID_NUMBER, "password1")) + .header("User-Agent", userAgent) + .put(Entity.entity(accountAttributes, MediaType.APPLICATION_JSON_TYPE)); assertThat(response.getStatus()).isEqualTo(expectedStatus);