Correctly replicate delete events to directory

This commit is contained in:
Moxie Marlinspike 2018-09-20 11:44:17 -07:00
parent 777d77db53
commit fefadaebfa
8 changed files with 64 additions and 44 deletions

View File

@ -68,7 +68,7 @@ import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.s3.UrlSigner;
import org.whispersystems.textsecuregcm.sms.SmsSender;
import org.whispersystems.textsecuregcm.sms.TwilioSmsSender;
import org.whispersystems.textsecuregcm.sqs.ContactDiscoveryQueueSender;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
@ -174,6 +174,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ReplicatedJedisPool pushSchedulerClient = pushSchedulerClientFactory.getRedisClientPool();
DirectoryManager directory = new DirectoryManager(directoryClient);
DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration());
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, cacheClient);
PendingDevicesManager pendingDevicesManager = new PendingDevicesManager (pendingDevices, cacheClient );
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient);
@ -184,7 +185,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DispatchManager dispatchManager = new DispatchManager(cacheClientFactory, Optional.of(deadLetterHandler));
PubSubManager pubSubManager = new PubSubManager(cacheClient, dispatchManager);
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey(), directoryQueue);
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager);
AccountAuthenticator deviceAuthenticator = new AccountAuthenticator(accountsManager );
FederatedPeerAuthenticator federatedPeerAuthenticator = new FederatedPeerAuthenticator(config.getFederationConfiguration());
@ -198,8 +199,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ReceiptSender receiptSender = new ReceiptSender(accountsManager, pushSender, federatedClientManager);
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
ContactDiscoveryQueueSender cdsSender = new ContactDiscoveryQueueSender(config.getDirectoryConfiguration().getSqsConfiguration());
DirectoryCredentialsGenerator directoryCredentialsGenerator = new DirectoryCredentialsGenerator(config.getDirectoryConfiguration().getDirectoryClientConfiguration().getUserAuthenticationTokenSharedSecret(),
config.getDirectoryConfiguration().getDirectoryClientConfiguration().getUserAuthenticationTokenUserIdSecret());
DirectoryReconciliationCache directoryReconciliationCache = new DirectoryReconciliationCache(cacheClient);
@ -232,8 +231,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.buildAuthFilter()));
environment.jersey().register(new AuthValueFactoryProvider.Binder());
environment.jersey().register(new AccountController(pendingAccountsManager, accountsManager, rateLimiters, smsSender, cdsSender, messagesManager, turnTokenGenerator, config.getTestDevices()));
environment.jersey().register(new DeviceController(pendingDevicesManager, accountsManager, messagesManager, cdsSender, rateLimiters, config.getMaxDevices()));
environment.jersey().register(new AccountController(pendingAccountsManager, accountsManager, rateLimiters, smsSender, directoryQueue, messagesManager, turnTokenGenerator, config.getTestDevices()));
environment.jersey().register(new DeviceController(pendingDevicesManager, accountsManager, messagesManager, directoryQueue, rateLimiters, config.getMaxDevices()));
environment.jersey().register(new DirectoryController(rateLimiters, directory, directoryCredentialsGenerator));
environment.jersey().register(new FederationControllerV1(accountsManager, attachmentController, messageController));
environment.jersey().register(new FederationControllerV2(accountsManager, attachmentController, messageController, keysController));

View File

