diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 5d97983c7..762359616 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -63,6 +63,11 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private DirectoryConfiguration directory; + @NotNull + @Valid + @JsonProperty + private AccountDatabaseCrawlerConfiguration accountDatabaseCrawler; + @NotNull @Valid @JsonProperty @@ -176,6 +181,10 @@ public class WhisperServerConfiguration extends Configuration { return directory; } + public AccountDatabaseCrawlerConfiguration getAccountDatabaseCrawlerConfiguration() { + return accountDatabaseCrawler; + } + public MessageCacheConfiguration getMessageCacheConfiguration() { return messageCache; } diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index f425fe869..114ccbd40 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -83,7 +83,9 @@ import javax.servlet.DispatcherType; import javax.servlet.FilterRegistration; import javax.servlet.ServletRegistration; import java.security.Security; +import java.util.Arrays; import java.util.EnumSet; +import java.util.List; import java.util.Optional; import static com.codahale.metrics.MetricRegistry.name; @@ -195,11 +197,14 @@ public class WhisperServerService extends Application accountDatabaseCrawlerListeners = Arrays.asList(activeUserCounter, directoryReconciler); + + AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheClient); + AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accounts, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()); messagesCache.setPubSubManager(pubSubManager, pushSender); @@ -208,7 +213,7 @@ public class WhisperServerService extends Application. + */ +package org.whispersystems.textsecuregcm.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class AccountDatabaseCrawlerConfiguration { + + @JsonProperty + private int chunkSize = 1000; + + @JsonProperty + private long chunkIntervalMs = 8000L; + + public int getChunkSize() { + return chunkSize; + } + + public long getChunkIntervalMs() { + return chunkIntervalMs; + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java index d424143fd..313dfa583 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java @@ -33,12 +33,6 @@ public class DirectoryServerConfiguration { @JsonProperty private String replicationCaCertificate; - @JsonProperty - private int reconciliationChunkSize = 1000; - - @JsonProperty - private long reconciliationChunkIntervalMs = 8000L; - public String getReplicationUrl() { return replicationUrl; } @@ -51,11 +45,4 @@ public class DirectoryServerConfiguration { return replicationCaCertificate; } - public int getReconciliationChunkSize() { - return reconciliationChunkSize; - } - - public long getReconciliationChunkIntervalMs() { - return reconciliationChunkIntervalMs; - } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/entities/ActiveUserTally.java b/src/main/java/org/whispersystems/textsecuregcm/entities/ActiveUserTally.java new file mode 100644 index 000000000..5976db2c4 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/entities/ActiveUserTally.java @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.entities; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import java.util.HashMap; + +public class ActiveUserTally { + @JsonProperty + private String fromNumber; + + @JsonProperty + private Map platforms; + + @JsonProperty + private Map countries; + + public ActiveUserTally() {} + + public ActiveUserTally(String fromNumber, Map platforms, Map countries) { + this.fromNumber = fromNumber; + this.platforms = platforms; + this.countries = countries; + } + + public String getFromNumber() { + return this.fromNumber; + } + + public Map getPlatforms() { + return this.platforms; + } + + public Map getCountries() { + return this.countries; + } + + public void setFromNumber(String fromNumber) { + this.fromNumber = fromNumber; + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java new file mode 100644 index 000000000..f9bbc00c6 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java @@ -0,0 +1,160 @@ +/* + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.storage; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SharedMetricRegistries; +import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.util.Constants; +import org.whispersystems.textsecuregcm.util.Util; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.codahale.metrics.MetricRegistry.name; +import io.dropwizard.lifecycle.Managed; + +public class AccountDatabaseCrawler implements Managed, Runnable { + + private static final Logger logger = LoggerFactory.getLogger(AccountDatabaseCrawler.class); + private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + private static final Timer readChunkTimer = metricRegistry.timer(name(AccountDatabaseCrawler.class, "readChunk")); + + private static final long WORKER_TTL_MS = 120_000L; + private static final long ACCELERATED_CHUNK_INTERVAL = 10L; + + private final Accounts accounts; + private final int chunkSize; + private final long chunkIntervalMs; + private final String workerId; + private final AccountDatabaseCrawlerCache cache; + private final List listeners; + + private AtomicBoolean running = new AtomicBoolean(false); + private boolean finished; + + public AccountDatabaseCrawler(Accounts accounts, + AccountDatabaseCrawlerCache cache, + List listeners, + int chunkSize, + long chunkIntervalMs) + { + this.accounts = accounts; + this.chunkSize = chunkSize; + this.chunkIntervalMs = chunkIntervalMs; + this.workerId = UUID.randomUUID().toString(); + this.cache = cache; + this.listeners = listeners; + } + + @Override + public synchronized void start() { + running.set(true); + new Thread(this).start(); + } + + @Override + public synchronized void stop() { + running.set(false); + notifyAll(); + while (!finished) { + Util.wait(this); + } + } + + @Override + public void run() { + boolean accelerated = false; + + while (running.get()) { + try { + accelerated = doPeriodicWork(); + sleepWhileRunning(accelerated ? ACCELERATED_CHUNK_INTERVAL : chunkIntervalMs); + } catch (Throwable t) { + logger.warn("error in database crawl: ", t); + } + } + + synchronized (this) { + finished = true; + notifyAll(); + } + } + + @VisibleForTesting + public boolean doPeriodicWork() { + if (cache.claimActiveWork(workerId, WORKER_TTL_MS)) { + try { + long startTimeMs = System.currentTimeMillis(); + processChunk(); + if (cache.isAccelerated()) { + return true; + } + long endTimeMs = System.currentTimeMillis(); + long sleepIntervalMs = chunkIntervalMs - (endTimeMs - startTimeMs); + if (sleepIntervalMs > 0) sleepWhileRunning(sleepIntervalMs); + } finally { + cache.releaseActiveWork(workerId); + } + } + return false; + } + + private void processChunk() { + Optional fromNumber = cache.getLastNumber(); + + if (!fromNumber.isPresent()) { + listeners.forEach(listener -> { listener.onCrawlStart(); }); + } + + List chunkAccounts = readChunk(fromNumber, chunkSize); + + if (chunkAccounts.isEmpty()) { + listeners.forEach(listener -> { listener.onCrawlEnd(fromNumber); }); + cache.setLastNumber(Optional.empty()); + cache.clearAccelerate(); + } else { + listeners.forEach(listener -> { listener.onCrawlChunk(fromNumber, chunkAccounts); }); + cache.setLastNumber(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getNumber())); + } + + } + + private List readChunk(Optional fromNumber, int chunkSize) { + try (Timer.Context timer = readChunkTimer.time()) { + List chunkAccounts; + + if (fromNumber.isPresent()) { + chunkAccounts = accounts.getAllFrom(fromNumber.get(), chunkSize); + } else { + chunkAccounts = accounts.getAllFrom(chunkSize); + } + + return chunkAccounts; + } + } + + private synchronized void sleepWhileRunning(long delayMs) { + if (running.get()) Util.wait(this, delayMs); + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationCache.java b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java similarity index 65% rename from src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationCache.java rename to src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java index 118b2ea60..73e977c16 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationCache.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java @@ -25,21 +25,20 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; -@SuppressWarnings("OptionalUsedAsFieldOrParameterType") -public class DirectoryReconciliationCache { +public class AccountDatabaseCrawlerCache { - private static final String ACTIVE_WORKER_KEY = "directory_reconciliation_active_worker"; - private static final String LAST_NUMBER_KEY = "directory_reconciliation_last_number"; - private static final String ACCELERATE_KEY = "directory_reconciliation_accelerate"; + private static final String ACTIVE_WORKER_KEY = "account_database_crawler_cache_active_worker"; + private static final String LAST_NUMBER_KEY = "account_database_crawler_cache_last_number"; + private static final String ACCELERATE_KEY = "account_database_crawler_cache_accelerate"; private static final long LAST_NUMBER_TTL_MS = 86400_000L; private final ReplicatedJedisPool jedisPool; - private final UnlockOperation unlockOperation; + private final LuaScript luaScript; - public DirectoryReconciliationCache(ReplicatedJedisPool jedisPool) throws IOException { - this.jedisPool = jedisPool; - this.unlockOperation = new UnlockOperation(jedisPool); + public AccountDatabaseCrawlerCache(ReplicatedJedisPool jedisPool) throws IOException { + this.jedisPool = jedisPool; + this.luaScript = LuaScript.fromResource(jedisPool, "lua/account_database_crawler/unlock.lua"); } public void clearAccelerate() { @@ -55,12 +54,17 @@ public class DirectoryReconciliationCache { } public boolean claimActiveWork(String workerId, long ttlMs) { - unlockOperation.unlock(ACTIVE_WORKER_KEY, workerId); try (Jedis jedis = jedisPool.getWriteResource()) { return "OK".equals(jedis.set(ACTIVE_WORKER_KEY, workerId, "NX", "PX", ttlMs)); } } + public void releaseActiveWork(String workerId) { + List keys = Arrays.asList(ACTIVE_WORKER_KEY.getBytes()); + List args = Arrays.asList(workerId.getBytes()); + luaScript.execute(keys, args); + } + public Optional getLastNumber() { try (Jedis jedis = jedisPool.getWriteResource()) { return Optional.ofNullable(jedis.get(LAST_NUMBER_KEY)); @@ -77,20 +81,4 @@ public class DirectoryReconciliationCache { } } - public static class UnlockOperation { - - private final LuaScript luaScript; - - UnlockOperation(ReplicatedJedisPool jedisPool) throws IOException { - this.luaScript = LuaScript.fromResource(jedisPool, "lua/unlock.lua"); - } - - public boolean unlock(String key, String value) { - List keys = Arrays.asList(key.getBytes()); - List args = Arrays.asList(value.getBytes()); - - return ((long) luaScript.execute(keys, args)) > 0; - } - } - } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java new file mode 100644 index 000000000..b5ba7b3d8 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.storage; + +import java.util.List; +import java.util.Optional; + +public interface AccountDatabaseCrawlerListener { + void onCrawlStart(); + void onCrawlChunk(Optional fromNumber, List chunkAccounts); + void onCrawlEnd(Optional fromNumber); +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java b/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java new file mode 100644 index 000000000..8ef5cb6c6 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java @@ -0,0 +1,218 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.storage; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.dropwizard.metrics.MetricsFactory; +import io.dropwizard.metrics.ReporterFactory; +import org.whispersystems.textsecuregcm.entities.ActiveUserTally; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; +import org.whispersystems.textsecuregcm.util.SystemMapper; +import org.whispersystems.textsecuregcm.util.Util; +import redis.clients.jedis.Jedis; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ActiveUserCounter implements AccountDatabaseCrawlerListener { + + private static final String TALLY_KEY = "active_user_tally"; + + private static final String PLATFORM_IOS = "ios"; + private static final String PLATFORM_ANDROID = "android"; + + private static final String FIRST_FROM_NUMBER = "+"; + + private static final String INTERVALS[] = {"daily", "weekly", "monthly", "quarterly", "yearly"}; + + private final MetricsFactory metricsFactory; + private final ReplicatedJedisPool jedisPool; + private final ObjectMapper mapper; + + public ActiveUserCounter(MetricsFactory metricsFactory, ReplicatedJedisPool jedisPool) { + this.metricsFactory = metricsFactory; + this.jedisPool = jedisPool; + this.mapper = SystemMapper.getMapper(); + } + + public void onCrawlStart() { + try (Jedis jedis = jedisPool.getWriteResource()) { + jedis.del(TALLY_KEY); + } + } + + public void onCrawlChunk(Optional fromNumber, List chunkAccounts) { + long nowDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis()); + long agoMs[] = {TimeUnit.DAYS.toMillis(nowDays - 1), + TimeUnit.DAYS.toMillis(nowDays - 7), + TimeUnit.DAYS.toMillis(nowDays - 30), + TimeUnit.DAYS.toMillis(nowDays - 90), + TimeUnit.DAYS.toMillis(nowDays - 365)}; + + Map platformIncrements = new HashMap<>(); + Map countryIncrements = new HashMap<>(); + + for (Account account : chunkAccounts) { + + Optional device = account.getMasterDevice(); + + if (device.isPresent()) { + + long lastActiveMs = device.get().getLastSeen(); + + String platform = null; + + if (device.get().getApnId() != null) { + platform = PLATFORM_IOS; + } else if (device.get().getGcmId() != null) { + platform = PLATFORM_ANDROID; + } + + if (platform != null) { + String country = Util.getCountryCode(account.getNumber()); + + long[] platformIncrement = getTallyFromMap(platformIncrements, platform); + long[] countryIncrement = getTallyFromMap(countryIncrements, country); + + for (int i = 0; i < agoMs.length; i++) { + if (lastActiveMs > agoMs[i]) { + platformIncrement[i]++; + countryIncrement[i]++; + } + } + } + } + } + + incrementTallies(fromNumber.orElse(FIRST_FROM_NUMBER), platformIncrements, countryIncrements); + + } + + public void onCrawlEnd(Optional fromNumber) { + MetricRegistry metrics = new MetricRegistry(); + long intervalTallies[] = new long[INTERVALS.length]; + ActiveUserTally activeUserTally = getFinalTallies(); + Map platforms = activeUserTally.getPlatforms(); + platforms.forEach((platform, platformTallies) -> { + for (int i = 0; i < INTERVALS.length; i++) { + final long tally = platformTallies[i]; + metrics.register(metricKey(platform, INTERVALS[i]), + new Gauge() { + @Override + public Long getValue() { return tally; } + }); + intervalTallies[i] += tally; + } + }); + + Map countries = activeUserTally.getCountries(); + countries.forEach((country, countryTallies) -> { + for (int i = 0; i < INTERVALS.length; i++) { + final long tally = countryTallies[i]; + metrics.register(metricKey(country, INTERVALS[i]), + new Gauge() { + @Override + public Long getValue() { return tally; } + }); + } + }); + + for (int i = 0; i < INTERVALS.length; i++) { + final long intervalTotal = intervalTallies[i]; + metrics.register(metricKey(INTERVALS[i]), + new Gauge() { + @Override + public Long getValue() { return intervalTotal; } + }); + } + for (ReporterFactory reporterFactory : metricsFactory.getReporters()) { + reporterFactory.build(metrics).report(); + } + } + + private long[] getTallyFromMap(Map map, String key) { + long[] tally = map.get(key); + if (tally == null) { + tally = new long[INTERVALS.length]; + map.put(key, tally); + } + return tally; + } + + private void incrementTallies(String fromNumber, Map platformIncrements, Map countryIncrements) { + try (Jedis jedis = jedisPool.getWriteResource()) { + String tallyValue = jedis.get(TALLY_KEY); + ActiveUserTally activeUserTally; + if (tallyValue == null) { + activeUserTally = new ActiveUserTally(fromNumber, platformIncrements, countryIncrements); + } else { + activeUserTally = mapper.readValue(tallyValue, ActiveUserTally.class); + if (activeUserTally.getFromNumber() != fromNumber) { + activeUserTally.setFromNumber(fromNumber); + Map platformTallies = activeUserTally.getPlatforms(); + addTallyMaps(platformTallies, platformIncrements); + Map countryTallies = activeUserTally.getCountries(); + addTallyMaps(countryTallies, countryIncrements); + } + } + jedis.set(TALLY_KEY, mapper.writeValueAsString(activeUserTally)); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void addTallyMaps(Map tallyMap, Map incrementMap) { + incrementMap.forEach((key, increments) -> { + long[] tallies = tallyMap.get(key); + if (tallies == null) { + tallyMap.put(key, increments); + } else { + for (int i = 0; i < INTERVALS.length; i++) { + tallies[i] += increments[i]; + } + } + }); + } + + private ActiveUserTally getFinalTallies() { + try (Jedis jedis = jedisPool.getReadResource()) { + return mapper.readValue(jedis.get(TALLY_KEY), ActiveUserTally.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private String metricKey(String platform, String intervalName) { + return MetricRegistry.name(ActiveUserCounter.class, intervalName + "_active_" + platform); + } + + private String metricKey(String intervalName) { + return MetricRegistry.name(ActiveUserCounter.class, intervalName + "_active"); + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java index 9d7496012..a190f3a8f 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java @@ -28,153 +28,51 @@ import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest; import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse; import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle; import org.whispersystems.textsecuregcm.util.Constants; -import org.whispersystems.textsecuregcm.util.Hex; import org.whispersystems.textsecuregcm.util.Util; import javax.ws.rs.ProcessingException; -import java.security.SecureRandom; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import static com.codahale.metrics.MetricRegistry.name; -import io.dropwizard.lifecycle.Managed; -@SuppressWarnings("OptionalUsedAsFieldOrParameterType") -public class DirectoryReconciler implements Managed, Runnable { +public class DirectoryReconciler implements AccountDatabaseCrawlerListener { private static final Logger logger = LoggerFactory.getLogger(DirectoryReconciler.class); private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private static final Timer readChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, "readChunk")); private static final Timer sendChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, "sendChunk")); private static final Meter sendChunkErrorMeter = metricRegistry.meter(name(DirectoryReconciler.class, "sendChunkError")); - private static final long WORKER_TTL_MS = 120_000L; - private static final long MINIMUM_CHUNK_INTERVAL = 500L; - private static final long ACCELERATED_CHUNK_INTERVAL = 10L; - private static final double JITTER_MAX = 0.20; - - private final Accounts accounts; private final DirectoryManager directoryManager; private final DirectoryReconciliationClient reconciliationClient; - private final DirectoryReconciliationCache reconciliationCache; - private final int chunkSize; - private final long chunkIntervalMs; - private final String workerId; - private boolean running; - private boolean finished; - - public DirectoryReconciler(DirectoryReconciliationClient reconciliationClient, - DirectoryReconciliationCache reconciliationCache, - DirectoryManager directoryManager, - Accounts accounts, - int chunkSize, - long chunkIntervalMs) - { - this.accounts = accounts; + public DirectoryReconciler(DirectoryReconciliationClient reconciliationClient, DirectoryManager directoryManager) { this.directoryManager = directoryManager; this.reconciliationClient = reconciliationClient; - this.reconciliationCache = reconciliationCache; - this.chunkSize = chunkSize; - this.chunkIntervalMs = chunkIntervalMs; - this.workerId = Hex.toString(Util.generateSecretBytes(16)); } - @Override - public synchronized void start() { - running = true; - new Thread(this).start(); + public void onCrawlStart() { } + + public void onCrawlEnd(Optional fromNumber) { + + DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromNumber.orElse(null), null, Collections.emptyList()); + DirectoryReconciliationResponse response = sendChunk(request); + } - @Override - public synchronized void stop() { - running = false; - notifyAll(); - while (!finished) { - Util.wait(this); - } - } - - @Override - public void run() { - long delayMs = chunkIntervalMs; - - while (sleepWhileRunning(getDelayWithJitter(delayMs))) { - try { - delayMs = getBoundedChunkInterval(chunkIntervalMs); - delayMs = doPeriodicWork(delayMs); - } catch (Throwable t) { - logger.warn("error in directory reconciliation: ", t); - } - } - - synchronized (this) { - finished = true; - notifyAll(); - } - } - - @VisibleForTesting - public long doPeriodicWork(long intervalMs) { - long nextIntervalTimeMs = System.currentTimeMillis() + intervalMs; - - if (reconciliationCache.claimActiveWork(workerId, WORKER_TTL_MS)) { - if (processChunk()) { - if (!reconciliationCache.isAccelerated()) { - long timeUntilNextIntervalMs = getTimeUntilNextInterval(nextIntervalTimeMs); - reconciliationCache.claimActiveWork(workerId, timeUntilNextIntervalMs); - return timeUntilNextIntervalMs; - } else { - return ACCELERATED_CHUNK_INTERVAL; - } - } - } - return intervalMs; - } - - private boolean processChunk() { - Optional fromNumber = reconciliationCache.getLastNumber(); - List chunkAccounts = readChunk(fromNumber, chunkSize); + public void onCrawlChunk(Optional fromNumber, List chunkAccounts) { updateDirectoryCache(chunkAccounts); - DirectoryReconciliationRequest request = createChunkRequest(fromNumber, chunkAccounts); - DirectoryReconciliationResponse sendChunkResponse = sendChunk(request); + DirectoryReconciliationRequest request = createChunkRequest(fromNumber, chunkAccounts); + DirectoryReconciliationResponse response = sendChunk(request); - if (sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.MISSING || request.getToNumber() == null) { - reconciliationCache.clearAccelerate(); - } - - if (sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.OK) { - reconciliationCache.setLastNumber(Optional.ofNullable(request.getToNumber())); - } else if (sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.MISSING) { - reconciliationCache.setLastNumber(Optional.empty()); - } - - return sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.OK; - } - - private List readChunk(Optional fromNumber, int chunkSize) { - try (Timer.Context timer = readChunkTimer.time()) { - Optional> chunkAccounts; - - if (fromNumber.isPresent()) { - chunkAccounts = Optional.ofNullable(accounts.getAllFrom(fromNumber.get(), chunkSize)); - } else { - chunkAccounts = Optional.ofNullable(accounts.getAllFrom(chunkSize)); - } - - return chunkAccounts.orElse(Collections.emptyList()); - } } private void updateDirectoryCache(List accounts) { - if (accounts.isEmpty()) { - return; - } BatchOperationHandle batchOperation = directoryManager.startBatchOperation(); @@ -183,7 +81,6 @@ public class DirectoryReconciler implements Managed, Runnable { if (account.isActive()) { byte[] token = Util.getContactToken(account.getNumber()); ClientContact clientContact = new ClientContact(token, null, true, true); - directoryManager.add(batchOperation, clientContact); } else { directoryManager.remove(batchOperation, account.getNumber()); @@ -200,9 +97,10 @@ public class DirectoryReconciler implements Managed, Runnable { .map(Account::getNumber) .collect(Collectors.toList()); - Optional toNumber = Optional.empty(); + Optional toNumber = Optional.empty(); + if (!accounts.isEmpty()) { - toNumber = Optional.of(accounts.get(accounts.size() - 1).getNumber()); + toNumber = Optional.of(accounts.get(accounts.size() - 1).getNumber()); } return new DirectoryReconciliationRequest(fromNumber.orElse(null), toNumber.orElse(null), numbers); @@ -223,29 +121,4 @@ public class DirectoryReconciler implements Managed, Runnable { } } - private synchronized boolean sleepWhileRunning(long delayMs) { - long startTimeMs = System.currentTimeMillis(); - while (running && delayMs > 0) { - Util.wait(this, delayMs); - - long nowMs = System.currentTimeMillis(); - delayMs -= Math.abs(nowMs - startTimeMs); - } - return running; - } - - private long getTimeUntilNextInterval(long nextIntervalTimeMs) { - long nextIntervalMs = nextIntervalTimeMs - System.currentTimeMillis(); - return getBoundedChunkInterval(nextIntervalMs); - } - - private long getBoundedChunkInterval(long intervalMs) { - return Math.max(Math.min(intervalMs, chunkIntervalMs), MINIMUM_CHUNK_INTERVAL); - } - - private long getDelayWithJitter(long delayMs) { - long randomJitterMs = (long) (new SecureRandom().nextDouble() * JITTER_MAX * delayMs); - return delayMs + randomJitterMs; - } - } diff --git a/src/main/resources/lua/unlock.lua b/src/main/resources/lua/account_database_crawler/unlock.lua similarity index 100% rename from src/main/resources/lua/unlock.lua rename to src/main/resources/lua/account_database_crawler/unlock.lua diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java new file mode 100644 index 000000000..be74a0b7e --- /dev/null +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java @@ -0,0 +1,174 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.whispersystems.textsecuregcm.tests.storage; + +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.Accounts; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +public class AccountDatabaseCrawlerTest { + + private static final String ACCOUNT1 = "+1"; + private static final String ACCOUNT2 = "+2"; + + private static final int CHUNK_SIZE = 1000; + private static final long CHUNK_INTERVAL_MS = 30_000L; + + private final Account account1 = mock(Account.class); + private final Account account2 = mock(Account.class); + + private final Accounts accounts = mock(Accounts.class); + private final AccountDatabaseCrawlerListener listener = mock(AccountDatabaseCrawlerListener.class); + private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class); + + private final AccountDatabaseCrawler crawler = new AccountDatabaseCrawler(accounts, cache, Arrays.asList(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS); + + @Before + public void setup() { + when(account1.getNumber()).thenReturn(ACCOUNT1); + when(account2.getNumber()).thenReturn(ACCOUNT2); + + when(accounts.getAllFrom(anyInt())).thenReturn(Arrays.asList(account1, account2)); + when(accounts.getAllFrom(eq(ACCOUNT1), anyInt())).thenReturn(Arrays.asList(account2)); + when(accounts.getAllFrom(eq(ACCOUNT2), anyInt())).thenReturn(Collections.emptyList()); + + when(cache.claimActiveWork(any(), anyLong())).thenReturn(true); + when(cache.isAccelerated()).thenReturn(false); + } + + @Test + public void testCrawlStart() { + when(cache.getLastNumber()).thenReturn(Optional.empty()); + + boolean accelerated = crawler.doPeriodicWork(); + assertThat(accelerated).isFalse(); + + verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); + verify(cache, times(1)).getLastNumber(); + verify(listener, times(1)).onCrawlStart(); + verify(accounts, times(1)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(0)).getAllFrom(any(String.class), eq(CHUNK_SIZE)); + verify(account1, times(0)).getNumber(); + verify(account2, times(1)).getNumber(); + verify(listener, times(1)).onCrawlChunk(eq(Optional.empty()), eq(Arrays.asList(account1, account2))); + verify(cache, times(1)).setLastNumber(eq(Optional.of(ACCOUNT2))); + verify(cache, times(1)).isAccelerated(); + verify(cache, times(1)).releaseActiveWork(any(String.class)); + + verifyNoMoreInteractions(account1); + verifyNoMoreInteractions(account2); + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(cache); + } + + @Test + public void testCrawlChunk() { + when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1)); + + boolean accelerated = crawler.doPeriodicWork(); + assertThat(accelerated).isFalse(); + + verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); + verify(cache, times(1)).getLastNumber(); + verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); + verify(account2, times(1)).getNumber(); + verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); + verify(cache, times(1)).setLastNumber(eq(Optional.of(ACCOUNT2))); + verify(cache, times(1)).isAccelerated(); + verify(cache, times(1)).releaseActiveWork(any(String.class)); + + verifyZeroInteractions(account1); + + verifyNoMoreInteractions(account2); + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(cache); + } + + @Test + public void testCrawlChunkAccelerated() { + when(cache.isAccelerated()).thenReturn(true); + when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1)); + + boolean accelerated = crawler.doPeriodicWork(); + assertThat(accelerated).isTrue(); + + verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); + verify(cache, times(1)).getLastNumber(); + verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); + verify(account2, times(1)).getNumber(); + verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); + verify(cache, times(1)).setLastNumber(eq(Optional.of(ACCOUNT2))); + verify(cache, times(1)).isAccelerated(); + verify(cache, times(1)).releaseActiveWork(any(String.class)); + + verifyZeroInteractions(account1); + + verifyNoMoreInteractions(account2); + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(cache); + } + + @Test + public void testCrawlEnd() { + when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT2)); + + boolean accelerated = crawler.doPeriodicWork(); + assertThat(accelerated).isFalse(); + + verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); + verify(cache, times(1)).getLastNumber(); + verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFrom(eq(ACCOUNT2), eq(CHUNK_SIZE)); + verify(account1, times(0)).getNumber(); + verify(account2, times(0)).getNumber(); + verify(listener, times(1)).onCrawlEnd(eq(Optional.of(ACCOUNT2))); + verify(cache, times(1)).setLastNumber(eq(Optional.empty())); + verify(cache, times(1)).clearAccelerate(); + verify(cache, times(1)).isAccelerated(); + verify(cache, times(1)).releaseActiveWork(any(String.class)); + + verifyZeroInteractions(account1); + verifyZeroInteractions(account2); + + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(cache); + } + +} diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java new file mode 100644 index 000000000..0178a029f --- /dev/null +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java @@ -0,0 +1,226 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.whispersystems.textsecuregcm.tests.storage; + +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.ActiveUserCounter; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.util.Util; + +import com.google.common.collect.ImmutableList; +import io.dropwizard.metrics.MetricsFactory; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.Jedis; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.Optional; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class ActiveUserCounterTest { + + private final String NUMBER_IOS = "+15551234567"; + private final String NUMBER_ANDROID = "+5511987654321"; + private final String NUMBER_NODEVICE = "+5215551234567"; + + private final String TALLY_KEY = "active_user_tally"; + + private final Device iosDevice = mock(Device.class); + private final Device androidDevice = mock(Device.class); + + private final Account androidAccount = mock(Account.class); + private final Account iosAccount = mock(Account.class); + private final Account noDeviceAccount = mock(Account.class); + + private final Jedis jedis = mock(Jedis.class); + private final ReplicatedJedisPool jedisPool = mock(ReplicatedJedisPool.class); + private final MetricsFactory metricsFactory = mock(MetricsFactory.class); + + private final ActiveUserCounter activeUserCounter = new ActiveUserCounter(metricsFactory, jedisPool); + + @Before + public void setup() { + + long halfDayAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(12); + long fortyFiveDayAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(45); + + when(androidDevice.getApnId()).thenReturn(null); + when(androidDevice.getGcmId()).thenReturn("mock-gcm-id"); + when(androidDevice.getLastSeen()).thenReturn(fortyFiveDayAgo); + + when(iosDevice.getApnId()).thenReturn("mock-apn-id"); + when(iosDevice.getGcmId()).thenReturn(null); + when(iosDevice.getLastSeen()).thenReturn(halfDayAgo); + + when(iosAccount.getNumber()).thenReturn(NUMBER_IOS); + when(iosAccount.getMasterDevice()).thenReturn(Optional.of(iosDevice)); + + when(androidAccount.getNumber()).thenReturn(NUMBER_ANDROID); + when(androidAccount.getMasterDevice()).thenReturn(Optional.of(androidDevice)); + + when(noDeviceAccount.getNumber()).thenReturn(NUMBER_NODEVICE); + when(noDeviceAccount.getMasterDevice()).thenReturn(Optional.ofNullable(null)); + + when(jedis.get(any(String.class))).thenReturn("{\"fromNumber\":\"+\",\"platforms\":{},\"countries\":{}}"); + when(jedisPool.getWriteResource()).thenReturn(jedis); + when(jedisPool.getReadResource()).thenReturn(jedis); + when(metricsFactory.getReporters()).thenReturn(ImmutableList.of()); + + } + + @Test + public void testCrawlStart() { + activeUserCounter.onCrawlStart(); + + verify(jedisPool, times(1)).getWriteResource(); + verify(jedis, times(1)).del(any(String.class)); + verify(jedis, times(1)).close(); + + verifyZeroInteractions(iosDevice); + verifyZeroInteractions(iosAccount); + verifyZeroInteractions(androidDevice); + verifyZeroInteractions(androidAccount); + verifyZeroInteractions(noDeviceAccount); + verifyZeroInteractions(metricsFactory); + verifyNoMoreInteractions(jedis); + verifyNoMoreInteractions(jedisPool); + } + + @Test + public void testCrawlEnd() { + activeUserCounter.onCrawlEnd(Optional.empty()); + + verify(jedisPool, times(1)).getReadResource(); + verify(jedis, times(1)).get(any(String.class)); + verify(jedis, times(1)).close(); + + verify(metricsFactory, times(1)).getReporters(); + + verifyZeroInteractions(iosDevice); + verifyZeroInteractions(iosAccount); + verifyZeroInteractions(androidDevice); + verifyZeroInteractions(androidAccount); + verifyZeroInteractions(noDeviceAccount); + + verifyNoMoreInteractions(metricsFactory); + verifyNoMoreInteractions(jedis); + verifyNoMoreInteractions(jedisPool); + + } + + @Test + public void testCrawlChunkValidAccount() { + activeUserCounter.onCrawlChunk(Optional.of(NUMBER_IOS), Arrays.asList(iosAccount)); + + verify(iosAccount, times(1)).getMasterDevice(); + verify(iosAccount, times(1)).getNumber(); + + verify(iosDevice, times(1)).getLastSeen(); + verify(iosDevice, times(1)).getApnId(); + verify(iosDevice, times(0)).getGcmId(); + + verify(jedisPool, times(1)).getWriteResource(); + verify(jedis, times(1)).get(any(String.class)); + verify(jedis, times(1)).set(any(String.class), eq("{\"fromNumber\":\""+NUMBER_IOS+"\",\"platforms\":{\"ios\":[1,1,1,1,1]},\"countries\":{\"1\":[1,1,1,1,1]}}")); + verify(jedis, times(1)).close(); + + verify(metricsFactory, times(0)).getReporters(); + + verifyZeroInteractions(androidDevice); + verifyZeroInteractions(androidAccount); + verifyZeroInteractions(noDeviceAccount); + verifyZeroInteractions(metricsFactory); + + verifyNoMoreInteractions(iosDevice); + verifyNoMoreInteractions(iosAccount); + verifyNoMoreInteractions(jedis); + verifyNoMoreInteractions(jedisPool); + } + + @Test + public void testCrawlChunkNoDeviceAccount() { + activeUserCounter.onCrawlChunk(Optional.of(NUMBER_NODEVICE), Arrays.asList(noDeviceAccount)); + + verify(noDeviceAccount, times(1)).getMasterDevice(); + + verify(jedisPool, times(1)).getWriteResource(); + verify(jedis, times(1)).get(eq(TALLY_KEY)); + verify(jedis, times(1)).set(any(String.class), eq("{\"fromNumber\":\""+NUMBER_NODEVICE+"\",\"platforms\":{},\"countries\":{}}")); + verify(jedis, times(1)).close(); + + verify(metricsFactory, times(0)).getReporters(); + + verifyZeroInteractions(iosDevice); + verifyZeroInteractions(iosAccount); + verifyZeroInteractions(androidDevice); + verifyZeroInteractions(androidAccount); + verifyZeroInteractions(noDeviceAccount); + verifyZeroInteractions(metricsFactory); + + verifyNoMoreInteractions(jedis); + verifyNoMoreInteractions(jedisPool); + } + + @Test + public void testCrawlChunkMixedAccount() { + activeUserCounter.onCrawlChunk(Optional.of(NUMBER_IOS), Arrays.asList(iosAccount, androidAccount, noDeviceAccount)); + + verify(iosAccount, times(1)).getMasterDevice(); + verify(iosAccount, times(1)).getNumber(); + verify(androidAccount, times(1)).getMasterDevice(); + verify(androidAccount, times(1)).getNumber(); + verify(noDeviceAccount, times(1)).getMasterDevice(); + + verify(iosDevice, times(1)).getLastSeen(); + verify(iosDevice, times(1)).getApnId(); + verify(iosDevice, times(0)).getGcmId(); + + verify(androidDevice, times(1)).getLastSeen(); + verify(androidDevice, times(1)).getApnId(); + verify(androidDevice, times(1)).getGcmId(); + + verify(jedisPool, times(1)).getWriteResource(); + verify(jedis, times(1)).get(eq(TALLY_KEY)); + verify(jedis, times(1)).set(any(String.class), eq("{\"fromNumber\":\""+NUMBER_IOS+"\",\"platforms\":{\"android\":[0,0,0,1,1],\"ios\":[1,1,1,1,1]},\"countries\":{\"55\":[0,0,0,1,1],\"1\":[1,1,1,1,1]}}")); + verify(jedis, times(1)).close(); + + verify(metricsFactory, times(0)).getReporters(); + + verifyZeroInteractions(metricsFactory); + + verifyNoMoreInteractions(iosDevice); + verifyNoMoreInteractions(iosAccount); + verifyNoMoreInteractions(androidDevice); + verifyNoMoreInteractions(androidAccount); + verifyNoMoreInteractions(noDeviceAccount); + verifyNoMoreInteractions(jedis); + verifyNoMoreInteractions(jedisPool); + } + +} diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java index 3120c59fb..b7acac7b5 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java @@ -1,180 +1,82 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + package org.whispersystems.textsecuregcm.tests.storage; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; import org.whispersystems.textsecuregcm.entities.ClientContact; import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest; import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse; import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.Accounts; -import org.whispersystems.textsecuregcm.storage.DirectoryManager; import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle; +import org.whispersystems.textsecuregcm.storage.DirectoryManager; import org.whispersystems.textsecuregcm.storage.DirectoryReconciler; -import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationCache; import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient; import org.whispersystems.textsecuregcm.util.Util; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + import java.util.Arrays; -import java.util.Collections; import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; public class DirectoryReconcilerTest { - private static final String VALID_NUMBER = "valid"; private static final String INACTIVE_NUMBER = "inactive"; - private static final long INTERVAL_MS = 30_000L; - - private final Account account = mock(Account.class); + private final Account activeAccount = mock(Account.class); private final Account inactiveAccount = mock(Account.class); - private final Accounts accounts = mock(Accounts.class); private final BatchOperationHandle batchOperationHandle = mock(BatchOperationHandle.class); private final DirectoryManager directoryManager = mock(DirectoryManager.class); private final DirectoryReconciliationClient reconciliationClient = mock(DirectoryReconciliationClient.class); - private final DirectoryReconciliationCache reconciliationCache = mock(DirectoryReconciliationCache.class); - private final DirectoryReconciler directoryReconciler = new DirectoryReconciler(reconciliationClient, reconciliationCache, directoryManager, accounts, 1000, INTERVAL_MS); + private final DirectoryReconciler directoryReconciler = new DirectoryReconciler(reconciliationClient, directoryManager); - private final DirectoryReconciliationResponse successResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.OK); - private final DirectoryReconciliationResponse notFoundResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.MISSING); + private final DirectoryReconciliationResponse successResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.OK); + private final DirectoryReconciliationResponse missingResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.MISSING); @Before public void setup() { - when(account.getNumber()).thenReturn(VALID_NUMBER); - when(account.isActive()).thenReturn(true); + when(activeAccount.getNumber()).thenReturn(VALID_NUMBER); + when(activeAccount.isActive()).thenReturn(true); when(inactiveAccount.getNumber()).thenReturn(INACTIVE_NUMBER); when(inactiveAccount.isActive()).thenReturn(false); - when(directoryManager.startBatchOperation()).thenReturn(batchOperationHandle); + } - when(accounts.getAllFrom(anyInt())).thenReturn(Arrays.asList(account, inactiveAccount)); - when(accounts.getAllFrom(eq(VALID_NUMBER), anyInt())).thenReturn(Arrays.asList(inactiveAccount)); - when(accounts.getAllFrom(eq(INACTIVE_NUMBER), anyInt())).thenReturn(Collections.emptyList()); - + @Test + public void testCrawlChunkValid() { when(reconciliationClient.sendChunk(any())).thenReturn(successResponse); + directoryReconciler.onCrawlChunk(Optional.of(VALID_NUMBER), Arrays.asList(activeAccount, inactiveAccount)); - when(reconciliationCache.getLastNumber()).thenReturn(Optional.empty()); - when(reconciliationCache.claimActiveWork(any(), anyLong())).thenReturn(true); - when(reconciliationCache.isAccelerated()).thenReturn(false); - } - - @Test - public void testValid() { - long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS); - - assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS); - - verify(accounts, times(1)).getAllFrom(anyInt()); - - ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class); - verify(reconciliationClient, times(1)).sendChunk(request.capture()); - - assertThat(request.getValue().getFromNumber()).isNull(); - assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER); - assertThat(request.getValue().getNumbers()).isEqualTo(Arrays.asList(VALID_NUMBER)); - - ArgumentCaptor addedContact = ArgumentCaptor.forClass(ClientContact.class); - verify(directoryManager, times(1)).startBatchOperation(); - verify(directoryManager, times(1)).add(eq(batchOperationHandle), addedContact.capture()); - verify(directoryManager, times(1)).remove(eq(batchOperationHandle), eq(INACTIVE_NUMBER)); - verify(directoryManager, times(1)).stopBatchOperation(eq(batchOperationHandle)); - - assertThat(addedContact.getValue().getToken()).isEqualTo(Util.getContactToken(VALID_NUMBER)); - - verify(reconciliationCache, times(1)).getLastNumber(); - verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.of(INACTIVE_NUMBER))); - verify(reconciliationCache, times(1)).isAccelerated(); - verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong()); - - verifyNoMoreInteractions(accounts); - verifyNoMoreInteractions(directoryManager); - verifyNoMoreInteractions(reconciliationClient); - verifyNoMoreInteractions(reconciliationCache); - } - - @Test - public void testInProgress() { - when(reconciliationCache.getLastNumber()).thenReturn(Optional.of(VALID_NUMBER)); - - long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS); - - assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS); - - verify(accounts, times(1)).getAllFrom(eq(VALID_NUMBER), anyInt()); + verify(activeAccount, times(2)).getNumber(); + verify(activeAccount, times(2)).isActive(); + verify(inactiveAccount, times(2)).getNumber(); + verify(inactiveAccount, times(2)).isActive(); ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class); verify(reconciliationClient, times(1)).sendChunk(request.capture()); assertThat(request.getValue().getFromNumber()).isEqualTo(VALID_NUMBER); assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER); - assertThat(request.getValue().getNumbers()).isEqualTo(Collections.emptyList()); - - verify(directoryManager, times(1)).startBatchOperation(); - verify(directoryManager, times(1)).remove(eq(batchOperationHandle), eq(INACTIVE_NUMBER)); - verify(directoryManager, times(1)).stopBatchOperation(eq(batchOperationHandle)); - - verify(reconciliationCache, times(1)).getLastNumber(); - verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.of(INACTIVE_NUMBER))); - verify(reconciliationCache, times(1)).isAccelerated(); - verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong()); - - verifyNoMoreInteractions(accounts); - verifyNoMoreInteractions(directoryManager); - verifyNoMoreInteractions(reconciliationClient); - verifyNoMoreInteractions(reconciliationCache); - } - - @Test - public void testLastChunk() { - when(reconciliationCache.getLastNumber()).thenReturn(Optional.of(INACTIVE_NUMBER)); - - long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS); - - assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS); - - verify(accounts, times(1)).getAllFrom(eq(INACTIVE_NUMBER), anyInt()); - - ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class); - verify(reconciliationClient, times(1)).sendChunk(request.capture()); - - assertThat(request.getValue().getFromNumber()).isEqualTo(INACTIVE_NUMBER); - assertThat(request.getValue().getToNumber()).isNull(); - assertThat(request.getValue().getNumbers()).isEqualTo(Collections.emptyList()); - - verify(reconciliationCache, times(1)).getLastNumber(); - verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.empty())); - verify(reconciliationCache, times(1)).clearAccelerate(); - verify(reconciliationCache, times(1)).isAccelerated(); - verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong()); - - verifyNoMoreInteractions(accounts); - verifyNoMoreInteractions(directoryManager); - verifyNoMoreInteractions(reconciliationClient); - verifyNoMoreInteractions(reconciliationCache); - } - - @Test - public void testNotFound() { - when(reconciliationClient.sendChunk(any())).thenReturn(notFoundResponse); - - long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS); - - assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS); - - verify(accounts, times(1)).getAllFrom(anyInt()); - - ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class); - verify(reconciliationClient, times(1)).sendChunk(request.capture()); - - assertThat(request.getValue().getFromNumber()).isNull(); - assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER); assertThat(request.getValue().getNumbers()).isEqualTo(Arrays.asList(VALID_NUMBER)); ArgumentCaptor addedContact = ArgumentCaptor.forClass(ClientContact.class); @@ -185,15 +87,11 @@ public class DirectoryReconcilerTest { assertThat(addedContact.getValue().getToken()).isEqualTo(Util.getContactToken(VALID_NUMBER)); - verify(reconciliationCache, times(1)).getLastNumber(); - verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.empty())); - verify(reconciliationCache, times(1)).clearAccelerate(); - verify(reconciliationCache, times(1)).claimActiveWork(any(), anyLong()); - - verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(activeAccount); + verifyNoMoreInteractions(inactiveAccount); + verifyNoMoreInteractions(batchOperationHandle); verifyNoMoreInteractions(directoryManager); verifyNoMoreInteractions(reconciliationClient); - verifyNoMoreInteractions(reconciliationCache); } }