diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java index f9bbc00c6..3bfe3fed5 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java @@ -133,8 +133,16 @@ public class AccountDatabaseCrawler implements Managed, Runnable { cache.setLastNumber(Optional.empty()); cache.clearAccelerate(); } else { - listeners.forEach(listener -> { listener.onCrawlChunk(fromNumber, chunkAccounts); }); - cache.setLastNumber(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getNumber())); + try { + for (AccountDatabaseCrawlerListener listener : listeners) { + listener.onCrawlChunk(fromNumber, chunkAccounts); + } + cache.setLastNumber(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getNumber())); + } catch (AccountDatabaseCrawlerRestartException e) { + cache.setLastNumber(Optional.empty()); + cache.clearAccelerate(); + } + } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java index b5ba7b3d8..051fc6638 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerListener.java @@ -21,6 +21,6 @@ import java.util.Optional; public interface AccountDatabaseCrawlerListener { void onCrawlStart(); - void onCrawlChunk(Optional fromNumber, List chunkAccounts); + void onCrawlChunk(Optional fromNumber, List chunkAccounts) throws AccountDatabaseCrawlerRestartException; void onCrawlEnd(Optional fromNumber); } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerRestartException.java b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerRestartException.java new file mode 100644 index 000000000..5ccd928d6 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawlerRestartException.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2018 Open WhisperSystems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.whispersystems.textsecuregcm.storage; + +public class AccountDatabaseCrawlerRestartException extends Exception { + public AccountDatabaseCrawlerRestartException(String s) { + super(s); + } + + public AccountDatabaseCrawlerRestartException(Exception e) { + super(e); + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java index a190f3a8f..68314b3e7 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryReconciler.java @@ -63,13 +63,15 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener { } - public void onCrawlChunk(Optional fromNumber, List chunkAccounts) { + public void onCrawlChunk(Optional fromNumber, List chunkAccounts) throws AccountDatabaseCrawlerRestartException { updateDirectoryCache(chunkAccounts); DirectoryReconciliationRequest request = createChunkRequest(fromNumber, chunkAccounts); DirectoryReconciliationResponse response = sendChunk(request); - + if (response.getStatus() == DirectoryReconciliationResponse.Status.MISSING) { + throw new AccountDatabaseCrawlerRestartException("directory reconciler missing"); + } } private void updateDirectoryCache(List accounts) { diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java index be74a0b7e..8a4e8732f 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java @@ -22,12 +22,14 @@ import org.whispersystems.textsecuregcm.storage.Accounts; import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler; import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache; import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException; import org.junit.Before; import org.junit.Test; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; @@ -68,7 +70,7 @@ public class AccountDatabaseCrawlerTest { } @Test - public void testCrawlStart() { + public void testCrawlStart() throws AccountDatabaseCrawlerRestartException { when(cache.getLastNumber()).thenReturn(Optional.empty()); boolean accelerated = crawler.doPeriodicWork(); @@ -94,7 +96,7 @@ public class AccountDatabaseCrawlerTest { } @Test - public void testCrawlChunk() { + public void testCrawlChunk() throws AccountDatabaseCrawlerRestartException { when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1)); boolean accelerated = crawler.doPeriodicWork(); @@ -119,7 +121,7 @@ public class AccountDatabaseCrawlerTest { } @Test - public void testCrawlChunkAccelerated() { + public void testCrawlChunkAccelerated() throws AccountDatabaseCrawlerRestartException { when(cache.isAccelerated()).thenReturn(true); when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1)); @@ -144,6 +146,33 @@ public class AccountDatabaseCrawlerTest { verifyNoMoreInteractions(cache); } + @Test + public void testCrawlChunkRestart() throws AccountDatabaseCrawlerRestartException { + when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT1)); + doThrow(AccountDatabaseCrawlerRestartException.class).when(listener).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); + + boolean accelerated = crawler.doPeriodicWork(); + assertThat(accelerated).isFalse(); + + verify(cache, times(1)).claimActiveWork(any(String.class), anyLong()); + verify(cache, times(1)).getLastNumber(); + verify(accounts, times(0)).getAllFrom(eq(CHUNK_SIZE)); + verify(accounts, times(1)).getAllFrom(eq(ACCOUNT1), eq(CHUNK_SIZE)); + verify(account2, times(0)).getNumber(); + verify(listener, times(1)).onCrawlChunk(eq(Optional.of(ACCOUNT1)), eq(Arrays.asList(account2))); + verify(cache, times(1)).setLastNumber(eq(Optional.empty())); + verify(cache, times(1)).clearAccelerate(); + verify(cache, times(1)).isAccelerated(); + verify(cache, times(1)).releaseActiveWork(any(String.class)); + + verifyZeroInteractions(account1); + + verifyNoMoreInteractions(account2); + verifyNoMoreInteractions(accounts); + verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(cache); + } + @Test public void testCrawlEnd() { when(cache.getLastNumber()).thenReturn(Optional.of(ACCOUNT2)); diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java index 0178a029f..77f7ebafc 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ActiveUserCounterTest.java @@ -20,6 +20,7 @@ package org.whispersystems.textsecuregcm.tests.storage; import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.ActiveUserCounter; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.util.Util; @@ -135,7 +136,7 @@ public class ActiveUserCounterTest { } @Test - public void testCrawlChunkValidAccount() { + public void testCrawlChunkValidAccount() throws AccountDatabaseCrawlerRestartException { activeUserCounter.onCrawlChunk(Optional.of(NUMBER_IOS), Arrays.asList(iosAccount)); verify(iosAccount, times(1)).getMasterDevice(); @@ -164,7 +165,7 @@ public class ActiveUserCounterTest { } @Test - public void testCrawlChunkNoDeviceAccount() { + public void testCrawlChunkNoDeviceAccount() throws AccountDatabaseCrawlerRestartException { activeUserCounter.onCrawlChunk(Optional.of(NUMBER_NODEVICE), Arrays.asList(noDeviceAccount)); verify(noDeviceAccount, times(1)).getMasterDevice(); @@ -188,7 +189,7 @@ public class ActiveUserCounterTest { } @Test - public void testCrawlChunkMixedAccount() { + public void testCrawlChunkMixedAccount() throws AccountDatabaseCrawlerRestartException { activeUserCounter.onCrawlChunk(Optional.of(NUMBER_IOS), Arrays.asList(iosAccount, androidAccount, noDeviceAccount)); verify(iosAccount, times(1)).getMasterDevice(); diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java index b7acac7b5..3e9337b03 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/DirectoryReconcilerTest.java @@ -21,6 +21,7 @@ import org.whispersystems.textsecuregcm.entities.ClientContact; import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest; import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse; import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartException; import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle; import org.whispersystems.textsecuregcm.storage.DirectoryManager; import org.whispersystems.textsecuregcm.storage.DirectoryReconciler; @@ -63,7 +64,7 @@ public class DirectoryReconcilerTest { } @Test - public void testCrawlChunkValid() { + public void testCrawlChunkValid() throws AccountDatabaseCrawlerRestartException { when(reconciliationClient.sendChunk(any())).thenReturn(successResponse); directoryReconciler.onCrawlChunk(Optional.of(VALID_NUMBER), Arrays.asList(activeAccount, inactiveAccount));