diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index ca61a71ce..b8189035e 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -185,7 +185,7 @@ public class WhisperServerService extends Application authorizationKey = config.getRedphoneConfiguration().getAuthorizationKey(); diff --git a/src/main/java/org/whispersystems/textsecuregcm/configuration/PushConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/configuration/PushConfiguration.java index 987ecd529..beab8e4c3 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/configuration/PushConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/configuration/PushConfiguration.java @@ -22,6 +22,10 @@ public class PushConfiguration { @NotEmpty private String password; + @JsonProperty + @Min(0) + private int queueSize = 200; + public String getHost() { return host; } @@ -37,4 +41,8 @@ public class PushConfiguration { public String getPassword() { return password; } + + public int getQueueSize() { + return queueSize; + } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java index 99f57bee2..01a0e4ee1 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java @@ -47,12 +47,16 @@ public class PushSender implements Managed { private final PushServiceClient pushServiceClient; private final WebsocketSender webSocketSender; private final BlockingThreadPoolExecutor executor; + private final int queueSize; - public PushSender(ApnFallbackManager apnFallbackManager, PushServiceClient pushServiceClient, WebsocketSender websocketSender) { + public PushSender(ApnFallbackManager apnFallbackManager, PushServiceClient pushServiceClient, + WebsocketSender websocketSender, int queueSize) + { this.apnFallbackManager = apnFallbackManager; this.pushServiceClient = pushServiceClient; this.webSocketSender = websocketSender; - this.executor = new BlockingThreadPoolExecutor(50, 200); + this.queueSize = queueSize; + this.executor = new BlockingThreadPoolExecutor(50, queueSize); SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME) .register(name(PushSender.class, "send_queue_depth"), @@ -71,15 +75,16 @@ public class PushSender implements Managed { throw new NotPushRegisteredException("No delivery possible!"); } - executor.execute(new Runnable() { - @Override - public void run() { - if (device.getGcmId() != null) sendGcmMessage(account, device, message); - else if (device.getApnId() != null) sendApnMessage(account, device, message); - else if (device.getFetchesMessages()) sendWebSocketMessage(account, device, message); - else throw new AssertionError(); - } - }); + if (queueSize > 0) { + executor.execute(new Runnable() { + @Override + public void run() { + sendSynchronousMessage(account, device, message); + } + }); + } else { + sendSynchronousMessage(account, device, message); + } } public void sendQueuedNotification(Account account, Device device, int messageQueueDepth) @@ -94,6 +99,13 @@ public class PushSender implements Managed { return webSocketSender; } + private void sendSynchronousMessage(Account account, Device device, Envelope message) { + if (device.getGcmId() != null) sendGcmMessage(account, device, message); + else if (device.getApnId() != null) sendApnMessage(account, device, message); + else if (device.getFetchesMessages()) sendWebSocketMessage(account, device, message); + else throw new AssertionError(); + } + private void sendGcmMessage(Account account, Device device, Envelope message) { DeliveryStatus deliveryStatus = webSocketSender.sendMessage(account, device, message, WebsocketSender.Type.GCM);