Add an experiment to allow a phased transition from the old GCM API to the current FCM API

This commit is contained in:
Jon Chambers 2022-08-01 14:55:12 -04:00 committed by Jon Chambers
parent 421d594507
commit c9ae991aa3
3 changed files with 62 additions and 37 deletions

View File

@ -144,6 +144,7 @@ import org.whispersystems.textsecuregcm.providers.RedisClusterHealthCheck;
import org.whispersystems.textsecuregcm.push.APNSender;
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.push.FcmSender;
import org.whispersystems.textsecuregcm.push.GCMSender;
import org.whispersystems.textsecuregcm.push.MessageSender;
import org.whispersystems.textsecuregcm.push.ProvisioningManager;
@ -447,7 +448,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.empty());
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
APNSender apnSender = new APNSender(apnSenderExecutor, accountsManager, config.getApnConfiguration());
GCMSender gcmSender = new GCMSender(gcmSenderExecutor, accountsManager, config.getGcmConfiguration().getApiKey());
FcmSender fcmSender = new FcmSender(gcmSenderExecutor, accountsManager, config.getFcmConfiguration().credentials());
GCMSender gcmSender = new GCMSender(gcmSenderExecutor, accountsManager, config.getGcmConfiguration().getApiKey(), experimentEnrollmentManager, fcmSender);
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), rateLimitersCluster);
DynamicRateLimiters dynamicRateLimiters = new DynamicRateLimiters(rateLimitersCluster, dynamicConfigurationManager);
ProvisioningManager provisioningManager = new ProvisioningManager(pubSubManager);

View File

