Add DeletedAccountsTableCrawler

This commit is contained in:
Chris Eager 2021-05-28 18:28:59 -05:00 committed by Chris Eager
parent a315c9be92
commit 5193abdab3
7 changed files with 299 additions and 10 deletions

View File

@ -156,6 +156,9 @@ import org.whispersystems.textsecuregcm.storage.AccountsDynamoDbMigrator;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.ActiveUserCounter;
import org.whispersystems.textsecuregcm.storage.DeletedAccounts;
import org.whispersystems.textsecuregcm.storage.DeletedAccountsTableCrawler;
import org.whispersystems.textsecuregcm.storage.DeletedAccountsTableCrawlerCache;
import org.whispersystems.textsecuregcm.storage.DeletedAccountsDirectoryReconciler;
import org.whispersystems.textsecuregcm.storage.DirectoryReconciler;
import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
@ -468,6 +471,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MessagePersister messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, dynamicConfigurationManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
final List<DeletedAccountsDirectoryReconciler> deletedAccountsDirectoryReconcilers = new ArrayList<>();
final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = new ArrayList<>();
accountDatabaseCrawlerListeners.add(new PushFeedbackProcessor(accountsManager, directoryQueue));
accountDatabaseCrawlerListeners.add(new ActiveUserCounter(config.getMetricsFactory(), cacheCluster));
@ -475,6 +479,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(directoryServerConfiguration);
final DirectoryReconciler directoryReconciler = new DirectoryReconciler(directoryServerConfiguration.getReplicationName(), directoryReconciliationClient);
accountDatabaseCrawlerListeners.add(directoryReconciler);
final DeletedAccountsDirectoryReconciler deletedAccountsDirectoryReconciler = new DeletedAccountsDirectoryReconciler(directoryServerConfiguration.getReplicationName(), directoryReconciliationClient);
deletedAccountsDirectoryReconcilers.add(deletedAccountsDirectoryReconciler);
}
accountDatabaseCrawlerListeners.add(new AccountCleaner(accountsManager));
accountDatabaseCrawlerListeners.add(new RegistrationLockVersionCounter(metricsCluster, config.getMetricsFactory()));
@ -488,11 +495,15 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster);
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs());
DeletedAccountsTableCrawlerCache deletedAccountsTableCrawlerCache = new DeletedAccountsTableCrawlerCache(cacheCluster);
DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccounts, deletedAccountsTableCrawlerCache, deletedAccountsDirectoryReconcilers);
apnSender.setApnFallbackManager(apnFallbackManager);
environment.lifecycle().manage(apnFallbackManager);
environment.lifecycle().manage(pubSubManager);
environment.lifecycle().manage(messageSender);
environment.lifecycle().manage(accountDatabaseCrawler);
environment.lifecycle().manage(deletedAccountsTableCrawler);
environment.lifecycle().manage(remoteConfigsManager);
environment.lifecycle().manage(messagesCache);
environment.lifecycle().manage(messagePersister);

View File

