Drop relational profiles store

This commit is contained in:
Jon Chambers 2021-11-24 17:04:05 -05:00 committed by Jon Chambers
parent 6aceb24fd2
commit afa910bbd7
8 changed files with 14 additions and 245 deletions

View File

@ -188,7 +188,6 @@ import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.NonNormalizedAccountCrawlerListener;
import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
@ -386,7 +385,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
config.getPhoneNumberIdentifiersDynamoDbConfiguration().getTableName());
ReservedUsernames reservedUsernames = new ReservedUsernames(reservedUsernamesDynamoDbClient,
config.getReservedUsernamesDynamoDbConfiguration().getTableName());
Profiles profiles = new Profiles(accountDatabase);
ProfilesDynamoDb profilesDynamoDb = new ProfilesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
config.getDynamoDbTables().getProfiles().getTableName());
Keys keys = new Keys(preKeyDynamoDb, config.getKeysDynamoDbConfiguration().getTableName());
@ -468,7 +466,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration());
StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts);
StoredVerificationCodeManager pendingDevicesManager = new StoredVerificationCodeManager(pendingDevices);
ProfilesManager profilesManager = new ProfilesManager(profiles, profilesDynamoDb, cacheCluster, dynamicConfigurationManager);
ProfilesManager profilesManager = new ProfilesManager(profilesDynamoDb, cacheCluster);
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager);
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster, Metrics.globalRegistry, config.getReportMessageConfiguration().getCounterTtl());

View File

@ -1,104 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import java.util.Optional;
import java.util.UUID;
import org.whispersystems.textsecuregcm.storage.mappers.VersionedProfileMapper;
import org.whispersystems.textsecuregcm.util.Constants;
public class Profiles implements ProfilesStore {
public static final String ID = "id";
public static final String UID = "uuid";
public static final String VERSION = "version";
public static final String NAME = "name";
public static final String AVATAR = "avatar";
public static final String ABOUT_EMOJI = "about_emoji";
public static final String ABOUT = "about";
public static final String PAYMENT_ADDRESS = "payment_address";
public static final String COMMITMENT = "commitment";
public static final String DELETED = "deleted";
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Timer setTimer = metricRegistry.timer(name(Profiles.class, "set" ));
private final Timer getTimer = metricRegistry.timer(name(Profiles.class, "get" ));
private final Timer deleteTimer = metricRegistry.timer(name(Profiles.class, "delete"));
private final FaultTolerantDatabase database;
public Profiles(FaultTolerantDatabase database) {
this.database = database;
this.database.getDatabase().registerRowMapper(new VersionedProfileMapper());
}
@Override
public void set(UUID uuid, VersionedProfile profile) {
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context ignored = setTimer.time()) {
handle.createUpdate(
"INSERT INTO profiles ("
+ UID + ", "
+ VERSION + ", "
+ NAME + ", "
+ AVATAR + ", "
+ ABOUT_EMOJI + ", "
+ ABOUT + ", "
+ PAYMENT_ADDRESS + ", "
+ COMMITMENT + ") "
+ "VALUES (:uuid, :version, :name, :avatar, :about_emoji, :about, :payment_address, :commitment) "
+ "ON CONFLICT (" + UID + ", " + VERSION + ") "
+ "DO UPDATE SET "
+ NAME + " = EXCLUDED." + NAME + ", "
+ AVATAR + " = EXCLUDED." + AVATAR + ", "
+ ABOUT + " = EXCLUDED." + ABOUT + ", "
+ ABOUT_EMOJI + " = EXCLUDED." + ABOUT_EMOJI + ", "
+ PAYMENT_ADDRESS + " = EXCLUDED." + PAYMENT_ADDRESS + ", "
+ DELETED + " = FALSE, "
+ COMMITMENT + " = CASE WHEN profiles." + DELETED + " = TRUE THEN EXCLUDED." + COMMITMENT + " ELSE profiles." + COMMITMENT + " END")
.bind("uuid", uuid)
.bind("version", profile.getVersion())
.bind("name", profile.getName())
.bind("avatar", profile.getAvatar())
.bind("about_emoji", profile.getAboutEmoji())
.bind("about", profile.getAbout())
.bind("payment_address", profile.getPaymentAddress())
.bind("commitment", profile.getCommitment())
.execute();
}
}));
}
@Override
public Optional<VersionedProfile> get(UUID uuid, String version) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getTimer.time()) {
return handle.createQuery("SELECT * FROM profiles WHERE " + UID + " = :uuid AND " + VERSION + " = :version AND " + DELETED + "= FALSE")
.bind("uuid", uuid)
.bind("version", version)
.mapTo(VersionedProfile.class)
.findFirst();
}
}));
}
@Override
public void deleteAll(UUID uuid) {
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context ignored = deleteTimer.time()) {
handle.createUpdate("UPDATE profiles SET " + DELETED + " = TRUE WHERE " + UID + " = :uuid")
.bind("uuid", uuid)
.execute();
}
}));
}
}

