From ad32555cc95fabe85d1d2fcb979e858c5d8813a3 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Tue, 19 Jan 2021 17:55:21 -0500 Subject: [PATCH] Drop the old push scheduler Redis singleton. --- .../WhisperServerConfiguration.java | 9 - .../textsecuregcm/WhisperServerService.java | 7 +- .../push/ApnFallbackManager.java | 173 ++++-------------- .../push/ApnFallbackManagerTest.java | 63 +------ 4 files changed, 50 insertions(+), 202 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 5594822d9..1d294a2a6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -104,11 +104,6 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private AccountDatabaseCrawlerConfiguration accountDatabaseCrawler; - @NotNull - @Valid - @JsonProperty - private RedisConfiguration pushScheduler; - @NotNull @Valid @JsonProperty @@ -295,10 +290,6 @@ public class WhisperServerConfiguration extends Configuration { return clientPresenceCluster; } - public RedisConfiguration getPushScheduler() { - return pushScheduler; - } - public RedisClusterConfiguration getPushSchedulerCluster() { return pushSchedulerCluster; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index eb345ad13..f1bceb8b1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -286,11 +286,8 @@ public class WhisperServerService extends Application> separated = getSeparated(numberAndDevice); - - if (!separated.isPresent()) { - removeFromSingleton(numberAndDevice); - continue; - } - - Optional account = accountsManager.get(separated.get().first()); - - if (!account.isPresent()) { - removeFromSingleton(numberAndDevice); - continue; - } - - Optional device = account.get().getDevice(separated.get().second()); - - if (!device.isPresent()) { - removeFromSingleton(numberAndDevice); - continue; - } - - sendNotification(account.get(), device.get()); - } - - } catch (Exception e) { - logger.warn("Exception while operating", e); - } - - Util.sleep(1000); - } - } - } - - class ClusterCacheWorker implements Runnable { + class NotificationWorker implements Runnable { @Override public void run() { @@ -145,12 +91,10 @@ public class ApnFallbackManager implements Managed { long entriesProcessed = 0; do { - pendingDestinations = getPendingDestinationsFromClusterCache(slot, 100); + pendingDestinations = getPendingDestinations(slot, 100); entriesProcessed += pendingDestinations.size(); for (final String uuidAndDevice : pendingDestinations) { - clusterDestinations.mark(); - final Optional> separated = getSeparated(uuidAndDevice); final Optional maybeAccount = separated.map(Pair::first) @@ -163,7 +107,7 @@ public class ApnFallbackManager implements Managed { if (maybeAccount.isPresent() && maybeDevice.isPresent()) { sendNotification(maybeAccount.get(), maybeDevice.get()); } else { - removeFromCluster(uuidAndDevice); + remove(uuidAndDevice); } } } while (!pendingDestinations.isEmpty()); @@ -172,8 +116,7 @@ public class ApnFallbackManager implements Managed { } } - public ApnFallbackManager(ReplicatedJedisPool jedisPool, - FaultTolerantRedisCluster cluster, + public ApnFallbackManager(FaultTolerantRedisCluster cluster, APNSender apnSender, AccountsManager accountsManager) throws IOException @@ -182,17 +125,12 @@ public class ApnFallbackManager implements Managed { this.accountsManager = accountsManager; this.cluster = cluster; - this.getSingletonScript = LuaScript.fromResource(jedisPool, "lua/apn/get.lua"); - this.removeSingletonScript = LuaScript.fromResource(jedisPool, "lua/apn/remove.lua"); + this.getScript = ClusterLuaScript.fromResource(cluster, "lua/apn/get.lua", ScriptOutputType.MULTI); + this.insertScript = ClusterLuaScript.fromResource(cluster, "lua/apn/insert.lua", ScriptOutputType.VALUE); + this.removeScript = ClusterLuaScript.fromResource(cluster, "lua/apn/remove.lua", ScriptOutputType.INTEGER); - this.getClusterScript = ClusterLuaScript.fromResource(cluster, "lua/apn/get.lua", ScriptOutputType.MULTI); - this.insertClusterScript = ClusterLuaScript.fromResource(cluster, "lua/apn/insert.lua", ScriptOutputType.VALUE); - this.removeClusterScript = ClusterLuaScript.fromResource(cluster, "lua/apn/remove.lua", ScriptOutputType.INTEGER); - - this.singletonWorkerThread = new Thread(new SingletonCacheWorker(), "ApnFallbackManagerSingletonWorker"); - - for (int i = 0; i < this.clusterWorkerThreads.length; i++) { - this.clusterWorkerThreads[i] = new Thread(new ClusterCacheWorker(), "ApnFallbackManagerClusterWorker-" + i); + for (int i = 0; i < this.workerThreads.length; i++) { + this.workerThreads[i] = new Thread(new NotificationWorker(), "ApnFallbackManagerWorker-" + i); } } @@ -215,7 +153,7 @@ public class ApnFallbackManager implements Managed { if (remove(account, device)) { delivered.mark(); } - } catch (JedisException | io.lettuce.core.RedisException e) { + } catch (io.lettuce.core.RedisException e) { throw new RedisException(e); } } @@ -223,20 +161,18 @@ public class ApnFallbackManager implements Managed { @Override public synchronized void start() { running.set(true); - singletonWorkerThread.start(); - for (final Thread clusterWorkerThread : clusterWorkerThreads) { - clusterWorkerThread.start(); + for (final Thread workerThread : workerThreads) { + workerThread.start(); } } @Override public synchronized void stop() throws InterruptedException { running.set(false); - singletonWorkerThread.join(); - for (final Thread clusterWorkerThread : clusterWorkerThreads) { - clusterWorkerThread.join(); + for (final Thread workerThread : workerThreads) { + workerThread.join(); } } @@ -280,73 +216,42 @@ public class ApnFallbackManager implements Managed { } private boolean remove(Account account, Device device) { - final boolean removedFromSingleton = removeFromSingleton(getSingletonEndpointKey(account, device)); - final boolean removedFromCluster = removeFromCluster(getClusterEndpointKey(account, device)); - - return removedFromSingleton || removedFromCluster; + return remove(getEndpointKey(account, device)); } - private boolean removeFromSingleton(String endpoint) { - if (!SINGLETON_PENDING_NOTIFICATIONS_KEY.equals(endpoint)) { - List keys = Arrays.asList(SINGLETON_PENDING_NOTIFICATIONS_KEY.getBytes(), endpoint.getBytes()); - List args = Collections.emptyList(); - - return ((long)removeSingletonScript.execute(keys, args)) > 0; - } - - return false; - } - - private boolean removeFromCluster(final String endpoint) { - final long removed = (long)removeClusterScript.execute(List.of(getClusterPendingNotificationQueueKey(endpoint), endpoint), - Collections.emptyList()); - - return removed > 0; - } - - @SuppressWarnings("unchecked") - private List getPendingDestinationsFromSingletonCache(final int limit) { - List keys = List.of(SINGLETON_PENDING_NOTIFICATIONS_KEY.getBytes()); - List args = List.of(String.valueOf(System.currentTimeMillis()).getBytes(), String.valueOf(limit).getBytes()); - - return ((List) getSingletonScript.execute(keys, args)) - .stream() - .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) - .collect(Collectors.toList()); + private boolean remove(final String endpoint) { + return (long)removeScript.execute(List.of(getPendingNotificationQueueKey(endpoint), endpoint), + Collections.emptyList()) > 0; } @SuppressWarnings("unchecked") @VisibleForTesting - List getPendingDestinationsFromClusterCache(final int slot, final int limit) { - return (List)getClusterScript.execute(List.of(getClusterPendingNotificationQueueKey(slot)), - List.of(String.valueOf(System.currentTimeMillis()), String.valueOf(limit))); + List getPendingDestinations(final int slot, final int limit) { + return (List)getScript.execute(List.of(getPendingNotificationQueueKey(slot)), + List.of(String.valueOf(System.currentTimeMillis()), String.valueOf(limit))); } private void insert(final Account account, final Device device, final long timestamp, final long interval) { - final String endpoint = getClusterEndpointKey(account, device); + final String endpoint = getEndpointKey(account, device); - insertClusterScript.execute(List.of(getClusterPendingNotificationQueueKey(endpoint), endpoint), - List.of(String.valueOf(timestamp), - String.valueOf(interval), - account.getUuid().toString(), - String.valueOf(device.getId()))); - } - - private String getSingletonEndpointKey(final Account account, final Device device) { - return "apn_device::" + account.getNumber() + "::" + device.getId(); + insertScript.execute(List.of(getPendingNotificationQueueKey(endpoint), endpoint), + List.of(String.valueOf(timestamp), + String.valueOf(interval), + account.getUuid().toString(), + String.valueOf(device.getId()))); } @VisibleForTesting - String getClusterEndpointKey(final Account account, final Device device) { + String getEndpointKey(final Account account, final Device device) { return "apn_device::{" + account.getUuid() + "::" + device.getId() + "}"; } - private String getClusterPendingNotificationQueueKey(final String endpoint) { - return getClusterPendingNotificationQueueKey(SlotHash.getSlot(endpoint)); + private String getPendingNotificationQueueKey(final String endpoint) { + return getPendingNotificationQueueKey(SlotHash.getSlot(endpoint)); } - private String getClusterPendingNotificationQueueKey(final int slot) { - return SINGLETON_PENDING_NOTIFICATIONS_KEY + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}"; + private String getPendingNotificationQueueKey(final int slot) { + return PENDING_NOTIFICATIONS_KEY + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}"; } private int getNextSlot() { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java index ee61598a9..21bc6278e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java @@ -7,23 +7,15 @@ package org.whispersystems.textsecuregcm.push; import io.lettuce.core.cluster.SlotHash; import org.apache.commons.lang3.RandomStringUtils; -import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentCaptor; -import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.providers.RedisClientFactory; import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; import org.whispersystems.textsecuregcm.redis.RedisException; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.util.Pair; -import redis.clients.jedis.Jedis; -import redis.embedded.RedisServer; import java.util.List; import java.util.Optional; @@ -31,7 +23,6 @@ import java.util.UUID; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeFalse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -39,47 +30,21 @@ import static org.mockito.Mockito.when; public class ApnFallbackManagerTest extends AbstractRedisClusterTest { private Account account; - private Device device; + private Device device; private APNSender apnSender; private ApnFallbackManager apnFallbackManager; - private static RedisServer redisServer; - private static final UUID ACCOUNT_UUID = UUID.randomUUID(); private static final String ACCOUNT_NUMBER = "+18005551234"; private static final long DEVICE_ID = 1L; - private static final String VOIP_APN_ID = RandomStringUtils.randomAlphanumeric(32); - - @BeforeClass - public static void setUpRedisSingleton() throws Exception { - assumeFalse(System.getProperty("os.name").equalsIgnoreCase("windows")); - - redisServer = RedisServer.builder() - .setting("appendonly no") - .setting("dir " + System.getProperty("java.io.tmpdir")) - .port(AbstractRedisClusterTest.getNextRedisClusterPort()) - .build(); - - redisServer.start(); - } + private static final String VOIP_APN_ID = RandomStringUtils.randomAlphanumeric(32); @Before public void setUp() throws Exception { super.setUp(); - final String redisUrl = String.format("redis://127.0.0.1:%d", redisServer.ports().get(0)); - - final ReplicatedJedisPool replicatedJedisPool = new RedisClientFactory("test-pool", - redisUrl, - List.of(redisUrl), - new CircuitBreakerConfiguration()).getRedisClientPool(); - - try (final Jedis jedis = replicatedJedisPool.getWriteResource()) { - jedis.flushAll(); - } - device = mock(Device.class); when(device.getId()).thenReturn(DEVICE_ID); when(device.getVoipApnId()).thenReturn(VOIP_APN_ID); @@ -96,28 +61,18 @@ public class ApnFallbackManagerTest extends AbstractRedisClusterTest { apnSender = mock(APNSender.class); - apnFallbackManager = new ApnFallbackManager(replicatedJedisPool, getRedisCluster(), apnSender, accountsManager); - } - - @After - public void tearDown() throws Exception { - super.tearDown(); - } - - @AfterClass - public static void tearDownRedisSingleton() { - redisServer.stop(); + apnFallbackManager = new ApnFallbackManager(getRedisCluster(), apnSender, accountsManager); } @Test public void testClusterInsert() throws RedisException { - final String endpoint = apnFallbackManager.getClusterEndpointKey(account, device); + final String endpoint = apnFallbackManager.getEndpointKey(account, device); - assertTrue(apnFallbackManager.getPendingDestinationsFromClusterCache(SlotHash.getSlot(endpoint), 1).isEmpty()); + assertTrue(apnFallbackManager.getPendingDestinations(SlotHash.getSlot(endpoint), 1).isEmpty()); apnFallbackManager.schedule(account, device, System.currentTimeMillis() - 30_000); - final List pendingDestinations = apnFallbackManager.getPendingDestinationsFromClusterCache(SlotHash.getSlot(endpoint), 2); + final List pendingDestinations = apnFallbackManager.getPendingDestinations(SlotHash.getSlot(endpoint), 2); assertEquals(1, pendingDestinations.size()); final Optional> maybeUuidAndDeviceId = ApnFallbackManager.getSeparated(pendingDestinations.get(0)); @@ -126,16 +81,16 @@ public class ApnFallbackManagerTest extends AbstractRedisClusterTest { assertEquals(ACCOUNT_UUID.toString(), maybeUuidAndDeviceId.get().first()); assertEquals(DEVICE_ID, (long)maybeUuidAndDeviceId.get().second()); - assertTrue(apnFallbackManager.getPendingDestinationsFromClusterCache(SlotHash.getSlot(endpoint), 1).isEmpty()); + assertTrue(apnFallbackManager.getPendingDestinations(SlotHash.getSlot(endpoint), 1).isEmpty()); } @Test public void testProcessNextSlot() throws RedisException { - final ApnFallbackManager.ClusterCacheWorker worker = apnFallbackManager.new ClusterCacheWorker(); + final ApnFallbackManager.NotificationWorker worker = apnFallbackManager.new NotificationWorker(); apnFallbackManager.schedule(account, device, System.currentTimeMillis() - 30_000); - final int slot = SlotHash.getSlot(apnFallbackManager.getClusterEndpointKey(account, device)); + final int slot = SlotHash.getSlot(apnFallbackManager.getEndpointKey(account, device)); final int previousSlot = (slot + SlotHash.SLOT_COUNT - 1) % SlotHash.SLOT_COUNT; getRedisCluster().withCluster(connection -> connection.sync().set(ApnFallbackManager.NEXT_SLOT_TO_PERSIST_KEY, String.valueOf(previousSlot)));