Drop the old push scheduler Redis singleton.

This commit is contained in:
Jon Chambers 2021-01-19 17:55:21 -05:00 committed by Jon Chambers
parent be8a1acca9
commit ad32555cc9
4 changed files with 50 additions and 202 deletions

View File

@ -104,11 +104,6 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private AccountDatabaseCrawlerConfiguration accountDatabaseCrawler;
@NotNull
@Valid
@JsonProperty
private RedisConfiguration pushScheduler;
@NotNull
@Valid
@JsonProperty
@ -295,10 +290,6 @@ public class WhisperServerConfiguration extends Configuration {
return clientPresenceCluster;
}
public RedisConfiguration getPushScheduler() {
return pushScheduler;
}
public RedisClusterConfiguration getPushSchedulerCluster() {
return pushSchedulerCluster;
}

View File

@ -286,11 +286,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
RemoteConfigs remoteConfigs = new RemoteConfigs(accountDatabase);
FeatureFlags featureFlags = new FeatureFlags(accountDatabase);
RedisClientFactory pubSubClientFactory = new RedisClientFactory("pubsub_cache", config.getPubsubCacheConfiguration().getUrl(), config.getPubsubCacheConfiguration().getReplicaUrls(), config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration());
RedisClientFactory pushSchedulerClientFactory = new RedisClientFactory("push_scheduler_cache", config.getPushScheduler().getUrl(), config.getPushScheduler().getReplicaUrls(), config.getPushScheduler().getCircuitBreakerConfiguration());
RedisClientFactory pubSubClientFactory = new RedisClientFactory("pubsub_cache", config.getPubsubCacheConfiguration().getUrl(), config.getPubsubCacheConfiguration().getReplicaUrls(), config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration());
ReplicatedJedisPool pubsubClient = pubSubClientFactory.getRedisClientPool();
ReplicatedJedisPool pushSchedulerClient = pushSchedulerClientFactory.getRedisClientPool();
ClientResources generalCacheClientResources = ClientResources.builder().build();
ClientResources messageCacheClientResources = ClientResources.builder().build();
@ -353,7 +350,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ExternalServiceCredentialGenerator backupCredentialsGenerator = new ExternalServiceCredentialGenerator(config.getSecureBackupServiceConfiguration().getUserAuthenticationTokenSharedSecret(), new byte[0], false);
ExternalServiceCredentialGenerator paymentsCredentialsGenerator = new ExternalServiceCredentialGenerator(config.getPaymentsServiceConfiguration().getUserAuthenticationTokenSharedSecret(), new byte[0], false);
ApnFallbackManager apnFallbackManager = new ApnFallbackManager(pushSchedulerClient, pushSchedulerCluster, apnSender, accountsManager);
ApnFallbackManager apnFallbackManager = new ApnFallbackManager(pushSchedulerCluster, apnSender, accountsManager);
TwilioSmsSender twilioSmsSender = new TwilioSmsSender(config.getTwilioConfiguration());
SmsSender smsSender = new SmsSender(twilioSmsSender);
MessageSender messageSender = new MessageSender(apnFallbackManager, clientPresenceManager, messagesManager, gcmSender, apnSender, pushLatencyManager);

View File

