Chunk sending pending message queues > a chunk size.

// FREEBIE
This commit is contained in:
Moxie Marlinspike 2015-08-17 17:12:36 -07:00
parent 62d8f635b0
commit 5ccbf355bd
7 changed files with 45 additions and 26 deletions

View File

@ -153,9 +153,8 @@ public class MessageController {
@GET @GET
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public OutgoingMessageEntityList getPendingMessages(@Auth Account account) { public OutgoingMessageEntityList getPendingMessages(@Auth Account account) {
return new OutgoingMessageEntityList(messagesManager.getMessagesForDevice(account.getNumber(), return messagesManager.getMessagesForDevice(account.getNumber(),
account.getAuthenticatedDevice() account.getAuthenticatedDevice().get().getId());
.get().getId()));
} }
@Timed @Timed

View File

@ -10,14 +10,21 @@ public class OutgoingMessageEntityList {
@JsonProperty @JsonProperty
private List<OutgoingMessageEntity> messages; private List<OutgoingMessageEntity> messages;
@JsonProperty
private boolean more;
public OutgoingMessageEntityList() {} public OutgoingMessageEntityList() {}
public OutgoingMessageEntityList(List<OutgoingMessageEntity> messages) { public OutgoingMessageEntityList(List<OutgoingMessageEntity> messages, boolean more) {
this.messages = messages; this.messages = messages;
this.more = more;
} }
@VisibleForTesting
public List<OutgoingMessageEntity> getMessages() { public List<OutgoingMessageEntity> getMessages() {
return messages; return messages;
} }
public boolean hasMore() {
return more;
}
} }

View File

@ -24,6 +24,8 @@ import java.util.List;
public abstract class Messages { public abstract class Messages {
public static final int RESULT_SET_CHUNK_SIZE = 1000;
private static final String ID = "id"; private static final String ID = "id";
private static final String TYPE = "type"; private static final String TYPE = "type";
private static final String RELAY = "relay"; private static final String RELAY = "relay";
@ -43,7 +45,7 @@ public abstract class Messages {
@Bind("destination_device") long destinationDevice); @Bind("destination_device") long destinationDevice);
@Mapper(MessageMapper.class) @Mapper(MessageMapper.class)
@SqlQuery("SELECT * FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device ORDER BY " + TIMESTAMP + " ASC") @SqlQuery("SELECT * FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device ORDER BY " + TIMESTAMP + " ASC LIMIT " + RESULT_SET_CHUNK_SIZE)
abstract List<OutgoingMessageEntity> load(@Bind("destination") String destination, abstract List<OutgoingMessageEntity> load(@Bind("destination") String destination,
@Bind("destination_device") long destinationDevice); @Bind("destination_device") long destinationDevice);

View File

@ -4,6 +4,7 @@ package org.whispersystems.textsecuregcm.storage;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
import java.util.List; import java.util.List;
@ -19,8 +20,9 @@ public class MessagesManager {
return this.messages.store(message, destination, destinationDevice) + 1; return this.messages.store(message, destination, destinationDevice) + 1;
} }
public List<OutgoingMessageEntity> getMessagesForDevice(String destination, long destinationDevice) { public OutgoingMessageEntityList getMessagesForDevice(String destination, long destinationDevice) {
return this.messages.load(destination, destinationDevice); List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE);
} }
public void clear(String destination) { public void clear(String destination) {

View File

@ -13,6 +13,7 @@ import org.whispersystems.textsecuregcm.controllers.NoSuchUserException;
import org.whispersystems.textsecuregcm.entities.CryptoEncodingException; import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage; import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException; import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.ReceiptSender;
@ -27,7 +28,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.ws.rs.WebApplicationException; import javax.ws.rs.WebApplicationException;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.Iterator;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage; import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
@ -69,7 +70,7 @@ public class WebSocketConnection implements DispatchChannel {
processStoredMessages(); processStoredMessages();
break; break;
case PubSubMessage.Type.DELIVER_VALUE: case PubSubMessage.Type.DELIVER_VALUE:
sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.<Long>absent()); sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.<Long>absent(), false);
break; break;
default: default:
logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber()); logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber());
@ -89,7 +90,8 @@ public class WebSocketConnection implements DispatchChannel {
} }
private void sendMessage(final Envelope message, private void sendMessage(final Envelope message,
final Optional<Long> storedMessageId) final Optional<Long> storedMessageId,
final boolean requery)
{ {
try { try {
EncryptedOutgoingMessage encryptedMessage = new EncryptedOutgoingMessage(message, device.getSignalingKey()); EncryptedOutgoingMessage encryptedMessage = new EncryptedOutgoingMessage(message, device.getSignalingKey());
@ -104,6 +106,7 @@ public class WebSocketConnection implements DispatchChannel {
if (isSuccessResponse(response)) { if (isSuccessResponse(response)) {
if (storedMessageId.isPresent()) messagesManager.delete(account.getNumber(), storedMessageId.get()); if (storedMessageId.isPresent()) messagesManager.delete(account.getNumber(), storedMessageId.get());
if (!isReceipt) sendDeliveryReceiptFor(message); if (!isReceipt) sendDeliveryReceiptFor(message);
if (requery) processStoredMessages();
} else if (!isSuccessResponse(response) && !storedMessageId.isPresent()) { } else if (!isSuccessResponse(response) && !storedMessageId.isPresent()) {
requeueMessage(message); requeueMessage(message);
} }
@ -145,9 +148,11 @@ public class WebSocketConnection implements DispatchChannel {
} }
private void processStoredMessages() { private void processStoredMessages() {
List<OutgoingMessageEntity> messages = messagesManager.getMessagesForDevice(account.getNumber(), device.getId()); OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), device.getId());
Iterator<OutgoingMessageEntity> iterator = messages.getMessages().iterator();
for (OutgoingMessageEntity message : messages) { while (iterator.hasNext()) {
OutgoingMessageEntity message = iterator.next();
Envelope.Builder builder = Envelope.newBuilder() Envelope.Builder builder = Envelope.newBuilder()
.setType(Envelope.Type.valueOf(message.getType())) .setType(Envelope.Type.valueOf(message.getType()))
.setSourceDevice(message.getSourceDevice()) .setSourceDevice(message.getSourceDevice())
@ -166,9 +171,7 @@ public class WebSocketConnection implements DispatchChannel {
builder.setRelay(message.getRelay()); builder.setRelay(message.getRelay());
} }
sendMessage(builder.build(), Optional.of(message.getId())); sendMessage(builder.build(), Optional.of(message.getId()), !iterator.hasNext() && messages.hasMore());
} }
} }
} }

