Migrate profiles from a relational database to DynamoDB

This commit is contained in:
Jon Chambers 2021-11-24 14:48:41 -05:00 committed by GitHub
parent 3bb8e5bb00
commit 9e7010f185
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1021 additions and 114 deletions

View File

@ -22,6 +22,8 @@ dynamoDbTables:
expiration: P30D # Duration of time until rows expire
subscriptions:
tableName: Example_Subscriptions
profiles:
tableName: Example_Profiles
twilio: # Twilio gateway configuration
accountId: unset

View File

@ -189,6 +189,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.NonNormalizedAccountCrawlerListener;
import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.storage.PushChallengeDynamoDb;
@ -217,6 +218,7 @@ import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
import org.whispersystems.textsecuregcm.workers.CertificateCommand;
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
import org.whispersystems.textsecuregcm.workers.MigrateProfilesCommand;
import org.whispersystems.textsecuregcm.workers.ReserveUsernameCommand;
import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
import org.whispersystems.textsecuregcm.workers.SetCrawlerAccelerationTask;
@ -244,6 +246,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
bootstrap.addCommand(new ServerVersionCommand());
bootstrap.addCommand(new CheckDynamicConfigurationCommand());
bootstrap.addCommand(new SetUserDiscoverabilityCommand());
bootstrap.addCommand(new MigrateProfilesCommand());
bootstrap.addCommand(new ReserveUsernameCommand());
bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("accountdb", "accountsdb.xml") {
@ -325,6 +328,10 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
config.getDynamoDbClientConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient dynamoDbClient = DynamoDbFromConfig.client(
config.getDynamoDbClientConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient messageDynamoDb = DynamoDbFromConfig.client(config.getMessageDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
@ -384,6 +391,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ReservedUsernames reservedUsernames = new ReservedUsernames(reservedUsernamesDynamoDbClient,
config.getReservedUsernamesDynamoDbConfiguration().getTableName());
Profiles profiles = new Profiles(accountDatabase);
ProfilesDynamoDb profilesDynamoDb = new ProfilesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
config.getDynamoDbTables().getProfiles().getTableName());
Keys keys = new Keys(preKeyDynamoDb, config.getKeysDynamoDbConfiguration().getTableName());
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb,
config.getMessageDynamoDbConfiguration().getTableName(),
@ -426,6 +435,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
minThreads(availableProcessors). // mostly this is IO bound so tying to number of processors is tenuous at best
allowCoreThreadTimeOut(true).
build();
ExecutorService profileMigrationExperimentExecutor = environment.lifecycle()
.executorService(name(getClass(), "profileMigrationExperiment-%d")).minThreads(8).maxThreads(8).build();
StripeManager stripeManager = new StripeManager(config.getStripe().getApiKey(), stripeExecutor,
config.getStripe().getIdempotencyKeyGenerator(), config.getStripe().getBoostDescription());
@ -464,7 +475,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts);
StoredVerificationCodeManager pendingDevicesManager = new StoredVerificationCodeManager(pendingDevices);
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, profilesDynamoDb, cacheCluster,
dynamicConfigurationManager, profileMigrationExperimentExecutor);
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager);
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster, Metrics.globalRegistry, config.getReportMessageConfiguration().getCounterTtl());

View File

@ -49,15 +49,18 @@ public class DynamoDbTables {
private final IssuedReceiptsTableConfiguration issuedReceipts;
private final TableWithExpiration redeemedReceipts;
private final Table subscriptions;
private final Table profiles;
@JsonCreator
public DynamoDbTables(
@JsonProperty("issuedReceipts") final IssuedReceiptsTableConfiguration issuedReceipts,
@JsonProperty("redeemedReceipts") final TableWithExpiration redeemedReceipts,
@JsonProperty("subscriptions") final Table subscriptions) {
@JsonProperty("subscriptions") final Table subscriptions,
@JsonProperty("profiles") final Table profiles) {
this.issuedReceipts = issuedReceipts;
this.redeemedReceipts = redeemedReceipts;
this.subscriptions = subscriptions;
this.profiles = profiles;
}
@Valid
@ -77,4 +80,10 @@ public class DynamoDbTables {
public Table getSubscriptions() {
return subscriptions;
}
@Valid
@NotNull
public Table getProfiles() {
return profiles;
}
}

View File

@ -55,6 +55,10 @@ public class DynamicConfiguration {
@Valid
private DynamicPushLatencyConfiguration pushLatency = new DynamicPushLatencyConfiguration(Collections.emptyMap());
@JsonProperty
@Valid
private DynamicProfileMigrationConfiguration profileMigration = new DynamicProfileMigrationConfiguration();
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
final String experimentName) {
return Optional.ofNullable(experiments.get(experimentName));
@ -109,4 +113,8 @@ public class DynamicConfiguration {
public DynamicPushLatencyConfiguration getPushLatencyConfiguration() {
return pushLatency;
}
public DynamicProfileMigrationConfiguration getProfileMigrationConfiguration() {
return profileMigration;
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration.dynamic;
import com.fasterxml.jackson.annotation.JsonProperty;
public class DynamicProfileMigrationConfiguration {
@JsonProperty
private boolean dynamoDbDeleteEnabled = false;
@JsonProperty
private boolean dynamoDbWriteEnabled = false;
@JsonProperty
private boolean dynamoDbReadForComparisonEnabled = false;
@JsonProperty
private boolean dynamoDbReadPrimary = false;
public boolean isDynamoDbDeleteEnabled() {
return dynamoDbDeleteEnabled;
}
public boolean isDynamoDbWriteEnabled() {
return dynamoDbWriteEnabled;
}
public boolean isDynamoDbReadForComparisonEnabled() {
return dynamoDbReadForComparisonEnabled;
}
public boolean isDynamoDbReadPrimary() {
return dynamoDbReadPrimary;
}
}

View File

@ -22,7 +22,7 @@ import org.whispersystems.textsecuregcm.storage.mappers.VersionedProfileMapper;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
public class Profiles {
public class Profiles implements ProfilesStore {
public static final String ID = "id";
public static final String UID = "uuid";
@ -48,6 +48,7 @@ public class Profiles {
this.database.getDatabase().registerRowMapper(new VersionedProfileMapper());
}
@Override
public void set(UUID uuid, VersionedProfile profile) {
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context ignored = setTimer.time()) {
@ -82,6 +83,7 @@ public class Profiles {
}));
}
@Override
public Optional<VersionedProfile> get(UUID uuid, String version) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getTimer.time()) {
@ -94,13 +96,49 @@ public class Profiles {
}));
}
@Override
public void deleteAll(UUID uuid) {
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context ignored = deleteTimer.time()) {
handle.createUpdate("DELETE FROM profiles WHERE " + UID + " = :uuid")
handle.createUpdate("UPDATE profiles SET " + DELETED + " = TRUE WHERE " + UID + " = :uuid")
.bind("uuid", uuid)
.execute();
}
}));
}
public ResultIterator<Pair<UUID, VersionedProfile>> getAll(final int fetchSize) {
return database.with(jdbi -> jdbi.withHandle(handle -> handle.inTransaction(transactionHandle ->
transactionHandle.createQuery("SELECT * FROM profiles WHERE " + DELETED + "= FALSE")
.setFetchSize(fetchSize)
.map((resultSet, ctx) -> {
final UUID uuid = UUID.fromString(resultSet.getString(UID));
final VersionedProfile profile = new VersionedProfile(
resultSet.getString(Profiles.VERSION),
resultSet.getString(Profiles.NAME),
resultSet.getString(Profiles.AVATAR),
resultSet.getString(Profiles.ABOUT_EMOJI),
resultSet.getString(Profiles.ABOUT),
resultSet.getString(Profiles.PAYMENT_ADDRESS),
resultSet.getBytes(Profiles.COMMITMENT));
return new Pair<>(uuid, profile);
})
.iterator())));
}
public ResultIterator<Pair<UUID, String>> getDeletedProfiles(final int fetchSize) {
return database.with(jdbi -> jdbi.withHandle(handle -> handle.inTransaction(transactionHandle ->
transactionHandle.createQuery("SELECT " + UID + ", " + VERSION + " FROM profiles WHERE " + DELETED + " = TRUE")
.setFetchSize(fetchSize)
.map((rs, ctx) -> new Pair<>(UUID.fromString(rs.getString(UID)), rs.getString(VERSION)))
.iterator())));
}
@VisibleForTesting
void purgeDeletedProfiles() {
database.use(jdbi -> jdbi.useHandle(handle ->
handle.createUpdate("DELETE FROM profiles WHERE " + DELETED + " = TRUE")
.execute()));
}
}

