Drop profile migration tools

This commit is contained in:
Jon Chambers 2021-11-24 16:56:38 -05:00 committed by Jon Chambers
parent d94e86781f
commit 6aceb24fd2
6 changed files with 0 additions and 372 deletions

View File

@ -216,7 +216,6 @@ import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
import org.whispersystems.textsecuregcm.workers.CertificateCommand;
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
import org.whispersystems.textsecuregcm.workers.MigrateProfilesCommand;
import org.whispersystems.textsecuregcm.workers.ReserveUsernameCommand;
import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
import org.whispersystems.textsecuregcm.workers.SetCrawlerAccelerationTask;
@ -244,7 +243,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
bootstrap.addCommand(new ServerVersionCommand());
bootstrap.addCommand(new CheckDynamicConfigurationCommand());
bootstrap.addCommand(new SetUserDiscoverabilityCommand());
bootstrap.addCommand(new MigrateProfilesCommand());
bootstrap.addCommand(new ReserveUsernameCommand());
bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("accountdb", "accountsdb.xml") {

View File

@ -12,10 +12,8 @@ import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiConsumer;
import org.whispersystems.textsecuregcm.storage.mappers.VersionedProfileMapper;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
public class Profiles implements ProfilesStore {
@ -103,32 +101,4 @@ public class Profiles implements ProfilesStore {
}
}));
}
public void forEach(final BiConsumer<UUID, VersionedProfile> consumer, final int fetchSize) {
database.use(jdbi -> jdbi.useHandle(handle -> handle.useTransaction(transactionHandle ->
transactionHandle.createQuery("SELECT * FROM profiles WHERE " + DELETED + "= FALSE")
.setFetchSize(fetchSize)
.map((resultSet, ctx) -> {
final UUID uuid = UUID.fromString(resultSet.getString(UID));
final VersionedProfile profile = new VersionedProfile(
resultSet.getString(Profiles.VERSION),
resultSet.getString(Profiles.NAME),
resultSet.getString(Profiles.AVATAR),
resultSet.getString(Profiles.ABOUT_EMOJI),
resultSet.getString(Profiles.ABOUT),
resultSet.getString(Profiles.PAYMENT_ADDRESS),
resultSet.getBytes(Profiles.COMMITMENT));
return new Pair<>(uuid, profile);
})
.forEach(pair -> consumer.accept(pair.first(), pair.second())))));
}
public void forEachDeletedProfile(final BiConsumer<UUID, String> consumer, final int fetchSize) {
database.use(jdbi -> jdbi.useHandle(handle -> handle.useTransaction(transactionHandle ->
transactionHandle.createQuery("SELECT " + UID + ", " + VERSION + " FROM profiles WHERE " + DELETED + " = TRUE")
.setFetchSize(fetchSize)
.map((rs, ctx) -> new Pair<>(UUID.fromString(rs.getString(UID)), rs.getString(VERSION)))
.forEach(pair -> consumer.accept(pair.first(), pair.second())))));
}
}

View File

@ -17,20 +17,15 @@ import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.paginators.QueryIterable;
@ -79,8 +74,6 @@ public class ProfilesDynamoDb implements ProfilesStore {
private static final Timer GET_PROFILE_TIMER = Metrics.timer(name(ProfilesDynamoDb.class, "get"));
private static final Timer DELETE_PROFILES_TIMER = Metrics.timer(name(ProfilesDynamoDb.class, "delete"));
private static final Logger log = LoggerFactory.getLogger(ProfilesDynamoDb.class);
public ProfilesDynamoDb(final DynamoDbClient dynamoDbClient,
final DynamoDbAsyncClient dynamoDbAsyncClient,
final String tableName) {
@ -242,59 +235,4 @@ public class ProfilesDynamoDb implements ProfilesStore {
.toArray(CompletableFuture[]::new)).join();
});
}
public CompletableFuture<Boolean> migrate(final UUID uuid, final VersionedProfile profile) {
final Map<String, AttributeValue> item = new HashMap<>();
item.put(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid));
item.put(ATTR_VERSION, AttributeValues.fromString(profile.getVersion()));
item.put(ATTR_COMMITMENT, AttributeValues.fromByteArray(profile.getCommitment()));
if (profile.getName() != null) {
item.put(ATTR_NAME, AttributeValues.fromString(profile.getName()));
}
if (profile.getAvatar() != null) {
item.put(ATTR_AVATAR, AttributeValues.fromString(profile.getAvatar()));
}
if (profile.getAboutEmoji() != null) {
item.put(ATTR_EMOJI, AttributeValues.fromString(profile.getAboutEmoji()));
}
if (profile.getAbout() != null) {
item.put(ATTR_ABOUT, AttributeValues.fromString(profile.getAbout()));
}
if (profile.getPaymentAddress() != null) {
item.put(ATTR_PAYMENT_ADDRESS, AttributeValues.fromString(profile.getPaymentAddress()));
}
return dynamoDbAsyncClient.putItem(PutItemRequest.builder()
.tableName(tableName)
.item(item)
.conditionExpression("attribute_not_exists(#uuid)")
.expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID))
.build())
.handle((response, cause) -> {
if (cause == null) {
return true;
} else {
final boolean isConditionalCheckFailure = cause instanceof ConditionalCheckFailedException ||
(cause instanceof CompletionException && cause.getCause() instanceof ConditionalCheckFailedException);
if (!isConditionalCheckFailure) {
log.warn("Unexpected error migrating profiles {}/{}", uuid, profile.getVersion(), cause);
}
return false;
}
});
}
public CompletableFuture<?> delete(final UUID uuid, final String version) {
return dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder()
.tableName(tableName)
.key(buildPrimaryKey(uuid, version))
.build());
}
}

