Add timers to `processChunk` and `deleteRecentlyDeletedUuids`
This commit is contained in:
parent
b1274125c9
commit
18a6df34bd
|
@ -23,18 +23,20 @@ import org.whispersystems.textsecuregcm.util.Util;
|
|||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
||||
public class AccountDatabaseCrawler implements Managed, Runnable {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(AccountDatabaseCrawler.class);
|
||||
private static final Logger logger = LoggerFactory.getLogger(AccountDatabaseCrawler.class);
|
||||
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private static final Timer readChunkTimer = metricRegistry.timer(name(AccountDatabaseCrawler.class, "readChunk"));
|
||||
private static final Timer readChunkTimer = metricRegistry.timer(name(AccountDatabaseCrawler.class, "readChunk"));
|
||||
private static final Timer processChunkTimer = metricRegistry.timer(
|
||||
name(AccountDatabaseCrawler.class, "processChunk"));
|
||||
|
||||
private static final long WORKER_TTL_MS = 120_000L;
|
||||
private static final long ACCELERATED_CHUNK_INTERVAL = 10L;
|
||||
private static final long WORKER_TTL_MS = 120_000L;
|
||||
private static final long ACCELERATED_CHUNK_INTERVAL = 10L;
|
||||
|
||||
private final AccountsManager accounts;
|
||||
private final int chunkSize;
|
||||
private final long chunkIntervalMs;
|
||||
private final String workerId;
|
||||
private final AccountDatabaseCrawlerCache cache;
|
||||
private final AccountsManager accounts;
|
||||
private final int chunkSize;
|
||||
private final long chunkIntervalMs;
|
||||
private final String workerId;
|
||||
private final AccountDatabaseCrawlerCache cache;
|
||||
private final List<AccountDatabaseCrawlerListener> listeners;
|
||||
|
||||
private final DynamicConfigurationManager dynamicConfigurationManager;
|
||||
|
@ -118,34 +120,38 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
|
|||
}
|
||||
|
||||
private void processChunk() {
|
||||
final boolean useDynamo = dynamicConfigurationManager.getConfiguration()
|
||||
.getAccountsDynamoDbMigrationConfiguration()
|
||||
.isDynamoCrawlerEnabled();
|
||||
|
||||
final Optional<UUID> fromUuid = getLastUuid(useDynamo);
|
||||
try (Timer.Context timer = processChunkTimer.time()) {
|
||||
|
||||
if (fromUuid.isEmpty()) {
|
||||
logger.info("Started crawl");
|
||||
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlStart);
|
||||
}
|
||||
final boolean useDynamo = dynamicConfigurationManager.getConfiguration()
|
||||
.getAccountsDynamoDbMigrationConfiguration()
|
||||
.isDynamoCrawlerEnabled();
|
||||
|
||||
final AccountCrawlChunk chunkAccounts = readChunk(fromUuid, chunkSize, useDynamo);
|
||||
final Optional<UUID> fromUuid = getLastUuid(useDynamo);
|
||||
|
||||
if (chunkAccounts.getAccounts().isEmpty()) {
|
||||
logger.info("Finished crawl");
|
||||
listeners.forEach(listener -> listener.onCrawlEnd(fromUuid));
|
||||
cacheLastUuid(Optional.empty(), useDynamo);
|
||||
cache.setAccelerated(false);
|
||||
} else {
|
||||
logger.info("Processing chunk");
|
||||
try {
|
||||
for (AccountDatabaseCrawlerListener listener : listeners) {
|
||||
listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts.getAccounts());
|
||||
}
|
||||
cacheLastUuid(chunkAccounts.getLastUuid(), useDynamo);
|
||||
} catch (AccountDatabaseCrawlerRestartException e) {
|
||||
if (fromUuid.isEmpty()) {
|
||||
logger.info("Started crawl");
|
||||
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlStart);
|
||||
}
|
||||
|
||||
final AccountCrawlChunk chunkAccounts = readChunk(fromUuid, chunkSize, useDynamo);
|
||||
|
||||
if (chunkAccounts.getAccounts().isEmpty()) {
|
||||
logger.info("Finished crawl");
|
||||
listeners.forEach(listener -> listener.onCrawlEnd(fromUuid));
|
||||
cacheLastUuid(Optional.empty(), useDynamo);
|
||||
cache.setAccelerated(false);
|
||||
} else {
|
||||
logger.info("Processing chunk");
|
||||
try {
|
||||
for (AccountDatabaseCrawlerListener listener : listeners) {
|
||||
listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts.getAccounts());
|
||||
}
|
||||
cacheLastUuid(chunkAccounts.getLastUuid(), useDynamo);
|
||||
} catch (AccountDatabaseCrawlerRestartException e) {
|
||||
cacheLastUuid(Optional.empty(), useDynamo);
|
||||
cache.setAccelerated(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -74,6 +74,8 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
|
|||
private static final Timer GET_ALL_FROM_START_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getAllFrom"));
|
||||
private static final Timer GET_ALL_FROM_OFFSET_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getAllFromOffset"));
|
||||
private static final Timer DELETE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "delete"));
|
||||
private static final Timer DELETE_RECENTLY_DELETED_UUIDS_TIMER = Metrics.timer(
|
||||
name(AccountsDynamoDb.class, "deleteRecentlyDeletedUuids"));
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(AccountsDynamoDb.class);
|
||||
|
||||
|
@ -361,13 +363,16 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
|
|||
|
||||
public void deleteRecentlyDeletedUuids() {
|
||||
|
||||
final List<UUID> recentlyDeletedUuids = migrationDeletedAccounts.getRecentlyDeletedUuids();
|
||||
DELETE_RECENTLY_DELETED_UUIDS_TIMER.record(() -> {
|
||||
|
||||
for (UUID recentlyDeletedUuid : recentlyDeletedUuids) {
|
||||
delete(recentlyDeletedUuid, false);
|
||||
}
|
||||
final List<UUID> recentlyDeletedUuids = migrationDeletedAccounts.getRecentlyDeletedUuids();
|
||||
|
||||
migrationDeletedAccounts.delete(recentlyDeletedUuids);
|
||||
for (UUID recentlyDeletedUuid : recentlyDeletedUuids) {
|
||||
delete(recentlyDeletedUuid, false);
|
||||
}
|
||||
|
||||
migrationDeletedAccounts.delete(recentlyDeletedUuids);
|
||||
});
|
||||
}
|
||||
|
||||
public CompletableFuture<Boolean> migrate(Account account) {
|
||||
|
|
Loading…
Reference in New Issue