Normalized migration result, clean up

This commit is contained in:
Moxie Marlinspike 2018-09-20 09:38:57 -07:00
parent a341a20e2c
commit ee6785eff9
11 changed files with 46 additions and 106 deletions

1
.gitignore vendored
View File

@ -8,5 +8,6 @@ local.yml
config/production.yml
config/federated.yml
config/staging.yml
config/testing.yml
.opsmanage
put.sh

View File

@ -18,6 +18,8 @@ package org.whispersystems.textsecuregcm;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.whispersystems.textsecuregcm.configuration.ApnConfiguration;
import org.whispersystems.textsecuregcm.configuration.AttachmentsConfiguration;
import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration;
import org.whispersystems.textsecuregcm.configuration.FederationConfiguration;
import org.whispersystems.textsecuregcm.configuration.GcmConfiguration;
import org.whispersystems.textsecuregcm.configuration.MaxDeviceConfiguration;
@ -25,10 +27,7 @@ import org.whispersystems.textsecuregcm.configuration.MessageCacheConfiguration;
import org.whispersystems.textsecuregcm.configuration.ProfilesConfiguration;
import org.whispersystems.textsecuregcm.configuration.PushConfiguration;
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedPhoneConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisConfiguration;
import org.whispersystems.textsecuregcm.configuration.AttachmentsConfiguration;
import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration;
import org.whispersystems.textsecuregcm.configuration.TestDeviceConfiguration;
import org.whispersystems.textsecuregcm.configuration.TurnConfiguration;
import org.whispersystems.textsecuregcm.configuration.TwilioConfiguration;

View File

@ -69,22 +69,7 @@ import org.whispersystems.textsecuregcm.s3.UrlSigner;
import org.whispersystems.textsecuregcm.sms.SmsSender;
import org.whispersystems.textsecuregcm.sms.TwilioSmsSender;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient;
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
import org.whispersystems.textsecuregcm.storage.DirectoryReconciler;
import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationCache;
import org.whispersystems.textsecuregcm.storage.Keys;
import org.whispersystems.textsecuregcm.storage.Messages;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
import org.whispersystems.textsecuregcm.storage.PendingAccountsManager;
import org.whispersystems.textsecuregcm.storage.PendingDevices;
import org.whispersystems.textsecuregcm.storage.PendingDevicesManager;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.storage.*;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener;
import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler;
@ -180,7 +165,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient);
FederatedClientManager federatedClientManager = new FederatedClientManager(environment, config.getJerseyClientConfiguration(), config.getFederationConfiguration());
MessagesCache messagesCache = new MessagesCache(messagesClient, messages, accountsManager, config.getMessageCacheConfiguration().getPersistDelayMinutes());
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, config.getMessageCacheConfiguration().getCacheRate());
MessagesManager messagesManager = new MessagesManager(messages, messagesCache);
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(messagesManager);
DispatchManager dispatchManager = new DispatchManager(cacheClientFactory, Optional.of(deadLetterHandler));
PubSubManager pubSubManager = new PubSubManager(cacheClient, dispatchManager);

View File

