From eddfacd0f498fc64457330033da8e7b302294836 Mon Sep 17 00:00:00 2001 From: Brian Acton Date: Fri, 25 Oct 2019 21:30:48 -0700 Subject: [PATCH] add timers to the account crawler listeners --- .../textsecuregcm/storage/AccountCleaner.java | 12 +-- .../storage/AccountDatabaseCrawler.java | 2 +- .../AccountDatabaseCrawlerListener.java | 32 +++++++- .../storage/ActiveUserCounter.java | 77 ++++++++++--------- .../storage/DirectoryReconciler.java | 7 +- .../storage/PushFeedbackProcessor.java | 10 +-- .../storage/AccountDatabaseCrawlerTest.java | 10 +-- 7 files changed, 89 insertions(+), 61 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java index cf68231aa..45ecd89c5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountCleaner.java @@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit; import static com.codahale.metrics.MetricRegistry.name; -public class AccountCleaner implements AccountDatabaseCrawlerListener { +public class AccountCleaner extends AccountDatabaseCrawlerListener { private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); private static final Meter expiredAccountsMeter = metricRegistry.meter(name(AccountCleaner.class, "expiredAccounts")); @@ -52,7 +52,11 @@ public class AccountCleaner implements AccountDatabaseCrawlerListener { } @Override - public void onCrawlChunk(Optional fromUuid, List chunkAccounts) { + public void onCrawlEnd(Optional fromUuid) { + } + + @Override + protected void onCrawlChunk(Optional fromUuid, List chunkAccounts) { int accountUpdateCount = 0; for (Account account : chunkAccounts) { if (needsExplicitRemoval(account)) { @@ -74,10 +78,6 @@ public class AccountCleaner implements AccountDatabaseCrawlerListener { } } - @Override - public void onCrawlEnd(Optional fromUuid) { - } - private boolean needsExplicitRemoval(Account account) { return account.getMasterDevice().isPresent() && hasPushToken(account.getMasterDevice().get()) && diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java index 93d08d679..429a0c6c0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java @@ -137,7 +137,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable { } else { try { for (AccountDatabaseCrawlerListener listener : listeners) { - listener.onCrawlChunk(fromUuid, chunkAccounts); + listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts); } cache.setLastUuid(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getUuid())); } catch (AccountDatabaseCrawlerRestartException e) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java index 590f61c38..71acb58ec 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java @@ -16,13 +16,37 @@ */ package org.whispersystems.textsecuregcm.storage; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SharedMetricRegistries; +import com.codahale.metrics.Timer; + +import org.whispersystems.textsecuregcm.util.Constants; + import java.util.List; import java.util.Optional; import java.util.UUID; +import static com.codahale.metrics.MetricRegistry.name; + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") -public interface AccountDatabaseCrawlerListener { - void onCrawlStart(); - void onCrawlChunk(Optional fromUuid, List chunkAccounts) throws AccountDatabaseCrawlerRestartException; - void onCrawlEnd(Optional fromUuid); +public abstract class AccountDatabaseCrawlerListener { + + private Timer processChunkTimer; + + abstract public void onCrawlStart(); + + abstract public void onCrawlEnd(Optional fromUuid); + + abstract protected void onCrawlChunk(Optional fromUuid, List chunkAccounts) throws AccountDatabaseCrawlerRestartException; + + public AccountDatabaseCrawlerListener() { + processChunkTimer = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME).timer(name(AccountDatabaseCrawlerListener.class, "processChunk", getClass().getSimpleName())); + } + + public void timeAndProcessCrawlChunk(Optional fromUuid, List chunkAccounts) throws AccountDatabaseCrawlerRestartException { + try (Timer.Context timer = processChunkTimer.time()) { + onCrawlChunk(fromUuid, chunkAccounts); + } + } + } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java index 099cae5d5..1c23de692 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java @@ -37,7 +37,7 @@ import io.dropwizard.metrics.MetricsFactory; import io.dropwizard.metrics.ReporterFactory; import redis.clients.jedis.Jedis; -public class ActiveUserCounter implements AccountDatabaseCrawlerListener { +public class ActiveUserCounter extends AccountDatabaseCrawlerListener { private static final String TALLY_KEY = "active_user_tally"; @@ -56,6 +56,7 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener { this.mapper = SystemMapper.getMapper(); } + @Override public void onCrawlStart() { try (Jedis jedis = jedisPool.getWriteResource()) { jedis.del(TALLY_KEY); @@ -63,7 +64,43 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener { } @Override - public void onCrawlChunk(Optional fromNumber, List chunkAccounts) { + public void onCrawlEnd(Optional fromNumber) { + MetricRegistry metrics = new MetricRegistry(); + long intervalTallies[] = new long[INTERVALS.length]; + ActiveUserTally activeUserTally = getFinalTallies(); + Map platforms = activeUserTally.getPlatforms(); + + platforms.forEach((platform, platformTallies) -> { + for (int i = 0; i < INTERVALS.length; i++) { + final long tally = platformTallies[i]; + metrics.register(metricKey(platform, INTERVALS[i]), + (Gauge) () -> tally); + intervalTallies[i] += tally; + } + }); + + Map countries = activeUserTally.getCountries(); + countries.forEach((country, countryTallies) -> { + for (int i = 0; i < INTERVALS.length; i++) { + final long tally = countryTallies[i]; + metrics.register(metricKey(country, INTERVALS[i]), + (Gauge) () -> tally); + } + }); + + for (int i = 0; i < INTERVALS.length; i++) { + final long intervalTotal = intervalTallies[i]; + metrics.register(metricKey(INTERVALS[i]), + (Gauge) () -> intervalTotal); + } + + for (ReporterFactory reporterFactory : metricsFactory.getReporters()) { + reporterFactory.build(metrics).report(); + } + } + + @Override + protected void onCrawlChunk(Optional fromNumber, List chunkAccounts) { long nowDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis()); long agoMs[] = {TimeUnit.DAYS.toMillis(nowDays - 1), TimeUnit.DAYS.toMillis(nowDays - 7), @@ -109,42 +146,6 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener { incrementTallies(fromNumber.orElse(UUID.randomUUID()), platformIncrements, countryIncrements); } - @Override - public void onCrawlEnd(Optional fromNumber) { - MetricRegistry metrics = new MetricRegistry(); - long intervalTallies[] = new long[INTERVALS.length]; - ActiveUserTally activeUserTally = getFinalTallies(); - Map platforms = activeUserTally.getPlatforms(); - - platforms.forEach((platform, platformTallies) -> { - for (int i = 0; i < INTERVALS.length; i++) { - final long tally = platformTallies[i]; - metrics.register(metricKey(platform, INTERVALS[i]), - (Gauge) () -> tally); - intervalTallies[i] += tally; - } - }); - - Map countries = activeUserTally.getCountries(); - countries.forEach((country, countryTallies) -> { - for (int i = 0; i < INTERVALS.length; i++) { - final long tally = countryTallies[i]; - metrics.register(metricKey(country, INTERVALS[i]), - (Gauge) () -> tally); - } - }); - - for (int i = 0; i < INTERVALS.length; i++) { - final long intervalTotal = intervalTallies[i]; - metrics.register(metricKey(INTERVALS[i]), - (Gauge) () -> intervalTotal); - } - - for (ReporterFactory reporterFactory : metricsFactory.getReporters()) { - reporterFactory.build(metrics).report(); - } - } - private long[] getTallyFromMap(Map map, String key) { long[] tally = map.get(key); if (tally == null) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java index e3e5aee3d..9037f74db 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java @@ -40,7 +40,7 @@ import java.util.stream.Collectors; import static com.codahale.metrics.MetricRegistry.name; -public class DirectoryReconciler implements AccountDatabaseCrawlerListener { +public class DirectoryReconciler extends AccountDatabaseCrawlerListener { private static final Logger logger = LoggerFactory.getLogger(DirectoryReconciler.class); @@ -56,14 +56,17 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener { this.reconciliationClient = reconciliationClient; } + @Override public void onCrawlStart() { } + @Override public void onCrawlEnd(Optional fromUuid) { DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList()); DirectoryReconciliationResponse response = sendChunk(request); } - public void onCrawlChunk(Optional fromUuid, List chunkAccounts) throws AccountDatabaseCrawlerRestartException { + @Override + protected void onCrawlChunk(Optional fromUuid, List chunkAccounts) throws AccountDatabaseCrawlerRestartException { updateDirectoryCache(chunkAccounts); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java index fd3d6ceb8..cb4677ac5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PushFeedbackProcessor.java @@ -14,7 +14,7 @@ import java.util.concurrent.TimeUnit; import static com.codahale.metrics.MetricRegistry.name; -public class PushFeedbackProcessor implements AccountDatabaseCrawlerListener { +public class PushFeedbackProcessor extends AccountDatabaseCrawlerListener { private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); private final Meter expired = metricRegistry.meter(name(getClass(), "unregistered", "expired")); @@ -32,7 +32,10 @@ public class PushFeedbackProcessor implements AccountDatabaseCrawlerListener { public void onCrawlStart() {} @Override - public void onCrawlChunk(Optional fromUuid, List chunkAccounts) { + public void onCrawlEnd(Optional toUuid) {} + + @Override + protected void onCrawlChunk(Optional fromUuid, List chunkAccounts) { for (Account account : chunkAccounts) { boolean update = false; @@ -64,7 +67,4 @@ public class PushFeedbackProcessor implements AccountDatabaseCrawlerListener { } } } - - @Override - public void onCrawlEnd(Optional toUuid) {} } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java index 1ce7ef08a..0981932be 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java @@ -82,7 +82,7 @@ public class AccountDatabaseCrawlerTest { verify(accounts, times(0)).getAllFrom(any(UUID.class), eq(CHUNK_SIZE)); verify(account1, times(0)).getUuid(); verify(account2, times(1)).getUuid(); - verify(listener, times(1)).onCrawlChunk(eq(Optional.empty()), eq(Arrays.asList(account1, account2))); + verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.empty()), eq(Arrays.asList(account1, account2))); verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2))); verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).releaseActiveWork(any(String.class)); @@ -106,7 +106,7 @@ public class AccountDatabaseCrawlerTest { verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); verify(account2, times(1)).getUuid(); - verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); + verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2))); verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).releaseActiveWork(any(String.class)); @@ -132,7 +132,7 @@ public class AccountDatabaseCrawlerTest { verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); verify(account2, times(1)).getUuid(); - verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); + verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2))); verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).releaseActiveWork(any(String.class)); @@ -148,7 +148,7 @@ public class AccountDatabaseCrawlerTest { @Test public void testCrawlChunkRestart() throws AccountDatabaseCrawlerRestartException { when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1)); - doThrow(AccountDatabaseCrawlerRestartException.class).when(listener).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); + doThrow(AccountDatabaseCrawlerRestartException.class).when(listener).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); boolean accelerated = crawler.doPeriodicWork(); assertThat(accelerated).isFalse(); @@ -158,7 +158,7 @@ public class AccountDatabaseCrawlerTest { verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); verify(account2, times(0)).getNumber(); - verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); + verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); verify(cache, times(1)).setLastUuid(eq(Optional.empty())); verify(cache, times(1)).clearAccelerate(); verify(cache, times(1)).isAccelerated();