Support for circuit breaker on database access
This commit is contained in:
parent
6a9c4cf8cc
commit
0e300df68c
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Copyright (C) 2013 Open WhisperSystems
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
|
@ -29,8 +29,8 @@ import java.util.Map;
|
|||
|
||||
import io.dropwizard.Configuration;
|
||||
import io.dropwizard.client.JerseyClientConfiguration;
|
||||
import io.dropwizard.db.DataSourceFactory;
|
||||
|
||||
/** @noinspection MismatchedQueryAndUpdateOfCollection, WeakerAccess */
|
||||
public class WhisperServerConfiguration extends Configuration {
|
||||
|
||||
@NotNull
|
||||
|
@ -81,12 +81,12 @@ public class WhisperServerConfiguration extends Configuration {
|
|||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
private DataSourceFactory messageStore;
|
||||
private DatabaseConfiguration messageStore;
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
private DataSourceFactory abuseDatabase;
|
||||
private DatabaseConfiguration abuseDatabase;
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
|
@ -101,10 +101,10 @@ public class WhisperServerConfiguration extends Configuration {
|
|||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
private DataSourceFactory database = new DataSourceFactory();
|
||||
private DatabaseConfiguration database = new DatabaseConfiguration();
|
||||
|
||||
@JsonProperty
|
||||
private DataSourceFactory read_database;
|
||||
private DatabaseConfiguration read_database;
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
|
@ -201,19 +201,19 @@ public class WhisperServerConfiguration extends Configuration {
|
|||
return pushScheduler;
|
||||
}
|
||||
|
||||
public DataSourceFactory getMessageStoreConfiguration() {
|
||||
public DatabaseConfiguration getMessageStoreConfiguration() {
|
||||
return messageStore;
|
||||
}
|
||||
|
||||
public DataSourceFactory getAbuseDatabaseConfiguration() {
|
||||
public DatabaseConfiguration getAbuseDatabaseConfiguration() {
|
||||
return abuseDatabase;
|
||||
}
|
||||
|
||||
public DataSourceFactory getDataSourceFactory() {
|
||||
public DatabaseConfiguration getAccountsDatabaseConfiguration() {
|
||||
return database;
|
||||
}
|
||||
|
||||
public DataSourceFactory getReadDataSourceFactory() {
|
||||
public DatabaseConfiguration getAccountsReadDatabaseConfiguration() {
|
||||
return read_database;
|
||||
}
|
||||
|
||||
|
|
|
@ -112,7 +112,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("accountdb", "accountsdb.xml") {
|
||||
@Override
|
||||
public DataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) {
|
||||
return configuration.getDataSourceFactory();
|
||||
return configuration.getAccountsDatabaseConfiguration();
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -145,10 +145,14 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
environment.getObjectMapper().setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
|
||||
environment.getObjectMapper().setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
|
||||
|
||||
JdbiFactory jdbiFactory = new JdbiFactory(DefaultNameStrategy.CHECK_EMPTY);
|
||||
Jdbi accountDatabase = jdbiFactory.build(environment, config.getDataSourceFactory(), "accountdb");
|
||||
Jdbi messageDatabase = jdbiFactory.build(environment, config.getMessageStoreConfiguration(), "messagedb");
|
||||
Jdbi abuseDatabase = jdbiFactory.build(environment, config.getAbuseDatabaseConfiguration(), "abusedb");
|
||||
JdbiFactory jdbiFactory = new JdbiFactory(DefaultNameStrategy.CHECK_EMPTY);
|
||||
Jdbi accountJdbi = jdbiFactory.build(environment, config.getAccountsDatabaseConfiguration(), "accountdb");
|
||||
Jdbi messageJdbi = jdbiFactory.build(environment, config.getMessageStoreConfiguration(), "messagedb" );
|
||||
Jdbi abuseJdbi = jdbiFactory.build(environment, config.getAbuseDatabaseConfiguration (), "abusedb" );
|
||||
|
||||
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("accounts_database", accountJdbi, config.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration());
|
||||
FaultTolerantDatabase messageDatabase = new FaultTolerantDatabase("message_database", messageJdbi, config.getMessageStoreConfiguration().getCircuitBreakerConfiguration());
|
||||
FaultTolerantDatabase abuseDatabase = new FaultTolerantDatabase("abuse_database", abuseJdbi, config.getAbuseDatabaseConfiguration().getCircuitBreakerConfiguration());
|
||||
|
||||
Accounts accounts = new Accounts(accountDatabase);
|
||||
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
package org.whispersystems.textsecuregcm.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
import io.dropwizard.db.DataSourceFactory;
|
||||
|
||||
public class DatabaseConfiguration extends DataSourceFactory {
|
||||
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration();
|
||||
|
||||
public CircuitBreakerConfiguration getCircuitBreakerConfiguration() {
|
||||
return circuitBreaker;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,11 +1,11 @@
|
|||
package org.whispersystems.textsecuregcm.redis;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
import java.time.Duration;
|
||||
|
@ -14,7 +14,6 @@ import java.util.List;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
|
||||
import redis.clients.jedis.Jedis;
|
||||
|
@ -23,7 +22,6 @@ import redis.clients.jedis.exceptions.JedisException;
|
|||
|
||||
public class ReplicatedJedisPool {
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Logger logger = LoggerFactory.getLogger(ReplicatedJedisPool.class);
|
||||
private final AtomicInteger replicaIndex = new AtomicInteger(0);
|
||||
|
||||
|
@ -37,6 +35,7 @@ public class ReplicatedJedisPool {
|
|||
{
|
||||
if (replicas.size() < 1) throw new IllegalArgumentException("There must be at least one replica");
|
||||
|
||||
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
|
||||
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
|
||||
.failureRateThreshold(circuitBreakerConfiguration.getFailureRateThreshold())
|
||||
|
@ -46,7 +45,8 @@ public class ReplicatedJedisPool {
|
|||
.build();
|
||||
|
||||
CircuitBreaker masterBreaker = CircuitBreaker.of(String.format("%s-master", name), config);
|
||||
registerMetrics(masterBreaker);
|
||||
|
||||
CircuitBreakerUtil.registerMetrics(metricRegistry, masterBreaker, ReplicatedJedisPool.class);
|
||||
|
||||
this.master = CircuitBreaker.decorateSupplier(masterBreaker, master::getResource);
|
||||
this.replicas = new ArrayList<>(replicas.size());
|
||||
|
@ -55,7 +55,7 @@ public class ReplicatedJedisPool {
|
|||
JedisPool replica = replicas.get(i);
|
||||
CircuitBreaker slaveBreaker = CircuitBreaker.of(String.format("%s-slave-%d", name, i), config);
|
||||
|
||||
registerMetrics(slaveBreaker);
|
||||
CircuitBreakerUtil.registerMetrics(metricRegistry, slaveBreaker, ReplicatedJedisPool.class);
|
||||
this.replicas.add(CircuitBreaker.decorateSupplier(slaveBreaker, replica::getResource));
|
||||
}
|
||||
}
|
||||
|
@ -80,16 +80,4 @@ public class ReplicatedJedisPool {
|
|||
throw new JedisException("All read replica pools failed!");
|
||||
}
|
||||
|
||||
private void registerMetrics(CircuitBreaker circuitBreaker) {
|
||||
Meter successMeter = metricRegistry.meter(name(ReplicatedJedisPool.class, circuitBreaker.getName(), "success" ));
|
||||
Meter failureMeter = metricRegistry.meter(name(ReplicatedJedisPool.class, circuitBreaker.getName(), "failure" ));
|
||||
Meter unpermittedMeter = metricRegistry.meter(name(ReplicatedJedisPool.class, circuitBreaker.getName(), "unpermitted"));
|
||||
|
||||
metricRegistry.gauge(name(ReplicatedJedisPool.class, circuitBreaker.getName(), "state"), () -> ()-> circuitBreaker.getState().getOrder());
|
||||
|
||||
circuitBreaker.getEventPublisher().onSuccess(event -> successMeter.mark());
|
||||
circuitBreaker.getEventPublisher().onError(event -> failureMeter.mark());
|
||||
circuitBreaker.getEventPublisher().onCallNotPermitted(event -> unpermittedMeter.mark());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -3,7 +3,6 @@ package org.whispersystems.textsecuregcm.storage;
|
|||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.AbusiveHostRuleRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
|
@ -21,22 +20,22 @@ public class AbusiveHostRules {
|
|||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Timer getTimer = metricRegistry.timer(name(AbusiveHostRules.class, "get"));
|
||||
|
||||
private final Jdbi database;
|
||||
private final FaultTolerantDatabase database;
|
||||
|
||||
public AbusiveHostRules(Jdbi database) {
|
||||
public AbusiveHostRules(FaultTolerantDatabase database) {
|
||||
this.database = database;
|
||||
this.database.registerRowMapper(new AbusiveHostRuleRowMapper());
|
||||
this.database.getDatabase().registerRowMapper(new AbusiveHostRuleRowMapper());
|
||||
}
|
||||
|
||||
public List<AbusiveHostRule> getAbusiveHostRulesFor(String host) {
|
||||
return database.withHandle(handle -> {
|
||||
return database.with(jdbi -> jdbi.withHandle(handle -> {
|
||||
try (Timer.Context timer = getTimer.time()) {
|
||||
return handle.createQuery("SELECT * FROM abusive_host_rules WHERE :host::inet <<= " + HOST)
|
||||
.bind("host", host)
|
||||
.mapTo(AbusiveHostRule.class)
|
||||
.list();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Copyright (C) 2013 Open WhisperSystems
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
|
@ -21,7 +21,6 @@ import com.codahale.metrics.SharedMetricRegistries;
|
|||
import com.codahale.metrics.Timer;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.jdbi.v3.core.transaction.TransactionIsolationLevel;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.AccountRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
@ -48,16 +47,16 @@ public class Accounts {
|
|||
private final Timer getAllFromOffsetTimer = metricRegistry.timer(name(Accounts.class, "getAllFromOffset"));
|
||||
private final Timer vacuumTimer = metricRegistry.timer(name(Accounts.class, "vacuum"));
|
||||
|
||||
private final Jdbi database;
|
||||
private final FaultTolerantDatabase database;
|
||||
|
||||
public Accounts(Jdbi database) {
|
||||
public Accounts(FaultTolerantDatabase database) {
|
||||
this.database = database;
|
||||
this.database.registerRowMapper(new AccountRowMapper());
|
||||
this.database.getDatabase().registerRowMapper(new AccountRowMapper());
|
||||
}
|
||||
|
||||
public boolean create(Account account) {
|
||||
return database.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context timer = createTimer.time()) {
|
||||
return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context ignored = createTimer.time()) {
|
||||
int rows = handle.createUpdate("DELETE FROM accounts WHERE " + NUMBER + " = :number")
|
||||
.bind("number", account.getNumber())
|
||||
.execute();
|
||||
|
@ -72,12 +71,12 @@ public class Accounts {
|
|||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public void update(Account account) {
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = updateTimer.time()) {
|
||||
database.use(jdbi -> jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = updateTimer.time()) {
|
||||
handle.createUpdate("UPDATE accounts SET " + DATA + " = CAST(:data AS json) WHERE " + NUMBER + " = :number")
|
||||
.bind("number", account.getNumber())
|
||||
.bind("data", mapper.writeValueAsString(account))
|
||||
|
@ -85,50 +84,50 @@ public class Accounts {
|
|||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public Optional<Account> get(String number) {
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = getTimer.time()) {
|
||||
return database.with(jdbi -> jdbi.withHandle(handle -> {
|
||||
try (Timer.Context ignored = getTimer.time()) {
|
||||
return handle.createQuery("SELECT * FROM accounts WHERE " + NUMBER + " = :number")
|
||||
.bind("number", number)
|
||||
.mapTo(Account.class)
|
||||
.findFirst();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
public List<Account> getAllFrom(String from, int length) {
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = getAllFromOffsetTimer.time()) {
|
||||
return database.with(jdbi -> jdbi.withHandle(handle -> {
|
||||
try (Timer.Context ignored = getAllFromOffsetTimer.time()) {
|
||||
return handle.createQuery("SELECT * FROM accounts WHERE " + NUMBER + " > :from ORDER BY " + NUMBER + " LIMIT :limit")
|
||||
.bind("from", from)
|
||||
.bind("limit", length)
|
||||
.mapTo(Account.class)
|
||||
.list();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public List<Account> getAllFrom(int length) {
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = getAllFromTimer.time()) {
|
||||
return database.with(jdbi -> jdbi.withHandle(handle -> {
|
||||
try (Timer.Context ignored = getAllFromTimer.time()) {
|
||||
return handle.createQuery("SELECT * FROM accounts ORDER BY " + NUMBER + " LIMIT :limit")
|
||||
.bind("limit", length)
|
||||
.mapTo(Account.class)
|
||||
.list();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public void vacuum() {
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = vacuumTimer.time()) {
|
||||
database.use(jdbi -> jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = vacuumTimer.time()) {
|
||||
handle.execute("VACUUM accounts");
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
|
||||
|
||||
public class FaultTolerantDatabase {
|
||||
|
||||
private final Jdbi database;
|
||||
private final CircuitBreaker circuitBreaker;
|
||||
|
||||
public FaultTolerantDatabase(String name, Jdbi database, CircuitBreakerConfiguration circuitBreakerConfiguration) {
|
||||
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
|
||||
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
|
||||
.failureRateThreshold(circuitBreakerConfiguration.getFailureRateThreshold())
|
||||
.ringBufferSizeInHalfOpenState(circuitBreakerConfiguration.getRingBufferSizeInHalfOpenState())
|
||||
.waitDurationInOpenState(Duration.ofSeconds(circuitBreakerConfiguration.getWaitDurationInOpenStateInSeconds()))
|
||||
.ringBufferSizeInClosedState(circuitBreakerConfiguration.getRingBufferSizeInClosedState())
|
||||
.build();
|
||||
|
||||
this.database = database;
|
||||
this.circuitBreaker = CircuitBreaker.of(name, config);
|
||||
|
||||
CircuitBreakerUtil.registerMetrics(metricRegistry, circuitBreaker, FaultTolerantDatabase.class);
|
||||
}
|
||||
|
||||
public void use(Consumer<Jdbi> consumer) {
|
||||
this.circuitBreaker.executeRunnable(() -> consumer.accept(database));
|
||||
}
|
||||
|
||||
public <T> T with(Function<Jdbi, T> consumer) {
|
||||
return this.circuitBreaker.executeSupplier(() -> consumer.apply(database));
|
||||
}
|
||||
|
||||
public Jdbi getDatabase() {
|
||||
return database;
|
||||
}
|
||||
}
|
|
@ -19,7 +19,6 @@ package org.whispersystems.textsecuregcm.storage;
|
|||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.jdbi.v3.core.statement.PreparedBatch;
|
||||
import org.jdbi.v3.core.transaction.SerializableTransactionRunner;
|
||||
import org.jdbi.v3.core.transaction.TransactionIsolationLevel;
|
||||
|
@ -27,7 +26,6 @@ import org.whispersystems.textsecuregcm.entities.PreKey;
|
|||
import org.whispersystems.textsecuregcm.storage.mappers.KeyRecordRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
import java.security.Key;
|
||||
import java.util.List;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
@ -41,18 +39,18 @@ public class Keys {
|
|||
private final Timer getCountTimer = metricRegistry.timer(name(Keys.class, "getCount" ));
|
||||
private final Timer vacuumTimer = metricRegistry.timer(name(Keys.class, "vacuum" ));
|
||||
|
||||
private final Jdbi database;
|
||||
private final FaultTolerantDatabase database;
|
||||
|
||||
public Keys(Jdbi database) {
|
||||
public Keys(FaultTolerantDatabase database) {
|
||||
this.database = database;
|
||||
this.database.registerRowMapper(new KeyRecordRowMapper());
|
||||
this.database.setTransactionHandler(new SerializableTransactionRunner());
|
||||
this.database.getConfig(SerializableTransactionRunner.Configuration.class).setMaxRetries(10);
|
||||
this.database.getDatabase().registerRowMapper(new KeyRecordRowMapper());
|
||||
this.database.getDatabase().setTransactionHandler(new SerializableTransactionRunner());
|
||||
this.database.getDatabase().getConfig(SerializableTransactionRunner.Configuration.class).setMaxRetries(10);
|
||||
}
|
||||
|
||||
public void store(String number, long deviceId, List<PreKey> keys) {
|
||||
database.useTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context timer = storeTimer.time()) {
|
||||
database.use(jdbi -> jdbi.useTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context ignored = storeTimer.time()) {
|
||||
PreparedBatch preparedBatch = handle.prepareBatch("INSERT INTO keys (number, device_id, key_id, public_key) VALUES (:number, :device_id, :key_id, :public_key)");
|
||||
|
||||
for (PreKey key : keys) {
|
||||
|
@ -70,51 +68,50 @@ public class Keys {
|
|||
|
||||
preparedBatch.execute();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public List<KeyRecord> get(String number, long deviceId) {
|
||||
return database.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context timer = getDevicetTimer.time()) {
|
||||
return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context ignored = getDevicetTimer.time()) {
|
||||
return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT id FROM keys WHERE number = :number AND device_id = :device_id ORDER BY key_id ASC LIMIT 1) RETURNING *")
|
||||
.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.mapTo(KeyRecord.class)
|
||||
.list();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public List<KeyRecord> get(String number) {
|
||||
return database.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context timer = getTimer.time()) {
|
||||
return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context ignored = getTimer.time()) {
|
||||
return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT DISTINCT ON (number, device_id) id FROM keys WHERE number = :number ORDER BY number, device_id, key_id ASC) RETURNING *")
|
||||
.bind("number", number)
|
||||
.mapTo(KeyRecord.class)
|
||||
.list();
|
||||
}
|
||||
});
|
||||
|
||||
}));
|
||||
}
|
||||
|
||||
public int getCount(String number, long deviceId) {
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = getCountTimer.time()) {
|
||||
return database.with(jdbi -> jdbi.withHandle(handle -> {
|
||||
try (Timer.Context ignored = getCountTimer.time()) {
|
||||
return handle.createQuery("SELECT COUNT(*) FROM keys WHERE number = :number AND device_id = :device_id")
|
||||
.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.mapTo(Integer.class)
|
||||
.findOnly();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public void vacuum() {
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = vacuumTimer.time()) {
|
||||
database.use(jdbi -> jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = vacuumTimer.time()) {
|
||||
handle.execute("VACUUM keys");
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -3,13 +3,11 @@ package org.whispersystems.textsecuregcm.storage;
|
|||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.OutgoingMessageEntityRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
import java.security.MessageDigest;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
@ -43,16 +41,16 @@ public class Messages {
|
|||
private final Timer clearTimer = metricRegistry.timer(name(Messages.class, "clear" ));
|
||||
private final Timer vacuumTimer = metricRegistry.timer(name(Messages.class, "vacuum"));
|
||||
|
||||
private final Jdbi database;
|
||||
private final FaultTolerantDatabase database;
|
||||
|
||||
public Messages(Jdbi database) {
|
||||
public Messages(FaultTolerantDatabase database) {
|
||||
this.database = database;
|
||||
this.database.registerRowMapper(new OutgoingMessageEntityRowMapper());
|
||||
this.database.getDatabase().registerRowMapper(new OutgoingMessageEntityRowMapper());
|
||||
}
|
||||
|
||||
public void store(UUID guid, Envelope message, String destination, long destinationDevice) {
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = storeTimer.time()) {
|
||||
database.use(jdbi ->jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = storeTimer.time()) {
|
||||
handle.createUpdate("INSERT INTO messages (" + GUID + ", " + TYPE + ", " + RELAY + ", " + TIMESTAMP + ", " + SERVER_TIMESTAMP + ", " + SOURCE + ", " + SOURCE_DEVICE + ", " + DESTINATION + ", " + DESTINATION_DEVICE + ", " + MESSAGE + ", " + CONTENT + ") " +
|
||||
"VALUES (:guid, :type, :relay, :timestamp, :server_timestamp, :source, :source_device, :destination, :destination_device, :message, :content)")
|
||||
.bind("guid", guid)
|
||||
|
@ -68,24 +66,24 @@ public class Messages {
|
|||
.bind("content", message.hasContent() ? message.getContent().toByteArray() : null)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public List<OutgoingMessageEntity> load(String destination, long destinationDevice) {
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = loadTimer.time()) {
|
||||
return database.with(jdbi-> jdbi.withHandle(handle -> {
|
||||
try (Timer.Context ignored = loadTimer.time()) {
|
||||
return handle.createQuery("SELECT * FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device ORDER BY " + TIMESTAMP + " ASC LIMIT " + RESULT_SET_CHUNK_SIZE)
|
||||
.bind("destination", destination)
|
||||
.bind("destination_device", destinationDevice)
|
||||
.mapTo(OutgoingMessageEntity.class)
|
||||
.list();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public Optional<OutgoingMessageEntity> remove(String destination, long destinationDevice, String source, long timestamp) {
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = removeBySourceTimer.time()) {
|
||||
return database.with(jdbi -> jdbi.withHandle(handle -> {
|
||||
try (Timer.Context ignored = removeBySourceTimer.time()) {
|
||||
return handle.createQuery("DELETE FROM messages WHERE " + ID + " IN (SELECT " + ID + " FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device AND " + SOURCE + " = :source AND " + TIMESTAMP + " = :timestamp ORDER BY " + ID + " LIMIT 1) RETURNING *")
|
||||
.bind("destination", destination)
|
||||
.bind("destination_device", destinationDevice)
|
||||
|
@ -94,59 +92,59 @@ public class Messages {
|
|||
.mapTo(OutgoingMessageEntity.class)
|
||||
.findFirst();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public Optional<OutgoingMessageEntity> remove(String destination, UUID guid) {
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = removeByGuidTimer.time()) {
|
||||
return database.with(jdbi -> jdbi.withHandle(handle -> {
|
||||
try (Timer.Context ignored = removeByGuidTimer.time()) {
|
||||
return handle.createQuery("DELETE FROM messages WHERE " + ID + " IN (SELECT " + ID + " FROM MESSAGES WHERE " + GUID + " = :guid AND " + DESTINATION + " = :destination ORDER BY " + ID + " LIMIT 1) RETURNING *")
|
||||
.bind("destination", destination)
|
||||
.bind("guid", guid)
|
||||
.mapTo(OutgoingMessageEntity.class)
|
||||
.findFirst();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public void remove(String destination, long id) {
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = removeByIdTimer.time()) {
|
||||
database.use(jdbi -> jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = removeByIdTimer.time()) {
|
||||
handle.createUpdate("DELETE FROM messages WHERE " + ID + " = :id AND " + DESTINATION + " = :destination")
|
||||
.bind("destination", destination)
|
||||
.bind("id", id)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public void clear(String destination) {
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = clearTimer.time()) {
|
||||
database.use(jdbi ->jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = clearTimer.time()) {
|
||||
handle.createUpdate("DELETE FROM messages WHERE " + DESTINATION + " = :destination")
|
||||
.bind("destination", destination)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public void clear(String destination, long destinationDevice) {
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = clearDeviceTimer.time()) {
|
||||
database.use(jdbi -> jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = clearDeviceTimer.time()) {
|
||||
handle.createUpdate("DELETE FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device")
|
||||
.bind("destination", destination)
|
||||
.bind("destination_device", destinationDevice)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public void vacuum() {
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = vacuumTimer.time()) {
|
||||
database.use(jdbi -> jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = vacuumTimer.time()) {
|
||||
handle.execute("VACUUM messages");
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Copyright (C) 2013 Open WhisperSystems
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
|
@ -19,7 +19,6 @@ package org.whispersystems.textsecuregcm.storage;
|
|||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.StoredVerificationCodeRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
@ -36,16 +35,16 @@ public class PendingAccounts {
|
|||
private final Timer removeTimer = metricRegistry.timer(name(PendingAccounts.class, "remove" ));
|
||||
private final Timer vacuumTimer = metricRegistry.timer(name(PendingAccounts.class, "vacuum" ));
|
||||
|
||||
private final Jdbi database;
|
||||
private final FaultTolerantDatabase database;
|
||||
|
||||
public PendingAccounts(Jdbi database) {
|
||||
public PendingAccounts(FaultTolerantDatabase database) {
|
||||
this.database = database;
|
||||
this.database.registerRowMapper(new StoredVerificationCodeRowMapper());
|
||||
this.database.getDatabase().registerRowMapper(new StoredVerificationCodeRowMapper());
|
||||
}
|
||||
|
||||
public void insert(String number, String verificationCode, long timestamp) {
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = insertTimer.time()) {
|
||||
database.use(jdbi -> jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = insertTimer.time()) {
|
||||
handle.createUpdate("WITH upsert AS (UPDATE pending_accounts SET verification_code = :verification_code, timestamp = :timestamp WHERE number = :number RETURNING *) " +
|
||||
"INSERT INTO pending_accounts (number, verification_code, timestamp) SELECT :number, :verification_code, :timestamp WHERE NOT EXISTS (SELECT * FROM upsert)")
|
||||
.bind("verification_code", verificationCode)
|
||||
|
@ -53,36 +52,36 @@ public class PendingAccounts {
|
|||
.bind("number", number)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public Optional<StoredVerificationCode> getCodeForNumber(String number) {
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = getCodeForNumberTimer.time()) {
|
||||
return database.with(jdbi ->jdbi.withHandle(handle -> {
|
||||
try (Timer.Context ignored = getCodeForNumberTimer.time()) {
|
||||
return handle.createQuery("SELECT verification_code, timestamp FROM pending_accounts WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.mapTo(StoredVerificationCode.class)
|
||||
.findFirst();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public void remove(String number) {
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = removeTimer.time()) {
|
||||
database.use(jdbi-> jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = removeTimer.time()) {
|
||||
handle.createUpdate("DELETE FROM pending_accounts WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public void vacuum() {
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = vacuumTimer.time()) {
|
||||
database.use(jdbi -> jdbi.useHandle(handle -> {
|
||||
try (Timer.Context ignored = vacuumTimer.time()) {
|
||||
handle.execute("VACUUM pending_accounts");
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.whispersystems.textsecuregcm.storage;
|
|||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.StoredVerificationCodeRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
@ -35,15 +34,15 @@ public class PendingDevices {
|
|||
private final Timer getCodeForNumberTimer = metricRegistry.timer(name(PendingDevices.class, "getcodeForNumber"));
|
||||
private final Timer removeTimer = metricRegistry.timer(name(PendingDevices.class, "remove" ));
|
||||
|
||||
private final Jdbi database;
|
||||
private final FaultTolerantDatabase database;
|
||||
|
||||
public PendingDevices(Jdbi database) {
|
||||
public PendingDevices(FaultTolerantDatabase database) {
|
||||
this.database = database;
|
||||
this.database.registerRowMapper(new StoredVerificationCodeRowMapper());
|
||||
this.database.getDatabase().registerRowMapper(new StoredVerificationCodeRowMapper());
|
||||
}
|
||||
|
||||
public void insert(String number, String verificationCode, long timestamp) {
|
||||
database.useHandle(handle -> {
|
||||
database.use(jdbi ->jdbi.useHandle(handle -> {
|
||||
try (Timer.Context timer = insertTimer.time()) {
|
||||
handle.createUpdate("WITH upsert AS (UPDATE pending_devices SET verification_code = :verification_code, timestamp = :timestamp WHERE number = :number RETURNING *) " +
|
||||
"INSERT INTO pending_devices (number, verification_code, timestamp) SELECT :number, :verification_code, :timestamp WHERE NOT EXISTS (SELECT * FROM upsert)")
|
||||
|
@ -52,28 +51,28 @@ public class PendingDevices {
|
|||
.bind("timestamp", timestamp)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public Optional<StoredVerificationCode> getCodeForNumber(String number) {
|
||||
return database.withHandle(handle -> {
|
||||
return database.with(jdbi -> jdbi.withHandle(handle -> {
|
||||
try (Timer.Context timer = getCodeForNumberTimer.time()) {
|
||||
return handle.createQuery("SELECT verification_code, timestamp FROM pending_devices WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.mapTo(StoredVerificationCode.class)
|
||||
.findFirst();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public void remove(String number) {
|
||||
database.useHandle(handle -> {
|
||||
database.use(jdbi -> jdbi.useHandle(handle -> {
|
||||
try (Timer.Context timer = removeTimer.time()) {
|
||||
handle.createUpdate("DELETE FROM pending_devices WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
package org.whispersystems.textsecuregcm.util;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
||||
|
||||
public class CircuitBreakerUtil {
|
||||
|
||||
public static void registerMetrics(MetricRegistry metricRegistry, CircuitBreaker circuitBreaker, Class<?> clazz) {
|
||||
Meter successMeter = metricRegistry.meter(name(clazz, circuitBreaker.getName(), "success" ));
|
||||
Meter failureMeter = metricRegistry.meter(name(clazz, circuitBreaker.getName(), "failure" ));
|
||||
Meter unpermittedMeter = metricRegistry.meter(name(clazz, circuitBreaker.getName(), "unpermitted"));
|
||||
|
||||
metricRegistry.gauge(name(clazz, circuitBreaker.getName(), "state"), () -> ()-> circuitBreaker.getState().getOrder());
|
||||
|
||||
circuitBreaker.getEventPublisher().onSuccess(event -> successMeter.mark());
|
||||
circuitBreaker.getEventPublisher().onError(event -> failureMeter.mark());
|
||||
circuitBreaker.getEventPublisher().onCallNotPermitted(event -> unpermittedMeter.mark());
|
||||
}
|
||||
|
||||
}
|
|
@ -16,6 +16,7 @@ import org.whispersystems.textsecuregcm.storage.Accounts;
|
|||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
|
||||
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||
import org.whispersystems.textsecuregcm.util.Base64;
|
||||
|
||||
import java.security.SecureRandom;
|
||||
|
@ -61,8 +62,9 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
|
|||
|
||||
environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
JdbiFactory jdbiFactory = new JdbiFactory();
|
||||
Jdbi accountDatabase = jdbiFactory.build(environment, configuration.getDataSourceFactory(), "accountdb");
|
||||
JdbiFactory jdbiFactory = new JdbiFactory();
|
||||
Jdbi accountJdbi = jdbiFactory.build(environment, configuration.getAccountsDatabaseConfiguration(), "accountdb");
|
||||
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_delete_user", accountJdbi, configuration.getAbuseDatabaseConfiguration().getCircuitBreakerConfiguration());
|
||||
|
||||
Accounts accounts = new Accounts(accountDatabase);
|
||||
ReplicatedJedisPool cacheClient = new RedisClientFactory("main_cache_delete_command", configuration.getCacheConfiguration().getUrl(), configuration.getCacheConfiguration().getReplicaUrls(), configuration.getCacheConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
|
||||
|
|
|
@ -5,13 +5,14 @@ import org.jdbi.v3.core.Jdbi;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.DatabaseConfiguration;
|
||||
import org.whispersystems.textsecuregcm.storage.Accounts;
|
||||
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||
import org.whispersystems.textsecuregcm.storage.Keys;
|
||||
import org.whispersystems.textsecuregcm.storage.Messages;
|
||||
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
|
||||
|
||||
import io.dropwizard.cli.ConfiguredCommand;
|
||||
import io.dropwizard.db.DataSourceFactory;
|
||||
import io.dropwizard.setup.Bootstrap;
|
||||
|
||||
|
||||
|
@ -29,12 +30,14 @@ public class VacuumCommand extends ConfiguredCommand<WhisperServerConfiguration>
|
|||
WhisperServerConfiguration config)
|
||||
throws Exception
|
||||
{
|
||||
DataSourceFactory dbConfig = config.getDataSourceFactory();
|
||||
DataSourceFactory messageDbConfig = config.getMessageStoreConfiguration();
|
||||
DatabaseConfiguration accountDbConfig = config.getAccountsDatabaseConfiguration();
|
||||
DatabaseConfiguration messageDbConfig = config.getMessageStoreConfiguration();
|
||||
|
||||
Jdbi accountDatabase = Jdbi.create(dbConfig.getUrl(), dbConfig.getUser(), dbConfig.getPassword());
|
||||
Jdbi messageDatabase = Jdbi.create(messageDbConfig.getUrl(), messageDbConfig.getUser(), messageDbConfig.getPassword());
|
||||
Jdbi accountJdbi = Jdbi.create(accountDbConfig.getUrl(), accountDbConfig.getUser(), accountDbConfig.getPassword());
|
||||
Jdbi messageJdbi = Jdbi.create(messageDbConfig.getUrl(), messageDbConfig.getUser(), messageDbConfig.getPassword());
|
||||
|
||||
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_vacuum", accountJdbi, accountDbConfig.getCircuitBreakerConfiguration());
|
||||
FaultTolerantDatabase messageDatabase = new FaultTolerantDatabase("message_database_vacuum", messageJdbi, messageDbConfig.getCircuitBreakerConfiguration());
|
||||
|
||||
Accounts accounts = new Accounts(accountDatabase);
|
||||
Keys keys = new Keys(accountDatabase);
|
||||
|
|
|
@ -7,8 +7,10 @@ import org.jdbi.v3.core.Jdbi;
|
|||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.storage.AbusiveHostRule;
|
||||
import org.whispersystems.textsecuregcm.storage.AbusiveHostRules;
|
||||
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
|
@ -26,7 +28,7 @@ public class AbusiveHostRulesTest {
|
|||
|
||||
@Before
|
||||
public void setup() {
|
||||
this.abusiveHostRules = new AbusiveHostRules(Jdbi.create(db.getTestDatabase()));
|
||||
this.abusiveHostRules = new AbusiveHostRules(new FaultTolerantDatabase("abusive_hosts-test", Jdbi.create(db.getTestDatabase()), new CircuitBreakerConfiguration()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -3,14 +3,19 @@ package org.whispersystems.textsecuregcm.tests.storage;
|
|||
import com.opentable.db.postgres.embedded.LiquibasePreparer;
|
||||
import com.opentable.db.postgres.junit.EmbeddedPostgresRules;
|
||||
import com.opentable.db.postgres.junit.PreparedDbRule;
|
||||
import org.jdbi.v3.core.HandleCallback;
|
||||
import org.jdbi.v3.core.HandleConsumer;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.jdbi.v3.core.transaction.TransactionException;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.entities.SignedPreKey;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Accounts;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.AccountRowMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -26,7 +31,11 @@ import java.util.Optional;
|
|||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException;
|
||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class AccountsTest {
|
||||
|
||||
|
@ -37,7 +46,11 @@ public class AccountsTest {
|
|||
|
||||
@Before
|
||||
public void setupAccountsDao() {
|
||||
this.accounts = new Accounts(Jdbi.create(db.getTestDatabase()));
|
||||
FaultTolerantDatabase faultTolerantDatabase = new FaultTolerantDatabase("accountsTest",
|
||||
Jdbi.create(db.getTestDatabase()),
|
||||
new CircuitBreakerConfiguration());
|
||||
|
||||
this.accounts = new Accounts(faultTolerantDatabase);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -178,11 +191,57 @@ public class AccountsTest {
|
|||
assertThat(retrieved.isPresent()).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBreaker() throws InterruptedException {
|
||||
Jdbi jdbi = mock(Jdbi.class);
|
||||
doThrow(new TransactionException("Database error!")).when(jdbi).useHandle(any(HandleConsumer.class));
|
||||
|
||||
CircuitBreakerConfiguration configuration = new CircuitBreakerConfiguration();
|
||||
configuration.setWaitDurationInOpenStateInSeconds(1);
|
||||
configuration.setRingBufferSizeInHalfOpenState(1);
|
||||
configuration.setRingBufferSizeInClosedState(2);
|
||||
configuration.setFailureRateThreshold(50);
|
||||
|
||||
Accounts accounts = new Accounts(new FaultTolerantDatabase("testAccountBreaker", jdbi, configuration));
|
||||
Account account = generateAccount("+14151112222");
|
||||
|
||||
try {
|
||||
accounts.update(account);
|
||||
throw new AssertionError();
|
||||
} catch (TransactionException e) {
|
||||
// good
|
||||
}
|
||||
|
||||
try {
|
||||
accounts.update(account);
|
||||
throw new AssertionError();
|
||||
} catch (TransactionException e) {
|
||||
// good
|
||||
}
|
||||
|
||||
try {
|
||||
accounts.update(account);
|
||||
throw new AssertionError();
|
||||
} catch (CircuitBreakerOpenException e) {
|
||||
// good
|
||||
}
|
||||
|
||||
Thread.sleep(1100);
|
||||
|
||||
try {
|
||||
accounts.update(account);
|
||||
throw new AssertionError();
|
||||
} catch (TransactionException e) {
|
||||
// good
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private Device generateDevice(long id) {
|
||||
Random random = new Random(System.currentTimeMillis());
|
||||
SignedPreKey signedPreKey = new SignedPreKey(random.nextInt(), "testPublicKey-" + random.nextInt(), "testSignature-" + random.nextInt());
|
||||
return new Device(1, "testName-" + random.nextInt(), "testAuthToken-" + random.nextInt(), "testSalt-" + random.nextInt(), null, "testGcmId-" + random.nextInt(), "testApnId-" + random.nextInt(), "testVoipApnId-" + random.nextInt(), random.nextBoolean(), random.nextInt(), signedPreKey, random.nextInt(), random.nextInt(), "testUserAgent-" + random.nextInt(), random.nextBoolean());
|
||||
return new Device(id, "testName-" + random.nextInt(), "testAuthToken-" + random.nextInt(), "testSalt-" + random.nextInt(), null, "testGcmId-" + random.nextInt(), "testApnId-" + random.nextInt(), "testVoipApnId-" + random.nextInt(), random.nextBoolean(), random.nextInt(), signedPreKey, random.nextInt(), random.nextInt(), "testUserAgent-" + random.nextInt(), random.nextBoolean());
|
||||
}
|
||||
|
||||
private Account generateAccount(String number) {
|
||||
|
|
|
@ -3,33 +3,54 @@ package org.whispersystems.textsecuregcm.tests.storage;
|
|||
import com.opentable.db.postgres.embedded.LiquibasePreparer;
|
||||
import com.opentable.db.postgres.junit.EmbeddedPostgresRules;
|
||||
import com.opentable.db.postgres.junit.PreparedDbRule;
|
||||
import org.jdbi.v3.core.HandleCallback;
|
||||
import org.jdbi.v3.core.HandleConsumer;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.jdbi.v3.core.transaction.SerializableTransactionRunner;
|
||||
import org.jdbi.v3.core.transaction.TransactionException;
|
||||
import org.jdbi.v3.core.transaction.TransactionIsolationLevel;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.postgresql.util.PSQLException;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.entities.PreKey;
|
||||
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||
import org.whispersystems.textsecuregcm.storage.KeyRecord;
|
||||
import org.whispersystems.textsecuregcm.storage.Keys;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException;
|
||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class KeysTest {
|
||||
|
||||
@Rule
|
||||
public PreparedDbRule db = EmbeddedPostgresRules.preparedDatabase(LiquibasePreparer.forClasspathLocation("accountsdb.xml"));
|
||||
|
||||
private Keys keys;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
FaultTolerantDatabase faultTolerantDatabase = new FaultTolerantDatabase("keysTest",
|
||||
Jdbi.create(db.getTestDatabase()),
|
||||
new CircuitBreakerConfiguration());
|
||||
|
||||
this.keys = new Keys(faultTolerantDatabase);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testPopulateKeys() throws SQLException {
|
||||
DataSource dataSource = db.getTestDatabase();
|
||||
Jdbi jdbi = Jdbi.create(dataSource);
|
||||
Keys keys = new Keys(jdbi);
|
||||
|
||||
List<PreKey> deviceOnePreKeys = new LinkedList<>();
|
||||
List<PreKey> deviceTwoPreKeys = new LinkedList<>();
|
||||
|
||||
|
@ -55,7 +76,7 @@ public class KeysTest {
|
|||
keys.store("+14151111111", 1, anotherDeviceOnePreKeys);
|
||||
keys.store("+14151111111", 2, anotherDeviceTwoPreKeys);
|
||||
|
||||
PreparedStatement statement = dataSource.getConnection().prepareStatement("SELECT * FROM keys WHERE number = ? AND device_id = ? ORDER BY key_id");
|
||||
PreparedStatement statement = db.getTestDatabase().getConnection().prepareStatement("SELECT * FROM keys WHERE number = ? AND device_id = ? ORDER BY key_id");
|
||||
verifyStoredState(statement, "+14152222222", 1);
|
||||
verifyStoredState(statement, "+14152222222", 2);
|
||||
verifyStoredState(statement, "+14151111111", 1);
|
||||
|
@ -64,10 +85,6 @@ public class KeysTest {
|
|||
|
||||
@Test
|
||||
public void testKeyCount() {
|
||||
DataSource dataSource = db.getTestDatabase();
|
||||
Jdbi jdbi = Jdbi.create(dataSource);
|
||||
Keys keys = new Keys(jdbi);
|
||||
|
||||
List<PreKey> deviceOnePreKeys = new LinkedList<>();
|
||||
|
||||
for (int i=1;i<=100;i++) {
|
||||
|
@ -82,10 +99,6 @@ public class KeysTest {
|
|||
|
||||
@Test
|
||||
public void testGetForDevice() {
|
||||
DataSource dataSource = db.getTestDatabase();
|
||||
Jdbi jdbi = Jdbi.create(dataSource);
|
||||
Keys keys = new Keys(jdbi);
|
||||
|
||||
List<PreKey> deviceOnePreKeys = new LinkedList<>();
|
||||
List<PreKey> deviceTwoPreKeys = new LinkedList<>();
|
||||
|
||||
|
@ -143,10 +156,6 @@ public class KeysTest {
|
|||
|
||||
@Test
|
||||
public void testGetForAllDevices() {
|
||||
DataSource dataSource = db.getTestDatabase();
|
||||
Jdbi jdbi = Jdbi.create(dataSource);
|
||||
Keys keys = new Keys(jdbi);
|
||||
|
||||
List<PreKey> deviceOnePreKeys = new LinkedList<>();
|
||||
List<PreKey> deviceTwoPreKeys = new LinkedList<>();
|
||||
|
||||
|
@ -219,10 +228,6 @@ public class KeysTest {
|
|||
|
||||
@Test
|
||||
public void testGetForAllDevicesParallel() throws InterruptedException {
|
||||
DataSource dataSource = db.getTestDatabase();
|
||||
Jdbi jdbi = Jdbi.create(dataSource);
|
||||
Keys keys = new Keys(jdbi);
|
||||
|
||||
List<PreKey> deviceOnePreKeys = new LinkedList<>();
|
||||
List<PreKey> deviceTwoPreKeys = new LinkedList<>();
|
||||
|
||||
|
@ -259,10 +264,6 @@ public class KeysTest {
|
|||
|
||||
@Test
|
||||
public void testEmptyKeyGet() {
|
||||
DataSource dataSource = db.getTestDatabase();
|
||||
Jdbi jdbi = Jdbi.create(dataSource);
|
||||
Keys keys = new Keys(jdbi);
|
||||
|
||||
List<KeyRecord> records = keys.get("+14152222222");
|
||||
|
||||
assertThat(records.isEmpty()).isTrue();
|
||||
|
@ -270,13 +271,61 @@ public class KeysTest {
|
|||
|
||||
@Test
|
||||
public void testVacuum() {
|
||||
DataSource dataSource = db.getTestDatabase();
|
||||
Jdbi jdbi = Jdbi.create(dataSource);
|
||||
Keys keys = new Keys(jdbi);
|
||||
|
||||
keys.vacuum();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBreaker() throws InterruptedException {
|
||||
Jdbi jdbi = mock(Jdbi.class);
|
||||
doThrow(new TransactionException("Database error!")).when(jdbi).useTransaction(any(TransactionIsolationLevel.class), any(HandleConsumer.class));
|
||||
when(jdbi.getConfig(any())).thenReturn(mock(SerializableTransactionRunner.Configuration.class));
|
||||
|
||||
CircuitBreakerConfiguration configuration = new CircuitBreakerConfiguration();
|
||||
configuration.setWaitDurationInOpenStateInSeconds(1);
|
||||
configuration.setRingBufferSizeInHalfOpenState(1);
|
||||
configuration.setRingBufferSizeInClosedState(2);
|
||||
configuration.setFailureRateThreshold(50);
|
||||
|
||||
Keys keys = new Keys(new FaultTolerantDatabase("testBreaker", jdbi, configuration));
|
||||
|
||||
List<PreKey> deviceOnePreKeys = new LinkedList<>();
|
||||
|
||||
for (int i=1;i<=100;i++) {
|
||||
deviceOnePreKeys.add(new PreKey(i, "+14152222222Device1PublicKey" + i));
|
||||
}
|
||||
|
||||
try {
|
||||
keys.store("+14152222222", 1, deviceOnePreKeys);
|
||||
throw new AssertionError();
|
||||
} catch (TransactionException e) {
|
||||
// good
|
||||
}
|
||||
|
||||
try {
|
||||
keys.store("+14152222222", 1, deviceOnePreKeys);
|
||||
throw new AssertionError();
|
||||
} catch (TransactionException e) {
|
||||
// good
|
||||
}
|
||||
|
||||
try {
|
||||
keys.store("+14152222222", 1, deviceOnePreKeys);
|
||||
throw new AssertionError();
|
||||
} catch (CircuitBreakerOpenException e) {
|
||||
// good
|
||||
}
|
||||
|
||||
Thread.sleep(1100);
|
||||
|
||||
try {
|
||||
keys.store("+14152222222", 1, deviceOnePreKeys);
|
||||
throw new AssertionError();
|
||||
} catch (TransactionException e) {
|
||||
// good
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private void verifyStoredState(PreparedStatement statement, String number, int deviceId) throws SQLException {
|
||||
statement.setString(1, number);
|
||||
|
|
|
@ -8,8 +8,10 @@ import org.jdbi.v3.core.Jdbi;
|
|||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
||||
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||
import org.whispersystems.textsecuregcm.storage.Messages;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
|
@ -34,7 +36,7 @@ public class MessagesTest {
|
|||
|
||||
@Before
|
||||
public void setupAccountsDao() {
|
||||
this.messages = new Messages(Jdbi.create(db.getTestDatabase()));
|
||||
this.messages = new Messages(new FaultTolerantDatabase("messages-test", Jdbi.create(db.getTestDatabase()), new CircuitBreakerConfiguration()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -8,6 +8,8 @@ import org.junit.Before;
|
|||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
|
@ -26,7 +28,7 @@ public class PendingAccountsTest {
|
|||
|
||||
@Before
|
||||
public void setupAccountsDao() {
|
||||
this.pendingAccounts = new PendingAccounts(Jdbi.create(db.getTestDatabase()));
|
||||
this.pendingAccounts = new PendingAccounts(new FaultTolerantDatabase("pending_accounts-test", Jdbi.create(db.getTestDatabase()), new CircuitBreakerConfiguration()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -8,6 +8,8 @@ import org.junit.Before;
|
|||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||
import org.whispersystems.textsecuregcm.storage.PendingDevices;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
|
@ -26,7 +28,7 @@ public class PendingDevicesTest {
|
|||
|
||||
@Before
|
||||
public void setupAccountsDao() {
|
||||
this.pendingDevices = new PendingDevices(Jdbi.create(db.getTestDatabase()));
|
||||
this.pendingDevices = new PendingDevices(new FaultTolerantDatabase("peding_devices-test", Jdbi.create(db.getTestDatabase()), new CircuitBreakerConfiguration()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue