Adjust requeue message logic to avoid redis assumptions
// FREEBIE
This commit is contained in:
parent
d376035557
commit
fb5e0242d0
|
@ -249,7 +249,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
FilterRegistration.Dynamic filter = environment.servlets().addFilter("CORS", CrossOriginFilter.class);
|
||||
filter.addMappingForUrlPatterns(EnumSet.allOf(DispatcherType.class), true, "/*");
|
||||
filter.setInitParameter("allowedOrigins", "*");
|
||||
filter.setInitParameter("allowedHeaders", "Content-Type,Authorization,X-Requested-With,Content-Length,Accept,Origin");
|
||||
filter.setInitParameter("allowedHeaders", "Content-Type,Authorization,X-Requested-With,Content-Length,Accept,Origin,X-Signal-Agent");
|
||||
filter.setInitParameter("allowedMethods", "GET,PUT,POST,DELETE,OPTIONS");
|
||||
filter.setInitParameter("preflightMaxAge", "5184000");
|
||||
filter.setInitParameter("allowCredentials", "true");
|
||||
|
|
|
@ -19,8 +19,6 @@ package org.whispersystems.textsecuregcm.push;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.entities.ApnMessage;
|
||||
import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
|
||||
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
|
||||
import org.whispersystems.textsecuregcm.entities.GcmMessage;
|
||||
import org.whispersystems.textsecuregcm.push.ApnFallbackManager.ApnFallbackTask;
|
||||
import org.whispersystems.textsecuregcm.push.WebsocketSender.DeliveryStatus;
|
||||
|
@ -58,54 +56,68 @@ public class PushSender {
|
|||
else throw new NotPushRegisteredException("No delivery possible!");
|
||||
}
|
||||
|
||||
public void sendQueuedNotification(Account account, Device device, int messageQueueDepth)
|
||||
throws NotPushRegisteredException, TransientPushFailureException
|
||||
{
|
||||
if (device.getGcmId() != null) sendGcmNotification(account, device);
|
||||
else if (device.getApnId() != null) sendApnNotification(account, device, messageQueueDepth);
|
||||
else if (!device.getFetchesMessages()) throw new NotPushRegisteredException("No notification possible!");
|
||||
}
|
||||
|
||||
public WebsocketSender getWebSocketSender() {
|
||||
return webSocketSender;
|
||||
}
|
||||
|
||||
private void sendGcmMessage(Account account, Device device, Envelope message)
|
||||
throws TransientPushFailureException
|
||||
{
|
||||
sendNotificationGcmMessage(account, device, message);
|
||||
}
|
||||
|
||||
private void sendNotificationGcmMessage(Account account, Device device, Envelope message)
|
||||
throws TransientPushFailureException
|
||||
{
|
||||
DeliveryStatus deliveryStatus = webSocketSender.sendMessage(account, device, message, WebsocketSender.Type.GCM);
|
||||
|
||||
if (!deliveryStatus.isDelivered()) {
|
||||
GcmMessage gcmMessage = new GcmMessage(device.getGcmId(), account.getNumber(),
|
||||
(int)device.getId(), "", false, true);
|
||||
|
||||
pushServiceClient.send(gcmMessage);
|
||||
sendGcmNotification(account, device);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendGcmNotification(Account account, Device device)
|
||||
throws TransientPushFailureException
|
||||
{
|
||||
GcmMessage gcmMessage = new GcmMessage(device.getGcmId(), account.getNumber(),
|
||||
(int)device.getId(), "", false, true);
|
||||
|
||||
pushServiceClient.send(gcmMessage);
|
||||
}
|
||||
|
||||
private void sendApnMessage(Account account, Device device, Envelope outgoingMessage)
|
||||
throws TransientPushFailureException
|
||||
{
|
||||
DeliveryStatus deliveryStatus = webSocketSender.sendMessage(account, device, outgoingMessage, WebsocketSender.Type.APN);
|
||||
|
||||
if (!deliveryStatus.isDelivered() && outgoingMessage.getType() != Envelope.Type.RECEIPT) {
|
||||
ApnMessage apnMessage;
|
||||
|
||||
if (!Util.isEmpty(device.getVoipApnId())) {
|
||||
apnMessage = new ApnMessage(device.getVoipApnId(), account.getNumber(), (int)device.getId(),
|
||||
String.format(APN_PAYLOAD, deliveryStatus.getMessageQueueDepth()),
|
||||
true, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30));
|
||||
|
||||
apnFallbackManager.schedule(new WebsocketAddress(account.getNumber(), device.getId()),
|
||||
new ApnFallbackTask(device.getApnId(), apnMessage));
|
||||
} else {
|
||||
apnMessage = new ApnMessage(device.getApnId(), account.getNumber(), (int)device.getId(),
|
||||
String.format(APN_PAYLOAD, deliveryStatus.getMessageQueueDepth()),
|
||||
false, ApnMessage.MAX_EXPIRATION);
|
||||
}
|
||||
|
||||
pushServiceClient.send(apnMessage);
|
||||
sendApnNotification(account, device, deliveryStatus.getMessageQueueDepth());
|
||||
}
|
||||
}
|
||||
|
||||
private void sendApnNotification(Account account, Device device, int messageQueueDepth)
|
||||
throws TransientPushFailureException
|
||||
{
|
||||
ApnMessage apnMessage;
|
||||
|
||||
if (!Util.isEmpty(device.getVoipApnId())) {
|
||||
apnMessage = new ApnMessage(device.getVoipApnId(), account.getNumber(), (int)device.getId(),
|
||||
String.format(APN_PAYLOAD, messageQueueDepth),
|
||||
true, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30));
|
||||
|
||||
apnFallbackManager.schedule(new WebsocketAddress(account.getNumber(), device.getId()),
|
||||
new ApnFallbackTask(device.getApnId(), apnMessage));
|
||||
} else {
|
||||
apnMessage = new ApnMessage(device.getApnId(), account.getNumber(), (int)device.getId(),
|
||||
String.format(APN_PAYLOAD, messageQueueDepth),
|
||||
false, ApnMessage.MAX_EXPIRATION);
|
||||
}
|
||||
|
||||
pushServiceClient.send(apnMessage);
|
||||
}
|
||||
|
||||
private void sendWebSocketMessage(Account account, Device device, Envelope outgoingMessage)
|
||||
{
|
||||
webSocketSender.sendMessage(account, device, outgoingMessage, WebsocketSender.Type.WEB);
|
||||
|
|
|
@ -46,6 +46,7 @@ public class WebsocketSender {
|
|||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
|
||||
private final Meter websocketRequeueMeter = metricRegistry.meter(name(getClass(), "ws_requeue"));
|
||||
private final Meter websocketOnlineMeter = metricRegistry.meter(name(getClass(), "ws_online" ));
|
||||
private final Meter websocketOfflineMeter = metricRegistry.meter(name(getClass(), "ws_offline" ));
|
||||
|
||||
|
@ -84,15 +85,24 @@ public class WebsocketSender {
|
|||
else if (channel == Type.GCM) gcmOfflineMeter.mark();
|
||||
else websocketOfflineMeter.mark();
|
||||
|
||||
int queueDepth = messagesManager.insert(account.getNumber(), device.getId(), message);
|
||||
pubSubManager.publish(address, PubSubMessage.newBuilder()
|
||||
.setType(PubSubMessage.Type.QUERY_DB)
|
||||
.build());
|
||||
|
||||
int queueDepth = queueMessage(account, device, message);
|
||||
return new DeliveryStatus(false, queueDepth);
|
||||
}
|
||||
}
|
||||
|
||||
public int queueMessage(Account account, Device device, Envelope message) {
|
||||
websocketRequeueMeter.mark();
|
||||
|
||||
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
|
||||
int queueDepth = messagesManager.insert(account.getNumber(), device.getId(), message);
|
||||
|
||||
pubSubManager.publish(address, PubSubMessage.newBuilder()
|
||||
.setType(PubSubMessage.Type.QUERY_DB)
|
||||
.build());
|
||||
|
||||
return queueDepth;
|
||||
}
|
||||
|
||||
public boolean sendProvisioningMessage(ProvisioningAddress address, byte[] body) {
|
||||
PubSubMessage pubSubMessage = PubSubMessage.newBuilder()
|
||||
.setType(PubSubMessage.Type.DELIVER)
|
||||
|
|
|
@ -127,11 +127,12 @@ public class WebSocketConnection implements DispatchChannel {
|
|||
}
|
||||
|
||||
private void requeueMessage(Envelope message) {
|
||||
int queueDepth = pushSender.getWebSocketSender().queueMessage(account, device, message);
|
||||
|
||||
try {
|
||||
pushSender.sendMessage(account, device, message);
|
||||
pushSender.sendQueuedNotification(account, device, queueDepth);
|
||||
} catch (NotPushRegisteredException | TransientPushFailureException e) {
|
||||
logger.warn("requeueMessage", e);
|
||||
messagesManager.insert(account.getNumber(), device.getId(), message);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
|
|||
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||
import org.whispersystems.textsecuregcm.push.WebsocketSender;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
|
@ -167,6 +168,11 @@ public class WebSocketConnectionTest {
|
|||
@Test
|
||||
public void testOnlineSend() throws Exception {
|
||||
MessagesManager storedMessages = mock(MessagesManager.class);
|
||||
WebsocketSender websocketSender = mock(WebsocketSender.class);
|
||||
|
||||
when(pushSender.getWebSocketSender()).thenReturn(websocketSender);
|
||||
when(websocketSender.queueMessage(any(Account.class), any(Device.class), any(Envelope.class))).thenReturn(10);
|
||||
|
||||
Envelope firstMessage = Envelope.newBuilder()
|
||||
.setLegacyMessage(ByteString.copyFrom("first".getBytes()))
|
||||
.setSource("sender1")
|
||||
|
@ -245,12 +251,111 @@ public class WebSocketConnectionTest {
|
|||
futures.get(0).setException(new IOException());
|
||||
|
||||
verify(receiptSender, times(1)).sendReceipt(eq(account), eq("sender2"), eq(secondMessage.getTimestamp()), eq(Optional.<String>absent()));
|
||||
verify(pushSender, times(1)).sendMessage(eq(account), eq(device), any(Envelope.class));
|
||||
verify(websocketSender, times(1)).queueMessage(eq(account), eq(device), any(Envelope.class));
|
||||
verify(pushSender, times(1)).sendQueuedNotification(eq(account), eq(device), eq(10));
|
||||
|
||||
connection.onDispatchUnsubscribed(websocketAddress.serialize());
|
||||
verify(client).close(anyInt(), anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPendingSend() throws Exception {
|
||||
MessagesManager storedMessages = mock(MessagesManager.class);
|
||||
WebsocketSender websocketSender = mock(WebsocketSender.class);
|
||||
|
||||
reset(websocketSender);
|
||||
reset(pushSender);
|
||||
|
||||
when(pushSender.getWebSocketSender()).thenReturn(websocketSender);
|
||||
when(websocketSender.queueMessage(any(Account.class), any(Device.class), any(Envelope.class))).thenReturn(10);
|
||||
|
||||
final Envelope firstMessage = Envelope.newBuilder()
|
||||
.setLegacyMessage(ByteString.copyFrom("first".getBytes()))
|
||||
.setSource("sender1")
|
||||
.setTimestamp(System.currentTimeMillis())
|
||||
.setSourceDevice(1)
|
||||
.setType(Envelope.Type.CIPHERTEXT)
|
||||
.build();
|
||||
|
||||
final Envelope secondMessage = Envelope.newBuilder()
|
||||
.setLegacyMessage(ByteString.copyFrom("second".getBytes()))
|
||||
.setSource("sender2")
|
||||
.setTimestamp(System.currentTimeMillis())
|
||||
.setSourceDevice(2)
|
||||
.setType(Envelope.Type.CIPHERTEXT)
|
||||
.build();
|
||||
|
||||
List<OutgoingMessageEntity> pendingMessages = new LinkedList<OutgoingMessageEntity>() {{
|
||||
add(new OutgoingMessageEntity(1, firstMessage.getType().getNumber(), firstMessage.getRelay(),
|
||||
firstMessage.getTimestamp(), firstMessage.getSource(),
|
||||
firstMessage.getSourceDevice(), firstMessage.getLegacyMessage().toByteArray(),
|
||||
firstMessage.getContent().toByteArray()));
|
||||
add(new OutgoingMessageEntity(2, secondMessage.getType().getNumber(), secondMessage.getRelay(),
|
||||
secondMessage.getTimestamp(), secondMessage.getSource(),
|
||||
secondMessage.getSourceDevice(), secondMessage.getLegacyMessage().toByteArray(),
|
||||
secondMessage.getContent().toByteArray()));
|
||||
}};
|
||||
|
||||
OutgoingMessageEntityList pendingMessagesList = new OutgoingMessageEntityList(pendingMessages, false);
|
||||
|
||||
when(device.getId()).thenReturn(2L);
|
||||
when(device.getSignalingKey()).thenReturn(Base64.encodeBytes(new byte[52]));
|
||||
|
||||
when(account.getAuthenticatedDevice()).thenReturn(Optional.of(device));
|
||||
when(account.getNumber()).thenReturn("+14152222222");
|
||||
|
||||
final Device sender1device = mock(Device.class);
|
||||
|
||||
Set<Device> sender1devices = new HashSet<Device>() {{
|
||||
add(sender1device);
|
||||
}};
|
||||
|
||||
Account sender1 = mock(Account.class);
|
||||
when(sender1.getDevices()).thenReturn(sender1devices);
|
||||
|
||||
when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1));
|
||||
when(accountsManager.get("sender2")).thenReturn(Optional.<Account>absent());
|
||||
|
||||
when(storedMessages.getMessagesForDevice(account.getNumber(), device.getId()))
|
||||
.thenReturn(pendingMessagesList);
|
||||
|
||||
final List<SettableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
|
||||
final WebSocketClient client = mock(WebSocketClient.class);
|
||||
|
||||
when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), any(Optional.class)))
|
||||
.thenAnswer(new Answer<SettableFuture<WebSocketResponseMessage>>() {
|
||||
@Override
|
||||
public SettableFuture<WebSocketResponseMessage> answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
SettableFuture<WebSocketResponseMessage> future = SettableFuture.create();
|
||||
futures.add(future);
|
||||
return future;
|
||||
}
|
||||
});
|
||||
|
||||
WebsocketAddress websocketAddress = new WebsocketAddress(account.getNumber(), device.getId());
|
||||
WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, storedMessages,
|
||||
account, device, client);
|
||||
|
||||
connection.onDispatchSubscribed(websocketAddress.serialize());
|
||||
|
||||
verify(client, times(2)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(Optional.class));
|
||||
|
||||
assertEquals(futures.size(), 2);
|
||||
|
||||
WebSocketResponseMessage response = mock(WebSocketResponseMessage.class);
|
||||
when(response.getStatus()).thenReturn(200);
|
||||
futures.get(1).set(response);
|
||||
futures.get(0).setException(new IOException());
|
||||
|
||||
verify(receiptSender, times(1)).sendReceipt(eq(account), eq("sender2"), eq(secondMessage.getTimestamp()), eq(Optional.<String>absent()));
|
||||
verifyNoMoreInteractions(websocketSender);
|
||||
verifyNoMoreInteractions(pushSender);
|
||||
|
||||
connection.onDispatchUnsubscribed(websocketAddress.serialize());
|
||||
verify(client).close(anyInt(), anyString());
|
||||
}
|
||||
|
||||
|
||||
private OutgoingMessageEntity createMessage(long id, String sender, long timestamp, boolean receipt, String content) {
|
||||
return new OutgoingMessageEntity(id, receipt ? Envelope.Type.RECEIPT_VALUE : Envelope.Type.CIPHERTEXT_VALUE,
|
||||
null, timestamp, sender, 1, content.getBytes(), null);
|
||||
|
|
Loading…
Reference in New Issue