Add support for online only delivery
This commit is contained in:
parent
7e026a7072
commit
ab276a6a61
|
@ -129,7 +129,7 @@ public class MessageController {
|
||||||
Optional<Device> destinationDevice = destination.get().getDevice(incomingMessage.getDestinationDeviceId());
|
Optional<Device> destinationDevice = destination.get().getDevice(incomingMessage.getDestinationDeviceId());
|
||||||
|
|
||||||
if (destinationDevice.isPresent()) {
|
if (destinationDevice.isPresent()) {
|
||||||
sendMessage(source, destination.get(), destinationDevice.get(), messages.getTimestamp(), incomingMessage);
|
sendMessage(source, destination.get(), destinationDevice.get(), messages.getTimestamp(), messages.isOnline(), incomingMessage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,6 +215,7 @@ public class MessageController {
|
||||||
Account destinationAccount,
|
Account destinationAccount,
|
||||||
Device destinationDevice,
|
Device destinationDevice,
|
||||||
long timestamp,
|
long timestamp,
|
||||||
|
boolean online,
|
||||||
IncomingMessage incomingMessage)
|
IncomingMessage incomingMessage)
|
||||||
throws NoSuchUserException
|
throws NoSuchUserException
|
||||||
{
|
{
|
||||||
|
@ -240,7 +241,7 @@ public class MessageController {
|
||||||
messageBuilder.setContent(ByteString.copyFrom(messageContent.get()));
|
messageBuilder.setContent(ByteString.copyFrom(messageContent.get()));
|
||||||
}
|
}
|
||||||
|
|
||||||
pushSender.sendMessage(destinationAccount, destinationDevice, messageBuilder.build());
|
pushSender.sendMessage(destinationAccount, destinationDevice, messageBuilder.build(), online);
|
||||||
} catch (NotPushRegisteredException e) {
|
} catch (NotPushRegisteredException e) {
|
||||||
if (destinationDevice.isMaster()) throw new NoSuchUserException(e);
|
if (destinationDevice.isMaster()) throw new NoSuchUserException(e);
|
||||||
else logger.debug("Not registered", e);
|
else logger.debug("Not registered", e);
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Copyright (C) 2013 Open WhisperSystems
|
* Copyright (C) 2013 Open WhisperSystems
|
||||||
*
|
*
|
||||||
* This program is free software: you can redistribute it and/or modify
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
@ -32,6 +32,9 @@ public class IncomingMessageList {
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private long timestamp;
|
private long timestamp;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private boolean online;
|
||||||
|
|
||||||
public IncomingMessageList() {}
|
public IncomingMessageList() {}
|
||||||
|
|
||||||
public List<IncomingMessage> getMessages() {
|
public List<IncomingMessage> getMessages() {
|
||||||
|
@ -41,4 +44,8 @@ public class IncomingMessageList {
|
||||||
public long getTimestamp() {
|
public long getTimestamp() {
|
||||||
return timestamp;
|
return timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isOnline() {
|
||||||
|
return online;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,7 @@ public class PushSender implements Managed {
|
||||||
(Gauge<Integer>) executor::getSize);
|
(Gauge<Integer>) executor::getSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendMessage(final Account account, final Device device, final Envelope message)
|
public void sendMessage(final Account account, final Device device, final Envelope message, boolean online)
|
||||||
throws NotPushRegisteredException
|
throws NotPushRegisteredException
|
||||||
{
|
{
|
||||||
if (device.getGcmId() == null && device.getApnId() == null && !device.getFetchesMessages()) {
|
if (device.getGcmId() == null && device.getApnId() == null && !device.getFetchesMessages()) {
|
||||||
|
@ -70,9 +70,9 @@ public class PushSender implements Managed {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (queueSize > 0) {
|
if (queueSize > 0) {
|
||||||
executor.execute(() -> sendSynchronousMessage(account, device, message));
|
executor.execute(() -> sendSynchronousMessage(account, device, message, online));
|
||||||
} else {
|
} else {
|
||||||
sendSynchronousMessage(account, device, message);
|
sendSynchronousMessage(account, device, message, online);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,17 +88,17 @@ public class PushSender implements Managed {
|
||||||
return webSocketSender;
|
return webSocketSender;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendSynchronousMessage(Account account, Device device, Envelope message) {
|
private void sendSynchronousMessage(Account account, Device device, Envelope message, boolean online) {
|
||||||
if (device.getGcmId() != null) sendGcmMessage(account, device, message);
|
if (device.getGcmId() != null) sendGcmMessage(account, device, message, online);
|
||||||
else if (device.getApnId() != null) sendApnMessage(account, device, message);
|
else if (device.getApnId() != null) sendApnMessage(account, device, message, online);
|
||||||
else if (device.getFetchesMessages()) sendWebSocketMessage(account, device, message);
|
else if (device.getFetchesMessages()) sendWebSocketMessage(account, device, message, online);
|
||||||
else throw new AssertionError();
|
else throw new AssertionError();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendGcmMessage(Account account, Device device, Envelope message) {
|
private void sendGcmMessage(Account account, Device device, Envelope message, boolean online) {
|
||||||
DeliveryStatus deliveryStatus = webSocketSender.sendMessage(account, device, message, WebsocketSender.Type.GCM);
|
DeliveryStatus deliveryStatus = webSocketSender.sendMessage(account, device, message, WebsocketSender.Type.GCM, online);
|
||||||
|
|
||||||
if (!deliveryStatus.isDelivered()) {
|
if (!deliveryStatus.isDelivered() && !online) {
|
||||||
sendGcmNotification(account, device);
|
sendGcmNotification(account, device);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -110,10 +110,10 @@ public class PushSender implements Managed {
|
||||||
gcmSender.sendMessage(gcmMessage);
|
gcmSender.sendMessage(gcmMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendApnMessage(Account account, Device device, Envelope outgoingMessage) {
|
private void sendApnMessage(Account account, Device device, Envelope outgoingMessage, boolean online) {
|
||||||
DeliveryStatus deliveryStatus = webSocketSender.sendMessage(account, device, outgoingMessage, WebsocketSender.Type.APN);
|
DeliveryStatus deliveryStatus = webSocketSender.sendMessage(account, device, outgoingMessage, WebsocketSender.Type.APN, online);
|
||||||
|
|
||||||
if (!deliveryStatus.isDelivered() && outgoingMessage.getType() != Envelope.Type.RECEIPT) {
|
if (!deliveryStatus.isDelivered() && outgoingMessage.getType() != Envelope.Type.RECEIPT && !online) {
|
||||||
sendApnNotification(account, device, false);
|
sendApnNotification(account, device, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -135,9 +135,9 @@ public class PushSender implements Managed {
|
||||||
apnSender.sendMessage(apnMessage);
|
apnSender.sendMessage(apnMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendWebSocketMessage(Account account, Device device, Envelope outgoingMessage)
|
private void sendWebSocketMessage(Account account, Device device, Envelope outgoingMessage, boolean online)
|
||||||
{
|
{
|
||||||
webSocketSender.sendMessage(account, device, outgoingMessage, WebsocketSender.Type.WEB);
|
webSocketSender.sendMessage(account, device, outgoingMessage, WebsocketSender.Type.WEB, online);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -41,7 +41,7 @@ public class ReceiptSender {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (Device destinationDevice : destinationDevices) {
|
for (Device destinationDevice : destinationDevices) {
|
||||||
pushSender.sendMessage(destinationAccount, destinationDevice, message.build());
|
pushSender.sendMessage(destinationAccount, destinationDevice, message.build(), false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -68,7 +68,7 @@ public class WebsocketSender {
|
||||||
this.pubSubManager = pubSubManager;
|
this.pubSubManager = pubSubManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DeliveryStatus sendMessage(Account account, Device device, Envelope message, Type channel) {
|
public DeliveryStatus sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) {
|
||||||
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
|
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
|
||||||
PubSubMessage pubSubMessage = PubSubMessage.newBuilder()
|
PubSubMessage pubSubMessage = PubSubMessage.newBuilder()
|
||||||
.setType(PubSubMessage.Type.DELIVER)
|
.setType(PubSubMessage.Type.DELIVER)
|
||||||
|
@ -86,7 +86,7 @@ public class WebsocketSender {
|
||||||
else if (channel == Type.GCM) gcmOfflineMeter.mark();
|
else if (channel == Type.GCM) gcmOfflineMeter.mark();
|
||||||
else websocketOfflineMeter.mark();
|
else websocketOfflineMeter.mark();
|
||||||
|
|
||||||
queueMessage(account, device, message);
|
if (!online) queueMessage(account, device, message);
|
||||||
return new DeliveryStatus(false);
|
return new DeliveryStatus(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -109,7 +109,7 @@ public class MessageControllerTest {
|
||||||
assertThat("Good Response", response.getStatus(), is(equalTo(200)));
|
assertThat("Good Response", response.getStatus(), is(equalTo(200)));
|
||||||
|
|
||||||
ArgumentCaptor<Envelope> captor = ArgumentCaptor.forClass(Envelope.class);
|
ArgumentCaptor<Envelope> captor = ArgumentCaptor.forClass(Envelope.class);
|
||||||
verify(pushSender, times(1)).sendMessage(any(Account.class), any(Device.class), captor.capture());
|
verify(pushSender, times(1)).sendMessage(any(Account.class), any(Device.class), captor.capture(), eq(false));
|
||||||
|
|
||||||
assertTrue(captor.getValue().hasSource());
|
assertTrue(captor.getValue().hasSource());
|
||||||
assertTrue(captor.getValue().hasSourceDevice());
|
assertTrue(captor.getValue().hasSourceDevice());
|
||||||
|
@ -128,7 +128,7 @@ public class MessageControllerTest {
|
||||||
assertThat("Good Response", response.getStatus(), is(equalTo(200)));
|
assertThat("Good Response", response.getStatus(), is(equalTo(200)));
|
||||||
|
|
||||||
ArgumentCaptor<Envelope> captor = ArgumentCaptor.forClass(Envelope.class);
|
ArgumentCaptor<Envelope> captor = ArgumentCaptor.forClass(Envelope.class);
|
||||||
verify(pushSender, times(1)).sendMessage(any(Account.class), any(Device.class), captor.capture());
|
verify(pushSender, times(1)).sendMessage(any(Account.class), any(Device.class), captor.capture(), eq(false));
|
||||||
|
|
||||||
assertFalse(captor.getValue().hasSource());
|
assertFalse(captor.getValue().hasSource());
|
||||||
assertFalse(captor.getValue().hasSourceDevice());
|
assertFalse(captor.getValue().hasSourceDevice());
|
||||||
|
@ -197,7 +197,7 @@ public class MessageControllerTest {
|
||||||
|
|
||||||
assertThat("Good Response Code", response.getStatus(), is(equalTo(200)));
|
assertThat("Good Response Code", response.getStatus(), is(equalTo(200)));
|
||||||
|
|
||||||
verify(pushSender, times(2)).sendMessage(any(Account.class), any(Device.class), any(Envelope.class));
|
verify(pushSender, times(2)).sendMessage(any(Account.class), any(Device.class), any(Envelope.class), eq(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue