Add a task to enable/disable accelerated crawling.

This commit is contained in:
Jon Chambers 2021-01-11 18:51:20 -05:00 committed by Jon Chambers
parent 8fb37a0024
commit 2e01da5ec1
5 changed files with 48 additions and 6 deletions

View File

@ -150,6 +150,7 @@ import org.whispersystems.textsecuregcm.workers.DisableRequestLoggingTask;
import org.whispersystems.textsecuregcm.workers.EnableRequestLoggingTask; import org.whispersystems.textsecuregcm.workers.EnableRequestLoggingTask;
import org.whispersystems.textsecuregcm.workers.GetRedisCommandStatsCommand; import org.whispersystems.textsecuregcm.workers.GetRedisCommandStatsCommand;
import org.whispersystems.textsecuregcm.workers.GetRedisSlowlogCommand; import org.whispersystems.textsecuregcm.workers.GetRedisSlowlogCommand;
import org.whispersystems.textsecuregcm.workers.SetCrawlerAccelerationTask;
import org.whispersystems.textsecuregcm.workers.VacuumCommand; import org.whispersystems.textsecuregcm.workers.VacuumCommand;
import org.whispersystems.textsecuregcm.workers.ZkParamsCommand; import org.whispersystems.textsecuregcm.workers.ZkParamsCommand;
import org.whispersystems.websocket.WebSocketResourceProviderFactory; import org.whispersystems.websocket.WebSocketResourceProviderFactory;
@ -448,6 +449,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.admin().addTask(new EnableRequestLoggingTask()); environment.admin().addTask(new EnableRequestLoggingTask());
environment.admin().addTask(new DisableRequestLoggingTask()); environment.admin().addTask(new DisableRequestLoggingTask());
environment.admin().addTask(new SetCrawlerAccelerationTask(accountDatabaseCrawlerCache));
/// ///

View File

@ -122,7 +122,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
logger.info("Finished crawl"); logger.info("Finished crawl");
listeners.forEach(listener -> listener.onCrawlEnd(fromUuid)); listeners.forEach(listener -> listener.onCrawlEnd(fromUuid));
cache.setLastUuid(Optional.empty()); cache.setLastUuid(Optional.empty());
cache.clearAccelerate(); cache.setAccelerated(false);
} else { } else {
try { try {
for (AccountDatabaseCrawlerListener listener : listeners) { for (AccountDatabaseCrawlerListener listener : listeners) {
@ -131,7 +131,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
cache.setLastUuid(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getUuid())); cache.setLastUuid(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getUuid()));
} catch (AccountDatabaseCrawlerRestartException e) { } catch (AccountDatabaseCrawlerRestartException e) {
cache.setLastUuid(Optional.empty()); cache.setLastUuid(Optional.empty());
cache.clearAccelerate(); cache.setAccelerated(false);
} }
} }

View File

@ -31,9 +31,13 @@ public class AccountDatabaseCrawlerCache {
this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/account_database_crawler/unlock.lua", ScriptOutputType.INTEGER); this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/account_database_crawler/unlock.lua", ScriptOutputType.INTEGER);
} }
public void clearAccelerate() { public void setAccelerated(final boolean accelerated) {
if (accelerated) {
cacheCluster.useCluster(connection -> connection.sync().set(ACCELERATE_KEY, "1"));
} else {
cacheCluster.useCluster(connection -> connection.sync().del(ACCELERATE_KEY)); cacheCluster.useCluster(connection -> connection.sync().del(ACCELERATE_KEY));
} }
}
public boolean isAccelerated() { public boolean isAccelerated() {
return "1".equals(cacheCluster.withCluster(connection -> connection.sync().get(ACCELERATE_KEY))); return "1".equals(cacheCluster.withCluster(connection -> connection.sync().get(ACCELERATE_KEY)));

View File

@ -0,0 +1,36 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import io.dropwizard.servlets.tasks.Task;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
import java.io.PrintWriter;
import java.util.List;
import java.util.Map;
public class SetCrawlerAccelerationTask extends Task {
private final AccountDatabaseCrawlerCache crawlerCache;
public SetCrawlerAccelerationTask(final AccountDatabaseCrawlerCache crawlerCache) {
super("set-crawler-accelerated");
this.crawlerCache = crawlerCache;
}
@Override
public void execute(final Map<String, List<String>> parameters, final PrintWriter out) {
if (parameters.containsKey("accelerated") && parameters.get("accelerated").size() == 1) {
final boolean accelerated = "true".equalsIgnoreCase(parameters.get("accelerated").get(0));
crawlerCache.setAccelerated(accelerated);
out.println("Set accelerated: " + accelerated);
} else {
out.println("Usage: set-crawler-accelerated?accelerated=[true|false]");
}
}
}

View File

@ -148,7 +148,7 @@ public class AccountDatabaseCrawlerTest {
verify(account2, times(0)).getNumber(); verify(account2, times(0)).getNumber();
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
verify(cache, times(1)).setLastUuid(eq(Optional.empty())); verify(cache, times(1)).setLastUuid(eq(Optional.empty()));
verify(cache, times(1)).clearAccelerate(); verify(cache, times(1)).setAccelerated(false);
verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class)); verify(cache, times(1)).releaseActiveWork(any(String.class));
@ -175,7 +175,7 @@ public class AccountDatabaseCrawlerTest {
verify(account2, times(0)).getNumber(); verify(account2, times(0)).getNumber();
verify(listener, times(1)).onCrawlEnd(eq(Optional.of(ACCOUNT2))); verify(listener, times(1)).onCrawlEnd(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).setLastUuid(eq(Optional.empty())); verify(cache, times(1)).setLastUuid(eq(Optional.empty()));
verify(cache, times(1)).clearAccelerate(); verify(cache, times(1)).setAccelerated(false);
verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class)); verify(cache, times(1)).releaseActiveWork(any(String.class));