@ -1,12 +1,8 @@
package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.glassfish.jersey.server.JSONP;
import org.hibernate.validator.constraints.NotEmpty;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
public class MessageCacheConfiguration {
@ -19,11 +15,6 @@ public class MessageCacheConfiguration {
@JsonProperty
private int persistDelayMinutes = 10;
@JsonProperty
@Min(0)
@Max(1)
private float cacheRate = 1;
public RedisConfiguration getRedisConfiguration() {
return redis;
}
@ -32,7 +23,4 @@ public class MessageCacheConfiguration {
return persistDelayMinutes;
}
public float getCacheRate() {
return cacheRate;
}
}

View File

@ -1,14 +0,0 @@
package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.hibernate.validator.constraints.NotEmpty;
public class MessageStoreConfiguration {
@JsonProperty
@NotEmpty
private String url;
public String getUrl() {
return url;
}
}

View File

@ -12,7 +12,6 @@ import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.util.Pair;
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
@ -45,7 +44,7 @@ public abstract class Messages {
@Bind("destination_device") long destinationDevice);
@Mapper(MessageMapper.class)
@SqlQuery("SELECT * FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device ORDER BY " + TIMESTAMP + " ASC LIMIT " + RESULT_SET_CHUNK_SIZE)
@SqlQuery("SELECT * FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device ORDER BY " + TIMESTAMP + " ASC LIMIT " + RESULT_SET_CHUNK_SIZE)
abstract List<OutgoingMessageEntity> load(@Bind("destination") String destination,
@Bind("destination_device") long destinationDevice);
@ -56,10 +55,6 @@ public abstract class Messages {
@Bind("source") String source,
@Bind("timestamp") long timestamp);
@Mapper(DestinationMapper.class)
@SqlQuery("SELECT DISTINCT ON (destination, destination_device) destination, destination_device FROM messages WHERE timestamp > :timestamp ORDER BY destination, destination_device OFFSET :offset LIMIT :limit")
public abstract List<Pair<String, Integer>> getPendingDestinations(@Bind("timestamp") long sinceTimestamp, @Bind("offset") int offset, @Bind("limit") int limit);
@Mapper(MessageMapper.class)
@SqlUpdate("DELETE FROM messages WHERE " + ID + " = :id AND " + DESTINATION + " = :destination")
abstract void remove(@Bind("destination") String destination, @Bind("id") long id);
@ -76,14 +71,6 @@ public abstract class Messages {
@SqlUpdate("VACUUM messages")
public abstract void vacuum();
public static class DestinationMapper implements ResultSetMapper<Pair<String, Integer>> {
@Override
public Pair<String, Integer> map(int i, ResultSet resultSet, StatementContext statementContext) throws SQLException {
return new Pair<>(resultSet.getString(DESTINATION), resultSet.getInt(DESTINATION_DEVICE));
}
}
public static class MessageMapper implements ResultSetMapper<OutgoingMessageEntity> {
@Override
public OutgoingMessageEntity map(int i, ResultSet resultSet, StatementContext statementContext)
@ -110,11 +97,11 @@ public abstract class Messages {
}
}
@BindingAnnotation(MessageBinder.AccountBinderFactory.class)
@BindingAnnotation(MessageBinder.MessageBinderFactory.class)
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
public @interface MessageBinder {
public static class AccountBinderFactory implements BinderFactory {
public static class MessageBinderFactory implements BinderFactory {
@Override
public Binder build(Annotation annotation) {
return new Binder<MessageBinder, Envelope>() {

View File

@ -369,8 +369,8 @@ public class MessagesCache implements Managed {
throws IOException
{
super(MessagePersister.class.getSimpleName());
this.jedisPool = jedisPool;
this.database = database;
this.jedisPool = jedisPool;
this.database = database;
this.pubSubManager = pubSubManager;
this.pushSender = pushSender;
@ -446,6 +446,7 @@ public class MessagesCache implements Managed {
try {
Envelope envelope = Envelope.parseFrom(message);
database.store(envelope, key.getAddress(), key.getDeviceId());
} catch (InvalidProtocolBufferException e) {
logger.error("Error parsing envelope", e);
}

View File

@ -9,10 +9,7 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Conversions;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import static com.codahale.metrics.MetricRegistry.name;
@ -27,20 +24,14 @@ public class MessagesManager {
private final Messages messages;
private final MessagesCache messagesCache;
private final Distribution distribution;
public MessagesManager(Messages messages, MessagesCache messagesCache, float cacheRate) {
public MessagesManager(Messages messages, MessagesCache messagesCache) {
this.messages = messages;
this.messagesCache = messagesCache;
this.distribution = new Distribution(cacheRate);
}
public void insert(String destination, long destinationDevice, Envelope message) {
if (distribution.isQualified(destination, destinationDevice)) {
messagesCache.insert(destination, destinationDevice, message);
} else {
messages.store(message, destination, destinationDevice);
}
messagesCache.insert(destination, destinationDevice, message);
}
public OutgoingMessageEntityList getMessagesForDevice(String destination, long destinationDevice) {
@ -87,32 +78,4 @@ public class MessagesManager {
}
}
public static class Distribution {
private final float percentage;
public Distribution(float percentage) {
this.percentage = percentage;
}
public boolean isQualified(String address, long device) {
if (percentage <= 0) return false;
if (percentage >= 100) return true;
try {
MessageDigest digest = MessageDigest.getInstance("SHA1");
digest.update(address.getBytes());
digest.update(Conversions.longToByteArray(device));
byte[] result = digest.digest();
int hashCode = Conversions.byteArrayToShort(result);
return hashCode <= 65535 * percentage;
} catch (NoSuchAlgorithmException e) {
throw new AssertionError(e);
}
}
}
}

View File

@ -18,7 +18,7 @@ import io.dropwizard.jdbi.args.OptionalArgumentFactory;
import io.dropwizard.setup.Bootstrap;
public class TrimMessagesCommand extends ConfiguredCommand<WhisperServerConfiguration> {
private final Logger logger = LoggerFactory.getLogger(VacuumCommand.class);
private final Logger logger = LoggerFactory.getLogger(TrimMessagesCommand.class);
public TrimMessagesCommand() {
super("trim", "Trim Messages Database");
@ -39,7 +39,7 @@ public class TrimMessagesCommand extends ConfiguredCommand<WhisperServerConfigur
messageDbi.registerContainerFactory(new OptionalContainerFactory());
Messages messages = messageDbi.onDemand(Messages.class);
long timestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(60);
long timestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(90);
logger.info("Trimming old messages: " + timestamp + "...");
messages.removeOld(timestamp);

View File

@ -51,7 +51,7 @@ public class VacuumCommand extends ConfiguredCommand<WhisperServerConfiguration>
Accounts accounts = dbi.onDemand(Accounts.class );
Keys keys = dbi.onDemand(Keys.class );
PendingAccounts pendingAccounts = dbi.onDemand(PendingAccounts.class);
Messages messages = messageDbi.onDemand(Messages.class );
Messages messages = messageDbi.onDemand(Messages.class);
logger.info("Vacuuming accounts...");
accounts.vacuum();

View File

@ -74,5 +74,35 @@
<sql>CREATE RULE bounded_message_queue AS ON INSERT TO messages DO ALSO DELETE FROM messages WHERE id IN (SELECT id FROM messages WHERE destination = NEW.destination AND destination_device = NEW.destination_device ORDER BY timestamp DESC OFFSET 1000);</sql>
</changeSet>
<changeSet id="5" author="moxie">
<addColumn tableName="messages">
<column name="deleted" type="integer"/>
</addColumn>
<sql>DROP RULE bounded_message_queue ON messages;</sql>
</changeSet>
<changeSet id="6" author="moxie">
<sql>CREATE RULE bounded_message_queue AS ON INSERT TO messages DO ALSO DELETE FROM messages WHERE id IN (SELECT id FROM messages WHERE destination = NEW.destination AND destination_device = NEW.destination_device ORDER BY timestamp DESC OFFSET 1000);</sql>
</changeSet>
<changeSet id="7" author="moxie">
<dropColumn tableName="messages" columnName="deleted"/>
<sql>DROP RULE bounded_message_queue ON messages;</sql>
</changeSet>
<changeSet id="8" author="moxie">
<sql>CREATE RULE bounded_message_queue AS ON INSERT TO messages DO ALSO DELETE FROM messages WHERE id IN (SELECT id FROM messages WHERE destination = NEW.destination AND destination_device = NEW.destination_device ORDER BY timestamp DESC OFFSET 1000);</sql>
</changeSet>
<changeSet id="9" author="moxie">
<sql>DROP RULE bounded_message_queue ON messages;</sql>
</changeSet>
<changeSet id="10" author="moxie">
<sql>CREATE RULE bounded_message_queue AS ON INSERT TO messages DO ALSO DELETE FROM messages WHERE id IN (SELECT id FROM messages WHERE destination = NEW.destination AND destination_device = NEW.destination_device ORDER BY timestamp DESC OFFSET 1000);</sql>
</changeSet>
</databaseChangeLog>