Add separate `AccountsDatabaseCrawler` for DynamoDB migration

This commit is contained in:
Chris Eager 2021-09-08 10:28:50 -07:00 committed by Chris Eager
parent 23a076a204
commit 624e40e3b7
4 changed files with 77 additions and 28 deletions

View File

@ -139,6 +139,10 @@ accountDatabaseCrawler:
chunkSize: # accounts per run
chunkIntervalMs: # time per run
dynamoDbMigrationCrawler:
chunkSize: # accounts per run
chunkIntervalMs: # time per run
apn: # Apple Push Notifications configuration
sandbox: true
bundleId:

View File

@ -109,6 +109,11 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private AccountDatabaseCrawlerConfiguration accountDatabaseCrawler;
@NotNull
@Valid
@JsonProperty
private AccountDatabaseCrawlerConfiguration dynamoDbMigrationCrawler;
@NotNull
@Valid
@JsonProperty
@ -366,6 +371,10 @@ public class WhisperServerConfiguration extends Configuration {
return accountDatabaseCrawler;
}
public AccountDatabaseCrawlerConfiguration getDynamoDbMigrationCrawlerConfiguration() {
return dynamoDbMigrationCrawler;
}
public MessageCacheConfiguration getMessageCacheConfiguration() {
return messageCache;
}

View File

@ -474,9 +474,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
// TODO listeners must be ordered so that ones that directly update accounts come last, so that read-only ones are not working with stale data
final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = new ArrayList<>();
// the migrator is read-only
accountDatabaseCrawlerListeners.add(new AccountsDynamoDbMigrator(accountsDynamoDb, dynamicConfigurationManager));
final List<DeletedAccountsDirectoryReconciler> deletedAccountsDirectoryReconcilers = new ArrayList<>();
for (DirectoryServerConfiguration directoryServerConfiguration : config.getDirectoryConfiguration()
.getDirectoryServerConfiguration()) {
@ -503,7 +500,20 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
CurrencyConversionManager currencyManager = new CurrencyConversionManager(fixerClient, ftxClient, config.getPaymentsServiceConfiguration().getPaymentCurrencies());
AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster);
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs(), dynamicConfigurationManager);
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager,
accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners,
config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs(),
dynamicConfigurationManager);
AccountDatabaseCrawlerCache dynamoDbMigrationCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster);
dynamoDbMigrationCrawlerCache.setPrefix("DynamoMigration");
AccountDatabaseCrawler accountDynamoDbMigrationCrawler = new AccountDatabaseCrawler(accountsManager,
dynamoDbMigrationCrawlerCache,
List.of(new AccountsDynamoDbMigrator(accountsDynamoDb, dynamicConfigurationManager)),
config.getDynamoDbMigrationCrawlerConfiguration().getChunkSize(),
config.getDynamoDbMigrationCrawlerConfiguration().getChunkIntervalMs(),
dynamicConfigurationManager);
DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccountsManager, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor);
MigrationRetryAccountsTableCrawler migrationRetryAccountsTableCrawler = new MigrationRetryAccountsTableCrawler(migrationRetryAccounts, accountsManager, accountsDynamoDb, cacheCluster, recurringJobExecutor);
@ -514,6 +524,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.lifecycle().manage(pubSubManager);
environment.lifecycle().manage(messageSender);
environment.lifecycle().manage(accountDatabaseCrawler);
environment.lifecycle().manage(accountDynamoDbMigrationCrawler);
environment.lifecycle().manage(deletedAccountsTableCrawler);
environment.lifecycle().manage(migrationRetryAccountsTableCrawler);
environment.lifecycle().manage(remoteConfigsManager);

View File