View File

@ -8,16 +8,13 @@ package org.whispersystems.textsecuregcm.storage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.lettuce.core.RedisException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.SystemMapper;
public class ProfilesManager {
@ -25,63 +22,31 @@ public class ProfilesManager {
private static final String CACHE_PREFIX = "profiles::";
private final Profiles profiles;
private final ProfilesDynamoDb profilesDynamoDb;
private final FaultTolerantRedisCluster cacheCluster;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private final ObjectMapper mapper;
private final Experiment migrationExperiment = new Experiment("profileMigration");
public ProfilesManager(final Profiles profiles,
final ProfilesDynamoDb profilesDynamoDb,
final FaultTolerantRedisCluster cacheCluster,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
this.profiles = profiles;
public ProfilesManager(final ProfilesDynamoDb profilesDynamoDb, final FaultTolerantRedisCluster cacheCluster) {
this.profilesDynamoDb = profilesDynamoDb;
this.cacheCluster = cacheCluster;
this.dynamicConfigurationManager = dynamicConfigurationManager;
this.mapper = SystemMapper.getMapper();
}
public void set(UUID uuid, VersionedProfile versionedProfile) {
memcacheSet(uuid, versionedProfile);
profiles.set(uuid, versionedProfile);
if (dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isDynamoDbWriteEnabled()) {
profilesDynamoDb.set(uuid, versionedProfile);
}
profilesDynamoDb.set(uuid, versionedProfile);
}
public void deleteAll(UUID uuid) {
memcacheDelete(uuid);
profiles.deleteAll(uuid);
if (dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isDynamoDbDeleteEnabled()) {
profilesDynamoDb.deleteAll(uuid);
}
profilesDynamoDb.deleteAll(uuid);
}
public Optional<VersionedProfile> get(UUID uuid, String version) {
Optional<VersionedProfile> profile = memcacheGet(uuid, version);
if (profile.isEmpty()) {
if (dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isDynamoDbReadPrimary()) {
profile = profilesDynamoDb.get(uuid, version);
} else {
profile = profiles.get(uuid, version);
if (dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isDynamoDbReadForComparisonEnabled()) {
final Optional<VersionedProfile> dynamoProfile = profilesDynamoDb.get(uuid, version);
migrationExperiment.compareSupplierResult(profile, () -> dynamoProfile);
if (profile.isEmpty() && dynamoProfile.isPresent() &&
dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isLogMismatches()) {
logger.info("Profile {}/{} absent from relational database, but present in DynamoDB", uuid, version);
}
}
}
profile = profilesDynamoDb.get(uuid, version);
profile.ifPresent(versionedProfile -> memcacheSet(uuid, versionedProfile));
}

View File

@ -1,28 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage.mappers;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.VersionedProfile;
public class VersionedProfileMapper implements RowMapper<VersionedProfile> {
@Override
public VersionedProfile map(ResultSet resultSet, StatementContext ctx) throws SQLException {
return new VersionedProfile(
resultSet.getString(Profiles.VERSION),
resultSet.getString(Profiles.NAME),
resultSet.getString(Profiles.AVATAR),
resultSet.getString(Profiles.ABOUT_EMOJI),
resultSet.getString(Profiles.ABOUT),
resultSet.getString(Profiles.PAYMENT_ADDRESS),
resultSet.getBytes(Profiles.COMMITMENT));
}
}

View File

@ -49,7 +49,6 @@ import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb;
@ -177,7 +176,6 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
configuration.getAccountsDynamoDbConfiguration().getScanPageSize());
PhoneNumberIdentifiers phoneNumberIdentifiers = new PhoneNumberIdentifiers(phoneNumberIdentifiersDynamoDbClient,
configuration.getPhoneNumberIdentifiersDynamoDbConfiguration().getTableName());
Profiles profiles = new Profiles(accountDatabase);
ProfilesDynamoDb profilesDynamoDb = new ProfilesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
configuration.getDynamoDbTables().getProfiles().getTableName());
ReservedUsernames reservedUsernames = new ReservedUsernames(reservedUsernamesDynamoDbClient,
@ -208,8 +206,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager);
DirectoryQueue directoryQueue = new DirectoryQueue(
configuration.getDirectoryConfiguration().getSqsConfiguration());
ProfilesManager profilesManager = new ProfilesManager(profiles, profilesDynamoDb, cacheCluster,
dynamicConfigurationManager);
ProfilesManager profilesManager = new ProfilesManager(profilesDynamoDb, cacheCluster);
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessagesDynamoDb,
configuration.getReportMessageDynamoDbConfiguration().getTableName(),
configuration.getReportMessageConfiguration().getReportTtl());

View File

@ -47,7 +47,6 @@ import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb;
@ -181,7 +180,6 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
configuration.getAccountsDynamoDbConfiguration().getScanPageSize());
PhoneNumberIdentifiers phoneNumberIdentifiers = new PhoneNumberIdentifiers(phoneNumberIdentifiersDynamoDbClient,
configuration.getPhoneNumberIdentifiersDynamoDbConfiguration().getTableName());
Profiles profiles = new Profiles(accountDatabase);
ProfilesDynamoDb profilesDynamoDb = new ProfilesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
configuration.getDynamoDbTables().getProfiles().getTableName());
ReservedUsernames reservedUsernames = new ReservedUsernames(reservedUsernamesDynamoDbClient,
@ -210,8 +208,7 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager);
DirectoryQueue directoryQueue = new DirectoryQueue(
configuration.getDirectoryConfiguration().getSqsConfiguration());
ProfilesManager profilesManager = new ProfilesManager(profiles, profilesDynamoDb, cacheCluster,
dynamicConfigurationManager);
ProfilesManager profilesManager = new ProfilesManager(profilesDynamoDb, cacheCluster);
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessagesDynamoDb,
configuration.getReportMessageDynamoDbConfiguration().getTableName(),
configuration.getReportMessageConfiguration().getReportTtl());

