toNumber = Optional.absent();
+ if (!accounts.isEmpty()) {
+ toNumber = Optional.of(accounts.get(accounts.size() - 1).getNumber());
+ }
+
+ return new DirectoryReconciliationRequest(fromNumber.orNull(), toNumber.orNull(), numbers);
+ }
+
+ private DirectoryReconciliationResponse sendChunk(DirectoryReconciliationRequest request) {
+ try (Timer.Context timer = sendChunkTimer.time()) {
+ DirectoryReconciliationResponse response = reconciliationClient.sendChunk(request);
+ if (response.getStatus() != DirectoryReconciliationResponse.Status.OK) {
+ sendChunkErrorMeter.mark();
+ logger.warn("reconciliation error: " + response.getStatus());
+ }
+ return response;
+ } catch (ProcessingException ex) {
+ sendChunkErrorMeter.mark();
+ logger.warn("request error: ", ex);
+ throw new ProcessingException(ex);
+ }
+ }
+
+}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationCache.java b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationCache.java
new file mode 100644
index 000000000..37cf30f60
--- /dev/null
+++ b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationCache.java
@@ -0,0 +1,118 @@
+/**
+ * Copyright (C) 2018 Open WhisperSystems
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.whispersystems.textsecuregcm.storage;
+
+import com.google.common.base.Optional;
+import org.whispersystems.textsecuregcm.redis.LuaScript;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
+import redis.clients.jedis.Jedis;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class DirectoryReconciliationCache {
+
+ private static final String ACTIVE_WORKER_KEY = "directory_reconciliation_active_worker";
+ private static final String LAST_NUMBER_KEY = "directory_reconciliation_last_number";
+ private static final String CACHED_COUNT_KEY = "directory_reconciliation_cached_count";
+ private static final String ACCELERATE_KEY = "directory_reconciliation_accelerate";
+
+ private static final long CACHED_COUNT_TTL_MS = 21600_000L;
+ private static final long LAST_NUMBER_TTL_MS = 86400_000L;
+
+ private final ReplicatedJedisPool jedisPool;
+ private final UnlockOperation unlockOperation;
+
+ public DirectoryReconciliationCache(ReplicatedJedisPool jedisPool) throws IOException {
+ this.jedisPool = jedisPool;
+ this.unlockOperation = new UnlockOperation(jedisPool);
+ }
+
+ public void clearAccelerate() {
+ try (Jedis jedis = jedisPool.getWriteResource()) {
+ jedis.del(ACCELERATE_KEY);
+ }
+ }
+
+ public boolean isAccelerated() {
+ try (Jedis jedis = jedisPool.getWriteResource()) {
+ return "1".equals(jedis.get(ACCELERATE_KEY));
+ }
+ }
+
+ public boolean claimActiveWork(String workerId, long ttlMs) {
+ unlockOperation.unlock(ACTIVE_WORKER_KEY, workerId);
+ try (Jedis jedis = jedisPool.getWriteResource()) {
+ return "OK".equals(jedis.set(ACTIVE_WORKER_KEY, workerId, "NX", "PX", ttlMs));
+ }
+ }
+
+ public Optional getLastNumber() {
+ try (Jedis jedis = jedisPool.getWriteResource()) {
+ return Optional.fromNullable(jedis.get(LAST_NUMBER_KEY));
+ }
+ }
+
+ public void setLastNumber(Optional lastNumber) {
+ try (Jedis jedis = jedisPool.getWriteResource()) {
+ if (lastNumber.isPresent()) {
+ jedis.psetex(LAST_NUMBER_KEY, LAST_NUMBER_TTL_MS, lastNumber.get());
+ } else {
+ jedis.del(LAST_NUMBER_KEY);
+ }
+ }
+ }
+
+ public Optional getCachedAccountCount() {
+ try (Jedis jedis = jedisPool.getWriteResource()) {
+ Optional cachedAccountCount = Optional.fromNullable(jedis.get(CACHED_COUNT_KEY));
+ if (!cachedAccountCount.isPresent()) {
+ return Optional.absent();
+ }
+
+ try {
+ return Optional.of(Long.parseUnsignedLong(cachedAccountCount.get()));
+ } catch (NumberFormatException ex) {
+ return Optional.absent();
+ }
+ }
+ }
+
+ public void setCachedAccountCount(long accountCount) {
+ try (Jedis jedis = jedisPool.getWriteResource()) {
+ jedis.psetex(CACHED_COUNT_KEY, CACHED_COUNT_TTL_MS, Long.toString(accountCount));
+ }
+ }
+
+ public static class UnlockOperation {
+
+ private final LuaScript luaScript;
+
+ UnlockOperation(ReplicatedJedisPool jedisPool) throws IOException {
+ this.luaScript = LuaScript.fromResource(jedisPool, "lua/unlock.lua");
+ }
+
+ public boolean unlock(String key, String value) {
+ List keys = Arrays.asList(key.getBytes());
+ List args = Arrays.asList(value.getBytes());
+
+ return ((long) luaScript.execute(keys, args)) > 0;
+ }
+ }
+
+}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java
new file mode 100644
index 000000000..441dd4745
--- /dev/null
+++ b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciliationClient.java
@@ -0,0 +1,95 @@
+/**
+ * Copyright (C) 2018 Open WhisperSystems
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.whispersystems.textsecuregcm.storage;
+
+import org.bouncycastle.openssl.PEMReader;
+import org.glassfish.jersey.SslConfigurator;
+import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
+import org.whispersystems.textsecuregcm.configuration.DirectoryServerConfiguration;
+import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest;
+import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+public class DirectoryReconciliationClient {
+
+ private final String replicationUrl;
+ private final Client client;
+
+ public DirectoryReconciliationClient(DirectoryServerConfiguration directoryServerConfiguration)
+ throws CertificateException
+ {
+ this.replicationUrl = directoryServerConfiguration.getReplicationUrl();
+ this.client = initializeClient(directoryServerConfiguration);
+ }
+
+ public DirectoryReconciliationResponse sendChunk(DirectoryReconciliationRequest request) {
+ return client.target(replicationUrl)
+ .path("/v1/directory/reconcile")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .put(Entity.json(request), DirectoryReconciliationResponse.class);
+ }
+
+ private static Client initializeClient(DirectoryServerConfiguration directoryServerConfiguration)
+ throws CertificateException
+ {
+ KeyStore trustStore = initializeKeyStore(directoryServerConfiguration.getReplicationCaCertificate());
+ SSLContext sslContext = SslConfigurator.newInstance()
+ .securityProtocol("TLSv1.2")
+ .trustStore(trustStore)
+ .createSSLContext();
+ return ClientBuilder.newBuilder()
+ .register(HttpAuthenticationFeature.basic("signal", directoryServerConfiguration.getReplicationPassword().getBytes()))
+ .sslContext(sslContext)
+ .build();
+ }
+
+ private static KeyStore initializeKeyStore(String caCertificatePem)
+ throws CertificateException
+ {
+ try {
+ PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(caCertificatePem.getBytes())));
+ X509Certificate certificate = (X509Certificate) reader.readObject();
+
+ if (certificate == null) {
+ throw new CertificateException("No certificate found in parsing!");
+ }
+
+ KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ keyStore.load(null);
+ keyStore.setCertificateEntry("ca", certificate);
+ return keyStore;
+ } catch (IOException | KeyStoreException ex) {
+ throw new CertificateException(ex);
+ } catch (NoSuchAlgorithmException ex) {
+ throw new AssertionError(ex);
+ }
+ }
+
+}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/util/Util.java b/src/main/java/org/whispersystems/textsecuregcm/util/Util.java
index e07794e23..849e83083 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/util/Util.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/util/Util.java
@@ -134,6 +134,14 @@ public class Util {
}
}
+ public static void wait(Object object, long timeoutMs) {
+ try {
+ object.wait(timeoutMs);
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ }
+ }
+
public static int hashCode(Object... objects) {
return Arrays.hashCode(objects);
}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java
index 09a23ac13..6c82cbf67 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java
@@ -75,7 +75,7 @@ public class DeleteUserCommand extends EnvironmentCommand()))
@@ -137,6 +140,7 @@ public class AccountControllerTest {
assertThat(response.getStatus()).isEqualTo(204);
verify(accountsManager, times(1)).create(isA(Account.class));
+ verify(cdsSender, times(1)).addRegisteredUser(eq(SENDER));
}
@Test
@@ -282,4 +286,4 @@ public class AccountControllerTest {
-}
\ No newline at end of file
+}
diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DeviceControllerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DeviceControllerTest.java
index 73a7531cb..2aeafb7a3 100644
--- a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DeviceControllerTest.java
+++ b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DeviceControllerTest.java
@@ -29,10 +29,8 @@ import org.whispersystems.textsecuregcm.entities.DeviceResponse;
import org.whispersystems.textsecuregcm.limits.RateLimiter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.mappers.DeviceLimitExceededExceptionMapper;
-import org.whispersystems.textsecuregcm.storage.Account;
-import org.whispersystems.textsecuregcm.storage.AccountsManager;
-import org.whispersystems.textsecuregcm.storage.MessagesManager;
-import org.whispersystems.textsecuregcm.storage.PendingDevicesManager;
+import org.whispersystems.textsecuregcm.sqs.ContactDiscoveryQueueSender;
+import org.whispersystems.textsecuregcm.storage.*;
import org.whispersystems.textsecuregcm.tests.util.AuthHelper;
import org.whispersystems.textsecuregcm.util.VerificationCode;
@@ -55,10 +53,11 @@ public class DeviceControllerTest {
public DumbVerificationDeviceController(PendingDevicesManager pendingDevices,
AccountsManager accounts,
MessagesManager messages,
+ ContactDiscoveryQueueSender cdsSender,
RateLimiters rateLimiters,
Map deviceConfiguration)
{
- super(pendingDevices, accounts, messages, rateLimiters, deviceConfiguration);
+ super(pendingDevices, accounts, messages, cdsSender, rateLimiters, deviceConfiguration);
}
@Override
@@ -70,10 +69,12 @@ public class DeviceControllerTest {
private PendingDevicesManager pendingDevicesManager = mock(PendingDevicesManager.class);
private AccountsManager accountsManager = mock(AccountsManager.class );
private MessagesManager messagesManager = mock(MessagesManager.class);
+ private ContactDiscoveryQueueSender cdsSender = mock(ContactDiscoveryQueueSender.class);
private RateLimiters rateLimiters = mock(RateLimiters.class );
private RateLimiter rateLimiter = mock(RateLimiter.class );
private Account account = mock(Account.class );
private Account maxedAccount = mock(Account.class);
+ private Device masterDevice = mock(Device.class);
private Map deviceConfiguration = new HashMap() {{
@@ -88,6 +89,7 @@ public class DeviceControllerTest {
.addResource(new DumbVerificationDeviceController(pendingDevicesManager,
accountsManager,
messagesManager,
+ cdsSender,
rateLimiters,
deviceConfiguration))
.build();
@@ -101,9 +103,13 @@ public class DeviceControllerTest {
when(rateLimiters.getAllocateDeviceLimiter()).thenReturn(rateLimiter);
when(rateLimiters.getVerifyDeviceLimiter()).thenReturn(rateLimiter);
+ when(masterDevice.getId()).thenReturn(1L);
+
when(account.getNextDeviceId()).thenReturn(42L);
when(account.getNumber()).thenReturn(AuthHelper.VALID_NUMBER);
// when(maxedAccount.getActiveDeviceCount()).thenReturn(6);
+ when(account.getAuthenticatedDevice()).thenReturn(Optional.of(masterDevice));
+ when(account.isActive()).thenReturn(false);
when(pendingDevicesManager.getCodeForNumber(AuthHelper.VALID_NUMBER)).thenReturn(Optional.of(new StoredVerificationCode("5678901", System.currentTimeMillis())));
when(pendingDevicesManager.getCodeForNumber(AuthHelper.VALID_NUMBER_TWO)).thenReturn(Optional.of(new StoredVerificationCode("1112223", System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(31))));
@@ -195,4 +201,16 @@ public class DeviceControllerTest {
assertEquals(response.getStatus(), 422);
verifyNoMoreInteractions(messagesManager);
}
+
+ @Test
+ public void removeDeviceTest() throws Exception {
+ Response response = resources.getJerseyTest()
+ .target("/v1/devices/12345")
+ .request()
+ .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD))
+ .delete();
+
+ assertEquals(204, response.getStatus());
+ verify(cdsSender).deleteRegisteredUser(eq(AuthHelper.VALID_NUMBER));
+ }
}
diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DirectoryControllerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DirectoryControllerTest.java
index e244d1e63..342d3e124 100644
--- a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DirectoryControllerTest.java
+++ b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/DirectoryControllerTest.java
@@ -1,5 +1,6 @@
package org.whispersystems.textsecuregcm.tests.controllers;
+import com.google.common.base.Optional;
import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory;
import org.junit.Before;
import org.junit.Rule;
@@ -7,6 +8,10 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.whispersystems.dropwizard.simpleauth.AuthValueFactoryProvider;
+import org.whispersystems.textsecuregcm.auth.DirectoryCredentials;
+import org.whispersystems.textsecuregcm.auth.DirectoryCredentialsGenerator;
+import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration;
+import org.whispersystems.textsecuregcm.configuration.DirectoryClientConfiguration;
import org.whispersystems.textsecuregcm.controllers.DirectoryController;
import org.whispersystems.textsecuregcm.entities.ClientContactTokens;
import org.whispersystems.textsecuregcm.limits.RateLimiter;
@@ -24,15 +29,19 @@ import java.util.List;
import io.dropwizard.testing.junit.ResourceTestRule;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyListOf;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Matchers.anyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class DirectoryControllerTest {
- private final RateLimiters rateLimiters = mock(RateLimiters.class );
- private final RateLimiter rateLimiter = mock(RateLimiter.class );
- private final DirectoryManager directoryManager = mock(DirectoryManager.class);
+ private final RateLimiters rateLimiters = mock(RateLimiters.class);
+ private final RateLimiter rateLimiter = mock(RateLimiter.class);
+ private final DirectoryManager directoryManager = mock(DirectoryManager.class);
+ private final DirectoryCredentialsGenerator directoryCredentialsGenerator = mock(DirectoryCredentialsGenerator.class);
+
+ private final DirectoryCredentials validCredentials = new DirectoryCredentials("username", "password");
@Rule
public final ResourceTestRule resources = ResourceTestRule.builder()
@@ -40,7 +49,8 @@ public class DirectoryControllerTest {
.addProvider(new AuthValueFactoryProvider.Binder())
.setTestContainerFactory(new GrizzlyWebTestContainerFactory())
.addResource(new DirectoryController(rateLimiters,
- directoryManager))
+ directoryManager,
+ directoryCredentialsGenerator))
.build();
@@ -56,6 +66,19 @@ public class DirectoryControllerTest {
return response;
}
});
+ when(directoryCredentialsGenerator.generateFor(eq(AuthHelper.VALID_NUMBER))).thenReturn(validCredentials);
+ }
+
+ @Test
+ public void testGetAuthToken() {
+ DirectoryCredentials token =
+ resources.getJerseyTest()
+ .target("/v1/directory/auth")
+ .request()
+ .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD))
+ .get(DirectoryCredentials.class);
+ assertThat(token.getUsername()).isEqualTo(validCredentials.getUsername());
+ assertThat(token.getPassword()).isEqualTo(validCredentials.getPassword());
}
@Test
diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java
new file mode 100644
index 000000000..d90d225b8
--- /dev/null
+++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java
@@ -0,0 +1,242 @@
+package org.whispersystems.textsecuregcm.tests.storage;
+
+import com.google.common.base.Optional;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.whispersystems.textsecuregcm.entities.ClientContact;
+import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest;
+import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse;
+import org.whispersystems.textsecuregcm.storage.Account;
+import org.whispersystems.textsecuregcm.storage.Accounts;
+import org.whispersystems.textsecuregcm.storage.DirectoryManager;
+import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle;
+import org.whispersystems.textsecuregcm.storage.DirectoryReconciler;
+import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationCache;
+import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient;
+import org.whispersystems.textsecuregcm.util.Util;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class DirectoryReconcilerTest {
+
+ private static final String VALID_NUMBER = "valid";
+ private static final String INACTIVE_NUMBER = "inactive";
+
+ private static final long ACCOUNT_COUNT = 0L;
+ private static final long INTERVAL_MS = 30_000L;
+
+ private final Account account = mock(Account.class);
+ private final Account inactiveAccount = mock(Account.class);
+ private final Accounts accounts = mock(Accounts.class);
+ private final BatchOperationHandle batchOperationHandle = mock(BatchOperationHandle.class);
+ private final DirectoryManager directoryManager = mock(DirectoryManager.class);
+ private final DirectoryReconciliationClient reconciliationClient = mock(DirectoryReconciliationClient.class);
+ private final DirectoryReconciliationCache reconciliationCache = mock(DirectoryReconciliationCache.class);
+ private final DirectoryReconciler directoryReconciler = new DirectoryReconciler(reconciliationClient, reconciliationCache, directoryManager, accounts);
+
+ private final DirectoryReconciliationResponse successResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.OK);
+ private final DirectoryReconciliationResponse notFoundResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.MISSING);
+
+ @Before
+ public void setup() {
+ when(account.getNumber()).thenReturn(VALID_NUMBER);
+ when(account.isActive()).thenReturn(true);
+ when(account.isVideoSupported()).thenReturn(true);
+ when(account.isVoiceSupported()).thenReturn(true);
+ when(inactiveAccount.getNumber()).thenReturn(INACTIVE_NUMBER);
+ when(inactiveAccount.isActive()).thenReturn(false);
+
+ when(directoryManager.startBatchOperation()).thenReturn(batchOperationHandle);
+
+ when(accounts.getAllFrom(anyInt())).thenReturn(Arrays.asList(account, inactiveAccount));
+ when(accounts.getAllFrom(eq(VALID_NUMBER), anyInt())).thenReturn(Arrays.asList(inactiveAccount));
+ when(accounts.getAllFrom(eq(INACTIVE_NUMBER), anyInt())).thenReturn(Collections.emptyList());
+ when(accounts.getCount()).thenReturn(ACCOUNT_COUNT);
+
+ when(reconciliationClient.sendChunk(any())).thenReturn(successResponse);
+
+ when(reconciliationCache.getLastNumber()).thenReturn(Optional.absent());
+ when(reconciliationCache.claimActiveWork(any(), anyLong())).thenReturn(true);
+ when(reconciliationCache.isAccelerated()).thenReturn(false);
+ }
+
+ @Test
+ public void testGetUncachedAccountCount() {
+ when(reconciliationCache.getCachedAccountCount()).thenReturn(Optional.absent());
+
+ long accountCount = directoryReconciler.getAccountCount();
+
+ assertThat(accountCount).isEqualTo(ACCOUNT_COUNT);
+
+ verify(accounts, times(1)).getCount();
+
+ verify(reconciliationCache, times(1)).getCachedAccountCount();
+ verify(reconciliationCache, times(1)).setCachedAccountCount(eq(ACCOUNT_COUNT));
+
+ verifyNoMoreInteractions(directoryManager);
+ verifyNoMoreInteractions(accounts);
+ verifyNoMoreInteractions(reconciliationClient);
+ verifyNoMoreInteractions(reconciliationCache);
+ }
+
+ @Test
+ public void testGetCachedAccountCount() {
+ when(reconciliationCache.getCachedAccountCount()).thenReturn(Optional.of(ACCOUNT_COUNT));
+
+ long accountCount = directoryReconciler.getAccountCount();
+
+ assertThat(accountCount).isEqualTo(ACCOUNT_COUNT);
+
+ verify(reconciliationCache, times(1)).getCachedAccountCount();
+
+ verifyNoMoreInteractions(directoryManager);
+ verifyNoMoreInteractions(accounts);
+ verifyNoMoreInteractions(reconciliationClient);
+ verifyNoMoreInteractions(reconciliationCache);
+ }
+
+ @Test
+ public void testValid() {
+ long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS);
+
+ assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS);
+
+ verify(accounts, times(1)).getAllFrom(anyInt());
+
+ ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class);
+ verify(reconciliationClient, times(1)).sendChunk(request.capture());
+
+ assertThat(request.getValue().getFromNumber()).isNull();
+ assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER);
+ assertThat(request.getValue().getNumbers()).isEqualTo(Arrays.asList(VALID_NUMBER));
+
+ ArgumentCaptor addedContact = ArgumentCaptor.forClass(ClientContact.class);
+ verify(directoryManager, times(1)).startBatchOperation();
+ verify(directoryManager, times(1)).add(eq(batchOperationHandle), addedContact.capture());
+ verify(directoryManager, times(1)).remove(eq(batchOperationHandle), eq(INACTIVE_NUMBER));
+ verify(directoryManager, times(1)).stopBatchOperation(eq(batchOperationHandle));
+
+ assertThat(addedContact.getValue().getToken()).isEqualTo(Util.getContactToken(VALID_NUMBER));
+
+ verify(reconciliationCache, times(1)).getLastNumber();
+ verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.of(INACTIVE_NUMBER)));
+ verify(reconciliationCache, times(1)).isAccelerated();
+ verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong());
+
+ verifyNoMoreInteractions(accounts);
+ verifyNoMoreInteractions(directoryManager);
+ verifyNoMoreInteractions(reconciliationClient);
+ verifyNoMoreInteractions(reconciliationCache);
+ }
+
+ @Test
+ public void testInProgress() {
+ when(reconciliationCache.getLastNumber()).thenReturn(Optional.of(VALID_NUMBER));
+
+ long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS);
+
+ assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS);
+
+ verify(accounts, times(1)).getAllFrom(eq(VALID_NUMBER), anyInt());
+
+ ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class);
+ verify(reconciliationClient, times(1)).sendChunk(request.capture());
+
+ assertThat(request.getValue().getFromNumber()).isEqualTo(VALID_NUMBER);
+ assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER);
+ assertThat(request.getValue().getNumbers()).isEqualTo(Collections.emptyList());
+
+ verify(directoryManager, times(1)).startBatchOperation();
+ verify(directoryManager, times(1)).remove(eq(batchOperationHandle), eq(INACTIVE_NUMBER));
+ verify(directoryManager, times(1)).stopBatchOperation(eq(batchOperationHandle));
+
+ verify(reconciliationCache, times(1)).getLastNumber();
+ verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.of(INACTIVE_NUMBER)));
+ verify(reconciliationCache, times(1)).isAccelerated();
+ verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong());
+
+ verifyNoMoreInteractions(accounts);
+ verifyNoMoreInteractions(directoryManager);
+ verifyNoMoreInteractions(reconciliationClient);
+ verifyNoMoreInteractions(reconciliationCache);
+ }
+
+ @Test
+ public void testLastChunk() {
+ when(reconciliationCache.getLastNumber()).thenReturn(Optional.of(INACTIVE_NUMBER));
+
+ long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS);
+
+ assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS);
+
+ verify(accounts, times(1)).getAllFrom(eq(INACTIVE_NUMBER), anyInt());
+
+ ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class);
+ verify(reconciliationClient, times(1)).sendChunk(request.capture());
+
+ assertThat(request.getValue().getFromNumber()).isEqualTo(INACTIVE_NUMBER);
+ assertThat(request.getValue().getToNumber()).isNull();
+ assertThat(request.getValue().getNumbers()).isEqualTo(Collections.emptyList());
+
+ verify(reconciliationCache, times(1)).getLastNumber();
+ verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.absent()));
+ verify(reconciliationCache, times(1)).clearAccelerate();
+ verify(reconciliationCache, times(1)).isAccelerated();
+ verify(reconciliationCache, times(2)).claimActiveWork(any(), anyLong());
+
+ verifyNoMoreInteractions(accounts);
+ verifyNoMoreInteractions(directoryManager);
+ verifyNoMoreInteractions(reconciliationClient);
+ verifyNoMoreInteractions(reconciliationCache);
+ }
+
+ @Test
+ public void testNotFound() {
+ when(reconciliationClient.sendChunk(any())).thenReturn(notFoundResponse);
+
+ long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS);
+
+ assertThat(delayMs).isLessThanOrEqualTo(INTERVAL_MS);
+
+ verify(accounts, times(1)).getAllFrom(anyInt());
+
+ ArgumentCaptor request = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class);
+ verify(reconciliationClient, times(1)).sendChunk(request.capture());
+
+ assertThat(request.getValue().getFromNumber()).isNull();
+ assertThat(request.getValue().getToNumber()).isEqualTo(INACTIVE_NUMBER);
+ assertThat(request.getValue().getNumbers()).isEqualTo(Arrays.asList(VALID_NUMBER));
+
+ ArgumentCaptor addedContact = ArgumentCaptor.forClass(ClientContact.class);
+ verify(directoryManager, times(1)).startBatchOperation();
+ verify(directoryManager, times(1)).add(eq(batchOperationHandle), addedContact.capture());
+ verify(directoryManager, times(1)).remove(eq(batchOperationHandle), eq(INACTIVE_NUMBER));
+ verify(directoryManager, times(1)).stopBatchOperation(eq(batchOperationHandle));
+
+ assertThat(addedContact.getValue().getToken()).isEqualTo(Util.getContactToken(VALID_NUMBER));
+
+ verify(reconciliationCache, times(1)).getLastNumber();
+ verify(reconciliationCache, times(1)).setLastNumber(eq(Optional.absent()));
+ verify(reconciliationCache, times(1)).clearAccelerate();
+ verify(reconciliationCache, times(1)).claimActiveWork(any(), anyLong());
+
+ verifyNoMoreInteractions(accounts);
+ verifyNoMoreInteractions(directoryManager);
+ verifyNoMoreInteractions(reconciliationClient);
+ verifyNoMoreInteractions(reconciliationCache);
+ }
+
+}