Compare versioned profiles synchronously; log a subset of mismatches for further investigation
This commit is contained in:
parent
795b226b90
commit
d89b4f7e95
|
@ -435,8 +435,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
minThreads(availableProcessors). // mostly this is IO bound so tying to number of processors is tenuous at best
|
minThreads(availableProcessors). // mostly this is IO bound so tying to number of processors is tenuous at best
|
||||||
allowCoreThreadTimeOut(true).
|
allowCoreThreadTimeOut(true).
|
||||||
build();
|
build();
|
||||||
ExecutorService profileMigrationExperimentExecutor = environment.lifecycle()
|
|
||||||
.executorService(name(getClass(), "profileMigrationExperiment-%d")).minThreads(8).maxThreads(8).build();
|
|
||||||
|
|
||||||
StripeManager stripeManager = new StripeManager(config.getStripe().getApiKey(), stripeExecutor,
|
StripeManager stripeManager = new StripeManager(config.getStripe().getApiKey(), stripeExecutor,
|
||||||
config.getStripe().getIdempotencyKeyGenerator(), config.getStripe().getBoostDescription());
|
config.getStripe().getIdempotencyKeyGenerator(), config.getStripe().getBoostDescription());
|
||||||
|
@ -475,8 +473,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts);
|
StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts);
|
||||||
StoredVerificationCodeManager pendingDevicesManager = new StoredVerificationCodeManager(pendingDevices);
|
StoredVerificationCodeManager pendingDevicesManager = new StoredVerificationCodeManager(pendingDevices);
|
||||||
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
||||||
ProfilesManager profilesManager = new ProfilesManager(profiles, profilesDynamoDb, cacheCluster,
|
ProfilesManager profilesManager = new ProfilesManager(profiles, profilesDynamoDb, cacheCluster, dynamicConfigurationManager);
|
||||||
dynamicConfigurationManager, profileMigrationExperimentExecutor);
|
|
||||||
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, keyspaceNotificationDispatchExecutor);
|
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, keyspaceNotificationDispatchExecutor);
|
||||||
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager);
|
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager);
|
||||||
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster, Metrics.globalRegistry, config.getReportMessageConfiguration().getCounterTtl());
|
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster, Metrics.globalRegistry, config.getReportMessageConfiguration().getCounterTtl());
|
||||||
|
|
|
@ -21,6 +21,9 @@ public class DynamicProfileMigrationConfiguration {
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private boolean dynamoDbReadPrimary = false;
|
private boolean dynamoDbReadPrimary = false;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private boolean logMismatches = false;
|
||||||
|
|
||||||
public boolean isDynamoDbDeleteEnabled() {
|
public boolean isDynamoDbDeleteEnabled() {
|
||||||
return dynamoDbDeleteEnabled;
|
return dynamoDbDeleteEnabled;
|
||||||
}
|
}
|
||||||
|
@ -36,4 +39,8 @@ public class DynamicProfileMigrationConfiguration {
|
||||||
public boolean isDynamoDbReadPrimary() {
|
public boolean isDynamoDbReadPrimary() {
|
||||||
return dynamoDbReadPrimary;
|
return dynamoDbReadPrimary;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isLogMismatches() {
|
||||||
|
return logMismatches;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,6 @@ import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
|
|
||||||
public class ProfilesManager {
|
public class ProfilesManager {
|
||||||
|
|
||||||
|
@ -32,19 +31,16 @@ public class ProfilesManager {
|
||||||
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
|
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
|
||||||
private final ObjectMapper mapper;
|
private final ObjectMapper mapper;
|
||||||
|
|
||||||
private final Executor migrationExperimentExecutor;
|
|
||||||
private final Experiment migrationExperiment = new Experiment("profileMigration");
|
private final Experiment migrationExperiment = new Experiment("profileMigration");
|
||||||
|
|
||||||
public ProfilesManager(final Profiles profiles,
|
public ProfilesManager(final Profiles profiles,
|
||||||
final ProfilesDynamoDb profilesDynamoDb,
|
final ProfilesDynamoDb profilesDynamoDb,
|
||||||
final FaultTolerantRedisCluster cacheCluster,
|
final FaultTolerantRedisCluster cacheCluster,
|
||||||
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
|
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
|
||||||
final Executor migrationExperimentExecutor) {
|
|
||||||
this.profiles = profiles;
|
this.profiles = profiles;
|
||||||
this.profilesDynamoDb = profilesDynamoDb;
|
this.profilesDynamoDb = profilesDynamoDb;
|
||||||
this.cacheCluster = cacheCluster;
|
this.cacheCluster = cacheCluster;
|
||||||
this.dynamicConfigurationManager = dynamicConfigurationManager;
|
this.dynamicConfigurationManager = dynamicConfigurationManager;
|
||||||
this.migrationExperimentExecutor = migrationExperimentExecutor;
|
|
||||||
this.mapper = SystemMapper.getMapper();
|
this.mapper = SystemMapper.getMapper();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,7 +72,13 @@ public class ProfilesManager {
|
||||||
profile = profiles.get(uuid, version);
|
profile = profiles.get(uuid, version);
|
||||||
|
|
||||||
if (dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isDynamoDbReadForComparisonEnabled()) {
|
if (dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isDynamoDbReadForComparisonEnabled()) {
|
||||||
migrationExperiment.compareSupplierResultAsync(profile, () -> profilesDynamoDb.get(uuid, version), migrationExperimentExecutor);
|
final Optional<VersionedProfile> dynamoProfile = profilesDynamoDb.get(uuid, version);
|
||||||
|
migrationExperiment.compareSupplierResult(profile, () -> dynamoProfile);
|
||||||
|
|
||||||
|
if (profile.isEmpty() && dynamoProfile.isPresent() &&
|
||||||
|
dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isLogMismatches()) {
|
||||||
|
logger.info("Profile {}/{} absent from relational database, but present in DynamoDB", uuid, version);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -212,7 +212,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
|
||||||
configuration.getDirectoryConfiguration().getSqsConfiguration());
|
configuration.getDirectoryConfiguration().getSqsConfiguration());
|
||||||
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
||||||
ProfilesManager profilesManager = new ProfilesManager(profiles, profilesDynamoDb, cacheCluster,
|
ProfilesManager profilesManager = new ProfilesManager(profiles, profilesDynamoDb, cacheCluster,
|
||||||
dynamicConfigurationManager, Executors.newSingleThreadExecutor());
|
dynamicConfigurationManager);
|
||||||
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessagesDynamoDb,
|
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessagesDynamoDb,
|
||||||
configuration.getReportMessageDynamoDbConfiguration().getTableName(),
|
configuration.getReportMessageDynamoDbConfiguration().getTableName(),
|
||||||
configuration.getReportMessageConfiguration().getReportTtl());
|
configuration.getReportMessageConfiguration().getReportTtl());
|
||||||
|
|
|
@ -214,7 +214,7 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
|
||||||
configuration.getDirectoryConfiguration().getSqsConfiguration());
|
configuration.getDirectoryConfiguration().getSqsConfiguration());
|
||||||
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
||||||
ProfilesManager profilesManager = new ProfilesManager(profiles, profilesDynamoDb, cacheCluster,
|
ProfilesManager profilesManager = new ProfilesManager(profiles, profilesDynamoDb, cacheCluster,
|
||||||
dynamicConfigurationManager, Executors.newSingleThreadExecutor());
|
dynamicConfigurationManager);
|
||||||
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessagesDynamoDb,
|
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessagesDynamoDb,
|
||||||
configuration.getReportMessageDynamoDbConfiguration().getTableName(),
|
configuration.getReportMessageDynamoDbConfiguration().getTableName(),
|
||||||
configuration.getReportMessageConfiguration().getReportTtl());
|
configuration.getReportMessageConfiguration().getReportTtl());
|
||||||
|
|
|
@ -22,7 +22,6 @@ import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||||
|
@ -63,8 +62,7 @@ public class ProfilesManagerTest {
|
||||||
profilesManager = new ProfilesManager(profiles,
|
profilesManager = new ProfilesManager(profiles,
|
||||||
mock(ProfilesDynamoDb.class),
|
mock(ProfilesDynamoDb.class),
|
||||||
cacheCluster,
|
cacheCluster,
|
||||||
dynamicConfigurationManager,
|
dynamicConfigurationManager);
|
||||||
mock(Executor.class));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue