Mark accounts as inactive if no device has been seen for a year.
// FREEBIE
This commit is contained in:
parent
c410348278
commit
d95ca5f9e4
|
@ -225,7 +225,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
if (config.getWebsocketConfiguration().isEnabled()) {
|
||||
WebSocketEnvironment webSocketEnvironment = new WebSocketEnvironment(environment, config, 90000);
|
||||
webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(deviceAuthenticator));
|
||||
webSocketEnvironment.setConnectListener(new AuthenticatedConnectListener(accountsManager, pushSender, receiptSender, messagesManager, pubSubManager, apnFallbackManager));
|
||||
webSocketEnvironment.setConnectListener(new AuthenticatedConnectListener(accountsManager, pushSender, receiptSender, messagesManager, pubSubManager));
|
||||
webSocketEnvironment.jersey().register(new KeepAliveController(pubSubManager));
|
||||
|
||||
WebSocketEnvironment provisioningEnvironment = new WebSocketEnvironment(environment, config);
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.google.common.base.Optional;
|
|||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class Account {
|
||||
|
||||
|
@ -105,7 +106,8 @@ public class Account {
|
|||
public boolean isActive() {
|
||||
return
|
||||
getMasterDevice().isPresent() &&
|
||||
getMasterDevice().get().isActive();
|
||||
getMasterDevice().get().isActive() &&
|
||||
getLastSeen() > (System.currentTimeMillis() - TimeUnit.DAYS.toMillis(365));
|
||||
}
|
||||
|
||||
public long getNextDeviceId() {
|
||||
|
@ -147,4 +149,16 @@ public class Account {
|
|||
public String getIdentityKey() {
|
||||
return identityKey;
|
||||
}
|
||||
|
||||
public long getLastSeen() {
|
||||
long lastSeen = 0;
|
||||
|
||||
for (Device device : devices) {
|
||||
if (device.getLastSeen() > lastSeen) {
|
||||
lastSeen = device.getLastSeen();
|
||||
}
|
||||
}
|
||||
|
||||
return lastSeen;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,11 +1,10 @@
|
|||
package org.whispersystems.textsecuregcm.websocket;
|
||||
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
|
@ -13,7 +12,6 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
|||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
@ -24,11 +22,10 @@ import static com.codahale.metrics.MetricRegistry.name;
|
|||
|
||||
public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
|
||||
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private static final Histogram durationHistogram = metricRegistry.histogram(name(WebSocketConnection.class, "connected_duration"));
|
||||
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
|
||||
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private static final Timer durationTimer = metricRegistry.timer(name(WebSocketConnection.class, "connected_duration"));
|
||||
|
||||
private final ApnFallbackManager apnFallbackManager;
|
||||
private final AccountsManager accountsManager;
|
||||
private final PushSender pushSender;
|
||||
private final ReceiptSender receiptSender;
|
||||
|
@ -37,26 +34,25 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
|||
|
||||
public AuthenticatedConnectListener(AccountsManager accountsManager, PushSender pushSender,
|
||||
ReceiptSender receiptSender, MessagesManager messagesManager,
|
||||
PubSubManager pubSubManager, ApnFallbackManager apnFallbackManager)
|
||||
PubSubManager pubSubManager)
|
||||
{
|
||||
this.accountsManager = accountsManager;
|
||||
this.pushSender = pushSender;
|
||||
this.receiptSender = receiptSender;
|
||||
this.messagesManager = messagesManager;
|
||||
this.pubSubManager = pubSubManager;
|
||||
this.apnFallbackManager = apnFallbackManager;
|
||||
this.accountsManager = accountsManager;
|
||||
this.pushSender = pushSender;
|
||||
this.receiptSender = receiptSender;
|
||||
this.messagesManager = messagesManager;
|
||||
this.pubSubManager = pubSubManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(WebSocketSessionContext context) {
|
||||
final Account account = context.getAuthenticated(Account.class);
|
||||
final Device device = account.getAuthenticatedDevice().get();
|
||||
final long connectTime = System.currentTimeMillis();
|
||||
final WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
|
||||
final WebSocketConnectionInfo info = new WebSocketConnectionInfo(address);
|
||||
final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender,
|
||||
messagesManager, account, device,
|
||||
context.getClient());
|
||||
final Account account = context.getAuthenticated(Account.class);
|
||||
final Device device = account.getAuthenticatedDevice().get();
|
||||
final Timer.Context timer = durationTimer.time();
|
||||
final WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
|
||||
final WebSocketConnectionInfo info = new WebSocketConnectionInfo(address);
|
||||
final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender,
|
||||
messagesManager, account, device,
|
||||
context.getClient());
|
||||
|
||||
pubSubManager.publish(info, PubSubMessage.newBuilder().setType(PubSubMessage.Type.CONNECTED).build());
|
||||
updateLastSeen(account, device);
|
||||
|
@ -66,7 +62,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
|||
@Override
|
||||
public void onWebSocketClose(WebSocketSessionContext context, int statusCode, String reason) {
|
||||
pubSubManager.unsubscribe(address, connection);
|
||||
durationHistogram.update(System.currentTimeMillis() - connectTime);
|
||||
timer.stop();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -76,14 +76,14 @@ public class DirectoryCommand extends EnvironmentCommand<WhisperServerConfigurat
|
|||
JedisPool redisClient = new RedisClientFactory(configuration.getDirectoryConfiguration().getUrl()).getRedisClientPool();
|
||||
DirectoryManager directory = new DirectoryManager(redisClient);
|
||||
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient);
|
||||
FederatedClientManager federatedClientManager = new FederatedClientManager(environment,
|
||||
configuration.getJerseyClientConfiguration(),
|
||||
configuration.getFederationConfiguration());
|
||||
// FederatedClientManager federatedClientManager = new FederatedClientManager(environment,
|
||||
// configuration.getJerseyClientConfiguration(),
|
||||
// configuration.getFederationConfiguration());
|
||||
|
||||
DirectoryUpdater update = new DirectoryUpdater(accountsManager, federatedClientManager, directory);
|
||||
DirectoryUpdater update = new DirectoryUpdater(accountsManager, directory);
|
||||
|
||||
update.updateFromLocalDatabase();
|
||||
update.updateFromPeers();
|
||||
// update.updateFromPeers();
|
||||
} catch (Exception ex) {
|
||||
logger.warn("Directory Exception", ex);
|
||||
throw new RuntimeException(ex);
|
||||
|
|
|
@ -42,15 +42,11 @@ public class DirectoryUpdater {
|
|||
private final Logger logger = LoggerFactory.getLogger(DirectoryUpdater.class);
|
||||
|
||||
private final AccountsManager accountsManager;
|
||||
private final FederatedClientManager federatedClientManager;
|
||||
private final DirectoryManager directory;
|
||||
|
||||
public DirectoryUpdater(AccountsManager accountsManager,
|
||||
FederatedClientManager federatedClientManager,
|
||||
DirectoryManager directory)
|
||||
public DirectoryUpdater(AccountsManager accountsManager, DirectoryManager directory)
|
||||
{
|
||||
this.accountsManager = accountsManager;
|
||||
this.federatedClientManager = federatedClientManager;
|
||||
this.directory = directory;
|
||||
}
|
||||
|
||||
|
@ -91,75 +87,75 @@ public class DirectoryUpdater {
|
|||
logger.info(String.format("Local directory is updated (%d added, %d removed).", contactsAdded, contactsRemoved));
|
||||
}
|
||||
|
||||
public void updateFromPeers() {
|
||||
logger.info("Updating peer directories.");
|
||||
|
||||
int contactsAdded = 0;
|
||||
int contactsRemoved = 0;
|
||||
List<FederatedClient> clients = federatedClientManager.getClients();
|
||||
|
||||
for (FederatedClient client : clients) {
|
||||
logger.info("Updating directory from peer: " + client.getPeerName());
|
||||
|
||||
int userCount = client.getUserCount();
|
||||
int retrieved = 0;
|
||||
|
||||
logger.info("Remote peer user count: " + userCount);
|
||||
|
||||
while (retrieved < userCount) {
|
||||
logger.info("Retrieving remote tokens...");
|
||||
List<ClientContact> remoteContacts = client.getUserTokens(retrieved);
|
||||
List<PendingClientContact> localContacts = new LinkedList<>();
|
||||
BatchOperationHandle handle = directory.startBatchOperation();
|
||||
|
||||
if (remoteContacts == null) {
|
||||
logger.info("Remote tokens empty, ending...");
|
||||
break;
|
||||
} else {
|
||||
logger.info("Retrieved " + remoteContacts.size() + " remote tokens...");
|
||||
}
|
||||
|
||||
for (ClientContact remoteContact : remoteContacts) {
|
||||
localContacts.add(directory.get(handle, remoteContact.getToken()));
|
||||
}
|
||||
|
||||
directory.stopBatchOperation(handle);
|
||||
|
||||
handle = directory.startBatchOperation();
|
||||
Iterator<ClientContact> remoteContactIterator = remoteContacts.iterator();
|
||||
Iterator<PendingClientContact> localContactIterator = localContacts.iterator();
|
||||
|
||||
while (remoteContactIterator.hasNext() && localContactIterator.hasNext()) {
|
||||
try {
|
||||
ClientContact remoteContact = remoteContactIterator.next();
|
||||
Optional<ClientContact> localContact = localContactIterator.next().get();
|
||||
|
||||
remoteContact.setRelay(client.getPeerName());
|
||||
|
||||
if (!remoteContact.isInactive() && (!localContact.isPresent() || client.getPeerName().equals(localContact.get().getRelay()))) {
|
||||
contactsAdded++;
|
||||
directory.add(handle, remoteContact);
|
||||
} else {
|
||||
if (localContact.isPresent() && client.getPeerName().equals(localContact.get().getRelay())) {
|
||||
contactsRemoved++;
|
||||
directory.remove(handle, remoteContact.getToken());
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.warn("JSON Serialization Failed: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
directory.stopBatchOperation(handle);
|
||||
|
||||
retrieved += remoteContacts.size();
|
||||
logger.info("Processed: " + retrieved + " remote tokens.");
|
||||
}
|
||||
|
||||
logger.info("Update from peer complete.");
|
||||
}
|
||||
|
||||
logger.info("Update from peer directories complete.");
|
||||
logger.info(String.format("Added %d and removed %d remove contacts.", contactsAdded, contactsRemoved));
|
||||
}
|
||||
// public void updateFromPeers() {
|
||||
// logger.info("Updating peer directories.");
|
||||
//
|
||||
// int contactsAdded = 0;
|
||||
// int contactsRemoved = 0;
|
||||
// List<FederatedClient> clients = federatedClientManager.getClients();
|
||||
//
|
||||
// for (FederatedClient client : clients) {
|
||||
// logger.info("Updating directory from peer: " + client.getPeerName());
|
||||
//
|
||||
// int userCount = client.getUserCount();
|
||||
// int retrieved = 0;
|
||||
//
|
||||
// logger.info("Remote peer user count: " + userCount);
|
||||
//
|
||||
// while (retrieved < userCount) {
|
||||
// logger.info("Retrieving remote tokens...");
|
||||
// List<ClientContact> remoteContacts = client.getUserTokens(retrieved);
|
||||
// List<PendingClientContact> localContacts = new LinkedList<>();
|
||||
// BatchOperationHandle handle = directory.startBatchOperation();
|
||||
//
|
||||
// if (remoteContacts == null) {
|
||||
// logger.info("Remote tokens empty, ending...");
|
||||
// break;
|
||||
// } else {
|
||||
// logger.info("Retrieved " + remoteContacts.size() + " remote tokens...");
|
||||
// }
|
||||
//
|
||||
// for (ClientContact remoteContact : remoteContacts) {
|
||||
// localContacts.add(directory.get(handle, remoteContact.getToken()));
|
||||
// }
|
||||
//
|
||||
// directory.stopBatchOperation(handle);
|
||||
//
|
||||
// handle = directory.startBatchOperation();
|
||||
// Iterator<ClientContact> remoteContactIterator = remoteContacts.iterator();
|
||||
// Iterator<PendingClientContact> localContactIterator = localContacts.iterator();
|
||||
//
|
||||
// while (remoteContactIterator.hasNext() && localContactIterator.hasNext()) {
|
||||
// try {
|
||||
// ClientContact remoteContact = remoteContactIterator.next();
|
||||
// Optional<ClientContact> localContact = localContactIterator.next().get();
|
||||
//
|
||||
// remoteContact.setRelay(client.getPeerName());
|
||||
//
|
||||
// if (!remoteContact.isInactive() && (!localContact.isPresent() || client.getPeerName().equals(localContact.get().getRelay()))) {
|
||||
// contactsAdded++;
|
||||
// directory.add(handle, remoteContact);
|
||||
// } else {
|
||||
// if (localContact.isPresent() && client.getPeerName().equals(localContact.get().getRelay())) {
|
||||
// contactsRemoved++;
|
||||
// directory.remove(handle, remoteContact.getToken());
|
||||
// }
|
||||
// }
|
||||
// } catch (IOException e) {
|
||||
// logger.warn("JSON Serialization Failed: ", e);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// directory.stopBatchOperation(handle);
|
||||
//
|
||||
// retrieved += remoteContacts.size();
|
||||
// logger.info("Processed: " + retrieved + " remote tokens.");
|
||||
// }
|
||||
//
|
||||
// logger.info("Update from peer complete.");
|
||||
// }
|
||||
//
|
||||
// logger.info("Update from peer directories complete.");
|
||||
// logger.info(String.format("Added %d and removed %d remove contacts.", contactsAdded, contactsRemoved));
|
||||
// }
|
||||
}
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
package org.whispersystems.textsecuregcm.tests.storage;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class AccountTest {
|
||||
|
||||
private final Device oldMasterDevice = mock(Device.class);
|
||||
private final Device recentMasterDevice = mock(Device.class);
|
||||
private final Device agingSecondaryDevice = mock(Device.class);
|
||||
private final Device recentSecondaryDevice = mock(Device.class);
|
||||
private final Device oldSecondaryDevice = mock(Device.class);
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
when(oldMasterDevice.getLastSeen()).thenReturn(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(366));
|
||||
when(oldMasterDevice.isActive()).thenReturn(true);
|
||||
when(oldMasterDevice.getId()).thenReturn(Device.MASTER_ID);
|
||||
|
||||
when(recentMasterDevice.getLastSeen()).thenReturn(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1));
|
||||
when(recentMasterDevice.isActive()).thenReturn(true);
|
||||
when(recentMasterDevice.getId()).thenReturn(Device.MASTER_ID);
|
||||
|
||||
when(agingSecondaryDevice.getLastSeen()).thenReturn(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(31));
|
||||
when(agingSecondaryDevice.isActive()).thenReturn(false);
|
||||
when(agingSecondaryDevice.getId()).thenReturn(2L);
|
||||
|
||||
when(recentSecondaryDevice.getLastSeen()).thenReturn(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1));
|
||||
when(recentSecondaryDevice.isActive()).thenReturn(true);
|
||||
when(recentSecondaryDevice.getId()).thenReturn(2L);
|
||||
|
||||
when(oldSecondaryDevice.getLastSeen()).thenReturn(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(366));
|
||||
when(oldSecondaryDevice.isActive()).thenReturn(false);
|
||||
when(oldSecondaryDevice.getId()).thenReturn(2L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAccountActive() {
|
||||
Account recentAccount = new Account("+14152222222", new HashSet<Device>() {{
|
||||
add(recentMasterDevice);
|
||||
add(recentSecondaryDevice);
|
||||
}});
|
||||
|
||||
assertTrue(recentAccount.isActive());
|
||||
|
||||
Account oldSecondaryAccount = new Account("+14152222222", new HashSet<Device>() {{
|
||||
add(recentMasterDevice);
|
||||
add(agingSecondaryDevice);
|
||||
}});
|
||||
|
||||
assertTrue(oldSecondaryAccount.isActive());
|
||||
|
||||
Account agingPrimaryAccount = new Account("+14152222222", new HashSet<Device>() {{
|
||||
add(oldMasterDevice);
|
||||
add(agingSecondaryDevice);
|
||||
}});
|
||||
|
||||
assertTrue(agingPrimaryAccount.isActive());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAccountInactive() {
|
||||
Account oldPrimaryAccount = new Account("+14152222222", new HashSet<Device>() {{
|
||||
add(oldMasterDevice);
|
||||
add(oldSecondaryDevice);
|
||||
}});
|
||||
|
||||
assertFalse(oldPrimaryAccount.isActive());
|
||||
}
|
||||
|
||||
}
|
|
@ -10,7 +10,6 @@ import org.mockito.stubbing.Answer;
|
|||
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
|
||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
|
||||
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||
import org.whispersystems.textsecuregcm.push.WebsocketSender;
|
||||
|
@ -58,13 +57,12 @@ public class WebSocketConnectionTest {
|
|||
private static final UpgradeRequest upgradeRequest = mock(UpgradeRequest.class );
|
||||
private static final PushSender pushSender = mock(PushSender.class);
|
||||
private static final ReceiptSender receiptSender = mock(ReceiptSender.class);
|
||||
private static final ApnFallbackManager apnFallbackManager = mock(ApnFallbackManager.class);
|
||||
|
||||
@Test
|
||||
public void testCredentials() throws Exception {
|
||||
MessagesManager storedMessages = mock(MessagesManager.class);
|
||||
WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator);
|
||||
AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(accountsManager, pushSender, receiptSender, storedMessages, pubSubManager, apnFallbackManager);
|
||||
AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(accountsManager, pushSender, receiptSender, storedMessages, pubSubManager);
|
||||
WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class);
|
||||
|
||||
when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD))))
|
||||
|
|
Loading…
Reference in New Issue