Support for timing jdbi3 queries
This commit is contained in:
parent
20f09e6c6e
commit
457459671c
|
@ -17,6 +17,7 @@
|
|||
package org.whispersystems.textsecuregcm;
|
||||
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.jdbi3.strategies.DefaultNameStrategy;
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||
import com.fasterxml.jackson.annotation.PropertyAccessor;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
|
@ -144,7 +145,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
environment.getObjectMapper().setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
|
||||
environment.getObjectMapper().setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
|
||||
|
||||
JdbiFactory jdbiFactory = new JdbiFactory();
|
||||
JdbiFactory jdbiFactory = new JdbiFactory(DefaultNameStrategy.CHECK_EMPTY);
|
||||
Jdbi accountDatabase = jdbiFactory.build(environment, config.getDataSourceFactory(), "accountdb");
|
||||
Jdbi messageDatabase = jdbiFactory.build(environment, config.getMessageStoreConfiguration(), "messagedb");
|
||||
Jdbi abuseDatabase = jdbiFactory.build(environment, config.getAbuseDatabaseConfiguration(), "abusedb");
|
||||
|
|
|
@ -1,10 +1,16 @@
|
|||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.AbusiveHostRuleRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
public class AbusiveHostRules {
|
||||
|
||||
public static final String ID = "id";
|
||||
|
@ -12,6 +18,9 @@ public class AbusiveHostRules {
|
|||
public static final String BLOCKED = "blocked";
|
||||
public static final String REGIONS = "regions";
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Timer getTimer = metricRegistry.timer(name(AbusiveHostRules.class, "get"));
|
||||
|
||||
private final Jdbi database;
|
||||
|
||||
public AbusiveHostRules(Jdbi database) {
|
||||
|
@ -20,10 +29,14 @@ public class AbusiveHostRules {
|
|||
}
|
||||
|
||||
public List<AbusiveHostRule> getAbusiveHostRulesFor(String host) {
|
||||
return database.withHandle(handle -> handle.createQuery("SELECT * FROM abusive_host_rules WHERE :host::inet <<= " + HOST)
|
||||
.bind("host", host)
|
||||
.mapTo(AbusiveHostRule.class)
|
||||
.list());
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = getTimer.time()) {
|
||||
return handle.createQuery("SELECT * FROM abusive_host_rules WHERE :host::inet <<= " + HOST)
|
||||
.bind("host", host)
|
||||
.mapTo(AbusiveHostRule.class)
|
||||
.list();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,16 +16,22 @@
|
|||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.jdbi.v3.core.transaction.TransactionIsolationLevel;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.AccountRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
public class Accounts {
|
||||
|
||||
public static final String ID = "id";
|
||||
|
@ -34,6 +40,14 @@ public class Accounts {
|
|||
|
||||
private static final ObjectMapper mapper = SystemMapper.getMapper();
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Timer createTimer = metricRegistry.timer(name(Accounts.class, "create"));
|
||||
private final Timer updateTimer = metricRegistry.timer(name(Accounts.class, "update"));
|
||||
private final Timer getTimer = metricRegistry.timer(name(Accounts.class, "get"));
|
||||
private final Timer getAllFromTimer = metricRegistry.timer(name(Accounts.class, "getAllFrom"));
|
||||
private final Timer getAllFromOffsetTimer = metricRegistry.timer(name(Accounts.class, "getAllFromOffset"));
|
||||
private final Timer vacuumTimer = metricRegistry.timer(name(Accounts.class, "vacuum"));
|
||||
|
||||
private final Jdbi database;
|
||||
|
||||
public Accounts(Jdbi database) {
|
||||
|
@ -43,7 +57,7 @@ public class Accounts {
|
|||
|
||||
public boolean create(Account account) {
|
||||
return database.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try {
|
||||
try (Timer.Context timer = createTimer.time()) {
|
||||
int rows = handle.createUpdate("DELETE FROM accounts WHERE " + NUMBER + " = :number")
|
||||
.bind("number", account.getNumber())
|
||||
.execute();
|
||||
|
@ -63,7 +77,7 @@ public class Accounts {
|
|||
|
||||
public void update(Account account) {
|
||||
database.useHandle(handle -> {
|
||||
try {
|
||||
try (Timer.Context timer = updateTimer.time()) {
|
||||
handle.createUpdate("UPDATE accounts SET " + DATA + " = CAST(:data AS json) WHERE " + NUMBER + " = :number")
|
||||
.bind("number", account.getNumber())
|
||||
.bind("data", mapper.writeValueAsString(account))
|
||||
|
@ -75,30 +89,46 @@ public class Accounts {
|
|||
}
|
||||
|
||||
public Optional<Account> get(String number) {
|
||||
return database.withHandle(handle -> handle.createQuery("SELECT * FROM accounts WHERE " + NUMBER + " = :number")
|
||||
.bind("number", number)
|
||||
.mapTo(Account.class)
|
||||
.findFirst());
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = getTimer.time()) {
|
||||
return handle.createQuery("SELECT * FROM accounts WHERE " + NUMBER + " = :number")
|
||||
.bind("number", number)
|
||||
.mapTo(Account.class)
|
||||
.findFirst();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
public List<Account> getAllFrom(String from, int length) {
|
||||
return database.withHandle(handle -> handle.createQuery("SELECT * FROM accounts WHERE " + NUMBER + " > :from ORDER BY " + NUMBER + " LIMIT :limit")
|
||||
.bind("from", from)
|
||||
.bind("limit", length)
|
||||
.mapTo(Account.class)
|
||||
.list());
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = getAllFromOffsetTimer.time()) {
|
||||
return handle.createQuery("SELECT * FROM accounts WHERE " + NUMBER + " > :from ORDER BY " + NUMBER + " LIMIT :limit")
|
||||
.bind("from", from)
|
||||
.bind("limit", length)
|
||||
.mapTo(Account.class)
|
||||
.list();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public List<Account> getAllFrom(int length) {
|
||||
return database.withHandle(handle -> handle.createQuery("SELECT * FROM accounts ORDER BY " + NUMBER + " LIMIT :limit")
|
||||
.bind("limit", length)
|
||||
.mapTo(Account.class)
|
||||
.list());
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = getAllFromTimer.time()) {
|
||||
return handle.createQuery("SELECT * FROM accounts ORDER BY " + NUMBER + " LIMIT :limit")
|
||||
.bind("limit", length)
|
||||
.mapTo(Account.class)
|
||||
.list();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void vacuum() {
|
||||
database.useHandle(handle -> handle.execute("VACUUM accounts"));
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = vacuumTimer.time()) {
|
||||
handle.execute("VACUUM accounts");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,17 +16,31 @@
|
|||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.jdbi.v3.core.statement.PreparedBatch;
|
||||
import org.jdbi.v3.core.transaction.SerializableTransactionRunner;
|
||||
import org.jdbi.v3.core.transaction.TransactionIsolationLevel;
|
||||
import org.whispersystems.textsecuregcm.entities.PreKey;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.KeyRecordRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
import java.security.Key;
|
||||
import java.util.List;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
public class Keys {
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Timer storeTimer = metricRegistry.timer(name(Keys.class, "store" ));
|
||||
private final Timer getDevicetTimer = metricRegistry.timer(name(Keys.class, "getDevice"));
|
||||
private final Timer getTimer = metricRegistry.timer(name(Keys.class, "get" ));
|
||||
private final Timer getCountTimer = metricRegistry.timer(name(Keys.class, "getCount" ));
|
||||
private final Timer vacuumTimer = metricRegistry.timer(name(Keys.class, "vacuum" ));
|
||||
|
||||
private final Jdbi database;
|
||||
|
||||
public Keys(Jdbi database) {
|
||||
|
@ -38,53 +52,69 @@ public class Keys {
|
|||
|
||||
public void store(String number, long deviceId, List<PreKey> keys) {
|
||||
database.useTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
PreparedBatch preparedBatch = handle.prepareBatch("INSERT INTO keys (number, device_id, key_id, public_key) VALUES (:number, :device_id, :key_id, :public_key)");
|
||||
try (Timer.Context timer = storeTimer.time()) {
|
||||
PreparedBatch preparedBatch = handle.prepareBatch("INSERT INTO keys (number, device_id, key_id, public_key) VALUES (:number, :device_id, :key_id, :public_key)");
|
||||
|
||||
for (PreKey key : keys) {
|
||||
preparedBatch.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.bind("key_id", key.getKeyId())
|
||||
.bind("public_key", key.getPublicKey())
|
||||
.add();
|
||||
for (PreKey key : keys) {
|
||||
preparedBatch.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.bind("key_id", key.getKeyId())
|
||||
.bind("public_key", key.getPublicKey())
|
||||
.add();
|
||||
}
|
||||
|
||||
handle.createUpdate("DELETE FROM keys WHERE number = :number AND device_id = :device_id")
|
||||
.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.execute();
|
||||
|
||||
preparedBatch.execute();
|
||||
}
|
||||
|
||||
handle.createUpdate("DELETE FROM keys WHERE number = :number AND device_id = :device_id")
|
||||
.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.execute();
|
||||
|
||||
preparedBatch.execute();
|
||||
});
|
||||
}
|
||||
|
||||
public List<KeyRecord> get(String number, long deviceId) {
|
||||
return database.inTransaction(TransactionIsolationLevel.SERIALIZABLE,
|
||||
handle -> handle.createQuery("DELETE FROM keys WHERE id IN (SELECT id FROM keys WHERE number = :number AND device_id = :device_id ORDER BY key_id ASC LIMIT 1) RETURNING *")
|
||||
.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.mapTo(KeyRecord.class)
|
||||
.list());
|
||||
return database.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context timer = getDevicetTimer.time()) {
|
||||
return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT id FROM keys WHERE number = :number AND device_id = :device_id ORDER BY key_id ASC LIMIT 1) RETURNING *")
|
||||
.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.mapTo(KeyRecord.class)
|
||||
.list();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public List<KeyRecord> get(String number) {
|
||||
return database.inTransaction(TransactionIsolationLevel.SERIALIZABLE,
|
||||
handle -> handle.createQuery("DELETE FROM keys WHERE id IN (SELECT DISTINCT ON (number, device_id) id FROM keys WHERE number = :number ORDER BY number, device_id, key_id ASC) RETURNING *")
|
||||
.bind("number", number)
|
||||
.mapTo(KeyRecord.class)
|
||||
.list());
|
||||
return database.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
|
||||
try (Timer.Context timer = getTimer.time()) {
|
||||
return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT DISTINCT ON (number, device_id) id FROM keys WHERE number = :number ORDER BY number, device_id, key_id ASC) RETURNING *")
|
||||
.bind("number", number)
|
||||
.mapTo(KeyRecord.class)
|
||||
.list();
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
public int getCount(String number, long deviceId) {
|
||||
return database.withHandle(handle -> handle.createQuery("SELECT COUNT(*) FROM keys WHERE number = :number AND device_id = :device_id")
|
||||
.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.mapTo(Integer.class)
|
||||
.findOnly());
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = getCountTimer.time()) {
|
||||
return handle.createQuery("SELECT COUNT(*) FROM keys WHERE number = :number AND device_id = :device_id")
|
||||
.bind("number", number)
|
||||
.bind("device_id", deviceId)
|
||||
.mapTo(Integer.class)
|
||||
.findOnly();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void vacuum() {
|
||||
database.useHandle(handle -> handle.execute("VACUUM keys"));
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = vacuumTimer.time()) {
|
||||
handle.execute("VACUUM keys");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,14 +1,21 @@
|
|||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.OutgoingMessageEntityRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
import java.security.MessageDigest;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
public class Messages {
|
||||
|
||||
static final int RESULT_SET_CHUNK_SIZE = 100;
|
||||
|
@ -26,6 +33,16 @@ public class Messages {
|
|||
public static final String MESSAGE = "message";
|
||||
public static final String CONTENT = "content";
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Timer storeTimer = metricRegistry.timer(name(Messages.class, "store" ));
|
||||
private final Timer loadTimer = metricRegistry.timer(name(Messages.class, "load" ));
|
||||
private final Timer removeBySourceTimer = metricRegistry.timer(name(Messages.class, "removeBySource"));
|
||||
private final Timer removeByGuidTimer = metricRegistry.timer(name(Messages.class, "removeByGuid" ));
|
||||
private final Timer removeByIdTimer = metricRegistry.timer(name(Messages.class, "removeById" ));
|
||||
private final Timer clearDeviceTimer = metricRegistry.timer(name(Messages.class, "clearDevice" ));
|
||||
private final Timer clearTimer = metricRegistry.timer(name(Messages.class, "clear" ));
|
||||
private final Timer vacuumTimer = metricRegistry.timer(name(Messages.class, "vacuum"));
|
||||
|
||||
private final Jdbi database;
|
||||
|
||||
public Messages(Jdbi database) {
|
||||
|
@ -35,71 +52,101 @@ public class Messages {
|
|||
|
||||
public void store(UUID guid, Envelope message, String destination, long destinationDevice) {
|
||||
database.useHandle(handle -> {
|
||||
handle.createUpdate("INSERT INTO messages (" + GUID + ", " + TYPE + ", " + RELAY + ", " + TIMESTAMP + ", " + SERVER_TIMESTAMP + ", " + SOURCE + ", " + SOURCE_DEVICE + ", " + DESTINATION + ", " + DESTINATION_DEVICE + ", " + MESSAGE + ", " + CONTENT + ") " +
|
||||
"VALUES (:guid, :type, :relay, :timestamp, :server_timestamp, :source, :source_device, :destination, :destination_device, :message, :content)")
|
||||
.bind("guid", guid)
|
||||
.bind("destination", destination)
|
||||
.bind("destination_device", destinationDevice)
|
||||
.bind("type", message.getType().getNumber())
|
||||
.bind("relay", message.getRelay())
|
||||
.bind("timestamp", message.getTimestamp())
|
||||
.bind("server_timestamp", message.getServerTimestamp())
|
||||
.bind("source", message.hasSource() ? message.getSource() : null)
|
||||
.bind("source_device", message.hasSourceDevice() ? message.getSourceDevice() : null)
|
||||
.bind("message", message.hasLegacyMessage() ? message.getLegacyMessage().toByteArray() : null)
|
||||
.bind("content", message.hasContent() ? message.getContent().toByteArray() : null)
|
||||
.execute();
|
||||
try (Timer.Context timer = storeTimer.time()) {
|
||||
handle.createUpdate("INSERT INTO messages (" + GUID + ", " + TYPE + ", " + RELAY + ", " + TIMESTAMP + ", " + SERVER_TIMESTAMP + ", " + SOURCE + ", " + SOURCE_DEVICE + ", " + DESTINATION + ", " + DESTINATION_DEVICE + ", " + MESSAGE + ", " + CONTENT + ") " +
|
||||
"VALUES (:guid, :type, :relay, :timestamp, :server_timestamp, :source, :source_device, :destination, :destination_device, :message, :content)")
|
||||
.bind("guid", guid)
|
||||
.bind("destination", destination)
|
||||
.bind("destination_device", destinationDevice)
|
||||
.bind("type", message.getType().getNumber())
|
||||
.bind("relay", message.getRelay())
|
||||
.bind("timestamp", message.getTimestamp())
|
||||
.bind("server_timestamp", message.getServerTimestamp())
|
||||
.bind("source", message.hasSource() ? message.getSource() : null)
|
||||
.bind("source_device", message.hasSourceDevice() ? message.getSourceDevice() : null)
|
||||
.bind("message", message.hasLegacyMessage() ? message.getLegacyMessage().toByteArray() : null)
|
||||
.bind("content", message.hasContent() ? message.getContent().toByteArray() : null)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public List<OutgoingMessageEntity> load(String destination, long destinationDevice) {
|
||||
return database.withHandle(handle -> handle.createQuery("SELECT * FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device ORDER BY " + TIMESTAMP + " ASC LIMIT " + RESULT_SET_CHUNK_SIZE)
|
||||
.bind("destination", destination)
|
||||
.bind("destination_device", destinationDevice)
|
||||
.mapTo(OutgoingMessageEntity.class)
|
||||
.list());
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = loadTimer.time()) {
|
||||
return handle.createQuery("SELECT * FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device ORDER BY " + TIMESTAMP + " ASC LIMIT " + RESULT_SET_CHUNK_SIZE)
|
||||
.bind("destination", destination)
|
||||
.bind("destination_device", destinationDevice)
|
||||
.mapTo(OutgoingMessageEntity.class)
|
||||
.list();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public Optional<OutgoingMessageEntity> remove(String destination, long destinationDevice, String source, long timestamp) {
|
||||
return database.withHandle(handle -> handle.createQuery("DELETE FROM messages WHERE " + ID + " IN (SELECT " + ID + " FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device AND " + SOURCE + " = :source AND " + TIMESTAMP + " = :timestamp ORDER BY " + ID + " LIMIT 1) RETURNING *")
|
||||
.bind("destination", destination)
|
||||
.bind("destination_device", destinationDevice)
|
||||
.bind("source", source)
|
||||
.bind("timestamp", timestamp)
|
||||
.mapTo(OutgoingMessageEntity.class)
|
||||
.findFirst());
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = removeBySourceTimer.time()) {
|
||||
return handle.createQuery("DELETE FROM messages WHERE " + ID + " IN (SELECT " + ID + " FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device AND " + SOURCE + " = :source AND " + TIMESTAMP + " = :timestamp ORDER BY " + ID + " LIMIT 1) RETURNING *")
|
||||
.bind("destination", destination)
|
||||
.bind("destination_device", destinationDevice)
|
||||
.bind("source", source)
|
||||
.bind("timestamp", timestamp)
|
||||
.mapTo(OutgoingMessageEntity.class)
|
||||
.findFirst();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public Optional<OutgoingMessageEntity> remove(String destination, UUID guid) {
|
||||
return database.withHandle(handle -> handle.createQuery("DELETE FROM messages WHERE "+ ID + " IN (SELECT " + ID + " FROM MESSAGES WHERE " + GUID + " = :guid AND " + DESTINATION + " = :destination ORDER BY " + ID + " LIMIT 1) RETURNING *")
|
||||
.bind("destination", destination)
|
||||
.bind("guid", guid)
|
||||
.mapTo(OutgoingMessageEntity.class)
|
||||
.findFirst());
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = removeByGuidTimer.time()) {
|
||||
return handle.createQuery("DELETE FROM messages WHERE " + ID + " IN (SELECT " + ID + " FROM MESSAGES WHERE " + GUID + " = :guid AND " + DESTINATION + " = :destination ORDER BY " + ID + " LIMIT 1) RETURNING *")
|
||||
.bind("destination", destination)
|
||||
.bind("guid", guid)
|
||||
.mapTo(OutgoingMessageEntity.class)
|
||||
.findFirst();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void remove(String destination, long id) {
|
||||
database.useHandle(handle -> handle.createUpdate("DELETE FROM messages WHERE " + ID + " = :id AND " + DESTINATION + " = :destination")
|
||||
.bind("destination", destination)
|
||||
.bind("id", id)
|
||||
.execute());
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = removeByIdTimer.time()) {
|
||||
handle.createUpdate("DELETE FROM messages WHERE " + ID + " = :id AND " + DESTINATION + " = :destination")
|
||||
.bind("destination", destination)
|
||||
.bind("id", id)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void clear(String destination) {
|
||||
database.useHandle(handle -> handle.createUpdate("DELETE FROM messages WHERE " + DESTINATION + " = :destination")
|
||||
.bind("destination", destination)
|
||||
.execute());
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = clearTimer.time()) {
|
||||
handle.createUpdate("DELETE FROM messages WHERE " + DESTINATION + " = :destination")
|
||||
.bind("destination", destination)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void clear(String destination, long destinationDevice) {
|
||||
database.useHandle(handle -> handle.createUpdate("DELETE FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device")
|
||||
.bind("destination", destination)
|
||||
.bind("destination_device", destinationDevice)
|
||||
.execute());
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = clearDeviceTimer.time()) {
|
||||
handle.createUpdate("DELETE FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device")
|
||||
.bind("destination", destination)
|
||||
.bind("destination_device", destinationDevice)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void vacuum() {
|
||||
database.useHandle(handle -> handle.execute("VACUUM messages"));
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = vacuumTimer.time()) {
|
||||
handle.execute("VACUUM messages");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -16,14 +16,26 @@
|
|||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.StoredVerificationCodeRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
public class PendingAccounts {
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Timer insertTimer = metricRegistry.timer(name(PendingAccounts.class, "insert" ));
|
||||
private final Timer getCodeForNumberTimer = metricRegistry.timer(name(PendingAccounts.class, "getCodeForNumber"));
|
||||
private final Timer removeTimer = metricRegistry.timer(name(PendingAccounts.class, "remove" ));
|
||||
private final Timer vacuumTimer = metricRegistry.timer(name(PendingAccounts.class, "vacuum" ));
|
||||
|
||||
private final Jdbi database;
|
||||
|
||||
public PendingAccounts(Jdbi database) {
|
||||
|
@ -32,29 +44,45 @@ public class PendingAccounts {
|
|||
}
|
||||
|
||||
public void insert(String number, String verificationCode, long timestamp) {
|
||||
database.useHandle(handle -> handle.createUpdate("WITH upsert AS (UPDATE pending_accounts SET verification_code = :verification_code, timestamp = :timestamp WHERE number = :number RETURNING *) " +
|
||||
"INSERT INTO pending_accounts (number, verification_code, timestamp) SELECT :number, :verification_code, :timestamp WHERE NOT EXISTS (SELECT * FROM upsert)")
|
||||
.bind("verification_code", verificationCode)
|
||||
.bind("timestamp", timestamp)
|
||||
.bind("number", number)
|
||||
.execute());
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = insertTimer.time()) {
|
||||
handle.createUpdate("WITH upsert AS (UPDATE pending_accounts SET verification_code = :verification_code, timestamp = :timestamp WHERE number = :number RETURNING *) " +
|
||||
"INSERT INTO pending_accounts (number, verification_code, timestamp) SELECT :number, :verification_code, :timestamp WHERE NOT EXISTS (SELECT * FROM upsert)")
|
||||
.bind("verification_code", verificationCode)
|
||||
.bind("timestamp", timestamp)
|
||||
.bind("number", number)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public Optional<StoredVerificationCode> getCodeForNumber(String number) {
|
||||
return database.withHandle(handle -> handle.createQuery("SELECT verification_code, timestamp FROM pending_accounts WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.mapTo(StoredVerificationCode.class)
|
||||
.findFirst());
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = getCodeForNumberTimer.time()) {
|
||||
return handle.createQuery("SELECT verification_code, timestamp FROM pending_accounts WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.mapTo(StoredVerificationCode.class)
|
||||
.findFirst();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void remove(String number) {
|
||||
database.useHandle(handle -> handle.createUpdate("DELETE FROM pending_accounts WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.execute());
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = removeTimer.time()) {
|
||||
handle.createUpdate("DELETE FROM pending_accounts WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void vacuum() {
|
||||
database.useHandle(handle -> handle.execute("VACUUM pending_accounts"));
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = vacuumTimer.time()) {
|
||||
handle.execute("VACUUM pending_accounts");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,14 +16,25 @@
|
|||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
|
||||
import org.whispersystems.textsecuregcm.storage.mappers.StoredVerificationCodeRowMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
public class PendingDevices {
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Timer insertTimer = metricRegistry.timer(name(PendingDevices.class, "insert" ));
|
||||
private final Timer getCodeForNumberTimer = metricRegistry.timer(name(PendingDevices.class, "getcodeForNumber"));
|
||||
private final Timer removeTimer = metricRegistry.timer(name(PendingDevices.class, "remove" ));
|
||||
|
||||
private final Jdbi database;
|
||||
|
||||
public PendingDevices(Jdbi database) {
|
||||
|
@ -32,25 +43,37 @@ public class PendingDevices {
|
|||
}
|
||||
|
||||
public void insert(String number, String verificationCode, long timestamp) {
|
||||
database.useHandle(handle -> handle.createUpdate("WITH upsert AS (UPDATE pending_devices SET verification_code = :verification_code, timestamp = :timestamp WHERE number = :number RETURNING *) " +
|
||||
"INSERT INTO pending_devices (number, verification_code, timestamp) SELECT :number, :verification_code, :timestamp WHERE NOT EXISTS (SELECT * FROM upsert)")
|
||||
.bind("number", number)
|
||||
.bind("verification_code", verificationCode)
|
||||
.bind("timestamp", timestamp)
|
||||
.execute());
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = insertTimer.time()) {
|
||||
handle.createUpdate("WITH upsert AS (UPDATE pending_devices SET verification_code = :verification_code, timestamp = :timestamp WHERE number = :number RETURNING *) " +
|
||||
"INSERT INTO pending_devices (number, verification_code, timestamp) SELECT :number, :verification_code, :timestamp WHERE NOT EXISTS (SELECT * FROM upsert)")
|
||||
.bind("number", number)
|
||||
.bind("verification_code", verificationCode)
|
||||
.bind("timestamp", timestamp)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public Optional<StoredVerificationCode> getCodeForNumber(String number) {
|
||||
return database.withHandle(handle -> handle.createQuery("SELECT verification_code, timestamp FROM pending_devices WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.mapTo(StoredVerificationCode.class)
|
||||
.findFirst());
|
||||
return database.withHandle(handle -> {
|
||||
try (Timer.Context timer = getCodeForNumberTimer.time()) {
|
||||
return handle.createQuery("SELECT verification_code, timestamp FROM pending_devices WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.mapTo(StoredVerificationCode.class)
|
||||
.findFirst();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void remove(String number) {
|
||||
database.useHandle(handle -> handle.createUpdate("DELETE FROM pending_devices WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.execute());
|
||||
database.useHandle(handle -> {
|
||||
try (Timer.Context timer = removeTimer.time()) {
|
||||
handle.createUpdate("DELETE FROM pending_devices WHERE number = :number")
|
||||
.bind("number", number)
|
||||
.execute();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -205,8 +205,8 @@ public class MessagesTest {
|
|||
|
||||
|
||||
private void verifyExpected(OutgoingMessageEntity retrieved, Envelope inserted, UUID guid) {
|
||||
assertThat(retrieved.getSource()).isEqualTo(inserted.getSource());
|
||||
assertThat(retrieved.getTimestamp()).isEqualTo(inserted.getTimestamp());
|
||||
assertThat(retrieved.getSource()).isEqualTo(inserted.getSource());
|
||||
assertThat(retrieved.getRelay()).isEqualTo(inserted.getRelay());
|
||||
assertThat(retrieved.getType()).isEqualTo(inserted.getType().getNumber());
|
||||
assertThat(retrieved.getContent()).isEqualTo(inserted.getContent().toByteArray());
|
||||
|
|
Loading…
Reference in New Issue