tally active users by time interval by platform and by country

1) refactor Directory Reconciler and pull out AccountDatabaseCrawler class
2) implement ActiveUserCounter to tally daily, weekly, monthly, etc active use
3) rework and simplify the crawl and sleep logic
4) move chunk interval and chunk size configuration options out of directory section and into accountDatabaseCrawler section
This commit is contained in:
Brian Acton 2018-12-20 21:20:06 -08:00
parent dbfe4fd5ac
commit 4b8608906a
14 changed files with 984 additions and 327 deletions

View File

@ -63,6 +63,11 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private DirectoryConfiguration directory;
@NotNull
@Valid
@JsonProperty
private AccountDatabaseCrawlerConfiguration accountDatabaseCrawler;
@NotNull
@Valid
@JsonProperty
@ -176,6 +181,10 @@ public class WhisperServerConfiguration extends Configuration {
return directory;
}
public AccountDatabaseCrawlerConfiguration getAccountDatabaseCrawlerConfiguration() {
return accountDatabaseCrawler;
}
public MessageCacheConfiguration getMessageCacheConfiguration() {
return messageCache;
}

View File

@ -83,7 +83,9 @@ import javax.servlet.DispatcherType;
import javax.servlet.FilterRegistration;
import javax.servlet.ServletRegistration;
import java.security.Security;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import static com.codahale.metrics.MetricRegistry.name;
@ -195,11 +197,14 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DirectoryCredentialsGenerator directoryCredentialsGenerator = new DirectoryCredentialsGenerator(config.getDirectoryConfiguration().getDirectoryClientConfiguration().getUserAuthenticationTokenSharedSecret(),
config.getDirectoryConfiguration().getDirectoryClientConfiguration().getUserAuthenticationTokenUserIdSecret());
DirectoryReconciliationCache directoryReconciliationCache = new DirectoryReconciliationCache(cacheClient);
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());
DirectoryReconciler directoryReconciler = new DirectoryReconciler(directoryReconciliationClient, directoryReconciliationCache, directory, accounts,
config.getDirectoryConfiguration().getDirectoryServerConfiguration().getReconciliationChunkSize(),
config.getDirectoryConfiguration().getDirectoryServerConfiguration().getReconciliationChunkIntervalMs());
DirectoryReconciler directoryReconciler = new DirectoryReconciler(directoryReconciliationClient, directory);
ActiveUserCounter activeUserCounter = new ActiveUserCounter(config.getMetricsFactory(), cacheClient);
List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = Arrays.asList(activeUserCounter, directoryReconciler);
AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheClient);
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accounts, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs());
messagesCache.setPubSubManager(pubSubManager, pushSender);
@ -208,7 +213,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.lifecycle().manage(pubSubManager);
environment.lifecycle().manage(pushSender);
environment.lifecycle().manage(messagesCache);
environment.lifecycle().manage(directoryReconciler);
environment.lifecycle().manage(accountDatabaseCrawler);
AttachmentController attachmentController = new AttachmentController(rateLimiters, urlSigner);
KeysController keysController = new KeysController(rateLimiters, keys, accountsManager, directoryQueue);

View File

