From 0e300df68c1aeb05995f77dee7c9d7e09f2f730f Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Fri, 5 Apr 2019 13:38:42 -0700 Subject: [PATCH] Support for circuit breaker on database access --- .../WhisperServerConfiguration.java | 20 ++-- .../textsecuregcm/WhisperServerService.java | 14 ++- .../configuration/DatabaseConfiguration.java | 19 +++ .../redis/ReplicatedJedisPool.java | 22 +--- .../storage/AbusiveHostRules.java | 11 +- .../textsecuregcm/storage/Accounts.java | 45 ++++---- .../storage/FaultTolerantDatabase.java | 49 ++++++++ .../textsecuregcm/storage/Keys.java | 43 ++++--- .../textsecuregcm/storage/Messages.java | 56 +++++---- .../storage/PendingAccounts.java | 33 +++--- .../textsecuregcm/storage/PendingDevices.java | 19 ++- .../util/CircuitBreakerUtil.java | 23 ++++ .../workers/DeleteUserCommand.java | 6 +- .../textsecuregcm/workers/VacuumCommand.java | 13 ++- .../tests/storage/AbusiveHostRulesTest.java | 4 +- .../tests/storage/AccountsTest.java | 63 +++++++++- .../textsecuregcm/tests/storage/KeysTest.java | 109 +++++++++++++----- .../tests/storage/MessagesTest.java | 4 +- .../tests/storage/PendingAccountsTest.java | 4 +- .../tests/storage/PendingDevicesTest.java | 4 +- 20 files changed, 378 insertions(+), 183 deletions(-) create mode 100644 src/main/java/org/whispersystems/textsecuregcm/configuration/DatabaseConfiguration.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/storage/FaultTolerantDatabase.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/util/CircuitBreakerUtil.java diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 6e926cd23..2f493f50b 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -1,4 +1,4 @@ -/** +/* * Copyright (C) 2013 Open WhisperSystems * * This program is free software: you can redistribute it and/or modify @@ -29,8 +29,8 @@ import java.util.Map; import io.dropwizard.Configuration; import io.dropwizard.client.JerseyClientConfiguration; -import io.dropwizard.db.DataSourceFactory; +/** @noinspection MismatchedQueryAndUpdateOfCollection, WeakerAccess */ public class WhisperServerConfiguration extends Configuration { @NotNull @@ -81,12 +81,12 @@ public class WhisperServerConfiguration extends Configuration { @Valid @NotNull @JsonProperty - private DataSourceFactory messageStore; + private DatabaseConfiguration messageStore; @Valid @NotNull @JsonProperty - private DataSourceFactory abuseDatabase; + private DatabaseConfiguration abuseDatabase; @Valid @NotNull @@ -101,10 +101,10 @@ public class WhisperServerConfiguration extends Configuration { @Valid @NotNull @JsonProperty - private DataSourceFactory database = new DataSourceFactory(); + private DatabaseConfiguration database = new DatabaseConfiguration(); @JsonProperty - private DataSourceFactory read_database; + private DatabaseConfiguration read_database; @Valid @NotNull @@ -201,19 +201,19 @@ public class WhisperServerConfiguration extends Configuration { return pushScheduler; } - public DataSourceFactory getMessageStoreConfiguration() { + public DatabaseConfiguration getMessageStoreConfiguration() { return messageStore; } - public DataSourceFactory getAbuseDatabaseConfiguration() { + public DatabaseConfiguration getAbuseDatabaseConfiguration() { return abuseDatabase; } - public DataSourceFactory getDataSourceFactory() { + public DatabaseConfiguration getAccountsDatabaseConfiguration() { return database; } - public DataSourceFactory getReadDataSourceFactory() { + public DatabaseConfiguration getAccountsReadDatabaseConfiguration() { return read_database; } diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 846fbd379..33faaf4a6 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -112,7 +112,7 @@ public class WhisperServerService extends Application("accountdb", "accountsdb.xml") { @Override public DataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) { - return configuration.getDataSourceFactory(); + return configuration.getAccountsDatabaseConfiguration(); } }); @@ -145,10 +145,14 @@ public class WhisperServerService extends Application(replicas.size()); @@ -55,7 +55,7 @@ public class ReplicatedJedisPool { JedisPool replica = replicas.get(i); CircuitBreaker slaveBreaker = CircuitBreaker.of(String.format("%s-slave-%d", name, i), config); - registerMetrics(slaveBreaker); + CircuitBreakerUtil.registerMetrics(metricRegistry, slaveBreaker, ReplicatedJedisPool.class); this.replicas.add(CircuitBreaker.decorateSupplier(slaveBreaker, replica::getResource)); } } @@ -80,16 +80,4 @@ public class ReplicatedJedisPool { throw new JedisException("All read replica pools failed!"); } - private void registerMetrics(CircuitBreaker circuitBreaker) { - Meter successMeter = metricRegistry.meter(name(ReplicatedJedisPool.class, circuitBreaker.getName(), "success" )); - Meter failureMeter = metricRegistry.meter(name(ReplicatedJedisPool.class, circuitBreaker.getName(), "failure" )); - Meter unpermittedMeter = metricRegistry.meter(name(ReplicatedJedisPool.class, circuitBreaker.getName(), "unpermitted")); - - metricRegistry.gauge(name(ReplicatedJedisPool.class, circuitBreaker.getName(), "state"), () -> ()-> circuitBreaker.getState().getOrder()); - - circuitBreaker.getEventPublisher().onSuccess(event -> successMeter.mark()); - circuitBreaker.getEventPublisher().onError(event -> failureMeter.mark()); - circuitBreaker.getEventPublisher().onCallNotPermitted(event -> unpermittedMeter.mark()); - } - } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/AbusiveHostRules.java b/src/main/java/org/whispersystems/textsecuregcm/storage/AbusiveHostRules.java index 1219f556c..6de1f7b7d 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/AbusiveHostRules.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/AbusiveHostRules.java @@ -3,7 +3,6 @@ package org.whispersystems.textsecuregcm.storage; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; -import org.jdbi.v3.core.Jdbi; import org.whispersystems.textsecuregcm.storage.mappers.AbusiveHostRuleRowMapper; import org.whispersystems.textsecuregcm.util.Constants; @@ -21,22 +20,22 @@ public class AbusiveHostRules { private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); private final Timer getTimer = metricRegistry.timer(name(AbusiveHostRules.class, "get")); - private final Jdbi database; + private final FaultTolerantDatabase database; - public AbusiveHostRules(Jdbi database) { + public AbusiveHostRules(FaultTolerantDatabase database) { this.database = database; - this.database.registerRowMapper(new AbusiveHostRuleRowMapper()); + this.database.getDatabase().registerRowMapper(new AbusiveHostRuleRowMapper()); } public List getAbusiveHostRulesFor(String host) { - return database.withHandle(handle -> { + return database.with(jdbi -> jdbi.withHandle(handle -> { try (Timer.Context timer = getTimer.time()) { return handle.createQuery("SELECT * FROM abusive_host_rules WHERE :host::inet <<= " + HOST) .bind("host", host) .mapTo(AbusiveHostRule.class) .list(); } - }); + })); } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java b/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java index 736f3c5d9..f138760c6 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java @@ -1,4 +1,4 @@ -/** +/* * Copyright (C) 2013 Open WhisperSystems * * This program is free software: you can redistribute it and/or modify @@ -21,7 +21,6 @@ import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import org.jdbi.v3.core.Jdbi; import org.jdbi.v3.core.transaction.TransactionIsolationLevel; import org.whispersystems.textsecuregcm.storage.mappers.AccountRowMapper; import org.whispersystems.textsecuregcm.util.Constants; @@ -48,16 +47,16 @@ public class Accounts { private final Timer getAllFromOffsetTimer = metricRegistry.timer(name(Accounts.class, "getAllFromOffset")); private final Timer vacuumTimer = metricRegistry.timer(name(Accounts.class, "vacuum")); - private final Jdbi database; + private final FaultTolerantDatabase database; - public Accounts(Jdbi database) { + public Accounts(FaultTolerantDatabase database) { this.database = database; - this.database.registerRowMapper(new AccountRowMapper()); + this.database.getDatabase().registerRowMapper(new AccountRowMapper()); } public boolean create(Account account) { - return database.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { - try (Timer.Context timer = createTimer.time()) { + return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { + try (Timer.Context ignored = createTimer.time()) { int rows = handle.createUpdate("DELETE FROM accounts WHERE " + NUMBER + " = :number") .bind("number", account.getNumber()) .execute(); @@ -72,12 +71,12 @@ public class Accounts { } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } - }); + })); } public void update(Account account) { - database.useHandle(handle -> { - try (Timer.Context timer = updateTimer.time()) { + database.use(jdbi -> jdbi.useHandle(handle -> { + try (Timer.Context ignored = updateTimer.time()) { handle.createUpdate("UPDATE accounts SET " + DATA + " = CAST(:data AS json) WHERE " + NUMBER + " = :number") .bind("number", account.getNumber()) .bind("data", mapper.writeValueAsString(account)) @@ -85,50 +84,50 @@ public class Accounts { } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); } - }); + })); } public Optional get(String number) { - return database.withHandle(handle -> { - try (Timer.Context timer = getTimer.time()) { + return database.with(jdbi -> jdbi.withHandle(handle -> { + try (Timer.Context ignored = getTimer.time()) { return handle.createQuery("SELECT * FROM accounts WHERE " + NUMBER + " = :number") .bind("number", number) .mapTo(Account.class) .findFirst(); } - }); + })); } public List getAllFrom(String from, int length) { - return database.withHandle(handle -> { - try (Timer.Context timer = getAllFromOffsetTimer.time()) { + return database.with(jdbi -> jdbi.withHandle(handle -> { + try (Timer.Context ignored = getAllFromOffsetTimer.time()) { return handle.createQuery("SELECT * FROM accounts WHERE " + NUMBER + " > :from ORDER BY " + NUMBER + " LIMIT :limit") .bind("from", from) .bind("limit", length) .mapTo(Account.class) .list(); } - }); + })); } public List getAllFrom(int length) { - return database.withHandle(handle -> { - try (Timer.Context timer = getAllFromTimer.time()) { + return database.with(jdbi -> jdbi.withHandle(handle -> { + try (Timer.Context ignored = getAllFromTimer.time()) { return handle.createQuery("SELECT * FROM accounts ORDER BY " + NUMBER + " LIMIT :limit") .bind("limit", length) .mapTo(Account.class) .list(); } - }); + })); } public void vacuum() { - database.useHandle(handle -> { - try (Timer.Context timer = vacuumTimer.time()) { + database.use(jdbi -> jdbi.useHandle(handle -> { + try (Timer.Context ignored = vacuumTimer.time()) { handle.execute("VACUUM accounts"); } - }); + })); } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/FaultTolerantDatabase.java b/src/main/java/org/whispersystems/textsecuregcm/storage/FaultTolerantDatabase.java new file mode 100644 index 000000000..505056740 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/FaultTolerantDatabase.java @@ -0,0 +1,49 @@ +package org.whispersystems.textsecuregcm.storage; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SharedMetricRegistries; +import org.jdbi.v3.core.Jdbi; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; +import org.whispersystems.textsecuregcm.util.Constants; + +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Function; + +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; + +public class FaultTolerantDatabase { + + private final Jdbi database; + private final CircuitBreaker circuitBreaker; + + public FaultTolerantDatabase(String name, Jdbi database, CircuitBreakerConfiguration circuitBreakerConfiguration) { + MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + + CircuitBreakerConfig config = CircuitBreakerConfig.custom() + .failureRateThreshold(circuitBreakerConfiguration.getFailureRateThreshold()) + .ringBufferSizeInHalfOpenState(circuitBreakerConfiguration.getRingBufferSizeInHalfOpenState()) + .waitDurationInOpenState(Duration.ofSeconds(circuitBreakerConfiguration.getWaitDurationInOpenStateInSeconds())) + .ringBufferSizeInClosedState(circuitBreakerConfiguration.getRingBufferSizeInClosedState()) + .build(); + + this.database = database; + this.circuitBreaker = CircuitBreaker.of(name, config); + + CircuitBreakerUtil.registerMetrics(metricRegistry, circuitBreaker, FaultTolerantDatabase.class); + } + + public void use(Consumer consumer) { + this.circuitBreaker.executeRunnable(() -> consumer.accept(database)); + } + + public T with(Function consumer) { + return this.circuitBreaker.executeSupplier(() -> consumer.apply(database)); + } + + public Jdbi getDatabase() { + return database; + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/Keys.java b/src/main/java/org/whispersystems/textsecuregcm/storage/Keys.java index 8a792b6c3..ca391b65f 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/Keys.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/Keys.java @@ -19,7 +19,6 @@ package org.whispersystems.textsecuregcm.storage; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; -import org.jdbi.v3.core.Jdbi; import org.jdbi.v3.core.statement.PreparedBatch; import org.jdbi.v3.core.transaction.SerializableTransactionRunner; import org.jdbi.v3.core.transaction.TransactionIsolationLevel; @@ -27,7 +26,6 @@ import org.whispersystems.textsecuregcm.entities.PreKey; import org.whispersystems.textsecuregcm.storage.mappers.KeyRecordRowMapper; import org.whispersystems.textsecuregcm.util.Constants; -import java.security.Key; import java.util.List; import static com.codahale.metrics.MetricRegistry.name; @@ -41,18 +39,18 @@ public class Keys { private final Timer getCountTimer = metricRegistry.timer(name(Keys.class, "getCount" )); private final Timer vacuumTimer = metricRegistry.timer(name(Keys.class, "vacuum" )); - private final Jdbi database; + private final FaultTolerantDatabase database; - public Keys(Jdbi database) { + public Keys(FaultTolerantDatabase database) { this.database = database; - this.database.registerRowMapper(new KeyRecordRowMapper()); - this.database.setTransactionHandler(new SerializableTransactionRunner()); - this.database.getConfig(SerializableTransactionRunner.Configuration.class).setMaxRetries(10); + this.database.getDatabase().registerRowMapper(new KeyRecordRowMapper()); + this.database.getDatabase().setTransactionHandler(new SerializableTransactionRunner()); + this.database.getDatabase().getConfig(SerializableTransactionRunner.Configuration.class).setMaxRetries(10); } public void store(String number, long deviceId, List keys) { - database.useTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { - try (Timer.Context timer = storeTimer.time()) { + database.use(jdbi -> jdbi.useTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { + try (Timer.Context ignored = storeTimer.time()) { PreparedBatch preparedBatch = handle.prepareBatch("INSERT INTO keys (number, device_id, key_id, public_key) VALUES (:number, :device_id, :key_id, :public_key)"); for (PreKey key : keys) { @@ -70,51 +68,50 @@ public class Keys { preparedBatch.execute(); } - }); + })); } public List get(String number, long deviceId) { - return database.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { - try (Timer.Context timer = getDevicetTimer.time()) { + return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { + try (Timer.Context ignored = getDevicetTimer.time()) { return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT id FROM keys WHERE number = :number AND device_id = :device_id ORDER BY key_id ASC LIMIT 1) RETURNING *") .bind("number", number) .bind("device_id", deviceId) .mapTo(KeyRecord.class) .list(); } - }); + })); } public List get(String number) { - return database.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { - try (Timer.Context timer = getTimer.time()) { + return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { + try (Timer.Context ignored = getTimer.time()) { return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT DISTINCT ON (number, device_id) id FROM keys WHERE number = :number ORDER BY number, device_id, key_id ASC) RETURNING *") .bind("number", number) .mapTo(KeyRecord.class) .list(); } - }); - + })); } public int getCount(String number, long deviceId) { - return database.withHandle(handle -> { - try (Timer.Context timer = getCountTimer.time()) { + return database.with(jdbi -> jdbi.withHandle(handle -> { + try (Timer.Context ignored = getCountTimer.time()) { return handle.createQuery("SELECT COUNT(*) FROM keys WHERE number = :number AND device_id = :device_id") .bind("number", number) .bind("device_id", deviceId) .mapTo(Integer.class) .findOnly(); } - }); + })); } public void vacuum() { - database.useHandle(handle -> { - try (Timer.Context timer = vacuumTimer.time()) { + database.use(jdbi -> jdbi.useHandle(handle -> { + try (Timer.Context ignored = vacuumTimer.time()) { handle.execute("VACUUM keys"); } - }); + })); } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java b/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java index ee3e81eee..7af026dc0 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java @@ -3,13 +3,11 @@ package org.whispersystems.textsecuregcm.storage; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; -import org.jdbi.v3.core.Jdbi; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.storage.mappers.OutgoingMessageEntityRowMapper; import org.whispersystems.textsecuregcm.util.Constants; -import java.security.MessageDigest; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -43,16 +41,16 @@ public class Messages { private final Timer clearTimer = metricRegistry.timer(name(Messages.class, "clear" )); private final Timer vacuumTimer = metricRegistry.timer(name(Messages.class, "vacuum")); - private final Jdbi database; + private final FaultTolerantDatabase database; - public Messages(Jdbi database) { + public Messages(FaultTolerantDatabase database) { this.database = database; - this.database.registerRowMapper(new OutgoingMessageEntityRowMapper()); + this.database.getDatabase().registerRowMapper(new OutgoingMessageEntityRowMapper()); } public void store(UUID guid, Envelope message, String destination, long destinationDevice) { - database.useHandle(handle -> { - try (Timer.Context timer = storeTimer.time()) { + database.use(jdbi ->jdbi.useHandle(handle -> { + try (Timer.Context ignored = storeTimer.time()) { handle.createUpdate("INSERT INTO messages (" + GUID + ", " + TYPE + ", " + RELAY + ", " + TIMESTAMP + ", " + SERVER_TIMESTAMP + ", " + SOURCE + ", " + SOURCE_DEVICE + ", " + DESTINATION + ", " + DESTINATION_DEVICE + ", " + MESSAGE + ", " + CONTENT + ") " + "VALUES (:guid, :type, :relay, :timestamp, :server_timestamp, :source, :source_device, :destination, :destination_device, :message, :content)") .bind("guid", guid) @@ -68,24 +66,24 @@ public class Messages { .bind("content", message.hasContent() ? message.getContent().toByteArray() : null) .execute(); } - }); + })); } public List load(String destination, long destinationDevice) { - return database.withHandle(handle -> { - try (Timer.Context timer = loadTimer.time()) { + return database.with(jdbi-> jdbi.withHandle(handle -> { + try (Timer.Context ignored = loadTimer.time()) { return handle.createQuery("SELECT * FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device ORDER BY " + TIMESTAMP + " ASC LIMIT " + RESULT_SET_CHUNK_SIZE) .bind("destination", destination) .bind("destination_device", destinationDevice) .mapTo(OutgoingMessageEntity.class) .list(); } - }); + })); } public Optional remove(String destination, long destinationDevice, String source, long timestamp) { - return database.withHandle(handle -> { - try (Timer.Context timer = removeBySourceTimer.time()) { + return database.with(jdbi -> jdbi.withHandle(handle -> { + try (Timer.Context ignored = removeBySourceTimer.time()) { return handle.createQuery("DELETE FROM messages WHERE " + ID + " IN (SELECT " + ID + " FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device AND " + SOURCE + " = :source AND " + TIMESTAMP + " = :timestamp ORDER BY " + ID + " LIMIT 1) RETURNING *") .bind("destination", destination) .bind("destination_device", destinationDevice) @@ -94,59 +92,59 @@ public class Messages { .mapTo(OutgoingMessageEntity.class) .findFirst(); } - }); + })); } public Optional remove(String destination, UUID guid) { - return database.withHandle(handle -> { - try (Timer.Context timer = removeByGuidTimer.time()) { + return database.with(jdbi -> jdbi.withHandle(handle -> { + try (Timer.Context ignored = removeByGuidTimer.time()) { return handle.createQuery("DELETE FROM messages WHERE " + ID + " IN (SELECT " + ID + " FROM MESSAGES WHERE " + GUID + " = :guid AND " + DESTINATION + " = :destination ORDER BY " + ID + " LIMIT 1) RETURNING *") .bind("destination", destination) .bind("guid", guid) .mapTo(OutgoingMessageEntity.class) .findFirst(); } - }); + })); } public void remove(String destination, long id) { - database.useHandle(handle -> { - try (Timer.Context timer = removeByIdTimer.time()) { + database.use(jdbi -> jdbi.useHandle(handle -> { + try (Timer.Context ignored = removeByIdTimer.time()) { handle.createUpdate("DELETE FROM messages WHERE " + ID + " = :id AND " + DESTINATION + " = :destination") .bind("destination", destination) .bind("id", id) .execute(); } - }); + })); } public void clear(String destination) { - database.useHandle(handle -> { - try (Timer.Context timer = clearTimer.time()) { + database.use(jdbi ->jdbi.useHandle(handle -> { + try (Timer.Context ignored = clearTimer.time()) { handle.createUpdate("DELETE FROM messages WHERE " + DESTINATION + " = :destination") .bind("destination", destination) .execute(); } - }); + })); } public void clear(String destination, long destinationDevice) { - database.useHandle(handle -> { - try (Timer.Context timer = clearDeviceTimer.time()) { + database.use(jdbi -> jdbi.useHandle(handle -> { + try (Timer.Context ignored = clearDeviceTimer.time()) { handle.createUpdate("DELETE FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device") .bind("destination", destination) .bind("destination_device", destinationDevice) .execute(); } - }); + })); } public void vacuum() { - database.useHandle(handle -> { - try (Timer.Context timer = vacuumTimer.time()) { + database.use(jdbi -> jdbi.useHandle(handle -> { + try (Timer.Context ignored = vacuumTimer.time()) { handle.execute("VACUUM messages"); } - }); + })); } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccounts.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccounts.java index 59f0ac9c5..aa8638e40 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccounts.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccounts.java @@ -1,4 +1,4 @@ -/** +/* * Copyright (C) 2013 Open WhisperSystems * * This program is free software: you can redistribute it and/or modify @@ -19,7 +19,6 @@ package org.whispersystems.textsecuregcm.storage; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; -import org.jdbi.v3.core.Jdbi; import org.whispersystems.textsecuregcm.auth.StoredVerificationCode; import org.whispersystems.textsecuregcm.storage.mappers.StoredVerificationCodeRowMapper; import org.whispersystems.textsecuregcm.util.Constants; @@ -36,16 +35,16 @@ public class PendingAccounts { private final Timer removeTimer = metricRegistry.timer(name(PendingAccounts.class, "remove" )); private final Timer vacuumTimer = metricRegistry.timer(name(PendingAccounts.class, "vacuum" )); - private final Jdbi database; + private final FaultTolerantDatabase database; - public PendingAccounts(Jdbi database) { + public PendingAccounts(FaultTolerantDatabase database) { this.database = database; - this.database.registerRowMapper(new StoredVerificationCodeRowMapper()); + this.database.getDatabase().registerRowMapper(new StoredVerificationCodeRowMapper()); } public void insert(String number, String verificationCode, long timestamp) { - database.useHandle(handle -> { - try (Timer.Context timer = insertTimer.time()) { + database.use(jdbi -> jdbi.useHandle(handle -> { + try (Timer.Context ignored = insertTimer.time()) { handle.createUpdate("WITH upsert AS (UPDATE pending_accounts SET verification_code = :verification_code, timestamp = :timestamp WHERE number = :number RETURNING *) " + "INSERT INTO pending_accounts (number, verification_code, timestamp) SELECT :number, :verification_code, :timestamp WHERE NOT EXISTS (SELECT * FROM upsert)") .bind("verification_code", verificationCode) @@ -53,36 +52,36 @@ public class PendingAccounts { .bind("number", number) .execute(); } - }); + })); } public Optional getCodeForNumber(String number) { - return database.withHandle(handle -> { - try (Timer.Context timer = getCodeForNumberTimer.time()) { + return database.with(jdbi ->jdbi.withHandle(handle -> { + try (Timer.Context ignored = getCodeForNumberTimer.time()) { return handle.createQuery("SELECT verification_code, timestamp FROM pending_accounts WHERE number = :number") .bind("number", number) .mapTo(StoredVerificationCode.class) .findFirst(); } - }); + })); } public void remove(String number) { - database.useHandle(handle -> { - try (Timer.Context timer = removeTimer.time()) { + database.use(jdbi-> jdbi.useHandle(handle -> { + try (Timer.Context ignored = removeTimer.time()) { handle.createUpdate("DELETE FROM pending_accounts WHERE number = :number") .bind("number", number) .execute(); } - }); + })); } public void vacuum() { - database.useHandle(handle -> { - try (Timer.Context timer = vacuumTimer.time()) { + database.use(jdbi -> jdbi.useHandle(handle -> { + try (Timer.Context ignored = vacuumTimer.time()) { handle.execute("VACUUM pending_accounts"); } - }); + })); } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevices.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevices.java index f9509411a..9cce4d7ac 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevices.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevices.java @@ -19,7 +19,6 @@ package org.whispersystems.textsecuregcm.storage; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; -import org.jdbi.v3.core.Jdbi; import org.whispersystems.textsecuregcm.auth.StoredVerificationCode; import org.whispersystems.textsecuregcm.storage.mappers.StoredVerificationCodeRowMapper; import org.whispersystems.textsecuregcm.util.Constants; @@ -35,15 +34,15 @@ public class PendingDevices { private final Timer getCodeForNumberTimer = metricRegistry.timer(name(PendingDevices.class, "getcodeForNumber")); private final Timer removeTimer = metricRegistry.timer(name(PendingDevices.class, "remove" )); - private final Jdbi database; + private final FaultTolerantDatabase database; - public PendingDevices(Jdbi database) { + public PendingDevices(FaultTolerantDatabase database) { this.database = database; - this.database.registerRowMapper(new StoredVerificationCodeRowMapper()); + this.database.getDatabase().registerRowMapper(new StoredVerificationCodeRowMapper()); } public void insert(String number, String verificationCode, long timestamp) { - database.useHandle(handle -> { + database.use(jdbi ->jdbi.useHandle(handle -> { try (Timer.Context timer = insertTimer.time()) { handle.createUpdate("WITH upsert AS (UPDATE pending_devices SET verification_code = :verification_code, timestamp = :timestamp WHERE number = :number RETURNING *) " + "INSERT INTO pending_devices (number, verification_code, timestamp) SELECT :number, :verification_code, :timestamp WHERE NOT EXISTS (SELECT * FROM upsert)") @@ -52,28 +51,28 @@ public class PendingDevices { .bind("timestamp", timestamp) .execute(); } - }); + })); } public Optional getCodeForNumber(String number) { - return database.withHandle(handle -> { + return database.with(jdbi -> jdbi.withHandle(handle -> { try (Timer.Context timer = getCodeForNumberTimer.time()) { return handle.createQuery("SELECT verification_code, timestamp FROM pending_devices WHERE number = :number") .bind("number", number) .mapTo(StoredVerificationCode.class) .findFirst(); } - }); + })); } public void remove(String number) { - database.useHandle(handle -> { + database.use(jdbi -> jdbi.useHandle(handle -> { try (Timer.Context timer = removeTimer.time()) { handle.createUpdate("DELETE FROM pending_devices WHERE number = :number") .bind("number", number) .execute(); } - }); + })); } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/util/CircuitBreakerUtil.java b/src/main/java/org/whispersystems/textsecuregcm/util/CircuitBreakerUtil.java new file mode 100644 index 000000000..fd349fd2b --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/util/CircuitBreakerUtil.java @@ -0,0 +1,23 @@ +package org.whispersystems.textsecuregcm.util; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; + +import static com.codahale.metrics.MetricRegistry.name; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; + +public class CircuitBreakerUtil { + + public static void registerMetrics(MetricRegistry metricRegistry, CircuitBreaker circuitBreaker, Class clazz) { + Meter successMeter = metricRegistry.meter(name(clazz, circuitBreaker.getName(), "success" )); + Meter failureMeter = metricRegistry.meter(name(clazz, circuitBreaker.getName(), "failure" )); + Meter unpermittedMeter = metricRegistry.meter(name(clazz, circuitBreaker.getName(), "unpermitted")); + + metricRegistry.gauge(name(clazz, circuitBreaker.getName(), "state"), () -> ()-> circuitBreaker.getState().getOrder()); + + circuitBreaker.getEventPublisher().onSuccess(event -> successMeter.mark()); + circuitBreaker.getEventPublisher().onError(event -> failureMeter.mark()); + circuitBreaker.getEventPublisher().onCallNotPermitted(event -> unpermittedMeter.mark()); + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index f99ac0c59..dc1d2b92b 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -16,6 +16,7 @@ import org.whispersystems.textsecuregcm.storage.Accounts; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.DirectoryManager; +import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.util.Base64; import java.security.SecureRandom; @@ -61,8 +62,9 @@ public class DeleteUserCommand extends EnvironmentCommand WhisperServerConfiguration config) throws Exception { - DataSourceFactory dbConfig = config.getDataSourceFactory(); - DataSourceFactory messageDbConfig = config.getMessageStoreConfiguration(); + DatabaseConfiguration accountDbConfig = config.getAccountsDatabaseConfiguration(); + DatabaseConfiguration messageDbConfig = config.getMessageStoreConfiguration(); - Jdbi accountDatabase = Jdbi.create(dbConfig.getUrl(), dbConfig.getUser(), dbConfig.getPassword()); - Jdbi messageDatabase = Jdbi.create(messageDbConfig.getUrl(), messageDbConfig.getUser(), messageDbConfig.getPassword()); + Jdbi accountJdbi = Jdbi.create(accountDbConfig.getUrl(), accountDbConfig.getUser(), accountDbConfig.getPassword()); + Jdbi messageJdbi = Jdbi.create(messageDbConfig.getUrl(), messageDbConfig.getUser(), messageDbConfig.getPassword()); + FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_vacuum", accountJdbi, accountDbConfig.getCircuitBreakerConfiguration()); + FaultTolerantDatabase messageDatabase = new FaultTolerantDatabase("message_database_vacuum", messageJdbi, messageDbConfig.getCircuitBreakerConfiguration()); Accounts accounts = new Accounts(accountDatabase); Keys keys = new Keys(accountDatabase); diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AbusiveHostRulesTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AbusiveHostRulesTest.java index a5164123c..36fce342c 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AbusiveHostRulesTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AbusiveHostRulesTest.java @@ -7,8 +7,10 @@ import org.jdbi.v3.core.Jdbi; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.storage.AbusiveHostRule; import org.whispersystems.textsecuregcm.storage.AbusiveHostRules; +import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -26,7 +28,7 @@ public class AbusiveHostRulesTest { @Before public void setup() { - this.abusiveHostRules = new AbusiveHostRules(Jdbi.create(db.getTestDatabase())); + this.abusiveHostRules = new AbusiveHostRules(new FaultTolerantDatabase("abusive_hosts-test", Jdbi.create(db.getTestDatabase()), new CircuitBreakerConfiguration())); } @Test diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsTest.java index 15c6e8014..596fdebf3 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsTest.java @@ -3,14 +3,19 @@ package org.whispersystems.textsecuregcm.tests.storage; import com.opentable.db.postgres.embedded.LiquibasePreparer; import com.opentable.db.postgres.junit.EmbeddedPostgresRules; import com.opentable.db.postgres.junit.PreparedDbRule; +import org.jdbi.v3.core.HandleCallback; +import org.jdbi.v3.core.HandleConsumer; import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.core.transaction.TransactionException; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.entities.SignedPreKey; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Accounts; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.storage.mappers.AccountRowMapper; import java.io.IOException; @@ -26,7 +31,11 @@ import java.util.Optional; import java.util.Random; import java.util.Set; +import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; public class AccountsTest { @@ -37,7 +46,11 @@ public class AccountsTest { @Before public void setupAccountsDao() { - this.accounts = new Accounts(Jdbi.create(db.getTestDatabase())); + FaultTolerantDatabase faultTolerantDatabase = new FaultTolerantDatabase("accountsTest", + Jdbi.create(db.getTestDatabase()), + new CircuitBreakerConfiguration()); + + this.accounts = new Accounts(faultTolerantDatabase); } @Test @@ -178,11 +191,57 @@ public class AccountsTest { assertThat(retrieved.isPresent()).isFalse(); } + @Test + public void testBreaker() throws InterruptedException { + Jdbi jdbi = mock(Jdbi.class); + doThrow(new TransactionException("Database error!")).when(jdbi).useHandle(any(HandleConsumer.class)); + + CircuitBreakerConfiguration configuration = new CircuitBreakerConfiguration(); + configuration.setWaitDurationInOpenStateInSeconds(1); + configuration.setRingBufferSizeInHalfOpenState(1); + configuration.setRingBufferSizeInClosedState(2); + configuration.setFailureRateThreshold(50); + + Accounts accounts = new Accounts(new FaultTolerantDatabase("testAccountBreaker", jdbi, configuration)); + Account account = generateAccount("+14151112222"); + + try { + accounts.update(account); + throw new AssertionError(); + } catch (TransactionException e) { + // good + } + + try { + accounts.update(account); + throw new AssertionError(); + } catch (TransactionException e) { + // good + } + + try { + accounts.update(account); + throw new AssertionError(); + } catch (CircuitBreakerOpenException e) { + // good + } + + Thread.sleep(1100); + + try { + accounts.update(account); + throw new AssertionError(); + } catch (TransactionException e) { + // good + } + + } + private Device generateDevice(long id) { Random random = new Random(System.currentTimeMillis()); SignedPreKey signedPreKey = new SignedPreKey(random.nextInt(), "testPublicKey-" + random.nextInt(), "testSignature-" + random.nextInt()); - return new Device(1, "testName-" + random.nextInt(), "testAuthToken-" + random.nextInt(), "testSalt-" + random.nextInt(), null, "testGcmId-" + random.nextInt(), "testApnId-" + random.nextInt(), "testVoipApnId-" + random.nextInt(), random.nextBoolean(), random.nextInt(), signedPreKey, random.nextInt(), random.nextInt(), "testUserAgent-" + random.nextInt(), random.nextBoolean()); + return new Device(id, "testName-" + random.nextInt(), "testAuthToken-" + random.nextInt(), "testSalt-" + random.nextInt(), null, "testGcmId-" + random.nextInt(), "testApnId-" + random.nextInt(), "testVoipApnId-" + random.nextInt(), random.nextBoolean(), random.nextInt(), signedPreKey, random.nextInt(), random.nextInt(), "testUserAgent-" + random.nextInt(), random.nextBoolean()); } private Account generateAccount(String number) { diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/KeysTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/KeysTest.java index c407c626f..cd9076966 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/KeysTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/KeysTest.java @@ -3,33 +3,54 @@ package org.whispersystems.textsecuregcm.tests.storage; import com.opentable.db.postgres.embedded.LiquibasePreparer; import com.opentable.db.postgres.junit.EmbeddedPostgresRules; import com.opentable.db.postgres.junit.PreparedDbRule; +import org.jdbi.v3.core.HandleCallback; +import org.jdbi.v3.core.HandleConsumer; import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.core.transaction.SerializableTransactionRunner; +import org.jdbi.v3.core.transaction.TransactionException; +import org.jdbi.v3.core.transaction.TransactionIsolationLevel; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.postgresql.util.PSQLException; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.entities.PreKey; +import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.storage.KeyRecord; import org.whispersystems.textsecuregcm.storage.Keys; -import javax.sql.DataSource; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.LinkedList; import java.util.List; +import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class KeysTest { @Rule public PreparedDbRule db = EmbeddedPostgresRules.preparedDatabase(LiquibasePreparer.forClasspathLocation("accountsdb.xml")); + private Keys keys; + + @Before + public void setup() { + FaultTolerantDatabase faultTolerantDatabase = new FaultTolerantDatabase("keysTest", + Jdbi.create(db.getTestDatabase()), + new CircuitBreakerConfiguration()); + + this.keys = new Keys(faultTolerantDatabase); + } + + @Test public void testPopulateKeys() throws SQLException { - DataSource dataSource = db.getTestDatabase(); - Jdbi jdbi = Jdbi.create(dataSource); - Keys keys = new Keys(jdbi); - List deviceOnePreKeys = new LinkedList<>(); List deviceTwoPreKeys = new LinkedList<>(); @@ -55,7 +76,7 @@ public class KeysTest { keys.store("+14151111111", 1, anotherDeviceOnePreKeys); keys.store("+14151111111", 2, anotherDeviceTwoPreKeys); - PreparedStatement statement = dataSource.getConnection().prepareStatement("SELECT * FROM keys WHERE number = ? AND device_id = ? ORDER BY key_id"); + PreparedStatement statement = db.getTestDatabase().getConnection().prepareStatement("SELECT * FROM keys WHERE number = ? AND device_id = ? ORDER BY key_id"); verifyStoredState(statement, "+14152222222", 1); verifyStoredState(statement, "+14152222222", 2); verifyStoredState(statement, "+14151111111", 1); @@ -64,10 +85,6 @@ public class KeysTest { @Test public void testKeyCount() { - DataSource dataSource = db.getTestDatabase(); - Jdbi jdbi = Jdbi.create(dataSource); - Keys keys = new Keys(jdbi); - List deviceOnePreKeys = new LinkedList<>(); for (int i=1;i<=100;i++) { @@ -82,10 +99,6 @@ public class KeysTest { @Test public void testGetForDevice() { - DataSource dataSource = db.getTestDatabase(); - Jdbi jdbi = Jdbi.create(dataSource); - Keys keys = new Keys(jdbi); - List deviceOnePreKeys = new LinkedList<>(); List deviceTwoPreKeys = new LinkedList<>(); @@ -143,10 +156,6 @@ public class KeysTest { @Test public void testGetForAllDevices() { - DataSource dataSource = db.getTestDatabase(); - Jdbi jdbi = Jdbi.create(dataSource); - Keys keys = new Keys(jdbi); - List deviceOnePreKeys = new LinkedList<>(); List deviceTwoPreKeys = new LinkedList<>(); @@ -219,10 +228,6 @@ public class KeysTest { @Test public void testGetForAllDevicesParallel() throws InterruptedException { - DataSource dataSource = db.getTestDatabase(); - Jdbi jdbi = Jdbi.create(dataSource); - Keys keys = new Keys(jdbi); - List deviceOnePreKeys = new LinkedList<>(); List deviceTwoPreKeys = new LinkedList<>(); @@ -259,10 +264,6 @@ public class KeysTest { @Test public void testEmptyKeyGet() { - DataSource dataSource = db.getTestDatabase(); - Jdbi jdbi = Jdbi.create(dataSource); - Keys keys = new Keys(jdbi); - List records = keys.get("+14152222222"); assertThat(records.isEmpty()).isTrue(); @@ -270,13 +271,61 @@ public class KeysTest { @Test public void testVacuum() { - DataSource dataSource = db.getTestDatabase(); - Jdbi jdbi = Jdbi.create(dataSource); - Keys keys = new Keys(jdbi); - keys.vacuum(); } + @Test + public void testBreaker() throws InterruptedException { + Jdbi jdbi = mock(Jdbi.class); + doThrow(new TransactionException("Database error!")).when(jdbi).useTransaction(any(TransactionIsolationLevel.class), any(HandleConsumer.class)); + when(jdbi.getConfig(any())).thenReturn(mock(SerializableTransactionRunner.Configuration.class)); + + CircuitBreakerConfiguration configuration = new CircuitBreakerConfiguration(); + configuration.setWaitDurationInOpenStateInSeconds(1); + configuration.setRingBufferSizeInHalfOpenState(1); + configuration.setRingBufferSizeInClosedState(2); + configuration.setFailureRateThreshold(50); + + Keys keys = new Keys(new FaultTolerantDatabase("testBreaker", jdbi, configuration)); + + List deviceOnePreKeys = new LinkedList<>(); + + for (int i=1;i<=100;i++) { + deviceOnePreKeys.add(new PreKey(i, "+14152222222Device1PublicKey" + i)); + } + + try { + keys.store("+14152222222", 1, deviceOnePreKeys); + throw new AssertionError(); + } catch (TransactionException e) { + // good + } + + try { + keys.store("+14152222222", 1, deviceOnePreKeys); + throw new AssertionError(); + } catch (TransactionException e) { + // good + } + + try { + keys.store("+14152222222", 1, deviceOnePreKeys); + throw new AssertionError(); + } catch (CircuitBreakerOpenException e) { + // good + } + + Thread.sleep(1100); + + try { + keys.store("+14152222222", 1, deviceOnePreKeys); + throw new AssertionError(); + } catch (TransactionException e) { + // good + } + + } + private void verifyStoredState(PreparedStatement statement, String number, int deviceId) throws SQLException { statement.setString(1, number); diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/MessagesTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/MessagesTest.java index 48ba1b619..cd1bdffac 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/MessagesTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/MessagesTest.java @@ -8,8 +8,10 @@ import org.jdbi.v3.core.Jdbi; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; +import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.storage.Messages; import java.sql.PreparedStatement; @@ -34,7 +36,7 @@ public class MessagesTest { @Before public void setupAccountsDao() { - this.messages = new Messages(Jdbi.create(db.getTestDatabase())); + this.messages = new Messages(new FaultTolerantDatabase("messages-test", Jdbi.create(db.getTestDatabase()), new CircuitBreakerConfiguration())); } @Test diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PendingAccountsTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PendingAccountsTest.java index 41148c057..5a2644a3e 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PendingAccountsTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PendingAccountsTest.java @@ -8,6 +8,8 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.whispersystems.textsecuregcm.auth.StoredVerificationCode; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.storage.PendingAccounts; import java.sql.PreparedStatement; @@ -26,7 +28,7 @@ public class PendingAccountsTest { @Before public void setupAccountsDao() { - this.pendingAccounts = new PendingAccounts(Jdbi.create(db.getTestDatabase())); + this.pendingAccounts = new PendingAccounts(new FaultTolerantDatabase("pending_accounts-test", Jdbi.create(db.getTestDatabase()), new CircuitBreakerConfiguration())); } @Test diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PendingDevicesTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PendingDevicesTest.java index 6cc2ba5d4..462bca795 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PendingDevicesTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/PendingDevicesTest.java @@ -8,6 +8,8 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.whispersystems.textsecuregcm.auth.StoredVerificationCode; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase; import org.whispersystems.textsecuregcm.storage.PendingDevices; import java.sql.PreparedStatement; @@ -26,7 +28,7 @@ public class PendingDevicesTest { @Before public void setupAccountsDao() { - this.pendingDevices = new PendingDevices(Jdbi.create(db.getTestDatabase())); + this.pendingDevices = new PendingDevices(new FaultTolerantDatabase("peding_devices-test", Jdbi.create(db.getTestDatabase()), new CircuitBreakerConfiguration())); } @Test