diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 2bce4bf89..766f04d2c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -216,7 +216,6 @@ import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator; import org.whispersystems.textsecuregcm.workers.CertificateCommand; import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; -import org.whispersystems.textsecuregcm.workers.MigrateProfilesCommand; import org.whispersystems.textsecuregcm.workers.ReserveUsernameCommand; import org.whispersystems.textsecuregcm.workers.ServerVersionCommand; import org.whispersystems.textsecuregcm.workers.SetCrawlerAccelerationTask; @@ -244,7 +243,6 @@ public class WhisperServerService extends Application("accountdb", "accountsdb.xml") { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java index 3551efcd9..37f8c0e50 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java @@ -12,10 +12,8 @@ import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; import java.util.Optional; import java.util.UUID; -import java.util.function.BiConsumer; import org.whispersystems.textsecuregcm.storage.mappers.VersionedProfileMapper; import org.whispersystems.textsecuregcm.util.Constants; -import org.whispersystems.textsecuregcm.util.Pair; public class Profiles implements ProfilesStore { @@ -103,32 +101,4 @@ public class Profiles implements ProfilesStore { } })); } - - public void forEach(final BiConsumer consumer, final int fetchSize) { - database.use(jdbi -> jdbi.useHandle(handle -> handle.useTransaction(transactionHandle -> - transactionHandle.createQuery("SELECT * FROM profiles WHERE " + DELETED + "= FALSE") - .setFetchSize(fetchSize) - .map((resultSet, ctx) -> { - final UUID uuid = UUID.fromString(resultSet.getString(UID)); - final VersionedProfile profile = new VersionedProfile( - resultSet.getString(Profiles.VERSION), - resultSet.getString(Profiles.NAME), - resultSet.getString(Profiles.AVATAR), - resultSet.getString(Profiles.ABOUT_EMOJI), - resultSet.getString(Profiles.ABOUT), - resultSet.getString(Profiles.PAYMENT_ADDRESS), - resultSet.getBytes(Profiles.COMMITMENT)); - - return new Pair<>(uuid, profile); - }) - .forEach(pair -> consumer.accept(pair.first(), pair.second()))))); - } - - public void forEachDeletedProfile(final BiConsumer consumer, final int fetchSize) { - database.use(jdbi -> jdbi.useHandle(handle -> handle.useTransaction(transactionHandle -> - transactionHandle.createQuery("SELECT " + UID + ", " + VERSION + " FROM profiles WHERE " + DELETED + " = TRUE") - .setFetchSize(fetchSize) - .map((rs, ctx) -> new Pair<>(UUID.fromString(rs.getString(UID)), rs.getString(VERSION))) - .forEach(pair -> consumer.accept(pair.first(), pair.second()))))); - } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDb.java index d138e65f8..ddf005633 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDb.java @@ -17,20 +17,15 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.util.AttributeValues; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; -import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; -import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.QueryRequest; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import software.amazon.awssdk.services.dynamodb.paginators.QueryIterable; @@ -79,8 +74,6 @@ public class ProfilesDynamoDb implements ProfilesStore { private static final Timer GET_PROFILE_TIMER = Metrics.timer(name(ProfilesDynamoDb.class, "get")); private static final Timer DELETE_PROFILES_TIMER = Metrics.timer(name(ProfilesDynamoDb.class, "delete")); - private static final Logger log = LoggerFactory.getLogger(ProfilesDynamoDb.class); - public ProfilesDynamoDb(final DynamoDbClient dynamoDbClient, final DynamoDbAsyncClient dynamoDbAsyncClient, final String tableName) { @@ -242,59 +235,4 @@ public class ProfilesDynamoDb implements ProfilesStore { .toArray(CompletableFuture[]::new)).join(); }); } - - public CompletableFuture migrate(final UUID uuid, final VersionedProfile profile) { - final Map item = new HashMap<>(); - item.put(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid)); - item.put(ATTR_VERSION, AttributeValues.fromString(profile.getVersion())); - item.put(ATTR_COMMITMENT, AttributeValues.fromByteArray(profile.getCommitment())); - - if (profile.getName() != null) { - item.put(ATTR_NAME, AttributeValues.fromString(profile.getName())); - } - - if (profile.getAvatar() != null) { - item.put(ATTR_AVATAR, AttributeValues.fromString(profile.getAvatar())); - } - - if (profile.getAboutEmoji() != null) { - item.put(ATTR_EMOJI, AttributeValues.fromString(profile.getAboutEmoji())); - } - - if (profile.getAbout() != null) { - item.put(ATTR_ABOUT, AttributeValues.fromString(profile.getAbout())); - } - - if (profile.getPaymentAddress() != null) { - item.put(ATTR_PAYMENT_ADDRESS, AttributeValues.fromString(profile.getPaymentAddress())); - } - - return dynamoDbAsyncClient.putItem(PutItemRequest.builder() - .tableName(tableName) - .item(item) - .conditionExpression("attribute_not_exists(#uuid)") - .expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID)) - .build()) - .handle((response, cause) -> { - if (cause == null) { - return true; - } else { - final boolean isConditionalCheckFailure = cause instanceof ConditionalCheckFailedException || - (cause instanceof CompletionException && cause.getCause() instanceof ConditionalCheckFailedException); - - if (!isConditionalCheckFailure) { - log.warn("Unexpected error migrating profiles {}/{}", uuid, profile.getVersion(), cause); - } - - return false; - } - }); - } - - public CompletableFuture delete(final UUID uuid, final String version) { - return dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder() - .tableName(tableName) - .key(buildPrimaryKey(uuid, version)) - .build()); - } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateProfilesCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateProfilesCommand.java deleted file mode 100644 index 7a644bdc6..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateProfilesCommand.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright 2013-2021 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.workers; - -import io.dropwizard.Application; -import io.dropwizard.cli.EnvironmentCommand; -import io.dropwizard.jdbi3.JdbiFactory; -import io.dropwizard.setup.Environment; -import java.io.FileReader; -import java.io.IOException; -import java.util.UUID; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import net.sourceforge.argparse4j.inf.Namespace; -import net.sourceforge.argparse4j.inf.Subparser; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVRecord; -import org.apache.commons.lang3.StringUtils; -import org.jdbi.v3.core.Jdbi; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.WhisperServerConfiguration; -import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; -import org.whispersystems.textsecuregcm.storage.Profiles; -import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb; -import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig; -import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; -import software.amazon.awssdk.services.dynamodb.DynamoDbClient; - -public class MigrateProfilesCommand extends EnvironmentCommand { - - private static final Logger log = LoggerFactory.getLogger(MigrateProfilesCommand.class); - - public MigrateProfilesCommand() { - super(new Application<>() { - @Override - public void run(WhisperServerConfiguration configuration, Environment environment) { - } - }, "migrate-profiles", "Migrate versioned profiles from Postgres to DynamoDB"); - } - - @Override - public void configure(Subparser subparser) { - super.configure(subparser); - - subparser.addArgument("-s", "--fetch-size") - .dest("fetchSize") - .type(Integer.class) - .required(false) - .setDefault(512) - .help("The number of profiles to fetch from Postgres at once"); - - subparser.addArgument("-c", "--concurrency") - .dest("concurrency") - .type(Integer.class) - .required(false) - .setDefault(64) - .help("The maximum number of concurrent DynamoDB requests"); - - subparser.addArgument("--csv-file") - .dest("csvFile") - .type(String.class) - .required(false) - .help("A CSV containing UUID/version pairs to migrate; if not specified, all profiles are migrated"); - } - - @Override - protected void run(final Environment environment, final Namespace namespace, - final WhisperServerConfiguration configuration) throws Exception { - - JdbiFactory jdbiFactory = new JdbiFactory(); - Jdbi accountJdbi = jdbiFactory.build(environment, configuration.getAccountsDatabaseConfiguration(), "accountdb"); - FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_delete_user", accountJdbi, - configuration.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration()); - - DynamoDbClient dynamoDbClient = DynamoDbFromConfig.client( - configuration.getDynamoDbClientConfiguration(), - software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create()); - - DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbFromConfig.asyncClient( - configuration.getDynamoDbClientConfiguration(), - software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create()); - - Profiles profiles = new Profiles(accountDatabase); - ProfilesDynamoDb profilesDynamoDb = new ProfilesDynamoDb(dynamoDbClient, dynamoDbAsyncClient, - configuration.getDynamoDbTables().getProfiles().getTableName()); - - final String csvFile = namespace.getString("csvFile"); - - if (StringUtils.isNotBlank(csvFile)) { - migrateFromCsvFile(profiles, profilesDynamoDb, csvFile); - } else { - final int fetchSize = namespace.getInt("fetchSize"); - final int concurrency = namespace.getInt("concurrency"); - - migrateAll(profiles, profilesDynamoDb, concurrency, fetchSize); - } - } - - private void migrateFromCsvFile(final Profiles profiles, final ProfilesDynamoDb profilesDynamoDb, final String csvFile) - throws IOException { - log.info("Beginning migration of profiles specified in {}", csvFile); - - try (final FileReader fileReader = new FileReader(csvFile)) { - for (final CSVRecord csvRecord : CSVFormat.DEFAULT.parse(fileReader)) { - final UUID uuid = UUID.fromString(csvRecord.get(0)); - final String version = csvRecord.get(1); - - profiles.get(uuid, version).ifPresent(profile -> profilesDynamoDb.set(uuid, profile)); - log.info("Migrated {}/{}", uuid, version); - } - } - - log.info("Done"); - } - - private void migrateAll(final Profiles profiles, final ProfilesDynamoDb profilesDynamoDb, final int concurrency, final int fetchSize) - throws InterruptedException { - final Semaphore semaphore = new Semaphore(concurrency); - - log.info("Beginning migration of all profiles"); - - final AtomicInteger profilesProcessed = new AtomicInteger(0); - final AtomicInteger profilesMigrated = new AtomicInteger(0); - - profiles.forEach((uuid, profile) -> { - try { - semaphore.acquire(); - } catch (InterruptedException e) { - log.warn("Interrupted while waiting to acquire permit"); - throw new RuntimeException(e); - } - - profilesDynamoDb.migrate(uuid, profile) - .whenComplete((migrated, cause) -> { - semaphore.release(); - - final int processed = profilesProcessed.incrementAndGet(); - - if (cause == null) { - if (migrated) { - profilesMigrated.incrementAndGet(); - } - } - - if (processed % 10_000 == 0) { - log.info("Processed {} profiles ({} migrated)", processed, profilesMigrated.get()); - } - }); - }, fetchSize); - - // Wait for all outstanding operations to complete - semaphore.acquire(concurrency); - semaphore.release(concurrency); - - log.info("Migration completed; processed {} profiles and migrated {}", profilesProcessed.get(), profilesMigrated.get()); - - log.info("Removing profiles that were deleted during migration"); - final AtomicInteger profilesDeleted = new AtomicInteger(0); - - profiles.forEachDeletedProfile((uuid, version) -> { - try { - semaphore.acquire(); - } catch (InterruptedException e) { - log.warn("Interrupted while waiting to acquire permit"); - throw new RuntimeException(e); - } - - profilesDynamoDb.delete(uuid, version) - .whenComplete((response, cause) -> { - semaphore.release(); - - if (profilesDeleted.incrementAndGet() % 1_000 == 0) { - log.info("Attempted to remove {} profiles", profilesDeleted.get()); - } - }); - }, fetchSize); - - // Wait for all outstanding operations to complete - semaphore.acquire(concurrency); - semaphore.release(concurrency); - - log.info("Removal of deleted profiles complete; attempted to remove {} profiles", profilesDeleted.get()); - } -} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDbTest.java index c9144eac8..f6cb2c318 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDbTest.java @@ -5,18 +5,12 @@ package org.whispersystems.textsecuregcm.storage; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.charset.StandardCharsets; import java.util.Map; -import java.util.Optional; -import java.util.UUID; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -150,44 +144,4 @@ class ProfilesDynamoDbTest extends ProfilesTest { Map.of(":commitment", AttributeValues.fromByteArray(commitment))) ); } - - @ParameterizedTest - @MethodSource - void migrate(final VersionedProfile profile) { - final UUID uuid = UUID.randomUUID(); - - assertTrue(assertDoesNotThrow(() -> profiles.migrate(uuid, profile).join())); - assertFalse(assertDoesNotThrow(() -> profiles.migrate(uuid, profile).join())); - - assertEquals(Optional.of(profile), profiles.get(uuid, profile.getVersion())); - } - - private static Stream migrate() { - return Stream.of( - Arguments.of(new VersionedProfile("version", "name", "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))), - Arguments.of(new VersionedProfile("version", null, "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))), - Arguments.of(new VersionedProfile("version", "name", null, "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))), - Arguments.of(new VersionedProfile("version", "name", "avatar", null, "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))), - Arguments.of(new VersionedProfile("version", "name", "avatar", "emoji", null, "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))), - Arguments.of(new VersionedProfile("version", "name", "avatar", "emoji", "about", null, "commitment".getBytes(StandardCharsets.UTF_8))) - ); - } - - @Test - void delete() { - final UUID uuid = UUID.randomUUID(); - final VersionedProfile firstProfile = - new VersionedProfile("version1", "name", "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8)); - - final VersionedProfile secondProfile = - new VersionedProfile("version2", "name", "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8)); - - profiles.set(uuid, firstProfile); - profiles.set(uuid, secondProfile); - - profiles.delete(uuid, firstProfile.getVersion()).join(); - - assertTrue(profiles.get(uuid, firstProfile.getVersion()).isEmpty()); - assertTrue(profiles.get(uuid, secondProfile.getVersion()).isPresent()); - } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesPostgresTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesPostgresTest.java index cda5db705..e980a0b43 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesPostgresTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesPostgresTest.java @@ -5,22 +5,13 @@ package org.whispersystems.textsecuregcm.storage; -import static org.junit.jupiter.api.Assertions.assertEquals; - import com.opentable.db.postgres.embedded.LiquibasePreparer; import com.opentable.db.postgres.junit5.EmbeddedPostgresExtension; import com.opentable.db.postgres.junit5.PreparedDbExtension; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.util.Pair; public class ProfilesPostgresTest extends ProfilesTest { @@ -45,39 +36,4 @@ public class ProfilesPostgresTest extends ProfilesTest { protected ProfilesStore getProfilesStore() { return profiles; } - - @Test - void testForEach() { - UUID uuid = UUID.randomUUID(); - VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null, - null, "aDigest".getBytes()); - VersionedProfile profileTwo = new VersionedProfile("345", "bar", "baz", null, null, null, "boof".getBytes()); - - profiles.set(uuid, profileOne); - profiles.set(uuid, profileTwo); - - final Set> retrievedProfiles = new HashSet<>(); - - profiles.forEach((u, profile) -> retrievedProfiles.add(new Pair<>(u, profile)), 1); - - assertEquals(Set.of(new Pair<>(uuid, profileOne), new Pair<>(uuid, profileTwo)), retrievedProfiles); - } - - @Test - void testForEachDeletedProfiles() { - UUID uuid = UUID.randomUUID(); - VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null, - null, "aDigest".getBytes()); - VersionedProfile profileTwo = new VersionedProfile("345", "bar", "baz", null, null, null, "boof".getBytes()); - - profiles.set(uuid, profileOne); - profiles.set(UUID.randomUUID(), profileTwo); - profiles.deleteAll(uuid); - - final List> deletedProfiles = new ArrayList<>(); - - profiles.forEachDeletedProfile((u, version) -> deletedProfiles.add(new Pair<>(u, version)), 2); - - assertEquals(List.of(new Pair<>(uuid, profileOne.getVersion())), deletedProfiles); - } }