From 41d15b738bacf7096fe195a5efa5c73bafa15cbb Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Sat, 6 Dec 2014 15:30:26 -0800 Subject: [PATCH] Refactor direct connect delivery pipeline and message store. 1) Make message store contents more memory efficient. 2) Make notification pipeline simpler and more memory efficient. 3) Don't b64 encode websocket message bodies. // FREEBIE --- pom.xml | 6 +- protobuf/Makefile | 2 +- protobuf/OutgoingMessageSignal.proto | 2 +- protobuf/PubSubMessage.proto | 32 + protobuf/StoredMessage.proto | 30 + .../WhisperServerConfiguration.java | 20 +- .../textsecuregcm/WhisperServerService.java | 23 +- ...ation.java => DirectoryConfiguration.java} | 2 +- .../MessageStoreConfiguration.java | 14 + .../controllers/KeepAliveController.java | 20 + .../entities/EncryptedOutgoingMessage.java | 13 +- .../entities/PendingMessage.java | 60 -- .../providers/RedisClientFactory.java | 6 +- .../textsecuregcm/push/PushSender.java | 56 +- .../textsecuregcm/push/WebsocketSender.java | 45 +- .../textsecuregcm/storage/Account.java | 6 +- .../textsecuregcm/storage/PubSubListener.java | 2 + .../textsecuregcm/storage/PubSubManager.java | 73 +- .../textsecuregcm/storage/PubSubMessage.java | 32 - .../textsecuregcm/storage/PubSubProtos.java | 642 ++++++++++++++++++ .../storage/StoredMessageProtos.java | 624 +++++++++++++++++ .../textsecuregcm/storage/StoredMessages.java | 69 +- .../WebSocketAccountAuthenticator.java | 1 + .../websocket/WebSocketConnection.java | 90 +-- .../workers/DirectoryCommand.java | 2 +- .../websocket/WebSocketConnectionTest.java | 35 +- 26 files changed, 1581 insertions(+), 326 deletions(-) create mode 100644 protobuf/PubSubMessage.proto create mode 100644 protobuf/StoredMessage.proto rename src/main/java/org/whispersystems/textsecuregcm/configuration/{RedisConfiguration.java => DirectoryConfiguration.java} (96%) create mode 100644 src/main/java/org/whispersystems/textsecuregcm/configuration/MessageStoreConfiguration.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/controllers/KeepAliveController.java delete mode 100644 src/main/java/org/whispersystems/textsecuregcm/entities/PendingMessage.java delete mode 100644 src/main/java/org/whispersystems/textsecuregcm/storage/PubSubMessage.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/storage/PubSubProtos.java create mode 100644 src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessageProtos.java diff --git a/pom.xml b/pom.xml index c64afe7c5..5a8b5044e 100644 --- a/pom.xml +++ b/pom.xml @@ -116,7 +116,7 @@ redis.clients jedis - 2.2.1 + 2.6.1 jar compile @@ -137,9 +137,9 @@ 4.0.0 - org.whispersystems.websocket + org.whispersystems websocket-resources - 0.1-SNAPSHOT + 0.1 diff --git a/protobuf/Makefile b/protobuf/Makefile index ff53d0244..a4d390191 100644 --- a/protobuf/Makefile +++ b/protobuf/Makefile @@ -1,3 +1,3 @@ all: - protoc --java_out=../src/main/java/ OutgoingMessageSignal.proto \ No newline at end of file + protoc --java_out=../src/main/java/ OutgoingMessageSignal.proto PubSubMessage.proto StoredMessage.proto \ No newline at end of file diff --git a/protobuf/OutgoingMessageSignal.proto b/protobuf/OutgoingMessageSignal.proto index aec9e8fb5..d04ee5ffa 100644 --- a/protobuf/OutgoingMessageSignal.proto +++ b/protobuf/OutgoingMessageSignal.proto @@ -36,4 +36,4 @@ message OutgoingMessageSignal { // repeated string destinations = 4; optional uint64 timestamp = 5; optional bytes message = 6; -} \ No newline at end of file +} diff --git a/protobuf/PubSubMessage.proto b/protobuf/PubSubMessage.proto new file mode 100644 index 000000000..7b257a368 --- /dev/null +++ b/protobuf/PubSubMessage.proto @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2014 Open Whisper Systems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package textsecure; + +option java_package = "org.whispersystems.textsecuregcm.storage"; +option java_outer_classname = "PubSubProtos"; + +message PubSubMessage { + enum Type { + UNKNOWN = 0; + QUERY_DB = 1; + DELIVER = 2; + KEEPALIVE = 3; + } + + optional Type type = 1; + optional bytes content = 2; +} diff --git a/protobuf/StoredMessage.proto b/protobuf/StoredMessage.proto new file mode 100644 index 000000000..05a6b88e5 --- /dev/null +++ b/protobuf/StoredMessage.proto @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2014 Open Whisper Systems + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package textsecure; + +option java_package = "org.whispersystems.textsecuregcm.storage"; +option java_outer_classname = "StoredMessageProtos"; + +message StoredMessage { + enum Type { + UNKNOWN = 0; + MESSAGE = 1; + } + + optional Type type = 1; + optional bytes content = 2; +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 3618bb500..4fb95bfcd 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -17,17 +17,16 @@ package org.whispersystems.textsecuregcm; import com.fasterxml.jackson.annotation.JsonProperty; -import org.whispersystems.textsecuregcm.configuration.ApnConfiguration; import org.whispersystems.textsecuregcm.configuration.FederationConfiguration; -import org.whispersystems.textsecuregcm.configuration.GcmConfiguration; import org.whispersystems.textsecuregcm.configuration.GraphiteConfiguration; import org.whispersystems.textsecuregcm.configuration.MemcacheConfiguration; +import org.whispersystems.textsecuregcm.configuration.MessageStoreConfiguration; import org.whispersystems.textsecuregcm.configuration.MetricsConfiguration; import org.whispersystems.textsecuregcm.configuration.NexmoConfiguration; import org.whispersystems.textsecuregcm.configuration.PushConfiguration; import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration; import org.whispersystems.textsecuregcm.configuration.RedPhoneConfiguration; -import org.whispersystems.textsecuregcm.configuration.RedisConfiguration; +import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration; import org.whispersystems.textsecuregcm.configuration.S3Configuration; import org.whispersystems.textsecuregcm.configuration.TwilioConfiguration; import org.whispersystems.textsecuregcm.configuration.WebsocketConfiguration; @@ -67,7 +66,12 @@ public class WhisperServerConfiguration extends Configuration { @NotNull @Valid @JsonProperty - private RedisConfiguration redis; + private DirectoryConfiguration directory; + + @NotNull + @Valid + @JsonProperty + private MessageStoreConfiguration messageStore; @Valid @JsonProperty @@ -132,8 +136,12 @@ public class WhisperServerConfiguration extends Configuration { return memcache; } - public RedisConfiguration getRedisConfiguration() { - return redis; + public DirectoryConfiguration getDirectoryConfiguration() { + return directory; + } + + public MessageStoreConfiguration getMessageStoreConfiguration() { + return messageStore; } public DataSourceFactory getDataSourceFactory() { diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index fd2365de0..aa4c251c7 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -35,6 +35,7 @@ import org.whispersystems.textsecuregcm.controllers.DeviceController; import org.whispersystems.textsecuregcm.controllers.DirectoryController; import org.whispersystems.textsecuregcm.controllers.FederationControllerV1; import org.whispersystems.textsecuregcm.controllers.FederationControllerV2; +import org.whispersystems.textsecuregcm.controllers.KeepAliveController; import org.whispersystems.textsecuregcm.controllers.KeysControllerV1; import org.whispersystems.textsecuregcm.controllers.KeysControllerV2; import org.whispersystems.textsecuregcm.controllers.MessageController; @@ -137,18 +138,19 @@ public class WhisperServerService extends Application authenticatedDevice; + private Device authenticatedDevice; public Account() {} @@ -54,11 +54,11 @@ public class Account { } public Optional getAuthenticatedDevice() { - return authenticatedDevice; + return Optional.fromNullable(authenticatedDevice); } public void setAuthenticatedDevice(Device device) { - this.authenticatedDevice = Optional.of(device); + this.authenticatedDevice = device; } public void setNumber(String number) { diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubListener.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubListener.java index d9a24d595..ea3361e91 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubListener.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubListener.java @@ -1,5 +1,7 @@ package org.whispersystems.textsecuregcm.storage; +import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage; + public interface PubSubListener { public void onPubSubMessage(PubSubMessage outgoingMessage); diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java index 3b4acd0ef..269ddc402 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java @@ -1,34 +1,31 @@ package org.whispersystems.textsecuregcm.storage; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.InvalidProtocolBufferException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.util.SystemMapper; -import org.whispersystems.textsecuregcm.websocket.InvalidWebsocketAddressException; import org.whispersystems.textsecuregcm.websocket.WebsocketAddress; -import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage; +import redis.clients.jedis.BinaryJedisPubSub; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; -import redis.clients.jedis.JedisPubSub; public class PubSubManager { - private static final String KEEPALIVE_CHANNEL = "KEEPALIVE"; + private static final byte[] KEEPALIVE_CHANNEL = "KEEPALIVE".getBytes(); private final Logger logger = LoggerFactory.getLogger(PubSubManager.class); - private final ObjectMapper mapper = SystemMapper.getMapper(); private final SubscriptionListener baseListener = new SubscriptionListener(); private final Map listeners = new HashMap<>(); private final JedisPool jedisPool; private boolean subscribed = false; - public PubSubManager(final JedisPool jedisPool) { + public PubSubManager(JedisPool jedisPool) { this.jedisPool = jedisPool; initializePubSubWorker(); waitForSubscription(); @@ -36,34 +33,23 @@ public class PubSubManager { public synchronized void subscribe(WebsocketAddress address, PubSubListener listener) { listeners.put(address.serialize(), listener); - baseListener.subscribe(address.serialize()); + baseListener.subscribe(address.serialize().getBytes()); } public synchronized void unsubscribe(WebsocketAddress address, PubSubListener listener) { if (listeners.get(address.serialize()) == listener) { listeners.remove(address.serialize()); - baseListener.unsubscribe(address.serialize()); + baseListener.unsubscribe(address.serialize().getBytes()); } } public synchronized boolean publish(WebsocketAddress address, PubSubMessage message) { - return publish(address.serialize(), message); + return publish(address.serialize().getBytes(), message); } - private synchronized boolean publish(String channel, PubSubMessage message) { - try { - String serialized = mapper.writeValueAsString(message); - Jedis jedis = null; - - try { - jedis = jedisPool.getResource(); - return jedis.publish(channel, serialized) != 0; - } finally { - if (jedis != null) - jedisPool.returnResource(jedis); - } - } catch (JsonProcessingException e) { - throw new AssertionError(e); + private synchronized boolean publish(byte[] channel, PubSubMessage message) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.publish(channel, message.toByteArray()) != 0; } } @@ -82,14 +68,9 @@ public class PubSubManager { @Override public void run() { for (;;) { - Jedis jedis = null; - try { - jedis = jedisPool.getResource(); + try (Jedis jedis = jedisPool.getResource()) { jedis.subscribe(baseListener, KEEPALIVE_CHANNEL); logger.warn("**** Unsubscribed from holding channel!!! ******"); - } finally { - if (jedis != null) - jedisPool.returnResource(jedis); } } } @@ -101,7 +82,9 @@ public class PubSubManager { for (;;) { try { Thread.sleep(20000); - publish(KEEPALIVE_CHANNEL, new PubSubMessage(0, "foo")); + publish(KEEPALIVE_CHANNEL, PubSubMessage.newBuilder() + .setType(PubSubMessage.Type.KEEPALIVE) + .build()); } catch (InterruptedException e) { throw new AssertionError(e); } @@ -110,33 +93,33 @@ public class PubSubManager { }.start(); } - private class SubscriptionListener extends JedisPubSub { + private class SubscriptionListener extends BinaryJedisPubSub { @Override - public void onMessage(String channel, String message) { + public void onMessage(byte[] channel, byte[] message) { try { - PubSubListener listener; + PubSubListener listener; synchronized (PubSubManager.this) { listener = listeners.get(channel); } if (listener != null) { - listener.onPubSubMessage(mapper.readValue(message, PubSubMessage.class)); + listener.onPubSubMessage(PubSubMessage.parseFrom(message)); } - } catch (IOException e) { - logger.warn("IOE", e); + } catch (InvalidProtocolBufferException e) { + logger.warn("Error parsing PubSub protobuf", e); } } @Override - public void onPMessage(String s, String s2, String s3) { + public void onPMessage(byte[] s, byte[] s2, byte[] s3) { logger.warn("Received PMessage!"); } @Override - public void onSubscribe(String channel, int count) { - if (KEEPALIVE_CHANNEL.equals(channel)) { + public void onSubscribe(byte[] channel, int count) { + if (Arrays.equals(KEEPALIVE_CHANNEL, channel)) { synchronized (PubSubManager.this) { subscribed = true; PubSubManager.this.notifyAll(); @@ -145,12 +128,12 @@ public class PubSubManager { } @Override - public void onUnsubscribe(String s, int i) {} + public void onUnsubscribe(byte[] s, int i) {} @Override - public void onPUnsubscribe(String s, int i) {} + public void onPUnsubscribe(byte[] s, int i) {} @Override - public void onPSubscribe(String s, int i) {} + public void onPSubscribe(byte[] s, int i) {} } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubMessage.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubMessage.java deleted file mode 100644 index fa24dbd48..000000000 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubMessage.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.whispersystems.textsecuregcm.storage; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; - -@JsonIgnoreProperties(ignoreUnknown = true) -public class PubSubMessage { - - public static final int TYPE_QUERY_DB = 1; - public static final int TYPE_DELIVER = 2; - - @JsonProperty - private int type; - - @JsonProperty - private String contents; - - public PubSubMessage() {} - - public PubSubMessage(int type, String contents) { - this.type = type; - this.contents = contents; - } - - public int getType() { - return type; - } - - public String getContents() { - return contents; - } -} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubProtos.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubProtos.java new file mode 100644 index 000000000..8785e372d --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubProtos.java @@ -0,0 +1,642 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: PubSubMessage.proto + +package org.whispersystems.textsecuregcm.storage; + +public final class PubSubProtos { + private PubSubProtos() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface PubSubMessageOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // optional .textsecure.PubSubMessage.Type type = 1; + /** + * optional .textsecure.PubSubMessage.Type type = 1; + */ + boolean hasType(); + /** + * optional .textsecure.PubSubMessage.Type type = 1; + */ + org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type getType(); + + // optional bytes content = 2; + /** + * optional bytes content = 2; + */ + boolean hasContent(); + /** + * optional bytes content = 2; + */ + com.google.protobuf.ByteString getContent(); + } + /** + * Protobuf type {@code textsecure.PubSubMessage} + */ + public static final class PubSubMessage extends + com.google.protobuf.GeneratedMessage + implements PubSubMessageOrBuilder { + // Use PubSubMessage.newBuilder() to construct. + private PubSubMessage(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private PubSubMessage(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final PubSubMessage defaultInstance; + public static PubSubMessage getDefaultInstance() { + return defaultInstance; + } + + public PubSubMessage getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private PubSubMessage( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + int rawValue = input.readEnum(); + org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type value = org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(1, rawValue); + } else { + bitField0_ |= 0x00000001; + type_ = value; + } + break; + } + case 18: { + bitField0_ |= 0x00000002; + content_ = input.readBytes(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.whispersystems.textsecuregcm.storage.PubSubProtos.internal_static_textsecure_PubSubMessage_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.whispersystems.textsecuregcm.storage.PubSubProtos.internal_static_textsecure_PubSubMessage_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.class, org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public PubSubMessage parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new PubSubMessage(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + /** + * Protobuf enum {@code textsecure.PubSubMessage.Type} + */ + public enum Type + implements com.google.protobuf.ProtocolMessageEnum { + /** + * UNKNOWN = 0; + */ + UNKNOWN(0, 0), + /** + * QUERY_DB = 1; + */ + QUERY_DB(1, 1), + /** + * DELIVER = 2; + */ + DELIVER(2, 2), + /** + * KEEPALIVE = 3; + */ + KEEPALIVE(3, 3), + ; + + /** + * UNKNOWN = 0; + */ + public static final int UNKNOWN_VALUE = 0; + /** + * QUERY_DB = 1; + */ + public static final int QUERY_DB_VALUE = 1; + /** + * DELIVER = 2; + */ + public static final int DELIVER_VALUE = 2; + /** + * KEEPALIVE = 3; + */ + public static final int KEEPALIVE_VALUE = 3; + + + public final int getNumber() { return value; } + + public static Type valueOf(int value) { + switch (value) { + case 0: return UNKNOWN; + case 1: return QUERY_DB; + case 2: return DELIVER; + case 3: return KEEPALIVE; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public Type findValueByNumber(int number) { + return Type.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.getDescriptor().getEnumTypes().get(0); + } + + private static final Type[] VALUES = values(); + + public static Type valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private Type(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:textsecure.PubSubMessage.Type) + } + + private int bitField0_; + // optional .textsecure.PubSubMessage.Type type = 1; + public static final int TYPE_FIELD_NUMBER = 1; + private org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type type_; + /** + * optional .textsecure.PubSubMessage.Type type = 1; + */ + public boolean hasType() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional .textsecure.PubSubMessage.Type type = 1; + */ + public org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type getType() { + return type_; + } + + // optional bytes content = 2; + public static final int CONTENT_FIELD_NUMBER = 2; + private com.google.protobuf.ByteString content_; + /** + * optional bytes content = 2; + */ + public boolean hasContent() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional bytes content = 2; + */ + public com.google.protobuf.ByteString getContent() { + return content_; + } + + private void initFields() { + type_ = org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type.UNKNOWN; + content_ = com.google.protobuf.ByteString.EMPTY; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeEnum(1, type_.getNumber()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, content_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(1, type_.getNumber()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, content_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code textsecure.PubSubMessage} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessageOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.whispersystems.textsecuregcm.storage.PubSubProtos.internal_static_textsecure_PubSubMessage_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.whispersystems.textsecuregcm.storage.PubSubProtos.internal_static_textsecure_PubSubMessage_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.class, org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Builder.class); + } + + // Construct using org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + type_ = org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type.UNKNOWN; + bitField0_ = (bitField0_ & ~0x00000001); + content_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.whispersystems.textsecuregcm.storage.PubSubProtos.internal_static_textsecure_PubSubMessage_descriptor; + } + + public org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage getDefaultInstanceForType() { + return org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.getDefaultInstance(); + } + + public org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage build() { + org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage buildPartial() { + org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage result = new org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.type_ = type_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.content_ = content_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage) { + return mergeFrom((org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage other) { + if (other == org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.getDefaultInstance()) return this; + if (other.hasType()) { + setType(other.getType()); + } + if (other.hasContent()) { + setContent(other.getContent()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // optional .textsecure.PubSubMessage.Type type = 1; + private org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type type_ = org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type.UNKNOWN; + /** + * optional .textsecure.PubSubMessage.Type type = 1; + */ + public boolean hasType() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional .textsecure.PubSubMessage.Type type = 1; + */ + public org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type getType() { + return type_; + } + /** + * optional .textsecure.PubSubMessage.Type type = 1; + */ + public Builder setType(org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + type_ = value; + onChanged(); + return this; + } + /** + * optional .textsecure.PubSubMessage.Type type = 1; + */ + public Builder clearType() { + bitField0_ = (bitField0_ & ~0x00000001); + type_ = org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type.UNKNOWN; + onChanged(); + return this; + } + + // optional bytes content = 2; + private com.google.protobuf.ByteString content_ = com.google.protobuf.ByteString.EMPTY; + /** + * optional bytes content = 2; + */ + public boolean hasContent() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional bytes content = 2; + */ + public com.google.protobuf.ByteString getContent() { + return content_; + } + /** + * optional bytes content = 2; + */ + public Builder setContent(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + content_ = value; + onChanged(); + return this; + } + /** + * optional bytes content = 2; + */ + public Builder clearContent() { + bitField0_ = (bitField0_ & ~0x00000002); + content_ = getDefaultInstance().getContent(); + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:textsecure.PubSubMessage) + } + + static { + defaultInstance = new PubSubMessage(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:textsecure.PubSubMessage) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_textsecure_PubSubMessage_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_textsecure_PubSubMessage_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\023PubSubMessage.proto\022\ntextsecure\"\215\001\n\rPu" + + "bSubMessage\022,\n\004type\030\001 \001(\0162\036.textsecure.P" + + "ubSubMessage.Type\022\017\n\007content\030\002 \001(\014\"=\n\004Ty" + + "pe\022\013\n\007UNKNOWN\020\000\022\014\n\010QUERY_DB\020\001\022\013\n\007DELIVER" + + "\020\002\022\r\n\tKEEPALIVE\020\003B8\n(org.whispersystems." + + "textsecuregcm.storageB\014PubSubProtos" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_textsecure_PubSubMessage_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_textsecure_PubSubMessage_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_textsecure_PubSubMessage_descriptor, + new java.lang.String[] { "Type", "Content", }); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessageProtos.java b/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessageProtos.java new file mode 100644 index 000000000..ab9ca5edd --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessageProtos.java @@ -0,0 +1,624 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: StoredMessage.proto + +package org.whispersystems.textsecuregcm.storage; + +public final class StoredMessageProtos { + private StoredMessageProtos() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface StoredMessageOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // optional .textsecure.StoredMessage.Type type = 1; + /** + * optional .textsecure.StoredMessage.Type type = 1; + */ + boolean hasType(); + /** + * optional .textsecure.StoredMessage.Type type = 1; + */ + org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type getType(); + + // optional bytes content = 2; + /** + * optional bytes content = 2; + */ + boolean hasContent(); + /** + * optional bytes content = 2; + */ + com.google.protobuf.ByteString getContent(); + } + /** + * Protobuf type {@code textsecure.StoredMessage} + */ + public static final class StoredMessage extends + com.google.protobuf.GeneratedMessage + implements StoredMessageOrBuilder { + // Use StoredMessage.newBuilder() to construct. + private StoredMessage(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private StoredMessage(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final StoredMessage defaultInstance; + public static StoredMessage getDefaultInstance() { + return defaultInstance; + } + + public StoredMessage getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private StoredMessage( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + int rawValue = input.readEnum(); + org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type value = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(1, rawValue); + } else { + bitField0_ |= 0x00000001; + type_ = value; + } + break; + } + case 18: { + bitField0_ |= 0x00000002; + content_ = input.readBytes(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.class, org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public StoredMessage parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new StoredMessage(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + /** + * Protobuf enum {@code textsecure.StoredMessage.Type} + */ + public enum Type + implements com.google.protobuf.ProtocolMessageEnum { + /** + * UNKNOWN = 0; + */ + UNKNOWN(0, 0), + /** + * MESSAGE = 1; + */ + MESSAGE(1, 1), + ; + + /** + * UNKNOWN = 0; + */ + public static final int UNKNOWN_VALUE = 0; + /** + * MESSAGE = 1; + */ + public static final int MESSAGE_VALUE = 1; + + + public final int getNumber() { return value; } + + public static Type valueOf(int value) { + switch (value) { + case 0: return UNKNOWN; + case 1: return MESSAGE; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public Type findValueByNumber(int number) { + return Type.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.getDescriptor().getEnumTypes().get(0); + } + + private static final Type[] VALUES = values(); + + public static Type valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private Type(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:textsecure.StoredMessage.Type) + } + + private int bitField0_; + // optional .textsecure.StoredMessage.Type type = 1; + public static final int TYPE_FIELD_NUMBER = 1; + private org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type type_; + /** + * optional .textsecure.StoredMessage.Type type = 1; + */ + public boolean hasType() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional .textsecure.StoredMessage.Type type = 1; + */ + public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type getType() { + return type_; + } + + // optional bytes content = 2; + public static final int CONTENT_FIELD_NUMBER = 2; + private com.google.protobuf.ByteString content_; + /** + * optional bytes content = 2; + */ + public boolean hasContent() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional bytes content = 2; + */ + public com.google.protobuf.ByteString getContent() { + return content_; + } + + private void initFields() { + type_ = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.UNKNOWN; + content_ = com.google.protobuf.ByteString.EMPTY; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeEnum(1, type_.getNumber()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, content_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(1, type_.getNumber()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, content_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code textsecure.StoredMessage} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessageOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.class, org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Builder.class); + } + + // Construct using org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + type_ = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.UNKNOWN; + bitField0_ = (bitField0_ & ~0x00000001); + content_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_descriptor; + } + + public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage getDefaultInstanceForType() { + return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.getDefaultInstance(); + } + + public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage build() { + org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage buildPartial() { + org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage result = new org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.type_ = type_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.content_ = content_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage) { + return mergeFrom((org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage other) { + if (other == org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.getDefaultInstance()) return this; + if (other.hasType()) { + setType(other.getType()); + } + if (other.hasContent()) { + setContent(other.getContent()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // optional .textsecure.StoredMessage.Type type = 1; + private org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type type_ = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.UNKNOWN; + /** + * optional .textsecure.StoredMessage.Type type = 1; + */ + public boolean hasType() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional .textsecure.StoredMessage.Type type = 1; + */ + public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type getType() { + return type_; + } + /** + * optional .textsecure.StoredMessage.Type type = 1; + */ + public Builder setType(org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + type_ = value; + onChanged(); + return this; + } + /** + * optional .textsecure.StoredMessage.Type type = 1; + */ + public Builder clearType() { + bitField0_ = (bitField0_ & ~0x00000001); + type_ = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.UNKNOWN; + onChanged(); + return this; + } + + // optional bytes content = 2; + private com.google.protobuf.ByteString content_ = com.google.protobuf.ByteString.EMPTY; + /** + * optional bytes content = 2; + */ + public boolean hasContent() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional bytes content = 2; + */ + public com.google.protobuf.ByteString getContent() { + return content_; + } + /** + * optional bytes content = 2; + */ + public Builder setContent(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + content_ = value; + onChanged(); + return this; + } + /** + * optional bytes content = 2; + */ + public Builder clearContent() { + bitField0_ = (bitField0_ & ~0x00000002); + content_ = getDefaultInstance().getContent(); + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:textsecure.StoredMessage) + } + + static { + defaultInstance = new StoredMessage(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:textsecure.StoredMessage) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_textsecure_StoredMessage_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_textsecure_StoredMessage_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\023StoredMessage.proto\022\ntextsecure\"p\n\rSto" + + "redMessage\022,\n\004type\030\001 \001(\0162\036.textsecure.St" + + "oredMessage.Type\022\017\n\007content\030\002 \001(\014\" \n\004Typ" + + "e\022\013\n\007UNKNOWN\020\000\022\013\n\007MESSAGE\020\001B?\n(org.whisp" + + "ersystems.textsecuregcm.storageB\023StoredM" + + "essageProtos" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_textsecure_StoredMessage_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_textsecure_StoredMessage_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_textsecure_StoredMessage_descriptor, + new java.lang.String[] { "Type", "Content", }); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java b/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java index 4ba277041..909aac599 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java @@ -19,21 +19,18 @@ package org.whispersystems.textsecuregcm.storage; import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - +import com.google.protobuf.InvalidProtocolBufferException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.entities.PendingMessage; import org.whispersystems.textsecuregcm.util.Constants; -import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.websocket.WebsocketAddress; -import java.io.IOException; import java.util.LinkedList; import java.util.List; import static com.codahale.metrics.MetricRegistry.name; +import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal; +import static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; @@ -44,8 +41,6 @@ public class StoredMessages { private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); private final Histogram queueSizeHistogram = metricRegistry.histogram(name(getClass(), "queue_size")); - - private static final ObjectMapper mapper = SystemMapper.getMapper(); private static final String QUEUE_PREFIX = "msgs"; private final JedisPool jedisPool; @@ -55,64 +50,54 @@ public class StoredMessages { } public void clear(WebsocketAddress address) { - Jedis jedis = null; - - try { - jedis = jedisPool.getResource(); + try (Jedis jedis = jedisPool.getResource()) { jedis.del(getKey(address)); - } finally { - if (jedis != null) - jedisPool.returnResource(jedis); } } - public void insert(WebsocketAddress address, PendingMessage message) { - Jedis jedis = null; + public void insert(WebsocketAddress address, OutgoingMessageSignal message) { + try (Jedis jedis = jedisPool.getResource()) { + StoredMessage storedMessage = StoredMessage.newBuilder() + .setType(StoredMessage.Type.MESSAGE) + .setContent(message.toByteString()) + .build(); - try { - jedis = jedisPool.getResource(); - - String serializedMessage = mapper.writeValueAsString(message); - long queueSize = jedis.lpush(getKey(address), serializedMessage); + long queueSize = jedis.lpush(getKey(address), storedMessage.toByteArray()); queueSizeHistogram.update(queueSize); if (queueSize > 1000) { jedis.ltrim(getKey(address), 0, 999); } - - } catch (JsonProcessingException e) { - logger.warn("StoredMessages", "Unable to store correctly", e); - } finally { - if (jedis != null) - jedisPool.returnResource(jedis); } } - public List getMessagesForDevice(WebsocketAddress address) { - List messages = new LinkedList<>(); - Jedis jedis = null; + public List getMessagesForDevice(WebsocketAddress address) { + List messages = new LinkedList<>(); - try { - jedis = jedisPool.getResource(); - String message; + try (Jedis jedis = jedisPool.getResource()) { + byte[] message; while ((message = jedis.rpop(getKey(address))) != null) { try { - messages.add(mapper.readValue(message, PendingMessage.class)); - } catch (IOException e) { - logger.warn("StoredMessages", "Not a valid PendingMessage", e); + StoredMessage storedMessage = StoredMessage.parseFrom(message); + + if (storedMessage.getType().getNumber() == StoredMessage.Type.MESSAGE_VALUE) { + messages.add(OutgoingMessageSignal.parseFrom(storedMessage.getContent())); + } else { + logger.warn("Unkown stored message type: " + storedMessage.getType().getNumber()); + } + + } catch (InvalidProtocolBufferException e) { + logger.warn("Error parsing protobuf", e); } } return messages; - } finally { - if (jedis != null) - jedisPool.returnResource(jedis); } } - private String getKey(WebsocketAddress address) { - return QUEUE_PREFIX + ":" + address.serialize(); + private byte[] getKey(WebsocketAddress address) { + return (QUEUE_PREFIX + ":" + address.serialize()).getBytes(); } } \ No newline at end of file diff --git a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketAccountAuthenticator.java b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketAccountAuthenticator.java index d756876ac..df2d04618 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketAccountAuthenticator.java +++ b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketAccountAuthenticator.java @@ -4,6 +4,7 @@ import com.google.common.base.Optional; import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.websocket.auth.AuthenticationException; import org.whispersystems.websocket.auth.WebSocketAuthenticator; diff --git a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 5c1902d37..cd91f2864 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -1,13 +1,14 @@ package org.whispersystems.textsecuregcm.websocket; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.InvalidProtocolBufferException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.entities.PendingMessage; +import org.whispersystems.textsecuregcm.entities.CryptoEncodingException; +import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage; import org.whispersystems.textsecuregcm.push.NotPushRegisteredException; import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.push.TransientPushFailureException; @@ -16,23 +17,20 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.PubSubListener; import org.whispersystems.textsecuregcm.storage.PubSubManager; -import org.whispersystems.textsecuregcm.storage.PubSubMessage; import org.whispersystems.textsecuregcm.storage.StoredMessages; -import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.websocket.WebSocketClient; import org.whispersystems.websocket.messages.WebSocketResponseMessage; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.io.IOException; import java.util.List; import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal; +import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage; public class WebSocketConnection implements PubSubListener { - private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class); - private static final ObjectMapper objectMapper = SystemMapper.getMapper(); + private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class); private final AccountsManager accountsManager; private final PushSender pushSender; @@ -72,52 +70,56 @@ public class WebSocketConnection implements PubSubListener { } @Override - public void onPubSubMessage(PubSubMessage message) { + public void onPubSubMessage(PubSubMessage pubSubMessage) { try { - switch (message.getType()) { - case PubSubMessage.TYPE_QUERY_DB: + switch (pubSubMessage.getType().getNumber()) { + case PubSubMessage.Type.QUERY_DB_VALUE: processStoredMessages(); break; - case PubSubMessage.TYPE_DELIVER: - PendingMessage pendingMessage = objectMapper.readValue(message.getContents(), - PendingMessage.class); - sendMessage(pendingMessage); + case PubSubMessage.Type.DELIVER_VALUE: + sendMessage(OutgoingMessageSignal.parseFrom(pubSubMessage.getContent())); break; default: - logger.warn("Unknown pubsub message: " + message.getType()); + logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber()); } - } catch (IOException e) { - logger.warn("Error deserializing PendingMessage", e); + } catch (InvalidProtocolBufferException e) { + logger.warn("Protobuf parse error", e); } } - private void sendMessage(final PendingMessage message) { - String content = message.getEncryptedOutgoingMessage(); - Optional body = Optional.fromNullable(content.getBytes()); - ListenableFuture response = client.sendRequest("PUT", "/api/v1/message", body); + private void sendMessage(final OutgoingMessageSignal message) { + try { + EncryptedOutgoingMessage encryptedMessage = new EncryptedOutgoingMessage(message, device.getSignalingKey()); + Optional body = Optional.fromNullable(encryptedMessage.toByteArray()); + ListenableFuture response = client.sendRequest("PUT", "/api/v1/message", body); - Futures.addCallback(response, new FutureCallback() { - @Override - public void onSuccess(@Nullable WebSocketResponseMessage response) { - if (isSuccessResponse(response) && !message.isReceipt()) { - sendDeliveryReceiptFor(message); - } else if (!isSuccessResponse(response)) { + Futures.addCallback(response, new FutureCallback() { + @Override + public void onSuccess(@Nullable WebSocketResponseMessage response) { + boolean isReceipt = message.getType() == OutgoingMessageSignal.Type.RECEIPT_VALUE; + + if (isSuccessResponse(response) && !isReceipt) { + sendDeliveryReceiptFor(message); + } else if (!isSuccessResponse(response)) { + requeueMessage(message); + } + } + + @Override + public void onFailure(@Nonnull Throwable throwable) { requeueMessage(message); } - } - @Override - public void onFailure(@Nonnull Throwable throwable) { - requeueMessage(message); - } - - private boolean isSuccessResponse(WebSocketResponseMessage response) { - return response != null && response.getStatus() >= 200 && response.getStatus() < 300; - } - }); + private boolean isSuccessResponse(WebSocketResponseMessage response) { + return response != null && response.getStatus() >= 200 && response.getStatus() < 300; + } + }); + } catch (CryptoEncodingException e) { + logger.warn("Bad signaling key", e); + } } - private void requeueMessage(PendingMessage message) { + private void requeueMessage(OutgoingMessageSignal message) { try { pushSender.sendMessage(account, device, message); } catch (NotPushRegisteredException | TransientPushFailureException e) { @@ -126,12 +128,12 @@ public class WebSocketConnection implements PubSubListener { } } - private void sendDeliveryReceiptFor(PendingMessage message) { + private void sendDeliveryReceiptFor(OutgoingMessageSignal message) { try { - Optional source = accountsManager.get(message.getSender()); + Optional source = accountsManager.get(message.getSource()); if (!source.isPresent()) { - logger.warn("Source account disappeared? (%s)", message.getSender()); + logger.warn("Source account disappeared? (%s)", message.getSource()); return; } @@ -139,7 +141,7 @@ public class WebSocketConnection implements PubSubListener { OutgoingMessageSignal.newBuilder() .setSource(account.getNumber()) .setSourceDevice((int) device.getId()) - .setTimestamp(message.getMessageId()) + .setTimestamp(message.getTimestamp()) .setType(OutgoingMessageSignal.Type.RECEIPT_VALUE); for (Device device : source.get().getDevices()) { @@ -151,9 +153,9 @@ public class WebSocketConnection implements PubSubListener { } private void processStoredMessages() { - List messages = storedMessages.getMessagesForDevice(address); + List messages = storedMessages.getMessagesForDevice(address); - for (PendingMessage message : messages) { + for (OutgoingMessageSignal message : messages) { sendMessage(message); } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java b/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java index fcc8dd0fe..ec3ca8b20 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java +++ b/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java @@ -63,7 +63,7 @@ public class DirectoryCommand extends ConfiguredCommand outgoingMessages = new LinkedList() {{ - add(new PendingMessage("sender1", 1111, false, "first")); - add(new PendingMessage("sender1", 2222, false, "second")); - add(new PendingMessage("sender2", 3333, false, "third")); + List outgoingMessages = new LinkedList() {{ + add(createMessage("sender1", 1111, false, "first")); + add(createMessage("sender1", 2222, false, "second")); + add(createMessage("sender2", 3333, false, "third")); }}; when(device.getId()).thenReturn(2L); + when(device.getSignalingKey()).thenReturn(Base64.encodeBytes(new byte[52])); + when(account.getAuthenticatedDevice()).thenReturn(Optional.of(device)); when(account.getNumber()).thenReturn("+14152222222"); @@ -167,16 +170,26 @@ public class WebSocketConnectionTest { futures.get(0).setException(new IOException()); futures.get(2).setException(new IOException()); - List pending = new LinkedList() {{ - add(new PendingMessage("sender1", 1111, false, "first")); - add(new PendingMessage("sender2", 3333, false, "third")); + List pending = new LinkedList() {{ + add(createMessage("sender1", 1111, false, "first")); + add(createMessage("sender2", 3333, false, "third")); }}; - verify(pushSender, times(2)).sendMessage(eq(account), eq(device), any(PendingMessage.class)); - verify(pushSender, times(1)).sendMessage(eq(sender1), eq(sender1device), any(MessageProtos.OutgoingMessageSignal.class)); + verify(pushSender, times(2)).sendMessage(eq(account), eq(device), any(OutgoingMessageSignal.class)); + verify(pushSender, times(1)).sendMessage(eq(sender1), eq(sender1device), any(OutgoingMessageSignal.class)); connection.onConnectionLost(); verify(pubSubManager).unsubscribe(eq(new WebsocketAddress("+14152222222", 2L)), eq(connection)); } + private OutgoingMessageSignal createMessage(String sender, long timestamp, boolean receipt, String content) { + return OutgoingMessageSignal.newBuilder() + .setSource(sender) + .setSourceDevice(1) + .setType(receipt ? OutgoingMessageSignal.Type.RECEIPT_VALUE : OutgoingMessageSignal.Type.CIPHERTEXT_VALUE) + .setTimestamp(timestamp) + .setMessage(ByteString.copyFrom(content.getBytes())) + .build(); + } + }