Updated iOS message delivery.

1) Use WebSockets for delivery if a client is connected.

2) If a client isn't connected, write to a redis queue and send
   an APN push.
This commit is contained in:
Moxie Marlinspike 2014-05-07 17:21:07 -07:00
parent b433b9c879
commit 7a33cef27e
11 changed files with 272 additions and 157 deletions

View File

@ -34,7 +34,6 @@ import org.whispersystems.textsecuregcm.controllers.DirectoryController;
import org.whispersystems.textsecuregcm.controllers.FederationController;
import org.whispersystems.textsecuregcm.controllers.KeysController;
import org.whispersystems.textsecuregcm.controllers.MessageController;
import org.whispersystems.textsecuregcm.controllers.WebsocketControllerFactory;
import org.whispersystems.textsecuregcm.federation.FederatedClientManager;
import org.whispersystems.textsecuregcm.federation.FederatedPeer;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
@ -63,10 +62,10 @@ import org.whispersystems.textsecuregcm.storage.PendingAccountsManager;
import org.whispersystems.textsecuregcm.storage.PendingDevices;
import org.whispersystems.textsecuregcm.storage.PendingDevicesManager;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.storage.StoredMessageManager;
import org.whispersystems.textsecuregcm.storage.StoredMessages;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.UrlSigner;
import org.whispersystems.textsecuregcm.websocket.WebsocketControllerFactory;
import org.whispersystems.textsecuregcm.workers.DirectoryCommand;
import javax.servlet.DispatcherType;
@ -121,18 +120,17 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
PendingAccounts pendingAccounts = jdbi.onDemand(PendingAccounts.class);
PendingDevices pendingDevices = jdbi.onDemand(PendingDevices.class);
Keys keys = jdbi.onDemand(Keys.class);
StoredMessages storedMessages = jdbi.onDemand(StoredMessages.class );
MemcachedClient memcachedClient = new MemcachedClientFactory(config.getMemcacheConfiguration()).getClient();
JedisPool redisClient = new RedisClientFactory(config.getRedisConfiguration()).getRedisClientPool();
DirectoryManager directory = new DirectoryManager(redisClient);
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, memcachedClient);
PendingDevicesManager pendingDevicesManager = new PendingDevicesManager(pendingDevices, memcachedClient);
AccountsManager accountsManager = new AccountsManager(accounts, directory, memcachedClient);
FederatedClientManager federatedClientManager = new FederatedClientManager(config.getFederationConfiguration());
PubSubManager pubSubManager = new PubSubManager(redisClient);
StoredMessageManager storedMessageManager = new StoredMessageManager(storedMessages, pubSubManager);
DirectoryManager directory = new DirectoryManager(redisClient);
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, memcachedClient);
PendingDevicesManager pendingDevicesManager = new PendingDevicesManager (pendingDevices, memcachedClient );
AccountsManager accountsManager = new AccountsManager(accounts, directory, memcachedClient);
FederatedClientManager federatedClientManager = new FederatedClientManager(config.getFederationConfiguration());
StoredMessages storedMessages = new StoredMessages(redisClient);
PubSubManager pubSubManager = new PubSubManager(redisClient);
AccountAuthenticator deviceAuthenticator = new AccountAuthenticator(accountsManager);
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), memcachedClient);
@ -143,7 +141,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
UrlSigner urlSigner = new UrlSigner(config.getS3Configuration());
PushSender pushSender = new PushSender(config.getGcmConfiguration(),
config.getApnConfiguration(),
storedMessageManager,
storedMessages, pubSubManager,
accountsManager);
AttachmentController attachmentController = new AttachmentController(rateLimiters, federatedClientManager, urlSigner);
@ -165,7 +163,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
if (config.getWebsocketConfiguration().isEnabled()) {
WebsocketControllerFactory servlet = new WebsocketControllerFactory(deviceAuthenticator,
storedMessageManager,
pushSender,
storedMessages,
pubSubManager);
ServletRegistration.Dynamic websocket = environment.servlets().addServlet("WebSocket", servlet);

View File

@ -10,13 +10,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
import org.whispersystems.textsecuregcm.entities.AcknowledgeWebsocketMessage;
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
import org.whispersystems.textsecuregcm.entities.IncomingWebsocketMessage;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.TransientPushFailureException;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.PubSubListener;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.storage.PubSubMessage;
import org.whispersystems.textsecuregcm.storage.StoredMessageManager;
import org.whispersystems.textsecuregcm.storage.StoredMessages;
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
import org.whispersystems.textsecuregcm.websocket.WebsocketMessage;
@ -37,22 +41,26 @@ public class WebsocketController implements WebSocketListener, PubSubListener {
private static final Map<Long, String> pendingMessages = new HashMap<>();
private final AccountAuthenticator accountAuthenticator;
private final StoredMessageManager storedMessageManager;
private final PubSubManager pubSubManager;
private final StoredMessages storedMessages;
private final PushSender pushSender;
private Account account;
private Device device;
private Session session;
private WebsocketAddress address;
private Account account;
private Device device;
private Session session;
private long pendingMessageSequence;
public WebsocketController(AccountAuthenticator accountAuthenticator,
StoredMessageManager storedMessageManager,
PubSubManager pubSubManager)
PushSender pushSender,
PubSubManager pubSubManager,
StoredMessages storedMessages)
{
this.accountAuthenticator = accountAuthenticator;
this.storedMessageManager = storedMessageManager;
this.pushSender = pushSender;
this.pubSubManager = pubSubManager;
this.storedMessages = storedMessages;
}
@Override
@ -80,11 +88,11 @@ public class WebsocketController implements WebSocketListener, PubSubListener {
this.account = account.get();
this.device = account.get().getAuthenticatedDevice().get();
this.address = new WebsocketAddress(this.account.getId(), this.device.getId());
this.session = session;
this.session.setIdleTimeout(10 * 60 * 1000);
this.pubSubManager.subscribe(new WebsocketAddress(this.account.getId(),
this.device.getId()), this);
this.pubSubManager.subscribe(this.address, this);
handleQueryDatabase();
} catch (AuthenticationException e) {
@ -114,7 +122,7 @@ public class WebsocketController implements WebSocketListener, PubSubListener {
@Override
public void onWebSocketClose(int i, String s) {
pubSubManager.unsubscribe(new WebsocketAddress(account.getId(), device.getId()), this);
pubSubManager.unsubscribe(this.address, this);
List<String> remainingMessages = new LinkedList<>();
@ -129,7 +137,14 @@ public class WebsocketController implements WebSocketListener, PubSubListener {
pendingMessages.clear();
}
storedMessageManager.storeMessages(account, device, remainingMessages);
for (String remainingMessage : remainingMessages) {
try {
pushSender.sendMessage(account, device, new EncryptedOutgoingMessage(remainingMessage));
} catch (NotPushRegisteredException | TransientPushFailureException e) {
logger.warn("onWebSocketClose", e);
storedMessages.insert(account.getId(), device.getId(), remainingMessage);
}
}
}
@ -177,7 +192,7 @@ public class WebsocketController implements WebSocketListener, PubSubListener {
}
private void handleQueryDatabase() {
List<String> messages = storedMessageManager.getOutgoingMessages(account, device);
List<String> messages = storedMessages.getMessagesForDevice(account.getId(), device.getId());
for (String message : messages) {
handleDeliverOutgoingMessage(message);

View File

@ -41,23 +41,26 @@ public class EncryptedOutgoingMessage {
private static final int MAC_KEY_SIZE = 20;
private static final int MAC_SIZE = 10;
private final OutgoingMessageSignal outgoingMessage;
private final String signalingKey;
private final String serialized;
public EncryptedOutgoingMessage(OutgoingMessageSignal outgoingMessage,
String signalingKey)
throws CryptoEncodingException
{
this.outgoingMessage = outgoingMessage;
this.signalingKey = signalingKey;
}
public String serialize() throws CryptoEncodingException {
byte[] plaintext = outgoingMessage.toByteArray();
SecretKeySpec cipherKey = getCipherKey (signalingKey);
SecretKeySpec macKey = getMacKey(signalingKey);
byte[] ciphertext = getCiphertext(plaintext, cipherKey, macKey);
return Base64.encodeBytes(ciphertext);
this.serialized = Base64.encodeBytes(ciphertext);
}
public EncryptedOutgoingMessage(String serialized) {
this.serialized = serialized;
}
public String serialize() {
return serialized;
}
private byte[] getCiphertext(byte[] plaintext, SecretKeySpec cipherKey, SecretKeySpec macKey)

View File

@ -26,10 +26,15 @@ import com.notnoop.exceptions.NetworkIOException;
import org.bouncycastle.openssl.PEMReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.storage.PubSubMessage;
import org.whispersystems.textsecuregcm.storage.StoredMessages;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -48,17 +53,25 @@ import static com.codahale.metrics.MetricRegistry.name;
public class APNSender {
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter success = metricRegistry.meter(name(getClass(), "sent", "success"));
private final Meter failure = metricRegistry.meter(name(getClass(), "sent", "failure"));
private final Meter websocketMeter = metricRegistry.meter(name(getClass(), "websocket"));
private final Meter pushMeter = metricRegistry.meter(name(getClass(), "push"));
private final Meter failureMeter = metricRegistry.meter(name(getClass(), "failure"));
private final Logger logger = LoggerFactory.getLogger(APNSender.class);
private static final String MESSAGE_BODY = "m";
private final Optional<ApnsService> apnService;
private final PubSubManager pubSubManager;
private final StoredMessages storedMessages;
public APNSender(String apnCertificate, String apnKey)
public APNSender(PubSubManager pubSubManager,
StoredMessages storedMessages,
String apnCertificate, String apnKey)
throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException
{
this.pubSubManager = pubSubManager;
this.storedMessages = storedMessages;
if (!Util.isEmpty(apnCertificate) && !Util.isEmpty(apnKey)) {
byte[] keyStore = initializeKeyStore(apnCertificate, apnKey);
this.apnService = Optional.of(APNS.newService()
@ -69,31 +82,44 @@ public class APNSender {
}
}
public void sendMessage(String registrationId, EncryptedOutgoingMessage message)
public void sendMessage(Account account, Device device,
String registrationId, EncryptedOutgoingMessage message)
throws TransientPushFailureException, NotPushRegisteredException
{
if (pubSubManager.publish(new WebsocketAddress(account.getId(), device.getId()),
new PubSubMessage(PubSubMessage.TYPE_DELIVER, message.serialize())))
{
websocketMeter.mark();
} else {
storedMessages.insert(account.getId(), device.getId(), message.serialize());
sendPush(registrationId, message.serialize());
}
}
private void sendPush(String registrationId, String message)
throws TransientPushFailureException
{
try {
if (!apnService.isPresent()) {
failure.mark();
failureMeter.mark();
throw new TransientPushFailureException("APN access not configured!");
}
String payload = APNS.newPayload()
.alertBody("Message!")
.customField(MESSAGE_BODY, message.serialize())
.customField(MESSAGE_BODY, message)
.build();
logger.debug("APN Payload: " + payload);
apnService.get().push(registrationId, payload);
success.mark();
pushMeter.mark();
} catch (NetworkIOException nioe) {
logger.warn("Network Error", nioe);
failure.mark();
failureMeter.mark();
throw new TransientPushFailureException(nioe);
} catch (CryptoEncodingException e) {
throw new NotPushRegisteredException(e);
}
}
private static byte[] initializeKeyStore(String pemCertificate, String pemKey)

View File

@ -23,7 +23,6 @@ import com.google.android.gcm.server.Constants;
import com.google.android.gcm.server.Message;
import com.google.android.gcm.server.Result;
import com.google.android.gcm.server.Sender;
import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
import java.io.IOException;
@ -65,8 +64,6 @@ public class GCMSender {
}
} catch (IOException e) {
throw new TransientPushFailureException(e);
} catch (CryptoEncodingException e) {
throw new NotPushRegisteredException(e);
}
}
}

View File

@ -26,7 +26,8 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.StoredMessageManager;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.storage.StoredMessages;
import java.io.IOException;
import java.security.KeyStoreException;
@ -37,32 +38,45 @@ public class PushSender {
private final Logger logger = LoggerFactory.getLogger(PushSender.class);
private final AccountsManager accounts;
private final GCMSender gcmSender;
private final APNSender apnSender;
private final StoredMessageManager storedMessageManager;
private final AccountsManager accounts;
private final GCMSender gcmSender;
private final APNSender apnSender;
private final WebsocketSender webSocketSender;
public PushSender(GcmConfiguration gcmConfiguration,
ApnConfiguration apnConfiguration,
StoredMessageManager storedMessageManager,
AccountsManager accounts)
StoredMessages storedMessages,
PubSubManager pubSubManager,
AccountsManager accounts)
throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException
{
this.accounts = accounts;
this.storedMessageManager = storedMessageManager;
this.gcmSender = new GCMSender(gcmConfiguration.getApiKey());
this.apnSender = new APNSender(apnConfiguration.getCertificate(), apnConfiguration.getKey());
this.accounts = accounts;
this.webSocketSender = new WebsocketSender(storedMessages, pubSubManager);
this.gcmSender = new GCMSender(gcmConfiguration.getApiKey());
this.apnSender = new APNSender(pubSubManager, storedMessages,
apnConfiguration.getCertificate(),
apnConfiguration.getKey());
}
public void sendMessage(Account account, Device device, MessageProtos.OutgoingMessageSignal outgoingMessage)
public void sendMessage(Account account, Device device, MessageProtos.OutgoingMessageSignal message)
throws NotPushRegisteredException, TransientPushFailureException
{
String signalingKey = device.getSignalingKey();
EncryptedOutgoingMessage message = new EncryptedOutgoingMessage(outgoingMessage, signalingKey);
try {
String signalingKey = device.getSignalingKey();
EncryptedOutgoingMessage encryptedMessage = new EncryptedOutgoingMessage(message, signalingKey);
sendMessage(account, device, encryptedMessage);
} catch (CryptoEncodingException e) {
throw new NotPushRegisteredException(e);
}
}
public void sendMessage(Account account, Device device, EncryptedOutgoingMessage message)
throws NotPushRegisteredException, TransientPushFailureException
{
if (device.getGcmId() != null) sendGcmMessage(account, device, message);
else if (device.getApnId() != null) sendApnMessage(account, device, message);
else if (device.getFetchesMessages()) storeFetchedMessage(account, device, message);
else if (device.getFetchesMessages()) sendWebSocketMessage(account, device, message);
else throw new NotPushRegisteredException("No delivery possible!");
}
@ -89,7 +103,7 @@ public class PushSender {
throws TransientPushFailureException, NotPushRegisteredException
{
try {
apnSender.sendMessage(device.getApnId(), outgoingMessage);
apnSender.sendMessage(account, device, device.getApnId(), outgoingMessage);
} catch (NotPushRegisteredException e) {
device.setApnId(null);
accounts.update(account);
@ -97,11 +111,11 @@ public class PushSender {
}
}
private void storeFetchedMessage(Account account, Device device, EncryptedOutgoingMessage outgoingMessage)
private void sendWebSocketMessage(Account account, Device device, EncryptedOutgoingMessage outgoingMessage)
throws NotPushRegisteredException
{
try {
storedMessageManager.storeMessage(account, device, outgoingMessage);
webSocketSender.sendMessage(account, device, outgoingMessage);
} catch (CryptoEncodingException e) {
throw new NotPushRegisteredException(e);
}

View File

@ -0,0 +1,68 @@
/**
* Copyright (C) 2014 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.storage.PubSubMessage;
import org.whispersystems.textsecuregcm.storage.StoredMessages;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
import java.util.List;
import static com.codahale.metrics.MetricRegistry.name;
public class WebsocketSender {
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter onlineMeter = metricRegistry.meter(name(getClass(), "online"));
private final Meter offlineMeter = metricRegistry.meter(name(getClass(), "offline"));
private final StoredMessages storedMessages;
private final PubSubManager pubSubManager;
public WebsocketSender(StoredMessages storedMessages, PubSubManager pubSubManager) {
this.storedMessages = storedMessages;
this.pubSubManager = pubSubManager;
}
public void sendMessage(Account account, Device device, EncryptedOutgoingMessage outgoingMessage)
throws CryptoEncodingException
{
sendMessage(account, device, outgoingMessage.serialize());
}
private void sendMessage(Account account, Device device, String serializedMessage) {
WebsocketAddress address = new WebsocketAddress(account.getId(), device.getId());
PubSubMessage pubSubMessage = new PubSubMessage(PubSubMessage.TYPE_DELIVER, serializedMessage);
if (pubSubManager.publish(address, pubSubMessage)) {
onlineMeter.mark();
} else {
offlineMeter.mark();
storedMessages.insert(account.getId(), device.getId(), serializedMessage);
pubSubManager.publish(address, new PubSubMessage(PubSubMessage.TYPE_QUERY_DB, null));
}
}
}

View File

@ -1,66 +0,0 @@
/**
* Copyright (C) 2014 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.storage;
import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
import java.util.List;
public class StoredMessageManager {
private final StoredMessages storedMessages;
private final PubSubManager pubSubManager;
public StoredMessageManager(StoredMessages storedMessages, PubSubManager pubSubManager) {
this.storedMessages = storedMessages;
this.pubSubManager = pubSubManager;
}
public void storeMessage(Account account, Device device, EncryptedOutgoingMessage outgoingMessage)
throws CryptoEncodingException
{
storeMessage(account, device, outgoingMessage.serialize());
}
public void storeMessages(Account account, Device device, List<String> serializedMessages) {
for (String serializedMessage : serializedMessages) {
storeMessage(account, device, serializedMessage);
}
}
private void storeMessage(Account account, Device device, String serializedMessage) {
if (device.getFetchesMessages()) {
WebsocketAddress address = new WebsocketAddress(account.getId(), device.getId());
PubSubMessage pubSubMessage = new PubSubMessage(PubSubMessage.TYPE_DELIVER, serializedMessage);
if (!pubSubManager.publish(address, pubSubMessage)) {
storedMessages.insert(account.getId(), device.getId(), serializedMessage);
pubSubManager.publish(address, new PubSubMessage(PubSubMessage.TYPE_QUERY_DB, null));
}
return;
}
storedMessages.insert(account.getId(), device.getId(), serializedMessage);
}
public List<String> getOutgoingMessages(Account account, Device device) {
return storedMessages.getMessagesForDevice(account.getId(), device.getId());
}
}

View File

@ -16,21 +16,71 @@
*/
package org.whispersystems.textsecuregcm.storage;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.SqlBatch;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import org.whispersystems.textsecuregcm.util.Constants;
import java.util.LinkedList;
import java.util.List;
public interface StoredMessages {
import static com.codahale.metrics.MetricRegistry.name;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
@SqlUpdate("INSERT INTO messages (account_id, device_id, encrypted_message) VALUES (:account_id, :device_id, :encrypted_message)")
void insert(@Bind("account_id") long accountId, @Bind("device_id") long deviceId, @Bind("encrypted_message") String encryptedOutgoingMessage);
public class StoredMessages {
@SqlBatch("INSERT INTO messages (account_id, device_id, encrypted_message) VALUES (:account_id, :device_id, :encrypted_message)")
void insert(@Bind("account_id") long accountId, @Bind("device_id") long deviceId, @Bind("encrypted_message") List<String> encryptedOutgoingMessages);
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Histogram queueSizeHistogram = metricRegistry.histogram(name(getClass(), "queue_size"));
@SqlQuery("DELETE FROM messages WHERE account_id = :account_id AND device_id = :device_id RETURNING encrypted_message")
List<String> getMessagesForDevice(@Bind("account_id") long accountId, @Bind("device_id") long deviceId);
}
private static final String QUEUE_PREFIX = "msgs";
private final JedisPool jedisPool;
public StoredMessages(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
public void insert(long accountId, long deviceId, String message) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
long queueSize = jedis.lpush(getKey(accountId, deviceId), message);
queueSizeHistogram.update(queueSize);
if (queueSize > 1000) {
jedis.ltrim(getKey(accountId, deviceId), 0, 999);
}
} finally {
if (jedis != null)
jedisPool.returnResource(jedis);
}
}
public List<String> getMessagesForDevice(long accountId, long deviceId) {
List<String> messages = new LinkedList<>();
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
String message;
while ((message = jedis.rpop(QUEUE_PREFIX + accountId + ":" + deviceId)) != null) {
messages.add(message);
}
return messages;
} finally {
if (jedis != null)
jedisPool.returnResource(jedis);
}
}
private String getKey(long accountId, long deviceId) {
return QUEUE_PREFIX + ":" + accountId + ":" + deviceId;
}
}

View File

@ -1,4 +1,4 @@
package org.whispersystems.textsecuregcm.controllers;
package org.whispersystems.textsecuregcm.websocket;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
@ -8,24 +8,30 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
import org.whispersystems.textsecuregcm.controllers.WebsocketController;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.WebsocketSender;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.storage.StoredMessageManager;
import org.whispersystems.textsecuregcm.storage.StoredMessages;
public class WebsocketControllerFactory extends WebSocketServlet implements WebSocketCreator {
private final Logger logger = LoggerFactory.getLogger(WebsocketControllerFactory.class);
private final StoredMessageManager storedMessageManager;
private final PushSender pushSender;
private final StoredMessages storedMessages;
private final PubSubManager pubSubManager;
private final AccountAuthenticator accountAuthenticator;
public WebsocketControllerFactory(AccountAuthenticator accountAuthenticator,
StoredMessageManager storedMessageManager,
PubSubManager pubSubManager)
PushSender pushSender,
StoredMessages storedMessages,
PubSubManager pubSubManager)
{
this.accountAuthenticator = accountAuthenticator;
this.storedMessageManager = storedMessageManager;
this.pushSender = pushSender;
this.storedMessages = storedMessages;
this.pubSubManager = pubSubManager;
}
@ -36,6 +42,6 @@ public class WebsocketControllerFactory extends WebSocketServlet implements WebS
@Override
public Object createWebSocket(UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse) {
return new WebsocketController(accountAuthenticator, storedMessageManager, pubSubManager);
return new WebsocketController(accountAuthenticator, pushSender, pubSubManager, storedMessages);
}
}

View File

@ -9,13 +9,15 @@ import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.junit.Test;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
import org.whispersystems.textsecuregcm.controllers.WebsocketController;
import org.whispersystems.textsecuregcm.controllers.WebsocketControllerFactory;
import org.whispersystems.textsecuregcm.entities.AcknowledgeWebsocketMessage;
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.storage.StoredMessageManager;
import org.whispersystems.textsecuregcm.storage.StoredMessages;
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
import org.whispersystems.textsecuregcm.websocket.WebsocketControllerFactory;
import java.util.HashMap;
import java.util.LinkedList;
@ -35,13 +37,14 @@ public class WebsocketControllerTest {
private static final String VALID_PASSWORD = "secure";
private static final String INVALID_PASSWORD = "insecure";
private static final StoredMessageManager storedMessageManager = mock(StoredMessageManager.class);
private static final StoredMessages storedMessages = mock(StoredMessages.class);
private static final AccountAuthenticator accountAuthenticator = mock(AccountAuthenticator.class);
private static final PubSubManager pubSubManager = mock(PubSubManager.class );
private static final Account account = mock(Account.class );
private static final Device device = mock(Device.class );
private static final UpgradeRequest upgradeRequest = mock(UpgradeRequest.class );
private static final Session session = mock(Session.class );
private static final PushSender pushSender = mock(PushSender.class);
@Test
public void testCredentials() throws Exception {
@ -53,7 +56,7 @@ public class WebsocketControllerTest {
when(session.getUpgradeRequest()).thenReturn(upgradeRequest);
WebsocketController controller = new WebsocketController(accountAuthenticator, storedMessageManager, pubSubManager);
WebsocketController controller = new WebsocketController(accountAuthenticator, pushSender, pubSubManager, storedMessages);
when(upgradeRequest.getParameterMap()).thenReturn(new HashMap<String, String[]>() {{
put("login", new String[] {VALID_USER});
@ -100,9 +103,9 @@ public class WebsocketControllerTest {
when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD))))
.thenReturn(Optional.of(account));
when(storedMessageManager.getOutgoingMessages(eq(account), eq(device))).thenReturn(outgoingMessages);
when(storedMessages.getMessagesForDevice(account.getId(), device.getId())).thenReturn(outgoingMessages);
WebsocketControllerFactory factory = new WebsocketControllerFactory(accountAuthenticator, storedMessageManager, pubSubManager);
WebsocketControllerFactory factory = new WebsocketControllerFactory(accountAuthenticator, pushSender, storedMessages, pubSubManager);
WebsocketController controller = (WebsocketController) factory.createWebSocket(null, null);
controller.onWebSocketConnect(session);
@ -118,7 +121,7 @@ public class WebsocketControllerTest {
add("third");
}};
verify(storedMessageManager).storeMessages(eq(account), eq(device), eq(pending));
verify(pushSender, times(2)).sendMessage(eq(account), eq(device), any(EncryptedOutgoingMessage.class));
}
}