Introduce a reactive push notification feedback processor
This commit is contained in:
		
							parent
							
								
									4f4c23b12f
								
							
						
					
					
						commit
						9d47a6f41f
					
				|  | @ -216,6 +216,7 @@ import org.whispersystems.textsecuregcm.workers.CrawlAccountsCommand; | ||||||
| import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; | import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; | ||||||
| import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand; | import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand; | ||||||
| import org.whispersystems.textsecuregcm.workers.MigrateSignedECPreKeysCommand; | import org.whispersystems.textsecuregcm.workers.MigrateSignedECPreKeysCommand; | ||||||
|  | import org.whispersystems.textsecuregcm.workers.ProcessPushNotificationFeedbackCommand; | ||||||
| import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand; | import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand; | ||||||
| import org.whispersystems.textsecuregcm.workers.ScheduledApnPushNotificationSenderServiceCommand; | import org.whispersystems.textsecuregcm.workers.ScheduledApnPushNotificationSenderServiceCommand; | ||||||
| import org.whispersystems.textsecuregcm.workers.ServerVersionCommand; | import org.whispersystems.textsecuregcm.workers.ServerVersionCommand; | ||||||
|  | @ -274,6 +275,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration | ||||||
|     bootstrap.addCommand(new MessagePersisterServiceCommand()); |     bootstrap.addCommand(new MessagePersisterServiceCommand()); | ||||||
|     bootstrap.addCommand(new MigrateSignedECPreKeysCommand()); |     bootstrap.addCommand(new MigrateSignedECPreKeysCommand()); | ||||||
|     bootstrap.addCommand(new RemoveExpiredAccountsCommand(Clock.systemUTC())); |     bootstrap.addCommand(new RemoveExpiredAccountsCommand(Clock.systemUTC())); | ||||||
|  |     bootstrap.addCommand(new ProcessPushNotificationFeedbackCommand(Clock.systemUTC())); | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   @Override |   @Override | ||||||
|  |  | ||||||
|  | @ -0,0 +1,156 @@ | ||||||
|  | /* | ||||||
|  |  * Copyright 2023 Signal Messenger, LLC | ||||||
|  |  * SPDX-License-Identifier: AGPL-3.0-only | ||||||
|  |  */ | ||||||
|  | 
 | ||||||
|  | package org.whispersystems.textsecuregcm.workers; | ||||||
|  | 
 | ||||||
|  | import com.google.common.annotations.VisibleForTesting; | ||||||
|  | import io.micrometer.core.instrument.Metrics; | ||||||
|  | import io.micrometer.core.instrument.Tags; | ||||||
|  | import java.time.Clock; | ||||||
|  | import java.time.Duration; | ||||||
|  | import java.time.Instant; | ||||||
|  | import java.util.Optional; | ||||||
|  | import net.sourceforge.argparse4j.inf.Subparser; | ||||||
|  | import org.apache.commons.lang3.StringUtils; | ||||||
|  | import org.slf4j.Logger; | ||||||
|  | import org.slf4j.LoggerFactory; | ||||||
|  | import org.whispersystems.textsecuregcm.metrics.MetricsUtil; | ||||||
|  | import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; | ||||||
|  | import org.whispersystems.textsecuregcm.storage.Account; | ||||||
|  | import org.whispersystems.textsecuregcm.storage.Device; | ||||||
|  | import reactor.core.publisher.Mono; | ||||||
|  | import reactor.core.publisher.ParallelFlux; | ||||||
|  | 
 | ||||||