@ -0,0 +1,36 @@
/**
* Copyright (C) 2018 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
public class AccountDatabaseCrawlerConfiguration {
@JsonProperty
private int chunkSize = 1000;
@JsonProperty
private long chunkIntervalMs = 8000L;
public int getChunkSize() {
return chunkSize;
}
public long getChunkIntervalMs() {
return chunkIntervalMs;
}
}

View File

@ -33,12 +33,6 @@ public class DirectoryServerConfiguration {
@JsonProperty
private String replicationCaCertificate;
@JsonProperty
private int reconciliationChunkSize = 1000;
@JsonProperty
private long reconciliationChunkIntervalMs = 8000L;
public String getReplicationUrl() {
return replicationUrl;
}
@ -51,11 +45,4 @@ public class DirectoryServerConfiguration {
return replicationCaCertificate;
}
public int getReconciliationChunkSize() {
return reconciliationChunkSize;
}
public long getReconciliationChunkIntervalMs() {
return reconciliationChunkIntervalMs;
}
}

View File

@ -0,0 +1,57 @@
/**
* Copyright (C) 2018 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
import java.util.HashMap;
public class ActiveUserTally {
@JsonProperty
private String fromNumber;
@JsonProperty
private Map<String, long[]> platforms;
@JsonProperty
private Map<String, long[]> countries;
public ActiveUserTally() {}
public ActiveUserTally(String fromNumber, Map<String, long[]> platforms, Map<String, long[]> countries) {
this.fromNumber = fromNumber;
this.platforms = platforms;
this.countries = countries;
}
public String getFromNumber() {
return this.fromNumber;
}
public Map<String, long[]> getPlatforms() {
return this.platforms;
}
public Map<String, long[]> getCountries() {
return this.countries;
}
public void setFromNumber(String fromNumber) {
this.fromNumber = fromNumber;
}
}

View File

@ -0,0 +1,160 @@
/*
* Copyright (C) 2018 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.storage;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed;
public class AccountDatabaseCrawler implements Managed, Runnable {
private static final Logger logger = LoggerFactory.getLogger(AccountDatabaseCrawler.class);
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Timer readChunkTimer = metricRegistry.timer(name(AccountDatabaseCrawler.class, "readChunk"));
private static final long WORKER_TTL_MS = 120_000L;
private static final long ACCELERATED_CHUNK_INTERVAL = 10L;
private final Accounts accounts;
private final int chunkSize;
private final long chunkIntervalMs;
private final String workerId;
private final AccountDatabaseCrawlerCache cache;
private final List<AccountDatabaseCrawlerListener> listeners;
private AtomicBoolean running = new AtomicBoolean(false);
private boolean finished;
public AccountDatabaseCrawler(Accounts accounts,
AccountDatabaseCrawlerCache cache,
List<AccountDatabaseCrawlerListener> listeners,
int chunkSize,
long chunkIntervalMs)
{
this.accounts = accounts;
this.chunkSize = chunkSize;
this.chunkIntervalMs = chunkIntervalMs;
this.workerId = UUID.randomUUID().toString();
this.cache = cache;
this.listeners = listeners;
}
@Override
public synchronized void start() {
running.set(true);
new Thread(this).start();
}
@Override
public synchronized void stop() {
running.set(false);
notifyAll();
while (!finished) {
Util.wait(this);
}
}
@Override
public void run() {
boolean accelerated = false;
while (running.get()) {
try {
accelerated = doPeriodicWork();
sleepWhileRunning(accelerated ? ACCELERATED_CHUNK_INTERVAL : chunkIntervalMs);
} catch (Throwable t) {
logger.warn("error in database crawl: ", t);
}
}
synchronized (this) {
finished = true;
notifyAll();
}
}
@VisibleForTesting
public boolean doPeriodicWork() {
if (cache.claimActiveWork(workerId, WORKER_TTL_MS)) {
try {
long startTimeMs = System.currentTimeMillis();
processChunk();
if (cache.isAccelerated()) {
return true;
}
long endTimeMs = System.currentTimeMillis();
long sleepIntervalMs = chunkIntervalMs - (endTimeMs - startTimeMs);
if (sleepIntervalMs > 0) sleepWhileRunning(sleepIntervalMs);
} finally {
cache.releaseActiveWork(workerId);
}
}
return false;
}
private void processChunk() {
Optional<String> fromNumber = cache.getLastNumber();
if (!fromNumber.isPresent()) {
listeners.forEach(listener -> { listener.onCrawlStart(); });
}
List<Account> chunkAccounts = readChunk(fromNumber, chunkSize);
if (chunkAccounts.isEmpty()) {
listeners.forEach(listener -> { listener.onCrawlEnd(fromNumber); });
cache.setLastNumber(Optional.empty());
cache.clearAccelerate();
} else {
listeners.forEach(listener -> { listener.onCrawlChunk(fromNumber, chunkAccounts); });
cache.setLastNumber(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getNumber()));
}
}
private List<Account> readChunk(Optional<String> fromNumber, int chunkSize) {
try (Timer.Context timer = readChunkTimer.time()) {
List<Account> chunkAccounts;
if (fromNumber.isPresent()) {
chunkAccounts = accounts.getAllFrom(fromNumber.get(), chunkSize);
} else {
chunkAccounts = accounts.getAllFrom(chunkSize);
}
return chunkAccounts;
}
}
private synchronized void sleepWhileRunning(long delayMs) {
if (running.get()) Util.wait(this, delayMs);
}
}

View File

@ -25,21 +25,20 @@ import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class DirectoryReconciliationCache {
public class AccountDatabaseCrawlerCache {
private static final String ACTIVE_WORKER_KEY = "directory_reconciliation_active_worker";
private static final String LAST_NUMBER_KEY = "directory_reconciliation_last_number";
private static final String ACCELERATE_KEY = "directory_reconciliation_accelerate";
private static final String ACTIVE_WORKER_KEY = "account_database_crawler_cache_active_worker";
private static final String LAST_NUMBER_KEY = "account_database_crawler_cache_last_number";
private static final String ACCELERATE_KEY = "account_database_crawler_cache_accelerate";
private static final long LAST_NUMBER_TTL_MS = 86400_000L;
private final ReplicatedJedisPool jedisPool;
private final UnlockOperation unlockOperation;
private final LuaScript luaScript;
public DirectoryReconciliationCache(ReplicatedJedisPool jedisPool) throws IOException {
this.jedisPool = jedisPool;
this.unlockOperation = new UnlockOperation(jedisPool);
public AccountDatabaseCrawlerCache(ReplicatedJedisPool jedisPool) throws IOException {
this.jedisPool = jedisPool;
this.luaScript = LuaScript.fromResource(jedisPool, "lua/account_database_crawler/unlock.lua");
}
public void clearAccelerate() {
@ -55,12 +54,17 @@ public class DirectoryReconciliationCache {
}
public boolean claimActiveWork(String workerId, long ttlMs) {
unlockOperation.unlock(ACTIVE_WORKER_KEY, workerId);
try (Jedis jedis = jedisPool.getWriteResource()) {
return "OK".equals(jedis.set(ACTIVE_WORKER_KEY, workerId, "NX", "PX", ttlMs));
}
}
public void releaseActiveWork(String workerId) {
List<byte[]> keys = Arrays.asList(ACTIVE_WORKER_KEY.getBytes());
List<byte[]> args = Arrays.asList(workerId.getBytes());
luaScript.execute(keys, args);
}
public Optional<String> getLastNumber() {
try (Jedis jedis = jedisPool.getWriteResource()) {
return Optional.ofNullable(jedis.get(LAST_NUMBER_KEY));
@ -77,20 +81,4 @@ public class DirectoryReconciliationCache {
}
}
public static class UnlockOperation {
private final LuaScript luaScript;
UnlockOperation(ReplicatedJedisPool jedisPool) throws IOException {
this.luaScript = LuaScript.fromResource(jedisPool, "lua/unlock.lua");
}
public boolean unlock(String key, String value) {
List<byte[]> keys = Arrays.asList(key.getBytes());
List<byte[]> args = Arrays.asList(value.getBytes());
return ((long) luaScript.execute(keys, args)) > 0;
}
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright (C) 2018 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.storage;
import java.util.List;
import java.util.Optional;
public interface AccountDatabaseCrawlerListener {
void onCrawlStart();
void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts);
void onCrawlEnd(Optional<String> fromNumber);
}

View File

@ -0,0 +1,218 @@
/**
* Copyright (C) 2018 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.storage;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.metrics.MetricsFactory;
import io.dropwizard.metrics.ReporterFactory;
import org.whispersystems.textsecuregcm.entities.ActiveUserTally;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
import redis.clients.jedis.Jedis;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
private static final String TALLY_KEY = "active_user_tally";
private static final String PLATFORM_IOS = "ios";
private static final String PLATFORM_ANDROID = "android";
private static final String FIRST_FROM_NUMBER = "+";
private static final String INTERVALS[] = {"daily", "weekly", "monthly", "quarterly", "yearly"};
private final MetricsFactory metricsFactory;
private final ReplicatedJedisPool jedisPool;
private final ObjectMapper mapper;
public ActiveUserCounter(MetricsFactory metricsFactory, ReplicatedJedisPool jedisPool) {
this.metricsFactory = metricsFactory;
this.jedisPool = jedisPool;
this.mapper = SystemMapper.getMapper();
}
public void onCrawlStart() {
try (Jedis jedis = jedisPool.getWriteResource()) {
jedis.del(TALLY_KEY);
}
}
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) {
long nowDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
long agoMs[] = {TimeUnit.DAYS.toMillis(nowDays - 1),
TimeUnit.DAYS.toMillis(nowDays - 7),
TimeUnit.DAYS.toMillis(nowDays - 30),
TimeUnit.DAYS.toMillis(nowDays - 90),
TimeUnit.DAYS.toMillis(nowDays - 365)};
Map<String, long[]> platformIncrements = new HashMap<>();
Map<String, long[]> countryIncrements = new HashMap<>();
for (Account account : chunkAccounts) {
Optional<Device> device = account.getMasterDevice();
if (device.isPresent()) {
long lastActiveMs = device.get().getLastSeen();
String platform = null;
if (device.get().getApnId() != null) {
platform = PLATFORM_IOS;
} else if (device.get().getGcmId() != null) {
platform = PLATFORM_ANDROID;
}
if (platform != null) {
String country = Util.getCountryCode(account.getNumber());
long[] platformIncrement = getTallyFromMap(platformIncrements, platform);
long[] countryIncrement = getTallyFromMap(countryIncrements, country);
for (int i = 0; i < agoMs.length; i++) {
if (lastActiveMs > agoMs[i]) {
platformIncrement[i]++;
countryIncrement[i]++;
}
}
}
}
}
incrementTallies(fromNumber.orElse(FIRST_FROM_NUMBER), platformIncrements, countryIncrements);
}
public void onCrawlEnd(Optional<String> fromNumber) {
MetricRegistry metrics = new MetricRegistry();
long intervalTallies[] = new long[INTERVALS.length];
ActiveUserTally activeUserTally = getFinalTallies();
Map<String, long[]> platforms = activeUserTally.getPlatforms();
platforms.forEach((platform, platformTallies) -> {
for (int i = 0; i < INTERVALS.length; i++) {
final long tally = platformTallies[i];
metrics.register(metricKey(platform, INTERVALS[i]),
new Gauge<Long>() {
@Override
public Long getValue() { return tally; }
});
intervalTallies[i] += tally;
}
});
Map<String, long[]> countries = activeUserTally.getCountries();
countries.forEach((country, countryTallies) -> {
for (int i = 0; i < INTERVALS.length; i++) {
final long tally = countryTallies[i];
metrics.register(metricKey(country, INTERVALS[i]),
new Gauge<Long>() {
@Override
public Long getValue() { return tally; }
});
}
});
for (int i = 0; i < INTERVALS.length; i++) {
final long intervalTotal = intervalTallies[i];
metrics.register(metricKey(INTERVALS[i]),
new Gauge<Long>() {
@Override
public Long getValue() { return intervalTotal; }
});
}
for (ReporterFactory reporterFactory : metricsFactory.getReporters()) {
reporterFactory.build(metrics).report();
}
}
private long[] getTallyFromMap(Map<String, long[]> map, String key) {
long[] tally = map.get(key);
if (tally == null) {
tally = new long[INTERVALS.length];
map.put(key, tally);
}
return tally;
}
private void incrementTallies(String fromNumber, Map<String, long[]> platformIncrements, Map<String, long[]> countryIncrements) {
try (Jedis jedis = jedisPool.getWriteResource()) {
String tallyValue = jedis.get(TALLY_KEY);
ActiveUserTally activeUserTally;
if (tallyValue == null) {
activeUserTally = new ActiveUserTally(fromNumber, platformIncrements, countryIncrements);
} else {
activeUserTally = mapper.readValue(tallyValue, ActiveUserTally.class);
if (activeUserTally.getFromNumber() != fromNumber) {
activeUserTally.setFromNumber(fromNumber);
Map<String, long[]> platformTallies = activeUserTally.getPlatforms();
addTallyMaps(platformTallies, platformIncrements);
Map<String, long[]> countryTallies = activeUserTally.getCountries();
addTallyMaps(countryTallies, countryIncrements);
}
}
jedis.set(TALLY_KEY, mapper.writeValueAsString(activeUserTally));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void addTallyMaps(Map<String, long[]> tallyMap, Map<String, long[]> incrementMap) {
incrementMap.forEach((key, increments) -> {
long[] tallies = tallyMap.get(key);
if (tallies == null) {
tallyMap.put(key, increments);
} else {
for (int i = 0; i < INTERVALS.length; i++) {
tallies[i] += increments[i];
}
}
});
}
private ActiveUserTally getFinalTallies() {
try (Jedis jedis = jedisPool.getReadResource()) {
return mapper.readValue(jedis.get(TALLY_KEY), ActiveUserTally.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private String metricKey(String platform, String intervalName) {
return MetricRegistry.name(ActiveUserCounter.class, intervalName + "_active_" + platform);
}
private String metricKey(String intervalName) {
return MetricRegistry.name(ActiveUserCounter.class, intervalName + "_active");
}
}

View File

@ -28,153 +28,51 @@ import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse;
import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Hex;
import org.whispersystems.textsecuregcm.util.Util;
import javax.ws.rs.ProcessingException;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class DirectoryReconciler implements Managed, Runnable {
public class DirectoryReconciler implements AccountDatabaseCrawlerListener {
private static final Logger logger = LoggerFactory.getLogger(DirectoryReconciler.class);
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Timer readChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, "readChunk"));
private static final Timer sendChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, "sendChunk"));
private static final Meter sendChunkErrorMeter = metricRegistry.meter(name(DirectoryReconciler.class, "sendChunkError"));
private static final long WORKER_TTL_MS = 120_000L;
private static final long MINIMUM_CHUNK_INTERVAL = 500L;
private static final long ACCELERATED_CHUNK_INTERVAL = 10L;
private static final double JITTER_MAX = 0.20;
private final Accounts accounts;
private final DirectoryManager directoryManager;
private final DirectoryReconciliationClient reconciliationClient;
private final DirectoryReconciliationCache reconciliationCache;
private final int chunkSize;
private final long chunkIntervalMs;
private final String workerId;
private boolean running;
private boolean finished;
public DirectoryReconciler(DirectoryReconciliationClient reconciliationClient,
DirectoryReconciliationCache reconciliationCache,
DirectoryManager directoryManager,
Accounts accounts,
int chunkSize,
long chunkIntervalMs)
{
this.accounts = accounts;
public DirectoryReconciler(DirectoryReconciliationClient reconciliationClient, DirectoryManager directoryManager) {
this.directoryManager = directoryManager;
this.reconciliationClient = reconciliationClient;
this.reconciliationCache = reconciliationCache;
this.chunkSize = chunkSize;
this.chunkIntervalMs = chunkIntervalMs;
this.workerId = Hex.toString(Util.generateSecretBytes(16));
}
@Override
public synchronized void start() {
running = true;
new Thread(this).start();
public void onCrawlStart() { }
public void onCrawlEnd(Optional<String> fromNumber) {
DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromNumber.orElse(null), null, Collections.emptyList());
DirectoryReconciliationResponse response = sendChunk(request);
}
@Override
public synchronized void stop() {
running = false;
notifyAll();
while (!finished) {
Util.wait(this);
}
}
@Override
public void run() {
long delayMs = chunkIntervalMs;
while (sleepWhileRunning(getDelayWithJitter(delayMs))) {
try {
delayMs = getBoundedChunkInterval(chunkIntervalMs);
delayMs = doPeriodicWork(delayMs);
} catch (Throwable t) {
logger.warn("error in directory reconciliation: ", t);
}
}
synchronized (this) {
finished = true;
notifyAll();
}
}
@VisibleForTesting
public long doPeriodicWork(long intervalMs) {
long nextIntervalTimeMs = System.currentTimeMillis() + intervalMs;
if (reconciliationCache.claimActiveWork(workerId, WORKER_TTL_MS)) {
if (processChunk()) {
if (!reconciliationCache.isAccelerated()) {
long timeUntilNextIntervalMs = getTimeUntilNextInterval(nextIntervalTimeMs);
reconciliationCache.claimActiveWork(workerId, timeUntilNextIntervalMs);
return timeUntilNextIntervalMs;
} else {
return ACCELERATED_CHUNK_INTERVAL;
}
}
}
return intervalMs;
}
private boolean processChunk() {
Optional<String> fromNumber = reconciliationCache.getLastNumber();
List<Account> chunkAccounts = readChunk(fromNumber, chunkSize);
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) {
updateDirectoryCache(chunkAccounts);
DirectoryReconciliationRequest request = createChunkRequest(fromNumber, chunkAccounts);
DirectoryReconciliationResponse sendChunkResponse = sendChunk(request);
DirectoryReconciliationRequest request = createChunkRequest(fromNumber, chunkAccounts);
DirectoryReconciliationResponse response = sendChunk(request);
if (sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.MISSING || request.getToNumber() == null) {
reconciliationCache.clearAccelerate();
}
if (sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.OK) {
reconciliationCache.setLastNumber(Optional.ofNullable(request.getToNumber()));
} else if (sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.MISSING) {
reconciliationCache.setLastNumber(Optional.empty());
}
return sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.OK;
}
private List<Account> readChunk(Optional<String> fromNumber, int chunkSize) {
try (Timer.Context timer = readChunkTimer.time()) {
Optional<List<Account>> chunkAccounts;
if (fromNumber.isPresent()) {
chunkAccounts = Optional.ofNullable(accounts.getAllFrom(fromNumber.get(), chunkSize));
} else {
chunkAccounts = Optional.ofNullable(accounts.getAllFrom(chunkSize));
}
return chunkAccounts.orElse(Collections.emptyList());
}
}
private void updateDirectoryCache(List<Account> accounts) {
if (accounts.isEmpty()) {
return;
}
BatchOperationHandle batchOperation = directoryManager.startBatchOperation();
@ -183,7 +81,6 @@ public class DirectoryReconciler implements Managed, Runnable {
if (account.isActive()) {
byte[] token = Util.getContactToken(account.getNumber());
ClientContact clientContact = new ClientContact(token, null, true, true);
directoryManager.add(batchOperation, clientContact);
} else {
directoryManager.remove(batchOperation, account.getNumber());
@ -200,9 +97,10 @@ public class DirectoryReconciler implements Managed, Runnable {
.map(Account::getNumber)
.collect(Collectors.toList());
Optional<String> toNumber = Optional.empty();
Optional<String> toNumber = Optional.empty();
if (!accounts.isEmpty()) {
toNumber = Optional.of(accounts.get(accounts.size() - 1).getNumber());
toNumber = Optional.of(accounts.get(accounts.size() - 1).getNumber());
}
return new DirectoryReconciliationRequest(fromNumber.orElse(null), toNumber.orElse(null), numbers);
@ -223,29 +121,4 @@ public class DirectoryReconciler implements Managed, Runnable {
}
}
private synchronized boolean sleepWhileRunning(long delayMs) {
long startTimeMs = System.currentTimeMillis();
while (running && delayMs > 0) {
Util.wait(this, delayMs);
long nowMs = System.currentTimeMillis();
delayMs -= Math.abs(nowMs - startTimeMs);
}
return running;
}
private long getTimeUntilNextInterval(long nextIntervalTimeMs) {
long nextIntervalMs = nextIntervalTimeMs - System.currentTimeMillis();
return getBoundedChunkInterval(nextIntervalMs);
}
private long getBoundedChunkInterval(long intervalMs) {
return Math.max(Math.min(intervalMs, chunkIntervalMs), MINIMUM_CHUNK_INTERVAL);
}
private long getDelayWithJitter(long delayMs) {
long randomJitterMs = (long) (new SecureRandom().nextDouble() * JITTER_MAX * delayMs);
return delayMs + randomJitterMs;
}
}

View File

@ -0,0 +1,174 @@
/**
* Copyright (C) 2018 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.tests.storage;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
public class AccountDatabaseCrawlerTest {
private static final String ACCOUNT1 = "+1";
private static final String ACCOUNT2 = "+2";
private static final int CHUNK_SIZE = 1000;
private static final long CHUNK_INTERVAL_MS = 30_000L;
private final Account account1 = mock(Account.class);
private final Account account2 = mock(Account.class);
private final Accounts accounts = mock(Accounts.class);
private final AccountDatabaseCrawlerListener listener = mock(AccountDatabaseCrawlerListener.class);
private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class);
private final AccountDatabaseCrawler crawler = new AccountDatabaseCrawler(accounts, cache, Arrays.asList(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS);
@Before
public void setup() {
when(account1.getNumber()).thenReturn(ACCOUNT1);
when(account2.getNumber()).thenReturn(ACCOUNT2);
when(accounts.getAllFrom(anyInt())).thenReturn(Arrays.asList(account1, account2));
when(accounts.getAllFrom(eq(ACCOUNT1), anyInt())).thenReturn(Arrays.asList(account2));
when(accounts.getAllFrom(eq(ACCOUNT2), anyInt())).thenReturn(Collections.emptyList());
when(cache.claimActiveWork(any(), anyLong())).thenReturn(true);
when(cache.isAccelerated()).thenReturn(false);
}
@Test
public void testCrawlStart() {
when(cache.getLastNumber()).thenReturn(Optional.empty());
boolean accelerated = crawler.doPeriodicWork();
assertThat(accelerated).isFalse();
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
verify(cache, times(1)).getLastNumber();
verify(listener, times(1)).onCrawlStart();
verify(accounts, times(1)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(0)).getAllFrom(any(String.class), eq(CHUNK_SIZE));
verify(account1, times(0)).getNumber();
verify(account2, times(1)).getNumber();
verify(listener, times(1)).onCrawlChunk(eq(Optional.empty()), eq(Arrays.asList(account1, account2)));
verify(cache, times(1)).setLastNumber(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class));
verifyNoMoreInteractions(account1);
verifyNoMoreInteractions(account2);
verifyNoMoreInteractions(accounts);
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(cache);
}
@Test
public void testCrawlChunk() {
when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1));
boolean accelerated = crawler.doPeriodicWork();
assertThat(accelerated).isFalse();
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
verify(cache, times(1)).getLastNumber();
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
verify(account2, times(1)).getNumber();
verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
verify(cache, times(1)).setLastNumber(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class));
verifyZeroInteractions(account1);
verifyNoMoreInteractions(account2);
verifyNoMoreInteractions(accounts);
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(cache);
}
@Test
public void testCrawlChunkAccelerated() {
when(cache.isAccelerated()).thenReturn(true);
when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1));
boolean accelerated = crawler.doPeriodicWork();
assertThat(accelerated).isTrue();
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
verify(cache, times(1)).getLastNumber();
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE));
verify(account2, times(1)).getNumber();
verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2)));
verify(cache, times(1)).setLastNumber(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class));
verifyZeroInteractions(account1);
verifyNoMoreInteractions(account2);
verifyNoMoreInteractions(accounts);
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(cache);
}
@Test
public void testCrawlEnd() {
when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT2));
boolean accelerated = crawler.doPeriodicWork();
assertThat(accelerated).isFalse();
verify(cache, times(1)).claimActiveWork(any(String.class), anyLong());
verify(cache, times(1)).getLastNumber();
verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFrom(eq(ACCOUNT2), eq(CHUNK_SIZE));
verify(account1, times(0)).getNumber();
verify(account2, times(0)).getNumber();
verify(listener, times(1)).onCrawlEnd(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).setLastNumber(eq(Optional.empty()));
verify(cache, times(1)).clearAccelerate();
verify(cache, times(1)).isAccelerated();
verify(cache, times(1)).releaseActiveWork(any(String.class));
verifyZeroInteractions(account1);
verifyZeroInteractions(account2);
verifyNoMoreInteractions(accounts);
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(cache);
}
}

View File

@ -0,0 +1,226 @@
/**
* Copyright (C) 2018 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.tests.storage;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.ActiveUserCounter;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Util;
import com.google.common.collect.ImmutableList;
import io.dropwizard.metrics.MetricsFactory;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.Optional;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class ActiveUserCounterTest {
private final String NUMBER_IOS = "+15551234567";
private final String NUMBER_ANDROID = "+5511987654321";
private final String NUMBER_NODEVICE = "+5215551234567";
private final String TALLY_KEY = "active_user_tally";
private final Device iosDevice = mock(Device.class);
private final Device androidDevice = mock(Device.class);
private final Account androidAccount = mock(Account.class);
private final Account iosAccount = mock(Account.class);
private final Account noDeviceAccount = mock(Account.class);
private final Jedis jedis = mock(Jedis.class);
private final ReplicatedJedisPool jedisPool = mock(ReplicatedJedisPool.class);
private final MetricsFactory metricsFactory = mock(MetricsFactory.class);
private final ActiveUserCounter activeUserCounter = new ActiveUserCounter(metricsFactory, jedisPool);
@Before
public void setup() {
long halfDayAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(12);
long fortyFiveDayAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(45);
when(androidDevice.getApnId()).thenReturn(null);
when(androidDevice.getGcmId()).thenReturn("mock-gcm-id");
when(androidDevice.getLastSeen()).thenReturn(fortyFiveDayAgo);
when(iosDevice.getApnId()).thenReturn("mock-apn-id");
when(iosDevice.getGcmId()).thenReturn(null);
when(iosDevice.getLastSeen()).thenReturn(halfDayAgo);
when(iosAccount.getNumber()).thenReturn(NUMBER_IOS);
when(iosAccount.getMasterDevice()).thenReturn(Optional.of(iosDevice));
when(androidAccount.getNumber()).thenReturn(NUMBER_ANDROID);
when(androidAccount.getMasterDevice()).thenReturn(Optional.of(androidDevice));
when(noDeviceAccount.getNumber()).thenReturn(NUMBER_NODEVICE);
when(noDeviceAccount.getMasterDevice()).thenReturn(Optional.ofNullable(null));
when(jedis.get(any(String.class))).thenReturn("{\"fromNumber\":\"+\",\"platforms\":{},\"countries\":{}}");
when(jedisPool.getWriteResource()).thenReturn(jedis);
when(jedisPool.getReadResource()).thenReturn(jedis);
when(metricsFactory.getReporters()).thenReturn(ImmutableList.of());
}
@Test
public void testCrawlStart() {
activeUserCounter.onCrawlStart();
verify(jedisPool, times(1)).getWriteResource();
verify(jedis, times(1)).del(any(String.class));
verify(jedis, times(1)).close();
verifyZeroInteractions(iosDevice);
verifyZeroInteractions(iosAccount);
verifyZeroInteractions(androidDevice);
verifyZeroInteractions(androidAccount);
verifyZeroInteractions(noDeviceAccount);
verifyZeroInteractions(metricsFactory);
verifyNoMoreInteractions(jedis);
verifyNoMoreInteractions(jedisPool);
}
@Test
public void testCrawlEnd() {
activeUserCounter.onCrawlEnd(Optional.empty());
verify(jedisPool, times(1)).getReadResource();
verify(jedis, times(1)).get(any(String.class));
verify(jedis, times(1)).close();
verify(metricsFactory, times(1)).getReporters();
verifyZeroInteractions(iosDevice);
verifyZeroInteractions(iosAccount);
verifyZeroInteractions(androidDevice);
verifyZeroInteractions(androidAccount);
verifyZeroInteractions(noDeviceAccount);
verifyNoMoreInteractions(metricsFactory);
verifyNoMoreInteractions(jedis);
verifyNoMoreInteractions(jedisPool);
}
@Test
public void testCrawlChunkValidAccount() {
activeUserCounter.onCrawlChunk(Optional.of(NUMBER_IOS), Arrays.asList(iosAccount));
verify(iosAccount, times(1)).getMasterDevice();
verify(iosAccount, times(1)).getNumber();
verify(iosDevice, times(1)).getLastSeen();
verify(iosDevice, times(1)).getApnId();
verify(iosDevice, times(0)).getGcmId();
verify(jedisPool, times(1)).getWriteResource();
verify(jedis, times(1)).get(any(String.class));
verify(jedis, times(1)).set(any(String.class), eq("{\"fromNumber\":\""+NUMBER_IOS+"\",\"platforms\":{\"ios\":[1,1,1,1,1]},\"countries\":{\"1\":[1,1,1,1,1]}}"));
verify(jedis, times(1)).close();
verify(metricsFactory, times(0)).getReporters();
verifyZeroInteractions(androidDevice);
verifyZeroInteractions(androidAccount);
verifyZeroInteractions(noDeviceAccount);
verifyZeroInteractions(metricsFactory);
verifyNoMoreInteractions(iosDevice);
verifyNoMoreInteractions(iosAccount);
verifyNoMoreInteractions(jedis);
verifyNoMoreInteractions(jedisPool);
}
@Test
public void testCrawlChunkNoDeviceAccount() {
activeUserCounter.onCrawlChunk(Optional.of(NUMBER_NODEVICE), Arrays.asList(noDeviceAccount));
verify(noDeviceAccount, times(1)).getMasterDevice();
verify(jedisPool, times(1)).getWriteResource();
verify(jedis, times(1)).get(eq(TALLY_KEY));
verify(jedis, times(1)).set(any(String.class), eq("{\"fromNumber\":\""+NUMBER_NODEVICE+"\",\"platforms\":{},\"countries\":{}}"));
verify(jedis, times(1)).close();
verify(metricsFactory, times(0)).getReporters();
verifyZeroInteractions(iosDevice);
verifyZeroInteractions(iosAccount);
verifyZeroInteractions(androidDevice);
verifyZeroInteractions(androidAccount);
verifyZeroInteractions(noDeviceAccount);
verifyZeroInteractions(metricsFactory);
verifyNoMoreInteractions(jedis);
verifyNoMoreInteractions(jedisPool);
}
@Test
public void testCrawlChunkMixedAccount() {
activeUserCounter.onCrawlChunk(Optional.of(NUMBER_IOS), Arrays.asList(iosAccount, androidAccount, noDeviceAccount));
verify(iosAccount, times(1)).getMasterDevice();
verify(iosAccount, times(1)).getNumber();
verify(androidAccount, times(1)).getMasterDevice();
verify(androidAccount, times(1)).getNumber();
verify(noDeviceAccount, times(1)).getMasterDevice();
verify(iosDevice, times(1)).getLastSeen();
verify(iosDevice, times(1)).getApnId();
verify(iosDevice, times(0)).getGcmId();
verify(androidDevice, times(1)).getLastSeen();
verify(androidDevice, times(1)).getApnId();
verify(androidDevice, times(1)).getGcmId();
verify(jedisPool, times(1)).getWriteResource();
verify(jedis, times(1)).get(eq(TALLY_KEY));
verify(jedis, times(1)).set(any(String.class), eq("{\"fromNumber\":\""+NUMBER_IOS+"\",\"platforms\":{\"android\":[0,0,0,1,1],\"ios\":[1,1,1,1,1]},\"countries\":{\"55\":[0,0,0,1,1],\"1\":[1,1,1,1,1]}}"));
verify(jedis, times(1)).close();
verify(metricsFactory, times(0)).getReporters();
verifyZeroInteractions(metricsFactory);
verifyNoMoreInteractions(iosDevice);
verifyNoMoreInteractions(iosAccount);
verifyNoMoreInteractions(androidDevice);
verifyNoMoreInteractions(androidAccount);
verifyNoMoreInteractions(noDeviceAccount);
verifyNoMoreInteractions(jedis);
verifyNoMoreInteractions(jedisPool);
}
}

View File

@ -1,180 +1,82 @@
/**
* Copyright (C) 2018 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.tests.storage;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.whispersystems.textsecuregcm.entities.ClientContact;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle;
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
import org.whispersystems.textsecuregcm.storage.DirectoryReconciler;
import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationCache;
import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient;
import org.whispersystems.textsecuregcm.util.Util;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
public class DirectoryReconcilerTest {
private static final String VALID_NUMBER = "valid";
private static final String INACTIVE_NUMBER = "inactive";
private static final long INTERVAL_MS = 30_000L;
private final Account account = mock(Account.class);
private final Account activeAccount = mock(Account.class);
private final Account inactiveAccount = mock(Account.class);
private final Accounts accounts = mock(Accounts.class);
private final BatchOperationHandle batchOperationHandle = mock(BatchOperationHandle.class);
private final DirectoryManager directoryManager = mock(DirectoryManager.class);
private final DirectoryReconciliationClient reconciliationClient = mock(DirectoryReconciliationClient.class);
private final DirectoryReconciliationCache reconciliationCache = mock(DirectoryReconciliationCache.class);
private final DirectoryReconciler directoryReconciler = new DirectoryReconciler(reconciliationClient, reconciliationCache, directoryManager, accounts, 1000, INTERVAL_MS);
private final DirectoryReconciler directoryReconciler = new DirectoryReconciler(reconciliationClient, directoryManager);
private final DirectoryReconciliationResponse successResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.OK);
private final DirectoryReconciliationResponse notFoundResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.MISSING);
private final DirectoryReconciliationResponse successResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.OK);
private final DirectoryReconciliationResponse missingResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.MISSING);
@Before
public void setup() {
when(account.getNumber()).thenReturn(VALID_NUMBER);
when(account.isActive()).thenReturn(true);
when(activeAccount.getNumber()).thenReturn(VALID_NUMBER);
when(activeAccount.isActive()).thenReturn(true);
when(inactiveAccount.getNumber()).thenReturn(INACTIVE_NUMBER);
when(inactiveAccount.isActive()).thenReturn(false);
when(directoryManager.startBatchOperation()).thenReturn(batchOperationHandle);
}
when(accounts.getAllFrom(anyInt())).thenReturn(Arrays.asList(account, inactiveAccount));
when(accounts.getAllFrom(eq(VALID_NUMBER), anyInt())).thenReturn(Arrays.asList(inactiveAccount));
when(accounts.getAllFrom(eq(INACTIVE_NUMBER), anyInt())).thenReturn(Collections.emptyList());
@Test
public void testCrawlChunkValid() {
when(reconciliationClient.sendChunk(any())).thenReturn(successResponse);
directoryReconciler.onCrawlChunk(Optional.of(VALID_NUMBER), Arrays.asList(activeAccount, inactiveAccount));
when(reconciliationCache.getLastNumber()).thenReturn(Optional.empty());
when(reconciliationCache.claimActiveWork(any(), anyLong())).thenReturn(true);
when(reconciliationCache.isAccelerated()).thenReturn(false);
}
@Test
public void testValid() {
long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS);
assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS);
verify(accounts, times(1)).getAllFrom(anyInt());
ArgumentCaptor<DirectoryReconciliationRequest> request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class);
verify(reconciliationClient, times(1)).sendChunk(request.capture());
assertThat(request.getValue().getFromNumber()).isNull();
assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER);
assertThat(request.getValue().getNumbers()).isEqualTo(Arrays.asList(VALID_NUMBER));
ArgumentCaptor<ClientContact> addedContact = ArgumentCaptor.forClass(ClientContact.class);
verify(directoryManager, times(1)).startBatchOperation();
verify(directoryManager, times(1)).add(eq(batchOperationHandle), addedContact.capture());
verify(directoryManager, times(1)).remove(eq(batchOperationHandle), eq(INACTIVE_NUMBER));
verify(directoryManager, times(1)).stopBatchOperation(eq(batchOperationHandle));
assertThat(addedContact.getValue().getToken()).isEqualTo(Util.getContactToken(VALID_NUMBER));
verify(reconciliationCache, times(1)).getLastNumber();
verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.of(INACTIVE_NUMBER)));
verify(reconciliationCache, times(1)).isAccelerated();
verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong());
verifyNoMoreInteractions(accounts);
verifyNoMoreInteractions(directoryManager);
verifyNoMoreInteractions(reconciliationClient);
verifyNoMoreInteractions(reconciliationCache);
}
@Test
public void testInProgress() {
when(reconciliationCache.getLastNumber()).thenReturn(Optional.of(VALID_NUMBER));
long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS);
assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS);
verify(accounts, times(1)).getAllFrom(eq(VALID_NUMBER), anyInt());
verify(activeAccount, times(2)).getNumber();
verify(activeAccount, times(2)).isActive();
verify(inactiveAccount, times(2)).getNumber();
verify(inactiveAccount, times(2)).isActive();
ArgumentCaptor<DirectoryReconciliationRequest> request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class);
verify(reconciliationClient, times(1)).sendChunk(request.capture());
assertThat(request.getValue().getFromNumber()).isEqualTo(VALID_NUMBER);
assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER);
assertThat(request.getValue().getNumbers()).isEqualTo(Collections.emptyList());
verify(directoryManager, times(1)).startBatchOperation();
verify(directoryManager, times(1)).remove(eq(batchOperationHandle), eq(INACTIVE_NUMBER));
verify(directoryManager, times(1)).stopBatchOperation(eq(batchOperationHandle));
verify(reconciliationCache, times(1)).getLastNumber();
verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.of(INACTIVE_NUMBER)));
verify(reconciliationCache, times(1)).isAccelerated();
verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong());
verifyNoMoreInteractions(accounts);
verifyNoMoreInteractions(directoryManager);
verifyNoMoreInteractions(reconciliationClient);
verifyNoMoreInteractions(reconciliationCache);
}
@Test
public void testLastChunk() {
when(reconciliationCache.getLastNumber()).thenReturn(Optional.of(INACTIVE_NUMBER));
long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS);
assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS);
verify(accounts, times(1)).getAllFrom(eq(INACTIVE_NUMBER), anyInt());
ArgumentCaptor<DirectoryReconciliationRequest> request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class);
verify(reconciliationClient, times(1)).sendChunk(request.capture());
assertThat(request.getValue().getFromNumber()).isEqualTo(INACTIVE_NUMBER);
assertThat(request.getValue().getToNumber()).isNull();
assertThat(request.getValue().getNumbers()).isEqualTo(Collections.emptyList());
verify(reconciliationCache, times(1)).getLastNumber();
verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.empty()));
verify(reconciliationCache, times(1)).clearAccelerate();
verify(reconciliationCache, times(1)).isAccelerated();
verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong());
verifyNoMoreInteractions(accounts);
verifyNoMoreInteractions(directoryManager);
verifyNoMoreInteractions(reconciliationClient);
verifyNoMoreInteractions(reconciliationCache);
}
@Test
public void testNotFound() {
when(reconciliationClient.sendChunk(any())).thenReturn(notFoundResponse);
long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS);
assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS);
verify(accounts, times(1)).getAllFrom(anyInt());
ArgumentCaptor<DirectoryReconciliationRequest> request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class);
verify(reconciliationClient, times(1)).sendChunk(request.capture());
assertThat(request.getValue().getFromNumber()).isNull();
assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER);
assertThat(request.getValue().getNumbers()).isEqualTo(Arrays.asList(VALID_NUMBER));
ArgumentCaptor<ClientContact> addedContact = ArgumentCaptor.forClass(ClientContact.class);
@ -185,15 +87,11 @@ public class DirectoryReconcilerTest {
assertThat(addedContact.getValue().getToken()).isEqualTo(Util.getContactToken(VALID_NUMBER));
verify(reconciliationCache, times(1)).getLastNumber();
verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.empty()));
verify(reconciliationCache, times(1)).clearAccelerate();
verify(reconciliationCache, times(1)).claimActiveWork(any(), anyLong());
verifyNoMoreInteractions(accounts);
verifyNoMoreInteractions(activeAccount);
verifyNoMoreInteractions(inactiveAccount);
verifyNoMoreInteractions(batchOperationHandle);
verifyNoMoreInteractions(directoryManager);
verifyNoMoreInteractions(reconciliationClient);
verifyNoMoreInteractions(reconciliationCache);
}
}