Use a fixed-size thread pool for sending FCM notifications

This commit is contained in:
Jon Chambers 2022-08-03 15:28:23 -04:00 committed by Jon Chambers
parent 4a0ef1f834
commit 39562775d9
2 changed files with 28 additions and 9 deletions

View File

@ -381,7 +381,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ScheduledExecutorService websocketScheduledExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "websocket-%d")).threads(8).build();
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build();
ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")).maxThreads(1).minThreads(1).build();
ExecutorService gcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "gcmSender-%d")).maxThreads(1).minThreads(1).build();
ExecutorService fcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "fcmSender-%d")).maxThreads(32).minThreads(32).build();
ExecutorService backupServiceExecutor = environment.lifecycle().executorService(name(getClass(), "backupService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService storageServiceExecutor = environment.lifecycle().executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
@ -448,7 +448,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.empty());
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
APNSender apnSender = new APNSender(apnSenderExecutor, config.getApnConfiguration());
FcmSender fcmSender = new FcmSender(gcmSenderExecutor, config.getFcmConfiguration().credentials());
FcmSender fcmSender = new FcmSender(fcmSenderExecutor, config.getFcmConfiguration().credentials());
ApnFallbackManager apnFallbackManager = new ApnFallbackManager(pushSchedulerCluster, apnSender, accountsManager);
PushNotificationManager pushNotificationManager = new PushNotificationManager(accountsManager, apnSender, fcmSender, apnFallbackManager);
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), rateLimitersCluster);

View File

@ -5,19 +5,25 @@
package org.whispersystems.textsecuregcm.push;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.firebase.FirebaseApp;
import com.google.firebase.FirebaseOptions;
import com.google.firebase.ThreadManager;
import com.google.firebase.messaging.AndroidConfig;
import com.google.firebase.messaging.FirebaseMessaging;
import com.google.firebase.messaging.FirebaseMessagingException;
import com.google.firebase.messaging.Message;
import com.google.firebase.messaging.MessagingErrorCode;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -25,13 +31,10 @@ import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
public class FcmSender implements PushNotificationSender {
private final ExecutorService executor;
@ -43,11 +46,27 @@ public class FcmSender implements PushNotificationSender {
public FcmSender(ExecutorService executor, String credentials) throws IOException {
try (final ByteArrayInputStream credentialInputStream = new ByteArrayInputStream(credentials.getBytes(StandardCharsets.UTF_8))) {
FirebaseOptions options = FirebaseOptions.builder()
FirebaseApp.initializeApp(FirebaseOptions.builder()
.setCredentials(GoogleCredentials.fromStream(credentialInputStream))
.build();
.setThreadManager(new ThreadManager() {
@Override
protected ExecutorService getExecutor(final FirebaseApp app) {
return executor;
}
FirebaseApp.initializeApp(options);
@Override
protected void releaseExecutor(final FirebaseApp app, final ExecutorService executor) {
// Do nothing; the executor service is managed by Dropwizard
}
@Override
protected ThreadFactory getThreadFactory() {
return new ThreadFactoryBuilder()
.setNameFormat("firebase-%d")
.build();
}
})
.build());
}
this.executor = executor;