@ -6,38 +6,40 @@ package org.whispersystems.textsecuregcm.storage;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.SetArgs;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class AccountDatabaseCrawlerCache {
private static final String ACTIVE_WORKER_KEY = "account_database_crawler_cache_active_worker";
private static final String LAST_UUID_KEY = "account_database_crawler_cache_last_uuid";
private static final String ACCELERATE_KEY = "account_database_crawler_cache_accelerate";
private static final String LAST_UUID_KEY = "account_database_crawler_cache_last_uuid";
private static final String ACCELERATE_KEY = "account_database_crawler_cache_accelerate";
private static final String LAST_UUID_DYNAMO_KEY = "account_database_crawler_cache_last_uuid_dynamo";
private static final String LAST_UUID_DYNAMO_KEY = "account_database_crawler_cache_last_uuid_dynamo";
private static final long LAST_NUMBER_TTL_MS = 86400_000L;
private static final long LAST_NUMBER_TTL_MS = 86400_000L;
private final FaultTolerantRedisCluster cacheCluster;
private final ClusterLuaScript unlockClusterScript;
private final ClusterLuaScript unlockClusterScript;
private String prefix = "";
public AccountDatabaseCrawlerCache(FaultTolerantRedisCluster cacheCluster) throws IOException {
this.cacheCluster = cacheCluster;
this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/account_database_crawler/unlock.lua", ScriptOutputType.INTEGER);
this.cacheCluster = cacheCluster;
this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/account_database_crawler/unlock.lua",
ScriptOutputType.INTEGER);
}
public void setAccelerated(final boolean accelerated) {
if (accelerated) {
cacheCluster.useCluster(connection -> connection.sync().set(ACCELERATE_KEY, "1"));
cacheCluster.useCluster(connection -> connection.sync().set(getPrefixedKey(ACCELERATE_KEY), "1"));
} else {
cacheCluster.useCluster(connection -> connection.sync().del(ACCELERATE_KEY));
cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(ACCELERATE_KEY)));
}
}
@ -46,41 +48,64 @@ public class AccountDatabaseCrawlerCache {
}
public boolean claimActiveWork(String workerId, long ttlMs) {
return "OK".equals(cacheCluster.withCluster(connection -> connection.sync().set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.nx().px(ttlMs))));
return "OK".equals(cacheCluster.withCluster(connection -> connection.sync()
.set(getPrefixedKey(ACTIVE_WORKER_KEY), workerId, SetArgs.Builder.nx().px(ttlMs))));
}
public void releaseActiveWork(String workerId) {
unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId));
unlockClusterScript.execute(List.of(getPrefixedKey(ACTIVE_WORKER_KEY)), List.of(workerId));
}
public Optional<UUID> getLastUuid() {
final String lastUuidString = cacheCluster.withCluster(connection -> connection.sync().get(LAST_UUID_KEY));
final String lastUuidString = cacheCluster.withCluster(
connection -> connection.sync().get(getPrefixedKey(LAST_UUID_KEY)));
if (lastUuidString == null) return Optional.empty();
else return Optional.of(UUID.fromString(lastUuidString));
if (lastUuidString == null) {
return Optional.empty();
} else {
return Optional.of(UUID.fromString(lastUuidString));
}
}
public void setLastUuid(Optional<UUID> lastUuid) {
if (lastUuid.isPresent()) {
cacheCluster.useCluster(connection -> connection.sync().psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
cacheCluster.useCluster(connection -> connection.sync()
.psetex(getPrefixedKey(LAST_UUID_KEY), LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
} else {
cacheCluster.useCluster(connection -> connection.sync().del(LAST_UUID_KEY));
cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(LAST_UUID_KEY)));
}
}
public Optional<UUID> getLastUuidDynamo() {
final String lastUuidString = cacheCluster.withCluster(connection -> connection.sync().get(LAST_UUID_DYNAMO_KEY));
final String lastUuidString = cacheCluster.withCluster(
connection -> connection.sync().get(getPrefixedKey(LAST_UUID_DYNAMO_KEY)));
if (lastUuidString == null) return Optional.empty();
else return Optional.of(UUID.fromString(lastUuidString));
if (lastUuidString == null) {
return Optional.empty();
} else {
return Optional.of(UUID.fromString(lastUuidString));
}
}
public void setLastUuidDynamo(Optional<UUID> lastUuid) {
if (lastUuid.isPresent()) {
cacheCluster.useCluster(connection -> connection.sync().psetex(LAST_UUID_DYNAMO_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
cacheCluster.useCluster(
connection -> connection.sync()
.psetex(getPrefixedKey(LAST_UUID_DYNAMO_KEY), LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
} else {
cacheCluster.useCluster(connection -> connection.sync().del(LAST_UUID_DYNAMO_KEY));
cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(LAST_UUID_DYNAMO_KEY)));
}
}
private String getPrefixedKey(final String key) {
return prefix + key;
}
/**
* Set a cache key prefix, allowing for uses beyond the canonical crawler
*/
public void setPrefix(final String prefix) {
this.prefix = prefix + "::";
}
}