View File

@ -0,0 +1,296 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.paginators.QueryIterable;
public class ProfilesDynamoDb implements ProfilesStore {
private final DynamoDbClient dynamoDbClient;
private final DynamoDbAsyncClient dynamoDbAsyncClient;
private final String tableName;
// UUID of the account that owns this profile; byte array
@VisibleForTesting
static final String KEY_ACCOUNT_UUID = "U";
// Version of this profile; string
@VisibleForTesting
static final String ATTR_VERSION = "V";
// User's name; string
private static final String ATTR_NAME = "N";
// Avatar path/filename; string
private static final String ATTR_AVATAR = "A";
// Bio/about text; string
private static final String ATTR_ABOUT = "B";
// Bio/about emoji; string
private static final String ATTR_EMOJI = "E";
// Payment address; string
private static final String ATTR_PAYMENT_ADDRESS = "P";
// Commitment; byte array
private static final String ATTR_COMMITMENT = "C";
private static final Map<String, String> UPDATE_EXPRESSION_ATTRIBUTE_NAMES = Map.of(
"#commitment", ATTR_COMMITMENT,
"#name", ATTR_NAME,
"#avatar", ATTR_AVATAR,
"#about", ATTR_ABOUT,
"#aboutEmoji", ATTR_EMOJI,
"#paymentAddress", ATTR_PAYMENT_ADDRESS);
private static final Timer SET_PROFILES_TIMER = Metrics.timer(name(ProfilesDynamoDb.class, "set"));
private static final Timer GET_PROFILE_TIMER = Metrics.timer(name(ProfilesDynamoDb.class, "get"));
private static final Timer DELETE_PROFILES_TIMER = Metrics.timer(name(ProfilesDynamoDb.class, "delete"));
private static final Logger log = LoggerFactory.getLogger(ProfilesDynamoDb.class);
public ProfilesDynamoDb(final DynamoDbClient dynamoDbClient,
final DynamoDbAsyncClient dynamoDbAsyncClient,
final String tableName) {
this.dynamoDbClient = dynamoDbClient;
this.dynamoDbAsyncClient = dynamoDbAsyncClient;
this.tableName = tableName;
}
@Override
public void set(final UUID uuid, final VersionedProfile profile) {
SET_PROFILES_TIMER.record(() -> {
dynamoDbClient.updateItem(UpdateItemRequest.builder()
.tableName(tableName)
.key(buildPrimaryKey(uuid, profile.getVersion()))
.updateExpression(buildUpdateExpression(profile))
.expressionAttributeNames(UPDATE_EXPRESSION_ATTRIBUTE_NAMES)
.expressionAttributeValues(buildUpdateExpressionAttributeValues(profile))
.build());
});
}
private static Map<String, AttributeValue> buildPrimaryKey(final UUID uuid, final String version) {
return Map.of(
KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid),
ATTR_VERSION, AttributeValues.fromString(version));
}
@VisibleForTesting
static String buildUpdateExpression(final VersionedProfile profile) {
final List<String> updatedAttributes = new ArrayList<>(5);
final List<String> deletedAttributes = new ArrayList<>(5);
if (StringUtils.isNotBlank(profile.getName())) {
updatedAttributes.add("name");
} else {
deletedAttributes.add("name");
}
if (StringUtils.isNotBlank(profile.getAvatar())) {
updatedAttributes.add("avatar");
} else {
deletedAttributes.add("avatar");
}
if (StringUtils.isNotBlank(profile.getAbout())) {
updatedAttributes.add("about");
} else {
deletedAttributes.add("about");
}
if (StringUtils.isNotBlank(profile.getAboutEmoji())) {
updatedAttributes.add("aboutEmoji");
} else {
deletedAttributes.add("aboutEmoji");
}
if (StringUtils.isNotBlank(profile.getPaymentAddress())) {
updatedAttributes.add("paymentAddress");
} else {
deletedAttributes.add("paymentAddress");
}
final StringBuilder updateExpressionBuilder = new StringBuilder(
"SET #commitment = if_not_exists(#commitment, :commitment)");
if (!updatedAttributes.isEmpty()) {
updatedAttributes.forEach(token -> updateExpressionBuilder
.append(", #")
.append(token)
.append(" = :")
.append(token));
}
if (!deletedAttributes.isEmpty()) {
updateExpressionBuilder.append(" REMOVE ");
updateExpressionBuilder.append(deletedAttributes.stream()
.map(token -> "#" + token)
.collect(Collectors.joining(", ")));
}
return updateExpressionBuilder.toString();
}
@VisibleForTesting
static Map<String, AttributeValue> buildUpdateExpressionAttributeValues(final VersionedProfile profile) {
final Map<String, AttributeValue> expressionValues = new HashMap<>();
expressionValues.put(":commitment", AttributeValues.fromByteArray(profile.getCommitment()));
if (StringUtils.isNotBlank(profile.getName())) {
expressionValues.put(":name", AttributeValues.fromString(profile.getName()));
}
if (StringUtils.isNotBlank(profile.getAvatar())) {
expressionValues.put(":avatar", AttributeValues.fromString(profile.getAvatar()));
}
if (StringUtils.isNotBlank(profile.getAbout())) {
expressionValues.put(":about", AttributeValues.fromString(profile.getAbout()));
}
if (StringUtils.isNotBlank(profile.getAboutEmoji())) {
expressionValues.put(":aboutEmoji", AttributeValues.fromString(profile.getAboutEmoji()));
}
if (StringUtils.isNotBlank(profile.getPaymentAddress())) {
expressionValues.put(":paymentAddress", AttributeValues.fromString(profile.getPaymentAddress()));
}
return expressionValues;
}
@Override
public Optional<VersionedProfile> get(final UUID uuid, final String version) {
return GET_PROFILE_TIMER.record(() -> {
final GetItemResponse response = dynamoDbClient.getItem(GetItemRequest.builder()
.tableName(tableName)
.key(buildPrimaryKey(uuid, version))
.consistentRead(true)
.build());
return response.hasItem() ? Optional.of(fromItem(response.item())) : Optional.empty();
});
}
private static VersionedProfile fromItem(final Map<String, AttributeValue> item) {
return new VersionedProfile(
AttributeValues.getString(item, ATTR_VERSION, null),
AttributeValues.getString(item, ATTR_NAME, null),
AttributeValues.getString(item, ATTR_AVATAR, null),
AttributeValues.getString(item, ATTR_EMOJI, null),
AttributeValues.getString(item, ATTR_ABOUT, null),
AttributeValues.getString(item, ATTR_PAYMENT_ADDRESS, null),
AttributeValues.getByteArray(item, ATTR_COMMITMENT, null));
}
@Override
public void deleteAll(final UUID uuid) {
DELETE_PROFILES_TIMER.record(() -> {
final AttributeValue uuidAttributeValue = AttributeValues.fromUUID(uuid);
final QueryIterable queryIterable = dynamoDbClient.queryPaginator(QueryRequest.builder()
.tableName(tableName)
.keyConditionExpression("#uuid = :uuid")
.expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID))
.expressionAttributeValues(Map.of(":uuid", uuidAttributeValue))
.projectionExpression(ATTR_VERSION)
.consistentRead(true)
.build());
CompletableFuture.allOf(queryIterable.items().stream()
.map(item -> dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder()
.tableName(tableName)
.key(Map.of(
KEY_ACCOUNT_UUID, uuidAttributeValue,
ATTR_VERSION, item.get(ATTR_VERSION)))
.build()))
.toArray(CompletableFuture[]::new)).join();
});
}
public CompletableFuture<Boolean> migrate(final UUID uuid, final VersionedProfile profile) {
final Map<String, AttributeValue> item = new HashMap<>();
item.put(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid));
item.put(ATTR_VERSION, AttributeValues.fromString(profile.getVersion()));
item.put(ATTR_COMMITMENT, AttributeValues.fromByteArray(profile.getCommitment()));
if (profile.getName() != null) {
item.put(ATTR_NAME, AttributeValues.fromString(profile.getName()));
}
if (profile.getAvatar() != null) {
item.put(ATTR_AVATAR, AttributeValues.fromString(profile.getAvatar()));
}
if (profile.getAboutEmoji() != null) {
item.put(ATTR_EMOJI, AttributeValues.fromString(profile.getAboutEmoji()));
}
if (profile.getAbout() != null) {
item.put(ATTR_ABOUT, AttributeValues.fromString(profile.getAbout()));
}
if (profile.getPaymentAddress() != null) {
item.put(ATTR_PAYMENT_ADDRESS, AttributeValues.fromString(profile.getPaymentAddress()));
}
return dynamoDbAsyncClient.putItem(PutItemRequest.builder()
.tableName(tableName)
.item(item)
.conditionExpression("attribute_not_exists(#uuid)")
.expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID))
.build())
.handle((response, cause) -> {
if (cause == null) {
return true;
} else {
if (!(cause instanceof ConditionalCheckFailedException)) {
log.warn("Unexpected error migrating profiles {}/{}", uuid, profile.getVersion(), cause);
}
return false;
}
});
}
public CompletableFuture<?> delete(final UUID uuid, final String version) {
return dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder()
.tableName(tableName)
.key(buildPrimaryKey(uuid, version))
.build());
}
}

