Retry serializable key transactions.
This commit is contained in:
parent
ca25105f13
commit
67ed035b36
|
@ -16,6 +16,7 @@ import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration;
|
|||
import org.whispersystems.textsecuregcm.configuration.FeatureFlagConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.GcmConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.GcpAttachmentsConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.AccountsDatabaseConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.MaxDeviceConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.MessageCacheConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.MicrometerConfiguration;
|
||||
|
@ -134,7 +135,7 @@ public class WhisperServerConfiguration extends Configuration {
|
|||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
private DatabaseConfiguration accountsDatabase;
|
||||
private AccountsDatabaseConfiguration accountsDatabase;
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
|
@ -285,7 +286,7 @@ public class WhisperServerConfiguration extends Configuration {
|
|||
return abuseDatabase;
|
||||
}
|
||||
|
||||
public DatabaseConfiguration getAccountsDatabaseConfiguration() {
|
||||
public AccountsDatabaseConfiguration getAccountsDatabaseConfiguration() {
|
||||
return accountsDatabase;
|
||||
}
|
||||
|
||||
|
|
|
@ -267,7 +267,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
Usernames usernames = new Usernames(accountDatabase);
|
||||
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
|
||||
Profiles profiles = new Profiles(accountDatabase);
|
||||
Keys keys = new Keys(accountDatabase);
|
||||
Keys keys = new Keys(accountDatabase, config.getAccountsDatabaseConfiguration().getKeyOperationRetryConfiguration());
|
||||
Messages messages = new Messages(messageDatabase);
|
||||
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase);
|
||||
RemoteConfigs remoteConfigs = new RemoteConfigs(accountDatabase);
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright 2021 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import javax.validation.Valid;
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
public class AccountsDatabaseConfiguration extends DatabaseConfiguration {
|
||||
|
||||
@JsonProperty
|
||||
@NotNull
|
||||
@Valid
|
||||
private RetryConfiguration keyOperationRetry = new RetryConfiguration();
|
||||
|
||||
public RetryConfiguration getKeyOperationRetryConfiguration() {
|
||||
return keyOperationRetry;
|
||||
}
|
||||
}
|
|
@ -4,23 +4,32 @@
|
|||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.github.resilience4j.retry.Retry;
|
||||
import org.jdbi.v3.core.JdbiException;
|
||||
import org.jdbi.v3.core.statement.PreparedBatch;
|
||||
import org.jdbi.v3.core.statement.UnableToExecuteStatementException;
|
||||
import org.jdbi.v3.core.transaction.SerializableTransactionRunner;
|
||||
import org.jdbi.v3.core.transaction.TransactionIsolationLevel;
|
||||
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
|
||||
import org.whispersystems.textsecuregcm.entities.PreKey;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.KeyRecordRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
public class Keys {
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Meter fallbackMeter = metricRegistry.meter(name(Keys.class, "fallback"));
|
||||
private final Timer storeTimer = metricRegistry.timer(name(Keys.class, "store" ));
|
||||
private final Timer getDevicetTimer = metricRegistry.timer(name(Keys.class, "getDevice"));
|
||||
private final Timer getTimer = metricRegistry.timer(name(Keys.class, "get" ));
|
||||
|
@ -28,58 +37,75 @@ public class Keys {
|
|||
private final Timer vacuumTimer = metricRegistry.timer(name(Keys.class, "vacuum" ));
|
||||
|
||||
private final FaultTolerantDatabase database;
|
||||
private final Retry retry;
|
||||
|
||||
public Keys(FaultTolerantDatabase database) {
|
||||
public Keys(FaultTolerantDatabase database, RetryConfiguration retryConfiguration) {
|
||||
this.database = database;
|
||||
this.database.getDatabase().registerRowMapper(new KeyRecordRowMapper());
|
||||
this.database.getDatabase().setTransactionHandler(new SerializableTransactionRunner());
|
||||
this.database.getDatabase().getConfig(SerializableTransactionRunner.Configuration.class).setMaxRetries(10);
|
||||
|
||||
this.retry = Retry.of("keys", retryConfiguration.toRetryConfigBuilder().build());
|
||||
}
|
||||
|
||||
public void store(String number, long deviceId, List<PreKey> keys) {
|
||||
database.use(jdbi -> jdbi.useTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context ignored = storeTimer.time()) {
|
||||
PreparedBatch preparedBatch = handle.prepareBatch("INSERT INTO keys (number, device_id, key_id, public_key) VALUES (:number, :device_id, :key_id, :public_key)");
|
||||
retry.executeRunnable(() -> {
|
||||
database.use(jdbi -> jdbi.useTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context ignored = storeTimer.time()) {
|
||||
PreparedBatch preparedBatch = handle.prepareBatch("INSERT INTO keys (number, device_id, key_id, public_key) VALUES (:number, :device_id, :key_id, :public_key)");
|
||||
|
||||
for (PreKey key : keys) {
|
||||
preparedBatch.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.bind("key_id", key.getKeyId())
|
||||
.bind("public_key", key.getPublicKey())
|
||||
.add();
|
||||
for (PreKey key : keys) {
|
||||
preparedBatch.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.bind("key_id", key.getKeyId())
|
||||
.bind("public_key", key.getPublicKey())
|
||||
.add();
|
||||
}
|
||||
|
||||
handle.createUpdate("DELETE FROM keys WHERE number = :number AND device_id = :device_id")
|
||||
.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.execute();
|
||||
|
||||
preparedBatch.execute();
|
||||
}
|
||||
|
||||
handle.createUpdate("DELETE FROM keys WHERE number = :number AND device_id = :device_id")
|
||||
.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.execute();
|
||||
|
||||
preparedBatch.execute();
|
||||
}
|
||||
}));
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
public List<KeyRecord> get(String number, long deviceId) {
|
||||
return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context ignored = getDevicetTimer.time()) {
|
||||
return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT id FROM keys WHERE number = :number AND device_id = :device_id ORDER BY key_id ASC LIMIT 1) RETURNING *")
|
||||
.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.mapTo(KeyRecord.class)
|
||||
.list();
|
||||
}
|
||||
}));
|
||||
try {
|
||||
return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context ignored = getDevicetTimer.time()) {
|
||||
return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT id FROM keys WHERE number = :number AND device_id = :device_id ORDER BY key_id ASC LIMIT 1) RETURNING *")
|
||||
.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.mapTo(KeyRecord.class)
|
||||
.list();
|
||||
}
|
||||
}));
|
||||
} catch (JdbiException e) {
|
||||
// TODO 2021-01-13 Replace this with a retry once desktop clients better handle HTTP/500 responses
|
||||
fallbackMeter.mark();
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
public List<KeyRecord> get(String number) {
|
||||
return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context ignored = getTimer.time()) {
|
||||
return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT DISTINCT ON (number, device_id) id FROM keys WHERE number = :number ORDER BY number, device_id, key_id ASC) RETURNING *")
|
||||
.bind("number", number)
|
||||
.mapTo(KeyRecord.class)
|
||||
.list();
|
||||
}
|
||||
}));
|
||||
try {
|
||||
return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context ignored = getTimer.time()) {
|
||||
return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT DISTINCT ON (number, device_id) id FROM keys WHERE number = :number ORDER BY number, device_id, key_id ASC) RETURNING *")
|
||||
.bind("number", number)
|
||||
.mapTo(KeyRecord.class)
|
||||
.list();
|
||||
}
|
||||
}));
|
||||
} catch (JdbiException e) {
|
||||
// TODO 2021-01-13 Replace this with a retry once desktop clients better handle HTTP/500 responses
|
||||
fallbackMeter.mark();
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
public int getCount(String number, long deviceId) {
|
||||
|
|
|
@ -93,7 +93,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
|
|||
Usernames usernames = new Usernames(accountDatabase);
|
||||
Profiles profiles = new Profiles(accountDatabase);
|
||||
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
|
||||
Keys keys = new Keys(accountDatabase);
|
||||
Keys keys = new Keys(accountDatabase, configuration.getAccountsDatabaseConfiguration().getKeyOperationRetryConfiguration());
|
||||
Messages messages = new Messages(messageDatabase);
|
||||
ReplicatedJedisPool redisClient = new RedisClientFactory("directory_cache_delete_command", configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), configuration.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
|
||||
FaultTolerantRedisCluster messagesCacheCluster = new FaultTolerantRedisCluster("messages_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
|
||||
|
|
|
@ -46,7 +46,7 @@ public class VacuumCommand extends ConfiguredCommand<WhisperServerConfiguration>
|
|||
FaultTolerantDatabase messageDatabase = new FaultTolerantDatabase("message_database_vacuum", messageJdbi, messageDbConfig.getCircuitBreakerConfiguration());
|
||||
|
||||
Accounts accounts = new Accounts(accountDatabase);
|
||||
Keys keys = new Keys(accountDatabase);
|
||||
Keys keys = new Keys(accountDatabase, config.getAccountsDatabaseConfiguration().getKeyOperationRetryConfiguration());
|
||||
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
|
||||
Messages messages = new Messages(messageDatabase);
|
||||
FeatureFlags featureFlags = new FeatureFlags(accountDatabase);
|
||||
|
|
|
@ -9,6 +9,7 @@ import com.opentable.db.postgres.embedded.LiquibasePreparer;
|
|||
import com.opentable.db.postgres.junit.EmbeddedPostgresRules;
|
||||
import com.opentable.db.postgres.junit.PreparedDbRule;
|
||||
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
|
||||
import org.jdbi.v3.core.HandleCallback;
|
||||
import org.jdbi.v3.core.HandleConsumer;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.jdbi.v3.core.statement.UnableToExecuteStatementException;
|
||||
|
@ -19,6 +20,8 @@ import org.junit.Before;
|
|||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.AccountsDatabaseConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
|
||||
import org.whispersystems.textsecuregcm.entities.PreKey;
|
||||
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||
import org.whispersystems.textsecuregcm.storage.KeyRecord;
|
||||
|
@ -27,6 +30,7 @@ import org.whispersystems.textsecuregcm.storage.Keys;
|
|||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -49,7 +53,7 @@ public class KeysTest {
|
|||
Jdbi.create(db.getTestDatabase()),
|
||||
new CircuitBreakerConfiguration());
|
||||
|
||||
this.keys = new Keys(faultTolerantDatabase);
|
||||
this.keys = new Keys(faultTolerantDatabase, new RetryConfiguration());
|
||||
}
|
||||
|
||||
|
||||
|
@ -343,7 +347,10 @@ public class KeysTest {
|
|||
configuration.setRingBufferSizeInClosedState(2);
|
||||
configuration.setFailureRateThreshold(50);
|
||||
|
||||
Keys keys = new Keys(new FaultTolerantDatabase("testBreaker", jdbi, configuration));
|
||||
RetryConfiguration retryConfiguration = new RetryConfiguration();
|
||||
retryConfiguration.setMaxAttempts(1);
|
||||
|
||||
Keys keys = new Keys(new FaultTolerantDatabase("testBreaker", jdbi, configuration), retryConfiguration);
|
||||
|
||||
List<PreKey> deviceOnePreKeys = new LinkedList<>();
|
||||
|
||||
|
@ -383,6 +390,31 @@ public class KeysTest {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetry() {
|
||||
Jdbi jdbi = mock(Jdbi.class);
|
||||
doThrow(new TransactionException("Database error!")).doNothing().when(jdbi).useTransaction(any(TransactionIsolationLevel.class), any(HandleConsumer.class));
|
||||
when(jdbi.getConfig(any())).thenReturn(mock(SerializableTransactionRunner.Configuration.class));
|
||||
|
||||
Keys keys = new Keys(new FaultTolerantDatabase("testBreaker", jdbi, new CircuitBreakerConfiguration()), new RetryConfiguration());
|
||||
|
||||
// We're happy as long as nothing throws an exception
|
||||
keys.store("+18005551234", 1, Collections.emptyList());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetKeysWithException() {
|
||||
Jdbi jdbi = mock(Jdbi.class);
|
||||
when(jdbi.getConfig(any())).thenReturn(mock(SerializableTransactionRunner.Configuration.class));
|
||||
|
||||
when(jdbi.inTransaction(any(TransactionIsolationLevel.class), any(HandleCallback.class)))
|
||||
.thenThrow(new TransactionException("Database error!"));
|
||||
|
||||
Keys keys = new Keys(new FaultTolerantDatabase("testBreaker", jdbi, new CircuitBreakerConfiguration()), new RetryConfiguration());
|
||||
|
||||
assertThat(keys.get("+18005551234")).isEqualTo(Collections.emptyList());
|
||||
assertThat(keys.get("+18005551234", 1)).isEqualTo(Collections.emptyList());
|
||||
}
|
||||
|
||||
private void verifyStoredState(PreparedStatement statement, String number, int deviceId) throws SQLException {
|
||||
statement.setString(1, number);
|
||||
|
|
Loading…
Reference in New Issue