From 5ccbf355bd79d8b05ef85f9cb2f2a7d3fbf3f606 Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Mon, 17 Aug 2015 17:12:36 -0700 Subject: [PATCH] Chunk sending pending message queues > a chunk size. // FREEBIE --- .../controllers/MessageController.java | 5 ++- .../entities/OutgoingMessageEntityList.java | 11 +++++-- .../textsecuregcm/storage/Messages.java | 4 ++- .../storage/MessagesManager.java | 6 ++-- .../websocket/WebSocketConnection.java | 31 ++++++++++--------- .../controllers/MessageControllerTest.java | 4 ++- .../websocket/WebSocketConnectionTest.java | 10 ++++-- 7 files changed, 45 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java b/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java index 8026b4de0..fa8eba354 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java +++ b/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java @@ -153,9 +153,8 @@ public class MessageController { @GET @Produces(MediaType.APPLICATION_JSON) public OutgoingMessageEntityList getPendingMessages(@Auth Account account) { - return new OutgoingMessageEntityList(messagesManager.getMessagesForDevice(account.getNumber(), - account.getAuthenticatedDevice() - .get().getId())); + return messagesManager.getMessagesForDevice(account.getNumber(), + account.getAuthenticatedDevice().get().getId()); } @Timed diff --git a/src/main/java/org/whispersystems/textsecuregcm/entities/OutgoingMessageEntityList.java b/src/main/java/org/whispersystems/textsecuregcm/entities/OutgoingMessageEntityList.java index 55d5a36a4..e8f57c97e 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/entities/OutgoingMessageEntityList.java +++ b/src/main/java/org/whispersystems/textsecuregcm/entities/OutgoingMessageEntityList.java @@ -10,14 +10,21 @@ public class OutgoingMessageEntityList { @JsonProperty private List messages; + @JsonProperty + private boolean more; + public OutgoingMessageEntityList() {} - public OutgoingMessageEntityList(List messages) { + public OutgoingMessageEntityList(List messages, boolean more) { this.messages = messages; + this.more = more; } - @VisibleForTesting public List getMessages() { return messages; } + + public boolean hasMore() { + return more; + } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java b/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java index 3ea9de700..a8832ea84 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java @@ -24,6 +24,8 @@ import java.util.List; public abstract class Messages { + public static final int RESULT_SET_CHUNK_SIZE = 1000; + private static final String ID = "id"; private static final String TYPE = "type"; private static final String RELAY = "relay"; @@ -43,7 +45,7 @@ public abstract class Messages { @Bind("destination_device") long destinationDevice); @Mapper(MessageMapper.class) - @SqlQuery("SELECT * FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device ORDER BY " + TIMESTAMP + " ASC") + @SqlQuery("SELECT * FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device ORDER BY " + TIMESTAMP + " ASC LIMIT " + RESULT_SET_CHUNK_SIZE) abstract List load(@Bind("destination") String destination, @Bind("destination_device") long destinationDevice); diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index dfcf16f95..8a9349f53 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -4,6 +4,7 @@ package org.whispersystems.textsecuregcm.storage; import com.google.common.base.Optional; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; +import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList; import java.util.List; @@ -19,8 +20,9 @@ public class MessagesManager { return this.messages.store(message, destination, destinationDevice) + 1; } - public List getMessagesForDevice(String destination, long destinationDevice) { - return this.messages.load(destination, destinationDevice); + public OutgoingMessageEntityList getMessagesForDevice(String destination, long destinationDevice) { + List messages = this.messages.load(destination, destinationDevice); + return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE); } public void clear(String destination) { diff --git a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 27e243437..509b3ad18 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -13,6 +13,7 @@ import org.whispersystems.textsecuregcm.controllers.NoSuchUserException; import org.whispersystems.textsecuregcm.entities.CryptoEncodingException; import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; +import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList; import org.whispersystems.textsecuregcm.push.NotPushRegisteredException; import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.push.ReceiptSender; @@ -27,7 +28,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.ws.rs.WebApplicationException; import java.io.IOException; -import java.util.List; +import java.util.Iterator; import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage; @@ -69,7 +70,7 @@ public class WebSocketConnection implements DispatchChannel { processStoredMessages(); break; case PubSubMessage.Type.DELIVER_VALUE: - sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.absent()); + sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.absent(), false); break; default: logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber()); @@ -88,8 +89,9 @@ public class WebSocketConnection implements DispatchChannel { processStoredMessages(); } - private void sendMessage(final Envelope message, - final Optional storedMessageId) + private void sendMessage(final Envelope message, + final Optional storedMessageId, + final boolean requery) { try { EncryptedOutgoingMessage encryptedMessage = new EncryptedOutgoingMessage(message, device.getSignalingKey()); @@ -104,6 +106,7 @@ public class WebSocketConnection implements DispatchChannel { if (isSuccessResponse(response)) { if (storedMessageId.isPresent()) messagesManager.delete(account.getNumber(), storedMessageId.get()); if (!isReceipt) sendDeliveryReceiptFor(message); + if (requery) processStoredMessages(); } else if (!isSuccessResponse(response) && !storedMessageId.isPresent()) { requeueMessage(message); } @@ -145,14 +148,16 @@ public class WebSocketConnection implements DispatchChannel { } private void processStoredMessages() { - List messages = messagesManager.getMessagesForDevice(account.getNumber(), device.getId()); + OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), device.getId()); + Iterator iterator = messages.getMessages().iterator(); - for (OutgoingMessageEntity message : messages) { - Envelope.Builder builder = Envelope.newBuilder() - .setType(Envelope.Type.valueOf(message.getType())) - .setSourceDevice(message.getSourceDevice()) - .setSource(message.getSource()) - .setTimestamp(message.getTimestamp()); + while (iterator.hasNext()) { + OutgoingMessageEntity message = iterator.next(); + Envelope.Builder builder = Envelope.newBuilder() + .setType(Envelope.Type.valueOf(message.getType())) + .setSourceDevice(message.getSourceDevice()) + .setSource(message.getSource()) + .setTimestamp(message.getTimestamp()); if (message.getMessage() != null) { builder.setLegacyMessage(ByteString.copyFrom(message.getMessage())); @@ -166,9 +171,7 @@ public class WebSocketConnection implements DispatchChannel { builder.setRelay(message.getRelay()); } - sendMessage(builder.build(), Optional.of(message.getId())); + sendMessage(builder.build(), Optional.of(message.getId()), !iterator.hasNext() && messages.hasMore()); } } - - } diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java index 916c98d72..9d9386a35 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java @@ -204,7 +204,9 @@ public class MessageControllerTest { add(new OutgoingMessageEntity(2L, Envelope.Type.RECEIPT_VALUE, null, timestampTwo, "+14152222222", 2, null, null)); }}; - when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_NUMBER), eq(1L))).thenReturn(messages); + OutgoingMessageEntityList messagesList = new OutgoingMessageEntityList(messages, false); + + when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_NUMBER), eq(1L))).thenReturn(messagesList); OutgoingMessageEntityList response = resources.getJerseyTest().target("/v1/messages/") diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java index c883471eb..ac33f4249 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java @@ -9,6 +9,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; +import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList; import org.whispersystems.textsecuregcm.push.ApnFallbackManager; import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.push.ReceiptSender; @@ -104,6 +105,8 @@ public class WebSocketConnectionTest { add(createMessage(3L, "sender2", 3333, false, "third")); }}; + OutgoingMessageEntityList outgoingMessagesList = new OutgoingMessageEntityList(outgoingMessages, false); + when(device.getId()).thenReturn(2L); when(device.getSignalingKey()).thenReturn(Base64.encodeBytes(new byte[52])); @@ -123,7 +126,7 @@ public class WebSocketConnectionTest { when(accountsManager.get("sender2")).thenReturn(Optional.absent()); when(storedMessages.getMessagesForDevice(account.getNumber(), device.getId())) - .thenReturn(outgoingMessages); + .thenReturn(outgoingMessagesList); final List> futures = new LinkedList<>(); final WebSocketClient client = mock(WebSocketClient.class); @@ -180,7 +183,8 @@ public class WebSocketConnectionTest { .setType(Envelope.Type.CIPHERTEXT) .build(); - List pendingMessages = new LinkedList<>(); + List pendingMessages = new LinkedList<>(); + OutgoingMessageEntityList pendingMessagesList = new OutgoingMessageEntityList(pendingMessages, false); when(device.getId()).thenReturn(2L); when(device.getSignalingKey()).thenReturn(Base64.encodeBytes(new byte[52])); @@ -201,7 +205,7 @@ public class WebSocketConnectionTest { when(accountsManager.get("sender2")).thenReturn(Optional.absent()); when(storedMessages.getMessagesForDevice(account.getNumber(), device.getId())) - .thenReturn(pendingMessages); + .thenReturn(pendingMessagesList); final List> futures = new LinkedList<>(); final WebSocketClient client = mock(WebSocketClient.class);