Drop the active user counter.

This commit is contained in:
Jon Chambers 2021-07-29 11:35:45 -04:00 committed by Jon Chambers
parent 51b7a8d868
commit 13a07dc6cd
3 changed files with 0 additions and 423 deletions

View File

@ -154,7 +154,6 @@ import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb;
import org.whispersystems.textsecuregcm.storage.AccountsDynamoDbMigrator;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.ActiveUserCounter;
import org.whispersystems.textsecuregcm.storage.DeletedAccounts;
import org.whispersystems.textsecuregcm.storage.DeletedAccountsDirectoryReconciler;
import org.whispersystems.textsecuregcm.storage.DeletedAccountsManager;
@ -476,7 +475,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final List<DeletedAccountsDirectoryReconciler> deletedAccountsDirectoryReconcilers = new ArrayList<>();
final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = new ArrayList<>();
accountDatabaseCrawlerListeners.add(new PushFeedbackProcessor(accountsManager));
accountDatabaseCrawlerListeners.add(new ActiveUserCounter(config.getMetricsFactory(), cacheCluster));
for (DirectoryServerConfiguration directoryServerConfiguration : config.getDirectoryConfiguration().getDirectoryServerConfiguration()) {
final DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(directoryServerConfiguration);
final DirectoryReconciler directoryReconciler = new DirectoryReconciler(directoryServerConfiguration.getReplicationName(), directoryReconciliationClient);

View File

@ -1,204 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.metrics.MetricsFactory;
import io.dropwizard.metrics.ReporterFactory;
import org.whispersystems.textsecuregcm.entities.ActiveUserTally;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
private static final String TALLY_KEY = "active_user_tally";
private static final String PLATFORM_IOS = "ios";
private static final String PLATFORM_ANDROID = "android";
private static final String INTERVALS[] = {"daily", "weekly", "monthly", "quarterly", "yearly"};
private final MetricsFactory metricsFactory;
private final FaultTolerantRedisCluster cacheCluster;
private final ObjectMapper mapper;
public ActiveUserCounter(MetricsFactory metricsFactory, FaultTolerantRedisCluster cacheCluster) {
this.metricsFactory = metricsFactory;
this.cacheCluster = cacheCluster;
this.mapper = SystemMapper.getMapper();
}
@Override
public void onCrawlStart() {
cacheCluster.useCluster(connection -> connection.sync().del(TALLY_KEY));
}
@Override
public void onCrawlEnd(Optional<UUID> fromNumber) {
MetricRegistry metrics = new MetricRegistry();
long intervalTallies[] = new long[INTERVALS.length];
ActiveUserTally activeUserTally = getFinalTallies();
Map<String, long[]> platforms = activeUserTally.getPlatforms();
platforms.forEach((platform, platformTallies) -> {
for (int i = 0; i < INTERVALS.length; i++) {
final long tally = platformTallies[i];
metrics.register(metricKey(platform, INTERVALS[i]),
(Gauge<Long>) () -> tally);
intervalTallies[i] += tally;
}
});
Map<String, long[]> countries = activeUserTally.getCountries();
countries.forEach((country, countryTallies) -> {
for (int i = 0; i < INTERVALS.length; i++) {
final long tally = countryTallies[i];
metrics.register(metricKey(country, INTERVALS[i]),
(Gauge<Long>) () -> tally);
}
});
for (int i = 0; i < INTERVALS.length; i++) {
final long intervalTotal = intervalTallies[i];
metrics.register(metricKey(INTERVALS[i]),
(Gauge<Long>) () -> intervalTotal);
}
for (ReporterFactory reporterFactory : metricsFactory.getReporters()) {
try (final ScheduledReporter reporter = reporterFactory.build(metrics)) {
reporter.report();
}
}
}
@Override
protected void onCrawlChunk(Optional<UUID> fromNumber, List<Account> chunkAccounts) {
long nowHours = TimeUnit.MILLISECONDS.toHours(System.currentTimeMillis());
long agoMs[] = {TimeUnit.HOURS.toMillis(nowHours - 1 * 24 - 8),
TimeUnit.HOURS.toMillis(nowHours - 7 * 24),
TimeUnit.HOURS.toMillis(nowHours - 30 * 24),
TimeUnit.HOURS.toMillis(nowHours - 90 * 24),
TimeUnit.HOURS.toMillis(nowHours - 365 * 24)};
Map<String, long[]> platformIncrements = new HashMap<>();
Map<String, long[]> countryIncrements = new HashMap<>();
for (Account account : chunkAccounts) {
Optional<Device> device = account.getMasterDevice();
if (device.isPresent()) {
long lastActiveMs = device.get().getLastSeen();
String platform = null;
if (device.get().getApnId() != null) {
platform = PLATFORM_IOS;
} else if (device.get().getGcmId() != null) {
platform = PLATFORM_ANDROID;
}
if (platform != null) {
String country = Util.getCountryCode(account.getNumber());
long[] platformIncrement = getTallyFromMap(platformIncrements, platform);
long[] countryIncrement = getTallyFromMap(countryIncrements, country);
for (int i = 0; i < agoMs.length; i++) {
if (lastActiveMs > agoMs[i]) {
platformIncrement[i]++;
countryIncrement[i]++;
}
}
}
}
}
incrementTallies(fromNumber.orElse(UUID.randomUUID()), platformIncrements, countryIncrements);
}
private long[] getTallyFromMap(Map<String, long[]> map, String key) {
long[] tally = map.get(key);
if (tally == null) {
tally = new long[INTERVALS.length];
map.put(key, tally);
}
return tally;
}
private void incrementTallies(UUID fromUuid, Map<String, long[]> platformIncrements, Map<String, long[]> countryIncrements) {
try {
final String tallyValue = cacheCluster.withCluster(connection -> connection.sync().get(TALLY_KEY));
ActiveUserTally activeUserTally;
if (tallyValue == null) {
activeUserTally = new ActiveUserTally(fromUuid, platformIncrements, countryIncrements);
} else {
activeUserTally = mapper.readValue(tallyValue, ActiveUserTally.class);
if (!fromUuid.equals(activeUserTally.getFromUuid())) {
activeUserTally.setFromUuid(fromUuid);
Map<String, long[]> platformTallies = activeUserTally.getPlatforms();
addTallyMaps(platformTallies, platformIncrements);
Map<String, long[]> countryTallies = activeUserTally.getCountries();
addTallyMaps(countryTallies, countryIncrements);
}
}
final String tallyJson = mapper.writeValueAsString(activeUserTally);
cacheCluster.useCluster(connection -> connection.sync().set(TALLY_KEY, tallyJson));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
}
private void addTallyMaps(Map<String, long[]> tallyMap, Map<String, long[]> incrementMap) {
incrementMap.forEach((key, increments) -> {
long[] tallies = tallyMap.get(key);
if (tallies == null) {
tallyMap.put(key, increments);
} else {
for (int i = 0; i < INTERVALS.length; i++) {
tallies[i] += increments[i];
}
}
});
}
private ActiveUserTally getFinalTallies() {
try {
final String tallyJson = cacheCluster.withCluster(connection -> connection.sync().get(TALLY_KEY));
return mapper.readValue(tallyJson, ActiveUserTally.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private String metricKey(String platform, String intervalName) {
return MetricRegistry.name(ActiveUserCounter.class, intervalName + "_active_" + platform);
}
private String metricKey(String intervalName) {
return MetricRegistry.name(ActiveUserCounter.class, intervalName + "_active");
}
}

View File

@ -1,217 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.tests.storage;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.ActiveUserCounter;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
import com.google.common.collect.ImmutableList;
import io.dropwizard.metrics.MetricsFactory;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.Optional;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class ActiveUserCounterTest {
private final UUID UUID_IOS = UUID.randomUUID();
private final UUID UUID_ANDROID = UUID.randomUUID();
private final UUID UUID_NODEVICE = UUID.randomUUID();
private final String ACCOUNT_NUMBER_IOS = "+15551234567";
private final String ACCOUNT_NUMBER_ANDROID = "+5511987654321";
private final String ACCOUNT_NUMBER_NODEVICE = "+5215551234567";
private final String TALLY_KEY = "active_user_tally";
private final Device iosDevice = mock(Device.class);
private final Device androidDevice = mock(Device.class);
private final Account androidAccount = mock(Account.class);
private final Account iosAccount = mock(Account.class);
private final Account noDeviceAccount = mock(Account.class);
private final RedisAdvancedClusterCommands<String, String> commands = mock(RedisAdvancedClusterCommands.class);
private final FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands);
private final MetricsFactory metricsFactory = mock(MetricsFactory.class);
private final ActiveUserCounter activeUserCounter = new ActiveUserCounter(metricsFactory, cacheCluster);
@Before
public void setup() {
long halfDayAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(12);
long fortyFiveDayAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(45);
when(androidDevice.getApnId()).thenReturn(null);
when(androidDevice.getGcmId()).thenReturn("mock-gcm-id");
when(androidDevice.getLastSeen()).thenReturn(fortyFiveDayAgo);
when(iosDevice.getApnId()).thenReturn("mock-apn-id");
when(iosDevice.getGcmId()).thenReturn(null);
when(iosDevice.getLastSeen()).thenReturn(halfDayAgo);
when(iosAccount.getUuid()).thenReturn(UUID_IOS);
when(iosAccount.getMasterDevice()).thenReturn(Optional.of(iosDevice));
when(iosAccount.getNumber()).thenReturn(ACCOUNT_NUMBER_IOS);
when(androidAccount.getUuid()).thenReturn(UUID_ANDROID);
when(androidAccount.getMasterDevice()).thenReturn(Optional.of(androidDevice));
when(androidAccount.getNumber()).thenReturn(ACCOUNT_NUMBER_ANDROID);
when(noDeviceAccount.getUuid()).thenReturn(UUID_NODEVICE);
when(noDeviceAccount.getMasterDevice()).thenReturn(Optional.ofNullable(null));
when(noDeviceAccount.getNumber()).thenReturn(ACCOUNT_NUMBER_NODEVICE);
when(commands.get(any(String.class))).thenReturn("{\"fromNumber\":\"+\",\"platforms\":{},\"countries\":{}}");
when(metricsFactory.getReporters()).thenReturn(ImmutableList.of());
}
@Test
public void testCrawlStart() {
activeUserCounter.onCrawlStart();
verify(cacheCluster, times(1)).useCluster(any());
verify(commands, times(1)).del(any(String.class));
verifyZeroInteractions(iosDevice);
verifyZeroInteractions(iosAccount);
verifyZeroInteractions(androidDevice);
verifyZeroInteractions(androidAccount);
verifyZeroInteractions(noDeviceAccount);
verifyZeroInteractions(metricsFactory);
verifyNoMoreInteractions(commands);
verifyNoMoreInteractions(cacheCluster);
}
@Test
public void testCrawlEnd() {
activeUserCounter.onCrawlEnd(Optional.empty());
verify(cacheCluster, times(1)).withCluster(any());
verify(commands, times(1)).get(any(String.class));
verify(metricsFactory, times(1)).getReporters();
verifyZeroInteractions(iosDevice);
verifyZeroInteractions(iosAccount);
verifyZeroInteractions(androidDevice);
verifyZeroInteractions(androidAccount);
verifyZeroInteractions(noDeviceAccount);
verifyNoMoreInteractions(metricsFactory);
verifyNoMoreInteractions(commands);
verifyNoMoreInteractions(cacheCluster);
}
@Test
public void testCrawlChunkValidAccount() throws AccountDatabaseCrawlerRestartException {
activeUserCounter.timeAndProcessCrawlChunk(Optional.of(UUID_IOS), Arrays.asList(iosAccount));
verify(iosAccount, times(1)).getMasterDevice();
verify(iosAccount, times(1)).getNumber();
verify(iosDevice, times(1)).getLastSeen();
verify(iosDevice, times(1)).getApnId();
verify(iosDevice, times(0)).getGcmId();
verify(cacheCluster, times(1)).withCluster(any());
verify(cacheCluster, times(1)).useCluster(any());
verify(commands, times(1)).get(any(String.class));
verify(commands, times(1)).set(any(String.class), eq("{\"fromUuid\":\""+UUID_IOS.toString()+"\",\"platforms\":{\"ios\":[1,1,1,1,1]},\"countries\":{\"1\":[1,1,1,1,1]}}"));
verify(metricsFactory, times(0)).getReporters();
verifyZeroInteractions(androidDevice);
verifyZeroInteractions(androidAccount);
verifyZeroInteractions(noDeviceAccount);
verifyZeroInteractions(metricsFactory);
verifyNoMoreInteractions(iosDevice);
verifyNoMoreInteractions(iosAccount);
verifyNoMoreInteractions(commands);
verifyNoMoreInteractions(cacheCluster);
}
@Test
public void testCrawlChunkNoDeviceAccount() throws AccountDatabaseCrawlerRestartException {
activeUserCounter.timeAndProcessCrawlChunk(Optional.of(UUID_NODEVICE), Arrays.asList(noDeviceAccount));
verify(noDeviceAccount, times(1)).getMasterDevice();
verify(cacheCluster, times(1)).withCluster(any());
verify(cacheCluster, times(1)).useCluster(any());
verify(commands, times(1)).get(eq(TALLY_KEY));
verify(commands, times(1)).set(any(String.class), eq("{\"fromUuid\":\""+UUID_NODEVICE+"\",\"platforms\":{},\"countries\":{}}"));
verify(metricsFactory, times(0)).getReporters();
verifyZeroInteractions(iosDevice);
verifyZeroInteractions(iosAccount);
verifyZeroInteractions(androidDevice);
verifyZeroInteractions(androidAccount);
verifyZeroInteractions(noDeviceAccount);
verifyZeroInteractions(metricsFactory);
verifyNoMoreInteractions(commands);
verifyNoMoreInteractions(cacheCluster);
}
@Test
public void testCrawlChunkMixedAccount() throws AccountDatabaseCrawlerRestartException {
activeUserCounter.timeAndProcessCrawlChunk(Optional.of(UUID_IOS), Arrays.asList(iosAccount, androidAccount, noDeviceAccount));
verify(iosAccount, times(1)).getMasterDevice();
verify(iosAccount, times(1)).getNumber();
verify(androidAccount, times(1)).getMasterDevice();
verify(androidAccount, times(1)).getNumber();
verify(noDeviceAccount, times(1)).getMasterDevice();
verify(iosDevice, times(1)).getLastSeen();
verify(iosDevice, times(1)).getApnId();
verify(iosDevice, times(0)).getGcmId();
verify(androidDevice, times(1)).getLastSeen();
verify(androidDevice, times(1)).getApnId();
verify(androidDevice, times(1)).getGcmId();
verify(cacheCluster, times(1)).withCluster(any());
verify(cacheCluster, times(1)).useCluster(any());
verify(commands, times(1)).get(eq(TALLY_KEY));
verify(commands, times(1)).set(any(String.class), eq("{\"fromUuid\":\""+UUID_IOS+"\",\"platforms\":{\"android\":[0,0,0,1,1],\"ios\":[1,1,1,1,1]},\"countries\":{\"55\":[0,0,0,1,1],\"1\":[1,1,1,1,1]}}"));
verify(metricsFactory, times(0)).getReporters();
verifyZeroInteractions(metricsFactory);
verifyNoMoreInteractions(iosDevice);
verifyNoMoreInteractions(iosAccount);
verifyNoMoreInteractions(androidDevice);
verifyNoMoreInteractions(androidAccount);
verifyNoMoreInteractions(noDeviceAccount);
verifyNoMoreInteractions(commands);
verifyNoMoreInteractions(cacheCluster);
}
}