@ -0,0 +1,16 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
public class ChunkProcessingFailedException extends Exception {
public ChunkProcessingFailedException(String message) {
super(message);
}
public ChunkProcessingFailedException(Exception cause) {
super(cause);
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest.User;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse;
public class DeletedAccountsDirectoryReconciler {
private final Logger logger = LoggerFactory.getLogger(DeletedAccountsDirectoryReconciler.class);
private final DirectoryReconciliationClient directoryReconciliationClient;
private final Timer deleteTimer;
private final Counter errorCounter;
public DeletedAccountsDirectoryReconciler(
final String replicationName,
final DirectoryReconciliationClient directoryReconciliationClient) {
this.directoryReconciliationClient = directoryReconciliationClient;
deleteTimer = Timer.builder(name(DeletedAccountsDirectoryReconciler.class, "delete"))
.tag("replicationName", replicationName)
.register(Metrics.globalRegistry);
errorCounter = Counter.builder(name(DeletedAccountsDirectoryReconciler.class, "error"))
.tag("replicationName", replicationName)
.register(Metrics.globalRegistry);
}
public void onCrawlChunk(final List<User> deletedUsers) throws ChunkProcessingFailedException {
try {
deleteTimer.recordCallable(() -> {
try {
final DirectoryReconciliationResponse response = directoryReconciliationClient.delete(new DirectoryReconciliationRequest(null, null, deletedUsers));
if (response.getStatus() != DirectoryReconciliationResponse.Status.OK) {
errorCounter.increment();
throw new ChunkProcessingFailedException("Response status: " + response.getStatus());
}
} catch (final Exception e) {
errorCounter.increment();
throw new ChunkProcessingFailedException(e);
}
return null;
});
} catch (final ChunkProcessingFailedException e) {
throw e;
} catch (final Exception e) {
logger.warn("Unexpected exception", e);
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,142 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest.User;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.Util;
public class DeletedAccountsTableCrawler implements Managed, Runnable {
private static final Logger logger = LoggerFactory.getLogger(DeletedAccountsTableCrawler.class);
private static final Duration WORKER_TTL = Duration.ofMinutes(2);
private static final Duration RUN_INTERVAL = Duration.ofMinutes(15);
private static final int MAX_BATCH_SIZE = 5_000;
private static final String BATCH_SIZE_DISTRIBUTION_NAME = name(DeletedAccountsTableCrawler.class, "batchSize");
private final DeletedAccounts deletedAccounts;
private final DeletedAccountsTableCrawlerCache cache;
private final List<DeletedAccountsDirectoryReconciler> reconcilers;
private final String workerId;
private final AtomicBoolean running = new AtomicBoolean(false);
private boolean finished;
public DeletedAccountsTableCrawler(
final DeletedAccounts deletedAccounts,
final DeletedAccountsTableCrawlerCache cache,
final List<DeletedAccountsDirectoryReconciler> reconcilers) {
this.deletedAccounts = deletedAccounts;
this.cache = cache;
this.reconcilers = reconcilers;
this.workerId = UUID.randomUUID().toString();
}
@Override
public void start() throws Exception {
running.set(true);
new Thread(this).start();
}
@Override
public void stop() throws Exception {
running.set(false);
notifyAll();
while (!finished) {
Util.wait(this);
}
}
@Override
public void run() {
while(running.get()) {
try {
doPeriodicWork();
sleepWhileRunning(RUN_INTERVAL);
} catch (final Exception e) {
logger.warn("Error in crawl crawl", e);
// wait a bit, in case the error is caused by external instability
Util.sleep(10_000);
}
}
synchronized (this) {
finished = true;
notifyAll();
}
}
private void doPeriodicWork() {
// generic
if (cache.claimActiveWork(workerId, WORKER_TTL)) {
try {
final long startTimeMs = System.currentTimeMillis();
// specific
final List<Pair<UUID, String>> deletedAccounts = this.deletedAccounts.list(MAX_BATCH_SIZE);
final List<User> deletedUsers = deletedAccounts.stream()
.map(pair -> new User(pair.first(), pair.second()))
.collect(Collectors.toList());
for (DeletedAccountsDirectoryReconciler reconciler : reconcilers) {
reconciler.onCrawlChunk(deletedUsers);
}
final List<UUID> deletedUuids = deletedAccounts.stream()
.map(Pair::first)
.collect(Collectors.toList());
this.deletedAccounts.delete(deletedUuids);
DistributionSummary.builder(BATCH_SIZE_DISTRIBUTION_NAME)
.publishPercentileHistogram()
.register(Metrics.globalRegistry)
.record(deletedUuids.size());
// generic
final long endTimeMs = System.currentTimeMillis();
final Duration sleepInterval = RUN_INTERVAL.minusMillis(endTimeMs - startTimeMs);
if (sleepInterval.getSeconds() > 0) {
sleepWhileRunning(sleepInterval);
}
} catch (final Exception e) {
logger.warn("Failed to process chunk", e);
// wait a full interval for recovery
sleepWhileRunning(RUN_INTERVAL);
} finally {
cache.releaseActiveWork(workerId);
}
}
}
private synchronized void sleepWhileRunning(Duration delay) {
if (running.get()) Util.wait(this, delay.toMillis());
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.SetArgs;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
public class DeletedAccountsTableCrawlerCache {
private static final String ACTIVE_WORKER_KEY = "deleted_accounts_crawler_cache_active_worker";
private final FaultTolerantRedisCluster cacheCluster;
private final ClusterLuaScript unlockClusterScript;
public DeletedAccountsTableCrawlerCache(final FaultTolerantRedisCluster cacheCluster) throws IOException {
this.cacheCluster = cacheCluster;
this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/table_crawler/unlock.lua", ScriptOutputType.INTEGER);
}
public boolean claimActiveWork(String workerId, Duration ttl) {
return "OK".equals(cacheCluster.withCluster(connection -> connection.sync().set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.nx().px(ttl.toMillis()))));
}
public void releaseActiveWork(String workerId) {
unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId));
}
}

View File

@ -4,7 +4,16 @@
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.SharedMetricRegistries;
import java.security.KeyStore;
import java.security.cert.CertificateException;
import javax.net.ssl.SSLContext;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import org.glassfish.jersey.SslConfigurator;
import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
import org.whispersystems.textsecuregcm.configuration.DirectoryServerConfiguration;
@ -14,16 +23,6 @@ import org.whispersystems.textsecuregcm.util.CertificateExpirationGauge;
import org.whispersystems.textsecuregcm.util.CertificateUtil;
import org.whispersystems.textsecuregcm.util.Constants;
import javax.net.ssl.SSLContext;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import java.security.KeyStore;
import java.security.cert.CertificateException;
import static com.codahale.metrics.MetricRegistry.name;
public class DirectoryReconciliationClient {
private final String replicationUrl;
@ -47,6 +46,13 @@ public class DirectoryReconciliationClient {
.put(Entity.json(request), DirectoryReconciliationResponse.class);
}
public DirectoryReconciliationResponse delete(DirectoryReconciliationRequest request) {
return client.target(replicationUrl)
.path("/v3/directory/deletes")
.request(MediaType.APPLICATION_JSON_TYPE)
.put(Entity.json(request), DirectoryReconciliationResponse.class);
}
private static Client initializeClient(DirectoryServerConfiguration directoryServerConfiguration)
throws CertificateException
{

View File

@ -0,0 +1,8 @@
-- keys: lock_key
-- argv: lock_value
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end