uuid-based account crawler
This commit is contained in:
parent
bb52049bf4
commit
69742839c0
|
@ -19,10 +19,11 @@ package org.whispersystems.textsecuregcm.entities;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.UUID;
|
||||
|
||||
public class ActiveUserTally {
|
||||
@JsonProperty
|
||||
private String fromNumber;
|
||||
private UUID fromUuid;
|
||||
|
||||
@JsonProperty
|
||||
private Map<String, long[]> platforms;
|
||||
|
@ -32,14 +33,14 @@ public class ActiveUserTally {
|
|||
|
||||
public ActiveUserTally() {}
|
||||
|
||||
public ActiveUserTally(String fromNumber, Map<String, long[]> platforms, Map<String, long[]> countries) {
|
||||
this.fromNumber = fromNumber;
|
||||
public ActiveUserTally(UUID fromUuid, Map<String, long[]> platforms, Map<String, long[]> countries) {
|
||||
this.fromUuid = fromUuid;
|
||||
this.platforms = platforms;
|
||||
this.countries = countries;
|
||||
}
|
||||
|
||||
public String getFromNumber() {
|
||||
return this.fromNumber;
|
||||
public UUID getFromUuid() {
|
||||
return this.fromUuid;
|
||||
}
|
||||
|
||||
public Map<String, long[]> getPlatforms() {
|
||||
|
@ -50,8 +51,8 @@ public class ActiveUserTally {
|
|||
return this.countries;
|
||||
}
|
||||
|
||||
public void setFromNumber(String fromNumber) {
|
||||
this.fromNumber = fromNumber;
|
||||
public void setFromUuid(UUID fromUuid) {
|
||||
this.fromUuid = fromUuid;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,38 +18,84 @@ package org.whispersystems.textsecuregcm.entities;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
public class DirectoryReconciliationRequest {
|
||||
|
||||
@JsonProperty
|
||||
private String fromNumber;
|
||||
private UUID fromUuid;
|
||||
|
||||
@JsonProperty
|
||||
private String toNumber;
|
||||
private UUID toUuid;
|
||||
|
||||
@JsonProperty
|
||||
private List<String> numbers;
|
||||
private List<User> users;
|
||||
|
||||
public DirectoryReconciliationRequest() {
|
||||
}
|
||||
|
||||
public DirectoryReconciliationRequest(String fromNumber, String toNumber, List<String> numbers) {
|
||||
this.fromNumber = fromNumber;
|
||||
this.toNumber = toNumber;
|
||||
this.numbers = numbers;
|
||||
public DirectoryReconciliationRequest(UUID fromUuid, UUID toUuid, List<User> users) {
|
||||
this.fromUuid = fromUuid;
|
||||
this.toUuid = toUuid;
|
||||
this.users = users;
|
||||
}
|
||||
|
||||
public String getFromNumber() {
|
||||
return fromNumber;
|
||||
public UUID getFromUuid() {
|
||||
return fromUuid;
|
||||
}
|
||||
|
||||
public String getToNumber() {
|
||||
return toNumber;
|
||||
public UUID getToUuid() {
|
||||
return toUuid;
|
||||
}
|
||||
|
||||
public List<String> getNumbers() {
|
||||
return numbers;
|
||||
public List<User> getUsers() {
|
||||
return users;
|
||||
}
|
||||
|
||||
public static class User {
|
||||
|
||||
@JsonProperty
|
||||
private UUID uuid;
|
||||
|
||||
@JsonProperty
|
||||
private String number;
|
||||
|
||||
public User() {
|
||||
}
|
||||
|
||||
public User(UUID uuid, String number) {
|
||||
this.uuid = uuid;
|
||||
this.number = number;
|
||||
}
|
||||
|
||||
public UUID getUuid() {
|
||||
return uuid;
|
||||
}
|
||||
|
||||
public String getNumber() {
|
||||
return number;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
User user = (User) o;
|
||||
|
||||
if (uuid != null ? !uuid.equals(user.uuid) : user.uuid != null) return false;
|
||||
if (number != null ? !number.equals(user.number) : user.number != null) return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = uuid != null ? uuid.hashCode() : 0;
|
||||
result = 31 * result + (number != null ? number.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.whispersystems.textsecuregcm.util.Util;
|
|||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
@ -51,7 +52,7 @@ public class AccountCleaner implements AccountDatabaseCrawlerListener {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) {
|
||||
public void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
|
||||
int accountUpdateCount = 0;
|
||||
for (Account account : chunkAccounts) {
|
||||
if (needsExplicitRemoval(account)) {
|
||||
|
@ -74,7 +75,7 @@ public class AccountCleaner implements AccountDatabaseCrawlerListener {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onCrawlEnd(Optional<String> fromNumber) {
|
||||
public void onCrawlEnd(Optional<UUID> fromUuid) {
|
||||
}
|
||||
|
||||
private boolean needsExplicitRemoval(Account account) {
|
||||
|
|
|
@ -122,26 +122,26 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
|
|||
}
|
||||
|
||||
private void processChunk() {
|
||||
Optional<String> fromNumber = cache.getLastNumber();
|
||||
Optional<UUID> fromUuid = cache.getLastUuid();
|
||||
|
||||
if (!fromNumber.isPresent()) {
|
||||
listeners.forEach(listener -> { listener.onCrawlStart(); });
|
||||
if (!fromUuid.isPresent()) {
|
||||
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlStart);
|
||||
}
|
||||
|
||||
List<Account> chunkAccounts = readChunk(fromNumber, chunkSize);
|
||||
List<Account> chunkAccounts = readChunk(fromUuid, chunkSize);
|
||||
|
||||
if (chunkAccounts.isEmpty()) {
|
||||
listeners.forEach(listener -> { listener.onCrawlEnd(fromNumber); });
|
||||
cache.setLastNumber(Optional.empty());
|
||||
listeners.forEach(listener -> listener.onCrawlEnd(fromUuid));
|
||||
cache.setLastUuid(Optional.empty());
|
||||
cache.clearAccelerate();
|
||||
} else {
|
||||
try {
|
||||
for (AccountDatabaseCrawlerListener listener : listeners) {
|
||||
listener.onCrawlChunk(fromNumber, chunkAccounts);
|
||||
listener.onCrawlChunk(fromUuid, chunkAccounts);
|
||||
}
|
||||
cache.setLastNumber(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getNumber()));
|
||||
cache.setLastUuid(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getUuid()));
|
||||
} catch (AccountDatabaseCrawlerRestartException e) {
|
||||
cache.setLastNumber(Optional.empty());
|
||||
cache.setLastUuid(Optional.empty());
|
||||
cache.clearAccelerate();
|
||||
}
|
||||
|
||||
|
@ -149,12 +149,12 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
|
|||
|
||||
}
|
||||
|
||||
private List<Account> readChunk(Optional<String> fromNumber, int chunkSize) {
|
||||
private List<Account> readChunk(Optional<UUID> fromUuid, int chunkSize) {
|
||||
try (Timer.Context timer = readChunkTimer.time()) {
|
||||
List<Account> chunkAccounts;
|
||||
|
||||
if (fromNumber.isPresent()) {
|
||||
chunkAccounts = accounts.getAllFrom(fromNumber.get(), chunkSize);
|
||||
if (fromUuid.isPresent()) {
|
||||
chunkAccounts = accounts.getAllFrom(fromUuid.get(), chunkSize);
|
||||
} else {
|
||||
chunkAccounts = accounts.getAllFrom(chunkSize);
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
|||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
import redis.clients.jedis.Jedis;
|
||||
|
||||
|
@ -30,7 +31,7 @@ import redis.clients.jedis.Jedis;
|
|||
public class AccountDatabaseCrawlerCache {
|
||||
|
||||
private static final String ACTIVE_WORKER_KEY = "account_database_crawler_cache_active_worker";
|
||||
private static final String LAST_NUMBER_KEY = "account_database_crawler_cache_last_number";
|
||||
private static final String LAST_UUID_KEY = "account_database_crawler_cache_last_uuid";
|
||||
private static final String ACCELERATE_KEY = "account_database_crawler_cache_accelerate";
|
||||
|
||||
private static final long LAST_NUMBER_TTL_MS = 86400_000L;
|
||||
|
@ -67,18 +68,21 @@ public class AccountDatabaseCrawlerCache {
|
|||
luaScript.execute(keys, args);
|
||||
}
|
||||
|
||||
public Optional<String> getLastNumber() {
|
||||
public Optional<UUID> getLastUuid() {
|
||||
try (Jedis jedis = jedisPool.getWriteResource()) {
|
||||
return Optional.ofNullable(jedis.get(LAST_NUMBER_KEY));
|
||||
String lastUuidString = jedis.get(LAST_UUID_KEY);
|
||||
|
||||
if (lastUuidString == null) return Optional.empty();
|
||||
else return Optional.of(UUID.fromString(lastUuidString));
|
||||
}
|
||||
}
|
||||
|
||||
public void setLastNumber(Optional<String> lastNumber) {
|
||||
public void setLastUuid(Optional<UUID> lastUuid) {
|
||||
try (Jedis jedis = jedisPool.getWriteResource()) {
|
||||
if (lastNumber.isPresent()) {
|
||||
jedis.psetex(LAST_NUMBER_KEY, LAST_NUMBER_TTL_MS, lastNumber.get());
|
||||
if (lastUuid.isPresent()) {
|
||||
jedis.psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString());
|
||||
} else {
|
||||
jedis.del(LAST_NUMBER_KEY);
|
||||
jedis.del(LAST_UUID_KEY);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,10 +18,11 @@ package org.whispersystems.textsecuregcm.storage;
|
|||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
||||
public interface AccountDatabaseCrawlerListener {
|
||||
void onCrawlStart();
|
||||
void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException;
|
||||
void onCrawlEnd(Optional<String> fromNumber);
|
||||
void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException;
|
||||
void onCrawlEnd(Optional<UUID> fromUuid);
|
||||
}
|
||||
|
|
|
@ -111,10 +111,10 @@ public class Accounts {
|
|||
}));
|
||||
}
|
||||
|
||||
public List<Account> getAllFrom(String from, int length) {
|
||||
public List<Account> getAllFrom(UUID from, int length) {
|
||||
return database.with(jdbi -> jdbi.withHandle(handle -> {
|
||||
try (Timer.Context ignored = getAllFromOffsetTimer.time()) {
|
||||
return handle.createQuery("SELECT * FROM accounts WHERE " + NUMBER + " > :from ORDER BY " + NUMBER + " LIMIT :limit")
|
||||
return handle.createQuery("SELECT * FROM accounts WHERE " + UID + " > :from ORDER BY " + UID + " LIMIT :limit")
|
||||
.bind("from", from)
|
||||
.bind("limit", length)
|
||||
.mapTo(Account.class)
|
||||
|
@ -126,7 +126,7 @@ public class Accounts {
|
|||
public List<Account> getAllFrom(int length) {
|
||||
return database.with(jdbi -> jdbi.withHandle(handle -> {
|
||||
try (Timer.Context ignored = getAllFromTimer.time()) {
|
||||
return handle.createQuery("SELECT * FROM accounts ORDER BY " + NUMBER + " LIMIT :limit")
|
||||
return handle.createQuery("SELECT * FROM accounts ORDER BY " + UID + " LIMIT :limit")
|
||||
.bind("limit", length)
|
||||
.mapTo(Account.class)
|
||||
.list();
|
||||
|
|
|
@ -121,8 +121,8 @@ public class AccountsManager {
|
|||
return accounts.getAllFrom(length);
|
||||
}
|
||||
|
||||
public List<Account> getAllFrom(String number, int length) {
|
||||
return accounts.getAllFrom(number, length);
|
||||
public List<Account> getAllFrom(UUID uuid, int length) {
|
||||
return accounts.getAllFrom(uuid, length);
|
||||
}
|
||||
|
||||
private void updateDirectory(Account account) {
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.dropwizard.metrics.MetricsFactory;
|
||||
|
@ -43,8 +44,6 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
|
|||
private static final String PLATFORM_IOS = "ios";
|
||||
private static final String PLATFORM_ANDROID = "android";
|
||||
|
||||
private static final String FIRST_FROM_NUMBER = "+";
|
||||
|
||||
private static final String INTERVALS[] = {"daily", "weekly", "monthly", "quarterly", "yearly"};
|
||||
|
||||
private final MetricsFactory metricsFactory;
|
||||
|
@ -64,7 +63,7 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) {
|
||||
public void onCrawlChunk(Optional<UUID> fromNumber, List<Account> chunkAccounts) {
|
||||
long nowDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
|
||||
long agoMs[] = {TimeUnit.DAYS.toMillis(nowDays - 1),
|
||||
TimeUnit.DAYS.toMillis(nowDays - 7),
|
||||
|
@ -107,12 +106,11 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
|
|||
}
|
||||
}
|
||||
|
||||
incrementTallies(fromNumber.orElse(FIRST_FROM_NUMBER), platformIncrements, countryIncrements);
|
||||
|
||||
incrementTallies(fromNumber.orElse(UUID.randomUUID()), platformIncrements, countryIncrements);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCrawlEnd(Optional<String> fromNumber) {
|
||||
public void onCrawlEnd(Optional<UUID> fromNumber) {
|
||||
MetricRegistry metrics = new MetricRegistry();
|
||||
long intervalTallies[] = new long[INTERVALS.length];
|
||||
ActiveUserTally activeUserTally = getFinalTallies();
|
||||
|
@ -156,17 +154,18 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
|
|||
return tally;
|
||||
}
|
||||
|
||||
private void incrementTallies(String fromNumber, Map<String, long[]> platformIncrements, Map<String, long[]> countryIncrements) {
|
||||
private void incrementTallies(UUID fromUuid, Map<String, long[]> platformIncrements, Map<String, long[]> countryIncrements) {
|
||||
try (Jedis jedis = jedisPool.getWriteResource()) {
|
||||
String tallyValue = jedis.get(TALLY_KEY);
|
||||
ActiveUserTally activeUserTally;
|
||||
|
||||
if (tallyValue == null) {
|
||||
activeUserTally = new ActiveUserTally(fromNumber, platformIncrements, countryIncrements);
|
||||
activeUserTally = new ActiveUserTally(fromUuid, platformIncrements, countryIncrements);
|
||||
} else {
|
||||
activeUserTally = mapper.readValue(tallyValue, ActiveUserTally.class);
|
||||
if (activeUserTally.getFromNumber() != fromNumber) {
|
||||
activeUserTally.setFromNumber(fromNumber);
|
||||
|
||||
if (!fromUuid.equals(activeUserTally.getFromUuid())) {
|
||||
activeUserTally.setFromUuid(fromUuid);
|
||||
Map<String, long[]> platformTallies = activeUserTally.getPlatforms();
|
||||
addTallyMaps(platformTallies, platformIncrements);
|
||||
Map<String, long[]> countryTallies = activeUserTally.getCountries();
|
||||
|
|
|
@ -30,9 +30,12 @@ import org.whispersystems.textsecuregcm.util.Constants;
|
|||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
||||
import javax.ws.rs.ProcessingException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
@ -55,16 +58,16 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener {
|
|||
|
||||
public void onCrawlStart() { }
|
||||
|
||||
public void onCrawlEnd(Optional<String> fromNumber) {
|
||||
DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromNumber.orElse(null), null, Collections.emptyList());
|
||||
public void onCrawlEnd(Optional<UUID> fromUuid) {
|
||||
DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList());
|
||||
DirectoryReconciliationResponse response = sendChunk(request);
|
||||
}
|
||||
|
||||
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException {
|
||||
public void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException {
|
||||
|
||||
updateDirectoryCache(chunkAccounts);
|
||||
|
||||
DirectoryReconciliationRequest request = createChunkRequest(fromNumber, chunkAccounts);
|
||||
DirectoryReconciliationRequest request = createChunkRequest(fromUuid, chunkAccounts);
|
||||
DirectoryReconciliationResponse response = sendChunk(request);
|
||||
if (response.getStatus() == DirectoryReconciliationResponse.Status.MISSING) {
|
||||
throw new AccountDatabaseCrawlerRestartException("directory reconciler missing");
|
||||
|
@ -91,19 +94,21 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener {
|
|||
}
|
||||
|
||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
||||
private DirectoryReconciliationRequest createChunkRequest(Optional<String> fromNumber, List<Account> accounts) {
|
||||
List<String> numbers = accounts.stream()
|
||||
.filter(Account::isEnabled)
|
||||
.map(Account::getNumber)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
Optional<String> toNumber = Optional.empty();
|
||||
|
||||
if (!accounts.isEmpty()) {
|
||||
toNumber = Optional.of(accounts.get(accounts.size() - 1).getNumber());
|
||||
private DirectoryReconciliationRequest createChunkRequest(Optional<UUID> fromUuid, List<Account> accounts) {
|
||||
List<DirectoryReconciliationRequest.User> users = new ArrayList<>(accounts.size());
|
||||
for (Account account : accounts) {
|
||||
if (account.isEnabled()) {
|
||||
users.add(new DirectoryReconciliationRequest.User(account.getUuid(), account.getNumber()));
|
||||
}
|
||||
}
|
||||
|
||||
return new DirectoryReconciliationRequest(fromNumber.orElse(null), toNumber.orElse(null), numbers);
|
||||
Optional<UUID> toUuid = Optional.empty();
|
||||
|
||||
if (!accounts.isEmpty()) {
|
||||
toUuid = Optional.of(accounts.get(accounts.size() - 1).getUuid());
|
||||
}
|
||||
|
||||
return new DirectoryReconciliationRequest(fromUuid.orElse(null), toUuid.orElse(null), users);
|
||||
}
|
||||
|
||||
private DirectoryReconciliationResponse sendChunk(DirectoryReconciliationRequest request) {
|
||||
|
|
|
@ -51,7 +51,7 @@ public class DirectoryReconciliationClient {
|
|||
|
||||
public DirectoryReconciliationResponse sendChunk(DirectoryReconciliationRequest request) {
|
||||
return client.target(replicationUrl)
|
||||
.path("/v1/directory/reconcile")
|
||||
.path("/v2/directory/reconcile")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.put(Entity.json(request), DirectoryReconciliationResponse.class);
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import org.whispersystems.textsecuregcm.util.Util;
|
|||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
@ -31,7 +32,7 @@ public class PushFeedbackProcessor implements AccountDatabaseCrawlerListener {
|
|||
public void onCrawlStart() {}
|
||||
|
||||
@Override
|
||||
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) {
|
||||
public void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
|
||||
for (Account account : chunkAccounts) {
|
||||
boolean update = false;
|
||||
|
||||
|
@ -65,5 +66,5 @@ public class PushFeedbackProcessor implements AccountDatabaseCrawlerListener {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onCrawlEnd(Optional<String> fromNumber) {}
|
||||
public void onCrawlEnd(Optional<UUID> toUuid) {}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
|
@ -39,8 +40,8 @@ import static org.mockito.Mockito.*;
|
|||
|
||||
public class AccountDatabaseCrawlerTest {
|
||||
|
||||
private static final String ACCOUNT1 = "+1";
|
||||
private static final String ACCOUNT2 = "+2";
|
||||
private static final UUID ACCOUNT1 = UUID.randomUUID();
|
||||
private static final UUID ACCOUNT2 = UUID.randomUUID();
|
||||
|
||||
private static final int CHUNK_SIZE = 1000;
|
||||
private static final long CHUNK_INTERVAL_MS = 30_000L;
|
||||
|
@ -56,8 +57,8 @@ public class AccountDatabaseCrawlerTest {
|
|||
|
||||
@Before
|
||||
public void setup() {
|
||||
when(account1.getNumber()).thenReturn(ACCOUNT1);
|
||||
when(account2.getNumber()).thenReturn(ACCOUNT2);
|
||||
when(account1.getUuid()).thenReturn(ACCOUNT1);
|
||||
when(account2.getUuid()).thenReturn(ACCOUNT2);
|
||||
|
||||
when(accounts.getAllFrom(anyInt())).thenReturn(Arrays.asList(account1, account2));
|
||||
when(accounts.getAllFrom(eq(ACCOUNT1), anyInt())).thenReturn(Arrays.asList(account2));
|
||||
|
@ -69,20 +70,20 @@ public class AccountDatabaseCrawlerTest {
|
|||
|
||||
@Test
|
||||
public void testCrawlStart() throws AccountDatabaseCrawlerRestartException {
|
||||
when(cache.getLastNumber()).thenReturn(Optional.empty());
|
||||
when(cache.getLastUuid()).thenReturn(Optional.empty());
|
||||
|
||||
boolean accelerated = crawler.doPeriodicWork();
|
||||
assertThat(accelerated).isFalse();
|
||||
|
||||
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
|
||||
verify(cache, times(1)).getLastNumber();
|
||||
verify(cache, times(1)).getLastUuid();
|
||||
verify(listener, times(1)).onCrawlStart();
|
||||
verify(accounts, times(1)).getAllFrom(eq(CHUNK_SIZE));
|
||||
verify(accounts, times(0)).getAllFrom(any(String.class), eq(CHUNK_SIZE));
|
||||
verify(account1, times(0)).getNumber();
|
||||
verify(account2, times(1)).getNumber();
|
||||
verify(accounts, times(0)).getAllFrom(any(UUID.class), eq(CHUNK_SIZE));
|
||||
verify(account1, times(0)).getUuid();
|
||||
verify(account2, times(1)).getUuid();
|
||||
verify(listener, times(1)).onCrawlChunk(eq(Optional.empty()), eq(Arrays.asList(account1, account2)));
|
||||
verify(cache, times(1)).setLastNumber(eq(Optional.of(ACCOUNT2)));
|
||||
verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
|
||||
verify(cache, times(1)).isAccelerated();
|
||||
verify(cache, times(1)).releaseActiveWork(any(String.class));
|
||||
|
||||
|
@ -95,18 +96,18 @@ public class AccountDatabaseCrawlerTest {
|
|||
|
||||
@Test
|
||||
public void testCrawlChunk() throws AccountDatabaseCrawlerRestartException {
|
||||
when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1));
|
||||
when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1));
|
||||
|
||||
boolean accelerated = crawler.doPeriodicWork();
|
||||
assertThat(accelerated).isFalse();
|
||||
|
||||
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
|
||||
verify(cache, times(1)).getLastNumber();
|
||||
verify(cache, times(1)).getLastUuid();
|
||||
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
|
||||
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
|
||||
verify(account2, times(1)).getNumber();
|
||||
verify(account2, times(1)).getUuid();
|
||||
verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
|
||||
verify(cache, times(1)).setLastNumber(eq(Optional.of(ACCOUNT2)));
|
||||
verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
|
||||
verify(cache, times(1)).isAccelerated();
|
||||
verify(cache, times(1)).releaseActiveWork(any(String.class));
|
||||
|
||||
|
@ -121,18 +122,18 @@ public class AccountDatabaseCrawlerTest {
|
|||
@Test
|
||||
public void testCrawlChunkAccelerated() throws AccountDatabaseCrawlerRestartException {
|
||||
when(cache.isAccelerated()).thenReturn(true);
|
||||
when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1));
|
||||
when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1));
|
||||
|
||||
boolean accelerated = crawler.doPeriodicWork();
|
||||
assertThat(accelerated).isTrue();
|
||||
|
||||
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
|
||||
verify(cache, times(1)).getLastNumber();
|
||||
verify(cache, times(1)).getLastUuid();
|
||||
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
|
||||
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
|
||||
verify(account2, times(1)).getNumber();
|
||||
verify(account2, times(1)).getUuid();
|
||||
verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
|
||||
verify(cache, times(1)).setLastNumber(eq(Optional.of(ACCOUNT2)));
|
||||
verify(cache, times(1)).setLastUuid(eq(Optional.of(ACCOUNT2)));
|
||||
verify(cache, times(1)).isAccelerated();
|
||||
verify(cache, times(1)).releaseActiveWork(any(String.class));
|
||||
|
||||
|
@ -146,19 +147,19 @@ public class AccountDatabaseCrawlerTest {
|
|||
|
||||
@Test
|
||||
public void testCrawlChunkRestart() throws AccountDatabaseCrawlerRestartException {
|
||||
when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1));
|
||||
when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1));
|
||||
doThrow(AccountDatabaseCrawlerRestartException.class).when(listener).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
|
||||
|
||||
boolean accelerated = crawler.doPeriodicWork();
|
||||
assertThat(accelerated).isFalse();
|
||||
|
||||
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
|
||||
verify(cache, times(1)).getLastNumber();
|
||||
verify(cache, times(1)).getLastUuid();
|
||||
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
|
||||
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
|
||||
verify(account2, times(0)).getNumber();
|
||||
verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
|
||||
verify(cache, times(1)).setLastNumber(eq(Optional.empty()));
|
||||
verify(cache, times(1)).setLastUuid(eq(Optional.empty()));
|
||||
verify(cache, times(1)).clearAccelerate();
|
||||
verify(cache, times(1)).isAccelerated();
|
||||
verify(cache, times(1)).releaseActiveWork(any(String.class));
|
||||
|
@ -173,19 +174,19 @@ public class AccountDatabaseCrawlerTest {
|
|||
|
||||
@Test
|
||||
public void testCrawlEnd() {
|
||||
when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT2));
|
||||
when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT2));
|
||||
|
||||
boolean accelerated = crawler.doPeriodicWork();
|
||||
assertThat(accelerated).isFalse();
|
||||
|
||||
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
|
||||
verify(cache, times(1)).getLastNumber();
|
||||
verify(cache, times(1)).getLastUuid();
|
||||
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
|
||||
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT2), eq(CHUNK_SIZE));
|
||||
verify(account1, times(0)).getNumber();
|
||||
verify(account2, times(0)).getNumber();
|
||||
verify(listener, times(1)).onCrawlEnd(eq(Optional.of(ACCOUNT2)));
|
||||
verify(cache, times(1)).setLastNumber(eq(Optional.empty()));
|
||||
verify(cache, times(1)).setLastUuid(eq(Optional.empty()));
|
||||
verify(cache, times(1)).clearAccelerate();
|
||||
verify(cache, times(1)).isAccelerated();
|
||||
verify(cache, times(1)).releaseActiveWork(any(String.class));
|
||||
|
|
|
@ -173,6 +173,8 @@ public class AccountsTest {
|
|||
accounts.create(account);
|
||||
}
|
||||
|
||||
users.sort((account, t1) -> UUIDComparator.staticCompare(account.getUuid(), t1.getUuid()));
|
||||
|
||||
List<Account> retrieved = accounts.getAllFrom(10);
|
||||
assertThat(retrieved.size()).isEqualTo(10);
|
||||
|
||||
|
@ -181,7 +183,7 @@ public class AccountsTest {
|
|||
}
|
||||
|
||||
for (int j=0;j<9;j++) {
|
||||
retrieved = accounts.getAllFrom(retrieved.get(9).getNumber(), 10);
|
||||
retrieved = accounts.getAllFrom(retrieved.get(9).getUuid(), 10);
|
||||
assertThat(retrieved.size()).isEqualTo(10);
|
||||
|
||||
for (int i=0;i<retrieved.size();i++) {
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.junit.Test;
|
|||
import redis.clients.jedis.Jedis;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.Optional;
|
||||
|
||||
|
@ -46,9 +47,13 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
public class ActiveUserCounterTest {
|
||||
|
||||
private final String NUMBER_IOS = "+15551234567";
|
||||
private final String NUMBER_ANDROID = "+5511987654321";
|
||||
private final String NUMBER_NODEVICE = "+5215551234567";
|
||||
private final UUID UUID_IOS = UUID.randomUUID();
|
||||
private final UUID UUID_ANDROID = UUID.randomUUID();
|
||||
private final UUID UUID_NODEVICE = UUID.randomUUID();
|
||||
|
||||
private final String ACCOUNT_NUMBER_IOS = "+15551234567";
|
||||
private final String ACCOUNT_NUMBER_ANDROID = "+5511987654321";
|
||||
private final String ACCOUNT_NUMBER_NODEVICE = "+5215551234567";
|
||||
|
||||
private final String TALLY_KEY = "active_user_tally";
|
||||
|
||||
|
@ -79,14 +84,17 @@ public class ActiveUserCounterTest {
|
|||
when(iosDevice.getGcmId()).thenReturn(null);
|
||||
when(iosDevice.getLastSeen()).thenReturn(halfDayAgo);
|
||||
|
||||
when(iosAccount.getNumber()).thenReturn(NUMBER_IOS);
|
||||
when(iosAccount.getUuid()).thenReturn(UUID_IOS);
|
||||
when(iosAccount.getMasterDevice()).thenReturn(Optional.of(iosDevice));
|
||||
when(iosAccount.getNumber()).thenReturn(ACCOUNT_NUMBER_IOS);
|
||||
|
||||
when(androidAccount.getNumber()).thenReturn(NUMBER_ANDROID);
|
||||
when(androidAccount.getUuid()).thenReturn(UUID_ANDROID);
|
||||
when(androidAccount.getMasterDevice()).thenReturn(Optional.of(androidDevice));
|
||||
when(androidAccount.getNumber()).thenReturn(ACCOUNT_NUMBER_ANDROID);
|
||||
|
||||
when(noDeviceAccount.getNumber()).thenReturn(NUMBER_NODEVICE);
|
||||
when(noDeviceAccount.getUuid()).thenReturn(UUID_NODEVICE);
|
||||
when(noDeviceAccount.getMasterDevice()).thenReturn(Optional.ofNullable(null));
|
||||
when(noDeviceAccount.getNumber()).thenReturn(ACCOUNT_NUMBER_NODEVICE);
|
||||
|
||||
when(jedis.get(any(String.class))).thenReturn("{\"fromNumber\":\"+\",\"platforms\":{},\"countries\":{}}");
|
||||
when(jedisPool.getWriteResource()).thenReturn(jedis);
|
||||
|
@ -137,7 +145,7 @@ public class ActiveUserCounterTest {
|
|||
|
||||
@Test
|
||||
public void testCrawlChunkValidAccount() throws AccountDatabaseCrawlerRestartException {
|
||||
activeUserCounter.onCrawlChunk(Optional.of(NUMBER_IOS), Arrays.asList(iosAccount));
|
||||
activeUserCounter.onCrawlChunk(Optional.of(UUID_IOS), Arrays.asList(iosAccount));
|
||||
|
||||
verify(iosAccount, times(1)).getMasterDevice();
|
||||
verify(iosAccount, times(1)).getNumber();
|
||||
|
@ -148,7 +156,7 @@ public class ActiveUserCounterTest {
|
|||
|
||||
verify(jedisPool, times(1)).getWriteResource();
|
||||
verify(jedis, times(1)).get(any(String.class));
|
||||
verify(jedis, times(1)).set(any(String.class), eq("{\"fromNumber\":\""+NUMBER_IOS+"\",\"platforms\":{\"ios\":[1,1,1,1,1]},\"countries\":{\"1\":[1,1,1,1,1]}}"));
|
||||
verify(jedis, times(1)).set(any(String.class), eq("{\"fromUuid\":\""+UUID_IOS.toString()+"\",\"platforms\":{\"ios\":[1,1,1,1,1]},\"countries\":{\"1\":[1,1,1,1,1]}}"));
|
||||
verify(jedis, times(1)).close();
|
||||
|
||||
verify(metricsFactory, times(0)).getReporters();
|
||||
|
@ -166,13 +174,13 @@ public class ActiveUserCounterTest {
|
|||
|
||||
@Test
|
||||
public void testCrawlChunkNoDeviceAccount() throws AccountDatabaseCrawlerRestartException {
|
||||
activeUserCounter.onCrawlChunk(Optional.of(NUMBER_NODEVICE), Arrays.asList(noDeviceAccount));
|
||||
activeUserCounter.onCrawlChunk(Optional.of(UUID_NODEVICE), Arrays.asList(noDeviceAccount));
|
||||
|
||||
verify(noDeviceAccount, times(1)).getMasterDevice();
|
||||
|
||||
verify(jedisPool, times(1)).getWriteResource();
|
||||
verify(jedis, times(1)).get(eq(TALLY_KEY));
|
||||
verify(jedis, times(1)).set(any(String.class), eq("{\"fromNumber\":\""+NUMBER_NODEVICE+"\",\"platforms\":{},\"countries\":{}}"));
|
||||
verify(jedis, times(1)).set(any(String.class), eq("{\"fromUuid\":\""+UUID_NODEVICE+"\",\"platforms\":{},\"countries\":{}}"));
|
||||
verify(jedis, times(1)).close();
|
||||
|
||||
verify(metricsFactory, times(0)).getReporters();
|
||||
|
@ -190,7 +198,7 @@ public class ActiveUserCounterTest {
|
|||
|
||||
@Test
|
||||
public void testCrawlChunkMixedAccount() throws AccountDatabaseCrawlerRestartException {
|
||||
activeUserCounter.onCrawlChunk(Optional.of(NUMBER_IOS), Arrays.asList(iosAccount, androidAccount, noDeviceAccount));
|
||||
activeUserCounter.onCrawlChunk(Optional.of(UUID_IOS), Arrays.asList(iosAccount, androidAccount, noDeviceAccount));
|
||||
|
||||
verify(iosAccount, times(1)).getMasterDevice();
|
||||
verify(iosAccount, times(1)).getNumber();
|
||||
|
@ -208,7 +216,7 @@ public class ActiveUserCounterTest {
|
|||
|
||||
verify(jedisPool, times(1)).getWriteResource();
|
||||
verify(jedis, times(1)).get(eq(TALLY_KEY));
|
||||
verify(jedis, times(1)).set(any(String.class), eq("{\"fromNumber\":\""+NUMBER_IOS+"\",\"platforms\":{\"android\":[0,0,0,1,1],\"ios\":[1,1,1,1,1]},\"countries\":{\"55\":[0,0,0,1,1],\"1\":[1,1,1,1,1]}}"));
|
||||
verify(jedis, times(1)).set(any(String.class), eq("{\"fromUuid\":\""+UUID_IOS+"\",\"platforms\":{\"android\":[0,0,0,1,1],\"ios\":[1,1,1,1,1]},\"countries\":{\"55\":[0,0,0,1,1],\"1\":[1,1,1,1,1]}}"));
|
||||
verify(jedis, times(1)).close();
|
||||
|
||||
verify(metricsFactory, times(0)).getReporters();
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.whispersystems.textsecuregcm.util.Util;
|
|||
|
||||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
|
@ -40,8 +41,10 @@ import static org.mockito.ArgumentMatchers.eq;
|
|||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class DirectoryReconcilerTest {
|
||||
private static final String VALID_NUMBER = "valid";
|
||||
private static final String INACTIVE_NUMBER = "inactive";
|
||||
private static final UUID VALID_UUID = UUID.randomUUID();
|
||||
private static final String VALID_NUMBERRR = "+14152222222";
|
||||
private static final UUID INACTIVE_UUID = UUID.randomUUID();
|
||||
private static final String INACTIVE_NUMBERRR = "+14151111111";
|
||||
|
||||
private final Account activeAccount = mock(Account.class);
|
||||
private final Account inactiveAccount = mock(Account.class);
|
||||
|
@ -55,9 +58,11 @@ public class DirectoryReconcilerTest {
|
|||
|
||||
@Before
|
||||
public void setup() {
|
||||
when(activeAccount.getNumber()).thenReturn(VALID_NUMBER);
|
||||
when(activeAccount.getUuid()).thenReturn(VALID_UUID);
|
||||
when(activeAccount.isEnabled()).thenReturn(true);
|
||||
when(inactiveAccount.getNumber()).thenReturn(INACTIVE_NUMBER);
|
||||
when(activeAccount.getNumber()).thenReturn(VALID_NUMBERRR);
|
||||
when(inactiveAccount.getUuid()).thenReturn(INACTIVE_UUID);
|
||||
when(inactiveAccount.getNumber()).thenReturn(INACTIVE_NUMBERRR);
|
||||
when(inactiveAccount.isEnabled()).thenReturn(false);
|
||||
when(directoryManager.startBatchOperation()).thenReturn(batchOperationHandle);
|
||||
}
|
||||
|
@ -65,27 +70,29 @@ public class DirectoryReconcilerTest {
|
|||
@Test
|
||||
public void testCrawlChunkValid() throws AccountDatabaseCrawlerRestartException {
|
||||
when(reconciliationClient.sendChunk(any())).thenReturn(successResponse);
|
||||
directoryReconciler.onCrawlChunk(Optional.of(VALID_NUMBER), Arrays.asList(activeAccount, inactiveAccount));
|
||||
directoryReconciler.onCrawlChunk(Optional.of(VALID_UUID), Arrays.asList(activeAccount, inactiveAccount));
|
||||
|
||||
verify(activeAccount, atLeastOnce()).getUuid();
|
||||
verify(activeAccount, atLeastOnce()).getNumber();
|
||||
verify(activeAccount, atLeastOnce()).isEnabled();
|
||||
verify(inactiveAccount, atLeastOnce()).getUuid();
|
||||
verify(inactiveAccount, atLeastOnce()).getNumber();
|
||||
verify(inactiveAccount, atLeastOnce()).isEnabled();
|
||||
|
||||
ArgumentCaptor<DirectoryReconciliationRequest> request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class);
|
||||
verify(reconciliationClient, times(1)).sendChunk(request.capture());
|
||||
|
||||
assertThat(request.getValue().getFromNumber()).isEqualTo(VALID_NUMBER);
|
||||
assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER);
|
||||
assertThat(request.getValue().getNumbers()).isEqualTo(Arrays.asList(VALID_NUMBER));
|
||||
assertThat(request.getValue().getFromUuid()).isEqualTo(VALID_UUID);
|
||||
assertThat(request.getValue().getToUuid()).isEqualTo(INACTIVE_UUID);
|
||||
assertThat(request.getValue().getUsers()).isEqualTo(Arrays.asList(new DirectoryReconciliationRequest.User(VALID_UUID, VALID_NUMBERRR)));
|
||||
|
||||
ArgumentCaptor<ClientContact> addedContact = ArgumentCaptor.forClass(ClientContact.class);
|
||||
verify(directoryManager, times(1)).startBatchOperation();
|
||||
verify(directoryManager, times(1)).add(eq(batchOperationHandle), addedContact.capture());
|
||||
verify(directoryManager, times(1)).remove(eq(batchOperationHandle), eq(INACTIVE_NUMBER));
|
||||
verify(directoryManager, times(1)).remove(eq(batchOperationHandle), eq(INACTIVE_NUMBERRR));
|
||||
verify(directoryManager, times(1)).stopBatchOperation(eq(batchOperationHandle));
|
||||
|
||||
assertThat(addedContact.getValue().getToken()).isEqualTo(Util.getContactToken(VALID_NUMBER));
|
||||
assertThat(addedContact.getValue().getToken()).isEqualTo(Util.getContactToken(VALID_NUMBERRR));
|
||||
|
||||
verifyNoMoreInteractions(activeAccount);
|
||||
verifyNoMoreInteractions(inactiveAccount);
|
||||
|
|
|
@ -13,6 +13,7 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
|
@ -62,7 +63,7 @@ public class PushFeedbackProcessorTest {
|
|||
@Test
|
||||
public void testEmpty() {
|
||||
PushFeedbackProcessor processor = new PushFeedbackProcessor(accountsManager, directoryQueue);
|
||||
processor.onCrawlChunk(Optional.of("+14152222222"), Collections.emptyList());
|
||||
processor.onCrawlChunk(Optional.of(UUID.randomUUID()), Collections.emptyList());
|
||||
|
||||
verifyZeroInteractions(accountsManager);
|
||||
verifyZeroInteractions(directoryQueue);
|
||||
|
@ -71,7 +72,7 @@ public class PushFeedbackProcessorTest {
|
|||
@Test
|
||||
public void testUpdate() {
|
||||
PushFeedbackProcessor processor = new PushFeedbackProcessor(accountsManager, directoryQueue);
|
||||
processor.onCrawlChunk(Optional.of("+14153333333"), List.of(uninstalledAccount, mixedAccount, stillActiveAccount, freshAccount, cleanAccount));
|
||||
processor.onCrawlChunk(Optional.of(UUID.randomUUID()), List.of(uninstalledAccount, mixedAccount, stillActiveAccount, freshAccount, cleanAccount));
|
||||
|
||||
verify(uninstalledDevice).setApnId(isNull());
|
||||
verify(uninstalledDevice).setGcmId(isNull());
|
||||
|
|
Loading…
Reference in New Issue