@ -17,9 +17,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.LuaScript;
import org.whispersystems.textsecuregcm.redis.RedisException;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
@ -27,18 +25,14 @@ import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
import org.whispersystems.textsecuregcm.util.Util;
import redis.clients.jedis.exceptions.JedisException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static com.codahale.metrics.MetricRegistry.name;
@ -46,16 +40,14 @@ public class ApnFallbackManager implements Managed {
private static final Logger logger = LoggerFactory.getLogger(ApnFallbackManager.class);
private static final String SINGLETON_PENDING_NOTIFICATIONS_KEY = "PENDING_APN";
static final String NEXT_SLOT_TO_PERSIST_KEY = "pending_notification_next_slot";
private static final String PENDING_NOTIFICATIONS_KEY = "PENDING_APN";
static final String NEXT_SLOT_TO_PERSIST_KEY = "pending_notification_next_slot";
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Meter delivered = metricRegistry.meter(name(ApnFallbackManager.class, "voip_delivered"));
private static final Meter sent = metricRegistry.meter(name(ApnFallbackManager.class, "voip_sent" ));
private static final Meter retry = metricRegistry.meter(name(ApnFallbackManager.class, "voip_retry"));
private static final Meter evicted = metricRegistry.meter(name(ApnFallbackManager.class, "voip_evicted"));
private static final Meter singletonDestinations = metricRegistry.meter(name(ApnFallbackManager.class, "singleton_destinations"));
private static final Meter clusterDestinations = metricRegistry.meter(name(ApnFallbackManager.class, "cluster_destinations"));
static {
metricRegistry.register(name(ApnFallbackManager.class, "voip_ratio"), new VoipRatioGauge(delivered, sent));
@ -65,63 +57,17 @@ public class ApnFallbackManager implements Managed {
private final AccountsManager accountsManager;
private final FaultTolerantRedisCluster cluster;
private final LuaScript getSingletonScript;
private final LuaScript removeSingletonScript;
private final ClusterLuaScript getScript;
private final ClusterLuaScript insertScript;
private final ClusterLuaScript removeScript;
private final ClusterLuaScript getClusterScript;
private final ClusterLuaScript insertClusterScript;
private final ClusterLuaScript removeClusterScript;
private final Thread[] workerThreads = new Thread[WORKER_THREAD_COUNT];
private final Thread singletonWorkerThread;
private final Thread[] clusterWorkerThreads = new Thread[CLUSTER_WORKER_THREAD_COUNT];
private static final int CLUSTER_WORKER_THREAD_COUNT = 4;
private static final int WORKER_THREAD_COUNT = 4;
private final AtomicBoolean running = new AtomicBoolean(false);
private class SingletonCacheWorker implements Runnable {
@Override
public void run() {
while (running.get()) {
try {
for (final String numberAndDevice : getPendingDestinationsFromSingletonCache(100)) {
singletonDestinations.mark();
Optional<Pair<String, Long>> separated = getSeparated(numberAndDevice);
if (!separated.isPresent()) {
removeFromSingleton(numberAndDevice);
continue;
}
Optional<Account> account = accountsManager.get(separated.get().first());
if (!account.isPresent()) {
removeFromSingleton(numberAndDevice);
continue;
}
Optional<Device> device = account.get().getDevice(separated.get().second());
if (!device.isPresent()) {
removeFromSingleton(numberAndDevice);
continue;
}
sendNotification(account.get(), device.get());
}
} catch (Exception e) {
logger.warn("Exception while operating", e);
}
Util.sleep(1000);
}
}
}
class ClusterCacheWorker implements Runnable {
class NotificationWorker implements Runnable {
@Override
public void run() {
@ -145,12 +91,10 @@ public class ApnFallbackManager implements Managed {
long entriesProcessed = 0;
do {
pendingDestinations = getPendingDestinationsFromClusterCache(slot, 100);
pendingDestinations = getPendingDestinations(slot, 100);
entriesProcessed += pendingDestinations.size();
for (final String uuidAndDevice : pendingDestinations) {
clusterDestinations.mark();
final Optional<Pair<String, Long>> separated = getSeparated(uuidAndDevice);
final Optional<Account> maybeAccount = separated.map(Pair::first)
@ -163,7 +107,7 @@ public class ApnFallbackManager implements Managed {
if (maybeAccount.isPresent() && maybeDevice.isPresent()) {
sendNotification(maybeAccount.get(), maybeDevice.get());
} else {
removeFromCluster(uuidAndDevice);
remove(uuidAndDevice);
}
}
} while (!pendingDestinations.isEmpty());
@ -172,8 +116,7 @@ public class ApnFallbackManager implements Managed {
}
}
public ApnFallbackManager(ReplicatedJedisPool jedisPool,
FaultTolerantRedisCluster cluster,
public ApnFallbackManager(FaultTolerantRedisCluster cluster,
APNSender apnSender,
AccountsManager accountsManager)
throws IOException
@ -182,17 +125,12 @@ public class ApnFallbackManager implements Managed {
this.accountsManager = accountsManager;
this.cluster = cluster;
this.getSingletonScript = LuaScript.fromResource(jedisPool, "lua/apn/get.lua");
this.removeSingletonScript = LuaScript.fromResource(jedisPool, "lua/apn/remove.lua");
this.getScript = ClusterLuaScript.fromResource(cluster, "lua/apn/get.lua", ScriptOutputType.MULTI);
this.insertScript = ClusterLuaScript.fromResource(cluster, "lua/apn/insert.lua", ScriptOutputType.VALUE);
this.removeScript = ClusterLuaScript.fromResource(cluster, "lua/apn/remove.lua", ScriptOutputType.INTEGER);
this.getClusterScript = ClusterLuaScript.fromResource(cluster, "lua/apn/get.lua", ScriptOutputType.MULTI);
this.insertClusterScript = ClusterLuaScript.fromResource(cluster, "lua/apn/insert.lua", ScriptOutputType.VALUE);
this.removeClusterScript = ClusterLuaScript.fromResource(cluster, "lua/apn/remove.lua", ScriptOutputType.INTEGER);
this.singletonWorkerThread = new Thread(new SingletonCacheWorker(), "ApnFallbackManagerSingletonWorker");
for (int i = 0; i < this.clusterWorkerThreads.length; i++) {
this.clusterWorkerThreads[i] = new Thread(new ClusterCacheWorker(), "ApnFallbackManagerClusterWorker-" + i);
for (int i = 0; i < this.workerThreads.length; i++) {
this.workerThreads[i] = new Thread(new NotificationWorker(), "ApnFallbackManagerWorker-" + i);
}
}
@ -215,7 +153,7 @@ public class ApnFallbackManager implements Managed {
if (remove(account, device)) {
delivered.mark();
}
} catch (JedisException | io.lettuce.core.RedisException e) {
} catch (io.lettuce.core.RedisException e) {
throw new RedisException(e);
}
}
@ -223,20 +161,18 @@ public class ApnFallbackManager implements Managed {
@Override
public synchronized void start() {
running.set(true);
singletonWorkerThread.start();
for (final Thread clusterWorkerThread : clusterWorkerThreads) {
clusterWorkerThread.start();
for (final Thread workerThread : workerThreads) {
workerThread.start();
}
}
@Override
public synchronized void stop() throws InterruptedException {
running.set(false);
singletonWorkerThread.join();
for (final Thread clusterWorkerThread : clusterWorkerThreads) {
clusterWorkerThread.join();
for (final Thread workerThread : workerThreads) {
workerThread.join();
}
}
@ -280,73 +216,42 @@ public class ApnFallbackManager implements Managed {
}
private boolean remove(Account account, Device device) {
final boolean removedFromSingleton = removeFromSingleton(getSingletonEndpointKey(account, device));
final boolean removedFromCluster = removeFromCluster(getClusterEndpointKey(account, device));
return removedFromSingleton || removedFromCluster;
return remove(getEndpointKey(account, device));
}
private boolean removeFromSingleton(String endpoint) {
if (!SINGLETON_PENDING_NOTIFICATIONS_KEY.equals(endpoint)) {
List<byte[]> keys = Arrays.asList(SINGLETON_PENDING_NOTIFICATIONS_KEY.getBytes(), endpoint.getBytes());
List<byte[]> args = Collections.emptyList();
return ((long)removeSingletonScript.execute(keys, args)) > 0;
}
return false;
}
private boolean removeFromCluster(final String endpoint) {
final long removed = (long)removeClusterScript.execute(List.of(getClusterPendingNotificationQueueKey(endpoint), endpoint),
Collections.emptyList());
return removed > 0;
}
@SuppressWarnings("unchecked")
private List<String> getPendingDestinationsFromSingletonCache(final int limit) {
List<byte[]> keys = List.of(SINGLETON_PENDING_NOTIFICATIONS_KEY.getBytes());
List<byte[]> args = List.of(String.valueOf(System.currentTimeMillis()).getBytes(), String.valueOf(limit).getBytes());
return ((List<byte[]>) getSingletonScript.execute(keys, args))
.stream()
.map(bytes -> new String(bytes, StandardCharsets.UTF_8))
.collect(Collectors.toList());
private boolean remove(final String endpoint) {
return (long)removeScript.execute(List.of(getPendingNotificationQueueKey(endpoint), endpoint),
Collections.emptyList()) > 0;
}
@SuppressWarnings("unchecked")
@VisibleForTesting
List<String> getPendingDestinationsFromClusterCache(final int slot, final int limit) {
return (List<String>)getClusterScript.execute(List.of(getClusterPendingNotificationQueueKey(slot)),
List.of(String.valueOf(System.currentTimeMillis()), String.valueOf(limit)));
List<String> getPendingDestinations(final int slot, final int limit) {
return (List<String>)getScript.execute(List.of(getPendingNotificationQueueKey(slot)),
List.of(String.valueOf(System.currentTimeMillis()), String.valueOf(limit)));
}
private void insert(final Account account, final Device device, final long timestamp, final long interval) {
final String endpoint = getClusterEndpointKey(account, device);
final String endpoint = getEndpointKey(account, device);
insertClusterScript.execute(List.of(getClusterPendingNotificationQueueKey(endpoint), endpoint),
List.of(String.valueOf(timestamp),
String.valueOf(interval),
account.getUuid().toString(),
String.valueOf(device.getId())));
}
private String getSingletonEndpointKey(final Account account, final Device device) {
return "apn_device::" + account.getNumber() + "::" + device.getId();
insertScript.execute(List.of(getPendingNotificationQueueKey(endpoint), endpoint),
List.of(String.valueOf(timestamp),
String.valueOf(interval),
account.getUuid().toString(),
String.valueOf(device.getId())));
}
@VisibleForTesting
String getClusterEndpointKey(final Account account, final Device device) {
String getEndpointKey(final Account account, final Device device) {
return "apn_device::{" + account.getUuid() + "::" + device.getId() + "}";
}
private String getClusterPendingNotificationQueueKey(final String endpoint) {
return getClusterPendingNotificationQueueKey(SlotHash.getSlot(endpoint));
private String getPendingNotificationQueueKey(final String endpoint) {
return getPendingNotificationQueueKey(SlotHash.getSlot(endpoint));
}
private String getClusterPendingNotificationQueueKey(final int slot) {
return SINGLETON_PENDING_NOTIFICATIONS_KEY + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}";
private String getPendingNotificationQueueKey(final int slot) {
return PENDING_NOTIFICATIONS_KEY + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}";
}
private int getNextSlot() {

View File

@ -7,23 +7,15 @@ package org.whispersystems.textsecuregcm.push;
import io.lettuce.core.cluster.SlotHash;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
import org.whispersystems.textsecuregcm.redis.RedisException;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Pair;
import redis.clients.jedis.Jedis;
import redis.embedded.RedisServer;
import java.util.List;
import java.util.Optional;
@ -31,7 +23,6 @@ import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -39,47 +30,21 @@ import static org.mockito.Mockito.when;
public class ApnFallbackManagerTest extends AbstractRedisClusterTest {
private Account account;
private Device device;
private Device device;
private APNSender apnSender;
private ApnFallbackManager apnFallbackManager;
private static RedisServer redisServer;
private static final UUID ACCOUNT_UUID = UUID.randomUUID();
private static final String ACCOUNT_NUMBER = "+18005551234";
private static final long DEVICE_ID = 1L;
private static final String VOIP_APN_ID = RandomStringUtils.randomAlphanumeric(32);
@BeforeClass
public static void setUpRedisSingleton() throws Exception {
assumeFalse(System.getProperty("os.name").equalsIgnoreCase("windows"));
redisServer = RedisServer.builder()
.setting("appendonly no")
.setting("dir " + System.getProperty("java.io.tmpdir"))
.port(AbstractRedisClusterTest.getNextRedisClusterPort())
.build();
redisServer.start();
}
private static final String VOIP_APN_ID = RandomStringUtils.randomAlphanumeric(32);
@Before
public void setUp() throws Exception {
super.setUp();
final String redisUrl = String.format("redis://127.0.0.1:%d", redisServer.ports().get(0));
final ReplicatedJedisPool replicatedJedisPool = new RedisClientFactory("test-pool",
redisUrl,
List.of(redisUrl),
new CircuitBreakerConfiguration()).getRedisClientPool();
try (final Jedis jedis = replicatedJedisPool.getWriteResource()) {
jedis.flushAll();
}
device = mock(Device.class);
when(device.getId()).thenReturn(DEVICE_ID);
when(device.getVoipApnId()).thenReturn(VOIP_APN_ID);
@ -96,28 +61,18 @@ public class ApnFallbackManagerTest extends AbstractRedisClusterTest {
apnSender = mock(APNSender.class);
apnFallbackManager = new ApnFallbackManager(replicatedJedisPool, getRedisCluster(), apnSender, accountsManager);
}
@After
public void tearDown() throws Exception {
super.tearDown();
}
@AfterClass
public static void tearDownRedisSingleton() {
redisServer.stop();
apnFallbackManager = new ApnFallbackManager(getRedisCluster(), apnSender, accountsManager);
}
@Test
public void testClusterInsert() throws RedisException {
final String endpoint = apnFallbackManager.getClusterEndpointKey(account, device);
final String endpoint = apnFallbackManager.getEndpointKey(account, device);
assertTrue(apnFallbackManager.getPendingDestinationsFromClusterCache(SlotHash.getSlot(endpoint), 1).isEmpty());
assertTrue(apnFallbackManager.getPendingDestinations(SlotHash.getSlot(endpoint), 1).isEmpty());
apnFallbackManager.schedule(account, device, System.currentTimeMillis() - 30_000);
final List<String> pendingDestinations = apnFallbackManager.getPendingDestinationsFromClusterCache(SlotHash.getSlot(endpoint), 2);
final List<String> pendingDestinations = apnFallbackManager.getPendingDestinations(SlotHash.getSlot(endpoint), 2);
assertEquals(1, pendingDestinations.size());
final Optional<Pair<String, Long>> maybeUuidAndDeviceId = ApnFallbackManager.getSeparated(pendingDestinations.get(0));
@ -126,16 +81,16 @@ public class ApnFallbackManagerTest extends AbstractRedisClusterTest {
assertEquals(ACCOUNT_UUID.toString(), maybeUuidAndDeviceId.get().first());
assertEquals(DEVICE_ID, (long)maybeUuidAndDeviceId.get().second());
assertTrue(apnFallbackManager.getPendingDestinationsFromClusterCache(SlotHash.getSlot(endpoint), 1).isEmpty());
assertTrue(apnFallbackManager.getPendingDestinations(SlotHash.getSlot(endpoint), 1).isEmpty());
}
@Test
public void testProcessNextSlot() throws RedisException {
final ApnFallbackManager.ClusterCacheWorker worker = apnFallbackManager.new ClusterCacheWorker();
final ApnFallbackManager.NotificationWorker worker = apnFallbackManager.new NotificationWorker();
apnFallbackManager.schedule(account, device, System.currentTimeMillis() - 30_000);
final int slot = SlotHash.getSlot(apnFallbackManager.getClusterEndpointKey(account, device));
final int slot = SlotHash.getSlot(apnFallbackManager.getEndpointKey(account, device));
final int previousSlot = (slot + SlotHash.SLOT_COUNT - 1) % SlotHash.SLOT_COUNT;
getRedisCluster().withCluster(connection -> connection.sync().set(ApnFallbackManager.NEXT_SLOT_TO_PERSIST_KEY, String.valueOf(previousSlot)));