View File

@ -10,12 +10,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.lettuce.core.RedisException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Executor;
public class ProfilesManager {
@ -23,31 +26,60 @@ public class ProfilesManager {
private static final String CACHE_PREFIX = "profiles::";
private final Profiles profiles;
private final Profiles profiles;
private final ProfilesDynamoDb profilesDynamoDb;
private final FaultTolerantRedisCluster cacheCluster;
private final ObjectMapper mapper;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private final ObjectMapper mapper;
public ProfilesManager(Profiles profiles, FaultTolerantRedisCluster cacheCluster) {
this.profiles = profiles;
this.cacheCluster = cacheCluster;
this.mapper = SystemMapper.getMapper();
private final Executor migrationExperimentExecutor;
private final Experiment migrationExperiment = new Experiment("profileMigration");
public ProfilesManager(final Profiles profiles,
final ProfilesDynamoDb profilesDynamoDb,
final FaultTolerantRedisCluster cacheCluster,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final Executor migrationExperimentExecutor) {
this.profiles = profiles;
this.profilesDynamoDb = profilesDynamoDb;
this.cacheCluster = cacheCluster;
this.dynamicConfigurationManager = dynamicConfigurationManager;
this.migrationExperimentExecutor = migrationExperimentExecutor;
this.mapper = SystemMapper.getMapper();
}
public void set(UUID uuid, VersionedProfile versionedProfile) {
memcacheSet(uuid, versionedProfile);
profiles.set(uuid, versionedProfile);
if (dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isDynamoDbWriteEnabled()) {
profilesDynamoDb.set(uuid, versionedProfile);
}
}
public void deleteAll(UUID uuid) {
memcacheDelete(uuid);
profiles.deleteAll(uuid);
if (dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isDynamoDbDeleteEnabled()) {
profilesDynamoDb.deleteAll(uuid);
}
}
public Optional<VersionedProfile> get(UUID uuid, String version) {
Optional<VersionedProfile> profile = memcacheGet(uuid, version);
if (!profile.isPresent()) {
profile = profiles.get(uuid, version);
if (profile.isEmpty()) {
if (dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isDynamoDbReadPrimary()) {
profile = profilesDynamoDb.get(uuid, version);
} else {
profile = profiles.get(uuid, version);
if (dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isDynamoDbReadForComparisonEnabled()) {
migrationExperiment.compareSupplierResultAsync(profile, () -> profilesDynamoDb.get(uuid, version), migrationExperimentExecutor);
}
}
profile.ifPresent(versionedProfile -> memcacheSet(uuid, versionedProfile));
}

View File

@ -0,0 +1,18 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import java.util.Optional;
import java.util.UUID;
public interface ProfilesStore {
void set(UUID uuid, VersionedProfile profile);
Optional<VersionedProfile> get(UUID uuid, String version);
void deleteAll(UUID uuid);
}

View File

@ -5,41 +5,36 @@
package org.whispersystems.textsecuregcm.storage;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.whispersystems.textsecuregcm.util.ByteArrayAdapter;
import java.util.Arrays;
import java.util.Objects;
public class VersionedProfile {
@JsonProperty
private String version;
private final String version;
private final String name;
private final String avatar;
private final String aboutEmoji;
private final String about;
private final String paymentAddress;
@JsonProperty
private String name;
@JsonProperty
private String avatar;
@JsonProperty
private String aboutEmoji;
@JsonProperty
private String about;
@JsonProperty
private String paymentAddress;
@JsonProperty
@JsonSerialize(using = ByteArrayAdapter.Serializing.class)
@JsonDeserialize(using = ByteArrayAdapter.Deserializing.class)
private byte[] commitment;
public VersionedProfile() {}
@JsonCreator
public VersionedProfile(
String version, String name, String avatar, String aboutEmoji, String about, String paymentAddress,
byte[] commitment) {
@JsonProperty("version") final String version,
@JsonProperty("name") final String name,
@JsonProperty("avatar") final String avatar,
@JsonProperty("aboutEmoji") final String aboutEmoji,
@JsonProperty("about") final String about,
@JsonProperty("paymentAddress") final String paymentAddress,
@JsonProperty("commitment") final byte[] commitment) {
this.version = version;
this.name = name;
this.avatar = avatar;
@ -76,4 +71,23 @@ public class VersionedProfile {
public byte[] getCommitment() {
return commitment;
}
@Override
public boolean equals(final Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
final VersionedProfile that = (VersionedProfile) o;
return Objects.equals(version, that.version) && Objects.equals(name, that.name) && Objects.equals(avatar,
that.avatar) && Objects.equals(aboutEmoji, that.aboutEmoji) && Objects.equals(about, that.about)
&& Objects.equals(paymentAddress, that.paymentAddress) && Arrays.equals(commitment, that.commitment);
}
@Override
public int hashCode() {
int result = Objects.hash(version, name, avatar, aboutEmoji, about, paymentAddress);
result = 31 * result + Arrays.hashCode(commitment);
return result;
}
}

View File

@ -21,6 +21,17 @@ public class DynamoDbFromConfig {
.build();
}
public static DynamoDbClient client(DynamoDbClientConfiguration config, AwsCredentialsProvider credentialsProvider) {
return DynamoDbClient.builder()
.region(Region.of(config.getRegion()))
.credentialsProvider(credentialsProvider)
.overrideConfiguration(ClientOverrideConfiguration.builder()
.apiCallTimeout(config.getClientExecutionTimeout())
.apiCallAttemptTimeout(config.getClientRequestTimeout())
.build())
.build();
}
public static DynamoDbAsyncClient asyncClient(
DynamoDbClientConfiguration config,
AwsCredentialsProvider credentialsProvider) {

View File

@ -50,6 +50,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb;
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
@ -59,6 +60,7 @@ import org.whispersystems.textsecuregcm.storage.Usernames;
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
import org.whispersystems.textsecuregcm.storage.VerificationCodeStore;
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfiguration> {
@ -144,6 +146,14 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
DynamoDbFromConfig.client(configuration.getReservedUsernamesDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbFromConfig.asyncClient(
configuration.getDynamoDbClientConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient dynamoDbClient = DynamoDbFromConfig.client(
configuration.getDynamoDbClientConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
AmazonDynamoDB deletedAccountsLockDynamoDbClient = AmazonDynamoDBClientBuilder.standard()
.withRegion(configuration.getDeletedAccountsLockDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(
@ -170,6 +180,8 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
configuration.getPhoneNumberIdentifiersDynamoDbConfiguration().getTableName());
Usernames usernames = new Usernames(accountDatabase);
Profiles profiles = new Profiles(accountDatabase);
ProfilesDynamoDb profilesDynamoDb = new ProfilesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
configuration.getDynamoDbTables().getProfiles().getTableName());
ReservedUsernames reservedUsernames = new ReservedUsernames(reservedUsernamesDynamoDbClient,
configuration.getReservedUsernamesDynamoDbConfiguration().getTableName());
Keys keys = new Keys(preKeysDynamoDb,
@ -199,7 +211,8 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
DirectoryQueue directoryQueue = new DirectoryQueue(
configuration.getDirectoryConfiguration().getSqsConfiguration());
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, profilesDynamoDb, cacheCluster,
dynamicConfigurationManager, Executors.newSingleThreadExecutor());
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessagesDynamoDb,
configuration.getReportMessageDynamoDbConfiguration().getTableName(),
configuration.getReportMessageConfiguration().getReportTtl());

View File

@ -0,0 +1,140 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.jdbi3.JdbiFactory;
import io.dropwizard.setup.Environment;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.result.ResultIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb;
import org.whispersystems.textsecuregcm.storage.VersionedProfile;
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
import org.whispersystems.textsecuregcm.util.Pair;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
public class MigrateProfilesCommand extends EnvironmentCommand<WhisperServerConfiguration> {
private static final Logger log = LoggerFactory.getLogger(MigrateProfilesCommand.class);
public MigrateProfilesCommand() {
super(new Application<>() {
@Override
public void run(WhisperServerConfiguration configuration, Environment environment) {
}
}, "migrate-profiles", "Migrate versioned profiles from Postgres to DynamoDB");
}
@Override
public void configure(Subparser subparser) {
super.configure(subparser);
subparser.addArgument("-s", "--fetch-size")
.dest("fetchSize")
.type(Integer.class)
.required(false)
.setDefault(512)
.help("The number of profiles to fetch from Postgres at once");
subparser.addArgument("-c", "--concurrency")
.dest("concurrency")
.type(Integer.class)
.required(false)
.setDefault(64)
.help("The maximum number of concurrent DynamoDB requests");
}
@Override
protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration) throws Exception {
JdbiFactory jdbiFactory = new JdbiFactory();
Jdbi accountJdbi = jdbiFactory.build(environment, configuration.getAccountsDatabaseConfiguration(), "accountdb");
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_delete_user", accountJdbi,
configuration.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration());
DynamoDbClient dynamoDbClient = DynamoDbFromConfig.client(
configuration.getDynamoDbClientConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbFromConfig.asyncClient(
configuration.getDynamoDbClientConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
Profiles profiles = new Profiles(accountDatabase);
ProfilesDynamoDb profilesDynamoDb = new ProfilesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
configuration.getDynamoDbTables().getProfiles().getTableName());
final int fetchSize = namespace.getInt("fetchSize");
final Semaphore semaphore = new Semaphore(namespace.getInt("concurrency"));
log.info("Beginning migration");
try (final ResultIterator<Pair<UUID, VersionedProfile>> results = profiles.getAll(fetchSize)) {
final AtomicInteger profilesProcessed = new AtomicInteger(0);
final AtomicInteger profilesMigrated = new AtomicInteger(0);
while (results.hasNext()) {
semaphore.acquire();
final Pair<UUID, VersionedProfile> uuidAndProfile = results.next();
profilesDynamoDb.migrate(uuidAndProfile.first(), uuidAndProfile.second())
.whenComplete((migrated, cause) -> {
semaphore.release();
final int processed = profilesProcessed.incrementAndGet();
if (cause == null) {
if (migrated) {
profilesMigrated.incrementAndGet();
}
}
if (processed % 10_000 == 0) {
log.info("Processed {} profiles ({} migrated)", processed, profilesMigrated.get());
}
});
}
log.info("Migration completed; processed {} profiles and migrated {}", profilesProcessed.get(), profilesMigrated.get());
}
log.info("Removing profiles that were deleted during migration");
try (final ResultIterator<Pair<UUID, String>> results = profiles.getDeletedProfiles(fetchSize)) {
final AtomicInteger profilesDeleted = new AtomicInteger(0);
while (results.hasNext()) {
semaphore.acquire();
final Pair<UUID, String> uuidAndVersion = results.next();
profilesDynamoDb.delete(uuidAndVersion.first(), uuidAndVersion.second())
.whenComplete((response, cause) -> {
semaphore.release();
if (profilesDeleted.incrementAndGet() % 1_000 == 0) {
log.info("Attempted to remove {} profiles", profilesDeleted.get());
}
});
}
log.info("Removal of deleted profiles complete; attempted to remove {} profiles", profilesDeleted.get());
}
}
}

View File

@ -48,6 +48,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb;
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
@ -57,6 +58,7 @@ import org.whispersystems.textsecuregcm.storage.Usernames;
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
import org.whispersystems.textsecuregcm.storage.VerificationCodeStore;
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperServerConfiguration> {
@ -148,6 +150,14 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
.client(configuration.getPendingAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbFromConfig.asyncClient(
configuration.getDynamoDbClientConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient dynamoDbClient = DynamoDbFromConfig.client(
configuration.getDynamoDbClientConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
AmazonDynamoDB deletedAccountsLockDynamoDbClient = AmazonDynamoDBClientBuilder.standard()
.withRegion(configuration.getDeletedAccountsLockDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(
@ -174,6 +184,8 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
configuration.getPhoneNumberIdentifiersDynamoDbConfiguration().getTableName());
Usernames usernames = new Usernames(accountDatabase);
Profiles profiles = new Profiles(accountDatabase);
ProfilesDynamoDb profilesDynamoDb = new ProfilesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
configuration.getDynamoDbTables().getProfiles().getTableName());
ReservedUsernames reservedUsernames = new ReservedUsernames(reservedUsernamesDynamoDbClient,
configuration.getReservedUsernamesDynamoDbConfiguration().getTableName());
Keys keys = new Keys(preKeysDynamoDb,
@ -201,7 +213,8 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
DirectoryQueue directoryQueue = new DirectoryQueue(
configuration.getDirectoryConfiguration().getSqsConfiguration());
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, profilesDynamoDb, cacheCluster,
dynamicConfigurationManager, Executors.newSingleThreadExecutor());
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessagesDynamoDb,
configuration.getReportMessageDynamoDbConfiguration().getTableName(),
configuration.getReportMessageConfiguration().getReportTtl());

View File

@ -0,0 +1,193 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
class ProfilesDynamoDbTest extends ProfilesTest {
private static final String PROFILES_TABLE_NAME = "profiles_test";
@RegisterExtension
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()
.tableName(PROFILES_TABLE_NAME)
.hashKey(ProfilesDynamoDb.KEY_ACCOUNT_UUID)
.rangeKey(ProfilesDynamoDb.ATTR_VERSION)
.attributeDefinition(AttributeDefinition.builder()
.attributeName(ProfilesDynamoDb.KEY_ACCOUNT_UUID)
.attributeType(ScalarAttributeType.B)
.build())
.attributeDefinition(AttributeDefinition.builder()
.attributeName(ProfilesDynamoDb.ATTR_VERSION)
.attributeType(ScalarAttributeType.S)
.build())
.build();
private ProfilesDynamoDb profiles;
@BeforeEach
void setUp() {
profiles = new ProfilesDynamoDb(dynamoDbExtension.getDynamoDbClient(),
dynamoDbExtension.getDynamoDbAsyncClient(),
PROFILES_TABLE_NAME);
}
@Override
protected ProfilesStore getProfilesStore() {
return profiles;
}
@ParameterizedTest
@MethodSource
void buildUpdateExpression(final VersionedProfile profile, final String expectedUpdateExpression) {
assertEquals(expectedUpdateExpression, ProfilesDynamoDb.buildUpdateExpression(profile));
}
private static Stream<Arguments> buildUpdateExpression() {
final byte[] commitment = "commitment".getBytes(StandardCharsets.UTF_8);
return Stream.of(
Arguments.of(
new VersionedProfile("version", "name", "avatar", "emoji", "about", "paymentAddress", commitment),
"SET #commitment = if_not_exists(#commitment, :commitment), #name = :name, #avatar = :avatar, #about = :about, #aboutEmoji = :aboutEmoji, #paymentAddress = :paymentAddress"),
Arguments.of(
new VersionedProfile("version", "name", "avatar", "emoji", "about", null, commitment),
"SET #commitment = if_not_exists(#commitment, :commitment), #name = :name, #avatar = :avatar, #about = :about, #aboutEmoji = :aboutEmoji REMOVE #paymentAddress"),
Arguments.of(
new VersionedProfile("version", "name", "avatar", "emoji", null, null, commitment),
"SET #commitment = if_not_exists(#commitment, :commitment), #name = :name, #avatar = :avatar, #aboutEmoji = :aboutEmoji REMOVE #about, #paymentAddress"),
Arguments.of(
new VersionedProfile("version", "name", "avatar", null, null, null, commitment),
"SET #commitment = if_not_exists(#commitment, :commitment), #name = :name, #avatar = :avatar REMOVE #about, #aboutEmoji, #paymentAddress"),
Arguments.of(
new VersionedProfile("version", "name", null, null, null, null, commitment),
"SET #commitment = if_not_exists(#commitment, :commitment), #name = :name REMOVE #avatar, #about, #aboutEmoji, #paymentAddress"),
Arguments.of(
new VersionedProfile("version", null, null, null, null, null, commitment),
"SET #commitment = if_not_exists(#commitment, :commitment) REMOVE #name, #avatar, #about, #aboutEmoji, #paymentAddress")
);
}
@ParameterizedTest
@MethodSource
void buildUpdateExpressionAttributeValues(final VersionedProfile profile, final Map<String, AttributeValue> expectedAttributeValues) {
assertEquals(expectedAttributeValues, ProfilesDynamoDb.buildUpdateExpressionAttributeValues(profile));
}
private static Stream<Arguments> buildUpdateExpressionAttributeValues() {
final byte[] commitment = "commitment".getBytes(StandardCharsets.UTF_8);
return Stream.of(
Arguments.of(
new VersionedProfile("version", "name", "avatar", "emoji", "about", "paymentAddress", commitment),
Map.of(
":commitment", AttributeValues.fromByteArray(commitment),
":name", AttributeValues.fromString("name"),
":avatar", AttributeValues.fromString("avatar"),
":aboutEmoji", AttributeValues.fromString("emoji"),
":about", AttributeValues.fromString("about"),
":paymentAddress", AttributeValues.fromString("paymentAddress"))),
Arguments.of(
new VersionedProfile("version", "name", "avatar", "emoji", "about", null, commitment),
Map.of(
":commitment", AttributeValues.fromByteArray(commitment),
":name", AttributeValues.fromString("name"),
":avatar", AttributeValues.fromString("avatar"),
":aboutEmoji", AttributeValues.fromString("emoji"),
":about", AttributeValues.fromString("about"))),
Arguments.of(
new VersionedProfile("version", "name", "avatar", "emoji", null, null, commitment),
Map.of(
":commitment", AttributeValues.fromByteArray(commitment),
":name", AttributeValues.fromString("name"),
":avatar", AttributeValues.fromString("avatar"),
":aboutEmoji", AttributeValues.fromString("emoji"))),
Arguments.of(
new VersionedProfile("version", "name", "avatar", null, null, null, commitment),
Map.of(
":commitment", AttributeValues.fromByteArray(commitment),
":name", AttributeValues.fromString("name"),
":avatar", AttributeValues.fromString("avatar"))),
Arguments.of(
new VersionedProfile("version", "name", null, null, null, null, commitment),
Map.of(
":commitment", AttributeValues.fromByteArray(commitment),
":name", AttributeValues.fromString("name"))),
Arguments.of(
new VersionedProfile("version", null, null, null, null, null, commitment),
Map.of(":commitment", AttributeValues.fromByteArray(commitment)))
);
}
@ParameterizedTest
@MethodSource
void migrate(final VersionedProfile profile) {
final UUID uuid = UUID.randomUUID();
assertTrue(assertDoesNotThrow(() -> profiles.migrate(uuid, profile).join()));
assertFalse(assertDoesNotThrow(() -> profiles.migrate(uuid, profile).join()));
assertEquals(Optional.of(profile), profiles.get(uuid, profile.getVersion()));
}
private static Stream<Arguments> migrate() {
return Stream.of(
Arguments.of(new VersionedProfile("version", "name", "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))),
Arguments.of(new VersionedProfile("version", null, "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))),
Arguments.of(new VersionedProfile("version", "name", null, "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))),
Arguments.of(new VersionedProfile("version", "name", "avatar", null, "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))),
Arguments.of(new VersionedProfile("version", "name", "avatar", "emoji", null, "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))),
Arguments.of(new VersionedProfile("version", "name", "avatar", "emoji", "about", null, "commitment".getBytes(StandardCharsets.UTF_8)))
);
}
@Test
void delete() {
final UUID uuid = UUID.randomUUID();
final VersionedProfile firstProfile =
new VersionedProfile("version1", "name", "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8));
final VersionedProfile secondProfile =
new VersionedProfile("version2", "name", "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8));
profiles.set(uuid, firstProfile);
profiles.set(uuid, secondProfile);
profiles.delete(uuid, firstProfile.getVersion()).join();
assertTrue(profiles.get(uuid, firstProfile.getVersion()).isEmpty());
assertTrue(profiles.get(uuid, secondProfile.getVersion()).isPresent());
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import com.google.common.collect.ImmutableList;
import com.opentable.db.postgres.embedded.LiquibasePreparer;
import com.opentable.db.postgres.junit5.EmbeddedPostgresExtension;
import com.opentable.db.postgres.junit5.PreparedDbExtension;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.result.ResultIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.util.Pair;
import java.util.List;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ProfilesPostgresTest extends ProfilesTest {
@RegisterExtension
static PreparedDbExtension ACCOUNTS_POSTGRES_EXTENSION =
EmbeddedPostgresExtension.preparedDatabase(LiquibasePreparer.forClasspathLocation("accountsdb.xml"));
private Profiles profiles;
@BeforeEach
void setUp() {
final FaultTolerantDatabase faultTolerantDatabase = new FaultTolerantDatabase("profilesTest",
Jdbi.create(ACCOUNTS_POSTGRES_EXTENSION.getTestDatabase()),
new CircuitBreakerConfiguration());
profiles = new Profiles(faultTolerantDatabase);
}
@Override
protected ProfilesStore getProfilesStore() {
return profiles;
}
@Test
void testGetDeletedProfiles() {
profiles.purgeDeletedProfiles();
UUID uuid = UUID.randomUUID();
VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null,
null, "aDigest".getBytes());
VersionedProfile profileTwo = new VersionedProfile("345", "bar", "baz", null, null, null, "boof".getBytes());
profiles.set(uuid, profileOne);
profiles.set(UUID.randomUUID(), profileTwo);
profiles.deleteAll(uuid);
try (final ResultIterator<Pair<UUID, String>> resultIterator = profiles.getDeletedProfiles(10)) {
assertEquals(List.of(new Pair<>(uuid, profileOne.getVersion())), ImmutableList.copyOf(resultIterator));
}
}
}

View File

@ -1,47 +1,27 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.tests.storage;
package org.whispersystems.textsecuregcm.storage;
import com.opentable.db.postgres.embedded.LiquibasePreparer;
import com.opentable.db.postgres.junit.EmbeddedPostgresRules;
import com.opentable.db.postgres.junit.PreparedDbRule;
import org.jdbi.v3.core.Jdbi;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.VersionedProfile;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import java.util.Optional;
import java.util.UUID;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
public abstract class ProfilesTest {
public class ProfilesTest {
@Rule
public PreparedDbRule db = EmbeddedPostgresRules.preparedDatabase(LiquibasePreparer.forClasspathLocation("accountsdb.xml"));
private Profiles profiles;
@Before
public void setupProfilesDao() {
FaultTolerantDatabase faultTolerantDatabase = new FaultTolerantDatabase("profilesTest",
Jdbi.create(db.getTestDatabase()),
new CircuitBreakerConfiguration());
this.profiles = new Profiles(faultTolerantDatabase);
}
protected abstract ProfilesStore getProfilesStore();
@Test
public void testSetGet() {
UUID uuid = UUID.randomUUID();
VersionedProfile profile = new VersionedProfile("123", "foo", "avatarLocation", "emoji", "the very model of a modern major general",
void testSetGet() {
ProfilesStore profiles = getProfilesStore();
UUID uuid = UUID.randomUUID();
VersionedProfile profile = new VersionedProfile("123", "foo", "avatarLocation", "emoji",
"the very model of a modern major general",
null, "acommitment".getBytes());
profiles.set(uuid, profile);
@ -56,8 +36,9 @@ public class ProfilesTest {
}
@Test
public void testSetGetNullOptionalFields() {
UUID uuid = UUID.randomUUID();
void testSetGetNullOptionalFields() {
ProfilesStore profiles = getProfilesStore();
UUID uuid = UUID.randomUUID();
VersionedProfile profile = new VersionedProfile("123", "foo", null, null, null, null,
"acommitment".getBytes());
profiles.set(uuid, profile);
@ -73,10 +54,11 @@ public class ProfilesTest {
}
@Test
public void testSetReplace() {
UUID uuid = UUID.randomUUID();
void testSetReplace() {
ProfilesStore profiles = getProfilesStore();
UUID uuid = UUID.randomUUID();
VersionedProfile profile = new VersionedProfile("123", "foo", "avatarLocation", null, null,
null, "acommitment".getBytes());
"paymentAddress", "acommitment".getBytes());
profiles.set(uuid, profile);
Optional<VersionedProfile> retrieved = profiles.get(uuid, "123");
@ -96,20 +78,22 @@ public class ProfilesTest {
assertThat(retrieved.isPresent()).isTrue();
assertThat(retrieved.get().getName()).isEqualTo(updated.getName());
assertThat(retrieved.get().getCommitment()).isEqualTo(profile.getCommitment());
assertThat(retrieved.get().getAbout()).isEqualTo(updated.getAbout());
assertThat(retrieved.get().getAboutEmoji()).isEqualTo(updated.getAboutEmoji());
assertThat(retrieved.get().getAvatar()).isEqualTo(updated.getAvatar());
// Commitment should be unchanged after an overwrite
assertThat(retrieved.get().getAvatar()).isEqualTo(updated.getAvatar());
assertThat(retrieved.get().getCommitment()).isEqualTo(profile.getCommitment());
}
@Test
public void testMultipleVersions() {
UUID uuid = UUID.randomUUID();
void testMultipleVersions() {
ProfilesStore profiles = getProfilesStore();
UUID uuid = UUID.randomUUID();
VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null,
null, "acommitmnet".getBytes());
VersionedProfile profileTwo = new VersionedProfile("345", "bar", "baz", "emoji", "i keep typing emoju for some reason",
VersionedProfile profileTwo = new VersionedProfile("345", "bar", "baz", "emoji",
"i keep typing emoju for some reason",
null, "boof".getBytes());
profiles.set(uuid, profileOne);
@ -135,8 +119,9 @@ public class ProfilesTest {
}
@Test
public void testMissing() {
UUID uuid = UUID.randomUUID();
void testMissing() {
ProfilesStore profiles = getProfilesStore();
UUID uuid = UUID.randomUUID();
VersionedProfile profile = new VersionedProfile("123", "foo", "avatarLocation", null, null,
null, "aDigest".getBytes());
profiles.set(uuid, profile);
@ -147,8 +132,9 @@ public class ProfilesTest {
@Test
public void testDelete() {
UUID uuid = UUID.randomUUID();
void testDelete() {
ProfilesStore profiles = getProfilesStore();
UUID uuid = UUID.randomUUID();
VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null,
null, "aDigest".getBytes());
VersionedProfile profileTwo = new VersionedProfile("345", "bar", "baz", null, null, null, "boof".getBytes());
@ -166,6 +152,4 @@ public class ProfilesTest {
assertThat(retrieved.isPresent()).isFalse();
}
}

View File

@ -5,10 +5,10 @@
package org.whispersystems.textsecuregcm.tests.storage;
import static junit.framework.TestCase.assertSame;
import static junit.framework.TestCase.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@ -22,59 +22,85 @@ import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import java.util.Base64;
import java.util.Optional;
import java.util.UUID;
import org.junit.Test;
import java.util.concurrent.Executor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicProfileMigrationConfiguration;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.VersionedProfile;
import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
public class ProfilesManagerTest {
private Profiles profiles;
private RedisAdvancedClusterCommands<String, String> commands;
private ProfilesManager profilesManager;
@BeforeEach
void setUp() {
//noinspection unchecked
commands = mock(RedisAdvancedClusterCommands.class);
final FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands);
profiles = mock(Profiles.class);
@SuppressWarnings("unchecked") final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager =
mock(DynamicConfigurationManager.class);
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
final DynamicProfileMigrationConfiguration profileMigrationConfiguration =
mock(DynamicProfileMigrationConfiguration.class);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
when(dynamicConfiguration.getProfileMigrationConfiguration()).thenReturn(profileMigrationConfiguration);
profilesManager = new ProfilesManager(profiles,
mock(ProfilesDynamoDb.class),
cacheCluster,
dynamicConfigurationManager,
mock(Executor.class));
}
@Test
public void testGetProfileInCache() {
RedisAdvancedClusterCommands<String, String> commands = mock(RedisAdvancedClusterCommands.class);
FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands);
Profiles profiles = mock(Profiles.class);
UUID uuid = UUID.randomUUID();
when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenReturn("{\"version\": \"someversion\", \"name\": \"somename\", \"avatar\": \"someavatar\", \"commitment\":\"" + Base64.getEncoder().encodeToString("somecommitment".getBytes()) + "\"}");
when(commands.hget(eq("profiles::" + uuid), eq("someversion"))).thenReturn("{\"version\": \"someversion\", \"name\": \"somename\", \"avatar\": \"someavatar\", \"commitment\":\"" + Base64.getEncoder().encodeToString("somecommitment".getBytes()) + "\"}");
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
Optional<VersionedProfile> profile = profilesManager.get(uuid, "someversion");
Optional<VersionedProfile> profile = profilesManager.get(uuid, "someversion");
assertTrue(profile.isPresent());
assertEquals(profile.get().getName(), "somename");
assertEquals(profile.get().getAvatar(), "someavatar");
assertThat(profile.get().getCommitment()).isEqualTo("somecommitment".getBytes());
verify(commands, times(1)).hget(eq("profiles::" + uuid.toString()), eq("someversion"));
verify(commands, times(1)).hget(eq("profiles::" + uuid), eq("someversion"));
verifyNoMoreInteractions(commands);
verifyNoMoreInteractions(profiles);
}
@Test
public void testGetProfileNotInCache() {
RedisAdvancedClusterCommands<String, String> commands = mock(RedisAdvancedClusterCommands.class);
FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands);
Profiles profiles = mock(Profiles.class);
UUID uuid = UUID.randomUUID();
VersionedProfile profile = new VersionedProfile("someversion", "somename", "someavatar", null, null,
null, "somecommitment".getBytes());
when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenReturn(null);
when(commands.hget(eq("profiles::" + uuid), eq("someversion"))).thenReturn(null);
when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile));
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
Optional<VersionedProfile> retrieved = profilesManager.get(uuid, "someversion");
Optional<VersionedProfile> retrieved = profilesManager.get(uuid, "someversion");
assertTrue(retrieved.isPresent());
assertSame(retrieved.get(), profile);
verify(commands, times(1)).hget(eq("profiles::" + uuid.toString()), eq("someversion"));
verify(commands, times(1)).hset(eq("profiles::" + uuid.toString()), eq("someversion"), anyString());
verify(commands, times(1)).hget(eq("profiles::" + uuid), eq("someversion"));
verify(commands, times(1)).hset(eq("profiles::" + uuid), eq("someversion"), anyString());
verifyNoMoreInteractions(commands);
verify(profiles, times(1)).get(eq(uuid), eq("someversion"));
@ -83,25 +109,20 @@ public class ProfilesManagerTest {
@Test
public void testGetProfileBrokenCache() {
RedisAdvancedClusterCommands<String, String> commands = mock(RedisAdvancedClusterCommands.class);
FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands);
Profiles profiles = mock(Profiles.class);
UUID uuid = UUID.randomUUID();
VersionedProfile profile = new VersionedProfile("someversion", "somename", "someavatar", null, null,
null, "somecommitment".getBytes());
when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenThrow(new RedisException("Connection lost"));
when(commands.hget(eq("profiles::" + uuid), eq("someversion"))).thenThrow(new RedisException("Connection lost"));
when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile));
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
Optional<VersionedProfile> retrieved = profilesManager.get(uuid, "someversion");
Optional<VersionedProfile> retrieved = profilesManager.get(uuid, "someversion");
assertTrue(retrieved.isPresent());
assertSame(retrieved.get(), profile);
verify(commands, times(1)).hget(eq("profiles::" + uuid.toString()), eq("someversion"));
verify(commands, times(1)).hset(eq("profiles::" + uuid.toString()), eq("someversion"), anyString());
verify(commands, times(1)).hget(eq("profiles::" + uuid), eq("someversion"));
verify(commands, times(1)).hset(eq("profiles::" + uuid), eq("someversion"), anyString());
verifyNoMoreInteractions(commands);
verify(profiles, times(1)).get(eq(uuid), eq("someversion"));