Use config option to tune reconciliation instead of auto-tuning
the COUNT query on the accounts db is too heavyweight and risky to justify an auto-tuning reconciliation cycle
This commit is contained in:
parent
7a262eac12
commit
8d72515a30
|
@ -40,10 +40,11 @@ directory:
|
|||
accessSecret: # AWS SQS accessSecret
|
||||
queueUrl: # AWS SQS queue url
|
||||
server:
|
||||
replicationUrl: # CDS replication endpoint base url
|
||||
replicationPassword: # CDS replication endpoint password
|
||||
replicationCaCertificate: # CDS replication endpoint TLS certificate trust root
|
||||
|
||||
replicationUrl: # CDS replication endpoint base url
|
||||
replicationPassword: # CDS replication endpoint password
|
||||
replicationCaCertificate: # CDS replication endpoint TLS certificate trust root
|
||||
reconciliationChunkSize: # CDS reconciliation chunk size
|
||||
reconciliationChunkIntervalMs: # CDS reconciliation chunk interval, in milliseconds
|
||||
|
||||
messageCache: # Redis server configuration for message store cache
|
||||
url:
|
||||
|
|
|
@ -204,7 +204,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
config.getDirectoryConfiguration().getDirectoryClientConfiguration().getUserAuthenticationTokenUserIdSecret());
|
||||
DirectoryReconciliationCache directoryReconciliationCache = new DirectoryReconciliationCache(cacheClient);
|
||||
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());
|
||||
DirectoryReconciler directoryReconciler = new DirectoryReconciler(directoryReconciliationClient, directoryReconciliationCache, directory, accounts);
|
||||
DirectoryReconciler directoryReconciler = new DirectoryReconciler(directoryReconciliationClient, directoryReconciliationCache, directory, accounts,
|
||||
config.getDirectoryConfiguration().getDirectoryServerConfiguration().getReconciliationChunkSize(),
|
||||
config.getDirectoryConfiguration().getDirectoryServerConfiguration().getReconciliationChunkIntervalMs());
|
||||
|
||||
messagesCache.setPubSubManager(pubSubManager, pushSender);
|
||||
|
||||
|
|
|
@ -33,6 +33,12 @@ public class DirectoryServerConfiguration {
|
|||
@JsonProperty
|
||||
private String replicationCaCertificate;
|
||||
|
||||
@JsonProperty
|
||||
private int reconciliationChunkSize = 1000;
|
||||
|
||||
@JsonProperty
|
||||
private long reconciliationChunkIntervalMs = 8000L;
|
||||
|
||||
public String getReplicationUrl() {
|
||||
return replicationUrl;
|
||||
}
|
||||
|
@ -44,4 +50,12 @@ public class DirectoryServerConfiguration {
|
|||
public String getReplicationCaCertificate() {
|
||||
return replicationCaCertificate;
|
||||
}
|
||||
|
||||
public int getReconciliationChunkSize() {
|
||||
return reconciliationChunkSize;
|
||||
}
|
||||
|
||||
public long getReconciliationChunkIntervalMs() {
|
||||
return reconciliationChunkIntervalMs;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,18 +51,16 @@ public class DirectoryReconciler implements Managed, Runnable {
|
|||
private static final Meter sendChunkErrorMeter = metricRegistry.meter(name(DirectoryReconciler.class, "sendChunkError"));
|
||||
|
||||
private static final long WORKER_TTL_MS = 120_000L;
|
||||
private static final long PERIOD = 86400_000L;
|
||||
private static final long MAXIMUM_CHUNK_INTERVAL = 30_000L;
|
||||
private static final long DEFAULT_CHUNK_INTERVAL = 10_000L;
|
||||
private static final long MINIMUM_CHUNK_INTERVAL = 500L;
|
||||
private static final long ACCELERATED_CHUNK_INTERVAL = 10L;
|
||||
private static final int CHUNK_SIZE = 1000;
|
||||
private static final double JITTER_MAX = 0.20;
|
||||
|
||||
private final Accounts accounts;
|
||||
private final DirectoryManager directoryManager;
|
||||
private final DirectoryReconciliationClient reconciliationClient;
|
||||
private final DirectoryReconciliationCache reconciliationCache;
|
||||
private final int chunkSize;
|
||||
private final long chunkIntervalMs;
|
||||
private final String workerId;
|
||||
private final SecureRandom random;
|
||||
|
||||
|
@ -72,11 +70,15 @@ public class DirectoryReconciler implements Managed, Runnable {
|
|||
public DirectoryReconciler(DirectoryReconciliationClient reconciliationClient,
|
||||
DirectoryReconciliationCache reconciliationCache,
|
||||
DirectoryManager directoryManager,
|
||||
Accounts accounts) {
|
||||
Accounts accounts,
|
||||
int chunkSize,
|
||||
long chunkIntervalMs) {
|
||||
this.accounts = accounts;
|
||||
this.directoryManager = directoryManager;
|
||||
this.reconciliationClient = reconciliationClient;
|
||||
this.reconciliationCache = reconciliationCache;
|
||||
this.chunkSize = chunkSize;
|
||||
this.chunkIntervalMs = chunkIntervalMs;
|
||||
this.random = new SecureRandom();
|
||||
this.workerId = generateWorkerId(random);
|
||||
}
|
||||
|
@ -104,12 +106,11 @@ public class DirectoryReconciler implements Managed, Runnable {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
long delayMs = DEFAULT_CHUNK_INTERVAL;
|
||||
long delayMs = chunkIntervalMs;
|
||||
|
||||
while (sleepWhileRunning(getDelayWithJitter(delayMs))) {
|
||||
try {
|
||||
delayMs = DEFAULT_CHUNK_INTERVAL;
|
||||
delayMs = getBoundedChunkInterval(PERIOD * CHUNK_SIZE / getAccountCount());
|
||||
delayMs = getBoundedChunkInterval(chunkIntervalMs);
|
||||
delayMs = doPeriodicWork(delayMs);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("error in directory reconciliation: ", t);
|
||||
|
@ -140,19 +141,6 @@ public class DirectoryReconciler implements Managed, Runnable {
|
|||
return intervalMs;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getAccountCount() {
|
||||
Optional<Long> cachedCount = reconciliationCache.getCachedAccountCount();
|
||||
|
||||
if (cachedCount.isPresent()) {
|
||||
return cachedCount.get();
|
||||
}
|
||||
|
||||
long count = accounts.getCount();
|
||||
reconciliationCache.setCachedAccountCount(count);
|
||||
return count;
|
||||
}
|
||||
|
||||
private synchronized boolean sleepWhileRunning(long delayMs) {
|
||||
long startTimeMs = System.currentTimeMillis();
|
||||
while (running && delayMs > 0) {
|
||||
|
@ -170,7 +158,7 @@ public class DirectoryReconciler implements Managed, Runnable {
|
|||
}
|
||||
|
||||
private long getBoundedChunkInterval(long intervalMs) {
|
||||
return Math.max(Math.min(intervalMs, MAXIMUM_CHUNK_INTERVAL), MINIMUM_CHUNK_INTERVAL);
|
||||
return Math.max(Math.min(intervalMs, chunkIntervalMs), MINIMUM_CHUNK_INTERVAL);
|
||||
}
|
||||
|
||||
private long getDelayWithJitter(long delayMs) {
|
||||
|
@ -180,7 +168,7 @@ public class DirectoryReconciler implements Managed, Runnable {
|
|||
|
||||
private boolean processChunk() {
|
||||
Optional<String> fromNumber = reconciliationCache.getLastNumber();
|
||||
List<Account> chunkAccounts = readChunk(fromNumber, CHUNK_SIZE);
|
||||
List<Account> chunkAccounts = readChunk(fromNumber, chunkSize);
|
||||
|
||||
writeChunktoDirectoryCache(chunkAccounts);
|
||||
|
||||
|
|
|
@ -29,10 +29,8 @@ public class DirectoryReconciliationCache {
|
|||
|
||||
private static final String ACTIVE_WORKER_KEY = "directory_reconciliation_active_worker";
|
||||
private static final String LAST_NUMBER_KEY = "directory_reconciliation_last_number";
|
||||
private static final String CACHED_COUNT_KEY = "directory_reconciliation_cached_count";
|
||||
private static final String ACCELERATE_KEY = "directory_reconciliation_accelerate";
|
||||
|
||||
private static final long CACHED_COUNT_TTL_MS = 21600_000L;
|
||||
private static final long LAST_NUMBER_TTL_MS = 86400_000L;
|
||||
|
||||
private final ReplicatedJedisPool jedisPool;
|
||||
|
@ -78,27 +76,6 @@ public class DirectoryReconciliationCache {
|
|||
}
|
||||
}
|
||||
|
||||
public Optional<Long> getCachedAccountCount() {
|
||||
try (Jedis jedis = jedisPool.getWriteResource()) {
|
||||
Optional<String> cachedAccountCount = Optional.fromNullable(jedis.get(CACHED_COUNT_KEY));
|
||||
if (!cachedAccountCount.isPresent()) {
|
||||
return Optional.absent();
|
||||
}
|
||||
|
||||
try {
|
||||
return Optional.of(Long.parseUnsignedLong(cachedAccountCount.get()));
|
||||
} catch (NumberFormatException ex) {
|
||||
return Optional.absent();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void setCachedAccountCount(long accountCount) {
|
||||
try (Jedis jedis = jedisPool.getWriteResource()) {
|
||||
jedis.psetex(CACHED_COUNT_KEY, CACHED_COUNT_TTL_MS, Long.toString(accountCount));
|
||||
}
|
||||
}
|
||||
|
||||
public static class UnlockOperation {
|
||||
|
||||
private final LuaScript luaScript;
|
||||
|
|
|
@ -35,7 +35,6 @@ public class DirectoryReconcilerTest {
|
|||
private static final String VALID_NUMBER = "valid";
|
||||
private static final String INACTIVE_NUMBER = "inactive";
|
||||
|
||||
private static final long ACCOUNT_COUNT = 0L;
|
||||
private static final long INTERVAL_MS = 30_000L;
|
||||
|
||||
private final Account account = mock(Account.class);
|
||||
|
@ -45,7 +44,7 @@ public class DirectoryReconcilerTest {
|
|||
private final DirectoryManager directoryManager = mock(DirectoryManager.class);
|
||||
private final DirectoryReconciliationClient reconciliationClient = mock(DirectoryReconciliationClient.class);
|
||||
private final DirectoryReconciliationCache reconciliationCache = mock(DirectoryReconciliationCache.class);
|
||||
private final DirectoryReconciler directoryReconciler = new DirectoryReconciler(reconciliationClient, reconciliationCache, directoryManager, accounts);
|
||||
private final DirectoryReconciler directoryReconciler = new DirectoryReconciler(reconciliationClient, reconciliationCache, directoryManager, accounts, 1000, INTERVAL_MS);
|
||||
|
||||
private final DirectoryReconciliationResponse successResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.OK);
|
||||
private final DirectoryReconciliationResponse notFoundResponse = new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.MISSING);
|
||||
|
@ -64,7 +63,6 @@ public class DirectoryReconcilerTest {
|
|||
when(accounts.getAllFrom(anyInt())).thenReturn(Arrays.asList(account, inactiveAccount));
|
||||
when(accounts.getAllFrom(eq(VALID_NUMBER), anyInt())).thenReturn(Arrays.asList(inactiveAccount));
|
||||
when(accounts.getAllFrom(eq(INACTIVE_NUMBER), anyInt())).thenReturn(Collections.emptyList());
|
||||
when(accounts.getCount()).thenReturn(ACCOUNT_COUNT);
|
||||
|
||||
when(reconciliationClient.sendChunk(any())).thenReturn(successResponse);
|
||||
|
||||
|
@ -73,41 +71,6 @@ public class DirectoryReconcilerTest {
|
|||
when(reconciliationCache.isAccelerated()).thenReturn(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetUncachedAccountCount() {
|
||||
when(reconciliationCache.getCachedAccountCount()).thenReturn(Optional.absent());
|
||||
|
||||
long accountCount = directoryReconciler.getAccountCount();
|
||||
|
||||
assertThat(accountCount).isEqualTo(ACCOUNT_COUNT);
|
||||
|
||||
verify(accounts, times(1)).getCount();
|
||||
|
||||
verify(reconciliationCache, times(1)).getCachedAccountCount();
|
||||
verify(reconciliationCache, times(1)).setCachedAccountCount(eq(ACCOUNT_COUNT));
|
||||
|
||||
verifyNoMoreInteractions(directoryManager);
|
||||
verifyNoMoreInteractions(accounts);
|
||||
verifyNoMoreInteractions(reconciliationClient);
|
||||
verifyNoMoreInteractions(reconciliationCache);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetCachedAccountCount() {
|
||||
when(reconciliationCache.getCachedAccountCount()).thenReturn(Optional.of(ACCOUNT_COUNT));
|
||||
|
||||
long accountCount = directoryReconciler.getAccountCount();
|
||||
|
||||
assertThat(accountCount).isEqualTo(ACCOUNT_COUNT);
|
||||
|
||||
verify(reconciliationCache, times(1)).getCachedAccountCount();
|
||||
|
||||
verifyNoMoreInteractions(directoryManager);
|
||||
verifyNoMoreInteractions(accounts);
|
||||
verifyNoMoreInteractions(reconciliationClient);
|
||||
verifyNoMoreInteractions(reconciliationCache);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValid() {
|
||||
long delayMs = directoryReconciler.doPeriodicWork(INTERVAL_MS);
|
||||
|
|
Loading…
Reference in New Issue