@ -38,7 +38,7 @@ import org.whispersystems.textsecuregcm.entities.RegistrationLockFailure;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.sms.SmsSender;
import org.whispersystems.textsecuregcm.sms.TwilioSmsSender;
import org.whispersystems.textsecuregcm.sqs.ContactDiscoveryQueueSender;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
@ -82,7 +82,7 @@ public class AccountController {
private final AccountsManager accounts;
private final RateLimiters rateLimiters;
private final SmsSender smsSender;
private final ContactDiscoveryQueueSender cdsSender;
private final DirectoryQueue directoryQueue;
private final MessagesManager messagesManager;
private final TurnTokenGenerator turnTokenGenerator;
private final Map<String, Integer> testDevices;
@ -91,7 +91,7 @@ public class AccountController {
AccountsManager accounts,
RateLimiters rateLimiters,
SmsSender smsSenderFactory,
ContactDiscoveryQueueSender cdsSender,
DirectoryQueue directoryQueue,
MessagesManager messagesManager,
TurnTokenGenerator turnTokenGenerator,
Map<String, Integer> testDevices)
@ -100,7 +100,7 @@ public class AccountController {
this.accounts = accounts;
this.rateLimiters = rateLimiters;
this.smsSender = smsSenderFactory;
this.cdsSender = cdsSender;
this.directoryQueue = directoryQueue;
this.messagesManager = messagesManager;
this.testDevices = testDevices;
this.turnTokenGenerator = turnTokenGenerator;
@ -245,7 +245,12 @@ public class AccountController {
Device device = account.getAuthenticatedDevice().get();
device.setGcmId(null);
device.setFetchesMessages(false);
accounts.update(account);
if (!account.isActive()) {
directoryQueue.deleteRegisteredUser(account.getNumber());
}
}
@Timed
@ -268,7 +273,12 @@ public class AccountController {
Device device = account.getAuthenticatedDevice().get();
device.setApnId(null);
device.setFetchesMessages(false);
accounts.update(account);
if (!account.isActive()) {
directoryQueue.deleteRegisteredUser(account.getNumber());
}
}
@Timed
@ -343,7 +353,8 @@ public class AccountController {
if (accounts.create(account)) {
newUserMeter.mark();
}
cdsSender.addRegisteredUser(number);
directoryQueue.addRegisteredUser(number);
messagesManager.clear(number);
pendingAccounts.remove(number);

View File

@ -1,5 +1,5 @@
/**
* Copyright (C) 2013 Open WhisperSystems
/*
* Copyright (C) 2013-2018 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
@ -30,7 +30,7 @@ import org.whispersystems.textsecuregcm.entities.DeviceInfo;
import org.whispersystems.textsecuregcm.entities.DeviceInfoList;
import org.whispersystems.textsecuregcm.entities.DeviceResponse;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.sqs.ContactDiscoveryQueueSender;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
@ -51,7 +51,6 @@ import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.LinkedList;
import java.util.List;
@ -71,20 +70,19 @@ public class DeviceController {
private final MessagesManager messages;
private final RateLimiters rateLimiters;
private final Map<String, Integer> maxDeviceConfiguration;
private final ContactDiscoveryQueueSender cdsSender;
private final DirectoryQueue directoryQueue;
public DeviceController(PendingDevicesManager pendingDevices,
AccountsManager accounts,
MessagesManager messages,
ContactDiscoveryQueueSender cdsSender,
DirectoryQueue directoryQueue,
RateLimiters rateLimiters,
Map<String, Integer> maxDeviceConfiguration)
{
this.pendingDevices = pendingDevices;
this.accounts = accounts;
this.messages = messages;
this.cdsSender = cdsSender;
this.directoryQueue = directoryQueue;
this.rateLimiters = rateLimiters;
this.maxDeviceConfiguration = maxDeviceConfiguration;
}
@ -113,9 +111,11 @@ public class DeviceController {
account.removeDevice(deviceId);
accounts.update(account);
if (!account.isActive()) {
cdsSender.deleteRegisteredUser(account.getNumber());
directoryQueue.deleteRegisteredUser(account.getNumber());
}
messages.clear(account.getNumber(), deviceId);
}

View File

@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.gcm.server.Message;
import org.whispersystems.gcm.server.Result;
import org.whispersystems.gcm.server.Sender;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
@ -46,17 +47,20 @@ public class GCMSender implements Managed {
private final AccountsManager accountsManager;
private final Sender signalSender;
private final DirectoryQueue directoryQueue;
private ExecutorService executor;
public GCMSender(AccountsManager accountsManager, String signalKey) {
public GCMSender(AccountsManager accountsManager, String signalKey, DirectoryQueue directoryQueue) {
this.accountsManager = accountsManager;
this.signalSender = new Sender(signalKey, 50);
this.directoryQueue = directoryQueue;
}
@VisibleForTesting
public GCMSender(AccountsManager accountsManager, Sender sender, ExecutorService executor) {
public GCMSender(AccountsManager accountsManager, Sender sender, DirectoryQueue directoryQueue, ExecutorService executor) {
this.accountsManager = accountsManager;
this.signalSender = sender;
this.directoryQueue = directoryQueue;
this.executor = executor;
}
@ -115,6 +119,10 @@ public class GCMSender implements Managed {
device.setFetchesMessages(false);
accountsManager.update(account.get());
if (!account.get().isActive()) {
directoryQueue.deleteRegisteredUser(account.get().getNumber());
}
}
unregistered.mark();

View File

@ -23,15 +23,13 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration;
import org.whispersystems.textsecuregcm.configuration.SqsConfiguration;
import org.whispersystems.textsecuregcm.util.Constants;
@ -40,18 +38,18 @@ import java.util.Map;
import static com.codahale.metrics.MetricRegistry.name;
public class ContactDiscoveryQueueSender {
public class DirectoryQueue {
private static final Logger logger = LoggerFactory.getLogger(ContactDiscoveryQueueSender.class);
private static final Logger logger = LoggerFactory.getLogger(DirectoryQueue.class);
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter serviceErrorMeter = metricRegistry.meter(name(ContactDiscoveryQueueSender.class, "serviceError"));
private final Meter clientErrorMeter = metricRegistry.meter(name(ContactDiscoveryQueueSender.class, "clientError"));
private final Meter serviceErrorMeter = metricRegistry.meter(name(DirectoryQueue.class, "serviceError"));
private final Meter clientErrorMeter = metricRegistry.meter(name(DirectoryQueue.class, "clientError"));
private final String queueUrl;
private final AmazonSQS sqs;
public ContactDiscoveryQueueSender(SqsConfiguration sqsConfig) {
public DirectoryQueue(SqsConfiguration sqsConfig) {
final AWSCredentials credentials = new BasicAWSCredentials(sqsConfig.getAccessKey(), sqsConfig.getAccessSecret());
final AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);

View File

@ -18,7 +18,7 @@ import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper;
import org.whispersystems.textsecuregcm.providers.TimeProvider;
import org.whispersystems.textsecuregcm.sms.SmsSender;
import org.whispersystems.textsecuregcm.sqs.ContactDiscoveryQueueSender;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
@ -50,7 +50,7 @@ public class AccountControllerTest {
private RateLimiter rateLimiter = mock(RateLimiter.class );
private RateLimiter pinLimiter = mock(RateLimiter.class );
private SmsSender smsSender = mock(SmsSender.class );
private ContactDiscoveryQueueSender cdsSender = mock(ContactDiscoveryQueueSender.class);
private DirectoryQueue directoryQueue = mock(DirectoryQueue.class);
private MessagesManager storedMessages = mock(MessagesManager.class );
private TimeProvider timeProvider = mock(TimeProvider.class );
private TurnTokenGenerator turnTokenGenerator = mock(TurnTokenGenerator.class);
@ -67,7 +67,7 @@ public class AccountControllerTest {
accountsManager,
rateLimiters,
smsSender,
cdsSender,
directoryQueue,
storedMessages,
turnTokenGenerator,
new HashMap<>()))
@ -140,7 +140,7 @@ public class AccountControllerTest {
assertThat(response.getStatus()).isEqualTo(204);
verify(accountsManager, times(1)).create(isA(Account.class));
verify(cdsSender, times(1)).addRegisteredUser(eq(SENDER));
verify(directoryQueue, times(1)).addRegisteredUser(eq(SENDER));
}
@Test

View File

@ -29,7 +29,7 @@ import org.whispersystems.textsecuregcm.entities.DeviceResponse;
import org.whispersystems.textsecuregcm.limits.RateLimiter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.mappers.DeviceLimitExceededExceptionMapper;
import org.whispersystems.textsecuregcm.sqs.ContactDiscoveryQueueSender;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.*;
import org.whispersystems.textsecuregcm.tests.util.AuthHelper;
import org.whispersystems.textsecuregcm.util.VerificationCode;
@ -53,7 +53,7 @@ public class DeviceControllerTest {
public DumbVerificationDeviceController(PendingDevicesManager pendingDevices,
AccountsManager accounts,
MessagesManager messages,
ContactDiscoveryQueueSender cdsSender,
DirectoryQueue cdsSender,
RateLimiters rateLimiters,
Map<String, Integer> deviceConfiguration)
{
@ -69,7 +69,7 @@ public class DeviceControllerTest {
private PendingDevicesManager pendingDevicesManager = mock(PendingDevicesManager.class);
private AccountsManager accountsManager = mock(AccountsManager.class );
private MessagesManager messagesManager = mock(MessagesManager.class);
private ContactDiscoveryQueueSender cdsSender = mock(ContactDiscoveryQueueSender.class);
private DirectoryQueue directoryQueue = mock(DirectoryQueue.class);
private RateLimiters rateLimiters = mock(RateLimiters.class );
private RateLimiter rateLimiter = mock(RateLimiter.class );
private Account account = mock(Account.class );
@ -89,7 +89,7 @@ public class DeviceControllerTest {
.addResource(new DumbVerificationDeviceController(pendingDevicesManager,
accountsManager,
messagesManager,
cdsSender,
directoryQueue,
rateLimiters,
deviceConfiguration))
.build();
@ -211,6 +211,6 @@ public class DeviceControllerTest {
.delete();
assertEquals(204, response.getStatus());
verify(cdsSender).deleteRegisteredUser(eq(AuthHelper.VALID_NUMBER));
verify(directoryQueue).deleteRegisteredUser(eq(AuthHelper.VALID_NUMBER));
}
}

View File

@ -9,6 +9,7 @@ import org.whispersystems.gcm.server.Result;
import org.whispersystems.gcm.server.Sender;
import org.whispersystems.textsecuregcm.push.GCMSender;
import org.whispersystems.textsecuregcm.push.GcmMessage;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
@ -24,6 +25,7 @@ public class GCMSenderTest {
AccountsManager accountsManager = mock(AccountsManager.class);
Sender sender = mock(Sender.class );
Result successResult = mock(Result.class );
DirectoryQueue directoryQueue = mock(DirectoryQueue.class );
SynchronousExecutorService executorService = new SynchronousExecutorService();
when(successResult.isInvalidRegistrationId()).thenReturn(false);
@ -32,7 +34,7 @@ public class GCMSenderTest {
when(successResult.isSuccess()).thenReturn(true);
GcmMessage message = new GcmMessage("foo", "+12223334444", 1, false);
GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService);
GCMSender gcmSender = new GCMSender(accountsManager, sender, directoryQueue, executorService);
SettableFuture<Result> successFuture = SettableFuture.create();
successFuture.set(successResult);
@ -53,6 +55,7 @@ public class GCMSenderTest {
AccountsManager accountsManager = mock(AccountsManager.class);
Sender sender = mock(Sender.class );
Result invalidResult = mock(Result.class );
DirectoryQueue directoryQueue = mock(DirectoryQueue.class );
SynchronousExecutorService executorService = new SynchronousExecutorService();
Account destinationAccount = mock(Account.class);
@ -68,7 +71,7 @@ public class GCMSenderTest {
when(invalidResult.isSuccess()).thenReturn(true);
GcmMessage message = new GcmMessage(gcmId, destinationNumber, 1, false);
GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService);
GCMSender gcmSender = new GCMSender(accountsManager, sender, directoryQueue, executorService);
SettableFuture<Result> invalidFuture = SettableFuture.create();
invalidFuture.set(invalidResult);
@ -95,8 +98,9 @@ public class GCMSenderTest {
Result canonicalResult = mock(Result.class );
SynchronousExecutorService executorService = new SynchronousExecutorService();
Account destinationAccount = mock(Account.class);
Device destinationDevice = mock(Device.class );
Account destinationAccount = mock(Account.class );
Device destinationDevice = mock(Device.class );
DirectoryQueue directoryQueue = mock(DirectoryQueue.class);
when(destinationAccount.getDevice(1)).thenReturn(Optional.of(destinationDevice));
when(accountsManager.get(destinationNumber)).thenReturn(Optional.of(destinationAccount));
@ -109,7 +113,7 @@ public class GCMSenderTest {
when(canonicalResult.getCanonicalRegistrationId()).thenReturn(canonicalId);
GcmMessage message = new GcmMessage(gcmId, destinationNumber, 1, false);
GCMSender gcmSender = new GCMSender(accountsManager, sender, executorService);
GCMSender gcmSender = new GCMSender(accountsManager, sender, directoryQueue, executorService);
SettableFuture<Result> invalidFuture = SettableFuture.create();
invalidFuture.set(canonicalResult);