View File

@ -1,188 +0,0 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.jdbi3.JdbiFactory;
import io.dropwizard.setup.Environment;
import java.io.FileReader;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.StringUtils;
import org.jdbi.v3.core.Jdbi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb;
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
public class MigrateProfilesCommand extends EnvironmentCommand<WhisperServerConfiguration> {
private static final Logger log = LoggerFactory.getLogger(MigrateProfilesCommand.class);
public MigrateProfilesCommand() {
super(new Application<>() {
@Override
public void run(WhisperServerConfiguration configuration, Environment environment) {
}
}, "migrate-profiles", "Migrate versioned profiles from Postgres to DynamoDB");
}
@Override
public void configure(Subparser subparser) {
super.configure(subparser);
subparser.addArgument("-s", "--fetch-size")
.dest("fetchSize")
.type(Integer.class)
.required(false)
.setDefault(512)
.help("The number of profiles to fetch from Postgres at once");
subparser.addArgument("-c", "--concurrency")
.dest("concurrency")
.type(Integer.class)
.required(false)
.setDefault(64)
.help("The maximum number of concurrent DynamoDB requests");
subparser.addArgument("--csv-file")
.dest("csvFile")
.type(String.class)
.required(false)
.help("A CSV containing UUID/version pairs to migrate; if not specified, all profiles are migrated");
}
@Override
protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration) throws Exception {
JdbiFactory jdbiFactory = new JdbiFactory();
Jdbi accountJdbi = jdbiFactory.build(environment, configuration.getAccountsDatabaseConfiguration(), "accountdb");
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_delete_user", accountJdbi,
configuration.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration());
DynamoDbClient dynamoDbClient = DynamoDbFromConfig.client(
configuration.getDynamoDbClientConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbFromConfig.asyncClient(
configuration.getDynamoDbClientConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
Profiles profiles = new Profiles(accountDatabase);
ProfilesDynamoDb profilesDynamoDb = new ProfilesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
configuration.getDynamoDbTables().getProfiles().getTableName());
final String csvFile = namespace.getString("csvFile");
if (StringUtils.isNotBlank(csvFile)) {
migrateFromCsvFile(profiles, profilesDynamoDb, csvFile);
} else {
final int fetchSize = namespace.getInt("fetchSize");
final int concurrency = namespace.getInt("concurrency");
migrateAll(profiles, profilesDynamoDb, concurrency, fetchSize);
}
}
private void migrateFromCsvFile(final Profiles profiles, final ProfilesDynamoDb profilesDynamoDb, final String csvFile)
throws IOException {
log.info("Beginning migration of profiles specified in {}", csvFile);
try (final FileReader fileReader = new FileReader(csvFile)) {
for (final CSVRecord csvRecord : CSVFormat.DEFAULT.parse(fileReader)) {
final UUID uuid = UUID.fromString(csvRecord.get(0));
final String version = csvRecord.get(1);
profiles.get(uuid, version).ifPresent(profile -> profilesDynamoDb.set(uuid, profile));
log.info("Migrated {}/{}", uuid, version);
}
}
log.info("Done");
}
private void migrateAll(final Profiles profiles, final ProfilesDynamoDb profilesDynamoDb, final int concurrency, final int fetchSize)
throws InterruptedException {
final Semaphore semaphore = new Semaphore(concurrency);
log.info("Beginning migration of all profiles");
final AtomicInteger profilesProcessed = new AtomicInteger(0);
final AtomicInteger profilesMigrated = new AtomicInteger(0);
profiles.forEach((uuid, profile) -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
log.warn("Interrupted while waiting to acquire permit");
throw new RuntimeException(e);
}
profilesDynamoDb.migrate(uuid, profile)
.whenComplete((migrated, cause) -> {
semaphore.release();
final int processed = profilesProcessed.incrementAndGet();
if (cause == null) {
if (migrated) {
profilesMigrated.incrementAndGet();
}
}
if (processed % 10_000 == 0) {
log.info("Processed {} profiles ({} migrated)", processed, profilesMigrated.get());
}
});
}, fetchSize);
// Wait for all outstanding operations to complete
semaphore.acquire(concurrency);
semaphore.release(concurrency);
log.info("Migration completed; processed {} profiles and migrated {}", profilesProcessed.get(), profilesMigrated.get());
log.info("Removing profiles that were deleted during migration");
final AtomicInteger profilesDeleted = new AtomicInteger(0);
profiles.forEachDeletedProfile((uuid, version) -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
log.warn("Interrupted while waiting to acquire permit");
throw new RuntimeException(e);
}
profilesDynamoDb.delete(uuid, version)
.whenComplete((response, cause) -> {
semaphore.release();
if (profilesDeleted.incrementAndGet() % 1_000 == 0) {
log.info("Attempted to remove {} profiles", profilesDeleted.get());
}
});
}, fetchSize);
// Wait for all outstanding operations to complete
semaphore.acquire(concurrency);
semaphore.release(concurrency);
log.info("Removal of deleted profiles complete; attempted to remove {} profiles", profilesDeleted.get());
}
}

