diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java b/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java index 83f01200f..a317261d8 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/ApnFallbackManager.java @@ -30,6 +30,8 @@ public class ApnFallbackManager implements Managed, Runnable, DispatchChannel { private static final Logger logger = LoggerFactory.getLogger(ApnFallbackManager.class); + public static final int FALLBACK_DURATION = 15; + private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); private static final Meter voipOneSuccess = metricRegistry.meter(name(ApnFallbackManager.class, "voip_one_success")); private static final Meter voipOneDelivery = metricRegistry.meter(name(ApnFallbackManager.class, "voip_one_failure")); @@ -57,6 +59,12 @@ public class ApnFallbackManager implements Managed, Runnable, DispatchChannel { } } + private void scheduleRetry(final WebsocketAddress address, ApnFallbackTask task) { + if (taskQueue.putIfMissing(address, task)) { + pubSubManager.subscribe(new WebSocketConnectionInfo(address), this); + } + } + private void cancel(WebsocketAddress address) { ApnFallbackTask task = taskQueue.remove(address); @@ -84,9 +92,17 @@ public class ApnFallbackManager implements Managed, Runnable, DispatchChannel { Entry taskEntry = taskQueue.get(); ApnFallbackTask task = taskEntry.getValue(); - pubSubManager.unsubscribe(new WebSocketConnectionInfo(taskEntry.getKey()), this); - pushServiceClient.send(new ApnMessage(task.getMessage(), task.getApnId(), - false, ApnMessage.MAX_EXPIRATION)); + ApnMessage message; + + if (task.getAttempt() == 0) { + message = new ApnMessage(task.getMessage(), task.getVoipApnId(), true, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(FALLBACK_DURATION)); + scheduleRetry(taskEntry.getKey(), new ApnFallbackTask(task.getApnId(), task.getVoipApnId(), task.getMessage(), task.getDelay(),1)); + } else { + message = new ApnMessage(task.getMessage(), task.getApnId(), false, ApnMessage.MAX_EXPIRATION); + pubSubManager.unsubscribe(new WebSocketConnectionInfo(taskEntry.getKey()), this); + } + + pushServiceClient.send(message); } catch (Throwable e) { logger.warn("ApnFallbackThread", e); } @@ -123,24 +139,32 @@ public class ApnFallbackManager implements Managed, Runnable, DispatchChannel { private final long delay; private final long scheduledTime; private final String apnId; + private final String voipApnId; private final ApnMessage message; + private final int attempt; - public ApnFallbackTask(String apnId, ApnMessage message) { - this(apnId, message, TimeUnit.SECONDS.toMillis(30)); + public ApnFallbackTask(String apnId, String voipApnId, ApnMessage message) { + this(apnId, voipApnId, message, TimeUnit.SECONDS.toMillis(FALLBACK_DURATION), 0); } @VisibleForTesting - public ApnFallbackTask(String apnId, ApnMessage message, long delay) { + public ApnFallbackTask(String apnId, String voipApnId, ApnMessage message, long delay, int attempt) { this.scheduledTime = System.currentTimeMillis(); this.delay = delay; this.apnId = apnId; + this.voipApnId = voipApnId; this.message = message; + this.attempt = attempt; } public String getApnId() { return apnId; } + public String getVoipApnId() { + return voipApnId; + } + public ApnMessage getMessage() { return message; } @@ -156,6 +180,10 @@ public class ApnFallbackManager implements Managed, Runnable, DispatchChannel { public long getDelay() { return delay; } + + public int getAttempt() { + return attempt; + } } @VisibleForTesting @@ -194,6 +222,13 @@ public class ApnFallbackManager implements Managed, Runnable, DispatchChannel { } } + public boolean putIfMissing(WebsocketAddress address, ApnFallbackTask task) { + synchronized (tasks) { + if (tasks.containsKey(address)) return false; + return put(address, task); + } + } + public ApnFallbackTask remove(WebsocketAddress address) { synchronized (tasks) { return tasks.remove(address); diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java index a5cafca0f..3d2295a0a 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java @@ -140,11 +140,11 @@ public class PushSender implements Managed { if (!Util.isEmpty(device.getVoipApnId())) { apnMessage = new ApnMessage(device.getVoipApnId(), account.getNumber(), (int)device.getId(), String.format(APN_PAYLOAD, messageQueueDepth), - true, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30)); + true, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(ApnFallbackManager.FALLBACK_DURATION)); if (fallback) { apnFallbackManager.schedule(new WebsocketAddress(account.getNumber(), device.getId()), - new ApnFallbackTask(device.getApnId(), apnMessage)); + new ApnFallbackTask(device.getApnId(), device.getVoipApnId(), apnMessage)); } } else { apnMessage = new ApnMessage(device.getApnId(), account.getNumber(), (int)device.getId(), diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/push/ApnFallbackManagerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/push/ApnFallbackManagerTest.java index bfd47aca7..3790c33f7 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/push/ApnFallbackManagerTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/push/ApnFallbackManagerTest.java @@ -12,8 +12,11 @@ import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.textsecuregcm.websocket.WebSocketConnectionInfo; import org.whispersystems.textsecuregcm.websocket.WebsocketAddress; +import java.util.List; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.*; public class ApnFallbackManagerTest { @@ -25,7 +28,7 @@ public class ApnFallbackManagerTest { WebsocketAddress address = new WebsocketAddress("+14152222223", 1L); WebSocketConnectionInfo info = new WebSocketConnectionInfo(address); ApnMessage message = new ApnMessage("bar", "123", 1, "hmm", true, 1111); - ApnFallbackTask task = new ApnFallbackTask("foo", message, 500); + ApnFallbackTask task = new ApnFallbackTask("foo", "voipfoo", message, 500, 0); ApnFallbackManager apnFallbackManager = new ApnFallbackManager(pushServiceClient, pubSubManager); apnFallbackManager.start(); @@ -35,13 +38,20 @@ public class ApnFallbackManagerTest { Util.sleep(1100); ArgumentCaptor captor = ArgumentCaptor.forClass(ApnMessage.class); - verify(pushServiceClient, times(1)).send(captor.capture()); + verify(pushServiceClient, times(2)).send(captor.capture()); verify(pubSubManager).unsubscribe(eq(info), eq(apnFallbackManager)); - assertEquals(captor.getValue().getMessage(), message.getMessage()); - assertEquals(captor.getValue().getApnId(), task.getApnId()); - assertFalse(captor.getValue().isVoip()); - assertEquals(captor.getValue().getExpirationTime(), Integer.MAX_VALUE * 1000L); + List arguments = captor.getAllValues(); + + assertEquals(arguments.get(0).getMessage(), message.getMessage()); + assertEquals(arguments.get(0).getApnId(), task.getVoipApnId()); + assertTrue(arguments.get(0).isVoip()); +// assertEquals(arguments.get(0).getExpirationTime(), Integer.MAX_VALUE * 1000L); + + assertEquals(arguments.get(1).getMessage(), message.getMessage()); + assertEquals(arguments.get(1).getApnId(), task.getApnId()); + assertFalse(arguments.get(1).isVoip()); + assertEquals(arguments.get(1).getExpirationTime(), Integer.MAX_VALUE * 1000L); } @Test @@ -51,7 +61,7 @@ public class ApnFallbackManagerTest { WebsocketAddress address = new WebsocketAddress("+14152222222", 1); WebSocketConnectionInfo info = new WebSocketConnectionInfo(address); ApnMessage message = new ApnMessage("bar", "123", 1, "hmm", true, 5555); - ApnFallbackTask task = new ApnFallbackTask ("foo", message, 500); + ApnFallbackTask task = new ApnFallbackTask ("foo", "voipfoo", message, 500, 0); ApnFallbackManager apnFallbackManager = new ApnFallbackManager(pushServiceClient, pubSubManager); apnFallbackManager.start();