@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.gcm.server.Message;
import org.whispersystems.gcm.server.Result;
import org.whispersystems.gcm.server.Sender;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
@ -54,55 +55,75 @@ public class GCMSender {
private final Sender signalSender;
private final ExecutorService executor;
public GCMSender(ExecutorService executor, AccountsManager accountsManager, String signalKey) {
this(executor, accountsManager, new Sender(signalKey, SystemMapper.getMapper(), 6));
private final ExperimentEnrollmentManager experimentEnrollmentManager;
private final FcmSender fcmSender;
public GCMSender(ExecutorService executor, AccountsManager accountsManager, String signalKey, ExperimentEnrollmentManager experimentEnrollmentManager, FcmSender fcmSender) {
this(executor, accountsManager, new Sender(signalKey, SystemMapper.getMapper(), 6), experimentEnrollmentManager, fcmSender);
CircuitBreakerUtil.registerMetrics(metricRegistry, signalSender.getRetry(), Sender.class);
}
@VisibleForTesting
public GCMSender(ExecutorService executor, AccountsManager accountsManager, Sender sender) {
public GCMSender(ExecutorService executor, AccountsManager accountsManager, Sender sender, ExperimentEnrollmentManager experimentEnrollmentManager, FcmSender fcmSender) {
this.accountsManager = accountsManager;
this.signalSender = sender;
this.executor = executor;
this.experimentEnrollmentManager = experimentEnrollmentManager;
this.fcmSender = fcmSender;
}
public void sendMessage(GcmMessage message) {
Message.Builder builder = Message.newBuilder()
.withDestination(message.getGcmId())
.withPriority("high");
final boolean useFcmSender = message.getUuid()
.map(uuid -> experimentEnrollmentManager.isEnrolled(uuid, "fcmSender"))
.orElse(false);
String key;
if (useFcmSender) {
fcmSender.sendMessage(message);
} else {
Message.Builder builder = Message.newBuilder()
.withDestination(message.getGcmId())
.withPriority("high");
switch (message.getType()) {
case NOTIFICATION: key = "notification"; break;
case CHALLENGE: key = "challenge"; break;
case RATE_LIMIT_CHALLENGE: key = "rateLimitChallenge"; break;
default: throw new AssertionError();
}
String key;
Message request = builder.withDataPart(key, message.getData().orElse("")).build();
CompletableFuture<Result> future = signalSender.send(request);
markOutboundMeter(key);
future.handle((result, throwable) -> {
if (result != null && message.getType() != GcmMessage.Type.CHALLENGE) {
if (result.isUnregistered() || result.isInvalidRegistrationId()) {
executor.submit(() -> handleBadRegistration(message));
} else if (result.hasCanonicalRegistrationId()) {
executor.submit(() -> handleCanonicalRegistrationId(message, result));
} else if (!result.isSuccess()) {
executor.submit(() -> handleGenericError(message, result));
} else {
success.mark();
}
} else {
logger.warn("FCM Failed: " + throwable + ", " + throwable.getCause());
switch (message.getType()) {
case NOTIFICATION:
key = "notification";
break;
case CHALLENGE:
key = "challenge";
break;
case RATE_LIMIT_CHALLENGE:
key = "rateLimitChallenge";
break;
default:
throw new AssertionError();
}
return null;
});
Message request = builder.withDataPart(key, message.getData().orElse("")).build();
CompletableFuture<Result> future = signalSender.send(request);
markOutboundMeter(key);
future.handle((result, throwable) -> {
if (result != null && message.getType() != GcmMessage.Type.CHALLENGE) {
if (result.isUnregistered() || result.isInvalidRegistrationId()) {
executor.submit(() -> handleBadRegistration(message));
} else if (result.hasCanonicalRegistrationId()) {
executor.submit(() -> handleCanonicalRegistrationId(message, result));
} else if (!result.isSuccess()) {
executor.submit(() -> handleGenericError(message, result));
} else {
success.mark();
}
} else {
logger.warn("FCM Failed: " + throwable + ", " + throwable.getCause());
}
return null;
});
}
}
private void handleBadRegistration(GcmMessage message) {

View File

@ -19,6 +19,8 @@ import org.junit.jupiter.api.Test;
import org.whispersystems.gcm.server.Message;
import org.whispersystems.gcm.server.Result;
import org.whispersystems.gcm.server.Sender;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.push.FcmSender;
import org.whispersystems.textsecuregcm.push.GCMSender;
import org.whispersystems.textsecuregcm.push.GcmMessage;
import org.whispersystems.textsecuregcm.storage.Account;
@ -45,7 +47,7 @@ class GCMSenderTest {
AccountsHelper.setupMockUpdate(accountsManager);
GcmMessage message = new GcmMessage("foo", UUID.randomUUID(), 1, GcmMessage.Type.NOTIFICATION, Optional.empty());
GCMSender gcmSender = new GCMSender(executorService, accountsManager, sender);
GCMSender gcmSender = new GCMSender(executorService, accountsManager, sender, mock(ExperimentEnrollmentManager.class), mock(FcmSender.class));
CompletableFuture<Result> successFuture = CompletableFuture.completedFuture(successResult);
@ -81,7 +83,7 @@ class GCMSenderTest {
when(invalidResult.isSuccess()).thenReturn(true);
GcmMessage message = new GcmMessage(gcmId, destinationUuid, 1, GcmMessage.Type.NOTIFICATION, Optional.empty());
GCMSender gcmSender = new GCMSender(executorService, accountsManager, sender);
GCMSender gcmSender = new GCMSender(executorService, accountsManager, sender, mock(ExperimentEnrollmentManager.class), mock(FcmSender.class));
CompletableFuture<Result> invalidFuture = CompletableFuture.completedFuture(invalidResult);
@ -122,7 +124,7 @@ class GCMSenderTest {
when(canonicalResult.getCanonicalRegistrationId()).thenReturn(canonicalId);
GcmMessage message = new GcmMessage(gcmId, destinationUuid, 1, GcmMessage.Type.NOTIFICATION, Optional.empty());
GCMSender gcmSender = new GCMSender(executorService, accountsManager, sender);
GCMSender gcmSender = new GCMSender(executorService, accountsManager, sender, mock(ExperimentEnrollmentManager.class), mock(FcmSender.class));
CompletableFuture<Result> invalidFuture = CompletableFuture.completedFuture(canonicalResult);