Make push sender queue depth configurable or disable-able.

// FREEBIE
This commit is contained in:
Moxie Marlinspike 2016-03-06 13:58:30 -08:00
parent 33e60f2527
commit d8a758211f
3 changed files with 32 additions and 12 deletions

View File

@ -185,7 +185,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
TwilioSmsSender twilioSmsSender = new TwilioSmsSender(config.getTwilioConfiguration());
SmsSender smsSender = new SmsSender(twilioSmsSender);
UrlSigner urlSigner = new UrlSigner(config.getS3Configuration());
PushSender pushSender = new PushSender(apnFallbackManager, pushServiceClient, websocketSender);
PushSender pushSender = new PushSender(apnFallbackManager, pushServiceClient, websocketSender, config.getPushConfiguration().getQueueSize());
ReceiptSender receiptSender = new ReceiptSender(accountsManager, pushSender, federatedClientManager);
FeedbackHandler feedbackHandler = new FeedbackHandler(pushServiceClient, accountsManager);
Optional<byte[]> authorizationKey = config.getRedphoneConfiguration().getAuthorizationKey();

View File

@ -22,6 +22,10 @@ public class PushConfiguration {
@NotEmpty
private String password;
@JsonProperty
@Min(0)
private int queueSize = 200;
public String getHost() {
return host;
}
@ -37,4 +41,8 @@ public class PushConfiguration {
public String getPassword() {
return password;
}
public int getQueueSize() {
return queueSize;
}
}

View File

@ -47,12 +47,16 @@ public class PushSender implements Managed {
private final PushServiceClient pushServiceClient;
private final WebsocketSender webSocketSender;
private final BlockingThreadPoolExecutor executor;
private final int queueSize;
public PushSender(ApnFallbackManager apnFallbackManager, PushServiceClient pushServiceClient, WebsocketSender websocketSender) {
public PushSender(ApnFallbackManager apnFallbackManager, PushServiceClient pushServiceClient,
WebsocketSender websocketSender, int queueSize)
{
this.apnFallbackManager = apnFallbackManager;
this.pushServiceClient = pushServiceClient;
this.webSocketSender = websocketSender;
this.executor = new BlockingThreadPoolExecutor(50, 200);
this.queueSize = queueSize;
this.executor = new BlockingThreadPoolExecutor(50, queueSize);
SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME)
.register(name(PushSender.class, "send_queue_depth"),
@ -71,15 +75,16 @@ public class PushSender implements Managed {
throw new NotPushRegisteredException("No delivery possible!");
}
executor.execute(new Runnable() {
@Override
public void run() {
if (device.getGcmId() != null) sendGcmMessage(account, device, message);
else if (device.getApnId() != null) sendApnMessage(account, device, message);
else if (device.getFetchesMessages()) sendWebSocketMessage(account, device, message);
else throw new AssertionError();
}
});
if (queueSize > 0) {
executor.execute(new Runnable() {
@Override
public void run() {
sendSynchronousMessage(account, device, message);
}
});
} else {
sendSynchronousMessage(account, device, message);
}
}
public void sendQueuedNotification(Account account, Device device, int messageQueueDepth)
@ -94,6 +99,13 @@ public class PushSender implements Managed {
return webSocketSender;
}
private void sendSynchronousMessage(Account account, Device device, Envelope message) {
if (device.getGcmId() != null) sendGcmMessage(account, device, message);
else if (device.getApnId() != null) sendApnMessage(account, device, message);
else if (device.getFetchesMessages()) sendWebSocketMessage(account, device, message);
else throw new AssertionError();
}
private void sendGcmMessage(Account account, Device device, Envelope message) {
DeliveryStatus deliveryStatus = webSocketSender.sendMessage(account, device, message, WebsocketSender.Type.GCM);