From 67ed035b36efba7b48822dc60c353481f6fd8dda Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 13 Jan 2021 11:30:43 -0500 Subject: [PATCH] Retry serializable key transactions. --- .../WhisperServerConfiguration.java | 5 +- .../textsecuregcm/WhisperServerService.java | 2 +- .../AccountsDatabaseConfiguration.java | 23 +++++ .../textsecuregcm/storage/Keys.java | 98 ++++++++++++------- .../workers/DeleteUserCommand.java | 2 +- .../textsecuregcm/workers/VacuumCommand.java | 2 +- .../textsecuregcm/tests/storage/KeysTest.java | 36 ++++++- 7 files changed, 125 insertions(+), 43 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/AccountsDatabaseConfiguration.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 5671a5b03..a2ba77abb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -16,6 +16,7 @@ import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration; import org.whispersystems.textsecuregcm.configuration.FeatureFlagConfiguration; import org.whispersystems.textsecuregcm.configuration.GcmConfiguration; import org.whispersystems.textsecuregcm.configuration.GcpAttachmentsConfiguration; +import org.whispersystems.textsecuregcm.configuration.AccountsDatabaseConfiguration; import org.whispersystems.textsecuregcm.configuration.MaxDeviceConfiguration; import org.whispersystems.textsecuregcm.configuration.MessageCacheConfiguration; import org.whispersystems.textsecuregcm.configuration.MicrometerConfiguration; @@ -134,7 +135,7 @@ public class WhisperServerConfiguration extends Configuration { @Valid @NotNull @JsonProperty - private DatabaseConfiguration accountsDatabase; + private AccountsDatabaseConfiguration accountsDatabase; @Valid @NotNull @@ -285,7 +286,7 @@ public class WhisperServerConfiguration extends Configuration { return abuseDatabase; } - public DatabaseConfiguration getAccountsDatabaseConfiguration() { + public AccountsDatabaseConfiguration getAccountsDatabaseConfiguration() { return accountsDatabase; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 984284034..4d4f86302 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -267,7 +267,7 @@ public class WhisperServerService extends Application keys) { - database.use(jdbi -> jdbi.useTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { - try (Timer.Context ignored = storeTimer.time()) { - PreparedBatch preparedBatch = handle.prepareBatch("INSERT INTO keys (number, device_id, key_id, public_key) VALUES (:number, :device_id, :key_id, :public_key)"); + retry.executeRunnable(() -> { + database.use(jdbi -> jdbi.useTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { + try (Timer.Context ignored = storeTimer.time()) { + PreparedBatch preparedBatch = handle.prepareBatch("INSERT INTO keys (number, device_id, key_id, public_key) VALUES (:number, :device_id, :key_id, :public_key)"); - for (PreKey key : keys) { - preparedBatch.bind("number", number) - .bind("device_id", deviceId) - .bind("key_id", key.getKeyId()) - .bind("public_key", key.getPublicKey()) - .add(); + for (PreKey key : keys) { + preparedBatch.bind("number", number) + .bind("device_id", deviceId) + .bind("key_id", key.getKeyId()) + .bind("public_key", key.getPublicKey()) + .add(); + } + + handle.createUpdate("DELETE FROM keys WHERE number = :number AND device_id = :device_id") + .bind("number", number) + .bind("device_id", deviceId) + .execute(); + + preparedBatch.execute(); } - - handle.createUpdate("DELETE FROM keys WHERE number = :number AND device_id = :device_id") - .bind("number", number) - .bind("device_id", deviceId) - .execute(); - - preparedBatch.execute(); - } - })); + })); + }); } public List get(String number, long deviceId) { - return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { - try (Timer.Context ignored = getDevicetTimer.time()) { - return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT id FROM keys WHERE number = :number AND device_id = :device_id ORDER BY key_id ASC LIMIT 1) RETURNING *") - .bind("number", number) - .bind("device_id", deviceId) - .mapTo(KeyRecord.class) - .list(); - } - })); + try { + return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { + try (Timer.Context ignored = getDevicetTimer.time()) { + return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT id FROM keys WHERE number = :number AND device_id = :device_id ORDER BY key_id ASC LIMIT 1) RETURNING *") + .bind("number", number) + .bind("device_id", deviceId) + .mapTo(KeyRecord.class) + .list(); + } + })); + } catch (JdbiException e) { + // TODO 2021-01-13 Replace this with a retry once desktop clients better handle HTTP/500 responses + fallbackMeter.mark(); + return Collections.emptyList(); + } } public List get(String number) { - return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { - try (Timer.Context ignored = getTimer.time()) { - return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT DISTINCT ON (number, device_id) id FROM keys WHERE number = :number ORDER BY number, device_id, key_id ASC) RETURNING *") - .bind("number", number) - .mapTo(KeyRecord.class) - .list(); - } - })); + try { + return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { + try (Timer.Context ignored = getTimer.time()) { + return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT DISTINCT ON (number, device_id) id FROM keys WHERE number = :number ORDER BY number, device_id, key_id ASC) RETURNING *") + .bind("number", number) + .mapTo(KeyRecord.class) + .list(); + } + })); + } catch (JdbiException e) { + // TODO 2021-01-13 Replace this with a retry once desktop clients better handle HTTP/500 responses + fallbackMeter.mark(); + return Collections.emptyList(); + } } public int getCount(String number, long deviceId) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index 4fe7df015..689a0881a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -93,7 +93,7 @@ public class DeleteUserCommand extends EnvironmentCommand FaultTolerantDatabase messageDatabase = new FaultTolerantDatabase("message_database_vacuum", messageJdbi, messageDbConfig.getCircuitBreakerConfiguration()); Accounts accounts = new Accounts(accountDatabase); - Keys keys = new Keys(accountDatabase); + Keys keys = new Keys(accountDatabase, config.getAccountsDatabaseConfiguration().getKeyOperationRetryConfiguration()); PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase); Messages messages = new Messages(messageDatabase); FeatureFlags featureFlags = new FeatureFlags(accountDatabase); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/KeysTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/KeysTest.java index 6efd64ede..5db14741e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/KeysTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/KeysTest.java @@ -9,6 +9,7 @@ import com.opentable.db.postgres.embedded.LiquibasePreparer; import com.opentable.db.postgres.junit.EmbeddedPostgresRules; import com.opentable.db.postgres.junit.PreparedDbRule; import io.github.resilience4j.circuitbreaker.CallNotPermittedException; +import org.jdbi.v3.core.HandleCallback; import org.jdbi.v3.core.HandleConsumer; import org.jdbi.v3.core.Jdbi; import org.jdbi.v3.core.statement.UnableToExecuteStatementException; @@ -19,6 +20,8 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.AccountsDatabaseConfiguration; +import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import org.whispersystems.textsecuregcm.entities.PreKey; import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.storage.KeyRecord; @@ -27,6 +30,7 @@ import org.whispersystems.textsecuregcm.storage.Keys; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -49,7 +53,7 @@ public class KeysTest { Jdbi.create(db.getTestDatabase()), new CircuitBreakerConfiguration()); - this.keys = new Keys(faultTolerantDatabase); + this.keys = new Keys(faultTolerantDatabase, new RetryConfiguration()); } @@ -343,7 +347,10 @@ public class KeysTest { configuration.setRingBufferSizeInClosedState(2); configuration.setFailureRateThreshold(50); - Keys keys = new Keys(new FaultTolerantDatabase("testBreaker", jdbi, configuration)); + RetryConfiguration retryConfiguration = new RetryConfiguration(); + retryConfiguration.setMaxAttempts(1); + + Keys keys = new Keys(new FaultTolerantDatabase("testBreaker", jdbi, configuration), retryConfiguration); List deviceOnePreKeys = new LinkedList<>(); @@ -383,6 +390,31 @@ public class KeysTest { } + @Test + public void testRetry() { + Jdbi jdbi = mock(Jdbi.class); + doThrow(new TransactionException("Database error!")).doNothing().when(jdbi).useTransaction(any(TransactionIsolationLevel.class), any(HandleConsumer.class)); + when(jdbi.getConfig(any())).thenReturn(mock(SerializableTransactionRunner.Configuration.class)); + + Keys keys = new Keys(new FaultTolerantDatabase("testBreaker", jdbi, new CircuitBreakerConfiguration()), new RetryConfiguration()); + + // We're happy as long as nothing throws an exception + keys.store("+18005551234", 1, Collections.emptyList()); + } + + @Test + public void testGetKeysWithException() { + Jdbi jdbi = mock(Jdbi.class); + when(jdbi.getConfig(any())).thenReturn(mock(SerializableTransactionRunner.Configuration.class)); + + when(jdbi.inTransaction(any(TransactionIsolationLevel.class), any(HandleCallback.class))) + .thenThrow(new TransactionException("Database error!")); + + Keys keys = new Keys(new FaultTolerantDatabase("testBreaker", jdbi, new CircuitBreakerConfiguration()), new RetryConfiguration()); + + assertThat(keys.get("+18005551234")).isEqualTo(Collections.emptyList()); + assertThat(keys.get("+18005551234", 1)).isEqualTo(Collections.emptyList()); + } private void verifyStoredState(PreparedStatement statement, String number, int deviceId) throws SQLException { statement.setString(1, number);