Subdivide the account database crawler cache experiment and add logging to track down lingering disagreements.
This commit is contained in:
parent
e53a7f65b8
commit
7faf143a97
|
@ -18,6 +18,8 @@ package org.whispersystems.textsecuregcm.storage;
|
||||||
|
|
||||||
import io.lettuce.core.ScriptOutputType;
|
import io.lettuce.core.ScriptOutputType;
|
||||||
import io.lettuce.core.SetArgs;
|
import io.lettuce.core.SetArgs;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.experiment.Experiment;
|
import org.whispersystems.textsecuregcm.experiment.Experiment;
|
||||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||||
|
@ -41,11 +43,14 @@ public class AccountDatabaseCrawlerCache {
|
||||||
|
|
||||||
private static final long LAST_NUMBER_TTL_MS = 86400_000L;
|
private static final long LAST_NUMBER_TTL_MS = 86400_000L;
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(AccountDatabaseCrawlerCache.class);
|
||||||
|
|
||||||
private final ReplicatedJedisPool jedisPool;
|
private final ReplicatedJedisPool jedisPool;
|
||||||
private final FaultTolerantRedisCluster cacheCluster;
|
private final FaultTolerantRedisCluster cacheCluster;
|
||||||
private final LuaScript unlockScript;
|
private final LuaScript unlockScript;
|
||||||
private final ClusterLuaScript unlockClusterScript;
|
private final ClusterLuaScript unlockClusterScript;
|
||||||
private final Experiment redisClusterExperiment = new Experiment("RedisCluster", "AccountDatabaseCrawlerCache");
|
private final Experiment isAcceleratedExperiment = new Experiment("RedisCluster", "AccountDatabaseCrawlerCache", "isAccelerated");
|
||||||
|
private final Experiment getLastUuidExperiment = new Experiment("RedisCluster", "AccountDatabaseCrawlerCache", "getLastUuid");
|
||||||
|
|
||||||
public AccountDatabaseCrawlerCache(ReplicatedJedisPool jedisPool, FaultTolerantRedisCluster cacheCluster) throws IOException {
|
public AccountDatabaseCrawlerCache(ReplicatedJedisPool jedisPool, FaultTolerantRedisCluster cacheCluster) throws IOException {
|
||||||
this.jedisPool = jedisPool;
|
this.jedisPool = jedisPool;
|
||||||
|
@ -64,7 +69,7 @@ public class AccountDatabaseCrawlerCache {
|
||||||
public boolean isAccelerated() {
|
public boolean isAccelerated() {
|
||||||
try (Jedis jedis = jedisPool.getWriteResource()) {
|
try (Jedis jedis = jedisPool.getWriteResource()) {
|
||||||
final String accelerated = jedis.get(ACCELERATE_KEY);
|
final String accelerated = jedis.get(ACCELERATE_KEY);
|
||||||
redisClusterExperiment.compareSupplierResult(accelerated, () -> cacheCluster.withReadCluster(connection -> connection.sync().get(ACCELERATE_KEY)));
|
isAcceleratedExperiment.compareSupplierResult(accelerated, () -> cacheCluster.withReadCluster(connection -> connection.sync().get(ACCELERATE_KEY)));
|
||||||
|
|
||||||
return "1".equals(accelerated);
|
return "1".equals(accelerated);
|
||||||
}
|
}
|
||||||
|
@ -87,13 +92,18 @@ public class AccountDatabaseCrawlerCache {
|
||||||
List<byte[]> keys = Arrays.asList(ACTIVE_WORKER_KEY.getBytes());
|
List<byte[]> keys = Arrays.asList(ACTIVE_WORKER_KEY.getBytes());
|
||||||
List<byte[]> args = Arrays.asList(workerId.getBytes());
|
List<byte[]> args = Arrays.asList(workerId.getBytes());
|
||||||
unlockScript.execute(keys, args);
|
unlockScript.execute(keys, args);
|
||||||
unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId));
|
|
||||||
|
try {
|
||||||
|
unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId));
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("Failed to execute clustered unlock script", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<UUID> getLastUuid() {
|
public Optional<UUID> getLastUuid() {
|
||||||
try (Jedis jedis = jedisPool.getWriteResource()) {
|
try (Jedis jedis = jedisPool.getWriteResource()) {
|
||||||
String lastUuidString = jedis.get(LAST_UUID_KEY);
|
String lastUuidString = jedis.get(LAST_UUID_KEY);
|
||||||
redisClusterExperiment.compareSupplierResult(lastUuidString, () -> cacheCluster.withWriteCluster(connection -> connection.sync().get(LAST_UUID_KEY)));
|
getLastUuidExperiment.compareSupplierResult(lastUuidString, () -> cacheCluster.withWriteCluster(connection -> connection.sync().get(LAST_UUID_KEY)));
|
||||||
|
|
||||||
if (lastUuidString == null) return Optional.empty();
|
if (lastUuidString == null) return Optional.empty();
|
||||||
else return Optional.of(UUID.fromString(lastUuidString));
|
else return Optional.of(UUID.fromString(lastUuidString));
|
||||||
|
|
Loading…
Reference in New Issue