Migrate DeletedAccountsTableCrawler to ManagedPeriodicWork

Chris Eager 2021-06-04 14:00:35 -05:00 committed by Chris Eager
parent 88db808298
commit fc7291c3e8
5 changed files with 37 additions and 155 deletions

WhisperServerService.java

@@ -156,9 +156,8 @@ import org.whispersystems.textsecuregcm.storage.AccountsDynamoDbMigrator;
 import org.whispersystems.textsecuregcm.storage.AccountsManager;
 import org.whispersystems.textsecuregcm.storage.ActiveUserCounter;
 import org.whispersystems.textsecuregcm.storage.DeletedAccounts;
-import org.whispersystems.textsecuregcm.storage.DeletedAccountsTableCrawler;
-import org.whispersystems.textsecuregcm.storage.DeletedAccountsTableCrawlerCache;
 import org.whispersystems.textsecuregcm.storage.DeletedAccountsDirectoryReconciler;
+import org.whispersystems.textsecuregcm.storage.DeletedAccountsTableCrawler;
 import org.whispersystems.textsecuregcm.storage.DirectoryReconciler;
 import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient;
 import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
@@ -495,8 +494,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
     AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster);
     AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs());
-    DeletedAccountsTableCrawlerCache deletedAccountsTableCrawlerCache = new DeletedAccountsTableCrawlerCache(cacheCluster);
-    DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccounts, deletedAccountsTableCrawlerCache, deletedAccountsDirectoryReconcilers);
+    DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccounts, deletedAccountsDirectoryReconcilers, cacheCluster);
 
     apnSender.setApnFallbackManager(apnFallbackManager);
     environment.lifecycle().manage(apnFallbackManager);

DeletedAccountsTableCrawler.java

@@ -6,137 +6,63 @@ package org.whispersystems.textsecuregcm.storage;
 import static com.codahale.metrics.MetricRegistry.name;
 
-import io.dropwizard.lifecycle.Managed;
 import io.micrometer.core.instrument.DistributionSummary;
 import io.micrometer.core.instrument.Metrics;
+import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest.User;
+import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
 import org.whispersystems.textsecuregcm.util.Pair;
-import org.whispersystems.textsecuregcm.util.Util;
 
-public class DeletedAccountsTableCrawler implements Managed, Runnable {
-
-  private static final Logger logger = LoggerFactory.getLogger(DeletedAccountsTableCrawler.class);
+public class DeletedAccountsTableCrawler extends ManagedPeriodicWork {
 
   private static final Duration WORKER_TTL = Duration.ofMinutes(2);
   private static final Duration RUN_INTERVAL = Duration.ofMinutes(15);
-  private static final int MAX_BATCH_SIZE = 5_000;
+  private static final String ACTIVE_WORKER_KEY = "deleted_accounts_crawler_cache_active_worker";
+
+  private static final int MAX_BATCH_SIZE = 5_000;
 
   private static final String BATCH_SIZE_DISTRIBUTION_NAME = name(DeletedAccountsTableCrawler.class, "batchSize");
 
   private final DeletedAccounts deletedAccounts;
-  private final DeletedAccountsTableCrawlerCache cache;
   private final List<DeletedAccountsDirectoryReconciler> reconcilers;
-  private final String workerId;
-
-  private final AtomicBoolean running = new AtomicBoolean(false);
-  private boolean finished;
 
   public DeletedAccountsTableCrawler(
       final DeletedAccounts deletedAccounts,
-      final DeletedAccountsTableCrawlerCache cache,
-      final List<DeletedAccountsDirectoryReconciler> reconcilers) {
+      final List<DeletedAccountsDirectoryReconciler> reconcilers,
+      final FaultTolerantRedisCluster cluster) throws IOException {
+    super(new ManagedPeriodicWorkCache(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL);
    this.deletedAccounts = deletedAccounts;
-    this.cache = cache;
    this.reconcilers = reconcilers;
-    this.workerId = UUID.randomUUID().toString();
  }
 
   @Override
-  public void start() throws Exception {
-    running.set(true);
-    new Thread(this).start();
-  }
-
-  @Override
-  public void stop() throws Exception {
-    running.set(false);
-    notifyAll();
-    while (!finished) {
-      Util.wait(this);
-    }
-  }
-
-  @Override
-  public void run() {
-
-    while(running.get()) {
-      try {
-        doPeriodicWork();
-        sleepWhileRunning(RUN_INTERVAL);
-      } catch (final Exception e) {
-        logger.warn("Error in crawl crawl", e);
-        // wait a bit, in case the error is caused by external instability
-        Util.sleep(10_000);
-      }
-    }
-
-    synchronized (this) {
-      finished = true;
-      notifyAll();
-    }
-  }
-
-  private void doPeriodicWork() {
-    // generic
-    if (cache.claimActiveWork(workerId, WORKER_TTL)) {
-      try {
-        final long startTimeMs = System.currentTimeMillis();
-
-        // specific
-        final List<Pair<UUID, String>> deletedAccounts = this.deletedAccounts.list(MAX_BATCH_SIZE);
-
-        final List<User> deletedUsers = deletedAccounts.stream()
-            .map(pair -> new User(pair.first(), pair.second()))
-            .collect(Collectors.toList());
-
-        for (DeletedAccountsDirectoryReconciler reconciler : reconcilers) {
-          reconciler.onCrawlChunk(deletedUsers);
-        }
-
-        final List<UUID> deletedUuids = deletedAccounts.stream()
-            .map(Pair::first)
-            .collect(Collectors.toList());
-
-        this.deletedAccounts.delete(deletedUuids);
-
-        DistributionSummary.builder(BATCH_SIZE_DISTRIBUTION_NAME)
-            .publishPercentileHistogram()
-            .register(Metrics.globalRegistry)
-            .record(deletedUuids.size());
-
-        // generic
-        final long endTimeMs = System.currentTimeMillis();
-        final Duration sleepInterval = RUN_INTERVAL.minusMillis(endTimeMs - startTimeMs);
-        if (sleepInterval.getSeconds() > 0) {
-          sleepWhileRunning(sleepInterval);
-        }
-      } catch (final Exception e) {
-        logger.warn("Failed to process chunk", e);
-        // wait a full interval for recovery
-        sleepWhileRunning(RUN_INTERVAL);
-      } finally {
-        cache.releaseActiveWork(workerId);
-      }
-    }
-  }
-
-  private synchronized void sleepWhileRunning(Duration delay) {
-    if (running.get()) Util.wait(this, delay.toMillis());
-  }
+  public void doPeriodicWork() throws Exception {
+
+    final List<Pair<UUID, String>> deletedAccounts = this.deletedAccounts.list(MAX_BATCH_SIZE);
+
+    final List<User> deletedUsers = deletedAccounts.stream()
+        .map(pair -> new User(pair.first(), pair.second()))
+        .collect(Collectors.toList());
+
+    for (DeletedAccountsDirectoryReconciler reconciler : reconcilers) {
+      reconciler.onCrawlChunk(deletedUsers);
+    }
+
+    final List<UUID> deletedUuids = deletedAccounts.stream()
+        .map(Pair::first)
+        .collect(Collectors.toList());
+
+    this.deletedAccounts.delete(deletedUuids);
+
+    DistributionSummary.builder(BATCH_SIZE_DISTRIBUTION_NAME)
+        .publishPercentileHistogram()
+        .register(Metrics.globalRegistry)
+        .record(deletedUuids.size());
+  }
 }
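
The practical effect of the crawler diff above: a periodic job now hands a ManagedPeriodicWorkCache, a worker TTL, and a run interval to the ManagedPeriodicWork constructor and implements only doPeriodicWork(). A minimal sketch of that contract follows; the ExampleTableCrawler class and its key name are hypothetical, while ManagedPeriodicWork, ManagedPeriodicWorkCache, and FaultTolerantRedisCluster are the types shown in this commit's diffs.

package org.whispersystems.textsecuregcm.storage;

import java.io.IOException;
import java.time.Duration;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;

public class ExampleTableCrawler extends ManagedPeriodicWork {

  private static final Duration WORKER_TTL = Duration.ofMinutes(2);
  private static final Duration RUN_INTERVAL = Duration.ofMinutes(15);
  private static final String ACTIVE_WORKER_KEY = "example_table_crawler_active_worker"; // hypothetical key

  public ExampleTableCrawler(final FaultTolerantRedisCluster cluster) throws IOException {
    // the base class owns the run loop, the distributed worker lock, and error backoff
    super(new ManagedPeriodicWorkCache(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL);
  }

  @Override
  protected void doPeriodicWork() throws Exception {
    // job-specific batch processing goes here; only the instance currently
    // holding ACTIVE_WORKER_KEY runs it each interval
  }
}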

DeletedAccountsTableCrawlerCache.java (deleted)

@@ -1,34 +0,0 @@
-/*
- * Copyright 2013-2021 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-
-package org.whispersystems.textsecuregcm.storage;
-
-import io.lettuce.core.ScriptOutputType;
-import io.lettuce.core.SetArgs;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.List;
-import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
-import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
-
-public class DeletedAccountsTableCrawlerCache {
-
-  private static final String ACTIVE_WORKER_KEY = "deleted_accounts_crawler_cache_active_worker";
-
-  private final FaultTolerantRedisCluster cacheCluster;
-  private final ClusterLuaScript unlockClusterScript;
-
-  public DeletedAccountsTableCrawlerCache(final FaultTolerantRedisCluster cacheCluster) throws IOException {
-    this.cacheCluster = cacheCluster;
-    this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/table_crawler/unlock.lua", ScriptOutputType.INTEGER);
-  }
-
-  public boolean claimActiveWork(String workerId, Duration ttl) {
-    return "OK".equals(cacheCluster.withCluster(connection -> connection.sync().set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.nx().px(ttl.toMillis()))));
-  }
-
-  public void releaseActiveWork(String workerId) {
-    unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId));
-  }
-}

ManagedPeriodicWork.java

@@ -17,20 +17,20 @@ public abstract class ManagedPeriodicWork implements Managed, Runnable {
   private static final Logger logger = LoggerFactory.getLogger(ManagedPeriodicWork.class);
 
   private final ManagedPeriodicWorkCache cache;
+  private final Duration workerTtl;
+  private final Duration runInterval;
   private final String workerId;
 
   private final AtomicBoolean running = new AtomicBoolean(false);
   private boolean finished;
 
-  public ManagedPeriodicWork(final ManagedPeriodicWorkCache cache) {
+  public ManagedPeriodicWork(final ManagedPeriodicWorkCache cache, final Duration workerTtl, final Duration runInterval) {
     this.cache = cache;
+    this.workerTtl = workerTtl;
+    this.runInterval = runInterval;
     this.workerId = UUID.randomUUID().toString();
   }
 
-  abstract protected Duration getWorkerTtl();
-
-  abstract protected Duration getRunInterval();
-
   abstract protected void doPeriodicWork() throws Exception;
 
   @Override
@@ -56,7 +56,7 @@ public abstract class ManagedPeriodicWork implements Managed, Runnable {
 
     while(running.get()) {
       try {
         execute();
-        sleepWhileRunning(getRunInterval());
+        sleepWhileRunning(runInterval);
       } catch (final Exception e) {
         logger.warn("Error in crawl crawl", e);
@@ -73,7 +73,7 @@ public abstract class ManagedPeriodicWork implements Managed, Runnable {
   private void execute() {
 
-    if (cache.claimActiveWork(workerId, getWorkerTtl())) {
+    if (cache.claimActiveWork(workerId, workerTtl)) {
 
       try {
         final long startTimeMs = System.currentTimeMillis();
@@ -81,7 +81,7 @@ public abstract class ManagedPeriodicWork implements Managed, Runnable {
         doPeriodicWork();
 
         final long endTimeMs = System.currentTimeMillis();
-        final Duration sleepInterval = getRunInterval().minusMillis(endTimeMs - startTimeMs);
+        final Duration sleepInterval = runInterval.minusMillis(endTimeMs - startTimeMs);
         if (sleepInterval.getSeconds() > 0) {
           sleepWhileRunning(sleepInterval);
         }
@@ -90,7 +90,7 @@ public abstract class ManagedPeriodicWork implements Managed, Runnable {
      } catch (final Exception e) {
         logger.warn("Failed to process chunk", e);
         // wait a full interval for recovery
-        sleepWhileRunning(getRunInterval());
+        sleepWhileRunning(runInterval);
       } finally {
         cache.releaseActiveWork(workerId);

lua/table_crawler/unlock.lua (deleted)

@@ -1,8 +0,0 @@
--- keys: lock_key
--- argv: lock_value
-
-if redis.call("GET", KEYS[1]) == ARGV[1] then
-    return redis.call("DEL", KEYS[1])
-else
-    return 0
-end
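
The deleted script above was the release half of the worker lock: the deleted cache class claimed the lock with SET key workerId NX PX ttl, and this script deleted the key only if it still held the caller's worker ID, so a worker whose TTL had lapsed could not release a lock another instance had since claimed. A usage sketch of that claim/release pattern, assuming the replacement ManagedPeriodicWorkCache keeps the same semantics behind the claimActiveWork/releaseActiveWork calls seen in the ManagedPeriodicWork hunks, and that a FaultTolerantRedisCluster is already configured:

package org.whispersystems.textsecuregcm.storage;

import java.io.IOException;
import java.time.Duration;
import java.util.UUID;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;

class LockSemanticsSketch {

  static void demonstrate(final FaultTolerantRedisCluster cluster) throws IOException {
    // "example_active_worker" is a hypothetical key, not one used by the server
    final ManagedPeriodicWorkCache cache = new ManagedPeriodicWorkCache("example_active_worker", cluster);

    final String workerA = UUID.randomUUID().toString();
    final String workerB = UUID.randomUUID().toString();

    cache.claimActiveWork(workerA, Duration.ofMinutes(2)); // true: key absent, NX set succeeds
    cache.claimActiveWork(workerB, Duration.ofMinutes(2)); // false: workerA still holds the key

    cache.releaseActiveWork(workerB); // no-op: stored value is workerA's ID, script returns 0
    cache.releaseActiveWork(workerA); // deletes the key, so the next claim can succeed
  }
}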