From fb5e0242d05e6e629cb1d7fc64c24312427c5eaa Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Fri, 4 Dec 2015 11:39:33 -0800 Subject: [PATCH] Adjust requeue message logic to avoid redis assumptions // FREEBIE --- .../textsecuregcm/WhisperServerService.java | 2 +- .../textsecuregcm/push/PushSender.java | 68 ++++++----- .../textsecuregcm/push/WebsocketSender.java | 20 +++- .../websocket/WebSocketConnection.java | 5 +- .../websocket/WebSocketConnectionTest.java | 107 +++++++++++++++++- 5 files changed, 165 insertions(+), 37 deletions(-) diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 9e7dead2e..d91141162 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -249,7 +249,7 @@ public class WhisperServerService extends Applicationabsent())); - verify(pushSender, times(1)).sendMessage(eq(account), eq(device), any(Envelope.class)); + verify(websocketSender, times(1)).queueMessage(eq(account), eq(device), any(Envelope.class)); + verify(pushSender, times(1)).sendQueuedNotification(eq(account), eq(device), eq(10)); connection.onDispatchUnsubscribed(websocketAddress.serialize()); verify(client).close(anyInt(), anyString()); } + @Test + public void testPendingSend() throws Exception { + MessagesManager storedMessages = mock(MessagesManager.class); + WebsocketSender websocketSender = mock(WebsocketSender.class); + + reset(websocketSender); + reset(pushSender); + + when(pushSender.getWebSocketSender()).thenReturn(websocketSender); + when(websocketSender.queueMessage(any(Account.class), any(Device.class), any(Envelope.class))).thenReturn(10); + + final Envelope firstMessage = Envelope.newBuilder() + .setLegacyMessage(ByteString.copyFrom("first".getBytes())) + .setSource("sender1") + .setTimestamp(System.currentTimeMillis()) + .setSourceDevice(1) + .setType(Envelope.Type.CIPHERTEXT) + .build(); + + final Envelope secondMessage = Envelope.newBuilder() + .setLegacyMessage(ByteString.copyFrom("second".getBytes())) + .setSource("sender2") + .setTimestamp(System.currentTimeMillis()) + .setSourceDevice(2) + .setType(Envelope.Type.CIPHERTEXT) + .build(); + + List pendingMessages = new LinkedList() {{ + add(new OutgoingMessageEntity(1, firstMessage.getType().getNumber(), firstMessage.getRelay(), + firstMessage.getTimestamp(), firstMessage.getSource(), + firstMessage.getSourceDevice(), firstMessage.getLegacyMessage().toByteArray(), + firstMessage.getContent().toByteArray())); + add(new OutgoingMessageEntity(2, secondMessage.getType().getNumber(), secondMessage.getRelay(), + secondMessage.getTimestamp(), secondMessage.getSource(), + secondMessage.getSourceDevice(), secondMessage.getLegacyMessage().toByteArray(), + secondMessage.getContent().toByteArray())); + }}; + + OutgoingMessageEntityList pendingMessagesList = new OutgoingMessageEntityList(pendingMessages, false); + + when(device.getId()).thenReturn(2L); + when(device.getSignalingKey()).thenReturn(Base64.encodeBytes(new byte[52])); + + when(account.getAuthenticatedDevice()).thenReturn(Optional.of(device)); + when(account.getNumber()).thenReturn("+14152222222"); + + final Device sender1device = mock(Device.class); + + Set sender1devices = new HashSet() {{ + add(sender1device); + }}; + + Account sender1 = mock(Account.class); + when(sender1.getDevices()).thenReturn(sender1devices); + + when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1)); + when(accountsManager.get("sender2")).thenReturn(Optional.absent()); + + when(storedMessages.getMessagesForDevice(account.getNumber(), device.getId())) + .thenReturn(pendingMessagesList); + + final List> futures = new LinkedList<>(); + final WebSocketClient client = mock(WebSocketClient.class); + + when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), any(Optional.class))) + .thenAnswer(new Answer>() { + @Override + public SettableFuture answer(InvocationOnMock invocationOnMock) throws Throwable { + SettableFuture future = SettableFuture.create(); + futures.add(future); + return future; + } + }); + + WebsocketAddress websocketAddress = new WebsocketAddress(account.getNumber(), device.getId()); + WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, storedMessages, + account, device, client); + + connection.onDispatchSubscribed(websocketAddress.serialize()); + + verify(client, times(2)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(Optional.class)); + + assertEquals(futures.size(), 2); + + WebSocketResponseMessage response = mock(WebSocketResponseMessage.class); + when(response.getStatus()).thenReturn(200); + futures.get(1).set(response); + futures.get(0).setException(new IOException()); + + verify(receiptSender, times(1)).sendReceipt(eq(account), eq("sender2"), eq(secondMessage.getTimestamp()), eq(Optional.absent())); + verifyNoMoreInteractions(websocketSender); + verifyNoMoreInteractions(pushSender); + + connection.onDispatchUnsubscribed(websocketAddress.serialize()); + verify(client).close(anyInt(), anyString()); + } + + private OutgoingMessageEntity createMessage(long id, String sender, long timestamp, boolean receipt, String content) { return new OutgoingMessageEntity(id, receipt ? Envelope.Type.RECEIPT_VALUE : Envelope.Type.CIPHERTEXT_VALUE, null, timestamp, sender, 1, content.getBytes(), null);