From 7a33cef27ea685448e3d7f0dc1d9443593a5d3c1 Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Wed, 7 May 2014 17:21:07 -0700 Subject: [PATCH] Updated iOS message delivery. 1) Use WebSockets for delivery if a client is connected. 2) If a client isn't connected, write to a redis queue and send an APN push. --- .../textsecuregcm/WhisperServerService.java | 23 +++--- .../controllers/WebsocketController.java | 41 ++++++---- .../entities/EncryptedOutgoingMessage.java | 19 +++-- .../textsecuregcm/push/APNSender.java | 48 +++++++++--- .../textsecuregcm/push/GCMSender.java | 3 - .../textsecuregcm/push/PushSender.java | 50 ++++++++----- .../textsecuregcm/push/WebsocketSender.java | 68 +++++++++++++++++ .../storage/StoredMessageManager.java | 66 ----------------- .../textsecuregcm/storage/StoredMessages.java | 74 ++++++++++++++++--- .../WebsocketControllerFactory.java | 20 +++-- .../controllers/WebsocketControllerTest.java | 17 +++-- 11 files changed, 272 insertions(+), 157 deletions(-) create mode 100644 src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java delete mode 100644 src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessageManager.java rename src/main/java/org/whispersystems/textsecuregcm/{controllers => websocket}/WebsocketControllerFactory.java (59%) diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index bea26462d..57cdcc538 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -34,7 +34,6 @@ import org.whispersystems.textsecuregcm.controllers.DirectoryController; import org.whispersystems.textsecuregcm.controllers.FederationController; import org.whispersystems.textsecuregcm.controllers.KeysController; import org.whispersystems.textsecuregcm.controllers.MessageController; -import org.whispersystems.textsecuregcm.controllers.WebsocketControllerFactory; import org.whispersystems.textsecuregcm.federation.FederatedClientManager; import org.whispersystems.textsecuregcm.federation.FederatedPeer; import org.whispersystems.textsecuregcm.limits.RateLimiters; @@ -63,10 +62,10 @@ import org.whispersystems.textsecuregcm.storage.PendingAccountsManager; import org.whispersystems.textsecuregcm.storage.PendingDevices; import org.whispersystems.textsecuregcm.storage.PendingDevicesManager; import org.whispersystems.textsecuregcm.storage.PubSubManager; -import org.whispersystems.textsecuregcm.storage.StoredMessageManager; import org.whispersystems.textsecuregcm.storage.StoredMessages; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.UrlSigner; +import org.whispersystems.textsecuregcm.websocket.WebsocketControllerFactory; import org.whispersystems.textsecuregcm.workers.DirectoryCommand; import javax.servlet.DispatcherType; @@ -121,18 +120,17 @@ public class WhisperServerService extends Application pendingMessages = new HashMap<>(); private final AccountAuthenticator accountAuthenticator; - private final StoredMessageManager storedMessageManager; private final PubSubManager pubSubManager; + private final StoredMessages storedMessages; + private final PushSender pushSender; - private Account account; - private Device device; - private Session session; + private WebsocketAddress address; + private Account account; + private Device device; + private Session session; private long pendingMessageSequence; public WebsocketController(AccountAuthenticator accountAuthenticator, - StoredMessageManager storedMessageManager, - PubSubManager pubSubManager) + PushSender pushSender, + PubSubManager pubSubManager, + StoredMessages storedMessages) { this.accountAuthenticator = accountAuthenticator; - this.storedMessageManager = storedMessageManager; + this.pushSender = pushSender; this.pubSubManager = pubSubManager; + this.storedMessages = storedMessages; } @Override @@ -80,11 +88,11 @@ public class WebsocketController implements WebSocketListener, PubSubListener { this.account = account.get(); this.device = account.get().getAuthenticatedDevice().get(); + this.address = new WebsocketAddress(this.account.getId(), this.device.getId()); this.session = session; this.session.setIdleTimeout(10 * 60 * 1000); - this.pubSubManager.subscribe(new WebsocketAddress(this.account.getId(), - this.device.getId()), this); + this.pubSubManager.subscribe(this.address, this); handleQueryDatabase(); } catch (AuthenticationException e) { @@ -114,7 +122,7 @@ public class WebsocketController implements WebSocketListener, PubSubListener { @Override public void onWebSocketClose(int i, String s) { - pubSubManager.unsubscribe(new WebsocketAddress(account.getId(), device.getId()), this); + pubSubManager.unsubscribe(this.address, this); List remainingMessages = new LinkedList<>(); @@ -129,7 +137,14 @@ public class WebsocketController implements WebSocketListener, PubSubListener { pendingMessages.clear(); } - storedMessageManager.storeMessages(account, device, remainingMessages); + for (String remainingMessage : remainingMessages) { + try { + pushSender.sendMessage(account, device, new EncryptedOutgoingMessage(remainingMessage)); + } catch (NotPushRegisteredException | TransientPushFailureException e) { + logger.warn("onWebSocketClose", e); + storedMessages.insert(account.getId(), device.getId(), remainingMessage); + } + } } @@ -177,7 +192,7 @@ public class WebsocketController implements WebSocketListener, PubSubListener { } private void handleQueryDatabase() { - List messages = storedMessageManager.getOutgoingMessages(account, device); + List messages = storedMessages.getMessagesForDevice(account.getId(), device.getId()); for (String message : messages) { handleDeliverOutgoingMessage(message); diff --git a/src/main/java/org/whispersystems/textsecuregcm/entities/EncryptedOutgoingMessage.java b/src/main/java/org/whispersystems/textsecuregcm/entities/EncryptedOutgoingMessage.java index b09e1a873..89a00695f 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/entities/EncryptedOutgoingMessage.java +++ b/src/main/java/org/whispersystems/textsecuregcm/entities/EncryptedOutgoingMessage.java @@ -41,23 +41,26 @@ public class EncryptedOutgoingMessage { private static final int MAC_KEY_SIZE = 20; private static final int MAC_SIZE = 10; - private final OutgoingMessageSignal outgoingMessage; - private final String signalingKey; + private final String serialized; public EncryptedOutgoingMessage(OutgoingMessageSignal outgoingMessage, String signalingKey) + throws CryptoEncodingException { - this.outgoingMessage = outgoingMessage; - this.signalingKey = signalingKey; - } - - public String serialize() throws CryptoEncodingException { byte[] plaintext = outgoingMessage.toByteArray(); SecretKeySpec cipherKey = getCipherKey (signalingKey); SecretKeySpec macKey = getMacKey(signalingKey); byte[] ciphertext = getCiphertext(plaintext, cipherKey, macKey); - return Base64.encodeBytes(ciphertext); + this.serialized = Base64.encodeBytes(ciphertext); + } + + public EncryptedOutgoingMessage(String serialized) { + this.serialized = serialized; + } + + public String serialize() { + return serialized; } private byte[] getCiphertext(byte[] plaintext, SecretKeySpec cipherKey, SecretKeySpec macKey) diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java index ff5b05640..1bc7c93b7 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/APNSender.java @@ -26,10 +26,15 @@ import com.notnoop.exceptions.NetworkIOException; import org.bouncycastle.openssl.PEMReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.entities.CryptoEncodingException; import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.PubSubManager; +import org.whispersystems.textsecuregcm.storage.PubSubMessage; +import org.whispersystems.textsecuregcm.storage.StoredMessages; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Util; +import org.whispersystems.textsecuregcm.websocket.WebsocketAddress; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -48,17 +53,25 @@ import static com.codahale.metrics.MetricRegistry.name; public class APNSender { private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private final Meter success = metricRegistry.meter(name(getClass(), "sent", "success")); - private final Meter failure = metricRegistry.meter(name(getClass(), "sent", "failure")); + private final Meter websocketMeter = metricRegistry.meter(name(getClass(), "websocket")); + private final Meter pushMeter = metricRegistry.meter(name(getClass(), "push")); + private final Meter failureMeter = metricRegistry.meter(name(getClass(), "failure")); private final Logger logger = LoggerFactory.getLogger(APNSender.class); private static final String MESSAGE_BODY = "m"; private final Optional apnService; + private final PubSubManager pubSubManager; + private final StoredMessages storedMessages; - public APNSender(String apnCertificate, String apnKey) + public APNSender(PubSubManager pubSubManager, + StoredMessages storedMessages, + String apnCertificate, String apnKey) throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException { + this.pubSubManager = pubSubManager; + this.storedMessages = storedMessages; + if (!Util.isEmpty(apnCertificate) && !Util.isEmpty(apnKey)) { byte[] keyStore = initializeKeyStore(apnCertificate, apnKey); this.apnService = Optional.of(APNS.newService() @@ -69,31 +82,44 @@ public class APNSender { } } - public void sendMessage(String registrationId, EncryptedOutgoingMessage message) + public void sendMessage(Account account, Device device, + String registrationId, EncryptedOutgoingMessage message) throws TransientPushFailureException, NotPushRegisteredException + { + if (pubSubManager.publish(new WebsocketAddress(account.getId(), device.getId()), + new PubSubMessage(PubSubMessage.TYPE_DELIVER, message.serialize()))) + { + websocketMeter.mark(); + } else { + storedMessages.insert(account.getId(), device.getId(), message.serialize()); + sendPush(registrationId, message.serialize()); + } + } + + private void sendPush(String registrationId, String message) + throws TransientPushFailureException { try { if (!apnService.isPresent()) { - failure.mark(); + failureMeter.mark(); throw new TransientPushFailureException("APN access not configured!"); } String payload = APNS.newPayload() .alertBody("Message!") - .customField(MESSAGE_BODY, message.serialize()) + .customField(MESSAGE_BODY, message) .build(); logger.debug("APN Payload: " + payload); apnService.get().push(registrationId, payload); - success.mark(); + pushMeter.mark(); } catch (NetworkIOException nioe) { logger.warn("Network Error", nioe); - failure.mark(); + failureMeter.mark(); throw new TransientPushFailureException(nioe); - } catch (CryptoEncodingException e) { - throw new NotPushRegisteredException(e); } + } private static byte[] initializeKeyStore(String pemCertificate, String pemKey) diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/GCMSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/GCMSender.java index 7065a8fb6..9790bc7a1 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/GCMSender.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/GCMSender.java @@ -23,7 +23,6 @@ import com.google.android.gcm.server.Constants; import com.google.android.gcm.server.Message; import com.google.android.gcm.server.Result; import com.google.android.gcm.server.Sender; -import org.whispersystems.textsecuregcm.entities.CryptoEncodingException; import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage; import java.io.IOException; @@ -65,8 +64,6 @@ public class GCMSender { } } catch (IOException e) { throw new TransientPushFailureException(e); - } catch (CryptoEncodingException e) { - throw new NotPushRegisteredException(e); } } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java index 8f212a79d..c9df1f9c2 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java @@ -26,7 +26,8 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; -import org.whispersystems.textsecuregcm.storage.StoredMessageManager; +import org.whispersystems.textsecuregcm.storage.PubSubManager; +import org.whispersystems.textsecuregcm.storage.StoredMessages; import java.io.IOException; import java.security.KeyStoreException; @@ -37,32 +38,45 @@ public class PushSender { private final Logger logger = LoggerFactory.getLogger(PushSender.class); - private final AccountsManager accounts; - private final GCMSender gcmSender; - private final APNSender apnSender; - private final StoredMessageManager storedMessageManager; + private final AccountsManager accounts; + private final GCMSender gcmSender; + private final APNSender apnSender; + private final WebsocketSender webSocketSender; public PushSender(GcmConfiguration gcmConfiguration, ApnConfiguration apnConfiguration, - StoredMessageManager storedMessageManager, - AccountsManager accounts) + StoredMessages storedMessages, + PubSubManager pubSubManager, + AccountsManager accounts) throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException { - this.accounts = accounts; - this.storedMessageManager = storedMessageManager; - this.gcmSender = new GCMSender(gcmConfiguration.getApiKey()); - this.apnSender = new APNSender(apnConfiguration.getCertificate(), apnConfiguration.getKey()); + this.accounts = accounts; + this.webSocketSender = new WebsocketSender(storedMessages, pubSubManager); + this.gcmSender = new GCMSender(gcmConfiguration.getApiKey()); + this.apnSender = new APNSender(pubSubManager, storedMessages, + apnConfiguration.getCertificate(), + apnConfiguration.getKey()); } - public void sendMessage(Account account, Device device, MessageProtos.OutgoingMessageSignal outgoingMessage) + public void sendMessage(Account account, Device device, MessageProtos.OutgoingMessageSignal message) throws NotPushRegisteredException, TransientPushFailureException { - String signalingKey = device.getSignalingKey(); - EncryptedOutgoingMessage message = new EncryptedOutgoingMessage(outgoingMessage, signalingKey); + try { + String signalingKey = device.getSignalingKey(); + EncryptedOutgoingMessage encryptedMessage = new EncryptedOutgoingMessage(message, signalingKey); + sendMessage(account, device, encryptedMessage); + } catch (CryptoEncodingException e) { + throw new NotPushRegisteredException(e); + } + } + + public void sendMessage(Account account, Device device, EncryptedOutgoingMessage message) + throws NotPushRegisteredException, TransientPushFailureException + { if (device.getGcmId() != null) sendGcmMessage(account, device, message); else if (device.getApnId() != null) sendApnMessage(account, device, message); - else if (device.getFetchesMessages()) storeFetchedMessage(account, device, message); + else if (device.getFetchesMessages()) sendWebSocketMessage(account, device, message); else throw new NotPushRegisteredException("No delivery possible!"); } @@ -89,7 +103,7 @@ public class PushSender { throws TransientPushFailureException, NotPushRegisteredException { try { - apnSender.sendMessage(device.getApnId(), outgoingMessage); + apnSender.sendMessage(account, device, device.getApnId(), outgoingMessage); } catch (NotPushRegisteredException e) { device.setApnId(null); accounts.update(account); @@ -97,11 +111,11 @@ public class PushSender { } } - private void storeFetchedMessage(Account account, Device device, EncryptedOutgoingMessage outgoingMessage) + private void sendWebSocketMessage(Account account, Device device, EncryptedOutgoingMessage outgoingMessage) throws NotPushRegisteredException { try { - storedMessageManager.storeMessage(account, device, outgoingMessage); + webSocketSender.sendMessage(account, device, outgoingMessage); } catch (CryptoEncodingException e) { throw new NotPushRegisteredException(e); } diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java new file mode 100644 index 000000000..c549da4b3 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2014 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.push; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SharedMetricRegistries; +import org.whispersystems.textsecuregcm.entities.CryptoEncodingException; +import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.PubSubManager; +import org.whispersystems.textsecuregcm.storage.PubSubMessage; +import org.whispersystems.textsecuregcm.storage.StoredMessages; +import org.whispersystems.textsecuregcm.util.Constants; +import org.whispersystems.textsecuregcm.websocket.WebsocketAddress; + +import java.util.List; + +import static com.codahale.metrics.MetricRegistry.name; + +public class WebsocketSender { + + private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + private final Meter onlineMeter = metricRegistry.meter(name(getClass(), "online")); + private final Meter offlineMeter = metricRegistry.meter(name(getClass(), "offline")); + + private final StoredMessages storedMessages; + private final PubSubManager pubSubManager; + + public WebsocketSender(StoredMessages storedMessages, PubSubManager pubSubManager) { + this.storedMessages = storedMessages; + this.pubSubManager = pubSubManager; + } + + public void sendMessage(Account account, Device device, EncryptedOutgoingMessage outgoingMessage) + throws CryptoEncodingException + { + sendMessage(account, device, outgoingMessage.serialize()); + } + + private void sendMessage(Account account, Device device, String serializedMessage) { + WebsocketAddress address = new WebsocketAddress(account.getId(), device.getId()); + PubSubMessage pubSubMessage = new PubSubMessage(PubSubMessage.TYPE_DELIVER, serializedMessage); + + if (pubSubManager.publish(address, pubSubMessage)) { + onlineMeter.mark(); + } else { + offlineMeter.mark(); + storedMessages.insert(account.getId(), device.getId(), serializedMessage); + pubSubManager.publish(address, new PubSubMessage(PubSubMessage.TYPE_QUERY_DB, null)); + } + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessageManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessageManager.java deleted file mode 100644 index b0de973af..000000000 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessageManager.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Copyright (C) 2014 Open WhisperSystems - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.whispersystems.textsecuregcm.storage; - -import org.whispersystems.textsecuregcm.entities.CryptoEncodingException; -import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage; -import org.whispersystems.textsecuregcm.websocket.WebsocketAddress; - -import java.util.List; - -public class StoredMessageManager { - - private final StoredMessages storedMessages; - private final PubSubManager pubSubManager; - - public StoredMessageManager(StoredMessages storedMessages, PubSubManager pubSubManager) { - this.storedMessages = storedMessages; - this.pubSubManager = pubSubManager; - } - - public void storeMessage(Account account, Device device, EncryptedOutgoingMessage outgoingMessage) - throws CryptoEncodingException - { - storeMessage(account, device, outgoingMessage.serialize()); - } - - public void storeMessages(Account account, Device device, List serializedMessages) { - for (String serializedMessage : serializedMessages) { - storeMessage(account, device, serializedMessage); - } - } - - private void storeMessage(Account account, Device device, String serializedMessage) { - if (device.getFetchesMessages()) { - WebsocketAddress address = new WebsocketAddress(account.getId(), device.getId()); - PubSubMessage pubSubMessage = new PubSubMessage(PubSubMessage.TYPE_DELIVER, serializedMessage); - - if (!pubSubManager.publish(address, pubSubMessage)) { - storedMessages.insert(account.getId(), device.getId(), serializedMessage); - pubSubManager.publish(address, new PubSubMessage(PubSubMessage.TYPE_QUERY_DB, null)); - } - - return; - } - - storedMessages.insert(account.getId(), device.getId(), serializedMessage); - } - - public List getOutgoingMessages(Account account, Device device) { - return storedMessages.getMessagesForDevice(account.getId(), device.getId()); - } -} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java b/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java index fdc18b8f1..a3b46d87e 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java @@ -16,21 +16,71 @@ */ package org.whispersystems.textsecuregcm.storage; -import org.skife.jdbi.v2.sqlobject.Bind; -import org.skife.jdbi.v2.sqlobject.SqlBatch; -import org.skife.jdbi.v2.sqlobject.SqlQuery; -import org.skife.jdbi.v2.sqlobject.SqlUpdate; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SharedMetricRegistries; +import org.whispersystems.textsecuregcm.util.Constants; + +import java.util.LinkedList; import java.util.List; -public interface StoredMessages { +import static com.codahale.metrics.MetricRegistry.name; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; - @SqlUpdate("INSERT INTO messages (account_id, device_id, encrypted_message) VALUES (:account_id, :device_id, :encrypted_message)") - void insert(@Bind("account_id") long accountId, @Bind("device_id") long deviceId, @Bind("encrypted_message") String encryptedOutgoingMessage); +public class StoredMessages { - @SqlBatch("INSERT INTO messages (account_id, device_id, encrypted_message) VALUES (:account_id, :device_id, :encrypted_message)") - void insert(@Bind("account_id") long accountId, @Bind("device_id") long deviceId, @Bind("encrypted_message") List encryptedOutgoingMessages); + private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + private final Histogram queueSizeHistogram = metricRegistry.histogram(name(getClass(), "queue_size")); - @SqlQuery("DELETE FROM messages WHERE account_id = :account_id AND device_id = :device_id RETURNING encrypted_message") - List getMessagesForDevice(@Bind("account_id") long accountId, @Bind("device_id") long deviceId); -} + private static final String QUEUE_PREFIX = "msgs"; + + private final JedisPool jedisPool; + + public StoredMessages(JedisPool jedisPool) { + this.jedisPool = jedisPool; + } + + public void insert(long accountId, long deviceId, String message) { + Jedis jedis = null; + + try { + jedis = jedisPool.getResource(); + + long queueSize = jedis.lpush(getKey(accountId, deviceId), message); + queueSizeHistogram.update(queueSize); + + if (queueSize > 1000) { + jedis.ltrim(getKey(accountId, deviceId), 0, 999); + } + } finally { + if (jedis != null) + jedisPool.returnResource(jedis); + } + } + + public List getMessagesForDevice(long accountId, long deviceId) { + List messages = new LinkedList<>(); + Jedis jedis = null; + + try { + jedis = jedisPool.getResource(); + String message; + + while ((message = jedis.rpop(QUEUE_PREFIX + accountId + ":" + deviceId)) != null) { + messages.add(message); + } + + return messages; + } finally { + if (jedis != null) + jedisPool.returnResource(jedis); + } + } + + private String getKey(long accountId, long deviceId) { + return QUEUE_PREFIX + ":" + accountId + ":" + deviceId; + } + +} \ No newline at end of file diff --git a/src/main/java/org/whispersystems/textsecuregcm/controllers/WebsocketControllerFactory.java b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebsocketControllerFactory.java similarity index 59% rename from src/main/java/org/whispersystems/textsecuregcm/controllers/WebsocketControllerFactory.java rename to src/main/java/org/whispersystems/textsecuregcm/websocket/WebsocketControllerFactory.java index c9e9201be..8c7c5c450 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/controllers/WebsocketControllerFactory.java +++ b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebsocketControllerFactory.java @@ -1,4 +1,4 @@ -package org.whispersystems.textsecuregcm.controllers; +package org.whispersystems.textsecuregcm.websocket; import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeResponse; @@ -8,24 +8,30 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; +import org.whispersystems.textsecuregcm.controllers.WebsocketController; +import org.whispersystems.textsecuregcm.push.PushSender; +import org.whispersystems.textsecuregcm.push.WebsocketSender; import org.whispersystems.textsecuregcm.storage.PubSubManager; -import org.whispersystems.textsecuregcm.storage.StoredMessageManager; +import org.whispersystems.textsecuregcm.storage.StoredMessages; public class WebsocketControllerFactory extends WebSocketServlet implements WebSocketCreator { private final Logger logger = LoggerFactory.getLogger(WebsocketControllerFactory.class); - private final StoredMessageManager storedMessageManager; + private final PushSender pushSender; + private final StoredMessages storedMessages; private final PubSubManager pubSubManager; private final AccountAuthenticator accountAuthenticator; public WebsocketControllerFactory(AccountAuthenticator accountAuthenticator, - StoredMessageManager storedMessageManager, - PubSubManager pubSubManager) + PushSender pushSender, + StoredMessages storedMessages, + PubSubManager pubSubManager) { this.accountAuthenticator = accountAuthenticator; - this.storedMessageManager = storedMessageManager; + this.pushSender = pushSender; + this.storedMessages = storedMessages; this.pubSubManager = pubSubManager; } @@ -36,6 +42,6 @@ public class WebsocketControllerFactory extends WebSocketServlet implements WebS @Override public Object createWebSocket(UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse) { - return new WebsocketController(accountAuthenticator, storedMessageManager, pubSubManager); + return new WebsocketController(accountAuthenticator, pushSender, pubSubManager, storedMessages); } } diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/WebsocketControllerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/WebsocketControllerTest.java index fe81dc6b8..296ef0331 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/WebsocketControllerTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/WebsocketControllerTest.java @@ -9,13 +9,15 @@ import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.junit.Test; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; import org.whispersystems.textsecuregcm.controllers.WebsocketController; -import org.whispersystems.textsecuregcm.controllers.WebsocketControllerFactory; import org.whispersystems.textsecuregcm.entities.AcknowledgeWebsocketMessage; +import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage; +import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.PubSubManager; -import org.whispersystems.textsecuregcm.storage.StoredMessageManager; +import org.whispersystems.textsecuregcm.storage.StoredMessages; import org.whispersystems.textsecuregcm.websocket.WebsocketAddress; +import org.whispersystems.textsecuregcm.websocket.WebsocketControllerFactory; import java.util.HashMap; import java.util.LinkedList; @@ -35,13 +37,14 @@ public class WebsocketControllerTest { private static final String VALID_PASSWORD = "secure"; private static final String INVALID_PASSWORD = "insecure"; - private static final StoredMessageManager storedMessageManager = mock(StoredMessageManager.class); + private static final StoredMessages storedMessages = mock(StoredMessages.class); private static final AccountAuthenticator accountAuthenticator = mock(AccountAuthenticator.class); private static final PubSubManager pubSubManager = mock(PubSubManager.class ); private static final Account account = mock(Account.class ); private static final Device device = mock(Device.class ); private static final UpgradeRequest upgradeRequest = mock(UpgradeRequest.class ); private static final Session session = mock(Session.class ); + private static final PushSender pushSender = mock(PushSender.class); @Test public void testCredentials() throws Exception { @@ -53,7 +56,7 @@ public class WebsocketControllerTest { when(session.getUpgradeRequest()).thenReturn(upgradeRequest); - WebsocketController controller = new WebsocketController(accountAuthenticator, storedMessageManager, pubSubManager); + WebsocketController controller = new WebsocketController(accountAuthenticator, pushSender, pubSubManager, storedMessages); when(upgradeRequest.getParameterMap()).thenReturn(new HashMap() {{ put("login", new String[] {VALID_USER}); @@ -100,9 +103,9 @@ public class WebsocketControllerTest { when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD)))) .thenReturn(Optional.of(account)); - when(storedMessageManager.getOutgoingMessages(eq(account), eq(device))).thenReturn(outgoingMessages); + when(storedMessages.getMessagesForDevice(account.getId(), device.getId())).thenReturn(outgoingMessages); - WebsocketControllerFactory factory = new WebsocketControllerFactory(accountAuthenticator, storedMessageManager, pubSubManager); + WebsocketControllerFactory factory = new WebsocketControllerFactory(accountAuthenticator, pushSender, storedMessages, pubSubManager); WebsocketController controller = (WebsocketController) factory.createWebSocket(null, null); controller.onWebSocketConnect(session); @@ -118,7 +121,7 @@ public class WebsocketControllerTest { add("third"); }}; - verify(storedMessageManager).storeMessages(eq(account), eq(device), eq(pending)); + verify(pushSender, times(2)).sendMessage(eq(account), eq(device), any(EncryptedOutgoingMessage.class)); } }