Use a "for each" strategy in profile migration methods

This commit is contained in:
Jon Chambers 2021-11-24 16:54:30 -05:00 committed by GitHub
parent 9e7010f185
commit 65b49b2d9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 97 additions and 78 deletions

View File

@ -10,14 +10,9 @@ import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import com.google.common.annotations.VisibleForTesting; import java.util.function.BiConsumer;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.result.ResultIterator;
import org.jdbi.v3.core.statement.StatementContext;
import org.whispersystems.textsecuregcm.storage.mappers.VersionedProfileMapper; import org.whispersystems.textsecuregcm.storage.mappers.VersionedProfileMapper;
import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.Pair;
@ -107,8 +102,8 @@ public class Profiles implements ProfilesStore {
})); }));
} }
public ResultIterator<Pair<UUID, VersionedProfile>> getAll(final int fetchSize) { public void forEach(final BiConsumer<UUID, VersionedProfile> consumer, final int fetchSize) {
return database.with(jdbi -> jdbi.withHandle(handle -> handle.inTransaction(transactionHandle -> database.use(jdbi -> jdbi.useHandle(handle -> handle.useTransaction(transactionHandle ->
transactionHandle.createQuery("SELECT * FROM profiles WHERE " + DELETED + "= FALSE") transactionHandle.createQuery("SELECT * FROM profiles WHERE " + DELETED + "= FALSE")
.setFetchSize(fetchSize) .setFetchSize(fetchSize)
.map((resultSet, ctx) -> { .map((resultSet, ctx) -> {
@ -124,21 +119,14 @@ public class Profiles implements ProfilesStore {
return new Pair<>(uuid, profile); return new Pair<>(uuid, profile);
}) })
.iterator()))); .forEach(pair -> consumer.accept(pair.first(), pair.second())))));
} }
public ResultIterator<Pair<UUID, String>> getDeletedProfiles(final int fetchSize) { public void forEachDeletedProfile(final BiConsumer<UUID, String> consumer, final int fetchSize) {
return database.with(jdbi -> jdbi.withHandle(handle -> handle.inTransaction(transactionHandle -> database.use(jdbi -> jdbi.useHandle(handle -> handle.useTransaction(transactionHandle ->
transactionHandle.createQuery("SELECT " + UID + ", " + VERSION + " FROM profiles WHERE " + DELETED + " = TRUE") transactionHandle.createQuery("SELECT " + UID + ", " + VERSION + " FROM profiles WHERE " + DELETED + " = TRUE")
.setFetchSize(fetchSize) .setFetchSize(fetchSize)
.map((rs, ctx) -> new Pair<>(UUID.fromString(rs.getString(UID)), rs.getString(VERSION))) .map((rs, ctx) -> new Pair<>(UUID.fromString(rs.getString(UID)), rs.getString(VERSION)))
.iterator()))); .forEach(pair -> consumer.accept(pair.first(), pair.second())))));
}
@VisibleForTesting
void purgeDeletedProfiles() {
database.use(jdbi -> jdbi.useHandle(handle ->
handle.createUpdate("DELETE FROM profiles WHERE " + DELETED + " = TRUE")
.execute()));
} }
} }

View File

@ -17,6 +17,7 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -278,7 +279,10 @@ public class ProfilesDynamoDb implements ProfilesStore {
if (cause == null) { if (cause == null) {
return true; return true;
} else { } else {
if (!(cause instanceof ConditionalCheckFailedException)) { final boolean isConditionalCheckFailure = cause instanceof ConditionalCheckFailedException ||
(cause instanceof CompletionException && cause.getCause() instanceof ConditionalCheckFailedException);
if (!isConditionalCheckFailure) {
log.warn("Unexpected error migrating profiles {}/{}", uuid, profile.getVersion(), cause); log.warn("Unexpected error migrating profiles {}/{}", uuid, profile.getVersion(), cause);
} }

View File

@ -9,22 +9,18 @@ import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand; import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.jdbi3.JdbiFactory; import io.dropwizard.jdbi3.JdbiFactory;
import io.dropwizard.setup.Environment; import io.dropwizard.setup.Environment;
import java.util.UUID;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser; import net.sourceforge.argparse4j.inf.Subparser;
import org.jdbi.v3.core.Jdbi; import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.result.ResultIterator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.Profiles; import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb; import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb;
import org.whispersystems.textsecuregcm.storage.VersionedProfile;
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig; import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
import org.whispersystems.textsecuregcm.util.Pair;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@ -81,60 +77,72 @@ public class MigrateProfilesCommand extends EnvironmentCommand<WhisperServerConf
configuration.getDynamoDbTables().getProfiles().getTableName()); configuration.getDynamoDbTables().getProfiles().getTableName());
final int fetchSize = namespace.getInt("fetchSize"); final int fetchSize = namespace.getInt("fetchSize");
final Semaphore semaphore = new Semaphore(namespace.getInt("concurrency")); final int concurrency = namespace.getInt("concurrency");
final Semaphore semaphore = new Semaphore(concurrency);
log.info("Beginning migration"); log.info("Beginning migration");
try (final ResultIterator<Pair<UUID, VersionedProfile>> results = profiles.getAll(fetchSize)) { final AtomicInteger profilesProcessed = new AtomicInteger(0);
final AtomicInteger profilesProcessed = new AtomicInteger(0); final AtomicInteger profilesMigrated = new AtomicInteger(0);
final AtomicInteger profilesMigrated = new AtomicInteger(0);
while (results.hasNext()) { profiles.forEach((uuid, profile) -> {
try {
semaphore.acquire(); semaphore.acquire();
} catch (InterruptedException e) {
final Pair<UUID, VersionedProfile> uuidAndProfile = results.next(); log.warn("Interrupted while waiting to acquire permit");
profilesDynamoDb.migrate(uuidAndProfile.first(), uuidAndProfile.second()) throw new RuntimeException(e);
.whenComplete((migrated, cause) -> {
semaphore.release();
final int processed = profilesProcessed.incrementAndGet();
if (cause == null) {
if (migrated) {
profilesMigrated.incrementAndGet();
}
}
if (processed % 10_000 == 0) {
log.info("Processed {} profiles ({} migrated)", processed, profilesMigrated.get());
}
});
} }
log.info("Migration completed; processed {} profiles and migrated {}", profilesProcessed.get(), profilesMigrated.get()); profilesDynamoDb.migrate(uuid, profile)
} .whenComplete((migrated, cause) -> {
semaphore.release();
final int processed = profilesProcessed.incrementAndGet();
if (cause == null) {
if (migrated) {
profilesMigrated.incrementAndGet();
}
}
if (processed % 10_000 == 0) {
log.info("Processed {} profiles ({} migrated)", processed, profilesMigrated.get());
}
});
}, fetchSize);
// Wait for all outstanding operations to complete
semaphore.acquire(concurrency);
semaphore.release(concurrency);
log.info("Migration completed; processed {} profiles and migrated {}", profilesProcessed.get(), profilesMigrated.get());
log.info("Removing profiles that were deleted during migration"); log.info("Removing profiles that were deleted during migration");
final AtomicInteger profilesDeleted = new AtomicInteger(0);
try (final ResultIterator<Pair<UUID, String>> results = profiles.getDeletedProfiles(fetchSize)) { profiles.forEachDeletedProfile((uuid, version) -> {
final AtomicInteger profilesDeleted = new AtomicInteger(0); try {
while (results.hasNext()) {
semaphore.acquire(); semaphore.acquire();
} catch (InterruptedException e) {
final Pair<UUID, String> uuidAndVersion = results.next(); log.warn("Interrupted while waiting to acquire permit");
throw new RuntimeException(e);
profilesDynamoDb.delete(uuidAndVersion.first(), uuidAndVersion.second())
.whenComplete((response, cause) -> {
semaphore.release();
if (profilesDeleted.incrementAndGet() % 1_000 == 0) {
log.info("Attempted to remove {} profiles", profilesDeleted.get());
}
});
} }
log.info("Removal of deleted profiles complete; attempted to remove {} profiles", profilesDeleted.get()); profilesDynamoDb.delete(uuid, version)
} .whenComplete((response, cause) -> {
semaphore.release();
if (profilesDeleted.incrementAndGet() % 1_000 == 0) {
log.info("Attempted to remove {} profiles", profilesDeleted.get());
}
});
}, fetchSize);
// Wait for all outstanding operations to complete
semaphore.acquire(concurrency);
semaphore.release(concurrency);
log.info("Removal of deleted profiles complete; attempted to remove {} profiles", profilesDeleted.get());
} }
} }

View File

@ -5,21 +5,22 @@
package org.whispersystems.textsecuregcm.storage; package org.whispersystems.textsecuregcm.storage;
import com.google.common.collect.ImmutableList; import static org.junit.jupiter.api.Assertions.assertEquals;
import com.opentable.db.postgres.embedded.LiquibasePreparer; import com.opentable.db.postgres.embedded.LiquibasePreparer;
import com.opentable.db.postgres.junit5.EmbeddedPostgresExtension; import com.opentable.db.postgres.junit5.EmbeddedPostgresExtension;
import com.opentable.db.postgres.junit5.PreparedDbExtension; import com.opentable.db.postgres.junit5.PreparedDbExtension;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.jdbi.v3.core.Jdbi; import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.result.ResultIterator;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.Pair;
import java.util.List;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ProfilesPostgresTest extends ProfilesTest { public class ProfilesPostgresTest extends ProfilesTest {
@ -36,6 +37,8 @@ public class ProfilesPostgresTest extends ProfilesTest {
new CircuitBreakerConfiguration()); new CircuitBreakerConfiguration());
profiles = new Profiles(faultTolerantDatabase); profiles = new Profiles(faultTolerantDatabase);
faultTolerantDatabase.use(jdbi -> jdbi.useHandle(handle -> handle.createUpdate("DELETE FROM profiles").execute()));
} }
@Override @Override
@ -44,9 +47,24 @@ public class ProfilesPostgresTest extends ProfilesTest {
} }
@Test @Test
void testGetDeletedProfiles() { void testForEach() {
profiles.purgeDeletedProfiles(); UUID uuid = UUID.randomUUID();
VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null,
null, "aDigest".getBytes());
VersionedProfile profileTwo = new VersionedProfile("345", "bar", "baz", null, null, null, "boof".getBytes());
profiles.set(uuid, profileOne);
profiles.set(uuid, profileTwo);
final Set<Pair<UUID, VersionedProfile>> retrievedProfiles = new HashSet<>();
profiles.forEach((u, profile) -> retrievedProfiles.add(new Pair<>(u, profile)), 1);
assertEquals(Set.of(new Pair<>(uuid, profileOne), new Pair<>(uuid, profileTwo)), retrievedProfiles);
}
@Test
void testForEachDeletedProfiles() {
UUID uuid = UUID.randomUUID(); UUID uuid = UUID.randomUUID();
VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null, VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null,
null, "aDigest".getBytes()); null, "aDigest".getBytes());
@ -54,11 +72,12 @@ public class ProfilesPostgresTest extends ProfilesTest {
profiles.set(uuid, profileOne); profiles.set(uuid, profileOne);
profiles.set(UUID.randomUUID(), profileTwo); profiles.set(UUID.randomUUID(), profileTwo);
profiles.deleteAll(uuid); profiles.deleteAll(uuid);
try (final ResultIterator<Pair<UUID, String>> resultIterator = profiles.getDeletedProfiles(10)) { final List<Pair<UUID, String>> deletedProfiles = new ArrayList<>();
assertEquals(List.of(new Pair<>(uuid, profileOne.getVersion())), ImmutableList.copyOf(resultIterator));
} profiles.forEachDeletedProfile((u, version) -> deletedProfiles.add(new Pair<>(u, version)), 2);
assertEquals(List.of(new Pair<>(uuid, profileOne.getVersion())), deletedProfiles);
} }
} }