add timers to the account crawler listeners

This commit is contained in:
Brian Acton 2019-10-25 21:30:48 -07:00
parent cba3c20d5c
commit eddfacd0f4
7 changed files with 89 additions and 61 deletions

View File

@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
public class AccountCleaner implements AccountDatabaseCrawlerListener {
public class AccountCleaner extends AccountDatabaseCrawlerListener {
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Meter expiredAccountsMeter = metricRegistry.meter(name(AccountCleaner.class, "expiredAccounts"));
@ -52,7 +52,11 @@ public class AccountCleaner implements AccountDatabaseCrawlerListener {
}
@Override
public void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
public void onCrawlEnd(Optional<UUID> fromUuid) {
}
@Override
protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
int accountUpdateCount = 0;
for (Account account : chunkAccounts) {
if (needsExplicitRemoval(account)) {
@ -74,10 +78,6 @@ public class AccountCleaner implements AccountDatabaseCrawlerListener {
}
}
@Override
public void onCrawlEnd(Optional<UUID> fromUuid) {
}
private boolean needsExplicitRemoval(Account account) {
return account.getMasterDevice().isPresent() &&
hasPushToken(account.getMasterDevice().get()) &&

View File

@ -137,7 +137,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
} else {
try {
for (AccountDatabaseCrawlerListener listener : listeners) {
listener.onCrawlChunk(fromUuid, chunkAccounts);
listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts);
}
cache.setLastUuid(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getUuid()));
} catch (AccountDatabaseCrawlerRestartException e) {

View File

@ -16,13 +16,37 @@
*/
package org.whispersystems.textsecuregcm.storage;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import org.whispersystems.textsecuregcm.util.Constants;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public interface AccountDatabaseCrawlerListener {
void onCrawlStart();
void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException;
void onCrawlEnd(Optional<UUID> fromUuid);
public abstract class AccountDatabaseCrawlerListener {
private Timer processChunkTimer;
abstract public void onCrawlStart();
abstract public void onCrawlEnd(Optional<UUID> fromUuid);
abstract protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException;
public AccountDatabaseCrawlerListener() {
processChunkTimer = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME).timer(name(AccountDatabaseCrawlerListener.class, "processChunk", getClass().getSimpleName()));
}
public void timeAndProcessCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException {
try (Timer.Context timer = processChunkTimer.time()) {
onCrawlChunk(fromUuid, chunkAccounts);
}
}
}

View File

@ -37,7 +37,7 @@ import io.dropwizard.metrics.MetricsFactory;
import io.dropwizard.metrics.ReporterFactory;
import redis.clients.jedis.Jedis;
public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
private static final String TALLY_KEY = "active_user_tally";
@ -56,6 +56,7 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
this.mapper = SystemMapper.getMapper();
}
@Override
public void onCrawlStart() {
try (Jedis jedis = jedisPool.getWriteResource()) {
jedis.del(TALLY_KEY);
@ -63,7 +64,43 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
}
@Override
public void onCrawlChunk(Optional<UUID> fromNumber, List<Account> chunkAccounts) {
public void onCrawlEnd(Optional<UUID> fromNumber) {
MetricRegistry metrics = new MetricRegistry();
long intervalTallies[] = new long[INTERVALS.length];
ActiveUserTally activeUserTally = getFinalTallies();
Map<String, long[]> platforms = activeUserTally.getPlatforms();
platforms.forEach((platform, platformTallies) -> {
for (int i = 0; i < INTERVALS.length; i++) {
final long tally = platformTallies[i];
metrics.register(metricKey(platform, INTERVALS[i]),
(Gauge<Long>) () -> tally);
intervalTallies[i] += tally;
}
});
Map<String, long[]> countries = activeUserTally.getCountries();
countries.forEach((country, countryTallies) -> {
for (int i = 0; i < INTERVALS.length; i++) {
final long tally = countryTallies[i];
metrics.register(metricKey(country, INTERVALS[i]),
(Gauge<Long>) () -> tally);
}
});
for (int i = 0; i < INTERVALS.length; i++) {
final long intervalTotal = intervalTallies[i];
metrics.register(metricKey(INTERVALS[i]),
(Gauge<Long>) () -> intervalTotal);
}
for (ReporterFactory reporterFactory : metricsFactory.getReporters()) {
reporterFactory.build(metrics).report();
}
}
@Override
protected void onCrawlChunk(Optional<UUID> fromNumber, List<Account> chunkAccounts) {
long nowDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
long agoMs[] = {TimeUnit.DAYS.toMillis(nowDays - 1),
TimeUnit.DAYS.toMillis(nowDays - 7),
@ -109,42 +146,6 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
incrementTallies(fromNumber.orElse(UUID.randomUUID()), platformIncrements, countryIncrements);
}
@Override
public void onCrawlEnd(Optional<UUID> fromNumber) {
MetricRegistry metrics = new MetricRegistry();
long intervalTallies[] = new long[INTERVALS.length];
ActiveUserTally activeUserTally = getFinalTallies();
Map<String, long[]> platforms = activeUserTally.getPlatforms();
platforms.forEach((platform, platformTallies) -> {
for (int i = 0; i < INTERVALS.length; i++) {
final long tally = platformTallies[i];
metrics.register(metricKey(platform, INTERVALS[i]),
(Gauge<Long>) () -> tally);
intervalTallies[i] += tally;
}
});
Map<String, long[]> countries = activeUserTally.getCountries();
countries.forEach((country, countryTallies) -> {
for (int i = 0; i < INTERVALS.length; i++) {
final long tally = countryTallies[i];
metrics.register(metricKey(country, INTERVALS[i]),
(Gauge<Long>) () -> tally);
}
});
for (int i = 0; i < INTERVALS.length; i++) {
final long intervalTotal = intervalTallies[i];
metrics.register(metricKey(INTERVALS[i]),
(Gauge<Long>) () -> intervalTotal);
}
for (ReporterFactory reporterFactory : metricsFactory.getReporters()) {
reporterFactory.build(metrics).report();
}
}
private long[] getTallyFromMap(Map<String, long[]> map, String key) {
long[] tally = map.get(key);
if (tally == null) {

View File

@ -40,7 +40,7 @@ import java.util.stream.Collectors;
import static com.codahale.metrics.MetricRegistry.name;
public class DirectoryReconciler implements AccountDatabaseCrawlerListener {
public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
private static final Logger logger = LoggerFactory.getLogger(DirectoryReconciler.class);
@ -56,14 +56,17 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener {
this.reconciliationClient = reconciliationClient;
}
@Override
public void onCrawlStart() { }
@Override
public void onCrawlEnd(Optional<UUID> fromUuid) {
DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList());
DirectoryReconciliationResponse response = sendChunk(request);
}
public void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException {
@Override
protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException {
updateDirectoryCache(chunkAccounts);

View File

@ -14,7 +14,7 @@ import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
public class PushFeedbackProcessor implements AccountDatabaseCrawlerListener {
public class PushFeedbackProcessor extends AccountDatabaseCrawlerListener {
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter expired = metricRegistry.meter(name(getClass(), "unregistered", "expired"));
@ -32,7 +32,10 @@ public class PushFeedbackProcessor implements AccountDatabaseCrawlerListener {
public void onCrawlStart() {}
@Override
public void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
public void onCrawlEnd(Optional<UUID> toUuid) {}
@Override
protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
for (Account account : chunkAccounts) {
boolean update = false;
@ -64,7 +67,4 @@ public class PushFeedbackProcessor implements AccountDatabaseCrawlerListener {
}
}
}
@Override
public void onCrawlEnd(Optional<UUID> toUuid) {}
}

View File

@ -82,7 +82,7 @@ public class AccountDatabaseCrawlerTest {
verify(accounts, times(0)).getAllFrom(any(UUID.class), eq(CHUNK_SIZE));
verify(account1, times(0)).getUuid();
verify(account2, times(1)).getUuid();
verify(listener, times(1)).onCrawlChunk(eq(Optional.empty()), eq(Arrays.asList(account1, account2)));
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.empty()), eq(Arrays.asList(account1, account2)));
verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class));
@ -106,7 +106,7 @@ public class AccountDatabaseCrawlerTest {
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
verify(account2, times(1)).getUuid();
verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class));
@ -132,7 +132,7 @@ public class AccountDatabaseCrawlerTest {
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
verify(account2, times(1)).getUuid();
verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class));
@ -148,7 +148,7 @@ public class AccountDatabaseCrawlerTest {
@Test
public void testCrawlChunkRestart() throws AccountDatabaseCrawlerRestartException {
when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1));
doThrow(AccountDatabaseCrawlerRestartException.class).when(listener).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
doThrow(AccountDatabaseCrawlerRestartException.class).when(listener).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
boolean accelerated = crawler.doPeriodicWork();
assertThat(accelerated).isFalse();
@ -158,7 +158,7 @@ public class AccountDatabaseCrawlerTest {
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
verify(account2, times(0)).getNumber();
verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
verify(cache, times(1)).setLastUuid(eq(Optional.empty()));
verify(cache, times(1)).clearAccelerate();
verify(cache, times(1)).isAccelerated();