From db6ee8f687af1da94abc0cda0c2f75230ada9a11 Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Wed, 15 Apr 2015 16:19:07 -0700 Subject: [PATCH] Make stored messages REST accessible. Add REST endpoints for retrieving and acknowledging pending messges, beyond the WebSocket. // FREEBIE --- .../textsecuregcm/WhisperServerService.java | 8 +- .../controllers/MessageController.java | 42 +++++++++ .../controllers/ReceiptController.java | 74 ++------------- .../entities/OutgoingMessageEntity.java | 70 +++++++++++++++ .../entities/OutgoingMessageEntityList.java | 23 +++++ .../textsecuregcm/push/ReceiptSender.java | 89 +++++++++++++++++++ .../textsecuregcm/storage/Messages.java | 32 +++---- .../storage/MessagesManager.java | 8 +- .../AuthenticatedConnectListener.java | 8 +- .../websocket/WebSocketConnection.java | 66 ++++++-------- .../controllers/FederatedControllerTest.java | 6 +- .../controllers/MessageControllerTest.java | 86 +++++++++++++++++- .../controllers/ReceiptControllerTest.java | 5 +- .../websocket/WebSocketConnectionTest.java | 45 ++++++---- 14 files changed, 412 insertions(+), 150 deletions(-) create mode 100644 src/main/java/org/whispersystems/textsecuregcm/entities/OutgoingMessageEntity.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/entities/OutgoingMessageEntityList.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/push/ReceiptSender.java diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 414758eae..5df9246a7 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -58,6 +58,7 @@ import org.whispersystems.textsecuregcm.providers.TimeProvider; import org.whispersystems.textsecuregcm.push.FeedbackHandler; import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.push.PushServiceClient; +import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.WebsocketSender; import org.whispersystems.textsecuregcm.sms.NexmoSmsSender; import org.whispersystems.textsecuregcm.sms.SmsSender; @@ -174,6 +175,7 @@ public class WhisperServerService extends Application authorizationKey = config.getRedphoneConfiguration().getAuthorizationKey(); @@ -183,7 +185,7 @@ public class WhisperServerService extends Application(new FederatedPeerAuthenticator(config.getFederationConfiguration()), FederatedPeer.class, @@ -195,7 +197,7 @@ public class WhisperServerService extends Application message = messagesManager.delete(account.getNumber(), id); + + if (message.isPresent() && message.get().getType() != OutgoingMessageSignal.Type.RECEIPT_VALUE) { + receiptSender.sendReceipt(account, + message.get().getSource(), + message.get().getTimestamp(), + Optional.fromNullable(message.get().getRelay())); + } + } catch (NoSuchUserException | NotPushRegisteredException | TransientPushFailureException e) { + logger.warn("Sending delivery receipt", e); + } + } + + private void sendLocalMessage(Account source, String destinationName, IncomingMessageList messages, diff --git a/src/main/java/org/whispersystems/textsecuregcm/controllers/ReceiptController.java b/src/main/java/org/whispersystems/textsecuregcm/controllers/ReceiptController.java index b33ff7aba..6d56e72ff 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/controllers/ReceiptController.java +++ b/src/main/java/org/whispersystems/textsecuregcm/controllers/ReceiptController.java @@ -2,14 +2,10 @@ package org.whispersystems.textsecuregcm.controllers; import com.codahale.metrics.annotation.Timed; import com.google.common.base.Optional; -import org.whispersystems.textsecuregcm.federation.FederatedClientManager; -import org.whispersystems.textsecuregcm.federation.NoSuchPeerException; import org.whispersystems.textsecuregcm.push.NotPushRegisteredException; -import org.whispersystems.textsecuregcm.push.PushSender; +import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.TransientPushFailureException; import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.AccountsManager; -import org.whispersystems.textsecuregcm.storage.Device; import javax.ws.rs.PUT; import javax.ws.rs.Path; @@ -18,25 +14,16 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; import java.io.IOException; -import java.util.Set; import io.dropwizard.auth.Auth; -import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal; @Path("/v1/receipt") public class ReceiptController { - private final AccountsManager accountManager; - private final PushSender pushSender; - private final FederatedClientManager federatedClientManager; + private final ReceiptSender receiptSender; - public ReceiptController(AccountsManager accountManager, - FederatedClientManager federatedClientManager, - PushSender pushSender) - { - this.accountManager = accountManager; - this.federatedClientManager = federatedClientManager; - this.pushSender = pushSender; + public ReceiptController(ReceiptSender receiptSender) { + this.receiptSender = receiptSender; } @Timed @@ -49,11 +36,7 @@ public class ReceiptController { throws IOException { try { - if (relay.isPresent() && !relay.get().isEmpty()) { - sendRelayedReceipt(source, destination, messageId, relay.get()); - } else { - sendDirectReceipt(source, destination, messageId); - } + receiptSender.sendReceipt(source, destination, messageId, relay); } catch (NoSuchUserException | NotPushRegisteredException e) { throw new WebApplicationException(Response.Status.NOT_FOUND); } catch (TransientPushFailureException e) { @@ -61,51 +44,4 @@ public class ReceiptController { } } - private void sendRelayedReceipt(Account source, String destination, long messageId, String relay) - throws NoSuchUserException, IOException - { - try { - federatedClientManager.getClient(relay) - .sendDeliveryReceipt(source.getNumber(), - source.getAuthenticatedDevice().get().getId(), - destination, messageId); - } catch (NoSuchPeerException e) { - throw new NoSuchUserException(e); - } - } - - private void sendDirectReceipt(Account source, String destination, long messageId) - throws NotPushRegisteredException, TransientPushFailureException, NoSuchUserException - { - Account destinationAccount = getDestinationAccount(destination); - Set destinationDevices = destinationAccount.getDevices(); - - OutgoingMessageSignal.Builder message = - OutgoingMessageSignal.newBuilder() - .setSource(source.getNumber()) - .setSourceDevice((int) source.getAuthenticatedDevice().get().getId()) - .setTimestamp(messageId) - .setType(OutgoingMessageSignal.Type.RECEIPT_VALUE); - - if (source.getRelay().isPresent()) { - message.setRelay(source.getRelay().get()); - } - - for (Device destinationDevice : destinationDevices) { - pushSender.sendMessage(destinationAccount, destinationDevice, message.build()); - } - } - - private Account getDestinationAccount(String destination) - throws NoSuchUserException - { - Optional account = accountManager.get(destination); - - if (!account.isPresent()) { - throw new NoSuchUserException(destination); - } - - return account.get(); - } - } diff --git a/src/main/java/org/whispersystems/textsecuregcm/entities/OutgoingMessageEntity.java b/src/main/java/org/whispersystems/textsecuregcm/entities/OutgoingMessageEntity.java new file mode 100644 index 000000000..8a1babd71 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/entities/OutgoingMessageEntity.java @@ -0,0 +1,70 @@ +package org.whispersystems.textsecuregcm.entities; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class OutgoingMessageEntity { + + @JsonIgnore + private long id; + + @JsonProperty + private int type; + + @JsonProperty + private String relay; + + @JsonProperty + private long timestamp; + + @JsonProperty + private String source; + + @JsonProperty + private int sourceDevice; + + @JsonProperty + private byte[] message; + + public OutgoingMessageEntity() {} + + public OutgoingMessageEntity(long id, int type, String relay, long timestamp, + String source, int sourceDevice, byte[] message) + { + this.id = id; + this.type = type; + this.relay = relay; + this.timestamp = timestamp; + this.source = source; + this.sourceDevice = sourceDevice; + this.message = message; + } + + public int getType() { + return type; + } + + public String getRelay() { + return relay; + } + + public long getTimestamp() { + return timestamp; + } + + public String getSource() { + return source; + } + + public int getSourceDevice() { + return sourceDevice; + } + + public byte[] getMessage() { + return message; + } + + public long getId() { + return id; + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/entities/OutgoingMessageEntityList.java b/src/main/java/org/whispersystems/textsecuregcm/entities/OutgoingMessageEntityList.java new file mode 100644 index 000000000..55d5a36a4 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/entities/OutgoingMessageEntityList.java @@ -0,0 +1,23 @@ +package org.whispersystems.textsecuregcm.entities; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; + +import java.util.List; + +public class OutgoingMessageEntityList { + + @JsonProperty + private List messages; + + public OutgoingMessageEntityList() {} + + public OutgoingMessageEntityList(List messages) { + this.messages = messages; + } + + @VisibleForTesting + public List getMessages() { + return messages; + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/ReceiptSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/ReceiptSender.java new file mode 100644 index 000000000..6ab16c05f --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/push/ReceiptSender.java @@ -0,0 +1,89 @@ +package org.whispersystems.textsecuregcm.push; + +import com.google.common.base.Optional; +import org.whispersystems.textsecuregcm.controllers.NoSuchUserException; +import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.federation.FederatedClientManager; +import org.whispersystems.textsecuregcm.federation.NoSuchPeerException; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.Device; + +import java.io.IOException; +import java.util.Set; + +public class ReceiptSender { + + private final PushSender pushSender; + private final FederatedClientManager federatedClientManager; + private final AccountsManager accountManager; + + public ReceiptSender(AccountsManager accountManager, + PushSender pushSender, + FederatedClientManager federatedClientManager) + { + this.federatedClientManager = federatedClientManager; + this.accountManager = accountManager; + this.pushSender = pushSender; + } + + public void sendReceipt(Account source, String destination, + long messageId, Optional relay) + throws IOException, NoSuchUserException, + NotPushRegisteredException, TransientPushFailureException + { + if (relay.isPresent() && !relay.get().isEmpty()) { + sendRelayedReceipt(source, destination, messageId, relay.get()); + } else { + sendDirectReceipt(source, destination, messageId); + } + } + + private void sendRelayedReceipt(Account source, String destination, long messageId, String relay) + throws NoSuchUserException, IOException + { + try { + federatedClientManager.getClient(relay) + .sendDeliveryReceipt(source.getNumber(), + source.getAuthenticatedDevice().get().getId(), + destination, messageId); + } catch (NoSuchPeerException e) { + throw new NoSuchUserException(e); + } + } + + private void sendDirectReceipt(Account source, String destination, long messageId) + throws NotPushRegisteredException, TransientPushFailureException, NoSuchUserException + { + Account destinationAccount = getDestinationAccount(destination); + Set destinationDevices = destinationAccount.getDevices(); + + MessageProtos.OutgoingMessageSignal.Builder message = + MessageProtos.OutgoingMessageSignal.newBuilder() + .setSource(source.getNumber()) + .setSourceDevice((int) source.getAuthenticatedDevice().get().getId()) + .setTimestamp(messageId) + .setType(MessageProtos.OutgoingMessageSignal.Type.RECEIPT_VALUE); + + if (source.getRelay().isPresent()) { + message.setRelay(source.getRelay().get()); + } + + for (Device destinationDevice : destinationDevices) { + pushSender.sendMessage(destinationAccount, destinationDevice, message.build()); + } + } + + private Account getDestinationAccount(String destination) + throws NoSuchUserException + { + Optional account = accountManager.get(destination); + + if (!account.isPresent()) { + throw new NoSuchUserException(destination); + } + + return account.get(); + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java b/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java index e151d1b04..db9c02de9 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java @@ -1,6 +1,5 @@ package org.whispersystems.textsecuregcm.storage; -import com.google.protobuf.ByteString; import org.skife.jdbi.v2.SQLStatement; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.sqlobject.Bind; @@ -12,7 +11,7 @@ import org.skife.jdbi.v2.sqlobject.SqlUpdate; import org.skife.jdbi.v2.sqlobject.customizers.Mapper; import org.skife.jdbi.v2.tweak.ResultSetMapper; import org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal; -import org.whispersystems.textsecuregcm.util.Pair; +import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import java.lang.annotation.Annotation; import java.lang.annotation.ElementType; @@ -44,9 +43,14 @@ public abstract class Messages { @Mapper(MessageMapper.class) @SqlQuery("SELECT * FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device ORDER BY " + TIMESTAMP + " ASC") - abstract List> load(@Bind("destination") String destination, - @Bind("destination_device") long destinationDevice); + abstract List load(@Bind("destination") String destination, + @Bind("destination_device") long destinationDevice); + @Mapper(MessageMapper.class) + @SqlQuery("DELETE FROM messages WHERE " + ID + " IN (SELECT " + ID + " FROM messages WHERE " + DESTINATION + " = :destination AND " + TIMESTAMP + " = :timestamp ORDER BY " + ID + " LIMIT 1) RETURNING *") + abstract OutgoingMessageEntity remove(@Bind("destination") String destination, @Bind("timestamp") long timestamp); + + @Mapper(MessageMapper.class) @SqlUpdate("DELETE FROM messages WHERE " + ID + " = :id") abstract void remove(@Bind("id") long id); @@ -56,20 +60,18 @@ public abstract class Messages { @SqlUpdate("VACUUM messages") public abstract void vacuum(); - public static class MessageMapper implements ResultSetMapper> { + public static class MessageMapper implements ResultSetMapper { @Override - public Pair map(int i, ResultSet resultSet, StatementContext statementContext) + public OutgoingMessageEntity map(int i, ResultSet resultSet, StatementContext statementContext) throws SQLException { - return new Pair<>(resultSet.getLong(ID), - OutgoingMessageSignal.newBuilder() - .setType(resultSet.getInt(TYPE)) - .setRelay(resultSet.getString(RELAY)) - .setTimestamp(resultSet.getLong(TIMESTAMP)) - .setSource(resultSet.getString(SOURCE)) - .setSourceDevice(resultSet.getInt(SOURCE_DEVICE)) - .setMessage(ByteString.copyFrom(resultSet.getBytes(MESSAGE))) - .build()); + return new OutgoingMessageEntity(resultSet.getLong(ID), + resultSet.getInt(TYPE), + resultSet.getString(RELAY), + resultSet.getLong(TIMESTAMP), + resultSet.getString(SOURCE), + resultSet.getInt(SOURCE_DEVICE), + resultSet.getBytes(MESSAGE)); } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 0f4295206..0e3144fa2 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -1,7 +1,9 @@ package org.whispersystems.textsecuregcm.storage; +import com.google.common.base.Optional; import org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal; +import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.util.Pair; import java.util.List; @@ -18,7 +20,7 @@ public class MessagesManager { return this.messages.store(message, destination, destinationDevice) + 1; } - public List> getMessagesForDevice(String destination, long destinationDevice) { + public List getMessagesForDevice(String destination, long destinationDevice) { return this.messages.load(destination, destinationDevice); } @@ -26,6 +28,10 @@ public class MessagesManager { this.messages.clear(destination); } + public Optional delete(String destination, long timestamp) { + return Optional.fromNullable(this.messages.remove(destination, timestamp)); + } + public void delete(long id) { this.messages.remove(id); } diff --git a/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java b/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java index 8ae561cf5..d3c5d8211 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java +++ b/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java @@ -6,6 +6,7 @@ import com.codahale.metrics.SharedMetricRegistries; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.push.PushSender; +import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; @@ -27,14 +28,17 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { private final AccountsManager accountsManager; private final PushSender pushSender; + private final ReceiptSender receiptSender; private final MessagesManager messagesManager; private final PubSubManager pubSubManager; public AuthenticatedConnectListener(AccountsManager accountsManager, PushSender pushSender, - MessagesManager messagesManager, PubSubManager pubSubManager) + ReceiptSender receiptSender, MessagesManager messagesManager, + PubSubManager pubSubManager) { this.accountsManager = accountsManager; this.pushSender = pushSender; + this.receiptSender = receiptSender; this.messagesManager = messagesManager; this.pubSubManager = pubSubManager; } @@ -45,7 +49,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { final Device device = account.getAuthenticatedDevice().get(); final long connectTime = System.currentTimeMillis(); final WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId()); - final WebSocketConnection connection = new WebSocketConnection(accountsManager, pushSender, + final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, messagesManager, account, device, context.getClient()); diff --git a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 612a77d38..40a55f3e6 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -1,35 +1,33 @@ package org.whispersystems.textsecuregcm.websocket; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SharedMetricRegistries; import com.google.common.base.Optional; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.dispatch.DispatchChannel; +import org.whispersystems.textsecuregcm.controllers.NoSuchUserException; import org.whispersystems.textsecuregcm.entities.CryptoEncodingException; import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage; +import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.push.NotPushRegisteredException; import org.whispersystems.textsecuregcm.push.PushSender; +import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.TransientPushFailureException; import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.MessagesManager; -import org.whispersystems.textsecuregcm.util.Constants; -import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.websocket.WebSocketClient; import org.whispersystems.websocket.messages.WebSocketResponseMessage; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.IOException; import java.util.List; -import static com.codahale.metrics.MetricRegistry.name; import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal; import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage; @@ -37,7 +35,7 @@ public class WebSocketConnection implements DispatchChannel { private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class); - private final AccountsManager accountsManager; + private final ReceiptSender receiptSender; private final PushSender pushSender; private final MessagesManager messagesManager; @@ -45,17 +43,15 @@ public class WebSocketConnection implements DispatchChannel { private final Device device; private final WebSocketClient client; - private long connectionStartTime; - - public WebSocketConnection(AccountsManager accountsManager, - PushSender pushSender, + public WebSocketConnection(PushSender pushSender, + ReceiptSender receiptSender, MessagesManager messagesManager, Account account, Device device, WebSocketClient client) { - this.accountsManager = accountsManager; this.pushSender = pushSender; + this.receiptSender = receiptSender; this.messagesManager = messagesManager; this.account = account; this.device = device; @@ -107,7 +103,7 @@ public class WebSocketConnection implements DispatchChannel { if (isSuccessResponse(response)) { if (storedMessageId.isPresent()) messagesManager.delete(storedMessageId.get()); if (!isReceipt) sendDeliveryReceiptFor(message); - } else if (!isSuccessResponse(response) & !storedMessageId.isPresent()) { + } else if (!isSuccessResponse(response) && !storedMessageId.isPresent()) { requeueMessage(message); } } @@ -137,34 +133,30 @@ public class WebSocketConnection implements DispatchChannel { private void sendDeliveryReceiptFor(OutgoingMessageSignal message) { try { - Optional source = accountsManager.get(message.getSource()); - - if (!source.isPresent()) { - logger.warn(String.format("Source account disappeared? (%s)", message.getSource())); - return; - } - - OutgoingMessageSignal.Builder receipt = - OutgoingMessageSignal.newBuilder() - .setSource(account.getNumber()) - .setSourceDevice((int) device.getId()) - .setTimestamp(message.getTimestamp()) - .setType(OutgoingMessageSignal.Type.RECEIPT_VALUE); - - for (Device device : source.get().getDevices()) { - pushSender.sendMessage(source.get(), device, receipt.build()); - } - } catch (NotPushRegisteredException | TransientPushFailureException e) { - logger.warn("sendDeliveryReceiptFor", "Delivery receipet", e); + receiptSender.sendReceipt(account, message.getSource(), message.getTimestamp(), + message.hasRelay() ? Optional.of(message.getRelay()) : + Optional.absent()); + } catch (IOException | NoSuchUserException | TransientPushFailureException | NotPushRegisteredException e) { + logger.warn("sendDeliveryReceiptFor", e); } } private void processStoredMessages() { - List> messages = messagesManager.getMessagesForDevice(account.getNumber(), - device.getId()); + List messages = messagesManager.getMessagesForDevice(account.getNumber(), device.getId()); - for (Pair message : messages) { - sendMessage(message.second(), Optional.of(message.first())); + for (OutgoingMessageEntity message : messages) { + OutgoingMessageSignal.Builder builder = OutgoingMessageSignal.newBuilder() + .setType(message.getType()) + .setMessage(ByteString.copyFrom(message.getMessage())) + .setSourceDevice(message.getSourceDevice()) + .setSource(message.getSource()) + .setTimestamp(message.getTimestamp()); + + if (message.getRelay() != null && !message.getRelay().isEmpty()) { + builder.setRelay(message.getRelay()); + } + + sendMessage(builder.build(), Optional.of(message.getId())); } } diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/FederatedControllerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/FederatedControllerTest.java index c3cc05c59..d0a455029 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/FederatedControllerTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/FederatedControllerTest.java @@ -20,9 +20,11 @@ import org.whispersystems.textsecuregcm.federation.FederatedClientManager; import org.whispersystems.textsecuregcm.limits.RateLimiter; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.push.PushSender; +import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.tests.util.AuthHelper; import javax.ws.rs.core.MediaType; @@ -45,8 +47,10 @@ public class FederatedControllerTest { private static final String MULTI_DEVICE_RECIPIENT = "+14152222222"; private PushSender pushSender = mock(PushSender.class ); + private ReceiptSender receiptSender = mock(ReceiptSender.class); private FederatedClientManager federatedClientManager = mock(FederatedClientManager.class); private AccountsManager accountsManager = mock(AccountsManager.class ); + private MessagesManager messagesManager = mock(MessagesManager.class); private RateLimiters rateLimiters = mock(RateLimiters.class ); private RateLimiter rateLimiter = mock(RateLimiter.class ); @@ -55,7 +59,7 @@ public class FederatedControllerTest { private final ObjectMapper mapper = new ObjectMapper(); - private final MessageController messageController = new MessageController(rateLimiters, pushSender, accountsManager, federatedClientManager); + private final MessageController messageController = new MessageController(rateLimiters, pushSender, receiptSender, accountsManager, messagesManager, federatedClientManager); private final KeysControllerV2 keysControllerV2 = mock(KeysControllerV2.class); @Rule diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java index dadd5cd47..734a3e7aa 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java @@ -3,6 +3,7 @@ package org.whispersystems.textsecuregcm.tests.controllers; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.sun.jersey.api.client.ClientResponse; +import org.hamcrest.CoreMatchers; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -10,19 +11,26 @@ import org.whispersystems.textsecuregcm.controllers.MessageController; import org.whispersystems.textsecuregcm.entities.IncomingMessageList; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MismatchedDevices; +import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; +import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList; import org.whispersystems.textsecuregcm.entities.SignedPreKey; import org.whispersystems.textsecuregcm.entities.StaleDevices; import org.whispersystems.textsecuregcm.federation.FederatedClientManager; import org.whispersystems.textsecuregcm.limits.RateLimiter; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.push.PushSender; +import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.tests.util.AuthHelper; +import org.whispersystems.textsecuregcm.util.SystemMapper; import javax.ws.rs.core.MediaType; import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -30,6 +38,7 @@ import io.dropwizard.testing.junit.ResourceTestRule; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.*; @@ -42,8 +51,10 @@ public class MessageControllerTest { private static final String MULTI_DEVICE_RECIPIENT = "+14152222222"; private final PushSender pushSender = mock(PushSender.class ); + private final ReceiptSender receiptSender = mock(ReceiptSender.class); private final FederatedClientManager federatedClientManager = mock(FederatedClientManager.class); private final AccountsManager accountsManager = mock(AccountsManager.class ); + private final MessagesManager messagesManager = mock(MessagesManager.class); private final RateLimiters rateLimiters = mock(RateLimiters.class ); private final RateLimiter rateLimiter = mock(RateLimiter.class ); @@ -52,8 +63,8 @@ public class MessageControllerTest { @Rule public final ResourceTestRule resources = ResourceTestRule.builder() .addProvider(AuthHelper.getAuthenticator()) - .addResource(new MessageController(rateLimiters, pushSender, accountsManager, - federatedClientManager)) + .addResource(new MessageController(rateLimiters, pushSender, receiptSender, accountsManager, + messagesManager, federatedClientManager)) .build(); @@ -175,4 +186,75 @@ public class MessageControllerTest { } + @Test + public synchronized void testGetMessages() throws Exception { + + final long timestampOne = 313377; + final long timestampTwo = 313388; + + List messages = new LinkedList() {{ + add(new OutgoingMessageEntity(1L, MessageProtos.OutgoingMessageSignal.Type.CIPHERTEXT_VALUE, null, timestampOne, "+14152222222", 2, "hi there".getBytes())); + add(new OutgoingMessageEntity(2L, MessageProtos.OutgoingMessageSignal.Type.RECEIPT_VALUE, null, timestampTwo, "+14152222222", 2, null)); + }}; + + when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_NUMBER), eq(1L))).thenReturn(messages); + + OutgoingMessageEntityList response = + resources.client().resource("/v1/messages/") + .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD)) + .accept(MediaType.APPLICATION_JSON_TYPE) + .get(OutgoingMessageEntityList.class); + + + assertEquals(response.getMessages().size(), 2); + + assertEquals(response.getMessages().get(0).getId(), 0); + assertEquals(response.getMessages().get(1).getId(), 0); + + assertEquals(response.getMessages().get(0).getTimestamp(), timestampOne); + assertEquals(response.getMessages().get(1).getTimestamp(), timestampTwo); + } + + @Test + public synchronized void testDeleteMessages() throws Exception { + long timestamp = System.currentTimeMillis(); + when(messagesManager.delete(AuthHelper.VALID_NUMBER, 31337)) + .thenReturn(Optional.of(new OutgoingMessageEntity(31337L, + MessageProtos.OutgoingMessageSignal.Type.CIPHERTEXT_VALUE, + null, timestamp, + "+14152222222", 1, "hi".getBytes()))); + + when(messagesManager.delete(AuthHelper.VALID_NUMBER, 31338)) + .thenReturn(Optional.of(new OutgoingMessageEntity(31337L, + MessageProtos.OutgoingMessageSignal.Type.RECEIPT_VALUE, + null, System.currentTimeMillis(), + "+14152222222", 1, null))); + + + when(messagesManager.delete(AuthHelper.VALID_NUMBER, 31339)) + .thenReturn(Optional.absent()); + + ClientResponse response = resources.client().resource(String.format("/v1/messages/%d", 31337)) + .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD)) + .delete(ClientResponse.class); + + assertThat("Good Response Code", response.getStatus(), is(equalTo(204))); + verify(receiptSender).sendReceipt(any(Account.class), eq("+14152222222"), eq(timestamp), eq(Optional.absent())); + + response = resources.client().resource(String.format("/v1/messages/%d", 31338)) + .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD)) + .delete(ClientResponse.class); + + assertThat("Good Response Code", response.getStatus(), is(equalTo(204))); + verifyNoMoreInteractions(receiptSender); + + response = resources.client().resource(String.format("/v1/messages/%d", 31339)) + .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD)) + .delete(ClientResponse.class); + + assertThat("Good Response Code", response.getStatus(), is(equalTo(204))); + verifyNoMoreInteractions(receiptSender); + + } + } diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/ReceiptControllerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/ReceiptControllerTest.java index cd6cfec4b..55cb2543e 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/ReceiptControllerTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/ReceiptControllerTest.java @@ -10,6 +10,7 @@ import org.whispersystems.textsecuregcm.controllers.ReceiptController; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.federation.FederatedClientManager; import org.whispersystems.textsecuregcm.push.PushSender; +import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; @@ -32,12 +33,14 @@ public class ReceiptControllerTest { private final FederatedClientManager federatedClientManager = mock(FederatedClientManager.class); private final AccountsManager accountsManager = mock(AccountsManager.class ); + private final ReceiptSender receiptSender = new ReceiptSender(accountsManager, pushSender, federatedClientManager); + private final ObjectMapper mapper = new ObjectMapper(); @Rule public final ResourceTestRule resources = ResourceTestRule.builder() .addProvider(AuthHelper.getAuthenticator()) - .addResource(new ReceiptController(accountsManager, federatedClientManager, pushSender)) + .addResource(new ReceiptController(receiptSender)) .build(); @Before diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java index 40cf98200..a37c59f34 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java @@ -9,7 +9,9 @@ import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; +import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.push.PushSender; +import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; @@ -57,12 +59,13 @@ public class WebSocketConnectionTest { private static final Device device = mock(Device.class ); private static final UpgradeRequest upgradeRequest = mock(UpgradeRequest.class ); private static final PushSender pushSender = mock(PushSender.class); + private static final ReceiptSender receiptSender = mock(ReceiptSender.class); @Test public void testCredentials() throws Exception { MessagesManager storedMessages = mock(MessagesManager.class); WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator); - AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(accountsManager, pushSender, storedMessages, pubSubManager); + AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(accountsManager, pushSender, receiptSender, storedMessages, pubSubManager); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class); when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD)))) @@ -98,10 +101,10 @@ public class WebSocketConnectionTest { public void testOpen() throws Exception { MessagesManager storedMessages = mock(MessagesManager.class); - List> outgoingMessages = new LinkedList> () {{ - add(new Pair<>(1L, createMessage("sender1", 1111, false, "first"))); - add(new Pair<>(2L, createMessage("sender1", 2222, false, "second"))); - add(new Pair<>(3L, createMessage("sender2", 3333, false, "third"))); + List outgoingMessages = new LinkedList () {{ + add(createMessage(1L, "sender1", 1111, false, "first")); + add(createMessage(2L, "sender1", 2222, false, "second")); + add(createMessage(3L, "sender2", 3333, false, "third")); }}; when(device.getId()).thenReturn(2L); @@ -139,7 +142,7 @@ public class WebSocketConnectionTest { }); WebsocketAddress websocketAddress = new WebsocketAddress(account.getNumber(), device.getId()); - WebSocketConnection connection = new WebSocketConnection(accountsManager, pushSender, storedMessages, + WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, storedMessages, account, device, client); connection.onDispatchSubscribed(websocketAddress.serialize()); @@ -154,25 +157,29 @@ public class WebSocketConnectionTest { futures.get(0).setException(new IOException()); futures.get(2).setException(new IOException()); - List pending = new LinkedList() {{ - add(createMessage("sender1", 1111, false, "first")); - add(createMessage("sender2", 3333, false, "third")); - }}; +// List pending = new LinkedList() {{ +// add(createMessage("sender1", 1111, false, "first")); +// add(createMessage("sender2", 3333, false, "third")); +// }}; - verify(pushSender, times(1)).sendMessage(eq(sender1), eq(sender1device), any(OutgoingMessageSignal.class)); + verify(storedMessages, times(1)).delete(eq(2L)); + +// verify(pushSender, times(1)).sendMessage(eq(sender1), eq(sender1device), any(OutgoingMessageSignal.class)); connection.onDispatchUnsubscribed(websocketAddress.serialize()); verify(client).close(anyInt(), anyString()); } - private OutgoingMessageSignal createMessage(String sender, long timestamp, boolean receipt, String content) { - return OutgoingMessageSignal.newBuilder() - .setSource(sender) - .setSourceDevice(1) - .setType(receipt ? OutgoingMessageSignal.Type.RECEIPT_VALUE : OutgoingMessageSignal.Type.CIPHERTEXT_VALUE) - .setTimestamp(timestamp) - .setMessage(ByteString.copyFrom(content.getBytes())) - .build(); + private OutgoingMessageEntity createMessage(long id, String sender, long timestamp, boolean receipt, String content) { + return new OutgoingMessageEntity(id, receipt ? OutgoingMessageSignal.Type.RECEIPT_VALUE : OutgoingMessageSignal.Type.CIPHERTEXT_VALUE, + null, timestamp, sender, 1, content.getBytes()); +// return OutgoingMessageSignal.newBuilder() +// .setSource(sender) +// .setSourceDevice(1) +// .setType(receipt ? OutgoingMessageSignal.Type.RECEIPT_VALUE : OutgoingMessageSignal.Type.CIPHERTEXT_VALUE) +// .setTimestamp(timestamp) +// .setMessage(ByteString.copyFrom(content.getBytes())) +// .build(); } }