|  | public class ProcessPushNotificationFeedbackCommand extends AbstractSinglePassCrawlAccountsCommand { | ||||||
|  | 
 | ||||||
|  |   private final Clock clock; | ||||||
|  | 
 | ||||||
|  |   @VisibleForTesting | ||||||
|  |   static final Duration MAX_TOKEN_REFRESH_DELAY = Duration.ofDays(3); | ||||||
|  | 
 | ||||||
|  |   @VisibleForTesting | ||||||
|  |   static final String DRY_RUN_ARGUMENT = "dry-run"; | ||||||
|  | 
 | ||||||
|  |   private static final int MAX_CONCURRENCY = 16; | ||||||
|  | 
 | ||||||
|  |   private static final String EXPIRED_DEVICE_COUNTER_NAME = | ||||||
|  |       MetricsUtil.name(ProcessPushNotificationFeedbackCommand.class, "expiredDevice"); | ||||||
|  | 
 | ||||||
|  |   private static final String RECOVERED_DEVICE_COUNTER_NAME = | ||||||
|  |       MetricsUtil.name(ProcessPushNotificationFeedbackCommand.class, "recoveredDevice"); | ||||||
|  | 
 | ||||||
|  |   private static final Logger log = LoggerFactory.getLogger(ProcessPushNotificationFeedbackCommand.class); | ||||||
|  | 
 | ||||||
|  |   public ProcessPushNotificationFeedbackCommand(final Clock clock) { | ||||||
|  |     super("process-push-notification-feedback", ""); | ||||||
|  | 
 | ||||||
|  |     this.clock = clock; | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   @Override | ||||||
|  |   public void configure(final Subparser subparser) { | ||||||
|  |     super.configure(subparser); | ||||||
|  | 
 | ||||||
|  |     subparser.addArgument("--dry-run") | ||||||
|  |         .type(Boolean.class) | ||||||
|  |         .dest(DRY_RUN_ARGUMENT) | ||||||
|  |         .required(false) | ||||||
|  |         .setDefault(true) | ||||||
|  |         .help("If true, don't actually modify accounts with stale devices"); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   @Override | ||||||
|  |   protected void crawlAccounts(final ParallelFlux<Account> accounts) { | ||||||
|  |     final boolean isDryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT); | ||||||
|  | 
 | ||||||
|  |     accounts | ||||||
|  |         .filter(account -> account.getDevices().stream().anyMatch(this::deviceNeedsUpdate)) | ||||||
|  |         .sequential() | ||||||
|  |         .flatMap(account -> { | ||||||
|  |           account.getDevices().stream() | ||||||
|  |               .filter(this::deviceNeedsUpdate) | ||||||
|  |               .forEach(device -> { | ||||||
|  |                 final Tags tags = Tags.of( | ||||||
|  |                     UserAgentTagUtil.PLATFORM_TAG, getPlatform(device), | ||||||
|  |                     "dryRun", String.valueOf(isDryRun)); | ||||||
|  | 
 | ||||||
|  |                 if (deviceExpired(device)) { | ||||||
|  |                   Metrics.counter(EXPIRED_DEVICE_COUNTER_NAME, tags).increment(); | ||||||
|  |                 } else { | ||||||
|  |                   Metrics.counter(RECOVERED_DEVICE_COUNTER_NAME, tags).increment(); | ||||||
|  |                 } | ||||||
|  |               }); | ||||||
|  | 
 | ||||||
|  |           if (isDryRun) { | ||||||
|  |             return Mono.just(account); | ||||||
|  |           } else { | ||||||
|  |             return Mono.fromFuture(() -> getCommandDependencies().accountsManager().updateAsync(account, | ||||||
|  |                     a -> a.getDevices().stream() | ||||||
|  |                         .filter(this::deviceNeedsUpdate) | ||||||
|  |                         .forEach(device -> { | ||||||
|  |                           if (deviceExpired(device)) { | ||||||
|  |                             getUserAgent(device).ifPresent(device::setUserAgent); | ||||||
|  | 
 | ||||||
|  |                             device.setGcmId(null); | ||||||
|  |                             device.setApnId(null); | ||||||
|  |                             device.setVoipApnId(null); | ||||||
|  |                             device.setFetchesMessages(false); | ||||||
|  |                           } else { | ||||||
|  |                             device.setUninstalledFeedbackTimestamp(0); | ||||||
|  |                           } | ||||||
|  |                         }))) | ||||||
|  |                 .onErrorResume(throwable -> { | ||||||
|  |                   log.warn("Failed to process push notification feedback for account {}", account.getUuid(), throwable); | ||||||
|  |                   return Mono.empty(); | ||||||
|  |                 }); | ||||||
|  |           } | ||||||
|  |         }, MAX_CONCURRENCY) | ||||||
|  |         .then() | ||||||
|  |         .block(); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   @VisibleForTesting | ||||||
|  |   boolean deviceNeedsUpdate(final Device device) { | ||||||
|  |     // After we get an indication that a device may have uninstalled the Signal app (`uninstalledFeedbackTimestamp` is | ||||||
|  |     // non-zero), check back in after a few days to see what ultimately happened. | ||||||
|  |     return device.getUninstalledFeedbackTimestamp() != 0 && | ||||||
|  |         Instant.ofEpochMilli(device.getUninstalledFeedbackTimestamp()).plus(MAX_TOKEN_REFRESH_DELAY) | ||||||
|  |             .isBefore(clock.instant()); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   @VisibleForTesting | ||||||
|  |   boolean deviceExpired(final Device device) { | ||||||
|  |     // If we received an indication that a device may have uninstalled the Signal app and we haven't seen that device in | ||||||
|  |     // a few days since that event, we consider the device "expired" and should clean up its push tokens. If we have | ||||||
|  |     // seen the device recently, though, we assume that the "uninstalled" hint was either incorrect or the device has | ||||||
|  |     // since reinstalled the app and provided new push tokens. | ||||||
|  |     return Instant.ofEpochMilli(device.getLastSeen()).plus(MAX_TOKEN_REFRESH_DELAY).isBefore(clock.instant()); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   @VisibleForTesting | ||||||
|  |   static Optional<String> getUserAgent(final Device device) { | ||||||
|  |     if (StringUtils.isNotBlank(device.getApnId())) { | ||||||
|  |       if (device.isPrimary()) { | ||||||
|  |         return Optional.of("OWI"); | ||||||
|  |       } else { | ||||||
|  |         return Optional.of("OWP"); | ||||||
|  |       } | ||||||
|  |     } else if (StringUtils.isNotBlank(device.getGcmId())) { | ||||||
|  |       return Optional.of("OWA"); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     return Optional.empty(); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   private static String getPlatform(final Device device) { | ||||||
|  |     if (StringUtils.isNotBlank(device.getApnId())) { | ||||||
|  |       return "ios"; | ||||||
|  |     } else if (StringUtils.isNotBlank(device.getGcmId())) { | ||||||
|  |       return "android"; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     return "unrecognized"; | ||||||
|  |   } | ||||||
|  | } | ||||||
|  | @ -0,0 +1,195 @@ | ||||||
|  | /* | ||||||
|  |  * Copyright 2023 Signal Messenger, LLC | ||||||
|  |  * SPDX-License-Identifier: AGPL-3.0-only | ||||||
|  |  */ | ||||||
|  | 
 | ||||||
|  | package org.whispersystems.textsecuregcm.workers; | ||||||
|  | 
 | ||||||
|  | import net.sourceforge.argparse4j.inf.Namespace; | ||||||
|  | import org.junit.jupiter.api.BeforeEach; | ||||||
|  | import org.junit.jupiter.api.Test; | ||||||
|  | import org.junit.jupiter.params.ParameterizedTest; | ||||||
|  | import org.junit.jupiter.params.provider.Arguments; | ||||||
|  | import org.junit.jupiter.params.provider.MethodSource; | ||||||
|  | import org.junit.jupiter.params.provider.ValueSource; | ||||||
|  | import org.whispersystems.textsecuregcm.storage.Account; | ||||||
|  | import org.whispersystems.textsecuregcm.storage.AccountsManager; | ||||||
|  | import org.whispersystems.textsecuregcm.storage.Device; | ||||||
|  | import reactor.core.publisher.Flux; | ||||||
|  | 
 | ||||||
|  | import java.time.Clock; | ||||||
|  | import java.time.Instant; | ||||||
|  | import java.time.ZoneId; | ||||||
|  | import java.util.List; | ||||||
|  | import java.util.Optional; | ||||||
|  | import java.util.concurrent.CompletableFuture; | ||||||
|  | import java.util.function.Consumer; | ||||||
|  | 
 | ||||||
|  | import static org.junit.jupiter.api.Assertions.*; | ||||||
|  | import static org.mockito.ArgumentMatchers.any; | ||||||
|  | import static org.mockito.ArgumentMatchers.eq; | ||||||
|  | import static org.mockito.Mockito.mock; | ||||||
|  | import static org.mockito.Mockito.never; | ||||||
|  | import static org.mockito.Mockito.verify; | ||||||
|  | import static org.mockito.Mockito.when; | ||||||
|  | 
 | ||||||
|  | class ProcessPushNotificationFeedbackCommandTest { | ||||||
|  | 
 | ||||||
|  |   private AccountsManager accountsManager; | ||||||
|  |   private Clock clock; | ||||||
|  | 
 | ||||||
|  |   private ProcessPushNotificationFeedbackCommand processPushNotificationFeedbackCommand; | ||||||
|  | 
 | ||||||
|  |   private static class TestProcessPushNotificationFeedbackCommand extends ProcessPushNotificationFeedbackCommand { | ||||||
|  | 
 | ||||||
|  |     private final CommandDependencies commandDependencies; | ||||||
|  |     private final Namespace namespace; | ||||||
|  | 
 | ||||||
|  |     public TestProcessPushNotificationFeedbackCommand(final Clock clock, final AccountsManager accountsManager, final boolean isDryRun) { | ||||||
|  |       super(clock); | ||||||
|  | 
 | ||||||
|  |       commandDependencies = mock(CommandDependencies.class); | ||||||
|  |       when(commandDependencies.accountsManager()).thenReturn(accountsManager); | ||||||
|  | 
 | ||||||
|  |       namespace = mock(Namespace.class); | ||||||
|  |       when(namespace.getBoolean(RemoveExpiredAccountsCommand.DRY_RUN_ARGUMENT)).thenReturn(isDryRun); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     protected CommandDependencies getCommandDependencies() { | ||||||
|  |       return commandDependencies; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     protected Namespace getNamespace() { | ||||||
|  |       return namespace; | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   @BeforeEach | ||||||
|  |   void setUpBeforeEach() { | ||||||
|  |     accountsManager = mock(AccountsManager.class); | ||||||
|  | 
 | ||||||
|  |     when(accountsManager.updateAsync(any(), any())) | ||||||
|  |         .thenAnswer(invocation -> { | ||||||
|  |           final Account account = invocation.getArgument(0); | ||||||
|  |           final Consumer<Account> accountUpdater = invocation.getArgument(1); | ||||||
|  | 
 | ||||||
|  |           accountUpdater.accept(account); | ||||||
|  | 
 | ||||||
|  |           return CompletableFuture.completedFuture(account); | ||||||
|  |         }); | ||||||
|  | 
 | ||||||
|  |     clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()); | ||||||
|  | 
 | ||||||
|  |     processPushNotificationFeedbackCommand = | ||||||
|  |         new TestProcessPushNotificationFeedbackCommand(clock, accountsManager, true); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   @ParameterizedTest | ||||||
|  |   @ValueSource(booleans = {true, false}) | ||||||
|  |   void crawlAccounts(final boolean isDryRun) { | ||||||
|  |     processPushNotificationFeedbackCommand = | ||||||
|  |         new TestProcessPushNotificationFeedbackCommand(clock, accountsManager, isDryRun); | ||||||
|  | 
 | ||||||
|  |     final Account accountWithActiveDevice = mock(Account.class); | ||||||
|  |     { | ||||||
|  |       final Device device = mock(Device.class); | ||||||
|  | 
 | ||||||
|  |       when(accountWithActiveDevice.getDevices()).thenReturn(List.of(device)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     final Account accountWithUninstalledDevice = mock(Account.class); | ||||||
|  |     { | ||||||
|  |       final Device uninstalledDevice = mock(Device.class); | ||||||
|  |       when(uninstalledDevice.getUninstalledFeedbackTimestamp()) | ||||||
|  |           .thenReturn(clock.instant().minus(ProcessPushNotificationFeedbackCommand.MAX_TOKEN_REFRESH_DELAY.multipliedBy(2)).toEpochMilli()); | ||||||
|  | 
 | ||||||
|  |       when(accountWithUninstalledDevice.getDevices()).thenReturn(List.of(uninstalledDevice)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     processPushNotificationFeedbackCommand.crawlAccounts( | ||||||
|  |         Flux.just(accountWithActiveDevice, accountWithUninstalledDevice).parallel()); | ||||||
|  | 
 | ||||||
|  |     if (isDryRun) { | ||||||
|  |       verify(accountsManager, never()).updateAsync(any(), any()); | ||||||
|  |     } else { | ||||||
|  |       verify(accountsManager, never()).updateAsync(eq(accountWithActiveDevice), any()); | ||||||
|  |       verify(accountsManager).updateAsync(eq(accountWithUninstalledDevice), any()); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   @Test | ||||||
|  |   void deviceNeedsUpdate() { | ||||||
|  |     { | ||||||
|  |       final Device deviceWithMaturePushNotificationFeedback = mock(Device.class); | ||||||
|  |       when(deviceWithMaturePushNotificationFeedback.getUninstalledFeedbackTimestamp()) | ||||||
|  |           .thenReturn(clock.instant().minus(ProcessPushNotificationFeedbackCommand.MAX_TOKEN_REFRESH_DELAY.multipliedBy(2)).toEpochMilli()); | ||||||
|  | 
 | ||||||
|  |       assertTrue(processPushNotificationFeedbackCommand.deviceNeedsUpdate(deviceWithMaturePushNotificationFeedback)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     { | ||||||
|  |       final Device deviceWithRecentPushNotificationFeedback = mock(Device.class); | ||||||
|  |       when(deviceWithRecentPushNotificationFeedback.getUninstalledFeedbackTimestamp()) | ||||||
|  |           .thenReturn(clock.instant().minus(ProcessPushNotificationFeedbackCommand.MAX_TOKEN_REFRESH_DELAY.dividedBy(2)).toEpochMilli()); | ||||||
|  | 
 | ||||||
|  |       assertFalse(processPushNotificationFeedbackCommand.deviceNeedsUpdate(deviceWithRecentPushNotificationFeedback)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     { | ||||||
|  |       final Device deviceWithoutPushNotificationFeedback = mock(Device.class); | ||||||
|  | 
 | ||||||
|  |       assertFalse(processPushNotificationFeedbackCommand.deviceNeedsUpdate(deviceWithoutPushNotificationFeedback)); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   @Test | ||||||
|  |   void deviceExpired() { | ||||||
|  |     { | ||||||
|  |       final Device expiredDevice = mock(Device.class); | ||||||
|  |       when(expiredDevice.getLastSeen()) | ||||||
|  |           .thenReturn( | ||||||
|  |               clock.instant().minus(ProcessPushNotificationFeedbackCommand.MAX_TOKEN_REFRESH_DELAY.multipliedBy(2)) | ||||||
|  |                   .toEpochMilli()); | ||||||
|  | 
 | ||||||
|  |       assertTrue(processPushNotificationFeedbackCommand.deviceExpired(expiredDevice)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     { | ||||||
|  |       final Device activeDevice = mock(Device.class); | ||||||
|  |       when(activeDevice.getLastSeen()).thenReturn(clock.instant().toEpochMilli()); | ||||||
|  | 
 | ||||||
|  |       assertFalse(processPushNotificationFeedbackCommand.deviceExpired(activeDevice)); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   @SuppressWarnings("OptionalUsedAsFieldOrParameterType") | ||||||
|  |   @ParameterizedTest | ||||||
|  |   @MethodSource | ||||||
|  |   void getUserAgent(final Device device, final Optional<String> expectedUserAgentString) { | ||||||
|  |     assertEquals(expectedUserAgentString, ProcessPushNotificationFeedbackCommand.getUserAgent(device)); | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   private static List<Arguments> getUserAgent() { | ||||||
|  |     final Device iosPrimaryDevice = mock(Device.class); | ||||||
|  |     when(iosPrimaryDevice.isPrimary()).thenReturn(true); | ||||||
|  |     when(iosPrimaryDevice.getApnId()).thenReturn("apns-token"); | ||||||
|  | 
 | ||||||
|  |     final Device iosLinkedDevice = mock(Device.class); | ||||||
|  |     when(iosLinkedDevice.isPrimary()).thenReturn(false); | ||||||
|  |     when(iosLinkedDevice.getApnId()).thenReturn("apns-token"); | ||||||
|  | 
 | ||||||
|  |     final Device androidDevice = mock(Device.class); | ||||||
|  |     when(androidDevice.getGcmId()).thenReturn("gcm-id"); | ||||||
|  | 
 | ||||||
|  |     final Device deviceWithoutTokens = mock(Device.class); | ||||||
|  | 
 | ||||||
|  |     return List.of( | ||||||
|  |         Arguments.of(iosPrimaryDevice, Optional.of("OWI")), | ||||||
|  |         Arguments.of(iosLinkedDevice, Optional.of("OWP")), | ||||||
|  |         Arguments.of(androidDevice, Optional.of("OWA")), | ||||||
|  |         Arguments.of(deviceWithoutTokens, Optional.empty()) | ||||||
|  |     ); | ||||||
|  |   } | ||||||
|  | } | ||||||
		Loading…
	
		Reference in New Issue
	
	 Jon Chambers
						Jon Chambers