From 4b8608906a31e23e9a1f4f040944d2dee5c11cee Mon Sep 17 00:00:00 2001 From: Brian Acton Date: Thu, 20 Dec 2018 21:20:06 -0800 Subject: [PATCH] tally active users by time interval by platform and by country 1) refactor Directory Reconciler and pull out AccountDatabaseCrawler class 2) implement ActiveUserCounter to tally daily, weekly, monthly, etc active use 3) rework and simplify the crawl and sleep logic 4) move chunk interval and chunk size configuration options out of directory section and into accountDatabaseCrawler section --- .../WhisperServerConfiguration.java | 9 + .../textsecuregcm/WhisperServerService.java | 15 +- .../AccountDatabaseCrawlerConfiguration.java | 36 +++ .../DirectoryServerConfiguration.java | 13 - .../entities/ActiveUserTally.java | 57 +++++ .../storage/AccountDatabaseCrawler.java | 160 +++++++++++++ ....java => AccountDatabaseCrawlerCache.java} | 40 ++-- .../AccountDatabaseCrawlerListener.java | 26 ++ .../storage/ActiveUserCounter.java | 218 +++++++++++++++++ .../storage/DirectoryReconciler.java | 157 ++---------- .../{ => account_database_crawler}/unlock.lua | 0 .../storage/AccountDatabaseCrawlerTest.java | 174 ++++++++++++++ .../tests/storage/ActiveUserCounterTest.java | 226 ++++++++++++++++++ .../storage/DirectoryReconcilerTest.java | 180 +++----------- 14 files changed, 984 insertions(+), 327 deletions(-) create mode 100644 src/main/java/org/whispersystems/textsecuregcm/configuration/AccountDatabaseCrawlerConfiguration.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/entities/ActiveUserTally.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java rename src/main/java/org/whispersystems/textsecuregcm/storage/{DirectoryReconciliationCache.java => AccountDatabaseCrawlerCache.java} (65%) create mode 100644 src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java rename src/main/resources/lua/{ => account_database_crawler}/unlock.lua (100%) create mode 100644 src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java create mode 100644 src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 5d97983c7..762359616 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -63,6 +63,11 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private DirectoryConfiguration directory; + @NotNull + @Valid + @JsonProperty + private AccountDatabaseCrawlerConfiguration accountDatabaseCrawler; + @NotNull @Valid @JsonProperty @@ -176,6 +181,10 @@ public class WhisperServerConfiguration extends Configuration { return directory; } + public AccountDatabaseCrawlerConfiguration getAccountDatabaseCrawlerConfiguration() { + return accountDatabaseCrawler; + } + public MessageCacheConfiguration getMessageCacheConfiguration() { return messageCache; } diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index f425fe869..114ccbd40 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -83,7 +83,9 @@ import javax.servlet.DispatcherType; import javax.servlet.FilterRegistration; import javax.servlet.ServletRegistration; import java.security.Security; +import java.util.Arrays; import java.util.EnumSet; +import java.util.List; import java.util.Optional; import static com.codahale.metrics.MetricRegistry.name; @@ -195,11 +197,14 @@ public class WhisperServerService extends Application accountDatabaseCrawlerListeners = Arrays.asList(activeUserCounter, directoryReconciler); + + AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheClient); + AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accounts, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()); messagesCache.setPubSubManager(pubSubManager, pushSender); @@ -208,7 +213,7 @@ public class WhisperServerService extends Application. + */ +package org.whispersystems.textsecuregcm.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class AccountDatabaseCrawlerConfiguration { + + @JsonProperty + private int chunkSize = 1000; + + @JsonProperty + private long chunkIntervalMs = 8000L; + + public int getChunkSize() { + return chunkSize; + } + + public long getChunkIntervalMs() { + return chunkIntervalMs; + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java index d424143fd..313dfa583 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/configuration/DirectoryServerConfiguration.java @@ -33,12 +33,6 @@ public class DirectoryServerConfiguration { @JsonProperty private String replicationCaCertificate; - @JsonProperty - private int reconciliationChunkSize = 1000; - - @JsonProperty - private long reconciliationChunkIntervalMs = 8000L; - public String getReplicationUrl() { return replicationUrl; } @@ -51,11 +45,4 @@ public class DirectoryServerConfiguration { return replicationCaCertificate; } - public int getReconciliationChunkSize() { - return reconciliationChunkSize; - } - - public long getReconciliationChunkIntervalMs() { - return reconciliationChunkIntervalMs; - } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/entities/ActiveUserTally.java b/src/main/java/org/whispersystems/textsecuregcm/entities/ActiveUserTally.java new file mode 100644 index 000000000..5976db2c4 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/entities/ActiveUserTally.java @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.entities; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import java.util.HashMap; + +public class ActiveUserTally { + @JsonProperty + private String fromNumber; + + @JsonProperty + private Map platforms; + + @JsonProperty + private Map countries; + + public ActiveUserTally() {} + + public ActiveUserTally(String fromNumber, Map platforms, Map countries) { + this.fromNumber = fromNumber; + this.platforms = platforms; + this.countries = countries; + } + + public String getFromNumber() { + return this.fromNumber; + } + + public Map getPlatforms() { + return this.platforms; + } + + public Map getCountries() { + return this.countries; + } + + public void setFromNumber(String fromNumber) { + this.fromNumber = fromNumber; + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java new file mode 100644 index 000000000..f9bbc00c6 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java @@ -0,0 +1,160 @@ +/* + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.storage; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SharedMetricRegistries; +import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.util.Constants; +import org.whispersystems.textsecuregcm.util.Util; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.codahale.metrics.MetricRegistry.name; +import io.dropwizard.lifecycle.Managed; + +public class AccountDatabaseCrawler implements Managed, Runnable { + + private static final Logger logger = LoggerFactory.getLogger(AccountDatabaseCrawler.class); + private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + private static final Timer readChunkTimer = metricRegistry.timer(name(AccountDatabaseCrawler.class, "readChunk")); + + private static final long WORKER_TTL_MS = 120_000L; + private static final long ACCELERATED_CHUNK_INTERVAL = 10L; + + private final Accounts accounts; + private final int chunkSize; + private final long chunkIntervalMs; + private final String workerId; + private final AccountDatabaseCrawlerCache cache; + private final List listeners; + + private AtomicBoolean running = new AtomicBoolean(false); + private boolean finished; + + public AccountDatabaseCrawler(Accounts accounts, + AccountDatabaseCrawlerCache cache, + List listeners, + int chunkSize, + long chunkIntervalMs) + { + this.accounts = accounts; + this.chunkSize = chunkSize; + this.chunkIntervalMs = chunkIntervalMs; + this.workerId = UUID.randomUUID().toString(); + this.cache = cache; + this.listeners = listeners; + } + + @Override + public synchronized void start() { + running.set(true); + new Thread(this).start(); + } + + @Override + public synchronized void stop() { + running.set(false); + notifyAll(); + while (!finished) { + Util.wait(this); + } + } + + @Override + public void run() { + boolean accelerated = false; + + while (running.get()) { + try { + accelerated = doPeriodicWork(); + sleepWhileRunning(accelerated ? ACCELERATED_CHUNK_INTERVAL : chunkIntervalMs); + } catch (Throwable t) { + logger.warn("error in database crawl: ", t); + } + } + + synchronized (this) { + finished = true; + notifyAll(); + } + } + + @VisibleForTesting + public boolean doPeriodicWork() { + if (cache.claimActiveWork(workerId, WORKER_TTL_MS)) { + try { + long startTimeMs = System.currentTimeMillis(); + processChunk(); + if (cache.isAccelerated()) { + return true; + } + long endTimeMs = System.currentTimeMillis(); + long sleepIntervalMs = chunkIntervalMs - (endTimeMs - startTimeMs); + if (sleepIntervalMs > 0) sleepWhileRunning(sleepIntervalMs); + } finally { + cache.releaseActiveWork(workerId); + } + } + return false; + } + + private void processChunk() { + Optional fromNumber = cache.getLastNumber(); + + if (!fromNumber.isPresent()) { + listeners.forEach(listener -> { listener.onCrawlStart(); }); + } + + List chunkAccounts = readChunk(fromNumber, chunkSize); + + if (chunkAccounts.isEmpty()) { + listeners.forEach(listener -> { listener.onCrawlEnd(fromNumber); }); + cache.setLastNumber(Optional.empty()); + cache.clearAccelerate(); + } else { + listeners.forEach(listener -> { listener.onCrawlChunk(fromNumber, chunkAccounts); }); + cache.setLastNumber(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getNumber())); + } + + } + + private List readChunk(Optional fromNumber, int chunkSize) { + try (Timer.Context timer = readChunkTimer.time()) { + List chunkAccounts; + + if (fromNumber.isPresent()) { + chunkAccounts = accounts.getAllFrom(fromNumber.get(), chunkSize); + } else { + chunkAccounts = accounts.getAllFrom(chunkSize); + } + + return chunkAccounts; + } + } + + private synchronized void sleepWhileRunning(long delayMs) { + if (running.get()) Util.wait(this, delayMs); + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationCache.java b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java similarity index 65% rename from src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationCache.java rename to src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java index 118b2ea60..73e977c16 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationCache.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerCache.java @@ -25,21 +25,20 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; -@SuppressWarnings("OptionalUsedAsFieldOrParameterType") -public class DirectoryReconciliationCache { +public class AccountDatabaseCrawlerCache { - private static final String ACTIVE_WORKER_KEY = "directory_reconciliation_active_worker"; - private static final String LAST_NUMBER_KEY = "directory_reconciliation_last_number"; - private static final String ACCELERATE_KEY = "directory_reconciliation_accelerate"; + private static final String ACTIVE_WORKER_KEY = "account_database_crawler_cache_active_worker"; + private static final String LAST_NUMBER_KEY = "account_database_crawler_cache_last_number"; + private static final String ACCELERATE_KEY = "account_database_crawler_cache_accelerate"; private static final long LAST_NUMBER_TTL_MS = 86400_000L; private final ReplicatedJedisPool jedisPool; - private final UnlockOperation unlockOperation; + private final LuaScript luaScript; - public DirectoryReconciliationCache(ReplicatedJedisPool jedisPool) throws IOException { - this.jedisPool = jedisPool; - this.unlockOperation = new UnlockOperation(jedisPool); + public AccountDatabaseCrawlerCache(ReplicatedJedisPool jedisPool) throws IOException { + this.jedisPool = jedisPool; + this.luaScript = LuaScript.fromResource(jedisPool, "lua/account_database_crawler/unlock.lua"); } public void clearAccelerate() { @@ -55,12 +54,17 @@ public class DirectoryReconciliationCache { } public boolean claimActiveWork(String workerId, long ttlMs) { - unlockOperation.unlock(ACTIVE_WORKER_KEY, workerId); try (Jedis jedis = jedisPool.getWriteResource()) { return "OK".equals(jedis.set(ACTIVE_WORKER_KEY, workerId, "NX", "PX", ttlMs)); } } + public void releaseActiveWork(String workerId) { + List keys = Arrays.asList(ACTIVE_WORKER_KEY.getBytes()); + List args = Arrays.asList(workerId.getBytes()); + luaScript.execute(keys, args); + } + public Optional getLastNumber() { try (Jedis jedis = jedisPool.getWriteResource()) { return Optional.ofNullable(jedis.get(LAST_NUMBER_KEY)); @@ -77,20 +81,4 @@ public class DirectoryReconciliationCache { } } - public static class UnlockOperation { - - private final LuaScript luaScript; - - UnlockOperation(ReplicatedJedisPool jedisPool) throws IOException { - this.luaScript = LuaScript.fromResource(jedisPool, "lua/unlock.lua"); - } - - public boolean unlock(String key, String value) { - List keys = Arrays.asList(key.getBytes()); - List args = Arrays.asList(value.getBytes()); - - return ((long) luaScript.execute(keys, args)) > 0; - } - } - } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java new file mode 100644 index 000000000..b5ba7b3d8 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.storage; + +import java.util.List; +import java.util.Optional; + +public interface AccountDatabaseCrawlerListener { + void onCrawlStart(); + void onCrawlChunk(Optional fromNumber, List chunkAccounts); + void onCrawlEnd(Optional fromNumber); +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java b/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java new file mode 100644 index 000000000..8ef5cb6c6 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/ActiveUserCounter.java @@ -0,0 +1,218 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.storage; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.dropwizard.metrics.MetricsFactory; +import io.dropwizard.metrics.ReporterFactory; +import org.whispersystems.textsecuregcm.entities.ActiveUserTally; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; +import org.whispersystems.textsecuregcm.util.SystemMapper; +import org.whispersystems.textsecuregcm.util.Util; +import redis.clients.jedis.Jedis; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ActiveUserCounter implements AccountDatabaseCrawlerListener { + + private static final String TALLY_KEY = "active_user_tally"; + + private static final String PLATFORM_IOS = "ios"; + private static final String PLATFORM_ANDROID = "android"; + + private static final String FIRST_FROM_NUMBER = "+"; + + private static final String INTERVALS[] = {"daily", "weekly", "monthly", "quarterly", "yearly"}; + + private final MetricsFactory metricsFactory; + private final ReplicatedJedisPool jedisPool; + private final ObjectMapper mapper; + + public ActiveUserCounter(MetricsFactory metricsFactory, ReplicatedJedisPool jedisPool) { + this.metricsFactory = metricsFactory; + this.jedisPool = jedisPool; + this.mapper = SystemMapper.getMapper(); + } + + public void onCrawlStart() { + try (Jedis jedis = jedisPool.getWriteResource()) { + jedis.del(TALLY_KEY); + } + } + + public void onCrawlChunk(Optional fromNumber, List chunkAccounts) { + long nowDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis()); + long agoMs[] = {TimeUnit.DAYS.toMillis(nowDays - 1), + TimeUnit.DAYS.toMillis(nowDays - 7), + TimeUnit.DAYS.toMillis(nowDays - 30), + TimeUnit.DAYS.toMillis(nowDays - 90), + TimeUnit.DAYS.toMillis(nowDays - 365)}; + + Map platformIncrements = new HashMap<>(); + Map countryIncrements = new HashMap<>(); + + for (Account account : chunkAccounts) { + + Optional device = account.getMasterDevice(); + + if (device.isPresent()) { + + long lastActiveMs = device.get().getLastSeen(); + + String platform = null; + + if (device.get().getApnId() != null) { + platform = PLATFORM_IOS; + } else if (device.get().getGcmId() != null) { + platform = PLATFORM_ANDROID; + } + + if (platform != null) { + String country = Util.getCountryCode(account.getNumber()); + + long[] platformIncrement = getTallyFromMap(platformIncrements, platform); + long[] countryIncrement = getTallyFromMap(countryIncrements, country); + + for (int i = 0; i < agoMs.length; i++) { + if (lastActiveMs > agoMs[i]) { + platformIncrement[i]++; + countryIncrement[i]++; + } + } + } + } + } + + incrementTallies(fromNumber.orElse(FIRST_FROM_NUMBER), platformIncrements, countryIncrements); + + } + + public void onCrawlEnd(Optional fromNumber) { + MetricRegistry metrics = new MetricRegistry(); + long intervalTallies[] = new long[INTERVALS.length]; + ActiveUserTally activeUserTally = getFinalTallies(); + Map platforms = activeUserTally.getPlatforms(); + platforms.forEach((platform, platformTallies) -> { + for (int i = 0; i < INTERVALS.length; i++) { + final long tally = platformTallies[i]; + metrics.register(metricKey(platform, INTERVALS[i]), + new Gauge() { + @Override + public Long getValue() { return tally; } + }); + intervalTallies[i] += tally; + } + }); + + Map countries = activeUserTally.getCountries(); + countries.forEach((country, countryTallies) -> { + for (int i = 0; i < INTERVALS.length; i++) { + final long tally = countryTallies[i]; + metrics.register(metricKey(country, INTERVALS[i]), + new Gauge() { + @Override + public Long getValue() { return tally; } + }); + } + }); + + for (int i = 0; i < INTERVALS.length; i++) { + final long intervalTotal = intervalTallies[i]; + metrics.register(metricKey(INTERVALS[i]), + new Gauge() { + @Override + public Long getValue() { return intervalTotal; } + }); + } + for (ReporterFactory reporterFactory : metricsFactory.getReporters()) { + reporterFactory.build(metrics).report(); + } + } + + private long[] getTallyFromMap(Map map, String key) { + long[] tally = map.get(key); + if (tally == null) { + tally = new long[INTERVALS.length]; + map.put(key, tally); + } + return tally; + } + + private void incrementTallies(String fromNumber, Map platformIncrements, Map countryIncrements) { + try (Jedis jedis = jedisPool.getWriteResource()) { + String tallyValue = jedis.get(TALLY_KEY); + ActiveUserTally activeUserTally; + if (tallyValue == null) { + activeUserTally = new ActiveUserTally(fromNumber, platformIncrements, countryIncrements); + } else { + activeUserTally = mapper.readValue(tallyValue, ActiveUserTally.class); + if (activeUserTally.getFromNumber() != fromNumber) { + activeUserTally.setFromNumber(fromNumber); + Map platformTallies = activeUserTally.getPlatforms(); + addTallyMaps(platformTallies, platformIncrements); + Map countryTallies = activeUserTally.getCountries(); + addTallyMaps(countryTallies, countryIncrements); + } + } + jedis.set(TALLY_KEY, mapper.writeValueAsString(activeUserTally)); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void addTallyMaps(Map tallyMap, Map incrementMap) { + incrementMap.forEach((key, increments) -> { + long[] tallies = tallyMap.get(key); + if (tallies == null) { + tallyMap.put(key, increments); + } else { + for (int i = 0; i < INTERVALS.length; i++) { + tallies[i] += increments[i]; + } + } + }); + } + + private ActiveUserTally getFinalTallies() { + try (Jedis jedis = jedisPool.getReadResource()) { + return mapper.readValue(jedis.get(TALLY_KEY), ActiveUserTally.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private String metricKey(String platform, String intervalName) { + return MetricRegistry.name(ActiveUserCounter.class, intervalName + "_active_" + platform); + } + + private String metricKey(String intervalName) { + return MetricRegistry.name(ActiveUserCounter.class, intervalName + "_active"); + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java index 9d7496012..a190f3a8f 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java @@ -28,153 +28,51 @@ import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest; import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse; import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle; import org.whispersystems.textsecuregcm.util.Constants; -import org.whispersystems.textsecuregcm.util.Hex; import org.whispersystems.textsecuregcm.util.Util; import javax.ws.rs.ProcessingException; -import java.security.SecureRandom; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import static com.codahale.metrics.MetricRegistry.name; -import io.dropwizard.lifecycle.Managed; -@SuppressWarnings("OptionalUsedAsFieldOrParameterType") -public class DirectoryReconciler implements Managed, Runnable { +public class DirectoryReconciler implements AccountDatabaseCrawlerListener { private static final Logger logger = LoggerFactory.getLogger(DirectoryReconciler.class); private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private static final Timer readChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, "readChunk")); private static final Timer sendChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, "sendChunk")); private static final Meter sendChunkErrorMeter = metricRegistry.meter(name(DirectoryReconciler.class, "sendChunkError")); - private static final long WORKER_TTL_MS = 120_000L; - private static final long MINIMUM_CHUNK_INTERVAL = 500L; - private static final long ACCELERATED_CHUNK_INTERVAL = 10L; - private static final double JITTER_MAX = 0.20; - - private final Accounts accounts; private final DirectoryManager directoryManager; private final DirectoryReconciliationClient reconciliationClient; - private final DirectoryReconciliationCache reconciliationCache; - private final int chunkSize; - private final long chunkIntervalMs; - private final String workerId; - private boolean running; - private boolean finished; - - public DirectoryReconciler(DirectoryReconciliationClient reconciliationClient, - DirectoryReconciliationCache reconciliationCache, - DirectoryManager directoryManager, - Accounts accounts, - int chunkSize, - long chunkIntervalMs) - { - this.accounts = accounts; + public DirectoryReconciler(DirectoryReconciliationClient reconciliationClient, DirectoryManager directoryManager) { this.directoryManager = directoryManager; this.reconciliationClient = reconciliationClient; - this.reconciliationCache = reconciliationCache; - this.chunkSize = chunkSize; - this.chunkIntervalMs = chunkIntervalMs; - this.workerId = Hex.toString(Util.generateSecretBytes(16)); } - @Override - public synchronized void start() { - running = true; - new Thread(this).start(); + public void onCrawlStart() { } + + public void onCrawlEnd(Optional fromNumber) { + + DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromNumber.orElse(null), null, Collections.emptyList()); + DirectoryReconciliationResponse response = sendChunk(request); + } - @Override - public synchronized void stop() { - running = false; - notifyAll(); - while (!finished) { - Util.wait(this); - } - } - - @Override - public void run() { - long delayMs = chunkIntervalMs; - - while (sleepWhileRunning(getDelayWithJitter(delayMs))) { - try { - delayMs = getBoundedChunkInterval(chunkIntervalMs); - delayMs = doPeriodicWork(delayMs); - } catch (Throwable t) { - logger.warn("error in directory reconciliation: ", t); - } - } - - synchronized (this) { - finished = true; - notifyAll(); - } - } - - @VisibleForTesting - public long doPeriodicWork(long intervalMs) { - long nextIntervalTimeMs = System.currentTimeMillis() + intervalMs; - - if (reconciliationCache.claimActiveWork(workerId, WORKER_TTL_MS)) { - if (processChunk()) { - if (!reconciliationCache.isAccelerated()) { - long timeUntilNextIntervalMs = getTimeUntilNextInterval(nextIntervalTimeMs); - reconciliationCache.claimActiveWork(workerId, timeUntilNextIntervalMs); - return timeUntilNextIntervalMs; - } else { - return ACCELERATED_CHUNK_INTERVAL; - } - } - } - return intervalMs; - } - - private boolean processChunk() { - Optional fromNumber = reconciliationCache.getLastNumber(); - List chunkAccounts = readChunk(fromNumber, chunkSize); + public void onCrawlChunk(Optional fromNumber, List chunkAccounts) { updateDirectoryCache(chunkAccounts); - DirectoryReconciliationRequest request = createChunkRequest(fromNumber, chunkAccounts); - DirectoryReconciliationResponse sendChunkResponse = sendChunk(request); + DirectoryReconciliationRequest request = createChunkRequest(fromNumber, chunkAccounts); + DirectoryReconciliationResponse response = sendChunk(request); - if (sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.MISSING || request.getToNumber() == null) { - reconciliationCache.clearAccelerate(); - } - - if (sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.OK) { - reconciliationCache.setLastNumber(Optional.ofNullable(request.getToNumber())); - } else if (sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.MISSING) { - reconciliationCache.setLastNumber(Optional.empty()); - } - - return sendChunkResponse.getStatus() == DirectoryReconciliationResponse.Status.OK; - } - - private List readChunk(Optional fromNumber, int chunkSize) { - try (Timer.Context timer = readChunkTimer.time()) { - Optional> chunkAccounts; - - if (fromNumber.isPresent()) { - chunkAccounts = Optional.ofNullable(accounts.getAllFrom(fromNumber.get(), chunkSize)); - } else { - chunkAccounts = Optional.ofNullable(accounts.getAllFrom(chunkSize)); - } - - return chunkAccounts.orElse(Collections.emptyList()); - } } private void updateDirectoryCache(List accounts) { - if (accounts.isEmpty()) { - return; - } BatchOperationHandle batchOperation = directoryManager.startBatchOperation(); @@ -183,7 +81,6 @@ public class DirectoryReconciler implements Managed, Runnable { if (account.isActive()) { byte[] token = Util.getContactToken(account.getNumber()); ClientContact clientContact = new ClientContact(token, null, true, true); - directoryManager.add(batchOperation, clientContact); } else { directoryManager.remove(batchOperation, account.getNumber()); @@ -200,9 +97,10 @@ public class DirectoryReconciler implements Managed, Runnable { .map(Account::getNumber) .collect(Collectors.toList()); - Optional toNumber = Optional.empty(); + Optional toNumber = Optional.empty(); + if (!accounts.isEmpty()) { - toNumber = Optional.of(accounts.get(accounts.size() - 1).getNumber()); + toNumber = Optional.of(accounts.get(accounts.size() - 1).getNumber()); } return new DirectoryReconciliationRequest(fromNumber.orElse(null), toNumber.orElse(null), numbers); @@ -223,29 +121,4 @@ public class DirectoryReconciler implements Managed, Runnable { } } - private synchronized boolean sleepWhileRunning(long delayMs) { - long startTimeMs = System.currentTimeMillis(); - while (running && delayMs > 0) { - Util.wait(this, delayMs); - - long nowMs = System.currentTimeMillis(); - delayMs -= Math.abs(nowMs - startTimeMs); - } - return running; - } - - private long getTimeUntilNextInterval(long nextIntervalTimeMs) { - long nextIntervalMs = nextIntervalTimeMs - System.currentTimeMillis(); - return getBoundedChunkInterval(nextIntervalMs); - } - - private long getBoundedChunkInterval(long intervalMs) { - return Math.max(Math.min(intervalMs, chunkIntervalMs), MINIMUM_CHUNK_INTERVAL); - } - - private long getDelayWithJitter(long delayMs) { - long randomJitterMs = (long) (new SecureRandom().nextDouble() * JITTER_MAX * delayMs); - return delayMs + randomJitterMs; - } - } diff --git a/src/main/resources/lua/unlock.lua b/src/main/resources/lua/account_database_crawler/unlock.lua similarity index 100% rename from src/main/resources/lua/unlock.lua rename to src/main/resources/lua/account_database_crawler/unlock.lua diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java new file mode 100644 index 000000000..be74a0b7e --- /dev/null +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java @@ -0,0 +1,174 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.whispersystems.textsecuregcm.tests.storage; + +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.Accounts; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +public class AccountDatabaseCrawlerTest { + + private static final String ACCOUNT1 = "+1"; + private static final String ACCOUNT2 = "+2"; + + private static final int CHUNK_SIZE = 1000; + private static final long CHUNK_INTERVAL_MS = 30_000L; + + private final Account account1 = mock(Account.class); + private final Account account2 = mock(Account.class); + + private final Accounts accounts = mock(Accounts.class); + private final AccountDatabaseCrawlerListener listener = mock(AccountDatabaseCrawlerListener.class); + private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class); + + private final AccountDatabaseCrawler crawler = new AccountDatabaseCrawler(accounts, cache, Arrays.asList(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS); + + @Before + public void setup() { + when(account1.getNumber()).thenReturn(ACCOUNT1); + when(account2.getNumber()).thenReturn(ACCOUNT2); + + when(accounts.getAllFrom(anyInt())).thenReturn(Arrays.asList(account1, account2)); + when(accounts.getAllFrom(eq(ACCOUNT1), anyInt())).thenReturn(Arrays.asList(account2)); + when(accounts.getAllFrom(eq(ACCOUNT2), anyInt())).thenReturn(Collections.emptyList()); + + when(cache.claimActiveWork(any(), anyLong())).thenReturn(true); + when(cache.isAccelerated()).thenReturn(false); + } + + @Test + public void testCrawlStart() { + when(cache.getLastNumber()).thenReturn(Optional.empty()); + + boolean accelerated = crawler.doPeriodicWork(); + assertThat(accelerated).isFalse(); + + verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); + verify(cache, times(1)).getLastNumber(); + verify(listener, times(1)).onCrawlStart(); + verify(accounts, times(1)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(0)).getAllFrom(any(String.class), eq(CHUNK_SIZE)); + verify(account1, times(0)).getNumber(); + verify(account2, times(1)).getNumber(); + verify(listener, times(1)).onCrawlChunk(eq(Optional.empty()), eq(Arrays.asList(account1, account2))); + verify(cache, times(1)).setLastNumber(eq(Optional.of(ACCOUNT2))); + verify(cache, times(1)).isAccelerated(); + verify(cache, times(1)).releaseActiveWork(any(String.class)); + + verifyNoMoreInteractions(account1); + verifyNoMoreInteractions(account2); + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(cache); + } + + @Test + public void testCrawlChunk() { + when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1)); + + boolean accelerated = crawler.doPeriodicWork(); + assertThat(accelerated).isFalse(); + + verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); + verify(cache, times(1)).getLastNumber(); + verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); + verify(account2, times(1)).getNumber(); + verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); + verify(cache, times(1)).setLastNumber(eq(Optional.of(ACCOUNT2))); + verify(cache, times(1)).isAccelerated(); + verify(cache, times(1)).releaseActiveWork(any(String.class)); + + verifyZeroInteractions(account1); + + verifyNoMoreInteractions(account2); + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(cache); + } + + @Test + public void testCrawlChunkAccelerated() { + when(cache.isAccelerated()).thenReturn(true); + when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1)); + + boolean accelerated = crawler.doPeriodicWork(); + assertThat(accelerated).isTrue(); + + verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); + verify(cache, times(1)).getLastNumber(); + verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); + verify(account2, times(1)).getNumber(); + verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); + verify(cache, times(1)).setLastNumber(eq(Optional.of(ACCOUNT2))); + verify(cache, times(1)).isAccelerated(); + verify(cache, times(1)).releaseActiveWork(any(String.class)); + + verifyZeroInteractions(account1); + + verifyNoMoreInteractions(account2); + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(cache); + } + + @Test + public void testCrawlEnd() { + when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT2)); + + boolean accelerated = crawler.doPeriodicWork(); + assertThat(accelerated).isFalse(); + + verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); + verify(cache, times(1)).getLastNumber(); + verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFrom(eq(ACCOUNT2), eq(CHUNK_SIZE)); + verify(account1, times(0)).getNumber(); + verify(account2, times(0)).getNumber(); + verify(listener, times(1)).onCrawlEnd(eq(Optional.of(ACCOUNT2))); + verify(cache, times(1)).setLastNumber(eq(Optional.empty())); + verify(cache, times(1)).clearAccelerate(); + verify(cache, times(1)).isAccelerated(); + verify(cache, times(1)).releaseActiveWork(any(String.class)); + + verifyZeroInteractions(account1); + verifyZeroInteractions(account2); + + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(cache); + } + +} diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java new file mode 100644 index 000000000..0178a029f --- /dev/null +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java @@ -0,0 +1,226 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.whispersystems.textsecuregcm.tests.storage; + +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.ActiveUserCounter; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.util.Util; + +import com.google.common.collect.ImmutableList; +import io.dropwizard.metrics.MetricsFactory; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.Jedis; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.Optional; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class ActiveUserCounterTest { + + private final String NUMBER_IOS = "+15551234567"; + private final String NUMBER_ANDROID = "+5511987654321"; + private final String NUMBER_NODEVICE = "+5215551234567"; + + private final String TALLY_KEY = "active_user_tally"; + + private final Device iosDevice = mock(Device.class); + private final Device androidDevice = mock(Device.class); + + private final Account androidAccount = mock(Account.class); + private final Account iosAccount = mock(Account.class); + private final Account noDeviceAccount = mock(Account.class); + + private final Jedis jedis = mock(Jedis.class); + private final ReplicatedJedisPool jedisPool = mock(ReplicatedJedisPool.class); + private final MetricsFactory metricsFactory = mock(MetricsFactory.class); + + private final ActiveUserCounter activeUserCounter = new ActiveUserCounter(metricsFactory, jedisPool); + + @Before + public void setup() { + + long halfDayAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(12); + long fortyFiveDayAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(45); + + when(androidDevice.getApnId()).thenReturn(null); + when(androidDevice.getGcmId()).thenReturn("mock-gcm-id"); + when(androidDevice.getLastSeen()).thenReturn(fortyFiveDayAgo); + + when(iosDevice.getApnId()).thenReturn("mock-apn-id"); + when(iosDevice.getGcmId()).thenReturn(null); + when(iosDevice.getLastSeen()).thenReturn(halfDayAgo); + + when(iosAccount.getNumber()).thenReturn(NUMBER_IOS); + when(iosAccount.getMasterDevice()).thenReturn(Optional.of(iosDevice)); + + when(androidAccount.getNumber()).thenReturn(NUMBER_ANDROID); + when(androidAccount.getMasterDevice()).thenReturn(Optional.of(androidDevice)); + + when(noDeviceAccount.getNumber()).thenReturn(NUMBER_NODEVICE); + when(noDeviceAccount.getMasterDevice()).thenReturn(Optional.ofNullable(null)); + + when(jedis.get(any(String.class))).thenReturn("{\"fromNumber\":\"+\",\"platforms\":{},\"countries\":{}}"); + when(jedisPool.getWriteResource()).thenReturn(jedis); + when(jedisPool.getReadResource()).thenReturn(jedis); + when(metricsFactory.getReporters()).thenReturn(ImmutableList.of()); + + } + + @Test + public void testCrawlStart() { + activeUserCounter.onCrawlStart(); + + verify(jedisPool, times(1)).getWriteResource(); + verify(jedis, times(1)).del(any(String.class)); + verify(jedis, times(1)).close(); + + verifyZeroInteractions(iosDevice); + verifyZeroInteractions(iosAccount); + verifyZeroInteractions(androidDevice); + verifyZeroInteractions(androidAccount); + verifyZeroInteractions(noDeviceAccount); + verifyZeroInteractions(metricsFactory); + verifyNoMoreInteractions(jedis); + verifyNoMoreInteractions(jedisPool); + } + + @Test + public void testCrawlEnd() { + activeUserCounter.onCrawlEnd(Optional.empty()); + + verify(jedisPool, times(1)).getReadResource(); + verify(jedis, times(1)).get(any(String.class)); + verify(jedis, times(1)).close(); + + verify(metricsFactory, times(1)).getReporters(); + + verifyZeroInteractions(iosDevice); + verifyZeroInteractions(iosAccount); + verifyZeroInteractions(androidDevice); + verifyZeroInteractions(androidAccount); + verifyZeroInteractions(noDeviceAccount); + + verifyNoMoreInteractions(metricsFactory); + verifyNoMoreInteractions(jedis); + verifyNoMoreInteractions(jedisPool); + + } + + @Test + public void testCrawlChunkValidAccount() { + activeUserCounter.onCrawlChunk(Optional.of(NUMBER_IOS), Arrays.asList(iosAccount)); + + verify(iosAccount, times(1)).getMasterDevice(); + verify(iosAccount, times(1)).getNumber(); + + verify(iosDevice, times(1)).getLastSeen(); + verify(iosDevice, times(1)).getApnId(); + verify(iosDevice, times(0)).getGcmId(); + + verify(jedisPool, times(1)).getWriteResource(); + verify(jedis, times(1)).get(any(String.class)); + verify(jedis, times(1)).set(any(String.class), eq("{\"fromNumber\":\""+NUMBER_IOS+"\",\"platforms\":{\"ios\":[1,1,1,1,1]},\"countries\":{\"1\":[1,1,1,1,1]}}")); + verify(jedis, times(1)).close(); + + verify(metricsFactory, times(0)).getReporters(); + + verifyZeroInteractions(androidDevice); + verifyZeroInteractions(androidAccount); + verifyZeroInteractions(noDeviceAccount); + verifyZeroInteractions(metricsFactory); + + verifyNoMoreInteractions(iosDevice); + verifyNoMoreInteractions(iosAccount); + verifyNoMoreInteractions(jedis); + verifyNoMoreInteractions(jedisPool); + } + + @Test + public void testCrawlChunkNoDeviceAccount() { + activeUserCounter.onCrawlChunk(Optional.of(NUMBER_NODEVICE), Arrays.asList(noDeviceAccount)); + + verify(noDeviceAccount, times(1)).getMasterDevice(); + + verify(jedisPool, times(1)).getWriteResource(); + verify(jedis, times(1)).get(eq(TALLY_KEY)); + verify(jedis, times(1)).set(any(String.class), eq("{\"fromNumber\":\""+NUMBER_NODEVICE+"\",\"platforms\":{},\"countries\":{}}")); + verify(jedis, times(1)).close(); + + verify(metricsFactory, times(0)).getReporters(); + + verifyZeroInteractions(iosDevice); + verifyZeroInteractions(iosAccount); + verifyZeroInteractions(androidDevice); + verifyZeroInteractions(androidAccount); + verifyZeroInteractions(noDeviceAccount); + verifyZeroInteractions(metricsFactory); + + verifyNoMoreInteractions(jedis); + verifyNoMoreInteractions(jedisPool); + } + + @Test + public void testCrawlChunkMixedAccount() { + activeUserCounter.onCrawlChunk(Optional.of(NUMBER_IOS), Arrays.asList(iosAccount, androidAccount, noDeviceAccount)); + + verify(iosAccount, times(1)).getMasterDevice(); + verify(iosAccount, times(1)).getNumber(); + verify(androidAccount, times(1)).getMasterDevice(); + verify(androidAccount, times(1)).getNumber(); + verify(noDeviceAccount, times(1)).getMasterDevice(); + + verify(iosDevice, times(1)).getLastSeen(); + verify(iosDevice, times(1)).getApnId(); + verify(iosDevice, times(0)).getGcmId(); + + verify(androidDevice, times(1)).getLastSeen(); + verify(androidDevice, times(1)).getApnId(); + verify(androidDevice, times(1)).getGcmId(); + + verify(jedisPool, times(1)).getWriteResource(); + verify(jedis, times(1)).get(eq(TALLY_KEY)); + verify(jedis, times(1)).set(any(String.class), eq("{\"fromNumber\":\""+NUMBER_IOS+"\",\"platforms\":{\"android\":[0,0,0,1,1],\"ios\":[1,1,1,1,1]},\"countries\":{\"55\":[0,0,0,1,1],\"1\":[1,1,1,1,1]}}")); + verify(jedis, times(1)).close(); + + verify(metricsFactory, times(0)).getReporters(); + + verifyZeroInteractions(metricsFactory); + + verifyNoMoreInteractions(iosDevice); + verifyNoMoreInteractions(iosAccount); + verifyNoMoreInteractions(androidDevice); + verifyNoMoreInteractions(androidAccount); + verifyNoMoreInteractions(noDeviceAccount); + verifyNoMoreInteractions(jedis); + verifyNoMoreInteractions(jedisPool); + } + +} diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java index 3120c59fb..b7acac7b5 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java @@ -1,180 +1,82 @@ +/** + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + package org.whispersystems.textsecuregcm.tests.storage; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; import org.whispersystems.textsecuregcm.entities.ClientContact; import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest; import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse; import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.Accounts; -import org.whispersystems.textsecuregcm.storage.DirectoryManager; import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle; +import org.whispersystems.textsecuregcm.storage.DirectoryManager; import org.whispersystems.textsecuregcm.storage.DirectoryReconciler; -import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationCache; import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient; import org.whispersystems.textsecuregcm.util.Util; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + import java.util.Arrays; -import java.util.Collections; import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; public class DirectoryReconcilerTest { - private static final String VALID_NUMBER = "valid"; private static final String INACTIVE_NUMBER = "inactive"; - private static final long INTERVAL_MS = 30_000L; - - private final Account account = mock(Account.class); + private final Account activeAccount = mock(Account.class); private final Account inactiveAccount = mock(Account.class); - private final Accounts accounts = mock(Accounts.class); private final BatchOperationHandle batchOperationHandle = mock(BatchOperationHandle.class); private final DirectoryManager directoryManager = mock(DirectoryManager.class); private final DirectoryReconciliationClient reconciliationClient = mock(DirectoryReconciliationClient.class); - private final DirectoryReconciliationCache reconciliationCache = mock(DirectoryReconciliationCache.class); - private final DirectoryReconciler directoryReconciler = new DirectoryReconciler(reconciliationClient, reconciliationCache, directoryManager, accounts, 1000, INTERVAL_MS); + private final DirectoryReconciler directoryReconciler = new DirectoryReconciler(reconciliationClient, directoryManager); - private final DirectoryReconciliationResponse successResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.OK); - private final DirectoryReconciliationResponse notFoundResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.MISSING); + private final DirectoryReconciliationResponse successResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.OK); + private final DirectoryReconciliationResponse missingResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.MISSING); @Before public void setup() { - when(account.getNumber()).thenReturn(VALID_NUMBER); - when(account.isActive()).thenReturn(true); + when(activeAccount.getNumber()).thenReturn(VALID_NUMBER); + when(activeAccount.isActive()).thenReturn(true); when(inactiveAccount.getNumber()).thenReturn(INACTIVE_NUMBER); when(inactiveAccount.isActive()).thenReturn(false); - when(directoryManager.startBatchOperation()).thenReturn(batchOperationHandle); + } - when(accounts.getAllFrom(anyInt())).thenReturn(Arrays.asList(account, inactiveAccount)); - when(accounts.getAllFrom(eq(VALID_NUMBER), anyInt())).thenReturn(Arrays.asList(inactiveAccount)); - when(accounts.getAllFrom(eq(INACTIVE_NUMBER), anyInt())).thenReturn(Collections.emptyList()); - + @Test + public void testCrawlChunkValid() { when(reconciliationClient.sendChunk(any())).thenReturn(successResponse); + directoryReconciler.onCrawlChunk(Optional.of(VALID_NUMBER), Arrays.asList(activeAccount, inactiveAccount)); - when(reconciliationCache.getLastNumber()).thenReturn(Optional.empty()); - when(reconciliationCache.claimActiveWork(any(), anyLong())).thenReturn(true); - when(reconciliationCache.isAccelerated()).thenReturn(false); - } - - @Test - public void testValid() { - long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS); - - assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS); - - verify(accounts, times(1)).getAllFrom(anyInt()); - - ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class); - verify(reconciliationClient, times(1)).sendChunk(request.capture()); - - assertThat(request.getValue().getFromNumber()).isNull(); - assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER); - assertThat(request.getValue().getNumbers()).isEqualTo(Arrays.asList(VALID_NUMBER)); - - ArgumentCaptor addedContact = ArgumentCaptor.forClass(ClientContact.class); - verify(directoryManager, times(1)).startBatchOperation(); - verify(directoryManager, times(1)).add(eq(batchOperationHandle), addedContact.capture()); - verify(directoryManager, times(1)).remove(eq(batchOperationHandle), eq(INACTIVE_NUMBER)); - verify(directoryManager, times(1)).stopBatchOperation(eq(batchOperationHandle)); - - assertThat(addedContact.getValue().getToken()).isEqualTo(Util.getContactToken(VALID_NUMBER)); - - verify(reconciliationCache, times(1)).getLastNumber(); - verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.of(INACTIVE_NUMBER))); - verify(reconciliationCache, times(1)).isAccelerated(); - verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong()); - - verifyNoMoreInteractions(accounts); - verifyNoMoreInteractions(directoryManager); - verifyNoMoreInteractions(reconciliationClient); - verifyNoMoreInteractions(reconciliationCache); - } - - @Test - public void testInProgress() { - when(reconciliationCache.getLastNumber()).thenReturn(Optional.of(VALID_NUMBER)); - - long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS); - - assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS); - - verify(accounts, times(1)).getAllFrom(eq(VALID_NUMBER), anyInt()); + verify(activeAccount, times(2)).getNumber(); + verify(activeAccount, times(2)).isActive(); + verify(inactiveAccount, times(2)).getNumber(); + verify(inactiveAccount, times(2)).isActive(); ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class); verify(reconciliationClient, times(1)).sendChunk(request.capture()); assertThat(request.getValue().getFromNumber()).isEqualTo(VALID_NUMBER); assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER); - assertThat(request.getValue().getNumbers()).isEqualTo(Collections.emptyList()); - - verify(directoryManager, times(1)).startBatchOperation(); - verify(directoryManager, times(1)).remove(eq(batchOperationHandle), eq(INACTIVE_NUMBER)); - verify(directoryManager, times(1)).stopBatchOperation(eq(batchOperationHandle)); - - verify(reconciliationCache, times(1)).getLastNumber(); - verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.of(INACTIVE_NUMBER))); - verify(reconciliationCache, times(1)).isAccelerated(); - verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong()); - - verifyNoMoreInteractions(accounts); - verifyNoMoreInteractions(directoryManager); - verifyNoMoreInteractions(reconciliationClient); - verifyNoMoreInteractions(reconciliationCache); - } - - @Test - public void testLastChunk() { - when(reconciliationCache.getLastNumber()).thenReturn(Optional.of(INACTIVE_NUMBER)); - - long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS); - - assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS); - - verify(accounts, times(1)).getAllFrom(eq(INACTIVE_NUMBER), anyInt()); - - ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class); - verify(reconciliationClient, times(1)).sendChunk(request.capture()); - - assertThat(request.getValue().getFromNumber()).isEqualTo(INACTIVE_NUMBER); - assertThat(request.getValue().getToNumber()).isNull(); - assertThat(request.getValue().getNumbers()).isEqualTo(Collections.emptyList()); - - verify(reconciliationCache, times(1)).getLastNumber(); - verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.empty())); - verify(reconciliationCache, times(1)).clearAccelerate(); - verify(reconciliationCache, times(1)).isAccelerated(); - verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong()); - - verifyNoMoreInteractions(accounts); - verifyNoMoreInteractions(directoryManager); - verifyNoMoreInteractions(reconciliationClient); - verifyNoMoreInteractions(reconciliationCache); - } - - @Test - public void testNotFound() { - when(reconciliationClient.sendChunk(any())).thenReturn(notFoundResponse); - - long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS); - - assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS); - - verify(accounts, times(1)).getAllFrom(anyInt()); - - ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class); - verify(reconciliationClient, times(1)).sendChunk(request.capture()); - - assertThat(request.getValue().getFromNumber()).isNull(); - assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER); assertThat(request.getValue().getNumbers()).isEqualTo(Arrays.asList(VALID_NUMBER)); ArgumentCaptor addedContact = ArgumentCaptor.forClass(ClientContact.class); @@ -185,15 +87,11 @@ public class DirectoryReconcilerTest { assertThat(addedContact.getValue().getToken()).isEqualTo(Util.getContactToken(VALID_NUMBER)); - verify(reconciliationCache, times(1)).getLastNumber(); - verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.empty())); - verify(reconciliationCache, times(1)).clearAccelerate(); - verify(reconciliationCache, times(1)).claimActiveWork(any(), anyLong()); - - verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(activeAccount); + verifyNoMoreInteractions(inactiveAccount); + verifyNoMoreInteractions(batchOperationHandle); verifyNoMoreInteractions(directoryManager); verifyNoMoreInteractions(reconciliationClient); - verifyNoMoreInteractions(reconciliationCache); } }