diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index a1524c7e5..ef4dc0b2c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -156,6 +156,9 @@ import org.whispersystems.textsecuregcm.storage.AccountsDynamoDbMigrator; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.ActiveUserCounter; import org.whispersystems.textsecuregcm.storage.DeletedAccounts; +import org.whispersystems.textsecuregcm.storage.DeletedAccountsTableCrawler; +import org.whispersystems.textsecuregcm.storage.DeletedAccountsTableCrawlerCache; +import org.whispersystems.textsecuregcm.storage.DeletedAccountsDirectoryReconciler; import org.whispersystems.textsecuregcm.storage.DirectoryReconciler; import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; @@ -468,6 +471,7 @@ public class WhisperServerService extends Application deletedAccountsDirectoryReconcilers = new ArrayList<>(); final List accountDatabaseCrawlerListeners = new ArrayList<>(); accountDatabaseCrawlerListeners.add(new PushFeedbackProcessor(accountsManager, directoryQueue)); accountDatabaseCrawlerListeners.add(new ActiveUserCounter(config.getMetricsFactory(), cacheCluster)); @@ -475,6 +479,9 @@ public class WhisperServerService extends Application deletedUsers) throws ChunkProcessingFailedException { + + try { + deleteTimer.recordCallable(() -> { + try { + final DirectoryReconciliationResponse response = directoryReconciliationClient.delete(new DirectoryReconciliationRequest(null, null, deletedUsers)); + + if (response.getStatus() != DirectoryReconciliationResponse.Status.OK) { + errorCounter.increment(); + + throw new ChunkProcessingFailedException("Response status: " + response.getStatus()); + } + } catch (final Exception e) { + + errorCounter.increment(); + + throw new ChunkProcessingFailedException(e); + } + + return null; + }); + } catch (final ChunkProcessingFailedException e) { + throw e; + } catch (final Exception e) { + logger.warn("Unexpected exception", e); + throw new RuntimeException(e); + } + } + +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTableCrawler.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTableCrawler.java new file mode 100644 index 000000000..4c1043d3d --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTableCrawler.java @@ -0,0 +1,142 @@ +/* + * Copyright 2013-2020 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.storage; + +import static com.codahale.metrics.MetricRegistry.name; + +import io.dropwizard.lifecycle.Managed; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Metrics; +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest.User; +import org.whispersystems.textsecuregcm.util.Pair; +import org.whispersystems.textsecuregcm.util.Util; + +public class DeletedAccountsTableCrawler implements Managed, Runnable { + + private static final Logger logger = LoggerFactory.getLogger(DeletedAccountsTableCrawler.class); + + private static final Duration WORKER_TTL = Duration.ofMinutes(2); + private static final Duration RUN_INTERVAL = Duration.ofMinutes(15); + private static final int MAX_BATCH_SIZE = 5_000; + + private static final String BATCH_SIZE_DISTRIBUTION_NAME = name(DeletedAccountsTableCrawler.class, "batchSize"); + + private final DeletedAccounts deletedAccounts; + private final DeletedAccountsTableCrawlerCache cache; + private final List reconcilers; + private final String workerId; + + private final AtomicBoolean running = new AtomicBoolean(false); + private boolean finished; + + public DeletedAccountsTableCrawler( + final DeletedAccounts deletedAccounts, + final DeletedAccountsTableCrawlerCache cache, + final List reconcilers) { + + this.deletedAccounts = deletedAccounts; + this.cache = cache; + this.reconcilers = reconcilers; + this.workerId = UUID.randomUUID().toString(); + } + + @Override + public void start() throws Exception { + running.set(true); + new Thread(this).start(); + } + + @Override + public void stop() throws Exception { + + running.set(false); + notifyAll(); + + while (!finished) { + Util.wait(this); + } + } + + @Override + public void run() { + + while(running.get()) { + try { + doPeriodicWork(); + sleepWhileRunning(RUN_INTERVAL); + } catch (final Exception e) { + logger.warn("Error in crawl crawl", e); + + // wait a bit, in case the error is caused by external instability + Util.sleep(10_000); + } + } + + synchronized (this) { + finished = true; + notifyAll(); + } + } + + private void doPeriodicWork() { + + // generic + if (cache.claimActiveWork(workerId, WORKER_TTL)) { + + try { + final long startTimeMs = System.currentTimeMillis(); + + // specific + final List> deletedAccounts = this.deletedAccounts.list(MAX_BATCH_SIZE); + + final List deletedUsers = deletedAccounts.stream() + .map(pair -> new User(pair.first(), pair.second())) + .collect(Collectors.toList()); + + for (DeletedAccountsDirectoryReconciler reconciler : reconcilers) { + reconciler.onCrawlChunk(deletedUsers); + } + + final List deletedUuids = deletedAccounts.stream() + .map(Pair::first) + .collect(Collectors.toList()); + + this.deletedAccounts.delete(deletedUuids); + + DistributionSummary.builder(BATCH_SIZE_DISTRIBUTION_NAME) + .publishPercentileHistogram() + .register(Metrics.globalRegistry) + .record(deletedUuids.size()); + + // generic + final long endTimeMs = System.currentTimeMillis(); + final Duration sleepInterval = RUN_INTERVAL.minusMillis(endTimeMs - startTimeMs); + if (sleepInterval.getSeconds() > 0) { + sleepWhileRunning(sleepInterval); + } + + } catch (final Exception e) { + logger.warn("Failed to process chunk", e); + + // wait a full interval for recovery + sleepWhileRunning(RUN_INTERVAL); + + } finally { + cache.releaseActiveWork(workerId); + } + } + } + + private synchronized void sleepWhileRunning(Duration delay) { + if (running.get()) Util.wait(this, delay.toMillis()); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTableCrawlerCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTableCrawlerCache.java new file mode 100644 index 000000000..0afef3448 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTableCrawlerCache.java @@ -0,0 +1,34 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.storage; + +import io.lettuce.core.ScriptOutputType; +import io.lettuce.core.SetArgs; +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; + +public class DeletedAccountsTableCrawlerCache { + + private static final String ACTIVE_WORKER_KEY = "deleted_accounts_crawler_cache_active_worker"; + + private final FaultTolerantRedisCluster cacheCluster; + private final ClusterLuaScript unlockClusterScript; + + public DeletedAccountsTableCrawlerCache(final FaultTolerantRedisCluster cacheCluster) throws IOException { + this.cacheCluster = cacheCluster; + this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/table_crawler/unlock.lua", ScriptOutputType.INTEGER); + } + + public boolean claimActiveWork(String workerId, Duration ttl) { + return "OK".equals(cacheCluster.withCluster(connection -> connection.sync().set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.nx().px(ttl.toMillis())))); + } + + public void releaseActiveWork(String workerId) { + unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId)); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java index 37d4484a1..35bad4921 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java @@ -4,7 +4,16 @@ */ package org.whispersystems.textsecuregcm.storage; +import static com.codahale.metrics.MetricRegistry.name; + import com.codahale.metrics.SharedMetricRegistries; +import java.security.KeyStore; +import java.security.cert.CertificateException; +import javax.net.ssl.SSLContext; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; import org.glassfish.jersey.SslConfigurator; import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; import org.whispersystems.textsecuregcm.configuration.DirectoryServerConfiguration; @@ -14,16 +23,6 @@ import org.whispersystems.textsecuregcm.util.CertificateExpirationGauge; import org.whispersystems.textsecuregcm.util.CertificateUtil; import org.whispersystems.textsecuregcm.util.Constants; -import javax.net.ssl.SSLContext; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Entity; -import javax.ws.rs.core.MediaType; -import java.security.KeyStore; -import java.security.cert.CertificateException; - -import static com.codahale.metrics.MetricRegistry.name; - public class DirectoryReconciliationClient { private final String replicationUrl; @@ -47,6 +46,13 @@ public class DirectoryReconciliationClient { .put(Entity.json(request), DirectoryReconciliationResponse.class); } + public DirectoryReconciliationResponse delete(DirectoryReconciliationRequest request) { + return client.target(replicationUrl) + .path("/v3/directory/deletes") + .request(MediaType.APPLICATION_JSON_TYPE) + .put(Entity.json(request), DirectoryReconciliationResponse.class); + } + private static Client initializeClient(DirectoryServerConfiguration directoryServerConfiguration) throws CertificateException { diff --git a/service/src/main/resources/lua/table_crawler/unlock.lua b/service/src/main/resources/lua/table_crawler/unlock.lua new file mode 100644 index 000000000..b95d15d66 --- /dev/null +++ b/service/src/main/resources/lua/table_crawler/unlock.lua @@ -0,0 +1,8 @@ +-- keys: lock_key +-- argv: lock_value + +if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) +else + return 0 +end