View File

@ -204,7 +204,9 @@ public class MessageControllerTest {
add(new OutgoingMessageEntity(2L, Envelope.Type.RECEIPT_VALUE, null, timestampTwo, "+14152222222", 2, null, null)); add(new OutgoingMessageEntity(2L, Envelope.Type.RECEIPT_VALUE, null, timestampTwo, "+14152222222", 2, null, null));
}}; }};
when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_NUMBER), eq(1L))).thenReturn(messages); OutgoingMessageEntityList messagesList = new OutgoingMessageEntityList(messages, false);
when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_NUMBER), eq(1L))).thenReturn(messagesList);
OutgoingMessageEntityList response = OutgoingMessageEntityList response =
resources.getJerseyTest().target("/v1/messages/") resources.getJerseyTest().target("/v1/messages/")

View File

@ -9,6 +9,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
import org.whispersystems.textsecuregcm.push.ApnFallbackManager; import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.ReceiptSender;
@ -104,6 +105,8 @@ public class WebSocketConnectionTest {
add(createMessage(3L, "sender2", 3333, false, "third")); add(createMessage(3L, "sender2", 3333, false, "third"));
}}; }};
OutgoingMessageEntityList outgoingMessagesList = new OutgoingMessageEntityList(outgoingMessages, false);
when(device.getId()).thenReturn(2L); when(device.getId()).thenReturn(2L);
when(device.getSignalingKey()).thenReturn(Base64.encodeBytes(new byte[52])); when(device.getSignalingKey()).thenReturn(Base64.encodeBytes(new byte[52]));
@ -123,7 +126,7 @@ public class WebSocketConnectionTest {
when(accountsManager.get("sender2")).thenReturn(Optional.<Account>absent()); when(accountsManager.get("sender2")).thenReturn(Optional.<Account>absent());
when(storedMessages.getMessagesForDevice(account.getNumber(), device.getId())) when(storedMessages.getMessagesForDevice(account.getNumber(), device.getId()))
.thenReturn(outgoingMessages); .thenReturn(outgoingMessagesList);
final List<SettableFuture<WebSocketResponseMessage>> futures = new LinkedList<>(); final List<SettableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
@ -181,6 +184,7 @@ public class WebSocketConnectionTest {
.build(); .build();
List<OutgoingMessageEntity> pendingMessages = new LinkedList<>(); List<OutgoingMessageEntity> pendingMessages = new LinkedList<>();
OutgoingMessageEntityList pendingMessagesList = new OutgoingMessageEntityList(pendingMessages, false);
when(device.getId()).thenReturn(2L); when(device.getId()).thenReturn(2L);
when(device.getSignalingKey()).thenReturn(Base64.encodeBytes(new byte[52])); when(device.getSignalingKey()).thenReturn(Base64.encodeBytes(new byte[52]));
@ -201,7 +205,7 @@ public class WebSocketConnectionTest {
when(accountsManager.get("sender2")).thenReturn(Optional.<Account>absent()); when(accountsManager.get("sender2")).thenReturn(Optional.<Account>absent());
when(storedMessages.getMessagesForDevice(account.getNumber(), device.getId())) when(storedMessages.getMessagesForDevice(account.getNumber(), device.getId()))
.thenReturn(pendingMessages); .thenReturn(pendingMessagesList);
final List<SettableFuture<WebSocketResponseMessage>> futures = new LinkedList<>(); final List<SettableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);