diff --git a/service/config/sample.yml b/service/config/sample.yml index c7a0c5555..6789e1618 100644 --- a/service/config/sample.yml +++ b/service/config/sample.yml @@ -139,6 +139,10 @@ accountDatabaseCrawler: chunkSize: # accounts per run chunkIntervalMs: # time per run +dynamoDbMigrationCrawler: + chunkSize: # accounts per run + chunkIntervalMs: # time per run + apn: # Apple Push Notifications configuration sandbox: true bundleId: diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 196b2a249..0b0d56791 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -109,6 +109,11 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private AccountDatabaseCrawlerConfiguration accountDatabaseCrawler; + @NotNull + @Valid + @JsonProperty + private AccountDatabaseCrawlerConfiguration dynamoDbMigrationCrawler; + @NotNull @Valid @JsonProperty @@ -366,6 +371,10 @@ public class WhisperServerConfiguration extends Configuration { return accountDatabaseCrawler; } + public AccountDatabaseCrawlerConfiguration getDynamoDbMigrationCrawlerConfiguration() { + return dynamoDbMigrationCrawler; + } + public MessageCacheConfiguration getMessageCacheConfiguration() { return messageCache; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index c11cc6db7..8cf949d69 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -474,9 +474,6 @@ public class WhisperServerService extends Application accountDatabaseCrawlerListeners = new ArrayList<>(); - // the migrator is read-only - accountDatabaseCrawlerListeners.add(new AccountsDynamoDbMigrator(accountsDynamoDb, dynamicConfigurationManager)); - final List deletedAccountsDirectoryReconcilers = new ArrayList<>(); for (DirectoryServerConfiguration directoryServerConfiguration : config.getDirectoryConfiguration() .getDirectoryServerConfiguration()) { @@ -503,7 +500,20 @@ public class WhisperServerService extends Application connection.sync().set(ACCELERATE_KEY, "1")); + cacheCluster.useCluster(connection -> connection.sync().set(getPrefixedKey(ACCELERATE_KEY), "1")); } else { - cacheCluster.useCluster(connection -> connection.sync().del(ACCELERATE_KEY)); + cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(ACCELERATE_KEY))); } } @@ -46,41 +48,64 @@ public class AccountDatabaseCrawlerCache { } public boolean claimActiveWork(String workerId, long ttlMs) { - return "OK".equals(cacheCluster.withCluster(connection -> connection.sync().set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.nx().px(ttlMs)))); + return "OK".equals(cacheCluster.withCluster(connection -> connection.sync() + .set(getPrefixedKey(ACTIVE_WORKER_KEY), workerId, SetArgs.Builder.nx().px(ttlMs)))); } public void releaseActiveWork(String workerId) { - unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId)); + unlockClusterScript.execute(List.of(getPrefixedKey(ACTIVE_WORKER_KEY)), List.of(workerId)); } public Optional getLastUuid() { - final String lastUuidString = cacheCluster.withCluster(connection -> connection.sync().get(LAST_UUID_KEY)); + final String lastUuidString = cacheCluster.withCluster( + connection -> connection.sync().get(getPrefixedKey(LAST_UUID_KEY))); - if (lastUuidString == null) return Optional.empty(); - else return Optional.of(UUID.fromString(lastUuidString)); + if (lastUuidString == null) { + return Optional.empty(); + } else { + return Optional.of(UUID.fromString(lastUuidString)); + } } public void setLastUuid(Optional lastUuid) { if (lastUuid.isPresent()) { - cacheCluster.useCluster(connection -> connection.sync().psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString())); + cacheCluster.useCluster(connection -> connection.sync() + .psetex(getPrefixedKey(LAST_UUID_KEY), LAST_NUMBER_TTL_MS, lastUuid.get().toString())); } else { - cacheCluster.useCluster(connection -> connection.sync().del(LAST_UUID_KEY)); + cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(LAST_UUID_KEY))); } } public Optional getLastUuidDynamo() { - final String lastUuidString = cacheCluster.withCluster(connection -> connection.sync().get(LAST_UUID_DYNAMO_KEY)); + final String lastUuidString = cacheCluster.withCluster( + connection -> connection.sync().get(getPrefixedKey(LAST_UUID_DYNAMO_KEY))); - if (lastUuidString == null) return Optional.empty(); - else return Optional.of(UUID.fromString(lastUuidString)); + if (lastUuidString == null) { + return Optional.empty(); + } else { + return Optional.of(UUID.fromString(lastUuidString)); + } } public void setLastUuidDynamo(Optional lastUuid) { if (lastUuid.isPresent()) { - cacheCluster.useCluster(connection -> connection.sync().psetex(LAST_UUID_DYNAMO_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString())); + cacheCluster.useCluster( + connection -> connection.sync() + .psetex(getPrefixedKey(LAST_UUID_DYNAMO_KEY), LAST_NUMBER_TTL_MS, lastUuid.get().toString())); } else { - cacheCluster.useCluster(connection -> connection.sync().del(LAST_UUID_DYNAMO_KEY)); + cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(LAST_UUID_DYNAMO_KEY))); } } + private String getPrefixedKey(final String key) { + return prefix + key; + } + + /** + * Set a cache key prefix, allowing for uses beyond the canonical crawler + */ + public void setPrefix(final String prefix) { + this.prefix = prefix + "::"; + } + }