View File

@ -5,18 +5,12 @@
package org.whispersystems.textsecuregcm.storage;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@ -150,44 +144,4 @@ class ProfilesDynamoDbTest extends ProfilesTest {
Map.of(":commitment", AttributeValues.fromByteArray(commitment)))
);
}
@ParameterizedTest
@MethodSource
void migrate(final VersionedProfile profile) {
final UUID uuid = UUID.randomUUID();
assertTrue(assertDoesNotThrow(() -> profiles.migrate(uuid, profile).join()));
assertFalse(assertDoesNotThrow(() -> profiles.migrate(uuid, profile).join()));
assertEquals(Optional.of(profile), profiles.get(uuid, profile.getVersion()));
}
private static Stream<Arguments> migrate() {
return Stream.of(
Arguments.of(new VersionedProfile("version", "name", "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))),
Arguments.of(new VersionedProfile("version", null, "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))),
Arguments.of(new VersionedProfile("version", "name", null, "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))),
Arguments.of(new VersionedProfile("version", "name", "avatar", null, "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))),
Arguments.of(new VersionedProfile("version", "name", "avatar", "emoji", null, "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8))),
Arguments.of(new VersionedProfile("version", "name", "avatar", "emoji", "about", null, "commitment".getBytes(StandardCharsets.UTF_8)))
);
}
@Test
void delete() {
final UUID uuid = UUID.randomUUID();
final VersionedProfile firstProfile =
new VersionedProfile("version1", "name", "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8));
final VersionedProfile secondProfile =
new VersionedProfile("version2", "name", "avatar", "emoji", "about", "paymentAddress", "commitment".getBytes(StandardCharsets.UTF_8));
profiles.set(uuid, firstProfile);
profiles.set(uuid, secondProfile);
profiles.delete(uuid, firstProfile.getVersion()).join();
assertTrue(profiles.get(uuid, firstProfile.getVersion()).isEmpty());
assertTrue(profiles.get(uuid, secondProfile.getVersion()).isPresent());
}
}

View File

@ -5,22 +5,13 @@
package org.whispersystems.textsecuregcm.storage;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.opentable.db.postgres.embedded.LiquibasePreparer;
import com.opentable.db.postgres.junit5.EmbeddedPostgresExtension;
import com.opentable.db.postgres.junit5.PreparedDbExtension;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.util.Pair;
public class ProfilesPostgresTest extends ProfilesTest {
@ -45,39 +36,4 @@ public class ProfilesPostgresTest extends ProfilesTest {
protected ProfilesStore getProfilesStore() {
return profiles;
}
@Test
void testForEach() {
UUID uuid = UUID.randomUUID();
VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null,
null, "aDigest".getBytes());
VersionedProfile profileTwo = new VersionedProfile("345", "bar", "baz", null, null, null, "boof".getBytes());
profiles.set(uuid, profileOne);
profiles.set(uuid, profileTwo);
final Set<Pair<UUID, VersionedProfile>> retrievedProfiles = new HashSet<>();
profiles.forEach((u, profile) -> retrievedProfiles.add(new Pair<>(u, profile)), 1);
assertEquals(Set.of(new Pair<>(uuid, profileOne), new Pair<>(uuid, profileTwo)), retrievedProfiles);
}
@Test
void testForEachDeletedProfiles() {
UUID uuid = UUID.randomUUID();
VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null,
null, "aDigest".getBytes());
VersionedProfile profileTwo = new VersionedProfile("345", "bar", "baz", null, null, null, "boof".getBytes());
profiles.set(uuid, profileOne);
profiles.set(UUID.randomUUID(), profileTwo);
profiles.deleteAll(uuid);
final List<Pair<UUID, String>> deletedProfiles = new ArrayList<>();
profiles.forEachDeletedProfile((u, version) -> deletedProfiles.add(new Pair<>(u, version)), 2);
assertEquals(List.of(new Pair<>(uuid, profileOne.getVersion())), deletedProfiles);
}
}