From ee6785eff9cc1d9983bc2fcf6df2d4ff92040293 Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Thu, 20 Sep 2018 09:38:57 -0700 Subject: [PATCH] Normalized migration result, clean up --- .gitignore | 1 + .../WhisperServerConfiguration.java | 5 +-- .../textsecuregcm/WhisperServerService.java | 19 +-------- .../MessageCacheConfiguration.java | 12 ------ .../MessageStoreConfiguration.java | 14 ------- .../textsecuregcm/storage/Messages.java | 19 ++------- .../textsecuregcm/storage/MessagesCache.java | 5 ++- .../storage/MessagesManager.java | 41 +------------------ .../workers/TrimMessagesCommand.java | 4 +- .../textsecuregcm/workers/VacuumCommand.java | 2 +- src/main/resources/messagedb.xml | 30 ++++++++++++++ 11 files changed, 46 insertions(+), 106 deletions(-) delete mode 100644 src/main/java/org/whispersystems/textsecuregcm/configuration/MessageStoreConfiguration.java diff --git a/.gitignore b/.gitignore index 0fac36d71..f9e44cec3 100644 --- a/.gitignore +++ b/.gitignore @@ -8,5 +8,6 @@ local.yml config/production.yml config/federated.yml config/staging.yml +config/testing.yml .opsmanage put.sh diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index a31024cfe..fe6a93cad 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -18,6 +18,8 @@ package org.whispersystems.textsecuregcm; import com.fasterxml.jackson.annotation.JsonProperty; import org.whispersystems.textsecuregcm.configuration.ApnConfiguration; +import org.whispersystems.textsecuregcm.configuration.AttachmentsConfiguration; +import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration; import org.whispersystems.textsecuregcm.configuration.FederationConfiguration; import org.whispersystems.textsecuregcm.configuration.GcmConfiguration; import org.whispersystems.textsecuregcm.configuration.MaxDeviceConfiguration; @@ -25,10 +27,7 @@ import org.whispersystems.textsecuregcm.configuration.MessageCacheConfiguration; import org.whispersystems.textsecuregcm.configuration.ProfilesConfiguration; import org.whispersystems.textsecuregcm.configuration.PushConfiguration; import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration; -import org.whispersystems.textsecuregcm.configuration.RedPhoneConfiguration; import org.whispersystems.textsecuregcm.configuration.RedisConfiguration; -import org.whispersystems.textsecuregcm.configuration.AttachmentsConfiguration; -import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration; import org.whispersystems.textsecuregcm.configuration.TestDeviceConfiguration; import org.whispersystems.textsecuregcm.configuration.TurnConfiguration; import org.whispersystems.textsecuregcm.configuration.TwilioConfiguration; diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index bfd8b2ed3..f6a13ce3c 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -69,22 +69,7 @@ import org.whispersystems.textsecuregcm.s3.UrlSigner; import org.whispersystems.textsecuregcm.sms.SmsSender; import org.whispersystems.textsecuregcm.sms.TwilioSmsSender; import org.whispersystems.textsecuregcm.sqs.DirectoryQueue; -import org.whispersystems.textsecuregcm.storage.Account; -import org.whispersystems.textsecuregcm.storage.Accounts; -import org.whispersystems.textsecuregcm.storage.AccountsManager; -import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient; -import org.whispersystems.textsecuregcm.storage.DirectoryManager; -import org.whispersystems.textsecuregcm.storage.DirectoryReconciler; -import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationCache; -import org.whispersystems.textsecuregcm.storage.Keys; -import org.whispersystems.textsecuregcm.storage.Messages; -import org.whispersystems.textsecuregcm.storage.MessagesCache; -import org.whispersystems.textsecuregcm.storage.MessagesManager; -import org.whispersystems.textsecuregcm.storage.PendingAccounts; -import org.whispersystems.textsecuregcm.storage.PendingAccountsManager; -import org.whispersystems.textsecuregcm.storage.PendingDevices; -import org.whispersystems.textsecuregcm.storage.PendingDevicesManager; -import org.whispersystems.textsecuregcm.storage.PubSubManager; +import org.whispersystems.textsecuregcm.storage.*; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener; import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler; @@ -180,7 +165,7 @@ public class WhisperServerService extends Application load(@Bind("destination") String destination, @Bind("destination_device") long destinationDevice); @@ -56,10 +55,6 @@ public abstract class Messages { @Bind("source") String source, @Bind("timestamp") long timestamp); - @Mapper(DestinationMapper.class) - @SqlQuery("SELECT DISTINCT ON (destination, destination_device) destination, destination_device FROM messages WHERE timestamp > :timestamp ORDER BY destination, destination_device OFFSET :offset LIMIT :limit") - public abstract List> getPendingDestinations(@Bind("timestamp") long sinceTimestamp, @Bind("offset") int offset, @Bind("limit") int limit); - @Mapper(MessageMapper.class) @SqlUpdate("DELETE FROM messages WHERE " + ID + " = :id AND " + DESTINATION + " = :destination") abstract void remove(@Bind("destination") String destination, @Bind("id") long id); @@ -76,14 +71,6 @@ public abstract class Messages { @SqlUpdate("VACUUM messages") public abstract void vacuum(); - public static class DestinationMapper implements ResultSetMapper> { - - @Override - public Pair map(int i, ResultSet resultSet, StatementContext statementContext) throws SQLException { - return new Pair<>(resultSet.getString(DESTINATION), resultSet.getInt(DESTINATION_DEVICE)); - } - } - public static class MessageMapper implements ResultSetMapper { @Override public OutgoingMessageEntity map(int i, ResultSet resultSet, StatementContext statementContext) @@ -110,11 +97,11 @@ public abstract class Messages { } } - @BindingAnnotation(MessageBinder.AccountBinderFactory.class) + @BindingAnnotation(MessageBinder.MessageBinderFactory.class) @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.PARAMETER}) public @interface MessageBinder { - public static class AccountBinderFactory implements BinderFactory { + public static class MessageBinderFactory implements BinderFactory { @Override public Binder build(Annotation annotation) { return new Binder() { diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 64e15dc14..fd3db0678 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -369,8 +369,8 @@ public class MessagesCache implements Managed { throws IOException { super(MessagePersister.class.getSimpleName()); - this.jedisPool = jedisPool; - this.database = database; + this.jedisPool = jedisPool; + this.database = database; this.pubSubManager = pubSubManager; this.pushSender = pushSender; @@ -446,6 +446,7 @@ public class MessagesCache implements Managed { try { Envelope envelope = Envelope.parseFrom(message); database.store(envelope, key.getAddress(), key.getDeviceId()); + } catch (InvalidProtocolBufferException e) { logger.error("Error parsing envelope", e); } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index fbafcd232..228885e7a 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -9,10 +9,7 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList; import org.whispersystems.textsecuregcm.util.Constants; -import org.whispersystems.textsecuregcm.util.Conversions; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.List; import static com.codahale.metrics.MetricRegistry.name; @@ -27,20 +24,14 @@ public class MessagesManager { private final Messages messages; private final MessagesCache messagesCache; - private final Distribution distribution; - public MessagesManager(Messages messages, MessagesCache messagesCache, float cacheRate) { + public MessagesManager(Messages messages, MessagesCache messagesCache) { this.messages = messages; this.messagesCache = messagesCache; - this.distribution = new Distribution(cacheRate); } public void insert(String destination, long destinationDevice, Envelope message) { - if (distribution.isQualified(destination, destinationDevice)) { - messagesCache.insert(destination, destinationDevice, message); - } else { - messages.store(message, destination, destinationDevice); - } + messagesCache.insert(destination, destinationDevice, message); } public OutgoingMessageEntityList getMessagesForDevice(String destination, long destinationDevice) { @@ -87,32 +78,4 @@ public class MessagesManager { } } - public static class Distribution { - - private final float percentage; - - public Distribution(float percentage) { - this.percentage = percentage; - } - - public boolean isQualified(String address, long device) { - if (percentage <= 0) return false; - if (percentage >= 100) return true; - - try { - MessageDigest digest = MessageDigest.getInstance("SHA1"); - digest.update(address.getBytes()); - digest.update(Conversions.longToByteArray(device)); - - byte[] result = digest.digest(); - int hashCode = Conversions.byteArrayToShort(result); - - return hashCode <= 65535 * percentage; - } catch (NoSuchAlgorithmException e) { - throw new AssertionError(e); - } - } - - } - } diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/TrimMessagesCommand.java b/src/main/java/org/whispersystems/textsecuregcm/workers/TrimMessagesCommand.java index 979c794e4..dd43e2bf6 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/workers/TrimMessagesCommand.java +++ b/src/main/java/org/whispersystems/textsecuregcm/workers/TrimMessagesCommand.java @@ -18,7 +18,7 @@ import io.dropwizard.jdbi.args.OptionalArgumentFactory; import io.dropwizard.setup.Bootstrap; public class TrimMessagesCommand extends ConfiguredCommand { - private final Logger logger = LoggerFactory.getLogger(VacuumCommand.class); + private final Logger logger = LoggerFactory.getLogger(TrimMessagesCommand.class); public TrimMessagesCommand() { super("trim", "Trim Messages Database"); @@ -39,7 +39,7 @@ public class TrimMessagesCommand extends ConfiguredCommand Accounts accounts = dbi.onDemand(Accounts.class ); Keys keys = dbi.onDemand(Keys.class ); PendingAccounts pendingAccounts = dbi.onDemand(PendingAccounts.class); - Messages messages = messageDbi.onDemand(Messages.class ); + Messages messages = messageDbi.onDemand(Messages.class); logger.info("Vacuuming accounts..."); accounts.vacuum(); diff --git a/src/main/resources/messagedb.xml b/src/main/resources/messagedb.xml index 3c09edf5c..3add10ae9 100644 --- a/src/main/resources/messagedb.xml +++ b/src/main/resources/messagedb.xml @@ -74,5 +74,35 @@ CREATE RULE bounded_message_queue AS ON INSERT TO messages DO ALSO DELETE FROM messages WHERE id IN (SELECT id FROM messages WHERE destination = NEW.destination AND destination_device = NEW.destination_device ORDER BY timestamp DESC OFFSET 1000); + + + + + + DROP RULE bounded_message_queue ON messages; + + + + CREATE RULE bounded_message_queue AS ON INSERT TO messages DO ALSO DELETE FROM messages WHERE id IN (SELECT id FROM messages WHERE destination = NEW.destination AND destination_device = NEW.destination_device ORDER BY timestamp DESC OFFSET 1000); + + + + + + DROP RULE bounded_message_queue ON messages; + + + + CREATE RULE bounded_message_queue AS ON INSERT TO messages DO ALSO DELETE FROM messages WHERE id IN (SELECT id FROM messages WHERE destination = NEW.destination AND destination_device = NEW.destination_device ORDER BY timestamp DESC OFFSET 1000); + + + + DROP RULE bounded_message_queue ON messages; + + + + CREATE RULE bounded_message_queue AS ON INSERT TO messages DO ALSO DELETE FROM messages WHERE id IN (SELECT id FROM messages WHERE destination = NEW.destination AND destination_device = NEW.destination_device ORDER BY timestamp DESC OFFSET 1000); + +