From 9e7010f1850badf4a49ee69a3dc5ab37ab42e39a Mon Sep 17 00:00:00 2001 From: Jon Chambers <63609320+jon-signal@users.noreply.github.com> Date: Wed, 24 Nov 2021 14:48:41 -0500 Subject: [PATCH] Migrate profiles from a relational database to DynamoDB --- service/config/sample.yml | 2 + .../textsecuregcm/WhisperServerService.java | 14 +- .../configuration/DynamoDbTables.java | 11 +- .../dynamic/DynamicConfiguration.java | 8 + .../DynamicProfileMigrationConfiguration.java | 39 +++ .../textsecuregcm/storage/Profiles.java | 42 ++- .../storage/ProfilesDynamoDb.java | 296 ++++++++++++++++++ .../storage/ProfilesManager.java | 48 ++- .../textsecuregcm/storage/ProfilesStore.java | 18 ++ .../storage/VersionedProfile.java | 58 ++-- .../util/DynamoDbFromConfig.java | 11 + .../workers/DeleteUserCommand.java | 15 +- .../workers/MigrateProfilesCommand.java | 140 +++++++++ .../SetUserDiscoverabilityCommand.java | 15 +- .../storage/ProfilesDynamoDbTest.java | 193 ++++++++++++ .../storage/ProfilesPostgresTest.java | 64 ++++ .../{tests => }/storage/ProfilesTest.java | 80 ++--- .../tests/storage/ProfilesManagerTest.java | 81 +++-- 18 files changed, 1021 insertions(+), 114 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicProfileMigrationConfiguration.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDb.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesStore.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateProfilesCommand.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDbTest.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesPostgresTest.java rename service/src/test/java/org/whispersystems/textsecuregcm/{tests => }/storage/ProfilesTest.java (72%) diff --git a/service/config/sample.yml b/service/config/sample.yml index 57fcf7b0c..7e46f4a0f 100644 --- a/service/config/sample.yml +++ b/service/config/sample.yml @@ -22,6 +22,8 @@ dynamoDbTables: expiration: P30D # Duration of time until rows expire subscriptions: tableName: Example_Subscriptions + profiles: + tableName: Example_Profiles twilio: # Twilio gateway configuration accountId: unset diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 48e80597e..b48ecaeeb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -189,6 +189,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.NonNormalizedAccountCrawlerListener; import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers; import org.whispersystems.textsecuregcm.storage.Profiles; +import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb; import org.whispersystems.textsecuregcm.storage.ProfilesManager; import org.whispersystems.textsecuregcm.storage.PubSubManager; import org.whispersystems.textsecuregcm.storage.PushChallengeDynamoDb; @@ -217,6 +218,7 @@ import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator; import org.whispersystems.textsecuregcm.workers.CertificateCommand; import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; +import org.whispersystems.textsecuregcm.workers.MigrateProfilesCommand; import org.whispersystems.textsecuregcm.workers.ReserveUsernameCommand; import org.whispersystems.textsecuregcm.workers.ServerVersionCommand; import org.whispersystems.textsecuregcm.workers.SetCrawlerAccelerationTask; @@ -244,6 +246,7 @@ public class WhisperServerService extends Application("accountdb", "accountsdb.xml") { @@ -325,6 +328,10 @@ public class WhisperServerService extends Application getExperimentEnrollmentConfiguration( final String experimentName) { return Optional.ofNullable(experiments.get(experimentName)); @@ -109,4 +113,8 @@ public class DynamicConfiguration { public DynamicPushLatencyConfiguration getPushLatencyConfiguration() { return pushLatency; } + + public DynamicProfileMigrationConfiguration getProfileMigrationConfiguration() { + return profileMigration; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicProfileMigrationConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicProfileMigrationConfiguration.java new file mode 100644 index 000000000..e12cbcb3d --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicProfileMigrationConfiguration.java @@ -0,0 +1,39 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.configuration.dynamic; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class DynamicProfileMigrationConfiguration { + + @JsonProperty + private boolean dynamoDbDeleteEnabled = false; + + @JsonProperty + private boolean dynamoDbWriteEnabled = false; + + @JsonProperty + private boolean dynamoDbReadForComparisonEnabled = false; + + @JsonProperty + private boolean dynamoDbReadPrimary = false; + + public boolean isDynamoDbDeleteEnabled() { + return dynamoDbDeleteEnabled; + } + + public boolean isDynamoDbWriteEnabled() { + return dynamoDbWriteEnabled; + } + + public boolean isDynamoDbReadForComparisonEnabled() { + return dynamoDbReadForComparisonEnabled; + } + + public boolean isDynamoDbReadPrimary() { + return dynamoDbReadPrimary; + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java index 517c503cf..b94685ad1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java @@ -22,7 +22,7 @@ import org.whispersystems.textsecuregcm.storage.mappers.VersionedProfileMapper; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Pair; -public class Profiles { +public class Profiles implements ProfilesStore { public static final String ID = "id"; public static final String UID = "uuid"; @@ -48,6 +48,7 @@ public class Profiles { this.database.getDatabase().registerRowMapper(new VersionedProfileMapper()); } + @Override public void set(UUID uuid, VersionedProfile profile) { database.use(jdbi -> jdbi.useHandle(handle -> { try (Timer.Context ignored = setTimer.time()) { @@ -82,6 +83,7 @@ public class Profiles { })); } + @Override public Optional get(UUID uuid, String version) { return database.with(jdbi -> jdbi.withHandle(handle -> { try (Timer.Context ignored = getTimer.time()) { @@ -94,13 +96,49 @@ public class Profiles { })); } + @Override public void deleteAll(UUID uuid) { database.use(jdbi -> jdbi.useHandle(handle -> { try (Timer.Context ignored = deleteTimer.time()) { - handle.createUpdate("DELETE FROM profiles WHERE " + UID + " = :uuid") + handle.createUpdate("UPDATE profiles SET " + DELETED + " = TRUE WHERE " + UID + " = :uuid") .bind("uuid", uuid) .execute(); } })); } + + public ResultIterator> getAll(final int fetchSize) { + return database.with(jdbi -> jdbi.withHandle(handle -> handle.inTransaction(transactionHandle -> + transactionHandle.createQuery("SELECT * FROM profiles WHERE " + DELETED + "= FALSE") + .setFetchSize(fetchSize) + .map((resultSet, ctx) -> { + final UUID uuid = UUID.fromString(resultSet.getString(UID)); + final VersionedProfile profile = new VersionedProfile( + resultSet.getString(Profiles.VERSION), + resultSet.getString(Profiles.NAME), + resultSet.getString(Profiles.AVATAR), + resultSet.getString(Profiles.ABOUT_EMOJI), + resultSet.getString(Profiles.ABOUT), + resultSet.getString(Profiles.PAYMENT_ADDRESS), + resultSet.getBytes(Profiles.COMMITMENT)); + + return new Pair<>(uuid, profile); + }) + .iterator()))); + } + + public ResultIterator> getDeletedProfiles(final int fetchSize) { + return database.with(jdbi -> jdbi.withHandle(handle -> handle.inTransaction(transactionHandle -> + transactionHandle.createQuery("SELECT " + UID + ", " + VERSION + " FROM profiles WHERE " + DELETED + " = TRUE") + .setFetchSize(fetchSize) + .map((rs, ctx) -> new Pair<>(UUID.fromString(rs.getString(UID)), rs.getString(VERSION))) + .iterator()))); + } + + @VisibleForTesting + void purgeDeletedProfiles() { + database.use(jdbi -> jdbi.useHandle(handle -> + handle.createUpdate("DELETE FROM profiles WHERE " + DELETED + " = TRUE") + .execute())); + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDb.java new file mode 100644 index 000000000..bb613b952 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDb.java @@ -0,0 +1,296 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; + +import com.google.common.annotations.VisibleForTesting; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.util.AttributeValues; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; +import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.QueryRequest; +import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; +import software.amazon.awssdk.services.dynamodb.paginators.QueryIterable; + +public class ProfilesDynamoDb implements ProfilesStore { + + private final DynamoDbClient dynamoDbClient; + private final DynamoDbAsyncClient dynamoDbAsyncClient; + private final String tableName; + + // UUID of the account that owns this profile; byte array + @VisibleForTesting + static final String KEY_ACCOUNT_UUID = "U"; + + // Version of this profile; string + @VisibleForTesting + static final String ATTR_VERSION = "V"; + + // User's name; string + private static final String ATTR_NAME = "N"; + + // Avatar path/filename; string + private static final String ATTR_AVATAR = "A"; + + // Bio/about text; string + private static final String ATTR_ABOUT = "B"; + + // Bio/about emoji; string + private static final String ATTR_EMOJI = "E"; + + // Payment address; string + private static final String ATTR_PAYMENT_ADDRESS = "P"; + + // Commitment; byte array + private static final String ATTR_COMMITMENT = "C"; + + private static final Map UPDATE_EXPRESSION_ATTRIBUTE_NAMES = Map.of( + "#commitment", ATTR_COMMITMENT, + "#name", ATTR_NAME, + "#avatar", ATTR_AVATAR, + "#about", ATTR_ABOUT, + "#aboutEmoji", ATTR_EMOJI, + "#paymentAddress", ATTR_PAYMENT_ADDRESS); + + private static final Timer SET_PROFILES_TIMER = Metrics.timer(name(ProfilesDynamoDb.class, "set")); + private static final Timer GET_PROFILE_TIMER = Metrics.timer(name(ProfilesDynamoDb.class, "get")); + private static final Timer DELETE_PROFILES_TIMER = Metrics.timer(name(ProfilesDynamoDb.class, "delete")); + + private static final Logger log = LoggerFactory.getLogger(ProfilesDynamoDb.class); + + public ProfilesDynamoDb(final DynamoDbClient dynamoDbClient, + final DynamoDbAsyncClient dynamoDbAsyncClient, + final String tableName) { + + this.dynamoDbClient = dynamoDbClient; + this.dynamoDbAsyncClient = dynamoDbAsyncClient; + this.tableName = tableName; + } + + @Override + public void set(final UUID uuid, final VersionedProfile profile) { + SET_PROFILES_TIMER.record(() -> { + dynamoDbClient.updateItem(UpdateItemRequest.builder() + .tableName(tableName) + .key(buildPrimaryKey(uuid, profile.getVersion())) + .updateExpression(buildUpdateExpression(profile)) + .expressionAttributeNames(UPDATE_EXPRESSION_ATTRIBUTE_NAMES) + .expressionAttributeValues(buildUpdateExpressionAttributeValues(profile)) + .build()); + }); + } + + private static Map buildPrimaryKey(final UUID uuid, final String version) { + return Map.of( + KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid), + ATTR_VERSION, AttributeValues.fromString(version)); + } + + @VisibleForTesting + static String buildUpdateExpression(final VersionedProfile profile) { + final List updatedAttributes = new ArrayList<>(5); + final List deletedAttributes = new ArrayList<>(5); + + if (StringUtils.isNotBlank(profile.getName())) { + updatedAttributes.add("name"); + } else { + deletedAttributes.add("name"); + } + + if (StringUtils.isNotBlank(profile.getAvatar())) { + updatedAttributes.add("avatar"); + } else { + deletedAttributes.add("avatar"); + } + + if (StringUtils.isNotBlank(profile.getAbout())) { + updatedAttributes.add("about"); + } else { + deletedAttributes.add("about"); + } + + if (StringUtils.isNotBlank(profile.getAboutEmoji())) { + updatedAttributes.add("aboutEmoji"); + } else { + deletedAttributes.add("aboutEmoji"); + } + + if (StringUtils.isNotBlank(profile.getPaymentAddress())) { + updatedAttributes.add("paymentAddress"); + } else { + deletedAttributes.add("paymentAddress"); + } + + final StringBuilder updateExpressionBuilder = new StringBuilder( + "SET #commitment = if_not_exists(#commitment, :commitment)"); + + if (!updatedAttributes.isEmpty()) { + updatedAttributes.forEach(token -> updateExpressionBuilder + .append(", #") + .append(token) + .append(" = :") + .append(token)); + } + + if (!deletedAttributes.isEmpty()) { + updateExpressionBuilder.append(" REMOVE "); + updateExpressionBuilder.append(deletedAttributes.stream() + .map(token -> "#" + token) + .collect(Collectors.joining(", "))); + } + + return updateExpressionBuilder.toString(); + } + + @VisibleForTesting + static Map buildUpdateExpressionAttributeValues(final VersionedProfile profile) { + final Map expressionValues = new HashMap<>(); + + expressionValues.put(":commitment", AttributeValues.fromByteArray(profile.getCommitment())); + + if (StringUtils.isNotBlank(profile.getName())) { + expressionValues.put(":name", AttributeValues.fromString(profile.getName())); + } + + if (StringUtils.isNotBlank(profile.getAvatar())) { + expressionValues.put(":avatar", AttributeValues.fromString(profile.getAvatar())); + } + + if (StringUtils.isNotBlank(profile.getAbout())) { + expressionValues.put(":about", AttributeValues.fromString(profile.getAbout())); + } + + if (StringUtils.isNotBlank(profile.getAboutEmoji())) { + expressionValues.put(":aboutEmoji", AttributeValues.fromString(profile.getAboutEmoji())); + } + + if (StringUtils.isNotBlank(profile.getPaymentAddress())) { + expressionValues.put(":paymentAddress", AttributeValues.fromString(profile.getPaymentAddress())); + } + + return expressionValues; + } + + @Override + public Optional get(final UUID uuid, final String version) { + return GET_PROFILE_TIMER.record(() -> { + final GetItemResponse response = dynamoDbClient.getItem(GetItemRequest.builder() + .tableName(tableName) + .key(buildPrimaryKey(uuid, version)) + .consistentRead(true) + .build()); + + return response.hasItem() ? Optional.of(fromItem(response.item())) : Optional.empty(); + }); + } + + private static VersionedProfile fromItem(final Map item) { + return new VersionedProfile( + AttributeValues.getString(item, ATTR_VERSION, null), + AttributeValues.getString(item, ATTR_NAME, null), + AttributeValues.getString(item, ATTR_AVATAR, null), + AttributeValues.getString(item, ATTR_EMOJI, null), + AttributeValues.getString(item, ATTR_ABOUT, null), + AttributeValues.getString(item, ATTR_PAYMENT_ADDRESS, null), + AttributeValues.getByteArray(item, ATTR_COMMITMENT, null)); + } + + @Override + public void deleteAll(final UUID uuid) { + DELETE_PROFILES_TIMER.record(() -> { + final AttributeValue uuidAttributeValue = AttributeValues.fromUUID(uuid); + + final QueryIterable queryIterable = dynamoDbClient.queryPaginator(QueryRequest.builder() + .tableName(tableName) + .keyConditionExpression("#uuid = :uuid") + .expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID)) + .expressionAttributeValues(Map.of(":uuid", uuidAttributeValue)) + .projectionExpression(ATTR_VERSION) + .consistentRead(true) + .build()); + + CompletableFuture.allOf(queryIterable.items().stream() + .map(item -> dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder() + .tableName(tableName) + .key(Map.of( + KEY_ACCOUNT_UUID, uuidAttributeValue, + ATTR_VERSION, item.get(ATTR_VERSION))) + .build())) + .toArray(CompletableFuture[]::new)).join(); + }); + } + + public CompletableFuture migrate(final UUID uuid, final VersionedProfile profile) { + final Map item = new HashMap<>(); + item.put(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid)); + item.put(ATTR_VERSION, AttributeValues.fromString(profile.getVersion())); + item.put(ATTR_COMMITMENT, AttributeValues.fromByteArray(profile.getCommitment())); + + if (profile.getName() != null) { + item.put(ATTR_NAME, AttributeValues.fromString(profile.getName())); + } + + if (profile.getAvatar() != null) { + item.put(ATTR_AVATAR, AttributeValues.fromString(profile.getAvatar())); + } + + if (profile.getAboutEmoji() != null) { + item.put(ATTR_EMOJI, AttributeValues.fromString(profile.getAboutEmoji())); + } + + if (profile.getAbout() != null) { + item.put(ATTR_ABOUT, AttributeValues.fromString(profile.getAbout())); + } + + if (profile.getPaymentAddress() != null) { + item.put(ATTR_PAYMENT_ADDRESS, AttributeValues.fromString(profile.getPaymentAddress())); + } + + return dynamoDbAsyncClient.putItem(PutItemRequest.builder() + .tableName(tableName) + .item(item) + .conditionExpression("attribute_not_exists(#uuid)") + .expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID)) + .build()) + .handle((response, cause) -> { + if (cause == null) { + return true; + } else { + if (!(cause instanceof ConditionalCheckFailedException)) { + log.warn("Unexpected error migrating profiles {}/{}", uuid, profile.getVersion(), cause); + } + + return false; + } + }); + } + + public CompletableFuture delete(final UUID uuid, final String version) { + return dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder() + .tableName(tableName) + .key(buildPrimaryKey(uuid, version)) + .build()); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java index 18b7c7ddb..1f1236822 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesManager.java @@ -10,12 +10,15 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.lettuce.core.RedisException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.experiment.Experiment; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.util.SystemMapper; import java.io.IOException; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.Executor; public class ProfilesManager { @@ -23,31 +26,60 @@ public class ProfilesManager { private static final String CACHE_PREFIX = "profiles::"; - private final Profiles profiles; + private final Profiles profiles; + private final ProfilesDynamoDb profilesDynamoDb; private final FaultTolerantRedisCluster cacheCluster; - private final ObjectMapper mapper; + private final DynamicConfigurationManager dynamicConfigurationManager; + private final ObjectMapper mapper; - public ProfilesManager(Profiles profiles, FaultTolerantRedisCluster cacheCluster) { - this.profiles = profiles; - this.cacheCluster = cacheCluster; - this.mapper = SystemMapper.getMapper(); + private final Executor migrationExperimentExecutor; + private final Experiment migrationExperiment = new Experiment("profileMigration"); + + public ProfilesManager(final Profiles profiles, + final ProfilesDynamoDb profilesDynamoDb, + final FaultTolerantRedisCluster cacheCluster, + final DynamicConfigurationManager dynamicConfigurationManager, + final Executor migrationExperimentExecutor) { + this.profiles = profiles; + this.profilesDynamoDb = profilesDynamoDb; + this.cacheCluster = cacheCluster; + this.dynamicConfigurationManager = dynamicConfigurationManager; + this.migrationExperimentExecutor = migrationExperimentExecutor; + this.mapper = SystemMapper.getMapper(); } public void set(UUID uuid, VersionedProfile versionedProfile) { memcacheSet(uuid, versionedProfile); profiles.set(uuid, versionedProfile); + + if (dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isDynamoDbWriteEnabled()) { + profilesDynamoDb.set(uuid, versionedProfile); + } } public void deleteAll(UUID uuid) { memcacheDelete(uuid); profiles.deleteAll(uuid); + + if (dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isDynamoDbDeleteEnabled()) { + profilesDynamoDb.deleteAll(uuid); + } } public Optional get(UUID uuid, String version) { Optional profile = memcacheGet(uuid, version); - if (!profile.isPresent()) { - profile = profiles.get(uuid, version); + if (profile.isEmpty()) { + if (dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isDynamoDbReadPrimary()) { + profile = profilesDynamoDb.get(uuid, version); + } else { + profile = profiles.get(uuid, version); + + if (dynamicConfigurationManager.getConfiguration().getProfileMigrationConfiguration().isDynamoDbReadForComparisonEnabled()) { + migrationExperiment.compareSupplierResultAsync(profile, () -> profilesDynamoDb.get(uuid, version), migrationExperimentExecutor); + } + } + profile.ifPresent(versionedProfile -> memcacheSet(uuid, versionedProfile)); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesStore.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesStore.java new file mode 100644 index 000000000..b7f513889 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesStore.java @@ -0,0 +1,18 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import java.util.Optional; +import java.util.UUID; + +public interface ProfilesStore { + + void set(UUID uuid, VersionedProfile profile); + + Optional get(UUID uuid, String version); + + void deleteAll(UUID uuid); +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/VersionedProfile.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/VersionedProfile.java index 368104e07..38cd0cf52 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/VersionedProfile.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/VersionedProfile.java @@ -5,41 +5,36 @@ package org.whispersystems.textsecuregcm.storage; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import org.whispersystems.textsecuregcm.util.ByteArrayAdapter; +import java.util.Arrays; +import java.util.Objects; public class VersionedProfile { - @JsonProperty - private String version; + private final String version; + private final String name; + private final String avatar; + private final String aboutEmoji; + private final String about; + private final String paymentAddress; - @JsonProperty - private String name; - - @JsonProperty - private String avatar; - - @JsonProperty - private String aboutEmoji; - - @JsonProperty - private String about; - - @JsonProperty - private String paymentAddress; - - @JsonProperty @JsonSerialize(using = ByteArrayAdapter.Serializing.class) @JsonDeserialize(using = ByteArrayAdapter.Deserializing.class) private byte[] commitment; - public VersionedProfile() {} - + @JsonCreator public VersionedProfile( - String version, String name, String avatar, String aboutEmoji, String about, String paymentAddress, - byte[] commitment) { + @JsonProperty("version") final String version, + @JsonProperty("name") final String name, + @JsonProperty("avatar") final String avatar, + @JsonProperty("aboutEmoji") final String aboutEmoji, + @JsonProperty("about") final String about, + @JsonProperty("paymentAddress") final String paymentAddress, + @JsonProperty("commitment") final byte[] commitment) { this.version = version; this.name = name; this.avatar = avatar; @@ -76,4 +71,23 @@ public class VersionedProfile { public byte[] getCommitment() { return commitment; } + + @Override + public boolean equals(final Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + final VersionedProfile that = (VersionedProfile) o; + return Objects.equals(version, that.version) && Objects.equals(name, that.name) && Objects.equals(avatar, + that.avatar) && Objects.equals(aboutEmoji, that.aboutEmoji) && Objects.equals(about, that.about) + && Objects.equals(paymentAddress, that.paymentAddress) && Arrays.equals(commitment, that.commitment); + } + + @Override + public int hashCode() { + int result = Objects.hash(version, name, avatar, aboutEmoji, about, paymentAddress); + result = 31 * result + Arrays.hashCode(commitment); + return result; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/DynamoDbFromConfig.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/DynamoDbFromConfig.java index 52004cee5..1ab86997e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/util/DynamoDbFromConfig.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/util/DynamoDbFromConfig.java @@ -21,6 +21,17 @@ public class DynamoDbFromConfig { .build(); } + public static DynamoDbClient client(DynamoDbClientConfiguration config, AwsCredentialsProvider credentialsProvider) { + return DynamoDbClient.builder() + .region(Region.of(config.getRegion())) + .credentialsProvider(credentialsProvider) + .overrideConfiguration(ClientOverrideConfiguration.builder() + .apiCallTimeout(config.getClientExecutionTimeout()) + .apiCallAttemptTimeout(config.getClientRequestTimeout()) + .build()) + .build(); + } + public static DynamoDbAsyncClient asyncClient( DynamoDbClientConfiguration config, AwsCredentialsProvider credentialsProvider) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index e401606dd..a0c305a77 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -50,6 +50,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers; import org.whispersystems.textsecuregcm.storage.Profiles; +import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb; import org.whispersystems.textsecuregcm.storage.ProfilesManager; import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb; import org.whispersystems.textsecuregcm.storage.ReportMessageManager; @@ -59,6 +60,7 @@ import org.whispersystems.textsecuregcm.storage.Usernames; import org.whispersystems.textsecuregcm.storage.UsernamesManager; import org.whispersystems.textsecuregcm.storage.VerificationCodeStore; import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; public class DeleteUserCommand extends EnvironmentCommand { @@ -144,6 +146,14 @@ public class DeleteUserCommand extends EnvironmentCommand { + + private static final Logger log = LoggerFactory.getLogger(MigrateProfilesCommand.class); + + public MigrateProfilesCommand() { + super(new Application<>() { + @Override + public void run(WhisperServerConfiguration configuration, Environment environment) { + } + }, "migrate-profiles", "Migrate versioned profiles from Postgres to DynamoDB"); + } + + @Override + public void configure(Subparser subparser) { + super.configure(subparser); + + subparser.addArgument("-s", "--fetch-size") + .dest("fetchSize") + .type(Integer.class) + .required(false) + .setDefault(512) + .help("The number of profiles to fetch from Postgres at once"); + + subparser.addArgument("-c", "--concurrency") + .dest("concurrency") + .type(Integer.class) + .required(false) + .setDefault(64) + .help("The maximum number of concurrent DynamoDB requests"); + } + + @Override + protected void run(final Environment environment, final Namespace namespace, + final WhisperServerConfiguration configuration) throws Exception { + + JdbiFactory jdbiFactory = new JdbiFactory(); + Jdbi accountJdbi = jdbiFactory.build(environment, configuration.getAccountsDatabaseConfiguration(), "accountdb"); + FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_delete_user", accountJdbi, + configuration.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration()); + + DynamoDbClient dynamoDbClient = DynamoDbFromConfig.client( + configuration.getDynamoDbClientConfiguration(), + software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create()); + + DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbFromConfig.asyncClient( + configuration.getDynamoDbClientConfiguration(), + software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create()); + + Profiles profiles = new Profiles(accountDatabase); + ProfilesDynamoDb profilesDynamoDb = new ProfilesDynamoDb(dynamoDbClient, dynamoDbAsyncClient, + configuration.getDynamoDbTables().getProfiles().getTableName()); + + final int fetchSize = namespace.getInt("fetchSize"); + final Semaphore semaphore = new Semaphore(namespace.getInt("concurrency")); + + log.info("Beginning migration"); + + try (final ResultIterator> results = profiles.getAll(fetchSize)) { + final AtomicInteger profilesProcessed = new AtomicInteger(0); + final AtomicInteger profilesMigrated = new AtomicInteger(0); + + while (results.hasNext()) { + semaphore.acquire(); + + final Pair uuidAndProfile = results.next(); + profilesDynamoDb.migrate(uuidAndProfile.first(), uuidAndProfile.second()) + .whenComplete((migrated, cause) -> { + semaphore.release(); + + final int processed = profilesProcessed.incrementAndGet(); + + if (cause == null) { + if (migrated) { + profilesMigrated.incrementAndGet(); + } + } + + if (processed % 10_000 == 0) { + log.info("Processed {} profiles ({} migrated)", processed, profilesMigrated.get()); + } + }); + } + + log.info("Migration completed; processed {} profiles and migrated {}", profilesProcessed.get(), profilesMigrated.get()); + } + + log.info("Removing profiles that were deleted during migration"); + + try (final ResultIterator> results = profiles.getDeletedProfiles(fetchSize)) { + final AtomicInteger profilesDeleted = new AtomicInteger(0); + + while (results.hasNext()) { + semaphore.acquire(); + + final Pair uuidAndVersion = results.next(); + + profilesDynamoDb.delete(uuidAndVersion.first(), uuidAndVersion.second()) + .whenComplete((response, cause) -> { + semaphore.release(); + + if (profilesDeleted.incrementAndGet() % 1_000 == 0) { + log.info("Attempted to remove {} profiles", profilesDeleted.get()); + } + }); + } + + log.info("Removal of deleted profiles complete; attempted to remove {} profiles", profilesDeleted.get()); + } + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/SetUserDiscoverabilityCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/SetUserDiscoverabilityCommand.java index f6e282e0b..1b885cfd7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/SetUserDiscoverabilityCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/SetUserDiscoverabilityCommand.java @@ -48,6 +48,7 @@ import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers; import org.whispersystems.textsecuregcm.storage.Profiles; +import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb; import org.whispersystems.textsecuregcm.storage.ProfilesManager; import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb; import org.whispersystems.textsecuregcm.storage.ReportMessageManager; @@ -57,6 +58,7 @@ import org.whispersystems.textsecuregcm.storage.Usernames; import org.whispersystems.textsecuregcm.storage.UsernamesManager; import org.whispersystems.textsecuregcm.storage.VerificationCodeStore; import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; public class SetUserDiscoverabilityCommand extends EnvironmentCommand { @@ -148,6 +150,14 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand buildUpdateExpression() { + final byte[] commitment = "commitment".getBytes(StandardCharsets.UTF_8); + + return Stream.of( + Arguments.of( + new VersionedProfile("version", "name", "avatar", "emoji", "about", "paymentAddress", commitment), + "SET #commitment = if_not_exists(#commitment, :commitment), #name = :name, #avatar = :avatar, #about = :about, #aboutEmoji = :aboutEmoji, #paymentAddress = :paymentAddress"), + + Arguments.of( + new VersionedProfile("version", "name", "avatar", "emoji", "about", null, commitment), + "SET #commitment = if_not_exists(#commitment, :commitment), #name = :name, #avatar = :avatar, #about = :about, #aboutEmoji = :aboutEmoji REMOVE #paymentAddress"), + + Arguments.of( + new VersionedProfile("version", "name", "avatar", "emoji", null, null, commitment), + "SET #commitment = if_not_exists(#commitment, :commitment), #name = :name, #avatar = :avatar, #aboutEmoji = :aboutEmoji REMOVE #about, #paymentAddress"), + + Arguments.of( + new VersionedProfile("version", "name", "avatar", null, null, null, commitment), + "SET #commitment = if_not_exists(#commitment, :commitment), #name = :name, #avatar = :avatar REMOVE #about, #aboutEmoji, #paymentAddress"), + + Arguments.of( + new VersionedProfile("version", "name", null, null, null, null, commitment), + "SET #commitment = if_not_exists(#commitment, :commitment), #name = :name REMOVE #avatar, #about, #aboutEmoji, #paymentAddress"), + + Arguments.of( + new VersionedProfile("version", null, null, null, null, null, commitment), + "SET #commitment = if_not_exists(#commitment, :commitment) REMOVE #name, #avatar, #about, #aboutEmoji, #paymentAddress") + ); + } + + @ParameterizedTest + @MethodSource + void buildUpdateExpressionAttributeValues(final VersionedProfile profile, final Map expectedAttributeValues) { + assertEquals(expectedAttributeValues, ProfilesDynamoDb.buildUpdateExpressionAttributeValues(profile)); + } + + private static Stream buildUpdateExpressionAttributeValues() { + final byte[] commitment = "commitment".getBytes(StandardCharsets.UTF_8); + + return Stream.of( + Arguments.of( + new VersionedProfile("version", "name", "avatar", "emoji", "about", "paymentAddress", commitment), + Map.of( + ":commitment", AttributeValues.fromByteArray(commitment), + ":name", AttributeValues.fromString("name"), + ":avatar", AttributeValues.fromString("avatar"), + ":aboutEmoji", AttributeValues.fromString("emoji"), + ":about", AttributeValues.fromString("about"), + ":paymentAddress", AttributeValues.fromString("paymentAddress"))), + + Arguments.of( + new VersionedProfile("version", "name", "avatar", "emoji", "about", null, commitment), + Map.of( + ":commitment", AttributeValues.fromByteArray(commitment), + ":name", AttributeValues.fromString("name"), + ":avatar", AttributeValues.fromString("avatar"), + ":aboutEmoji", AttributeValues.fromString("emoji"), + ":about", AttributeValues.fromString("about"))), + + Arguments.of( + new VersionedProfile("version", "name", "avatar", "emoji", null, null, commitment), + Map.of( + ":commitment", AttributeValues.fromByteArray(commitment), + ":name", AttributeValues.fromString("name"), + ":avatar", AttributeValues.fromString("avatar"), + ":aboutEmoji", AttributeValues.fromString("emoji"))), + + Arguments.of( + new VersionedProfile("version", "name", "avatar", null, null, null, commitment), + Map.of( + ":commitment", AttributeValues.fromByteArray(commitment), + ":name", AttributeValues.fromString("name"), + ":avatar", AttributeValues.fromString("avatar"))), + + Arguments.of( + new VersionedProfile("version", "name", null, null, null, null, commitment), + Map.of( + ":commitment", AttributeValues.fromByteArray(commitment), + ":name", AttributeValues.fromString("name"))), + + Arguments.of( + new VersionedProfile("version", null, null, null, null, null, commitment), + Map.of(":commitment", AttributeValues.fromByteArray(commitment))) + ); + } + + @ParameterizedTest + @MethodSource + void migrate(final VersionedProfile profile) { + final UUID uuid = UUID.randomUUID(); + + assertTrue(assertDoesNotThrow(() -> profiles.migrate(uuid, profile).join())); + assertFalse(assertDoesNotThrow(() -> profiles.migrate(uuid, profile).join())); + + assertEquals(Optional.of(profile), profiles.get(uuid, profile.getVersion())); + } + + private static Stream migrate() { + return Stream.of( + Arguments.of(new VersionedProfile("version", "name", "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))), + Arguments.of(new VersionedProfile("version", null, "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))), + Arguments.of(new VersionedProfile("version", "name", null, "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))), + Arguments.of(new VersionedProfile("version", "name", "avatar", null, "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))), + Arguments.of(new VersionedProfile("version", "name", "avatar", "emoji", null, "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))), + Arguments.of(new VersionedProfile("version", "name", "avatar", "emoji", "about", null, "commitment".getBytes(StandardCharsets.UTF_8))) + ); + } + + @Test + void delete() { + final UUID uuid = UUID.randomUUID(); + final VersionedProfile firstProfile = + new VersionedProfile("version1", "name", "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8)); + + final VersionedProfile secondProfile = + new VersionedProfile("version2", "name", "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8)); + + profiles.set(uuid, firstProfile); + profiles.set(uuid, secondProfile); + + profiles.delete(uuid, firstProfile.getVersion()).join(); + + assertTrue(profiles.get(uuid, firstProfile.getVersion()).isEmpty()); + assertTrue(profiles.get(uuid, secondProfile.getVersion()).isPresent()); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesPostgresTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesPostgresTest.java new file mode 100644 index 000000000..170f8c14a --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesPostgresTest.java @@ -0,0 +1,64 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import com.google.common.collect.ImmutableList; +import com.opentable.db.postgres.embedded.LiquibasePreparer; +import com.opentable.db.postgres.junit5.EmbeddedPostgresExtension; +import com.opentable.db.postgres.junit5.PreparedDbExtension; +import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.core.result.ResultIterator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.util.Pair; +import java.util.List; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ProfilesPostgresTest extends ProfilesTest { + + @RegisterExtension + static PreparedDbExtension ACCOUNTS_POSTGRES_EXTENSION = + EmbeddedPostgresExtension.preparedDatabase(LiquibasePreparer.forClasspathLocation("accountsdb.xml")); + + private Profiles profiles; + + @BeforeEach + void setUp() { + final FaultTolerantDatabase faultTolerantDatabase = new FaultTolerantDatabase("profilesTest", + Jdbi.create(ACCOUNTS_POSTGRES_EXTENSION.getTestDatabase()), + new CircuitBreakerConfiguration()); + + profiles = new Profiles(faultTolerantDatabase); + } + + @Override + protected ProfilesStore getProfilesStore() { + return profiles; + } + + @Test + void testGetDeletedProfiles() { + profiles.purgeDeletedProfiles(); + + UUID uuid = UUID.randomUUID(); + VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null, + null, "aDigest".getBytes()); + VersionedProfile profileTwo = new VersionedProfile("345", "bar", "baz", null, null, null, "boof".getBytes()); + + profiles.set(uuid, profileOne); + profiles.set(UUID.randomUUID(), profileTwo); + + profiles.deleteAll(uuid); + + try (final ResultIterator> resultIterator = profiles.getDeletedProfiles(10)) { + assertEquals(List.of(new Pair<>(uuid, profileOne.getVersion())), ImmutableList.copyOf(resultIterator)); + } + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesTest.java similarity index 72% rename from service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesTest.java rename to service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesTest.java index c86f85510..d8432ad65 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesTest.java @@ -1,47 +1,27 @@ /* - * Copyright 2013-2020 Signal Messenger, LLC + * Copyright 2013-2021 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ -package org.whispersystems.textsecuregcm.tests.storage; +package org.whispersystems.textsecuregcm.storage; -import com.opentable.db.postgres.embedded.LiquibasePreparer; -import com.opentable.db.postgres.junit.EmbeddedPostgresRules; -import com.opentable.db.postgres.junit.PreparedDbRule; -import org.jdbi.v3.core.Jdbi; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; -import org.whispersystems.textsecuregcm.storage.Profiles; -import org.whispersystems.textsecuregcm.storage.VersionedProfile; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import java.util.Optional; import java.util.UUID; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +public abstract class ProfilesTest { -public class ProfilesTest { - - @Rule - public PreparedDbRule db = EmbeddedPostgresRules.preparedDatabase(LiquibasePreparer.forClasspathLocation("accountsdb.xml")); - - private Profiles profiles; - - @Before - public void setupProfilesDao() { - FaultTolerantDatabase faultTolerantDatabase = new FaultTolerantDatabase("profilesTest", - Jdbi.create(db.getTestDatabase()), - new CircuitBreakerConfiguration()); - - this.profiles = new Profiles(faultTolerantDatabase); - } + protected abstract ProfilesStore getProfilesStore(); @Test - public void testSetGet() { - UUID uuid = UUID.randomUUID(); - VersionedProfile profile = new VersionedProfile("123", "foo", "avatarLocation", "emoji", "the very model of a modern major general", + void testSetGet() { + ProfilesStore profiles = getProfilesStore(); + UUID uuid = UUID.randomUUID(); + VersionedProfile profile = new VersionedProfile("123", "foo", "avatarLocation", "emoji", + "the very model of a modern major general", null, "acommitment".getBytes()); profiles.set(uuid, profile); @@ -56,8 +36,9 @@ public class ProfilesTest { } @Test - public void testSetGetNullOptionalFields() { - UUID uuid = UUID.randomUUID(); + void testSetGetNullOptionalFields() { + ProfilesStore profiles = getProfilesStore(); + UUID uuid = UUID.randomUUID(); VersionedProfile profile = new VersionedProfile("123", "foo", null, null, null, null, "acommitment".getBytes()); profiles.set(uuid, profile); @@ -73,10 +54,11 @@ public class ProfilesTest { } @Test - public void testSetReplace() { - UUID uuid = UUID.randomUUID(); + void testSetReplace() { + ProfilesStore profiles = getProfilesStore(); + UUID uuid = UUID.randomUUID(); VersionedProfile profile = new VersionedProfile("123", "foo", "avatarLocation", null, null, - null, "acommitment".getBytes()); + "paymentAddress", "acommitment".getBytes()); profiles.set(uuid, profile); Optional retrieved = profiles.get(uuid, "123"); @@ -96,20 +78,22 @@ public class ProfilesTest { assertThat(retrieved.isPresent()).isTrue(); assertThat(retrieved.get().getName()).isEqualTo(updated.getName()); - assertThat(retrieved.get().getCommitment()).isEqualTo(profile.getCommitment()); assertThat(retrieved.get().getAbout()).isEqualTo(updated.getAbout()); assertThat(retrieved.get().getAboutEmoji()).isEqualTo(updated.getAboutEmoji()); + assertThat(retrieved.get().getAvatar()).isEqualTo(updated.getAvatar()); // Commitment should be unchanged after an overwrite - assertThat(retrieved.get().getAvatar()).isEqualTo(updated.getAvatar()); + assertThat(retrieved.get().getCommitment()).isEqualTo(profile.getCommitment()); } @Test - public void testMultipleVersions() { - UUID uuid = UUID.randomUUID(); + void testMultipleVersions() { + ProfilesStore profiles = getProfilesStore(); + UUID uuid = UUID.randomUUID(); VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null, null, "acommitmnet".getBytes()); - VersionedProfile profileTwo = new VersionedProfile("345", "bar", "baz", "emoji", "i keep typing emoju for some reason", + VersionedProfile profileTwo = new VersionedProfile("345", "bar", "baz", "emoji", + "i keep typing emoju for some reason", null, "boof".getBytes()); profiles.set(uuid, profileOne); @@ -135,8 +119,9 @@ public class ProfilesTest { } @Test - public void testMissing() { - UUID uuid = UUID.randomUUID(); + void testMissing() { + ProfilesStore profiles = getProfilesStore(); + UUID uuid = UUID.randomUUID(); VersionedProfile profile = new VersionedProfile("123", "foo", "avatarLocation", null, null, null, "aDigest".getBytes()); profiles.set(uuid, profile); @@ -147,8 +132,9 @@ public class ProfilesTest { @Test - public void testDelete() { - UUID uuid = UUID.randomUUID(); + void testDelete() { + ProfilesStore profiles = getProfilesStore(); + UUID uuid = UUID.randomUUID(); VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null, null, "aDigest".getBytes()); VersionedProfile profileTwo = new VersionedProfile("345", "bar", "baz", null, null, null, "boof".getBytes()); @@ -166,6 +152,4 @@ public class ProfilesTest { assertThat(retrieved.isPresent()).isFalse(); } - - } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesManagerTest.java index 086c8db86..9a16b52dc 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/ProfilesManagerTest.java @@ -5,10 +5,10 @@ package org.whispersystems.textsecuregcm.tests.storage; -import static junit.framework.TestCase.assertSame; -import static junit.framework.TestCase.assertTrue; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -22,59 +22,85 @@ import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import java.util.Base64; import java.util.Optional; import java.util.UUID; -import org.junit.Test; +import java.util.concurrent.Executor; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicProfileMigrationConfiguration; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.Profiles; +import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb; import org.whispersystems.textsecuregcm.storage.ProfilesManager; import org.whispersystems.textsecuregcm.storage.VersionedProfile; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; public class ProfilesManagerTest { + private Profiles profiles; + private RedisAdvancedClusterCommands commands; + + private ProfilesManager profilesManager; + + @BeforeEach + void setUp() { + //noinspection unchecked + commands = mock(RedisAdvancedClusterCommands.class); + final FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); + + profiles = mock(Profiles.class); + + @SuppressWarnings("unchecked") final DynamicConfigurationManager dynamicConfigurationManager = + mock(DynamicConfigurationManager.class); + + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + final DynamicProfileMigrationConfiguration profileMigrationConfiguration = + mock(DynamicProfileMigrationConfiguration.class); + + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + when(dynamicConfiguration.getProfileMigrationConfiguration()).thenReturn(profileMigrationConfiguration); + + profilesManager = new ProfilesManager(profiles, + mock(ProfilesDynamoDb.class), + cacheCluster, + dynamicConfigurationManager, + mock(Executor.class)); + } + @Test public void testGetProfileInCache() { - RedisAdvancedClusterCommands commands = mock(RedisAdvancedClusterCommands.class); - FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); - Profiles profiles = mock(Profiles.class); - UUID uuid = UUID.randomUUID(); - when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenReturn("{\"version\": \"someversion\", \"name\": \"somename\", \"avatar\": \"someavatar\", \"commitment\":\"" + Base64.getEncoder().encodeToString("somecommitment".getBytes()) + "\"}"); + when(commands.hget(eq("profiles::" + uuid), eq("someversion"))).thenReturn("{\"version\": \"someversion\", \"name\": \"somename\", \"avatar\": \"someavatar\", \"commitment\":\"" + Base64.getEncoder().encodeToString("somecommitment".getBytes()) + "\"}"); - ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); - Optional profile = profilesManager.get(uuid, "someversion"); + Optional profile = profilesManager.get(uuid, "someversion"); assertTrue(profile.isPresent()); assertEquals(profile.get().getName(), "somename"); assertEquals(profile.get().getAvatar(), "someavatar"); assertThat(profile.get().getCommitment()).isEqualTo("somecommitment".getBytes()); - verify(commands, times(1)).hget(eq("profiles::" + uuid.toString()), eq("someversion")); + verify(commands, times(1)).hget(eq("profiles::" + uuid), eq("someversion")); verifyNoMoreInteractions(commands); verifyNoMoreInteractions(profiles); } @Test public void testGetProfileNotInCache() { - RedisAdvancedClusterCommands commands = mock(RedisAdvancedClusterCommands.class); - FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); - Profiles profiles = mock(Profiles.class); - UUID uuid = UUID.randomUUID(); VersionedProfile profile = new VersionedProfile("someversion", "somename", "someavatar", null, null, null, "somecommitment".getBytes()); - when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenReturn(null); + when(commands.hget(eq("profiles::" + uuid), eq("someversion"))).thenReturn(null); when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile)); - ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); - Optional retrieved = profilesManager.get(uuid, "someversion"); + Optional retrieved = profilesManager.get(uuid, "someversion"); assertTrue(retrieved.isPresent()); assertSame(retrieved.get(), profile); - verify(commands, times(1)).hget(eq("profiles::" + uuid.toString()), eq("someversion")); - verify(commands, times(1)).hset(eq("profiles::" + uuid.toString()), eq("someversion"), anyString()); + verify(commands, times(1)).hget(eq("profiles::" + uuid), eq("someversion")); + verify(commands, times(1)).hset(eq("profiles::" + uuid), eq("someversion"), anyString()); verifyNoMoreInteractions(commands); verify(profiles, times(1)).get(eq(uuid), eq("someversion")); @@ -83,25 +109,20 @@ public class ProfilesManagerTest { @Test public void testGetProfileBrokenCache() { - RedisAdvancedClusterCommands commands = mock(RedisAdvancedClusterCommands.class); - FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands); - Profiles profiles = mock(Profiles.class); - UUID uuid = UUID.randomUUID(); VersionedProfile profile = new VersionedProfile("someversion", "somename", "someavatar", null, null, null, "somecommitment".getBytes()); - when(commands.hget(eq("profiles::" + uuid.toString()), eq("someversion"))).thenThrow(new RedisException("Connection lost")); + when(commands.hget(eq("profiles::" + uuid), eq("someversion"))).thenThrow(new RedisException("Connection lost")); when(profiles.get(eq(uuid), eq("someversion"))).thenReturn(Optional.of(profile)); - ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); - Optional retrieved = profilesManager.get(uuid, "someversion"); + Optional retrieved = profilesManager.get(uuid, "someversion"); assertTrue(retrieved.isPresent()); assertSame(retrieved.get(), profile); - verify(commands, times(1)).hget(eq("profiles::" + uuid.toString()), eq("someversion")); - verify(commands, times(1)).hset(eq("profiles::" + uuid.toString()), eq("someversion"), anyString()); + verify(commands, times(1)).hget(eq("profiles::" + uuid), eq("someversion")); + verify(commands, times(1)).hset(eq("profiles::" + uuid), eq("someversion"), anyString()); verifyNoMoreInteractions(commands); verify(profiles, times(1)).get(eq(uuid), eq("someversion"));