Allow the account cleaner to operate on multiple accounts in parallel
This commit is contained in:
		
							parent
							
								
									ae57853ec4
								
							
						
					
					
						commit
						cb50b44d8f
					
				| 
						 | 
					@ -403,6 +403,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
 | 
				
			||||||
    ExecutorService          fcmSenderExecutor                    = environment.lifecycle().executorService(name(getClass(), "fcmSender-%d")).maxThreads(32).minThreads(32).workQueue(fcmSenderQueue).build();
 | 
					    ExecutorService          fcmSenderExecutor                    = environment.lifecycle().executorService(name(getClass(), "fcmSender-%d")).maxThreads(32).minThreads(32).workQueue(fcmSenderQueue).build();
 | 
				
			||||||
    ExecutorService          backupServiceExecutor                = environment.lifecycle().executorService(name(getClass(), "backupService-%d")).maxThreads(1).minThreads(1).build();
 | 
					    ExecutorService          backupServiceExecutor                = environment.lifecycle().executorService(name(getClass(), "backupService-%d")).maxThreads(1).minThreads(1).build();
 | 
				
			||||||
    ExecutorService          storageServiceExecutor               = environment.lifecycle().executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
 | 
					    ExecutorService          storageServiceExecutor               = environment.lifecycle().executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
 | 
				
			||||||
 | 
					    ExecutorService          accountDeletionExecutor              = environment.lifecycle().executorService(name(getClass(), "accountCleaner-%d")).maxThreads(16).minThreads(16).build();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // TODO: generally speaking this is a DynamoDB I/O executor for the accounts table; we should eventually have a general executor for speaking to the accounts table, but most of the server is still synchronous so this isn't widely useful yet
 | 
					    // TODO: generally speaking this is a DynamoDB I/O executor for the accounts table; we should eventually have a general executor for speaking to the accounts table, but most of the server is still synchronous so this isn't widely useful yet
 | 
				
			||||||
    ExecutorService batchIdentityCheckExecutor = environment.lifecycle().executorService(name(getClass(), "batchIdentityCheck-%d")).minThreads(32).maxThreads(32).build();
 | 
					    ExecutorService batchIdentityCheckExecutor = environment.lifecycle().executorService(name(getClass(), "batchIdentityCheck-%d")).minThreads(32).maxThreads(32).build();
 | 
				
			||||||
| 
						 | 
					@ -552,7 +553,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
 | 
				
			||||||
        new AccountDatabaseCrawlerCache(cacheCluster, AccountDatabaseCrawlerCache.ACCOUNT_CLEANER_PREFIX);
 | 
					        new AccountDatabaseCrawlerCache(cacheCluster, AccountDatabaseCrawlerCache.ACCOUNT_CLEANER_PREFIX);
 | 
				
			||||||
    AccountDatabaseCrawler accountCleanerAccountDatabaseCrawler = new AccountDatabaseCrawler("Account cleaner crawler",
 | 
					    AccountDatabaseCrawler accountCleanerAccountDatabaseCrawler = new AccountDatabaseCrawler("Account cleaner crawler",
 | 
				
			||||||
        accountsManager,
 | 
					        accountsManager,
 | 
				
			||||||
        accountCleanerAccountDatabaseCrawlerCache, List.of(new AccountCleaner(accountsManager)),
 | 
					        accountCleanerAccountDatabaseCrawlerCache, List.of(new AccountCleaner(accountsManager, accountDeletionExecutor)),
 | 
				
			||||||
        config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
 | 
					        config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
 | 
				
			||||||
        config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()
 | 
					        config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()
 | 
				
			||||||
    );
 | 
					    );
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -7,9 +7,13 @@ package org.whispersystems.textsecuregcm.storage;
 | 
				
			||||||
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
 | 
					import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import io.micrometer.core.instrument.Metrics;
 | 
					import io.micrometer.core.instrument.Metrics;
 | 
				
			||||||
 | 
					import java.util.ArrayList;
 | 
				
			||||||
import java.util.List;
 | 
					import java.util.List;
 | 
				
			||||||
import java.util.Optional;
 | 
					import java.util.Optional;
 | 
				
			||||||
