parent
074fd14849
commit
818c5a9cf5
|
@ -30,6 +30,8 @@ public class ApnFallbackManager implements Managed, Runnable, DispatchChannel {
|
|||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ApnFallbackManager.class);
|
||||
|
||||
public static final int FALLBACK_DURATION = 15;
|
||||
|
||||
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private static final Meter voipOneSuccess = metricRegistry.meter(name(ApnFallbackManager.class, "voip_one_success"));
|
||||
private static final Meter voipOneDelivery = metricRegistry.meter(name(ApnFallbackManager.class, "voip_one_failure"));
|
||||
|
@ -57,6 +59,12 @@ public class ApnFallbackManager implements Managed, Runnable, DispatchChannel {
|
|||
}
|
||||
}
|
||||
|
||||
private void scheduleRetry(final WebsocketAddress address, ApnFallbackTask task) {
|
||||
if (taskQueue.putIfMissing(address, task)) {
|
||||
pubSubManager.subscribe(new WebSocketConnectionInfo(address), this);
|
||||
}
|
||||
}
|
||||
|
||||
private void cancel(WebsocketAddress address) {
|
||||
ApnFallbackTask task = taskQueue.remove(address);
|
||||
|
||||
|
@ -84,9 +92,17 @@ public class ApnFallbackManager implements Managed, Runnable, DispatchChannel {
|
|||
Entry<WebsocketAddress, ApnFallbackTask> taskEntry = taskQueue.get();
|
||||
ApnFallbackTask task = taskEntry.getValue();
|
||||
|
||||
pubSubManager.unsubscribe(new WebSocketConnectionInfo(taskEntry.getKey()), this);
|
||||
pushServiceClient.send(new ApnMessage(task.getMessage(), task.getApnId(),
|
||||
false, ApnMessage.MAX_EXPIRATION));
|
||||
ApnMessage message;
|
||||
|
||||
if (task.getAttempt() == 0) {
|
||||
message = new ApnMessage(task.getMessage(), task.getVoipApnId(), true, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(FALLBACK_DURATION));
|
||||
scheduleRetry(taskEntry.getKey(), new ApnFallbackTask(task.getApnId(), task.getVoipApnId(), task.getMessage(), task.getDelay(),1));
|
||||
} else {
|
||||
message = new ApnMessage(task.getMessage(), task.getApnId(), false, ApnMessage.MAX_EXPIRATION);
|
||||
pubSubManager.unsubscribe(new WebSocketConnectionInfo(taskEntry.getKey()), this);
|
||||
}
|
||||
|
||||
pushServiceClient.send(message);
|
||||
} catch (Throwable e) {
|
||||
logger.warn("ApnFallbackThread", e);
|
||||
}
|
||||
|
@ -123,24 +139,32 @@ public class ApnFallbackManager implements Managed, Runnable, DispatchChannel {
|
|||
private final long delay;
|
||||
private final long scheduledTime;
|
||||
private final String apnId;
|
||||
private final String voipApnId;
|
||||
private final ApnMessage message;
|
||||
private final int attempt;
|
||||
|
||||
public ApnFallbackTask(String apnId, ApnMessage message) {
|
||||
this(apnId, message, TimeUnit.SECONDS.toMillis(30));
|
||||
public ApnFallbackTask(String apnId, String voipApnId, ApnMessage message) {
|
||||
this(apnId, voipApnId, message, TimeUnit.SECONDS.toMillis(FALLBACK_DURATION), 0);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ApnFallbackTask(String apnId, ApnMessage message, long delay) {
|
||||
public ApnFallbackTask(String apnId, String voipApnId, ApnMessage message, long delay, int attempt) {
|
||||
this.scheduledTime = System.currentTimeMillis();
|
||||
this.delay = delay;
|
||||
this.apnId = apnId;
|
||||
this.voipApnId = voipApnId;
|
||||
this.message = message;
|
||||
this.attempt = attempt;
|
||||
}
|
||||
|
||||
public String getApnId() {
|
||||
return apnId;
|
||||
}
|
||||
|
||||
public String getVoipApnId() {
|
||||
return voipApnId;
|
||||
}
|
||||
|
||||
public ApnMessage getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
@ -156,6 +180,10 @@ public class ApnFallbackManager implements Managed, Runnable, DispatchChannel {
|
|||
public long getDelay() {
|
||||
return delay;
|
||||
}
|
||||
|
||||
public int getAttempt() {
|
||||
return attempt;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -194,6 +222,13 @@ public class ApnFallbackManager implements Managed, Runnable, DispatchChannel {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean putIfMissing(WebsocketAddress address, ApnFallbackTask task) {
|
||||
synchronized (tasks) {
|
||||
if (tasks.containsKey(address)) return false;
|
||||
return put(address, task);
|
||||
}
|
||||
}
|
||||
|
||||
public ApnFallbackTask remove(WebsocketAddress address) {
|
||||
synchronized (tasks) {
|
||||
return tasks.remove(address);
|
||||
|
|
|
@ -140,11 +140,11 @@ public class PushSender implements Managed {
|
|||
if (!Util.isEmpty(device.getVoipApnId())) {
|
||||
apnMessage = new ApnMessage(device.getVoipApnId(), account.getNumber(), (int)device.getId(),
|
||||
String.format(APN_PAYLOAD, messageQueueDepth),
|
||||
true, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30));
|
||||
true, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(ApnFallbackManager.FALLBACK_DURATION));
|
||||
|
||||
if (fallback) {
|
||||
apnFallbackManager.schedule(new WebsocketAddress(account.getNumber(), device.getId()),
|
||||
new ApnFallbackTask(device.getApnId(), apnMessage));
|
||||
new ApnFallbackTask(device.getApnId(), device.getVoipApnId(), apnMessage));
|
||||
}
|
||||
} else {
|
||||
apnMessage = new ApnMessage(device.getApnId(), account.getNumber(), (int)device.getId(),
|
||||
|
|
|
@ -12,8 +12,11 @@ import org.whispersystems.textsecuregcm.util.Util;
|
|||
import org.whispersystems.textsecuregcm.websocket.WebSocketConnectionInfo;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class ApnFallbackManagerTest {
|
||||
|
@ -25,7 +28,7 @@ public class ApnFallbackManagerTest {
|
|||
WebsocketAddress address = new WebsocketAddress("+14152222223", 1L);
|
||||
WebSocketConnectionInfo info = new WebSocketConnectionInfo(address);
|
||||
ApnMessage message = new ApnMessage("bar", "123", 1, "hmm", true, 1111);
|
||||
ApnFallbackTask task = new ApnFallbackTask("foo", message, 500);
|
||||
ApnFallbackTask task = new ApnFallbackTask("foo", "voipfoo", message, 500, 0);
|
||||
|
||||
ApnFallbackManager apnFallbackManager = new ApnFallbackManager(pushServiceClient, pubSubManager);
|
||||
apnFallbackManager.start();
|
||||
|
@ -35,13 +38,20 @@ public class ApnFallbackManagerTest {
|
|||
Util.sleep(1100);
|
||||
|
||||
ArgumentCaptor<ApnMessage> captor = ArgumentCaptor.forClass(ApnMessage.class);
|
||||
verify(pushServiceClient, times(1)).send(captor.capture());
|
||||
verify(pushServiceClient, times(2)).send(captor.capture());
|
||||
verify(pubSubManager).unsubscribe(eq(info), eq(apnFallbackManager));
|
||||
|
||||
assertEquals(captor.getValue().getMessage(), message.getMessage());
|
||||
assertEquals(captor.getValue().getApnId(), task.getApnId());
|
||||
assertFalse(captor.getValue().isVoip());
|
||||
assertEquals(captor.getValue().getExpirationTime(), Integer.MAX_VALUE * 1000L);
|
||||
List<ApnMessage> arguments = captor.getAllValues();
|
||||
|
||||
assertEquals(arguments.get(0).getMessage(), message.getMessage());
|
||||
assertEquals(arguments.get(0).getApnId(), task.getVoipApnId());
|
||||
assertTrue(arguments.get(0).isVoip());
|
||||
// assertEquals(arguments.get(0).getExpirationTime(), Integer.MAX_VALUE * 1000L);
|
||||
|
||||
assertEquals(arguments.get(1).getMessage(), message.getMessage());
|
||||
assertEquals(arguments.get(1).getApnId(), task.getApnId());
|
||||
assertFalse(arguments.get(1).isVoip());
|
||||
assertEquals(arguments.get(1).getExpirationTime(), Integer.MAX_VALUE * 1000L);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -51,7 +61,7 @@ public class ApnFallbackManagerTest {
|
|||
WebsocketAddress address = new WebsocketAddress("+14152222222", 1);
|
||||
WebSocketConnectionInfo info = new WebSocketConnectionInfo(address);
|
||||
ApnMessage message = new ApnMessage("bar", "123", 1, "hmm", true, 5555);
|
||||
ApnFallbackTask task = new ApnFallbackTask ("foo", message, 500);
|
||||
ApnFallbackTask task = new ApnFallbackTask ("foo", "voipfoo", message, 500, 0);
|
||||
|
||||
ApnFallbackManager apnFallbackManager = new ApnFallbackManager(pushServiceClient, pubSubManager);
|
||||
apnFallbackManager.start();
|
||||
|
|
Loading…
Reference in New Issue