From fc7291c3e82c6fa572ee20756571be4723897909 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Fri, 4 Jun 2021 14:00:35 -0500 Subject: [PATCH] Migrate DeletedAccountsTableCrawler to ManagedPeriodicWork --- .../textsecuregcm/WhisperServerService.java | 6 +- .../storage/DeletedAccountsTableCrawler.java | 126 ++++-------------- .../DeletedAccountsTableCrawlerCache.java | 34 ----- .../storage/ManagedPeriodicWork.java | 18 +-- .../resources/lua/table_crawler/unlock.lua | 8 -- 5 files changed, 37 insertions(+), 155 deletions(-) delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTableCrawlerCache.java delete mode 100644 service/src/main/resources/lua/table_crawler/unlock.lua diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index ef4dc0b2c..402e38df0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -156,9 +156,8 @@ import org.whispersystems.textsecuregcm.storage.AccountsDynamoDbMigrator; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.ActiveUserCounter; import org.whispersystems.textsecuregcm.storage.DeletedAccounts; -import org.whispersystems.textsecuregcm.storage.DeletedAccountsTableCrawler; -import org.whispersystems.textsecuregcm.storage.DeletedAccountsTableCrawlerCache; import org.whispersystems.textsecuregcm.storage.DeletedAccountsDirectoryReconciler; +import org.whispersystems.textsecuregcm.storage.DeletedAccountsTableCrawler; import org.whispersystems.textsecuregcm.storage.DirectoryReconciler; import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; @@ -495,8 +494,7 @@ public class WhisperServerService extends Application reconcilers; - private final String workerId; - - private final AtomicBoolean running = new AtomicBoolean(false); - private boolean finished; public DeletedAccountsTableCrawler( final DeletedAccounts deletedAccounts, - final DeletedAccountsTableCrawlerCache cache, - final List reconcilers) { + final List reconcilers, + final FaultTolerantRedisCluster cluster) throws IOException { + + super(new ManagedPeriodicWorkCache(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL); this.deletedAccounts = deletedAccounts; - this.cache = cache; this.reconcilers = reconcilers; - this.workerId = UUID.randomUUID().toString(); } @Override - public void start() throws Exception { - running.set(true); - new Thread(this).start(); - } + public void doPeriodicWork() throws Exception { - @Override - public void stop() throws Exception { + final List> deletedAccounts = this.deletedAccounts.list(MAX_BATCH_SIZE); - running.set(false); - notifyAll(); + final List deletedUsers = deletedAccounts.stream() + .map(pair -> new User(pair.first(), pair.second())) + .collect(Collectors.toList()); - while (!finished) { - Util.wait(this); - } - } - - @Override - public void run() { - - while(running.get()) { - try { - doPeriodicWork(); - sleepWhileRunning(RUN_INTERVAL); - } catch (final Exception e) { - logger.warn("Error in crawl crawl", e); - - // wait a bit, in case the error is caused by external instability - Util.sleep(10_000); - } + for (DeletedAccountsDirectoryReconciler reconciler : reconcilers) { + reconciler.onCrawlChunk(deletedUsers); } - synchronized (this) { - finished = true; - notifyAll(); - } + final List deletedUuids = deletedAccounts.stream() + .map(Pair::first) + .collect(Collectors.toList()); + + this.deletedAccounts.delete(deletedUuids); + + DistributionSummary.builder(BATCH_SIZE_DISTRIBUTION_NAME) + .publishPercentileHistogram() + .register(Metrics.globalRegistry) + .record(deletedUuids.size()); } - private void doPeriodicWork() { - - // generic - if (cache.claimActiveWork(workerId, WORKER_TTL)) { - - try { - final long startTimeMs = System.currentTimeMillis(); - - // specific - final List> deletedAccounts = this.deletedAccounts.list(MAX_BATCH_SIZE); - - final List deletedUsers = deletedAccounts.stream() - .map(pair -> new User(pair.first(), pair.second())) - .collect(Collectors.toList()); - - for (DeletedAccountsDirectoryReconciler reconciler : reconcilers) { - reconciler.onCrawlChunk(deletedUsers); - } - - final List deletedUuids = deletedAccounts.stream() - .map(Pair::first) - .collect(Collectors.toList()); - - this.deletedAccounts.delete(deletedUuids); - - DistributionSummary.builder(BATCH_SIZE_DISTRIBUTION_NAME) - .publishPercentileHistogram() - .register(Metrics.globalRegistry) - .record(deletedUuids.size()); - - // generic - final long endTimeMs = System.currentTimeMillis(); - final Duration sleepInterval = RUN_INTERVAL.minusMillis(endTimeMs - startTimeMs); - if (sleepInterval.getSeconds() > 0) { - sleepWhileRunning(sleepInterval); - } - - } catch (final Exception e) { - logger.warn("Failed to process chunk", e); - - // wait a full interval for recovery - sleepWhileRunning(RUN_INTERVAL); - - } finally { - cache.releaseActiveWork(workerId); - } - } - } - - private synchronized void sleepWhileRunning(Duration delay) { - if (running.get()) Util.wait(this, delay.toMillis()); - } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTableCrawlerCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTableCrawlerCache.java deleted file mode 100644 index 0afef3448..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DeletedAccountsTableCrawlerCache.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2013-2021 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.textsecuregcm.storage; - -import io.lettuce.core.ScriptOutputType; -import io.lettuce.core.SetArgs; -import java.io.IOException; -import java.time.Duration; -import java.util.List; -import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; - -public class DeletedAccountsTableCrawlerCache { - - private static final String ACTIVE_WORKER_KEY = "deleted_accounts_crawler_cache_active_worker"; - - private final FaultTolerantRedisCluster cacheCluster; - private final ClusterLuaScript unlockClusterScript; - - public DeletedAccountsTableCrawlerCache(final FaultTolerantRedisCluster cacheCluster) throws IOException { - this.cacheCluster = cacheCluster; - this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/table_crawler/unlock.lua", ScriptOutputType.INTEGER); - } - - public boolean claimActiveWork(String workerId, Duration ttl) { - return "OK".equals(cacheCluster.withCluster(connection -> connection.sync().set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.nx().px(ttl.toMillis())))); - } - - public void releaseActiveWork(String workerId) { - unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId)); - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java index af74e8ef2..1ab878882 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java @@ -17,20 +17,20 @@ public abstract class ManagedPeriodicWork implements Managed, Runnable { private static final Logger logger = LoggerFactory.getLogger(ManagedPeriodicWork.class); private final ManagedPeriodicWorkCache cache; + private final Duration workerTtl; + private final Duration runInterval; private final String workerId; private final AtomicBoolean running = new AtomicBoolean(false); private boolean finished; - public ManagedPeriodicWork(final ManagedPeriodicWorkCache cache) { + public ManagedPeriodicWork(final ManagedPeriodicWorkCache cache, final Duration workerTtl, final Duration runInterval) { this.cache = cache; + this.workerTtl = workerTtl; + this.runInterval = runInterval; this.workerId = UUID.randomUUID().toString(); } - abstract protected Duration getWorkerTtl(); - - abstract protected Duration getRunInterval(); - abstract protected void doPeriodicWork() throws Exception; @Override @@ -56,7 +56,7 @@ public abstract class ManagedPeriodicWork implements Managed, Runnable { while(running.get()) { try { execute(); - sleepWhileRunning(getRunInterval()); + sleepWhileRunning(runInterval); } catch (final Exception e) { logger.warn("Error in crawl crawl", e); @@ -73,7 +73,7 @@ public abstract class ManagedPeriodicWork implements Managed, Runnable { private void execute() { - if (cache.claimActiveWork(workerId, getWorkerTtl())) { + if (cache.claimActiveWork(workerId, workerTtl)) { try { final long startTimeMs = System.currentTimeMillis(); @@ -81,7 +81,7 @@ public abstract class ManagedPeriodicWork implements Managed, Runnable { doPeriodicWork(); final long endTimeMs = System.currentTimeMillis(); - final Duration sleepInterval = getRunInterval().minusMillis(endTimeMs - startTimeMs); + final Duration sleepInterval = runInterval.minusMillis(endTimeMs - startTimeMs); if (sleepInterval.getSeconds() > 0) { sleepWhileRunning(sleepInterval); } @@ -90,7 +90,7 @@ public abstract class ManagedPeriodicWork implements Managed, Runnable { logger.warn("Failed to process chunk", e); // wait a full interval for recovery - sleepWhileRunning(getRunInterval()); + sleepWhileRunning(runInterval); } finally { cache.releaseActiveWork(workerId); diff --git a/service/src/main/resources/lua/table_crawler/unlock.lua b/service/src/main/resources/lua/table_crawler/unlock.lua deleted file mode 100644 index b95d15d66..000000000 --- a/service/src/main/resources/lua/table_crawler/unlock.lua +++ /dev/null @@ -1,8 +0,0 @@ --- keys: lock_key --- argv: lock_value - -if redis.call("GET", KEYS[1]) == ARGV[1] then - return redis.call("DEL", KEYS[1]) -else - return 0 -end