import java.util.UUID;
 | 
					import java.util.UUID;
 | 
				
			||||||
 | 
					import java.util.concurrent.CompletableFuture;
 | 
				
			||||||
 | 
					import java.util.concurrent.CompletionException;
 | 
				
			||||||
 | 
					import java.util.concurrent.Executor;
 | 
				
			||||||
import java.util.concurrent.TimeUnit;
 | 
					import java.util.concurrent.TimeUnit;
 | 
				
			||||||
import org.slf4j.Logger;
 | 
					import org.slf4j.Logger;
 | 
				
			||||||
import org.slf4j.LoggerFactory;
 | 
					import org.slf4j.LoggerFactory;
 | 
				
			||||||
| 
						 | 
					@ -23,9 +27,11 @@ public class AccountCleaner extends AccountDatabaseCrawlerListener {
 | 
				
			||||||
  private static final String DELETION_REASON_TAG_NAME = "reason";
 | 
					  private static final String DELETION_REASON_TAG_NAME = "reason";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  private final AccountsManager accountsManager;
 | 
					  private final AccountsManager accountsManager;
 | 
				
			||||||
 | 
					  private final Executor deletionExecutor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  public AccountCleaner(AccountsManager accountsManager) {
 | 
					  public AccountCleaner(final AccountsManager accountsManager, final Executor deletionExecutor) {
 | 
				
			||||||
    this.accountsManager = accountsManager;
 | 
					    this.accountsManager = accountsManager;
 | 
				
			||||||
 | 
					    this.deletionExecutor = deletionExecutor;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  @Override
 | 
					  @Override
 | 
				
			||||||
| 
						 | 
					@ -38,17 +44,33 @@ public class AccountCleaner extends AccountDatabaseCrawlerListener {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  @Override
 | 
					  @Override
 | 
				
			||||||
  protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
 | 
					  protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
 | 
				
			||||||
 | 
					    final List<CompletableFuture<Void>> deletionFutures = new ArrayList<>();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    for (Account account : chunkAccounts) {
 | 
					    for (Account account : chunkAccounts) {
 | 
				
			||||||
      if (isExpired(account) || needsExplicitRemoval(account)) {
 | 
					      if (isExpired(account) || needsExplicitRemoval(account)) {
 | 
				
			||||||
        final String deletionReason = needsExplicitRemoval(account) ? "newlyExpired" : "previouslyExpired";
 | 
					        final String deletionReason = needsExplicitRemoval(account) ? "newlyExpired" : "previouslyExpired";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        deletionFutures.add(CompletableFuture.runAsync(() -> {
 | 
				
			||||||
              try {
 | 
					              try {
 | 
				
			||||||
                accountsManager.delete(account, AccountsManager.DeletionReason.EXPIRED);
 | 
					                accountsManager.delete(account, AccountsManager.DeletionReason.EXPIRED);
 | 
				
			||||||
 | 
					              } catch (final InterruptedException e) {
 | 
				
			||||||
 | 
					                throw new CompletionException(e);
 | 
				
			||||||
 | 
					              }
 | 
				
			||||||
 | 
					            }, deletionExecutor)
 | 
				
			||||||
 | 
					            .whenComplete((ignored, throwable) -> {
 | 
				
			||||||
 | 
					              if (throwable != null) {
 | 
				
			||||||
 | 
					                log.warn("Failed to delete account {}", account.getUuid(), throwable);
 | 
				
			||||||
 | 
					              } else {
 | 
				
			||||||
                Metrics.counter(DELETED_ACCOUNT_COUNTER_NAME, DELETION_REASON_TAG_NAME, deletionReason).increment();
 | 
					                Metrics.counter(DELETED_ACCOUNT_COUNTER_NAME, DELETION_REASON_TAG_NAME, deletionReason).increment();
 | 
				
			||||||
 | 
					              }
 | 
				
			||||||
 | 
					            }));
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    try {
 | 
				
			||||||
 | 
					      CompletableFuture.allOf(deletionFutures.toArray(new CompletableFuture[0])).join();
 | 
				
			||||||
    } catch (final Exception e) {
 | 
					    } catch (final Exception e) {
 | 
				
			||||||
          log.warn("Failed to delete account {}", account.getUuid(), e);
 | 
					      log.debug("Failed to delete one or more accounts in chunk", e);
 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -15,7 +15,10 @@ import static org.mockito.Mockito.when;
 | 
				
			||||||
import java.util.Arrays;
 | 
					import java.util.Arrays;
 | 
				
			||||||
import java.util.Optional;
 | 
					import java.util.Optional;
 | 
				
			||||||
import java.util.UUID;
 | 
					import java.util.UUID;
 | 
				
			||||||
 | 
					import java.util.concurrent.ExecutorService;
 | 
				
			||||||
 | 
					import java.util.concurrent.Executors;
 | 
				
			||||||
import java.util.concurrent.TimeUnit;
 | 
					import java.util.concurrent.TimeUnit;
 | 
				
			||||||
 | 
					import org.junit.jupiter.api.AfterEach;
 | 
				
			||||||
import org.junit.jupiter.api.BeforeEach;
 | 
					import org.junit.jupiter.api.BeforeEach;
 | 
				
			||||||
import org.junit.jupiter.api.Test;
 | 
					import org.junit.jupiter.api.Test;
 | 
				
			||||||
import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason;
 | 
					import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason;
 | 
				
			||||||
| 
						 | 
					@ -32,6 +35,8 @@ class AccountCleanerTest {
 | 
				
			||||||
  private final Device  undeletedDisabledDevice  = mock(Device.class );
 | 
					  private final Device  undeletedDisabledDevice  = mock(Device.class );
 | 
				
			||||||
  private final Device  undeletedEnabledDevice   = mock(Device.class );
 | 
					  private final Device  undeletedEnabledDevice   = mock(Device.class );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  private ExecutorService deletionExecutor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  @BeforeEach
 | 
					  @BeforeEach
 | 
				
			||||||
  void setup() {
 | 
					  void setup() {
 | 
				
			||||||
| 
						 | 
					@ -61,11 +66,19 @@ class AccountCleanerTest {
 | 
				
			||||||
    when(undeletedEnabledAccount.getNumber()).thenReturn("+14153333333");
 | 
					    when(undeletedEnabledAccount.getNumber()).thenReturn("+14153333333");
 | 
				
			||||||
    when(undeletedEnabledAccount.getLastSeen()).thenReturn(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(364));
 | 
					    when(undeletedEnabledAccount.getLastSeen()).thenReturn(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(364));
 | 
				
			||||||
    when(undeletedEnabledAccount.getUuid()).thenReturn(UUID.randomUUID());
 | 
					    when(undeletedEnabledAccount.getUuid()).thenReturn(UUID.randomUUID());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    deletionExecutor = Executors.newFixedThreadPool(2);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  @AfterEach
 | 
				
			||||||
 | 
					  void tearDown() throws InterruptedException {
 | 
				
			||||||
 | 
					    deletionExecutor.shutdown();
 | 
				
			||||||
 | 
					    deletionExecutor.awaitTermination(2, TimeUnit.SECONDS);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  @Test
 | 
					  @Test
 | 
				
			||||||
  void testAccounts() throws AccountDatabaseCrawlerRestartException, InterruptedException {
 | 
					  void testAccounts() throws AccountDatabaseCrawlerRestartException, InterruptedException {
 | 
				
			||||||
    AccountCleaner accountCleaner = new AccountCleaner(accountsManager);
 | 
					    AccountCleaner accountCleaner = new AccountCleaner(accountsManager, deletionExecutor);
 | 
				
			||||||
    accountCleaner.onCrawlStart();
 | 
					    accountCleaner.onCrawlStart();
 | 
				
			||||||
    accountCleaner.timeAndProcessCrawlChunk(Optional.empty(), Arrays.asList(deletedDisabledAccount, undeletedDisabledAccount, undeletedEnabledAccount));
 | 
					    accountCleaner.timeAndProcessCrawlChunk(Optional.empty(), Arrays.asList(deletedDisabledAccount, undeletedDisabledAccount, undeletedEnabledAccount));
 | 
				
			||||||
    accountCleaner.onCrawlEnd(Optional.empty());
 | 
					    accountCleaner.onCrawlEnd(Optional.empty());
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue