diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java index b94685ad1..025f4519e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Profiles.java @@ -10,14 +10,9 @@ import static com.codahale.metrics.MetricRegistry.name; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; -import java.sql.ResultSet; -import java.sql.SQLException; import java.util.Optional; import java.util.UUID; -import com.google.common.annotations.VisibleForTesting; -import org.jdbi.v3.core.mapper.RowMapper; -import org.jdbi.v3.core.result.ResultIterator; -import org.jdbi.v3.core.statement.StatementContext; +import java.util.function.BiConsumer; import org.whispersystems.textsecuregcm.storage.mappers.VersionedProfileMapper; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Pair; @@ -107,8 +102,8 @@ public class Profiles implements ProfilesStore { })); } - public ResultIterator> getAll(final int fetchSize) { - return database.with(jdbi -> jdbi.withHandle(handle -> handle.inTransaction(transactionHandle -> + public void forEach(final BiConsumer consumer, final int fetchSize) { + database.use(jdbi -> jdbi.useHandle(handle -> handle.useTransaction(transactionHandle -> transactionHandle.createQuery("SELECT * FROM profiles WHERE " + DELETED + "= FALSE") .setFetchSize(fetchSize) .map((resultSet, ctx) -> { @@ -124,21 +119,14 @@ public class Profiles implements ProfilesStore { return new Pair<>(uuid, profile); }) - .iterator()))); + .forEach(pair -> consumer.accept(pair.first(), pair.second()))))); } - public ResultIterator> getDeletedProfiles(final int fetchSize) { - return database.with(jdbi -> jdbi.withHandle(handle -> handle.inTransaction(transactionHandle -> + public void forEachDeletedProfile(final BiConsumer consumer, final int fetchSize) { + database.use(jdbi -> jdbi.useHandle(handle -> handle.useTransaction(transactionHandle -> transactionHandle.createQuery("SELECT " + UID + ", " + VERSION + " FROM profiles WHERE " + DELETED + " = TRUE") .setFetchSize(fetchSize) .map((rs, ctx) -> new Pair<>(UUID.fromString(rs.getString(UID)), rs.getString(VERSION))) - .iterator()))); - } - - @VisibleForTesting - void purgeDeletedProfiles() { - database.use(jdbi -> jdbi.useHandle(handle -> - handle.createUpdate("DELETE FROM profiles WHERE " + DELETED + " = TRUE") - .execute())); + .forEach(pair -> consumer.accept(pair.first(), pair.second()))))); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDb.java index bb613b952..d138e65f8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ProfilesDynamoDb.java @@ -17,6 +17,7 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -278,7 +279,10 @@ public class ProfilesDynamoDb implements ProfilesStore { if (cause == null) { return true; } else { - if (!(cause instanceof ConditionalCheckFailedException)) { + final boolean isConditionalCheckFailure = cause instanceof ConditionalCheckFailedException || + (cause instanceof CompletionException && cause.getCause() instanceof ConditionalCheckFailedException); + + if (!isConditionalCheckFailure) { log.warn("Unexpected error migrating profiles {}/{}", uuid, profile.getVersion(), cause); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateProfilesCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateProfilesCommand.java index 796a7ae3d..40fe47c7a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateProfilesCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MigrateProfilesCommand.java @@ -9,22 +9,18 @@ import io.dropwizard.Application; import io.dropwizard.cli.EnvironmentCommand; import io.dropwizard.jdbi3.JdbiFactory; import io.dropwizard.setup.Environment; -import java.util.UUID; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Subparser; import org.jdbi.v3.core.Jdbi; -import org.jdbi.v3.core.result.ResultIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.storage.Profiles; import org.whispersystems.textsecuregcm.storage.ProfilesDynamoDb; -import org.whispersystems.textsecuregcm.storage.VersionedProfile; import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig; -import org.whispersystems.textsecuregcm.util.Pair; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; @@ -81,60 +77,72 @@ public class MigrateProfilesCommand extends EnvironmentCommand> results = profiles.getAll(fetchSize)) { - final AtomicInteger profilesProcessed = new AtomicInteger(0); - final AtomicInteger profilesMigrated = new AtomicInteger(0); + final AtomicInteger profilesProcessed = new AtomicInteger(0); + final AtomicInteger profilesMigrated = new AtomicInteger(0); - while (results.hasNext()) { + profiles.forEach((uuid, profile) -> { + try { semaphore.acquire(); - - final Pair uuidAndProfile = results.next(); - profilesDynamoDb.migrate(uuidAndProfile.first(), uuidAndProfile.second()) - .whenComplete((migrated, cause) -> { - semaphore.release(); - - final int processed = profilesProcessed.incrementAndGet(); - - if (cause == null) { - if (migrated) { - profilesMigrated.incrementAndGet(); - } - } - - if (processed % 10_000 == 0) { - log.info("Processed {} profiles ({} migrated)", processed, profilesMigrated.get()); - } - }); + } catch (InterruptedException e) { + log.warn("Interrupted while waiting to acquire permit"); + throw new RuntimeException(e); } - log.info("Migration completed; processed {} profiles and migrated {}", profilesProcessed.get(), profilesMigrated.get()); - } + profilesDynamoDb.migrate(uuid, profile) + .whenComplete((migrated, cause) -> { + semaphore.release(); + + final int processed = profilesProcessed.incrementAndGet(); + + if (cause == null) { + if (migrated) { + profilesMigrated.incrementAndGet(); + } + } + + if (processed % 10_000 == 0) { + log.info("Processed {} profiles ({} migrated)", processed, profilesMigrated.get()); + } + }); + }, fetchSize); + + // Wait for all outstanding operations to complete + semaphore.acquire(concurrency); + semaphore.release(concurrency); + + log.info("Migration completed; processed {} profiles and migrated {}", profilesProcessed.get(), profilesMigrated.get()); log.info("Removing profiles that were deleted during migration"); + final AtomicInteger profilesDeleted = new AtomicInteger(0); - try (final ResultIterator> results = profiles.getDeletedProfiles(fetchSize)) { - final AtomicInteger profilesDeleted = new AtomicInteger(0); - - while (results.hasNext()) { + profiles.forEachDeletedProfile((uuid, version) -> { + try { semaphore.acquire(); - - final Pair uuidAndVersion = results.next(); - - profilesDynamoDb.delete(uuidAndVersion.first(), uuidAndVersion.second()) - .whenComplete((response, cause) -> { - semaphore.release(); - - if (profilesDeleted.incrementAndGet() % 1_000 == 0) { - log.info("Attempted to remove {} profiles", profilesDeleted.get()); - } - }); + } catch (InterruptedException e) { + log.warn("Interrupted while waiting to acquire permit"); + throw new RuntimeException(e); } - log.info("Removal of deleted profiles complete; attempted to remove {} profiles", profilesDeleted.get()); - } + profilesDynamoDb.delete(uuid, version) + .whenComplete((response, cause) -> { + semaphore.release(); + + if (profilesDeleted.incrementAndGet() % 1_000 == 0) { + log.info("Attempted to remove {} profiles", profilesDeleted.get()); + } + }); + }, fetchSize); + + // Wait for all outstanding operations to complete + semaphore.acquire(concurrency); + semaphore.release(concurrency); + + log.info("Removal of deleted profiles complete; attempted to remove {} profiles", profilesDeleted.get()); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesPostgresTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesPostgresTest.java index 170f8c14a..cda5db705 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesPostgresTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ProfilesPostgresTest.java @@ -5,21 +5,22 @@ package org.whispersystems.textsecuregcm.storage; -import com.google.common.collect.ImmutableList; +import static org.junit.jupiter.api.Assertions.assertEquals; + import com.opentable.db.postgres.embedded.LiquibasePreparer; import com.opentable.db.postgres.junit5.EmbeddedPostgresExtension; import com.opentable.db.postgres.junit5.PreparedDbExtension; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; import org.jdbi.v3.core.Jdbi; -import org.jdbi.v3.core.result.ResultIterator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.util.Pair; -import java.util.List; -import java.util.UUID; - -import static org.junit.jupiter.api.Assertions.assertEquals; public class ProfilesPostgresTest extends ProfilesTest { @@ -36,6 +37,8 @@ public class ProfilesPostgresTest extends ProfilesTest { new CircuitBreakerConfiguration()); profiles = new Profiles(faultTolerantDatabase); + + faultTolerantDatabase.use(jdbi -> jdbi.useHandle(handle -> handle.createUpdate("DELETE FROM profiles").execute())); } @Override @@ -44,9 +47,24 @@ public class ProfilesPostgresTest extends ProfilesTest { } @Test - void testGetDeletedProfiles() { - profiles.purgeDeletedProfiles(); + void testForEach() { + UUID uuid = UUID.randomUUID(); + VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null, + null, "aDigest".getBytes()); + VersionedProfile profileTwo = new VersionedProfile("345", "bar", "baz", null, null, null, "boof".getBytes()); + profiles.set(uuid, profileOne); + profiles.set(uuid, profileTwo); + + final Set> retrievedProfiles = new HashSet<>(); + + profiles.forEach((u, profile) -> retrievedProfiles.add(new Pair<>(u, profile)), 1); + + assertEquals(Set.of(new Pair<>(uuid, profileOne), new Pair<>(uuid, profileTwo)), retrievedProfiles); + } + + @Test + void testForEachDeletedProfiles() { UUID uuid = UUID.randomUUID(); VersionedProfile profileOne = new VersionedProfile("123", "foo", "avatarLocation", null, null, null, "aDigest".getBytes()); @@ -54,11 +72,12 @@ public class ProfilesPostgresTest extends ProfilesTest { profiles.set(uuid, profileOne); profiles.set(UUID.randomUUID(), profileTwo); - profiles.deleteAll(uuid); - try (final ResultIterator> resultIterator = profiles.getDeletedProfiles(10)) { - assertEquals(List.of(new Pair<>(uuid, profileOne.getVersion())), ImmutableList.copyOf(resultIterator)); - } + final List> deletedProfiles = new ArrayList<>(); + + profiles.forEachDeletedProfile((u, version) -> deletedProfiles.add(new Pair<>(u, version)), 2); + + assertEquals(List.of(new Pair<>(uuid, profileOne.getVersion())), deletedProfiles); } }