Add crawler names to log messages

This commit is contained in:
Jon Chambers 2021-11-23 15:43:21 -05:00 committed by Jon Chambers
parent ee1f8b34ea
commit a42fe9bfb0
4 changed files with 25 additions and 22 deletions

View File

@ -542,7 +542,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
AccountDatabaseCrawlerCache directoryReconciliationAccountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache( AccountDatabaseCrawlerCache directoryReconciliationAccountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(
cacheCluster, AccountDatabaseCrawlerCache.DIRECTORY_RECONCILER_PREFIX); cacheCluster, AccountDatabaseCrawlerCache.DIRECTORY_RECONCILER_PREFIX);
AccountDatabaseCrawler directoryReconciliationAccountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, AccountDatabaseCrawler directoryReconciliationAccountDatabaseCrawler = new AccountDatabaseCrawler(
"Reconciliation crawler",
accountsManager,
directoryReconciliationAccountDatabaseCrawlerCache, directoryReconciliationAccountDatabaseCrawlerListeners, directoryReconciliationAccountDatabaseCrawlerCache, directoryReconciliationAccountDatabaseCrawlerListeners,
config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs() config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()
@ -560,7 +562,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster, AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster,
AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX); AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX);
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler("General-purpose account crawler",
accountsManager,
accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners,
config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs() config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()

View File

@ -34,6 +34,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
private static final long WORKER_TTL_MS = 120_000L; private static final long WORKER_TTL_MS = 120_000L;
private static final long ACCELERATED_CHUNK_INTERVAL = 10L; private static final long ACCELERATED_CHUNK_INTERVAL = 10L;
private final String name;
private final AccountsManager accounts; private final AccountsManager accounts;
private final int chunkSize; private final int chunkSize;
private final long chunkIntervalMs; private final long chunkIntervalMs;
@ -44,11 +45,13 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
private AtomicBoolean running = new AtomicBoolean(false); private AtomicBoolean running = new AtomicBoolean(false);
private boolean finished; private boolean finished;
public AccountDatabaseCrawler(AccountsManager accounts, public AccountDatabaseCrawler(final String name,
AccountsManager accounts,
AccountDatabaseCrawlerCache cache, AccountDatabaseCrawlerCache cache,
List<AccountDatabaseCrawlerListener> listeners, List<AccountDatabaseCrawlerListener> listeners,
int chunkSize, int chunkSize,
long chunkIntervalMs) { long chunkIntervalMs) {
this.name = name;
this.accounts = accounts; this.accounts = accounts;
this.chunkSize = chunkSize; this.chunkSize = chunkSize;
this.chunkIntervalMs = chunkIntervalMs; this.chunkIntervalMs = chunkIntervalMs;
@ -82,7 +85,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
accelerated = doPeriodicWork(); accelerated = doPeriodicWork();
sleepWhileRunning(accelerated ? ACCELERATED_CHUNK_INTERVAL : chunkIntervalMs); sleepWhileRunning(accelerated ? ACCELERATED_CHUNK_INTERVAL : chunkIntervalMs);
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("error in database crawl: {}: {}", t.getClass().getSimpleName(), t.getMessage(), t); logger.warn("{}: error in database crawl: {}: {}", name, t.getClass().getSimpleName(), t.getMessage(), t);
Util.sleep(10000); Util.sleep(10000);
} }
} }
@ -106,7 +109,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
final long endTimeMs = System.currentTimeMillis(); final long endTimeMs = System.currentTimeMillis();
final long sleepIntervalMs = chunkIntervalMs - (endTimeMs - startTimeMs); final long sleepIntervalMs = chunkIntervalMs - (endTimeMs - startTimeMs);
if (sleepIntervalMs > 0) { if (sleepIntervalMs > 0) {
logger.debug("Sleeping {}ms", sleepIntervalMs); logger.debug("{}: Sleeping {}ms", name, sleepIntervalMs);
sleepWhileRunning(sleepIntervalMs); sleepWhileRunning(sleepIntervalMs);
} }
} finally { } finally {
@ -123,19 +126,19 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
final Optional<UUID> fromUuid = getLastUuid(); final Optional<UUID> fromUuid = getLastUuid();
if (fromUuid.isEmpty()) { if (fromUuid.isEmpty()) {
logger.info("Started crawl"); logger.info("{}: Started crawl", name);
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlStart); listeners.forEach(AccountDatabaseCrawlerListener::onCrawlStart);
} }
final AccountCrawlChunk chunkAccounts = readChunk(fromUuid, chunkSize); final AccountCrawlChunk chunkAccounts = readChunk(fromUuid, chunkSize);
if (chunkAccounts.getAccounts().isEmpty()) { if (chunkAccounts.getAccounts().isEmpty()) {
logger.info("Finished crawl"); logger.info("{}: Finished crawl", name);
listeners.forEach(listener -> listener.onCrawlEnd(fromUuid)); listeners.forEach(listener -> listener.onCrawlEnd(fromUuid));
cacheLastUuid(Optional.empty()); cacheLastUuid(Optional.empty());
cache.setAccelerated(false); cache.setAccelerated(false);
} else { } else {
logger.debug("Processing chunk"); logger.debug("{}: Processing chunk", name);
try { try {
for (AccountDatabaseCrawlerListener listener : listeners) { for (AccountDatabaseCrawlerListener listener : listeners) {
listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts.getAccounts()); listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts.getAccounts());

View File

@ -58,7 +58,7 @@ public class AccountDatabaseCrawlerIntegrationTest extends AbstractRedisClusterT
.thenReturn(new AccountCrawlChunk(Collections.emptyList(), null)); .thenReturn(new AccountCrawlChunk(Collections.emptyList(), null));
final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(getRedisCluster(), "test"); final AccountDatabaseCrawlerCache crawlerCache = new AccountDatabaseCrawlerCache(getRedisCluster(), "test");
accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE, accountDatabaseCrawler = new AccountDatabaseCrawler("test", accountsManager, crawlerCache, List.of(listener), CHUNK_SIZE,
CHUNK_INTERVAL_MS); CHUNK_INTERVAL_MS);
} }

View File

@ -18,11 +18,10 @@ import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Account;
@ -48,10 +47,8 @@ class AccountDatabaseCrawlerTest {
private final AccountDatabaseCrawlerListener listener = mock(AccountDatabaseCrawlerListener.class); private final AccountDatabaseCrawlerListener listener = mock(AccountDatabaseCrawlerListener.class);
private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class); private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class);
private final ExecutorService chunkPreReadExecutorService = mock(ExecutorService.class); private final AccountDatabaseCrawler crawler =
new AccountDatabaseCrawler("test", accounts, cache, List.of(listener), CHUNK_SIZE, CHUNK_INTERVAL_MS);
private final AccountDatabaseCrawler crawler = new AccountDatabaseCrawler(accounts, cache, Arrays.asList(listener),
CHUNK_SIZE, CHUNK_INTERVAL_MS);
@BeforeEach @BeforeEach
void setup() { void setup() {
@ -59,9 +56,9 @@ class AccountDatabaseCrawlerTest {
when(account2.getUuid()).thenReturn(ACCOUNT2); when(account2.getUuid()).thenReturn(ACCOUNT2);
when(accounts.getAllFromDynamo(anyInt())).thenReturn( when(accounts.getAllFromDynamo(anyInt())).thenReturn(
new AccountCrawlChunk(Arrays.asList(account1, account2), ACCOUNT2)); new AccountCrawlChunk(List.of(account1, account2), ACCOUNT2));
when(accounts.getAllFromDynamo(eq(ACCOUNT1), anyInt())).thenReturn( when(accounts.getAllFromDynamo(eq(ACCOUNT1), anyInt())).thenReturn(
new AccountCrawlChunk(Arrays.asList(account2), ACCOUNT2)); new AccountCrawlChunk(List.of(account2), ACCOUNT2));
when(accounts.getAllFromDynamo(eq(ACCOUNT2), anyInt())).thenReturn( when(accounts.getAllFromDynamo(eq(ACCOUNT2), anyInt())).thenReturn(
new AccountCrawlChunk(Collections.emptyList(), null)); new AccountCrawlChunk(Collections.emptyList(), null));
@ -85,7 +82,7 @@ class AccountDatabaseCrawlerTest {
verify(accounts, times(1)).getAllFromDynamo(eq(CHUNK_SIZE)); verify(accounts, times(1)).getAllFromDynamo(eq(CHUNK_SIZE));
verify(accounts, times(0)).getAllFromDynamo(any(UUID.class), eq(CHUNK_SIZE)); verify(accounts, times(0)).getAllFromDynamo(any(UUID.class), eq(CHUNK_SIZE));
verify(account1, times(0)).getUuid(); verify(account1, times(0)).getUuid();
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.empty()), eq(Arrays.asList(account1, account2))); verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.empty()), eq(List.of(account1, account2)));
verify(cache, times(0)).setLastUuid(eq(Optional.of(ACCOUNT2))); verify(cache, times(0)).setLastUuid(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2))); verify(cache, times(1)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).isAccelerated();
@ -111,7 +108,7 @@ class AccountDatabaseCrawlerTest {
verify(cache, times(1)).getLastUuidDynamo(); verify(cache, times(1)).getLastUuidDynamo();
verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE)); verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE)); verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE));
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(List.of(account2)));
verify(cache, times(0)).setLastUuid(eq(Optional.of(ACCOUNT2))); verify(cache, times(0)).setLastUuid(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2))); verify(cache, times(1)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).isAccelerated();
@ -139,7 +136,7 @@ class AccountDatabaseCrawlerTest {
verify(cache, times(1)).getLastUuidDynamo(); verify(cache, times(1)).getLastUuidDynamo();
verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE)); verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE)); verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE));
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(List.of(account2)));
verify(cache, times(0)).setLastUuid(eq(Optional.of(ACCOUNT2))); verify(cache, times(0)).setLastUuid(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2))); verify(cache, times(1)).setLastUuidDynamo(eq(Optional.of(ACCOUNT2)));
verify(cache, times(1)).isAccelerated(); verify(cache, times(1)).isAccelerated();
@ -158,7 +155,7 @@ class AccountDatabaseCrawlerTest {
when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1)); when(cache.getLastUuid()).thenReturn(Optional.of(ACCOUNT1));
when(cache.getLastUuidDynamo()).thenReturn(Optional.of(ACCOUNT1)); when(cache.getLastUuidDynamo()).thenReturn(Optional.of(ACCOUNT1));
doThrow(AccountDatabaseCrawlerRestartException.class).when(listener) doThrow(AccountDatabaseCrawlerRestartException.class).when(listener)
.timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); .timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(List.of(account2)));
boolean accelerated = crawler.doPeriodicWork(); boolean accelerated = crawler.doPeriodicWork();
assertThat(accelerated).isFalse(); assertThat(accelerated).isFalse();
@ -169,7 +166,7 @@ class AccountDatabaseCrawlerTest {
verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE)); verify(accounts, times(0)).getAllFromDynamo(eq(CHUNK_SIZE));
verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE)); verify(accounts, times(1)).getAllFromDynamo(eq(ACCOUNT1), eq(CHUNK_SIZE));
verify(account2, times(0)).getNumber(); verify(account2, times(0)).getNumber();
verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); verify(listener, times(1)).timeAndProcessCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(List.of(account2)));
verify(cache, times(0)).setLastUuid(eq(Optional.empty())); verify(cache, times(0)).setLastUuid(eq(Optional.empty()));
verify(cache, times(1)).setLastUuidDynamo(eq(Optional.empty())); verify(cache, times(1)).setLastUuidDynamo(eq(Optional.empty()));
verify(cache, times(1)).setAccelerated(false); verify(cache, times(1)).setAccelerated(false);