View File

@ -1,39 +0,0 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import com.opentable.db.postgres.embedded.LiquibasePreparer;
import com.opentable.db.postgres.junit5.EmbeddedPostgresExtension;
import com.opentable.db.postgres.junit5.PreparedDbExtension;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
public class ProfilesPostgresTest extends ProfilesTest {
@RegisterExtension
static PreparedDbExtension ACCOUNTS_POSTGRES_EXTENSION =
EmbeddedPostgresExtension.preparedDatabase(LiquibasePreparer.forClasspathLocation("accountsdb.xml"));
private Profiles profiles;
@BeforeEach
void setUp() {
final FaultTolerantDatabase faultTolerantDatabase = new FaultTolerantDatabase("profilesTest",
Jdbi.create(ACCOUNTS_POSTGRES_EXTENSION.getTestDatabase()),
new CircuitBreakerConfiguration());
profiles = new Profiles(faultTolerantDatabase);
faultTolerantDatabase.use(jdbi -> jdbi.useHandle(handle -> handle.createUpdate("DELETE FROM profiles").execute()));
}
@Override
protected ProfilesStore getProfilesStore() {
return profiles;
}
}

View File

@ -24,11 +24,7 @@ import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicProfileMigrationConfiguration;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.VersionedProfile;
@ -36,7 +32,7 @@ import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
public class ProfilesManagerTest {
private Profiles profiles;
private ProfilesDynamoDb profiles;
private RedisAdvancedClusterCommands<String, String> commands;
private ProfilesManager profilesManager;
@ -47,22 +43,9 @@ public class ProfilesManagerTest {
commands = mock(RedisAdvancedClusterCommands.class);
final FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands);
profiles = mock(Profiles.class);
profiles = mock(ProfilesDynamoDb.class);
@SuppressWarnings("unchecked") final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager =
mock(DynamicConfigurationManager.class);
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
final DynamicProfileMigrationConfiguration profileMigrationConfiguration =
mock(DynamicProfileMigrationConfiguration.class);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
when(dynamicConfiguration.getProfileMigrationConfiguration()).thenReturn(profileMigrationConfiguration);
profilesManager = new ProfilesManager(profiles,
mock(ProfilesDynamoDb.class),
cacheCluster,
dynamicConfigurationManager);
profilesManager = new ProfilesManager(profiles, cacheCluster);
}
@Test