diff --git a/pom.xml b/pom.xml
index c64afe7c5..5a8b5044e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,7 +116,7 @@
redis.clients
jedis
- 2.2.1
+ 2.6.1
jar
compile
@@ -137,9 +137,9 @@
4.0.0
- org.whispersystems.websocket
+ org.whispersystems
websocket-resources
- 0.1-SNAPSHOT
+ 0.1
diff --git a/protobuf/Makefile b/protobuf/Makefile
index ff53d0244..a4d390191 100644
--- a/protobuf/Makefile
+++ b/protobuf/Makefile
@@ -1,3 +1,3 @@
all:
- protoc --java_out=../src/main/java/ OutgoingMessageSignal.proto
\ No newline at end of file
+ protoc --java_out=../src/main/java/ OutgoingMessageSignal.proto PubSubMessage.proto StoredMessage.proto
\ No newline at end of file
diff --git a/protobuf/OutgoingMessageSignal.proto b/protobuf/OutgoingMessageSignal.proto
index aec9e8fb5..d04ee5ffa 100644
--- a/protobuf/OutgoingMessageSignal.proto
+++ b/protobuf/OutgoingMessageSignal.proto
@@ -36,4 +36,4 @@ message OutgoingMessageSignal {
// repeated string destinations = 4;
optional uint64 timestamp = 5;
optional bytes message = 6;
-}
\ No newline at end of file
+}
diff --git a/protobuf/PubSubMessage.proto b/protobuf/PubSubMessage.proto
new file mode 100644
index 000000000..7b257a368
--- /dev/null
+++ b/protobuf/PubSubMessage.proto
@@ -0,0 +1,32 @@
+/**
+ * Copyright (C) 2014 Open Whisper Systems
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package textsecure;
+
+option java_package = "org.whispersystems.textsecuregcm.storage";
+option java_outer_classname = "PubSubProtos";
+
+message PubSubMessage {
+ enum Type {
+ UNKNOWN = 0;
+ QUERY_DB = 1;
+ DELIVER = 2;
+ KEEPALIVE = 3;
+ }
+
+ optional Type type = 1;
+ optional bytes content = 2;
+}
diff --git a/protobuf/StoredMessage.proto b/protobuf/StoredMessage.proto
new file mode 100644
index 000000000..05a6b88e5
--- /dev/null
+++ b/protobuf/StoredMessage.proto
@@ -0,0 +1,30 @@
+/**
+ * Copyright (C) 2014 Open Whisper Systems
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package textsecure;
+
+option java_package = "org.whispersystems.textsecuregcm.storage";
+option java_outer_classname = "StoredMessageProtos";
+
+message StoredMessage {
+ enum Type {
+ UNKNOWN = 0;
+ MESSAGE = 1;
+ }
+
+ optional Type type = 1;
+ optional bytes content = 2;
+}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
index 3618bb500..4fb95bfcd 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
@@ -17,17 +17,16 @@
package org.whispersystems.textsecuregcm;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.whispersystems.textsecuregcm.configuration.ApnConfiguration;
import org.whispersystems.textsecuregcm.configuration.FederationConfiguration;
-import org.whispersystems.textsecuregcm.configuration.GcmConfiguration;
import org.whispersystems.textsecuregcm.configuration.GraphiteConfiguration;
import org.whispersystems.textsecuregcm.configuration.MemcacheConfiguration;
+import org.whispersystems.textsecuregcm.configuration.MessageStoreConfiguration;
import org.whispersystems.textsecuregcm.configuration.MetricsConfiguration;
import org.whispersystems.textsecuregcm.configuration.NexmoConfiguration;
import org.whispersystems.textsecuregcm.configuration.PushConfiguration;
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedPhoneConfiguration;
-import org.whispersystems.textsecuregcm.configuration.RedisConfiguration;
+import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration;
import org.whispersystems.textsecuregcm.configuration.S3Configuration;
import org.whispersystems.textsecuregcm.configuration.TwilioConfiguration;
import org.whispersystems.textsecuregcm.configuration.WebsocketConfiguration;
@@ -67,7 +66,12 @@ public class WhisperServerConfiguration extends Configuration {
@NotNull
@Valid
@JsonProperty
- private RedisConfiguration redis;
+ private DirectoryConfiguration directory;
+
+ @NotNull
+ @Valid
+ @JsonProperty
+ private MessageStoreConfiguration messageStore;
@Valid
@JsonProperty
@@ -132,8 +136,12 @@ public class WhisperServerConfiguration extends Configuration {
return memcache;
}
- public RedisConfiguration getRedisConfiguration() {
- return redis;
+ public DirectoryConfiguration getDirectoryConfiguration() {
+ return directory;
+ }
+
+ public MessageStoreConfiguration getMessageStoreConfiguration() {
+ return messageStore;
}
public DataSourceFactory getDataSourceFactory() {
diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
index fd2365de0..aa4c251c7 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
@@ -35,6 +35,7 @@ import org.whispersystems.textsecuregcm.controllers.DeviceController;
import org.whispersystems.textsecuregcm.controllers.DirectoryController;
import org.whispersystems.textsecuregcm.controllers.FederationControllerV1;
import org.whispersystems.textsecuregcm.controllers.FederationControllerV2;
+import org.whispersystems.textsecuregcm.controllers.KeepAliveController;
import org.whispersystems.textsecuregcm.controllers.KeysControllerV1;
import org.whispersystems.textsecuregcm.controllers.KeysControllerV2;
import org.whispersystems.textsecuregcm.controllers.MessageController;
@@ -137,18 +138,19 @@ public class WhisperServerService extends Application authenticatedDevice;
+ private Device authenticatedDevice;
public Account() {}
@@ -54,11 +54,11 @@ public class Account {
}
public Optional getAuthenticatedDevice() {
- return authenticatedDevice;
+ return Optional.fromNullable(authenticatedDevice);
}
public void setAuthenticatedDevice(Device device) {
- this.authenticatedDevice = Optional.of(device);
+ this.authenticatedDevice = device;
}
public void setNumber(String number) {
diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubListener.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubListener.java
index d9a24d595..ea3361e91 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubListener.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubListener.java
@@ -1,5 +1,7 @@
package org.whispersystems.textsecuregcm.storage;
+import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
+
public interface PubSubListener {
public void onPubSubMessage(PubSubMessage outgoingMessage);
diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java
index 3b4acd0ef..269ddc402 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java
@@ -1,34 +1,31 @@
package org.whispersystems.textsecuregcm.storage;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.whispersystems.textsecuregcm.util.SystemMapper;
-import org.whispersystems.textsecuregcm.websocket.InvalidWebsocketAddressException;
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
-import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
+import redis.clients.jedis.BinaryJedisPubSub;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.JedisPubSub;
public class PubSubManager {
- private static final String KEEPALIVE_CHANNEL = "KEEPALIVE";
+ private static final byte[] KEEPALIVE_CHANNEL = "KEEPALIVE".getBytes();
private final Logger logger = LoggerFactory.getLogger(PubSubManager.class);
- private final ObjectMapper mapper = SystemMapper.getMapper();
private final SubscriptionListener baseListener = new SubscriptionListener();
private final Map listeners = new HashMap<>();
private final JedisPool jedisPool;
private boolean subscribed = false;
- public PubSubManager(final JedisPool jedisPool) {
+ public PubSubManager(JedisPool jedisPool) {
this.jedisPool = jedisPool;
initializePubSubWorker();
waitForSubscription();
@@ -36,34 +33,23 @@ public class PubSubManager {
public synchronized void subscribe(WebsocketAddress address, PubSubListener listener) {
listeners.put(address.serialize(), listener);
- baseListener.subscribe(address.serialize());
+ baseListener.subscribe(address.serialize().getBytes());
}
public synchronized void unsubscribe(WebsocketAddress address, PubSubListener listener) {
if (listeners.get(address.serialize()) == listener) {
listeners.remove(address.serialize());
- baseListener.unsubscribe(address.serialize());
+ baseListener.unsubscribe(address.serialize().getBytes());
}
}
public synchronized boolean publish(WebsocketAddress address, PubSubMessage message) {
- return publish(address.serialize(), message);
+ return publish(address.serialize().getBytes(), message);
}
- private synchronized boolean publish(String channel, PubSubMessage message) {
- try {
- String serialized = mapper.writeValueAsString(message);
- Jedis jedis = null;
-
- try {
- jedis = jedisPool.getResource();
- return jedis.publish(channel, serialized) != 0;
- } finally {
- if (jedis != null)
- jedisPool.returnResource(jedis);
- }
- } catch (JsonProcessingException e) {
- throw new AssertionError(e);
+ private synchronized boolean publish(byte[] channel, PubSubMessage message) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ return jedis.publish(channel, message.toByteArray()) != 0;
}
}
@@ -82,14 +68,9 @@ public class PubSubManager {
@Override
public void run() {
for (;;) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
+ try (Jedis jedis = jedisPool.getResource()) {
jedis.subscribe(baseListener, KEEPALIVE_CHANNEL);
logger.warn("**** Unsubscribed from holding channel!!! ******");
- } finally {
- if (jedis != null)
- jedisPool.returnResource(jedis);
}
}
}
@@ -101,7 +82,9 @@ public class PubSubManager {
for (;;) {
try {
Thread.sleep(20000);
- publish(KEEPALIVE_CHANNEL, new PubSubMessage(0, "foo"));
+ publish(KEEPALIVE_CHANNEL, PubSubMessage.newBuilder()
+ .setType(PubSubMessage.Type.KEEPALIVE)
+ .build());
} catch (InterruptedException e) {
throw new AssertionError(e);
}
@@ -110,33 +93,33 @@ public class PubSubManager {
}.start();
}
- private class SubscriptionListener extends JedisPubSub {
+ private class SubscriptionListener extends BinaryJedisPubSub {
@Override
- public void onMessage(String channel, String message) {
+ public void onMessage(byte[] channel, byte[] message) {
try {
- PubSubListener listener;
+ PubSubListener listener;
synchronized (PubSubManager.this) {
listener = listeners.get(channel);
}
if (listener != null) {
- listener.onPubSubMessage(mapper.readValue(message, PubSubMessage.class));
+ listener.onPubSubMessage(PubSubMessage.parseFrom(message));
}
- } catch (IOException e) {
- logger.warn("IOE", e);
+ } catch (InvalidProtocolBufferException e) {
+ logger.warn("Error parsing PubSub protobuf", e);
}
}
@Override
- public void onPMessage(String s, String s2, String s3) {
+ public void onPMessage(byte[] s, byte[] s2, byte[] s3) {
logger.warn("Received PMessage!");
}
@Override
- public void onSubscribe(String channel, int count) {
- if (KEEPALIVE_CHANNEL.equals(channel)) {
+ public void onSubscribe(byte[] channel, int count) {
+ if (Arrays.equals(KEEPALIVE_CHANNEL, channel)) {
synchronized (PubSubManager.this) {
subscribed = true;
PubSubManager.this.notifyAll();
@@ -145,12 +128,12 @@ public class PubSubManager {
}
@Override
- public void onUnsubscribe(String s, int i) {}
+ public void onUnsubscribe(byte[] s, int i) {}
@Override
- public void onPUnsubscribe(String s, int i) {}
+ public void onPUnsubscribe(byte[] s, int i) {}
@Override
- public void onPSubscribe(String s, int i) {}
+ public void onPSubscribe(byte[] s, int i) {}
}
}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubMessage.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubMessage.java
deleted file mode 100644
index fa24dbd48..000000000
--- a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubMessage.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.whispersystems.textsecuregcm.storage;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class PubSubMessage {
-
- public static final int TYPE_QUERY_DB = 1;
- public static final int TYPE_DELIVER = 2;
-
- @JsonProperty
- private int type;
-
- @JsonProperty
- private String contents;
-
- public PubSubMessage() {}
-
- public PubSubMessage(int type, String contents) {
- this.type = type;
- this.contents = contents;
- }
-
- public int getType() {
- return type;
- }
-
- public String getContents() {
- return contents;
- }
-}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubProtos.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubProtos.java
new file mode 100644
index 000000000..8785e372d
--- /dev/null
+++ b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubProtos.java
@@ -0,0 +1,642 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: PubSubMessage.proto
+
+package org.whispersystems.textsecuregcm.storage;
+
+public final class PubSubProtos {
+ private PubSubProtos() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ }
+ public interface PubSubMessageOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // optional .textsecure.PubSubMessage.Type type = 1;
+ /**
+ * optional .textsecure.PubSubMessage.Type type = 1;
+ */
+ boolean hasType();
+ /**
+ * optional .textsecure.PubSubMessage.Type type = 1;
+ */
+ org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type getType();
+
+ // optional bytes content = 2;
+ /**
+ * optional bytes content = 2;
+ */
+ boolean hasContent();
+ /**
+ * optional bytes content = 2;
+ */
+ com.google.protobuf.ByteString getContent();
+ }
+ /**
+ * Protobuf type {@code textsecure.PubSubMessage}
+ */
+ public static final class PubSubMessage extends
+ com.google.protobuf.GeneratedMessage
+ implements PubSubMessageOrBuilder {
+ // Use PubSubMessage.newBuilder() to construct.
+ private PubSubMessage(com.google.protobuf.GeneratedMessage.Builder> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private PubSubMessage(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final PubSubMessage defaultInstance;
+ public static PubSubMessage getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public PubSubMessage getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private PubSubMessage(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 8: {
+ int rawValue = input.readEnum();
+ org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type value = org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type.valueOf(rawValue);
+ if (value == null) {
+ unknownFields.mergeVarintField(1, rawValue);
+ } else {
+ bitField0_ |= 0x00000001;
+ type_ = value;
+ }
+ break;
+ }
+ case 18: {
+ bitField0_ |= 0x00000002;
+ content_ = input.readBytes();
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.whispersystems.textsecuregcm.storage.PubSubProtos.internal_static_textsecure_PubSubMessage_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.whispersystems.textsecuregcm.storage.PubSubProtos.internal_static_textsecure_PubSubMessage_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.class, org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Builder.class);
+ }
+
+ public static com.google.protobuf.Parser PARSER =
+ new com.google.protobuf.AbstractParser() {
+ public PubSubMessage parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new PubSubMessage(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public com.google.protobuf.Parser getParserForType() {
+ return PARSER;
+ }
+
+ /**
+ * Protobuf enum {@code textsecure.PubSubMessage.Type}
+ */
+ public enum Type
+ implements com.google.protobuf.ProtocolMessageEnum {
+ /**
+ * UNKNOWN = 0;
+ */
+ UNKNOWN(0, 0),
+ /**
+ * QUERY_DB = 1;
+ */
+ QUERY_DB(1, 1),
+ /**
+ * DELIVER = 2;
+ */
+ DELIVER(2, 2),
+ /**
+ * KEEPALIVE = 3;
+ */
+ KEEPALIVE(3, 3),
+ ;
+
+ /**
+ * UNKNOWN = 0;
+ */
+ public static final int UNKNOWN_VALUE = 0;
+ /**
+ * QUERY_DB = 1;
+ */
+ public static final int QUERY_DB_VALUE = 1;
+ /**
+ * DELIVER = 2;
+ */
+ public static final int DELIVER_VALUE = 2;
+ /**
+ * KEEPALIVE = 3;
+ */
+ public static final int KEEPALIVE_VALUE = 3;
+
+
+ public final int getNumber() { return value; }
+
+ public static Type valueOf(int value) {
+ switch (value) {
+ case 0: return UNKNOWN;
+ case 1: return QUERY_DB;
+ case 2: return DELIVER;
+ case 3: return KEEPALIVE;
+ default: return null;
+ }
+ }
+
+ public static com.google.protobuf.Internal.EnumLiteMap
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static com.google.protobuf.Internal.EnumLiteMap
+ internalValueMap =
+ new com.google.protobuf.Internal.EnumLiteMap() {
+ public Type findValueByNumber(int number) {
+ return Type.valueOf(number);
+ }
+ };
+
+ public final com.google.protobuf.Descriptors.EnumValueDescriptor
+ getValueDescriptor() {
+ return getDescriptor().getValues().get(index);
+ }
+ public final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+ public static final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptor() {
+ return org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.getDescriptor().getEnumTypes().get(0);
+ }
+
+ private static final Type[] VALUES = values();
+
+ public static Type valueOf(
+ com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+ if (desc.getType() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "EnumValueDescriptor is not for this type.");
+ }
+ return VALUES[desc.getIndex()];
+ }
+
+ private final int index;
+ private final int value;
+
+ private Type(int index, int value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:textsecure.PubSubMessage.Type)
+ }
+
+ private int bitField0_;
+ // optional .textsecure.PubSubMessage.Type type = 1;
+ public static final int TYPE_FIELD_NUMBER = 1;
+ private org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type type_;
+ /**
+ * optional .textsecure.PubSubMessage.Type type = 1;
+ */
+ public boolean hasType() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * optional .textsecure.PubSubMessage.Type type = 1;
+ */
+ public org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type getType() {
+ return type_;
+ }
+
+ // optional bytes content = 2;
+ public static final int CONTENT_FIELD_NUMBER = 2;
+ private com.google.protobuf.ByteString content_;
+ /**
+ * optional bytes content = 2;
+ */
+ public boolean hasContent() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * optional bytes content = 2;
+ */
+ public com.google.protobuf.ByteString getContent() {
+ return content_;
+ }
+
+ private void initFields() {
+ type_ = org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type.UNKNOWN;
+ content_ = com.google.protobuf.ByteString.EMPTY;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeEnum(1, type_.getNumber());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(2, content_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeEnumSize(1, type_.getNumber());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, content_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code textsecure.PubSubMessage}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder
+ implements org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessageOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.whispersystems.textsecuregcm.storage.PubSubProtos.internal_static_textsecure_PubSubMessage_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.whispersystems.textsecuregcm.storage.PubSubProtos.internal_static_textsecure_PubSubMessage_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.class, org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Builder.class);
+ }
+
+ // Construct using org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ type_ = org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type.UNKNOWN;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ content_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.whispersystems.textsecuregcm.storage.PubSubProtos.internal_static_textsecure_PubSubMessage_descriptor;
+ }
+
+ public org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage getDefaultInstanceForType() {
+ return org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.getDefaultInstance();
+ }
+
+ public org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage build() {
+ org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage buildPartial() {
+ org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage result = new org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.type_ = type_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.content_ = content_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage) {
+ return mergeFrom((org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage other) {
+ if (other == org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.getDefaultInstance()) return this;
+ if (other.hasType()) {
+ setType(other.getType());
+ }
+ if (other.hasContent()) {
+ setContent(other.getContent());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // optional .textsecure.PubSubMessage.Type type = 1;
+ private org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type type_ = org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type.UNKNOWN;
+ /**
+ * optional .textsecure.PubSubMessage.Type type = 1;
+ */
+ public boolean hasType() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * optional .textsecure.PubSubMessage.Type type = 1;
+ */
+ public org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type getType() {
+ return type_;
+ }
+ /**
+ * optional .textsecure.PubSubMessage.Type type = 1;
+ */
+ public Builder setType(org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ type_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional .textsecure.PubSubMessage.Type type = 1;
+ */
+ public Builder clearType() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ type_ = org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage.Type.UNKNOWN;
+ onChanged();
+ return this;
+ }
+
+ // optional bytes content = 2;
+ private com.google.protobuf.ByteString content_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * optional bytes content = 2;
+ */
+ public boolean hasContent() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * optional bytes content = 2;
+ */
+ public com.google.protobuf.ByteString getContent() {
+ return content_;
+ }
+ /**
+ * optional bytes content = 2;
+ */
+ public Builder setContent(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ content_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional bytes content = 2;
+ */
+ public Builder clearContent() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ content_ = getDefaultInstance().getContent();
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:textsecure.PubSubMessage)
+ }
+
+ static {
+ defaultInstance = new PubSubMessage(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:textsecure.PubSubMessage)
+ }
+
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_textsecure_PubSubMessage_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_textsecure_PubSubMessage_fieldAccessorTable;
+
+ public static com.google.protobuf.Descriptors.FileDescriptor
+ getDescriptor() {
+ return descriptor;
+ }
+ private static com.google.protobuf.Descriptors.FileDescriptor
+ descriptor;
+ static {
+ java.lang.String[] descriptorData = {
+ "\n\023PubSubMessage.proto\022\ntextsecure\"\215\001\n\rPu" +
+ "bSubMessage\022,\n\004type\030\001 \001(\0162\036.textsecure.P" +
+ "ubSubMessage.Type\022\017\n\007content\030\002 \001(\014\"=\n\004Ty" +
+ "pe\022\013\n\007UNKNOWN\020\000\022\014\n\010QUERY_DB\020\001\022\013\n\007DELIVER" +
+ "\020\002\022\r\n\tKEEPALIVE\020\003B8\n(org.whispersystems." +
+ "textsecuregcm.storageB\014PubSubProtos"
+ };
+ com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+ new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+ public com.google.protobuf.ExtensionRegistry assignDescriptors(
+ com.google.protobuf.Descriptors.FileDescriptor root) {
+ descriptor = root;
+ internal_static_textsecure_PubSubMessage_descriptor =
+ getDescriptor().getMessageTypes().get(0);
+ internal_static_textsecure_PubSubMessage_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_textsecure_PubSubMessage_descriptor,
+ new java.lang.String[] { "Type", "Content", });
+ return null;
+ }
+ };
+ com.google.protobuf.Descriptors.FileDescriptor
+ .internalBuildGeneratedFileFrom(descriptorData,
+ new com.google.protobuf.Descriptors.FileDescriptor[] {
+ }, assigner);
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessageProtos.java b/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessageProtos.java
new file mode 100644
index 000000000..ab9ca5edd
--- /dev/null
+++ b/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessageProtos.java
@@ -0,0 +1,624 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: StoredMessage.proto
+
+package org.whispersystems.textsecuregcm.storage;
+
+public final class StoredMessageProtos {
+ private StoredMessageProtos() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ }
+ public interface StoredMessageOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // optional .textsecure.StoredMessage.Type type = 1;
+ /**
+ * optional .textsecure.StoredMessage.Type type = 1;
+ */
+ boolean hasType();
+ /**
+ * optional .textsecure.StoredMessage.Type type = 1;
+ */
+ org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type getType();
+
+ // optional bytes content = 2;
+ /**
+ * optional bytes content = 2;
+ */
+ boolean hasContent();
+ /**
+ * optional bytes content = 2;
+ */
+ com.google.protobuf.ByteString getContent();
+ }
+ /**
+ * Protobuf type {@code textsecure.StoredMessage}
+ */
+ public static final class StoredMessage extends
+ com.google.protobuf.GeneratedMessage
+ implements StoredMessageOrBuilder {
+ // Use StoredMessage.newBuilder() to construct.
+ private StoredMessage(com.google.protobuf.GeneratedMessage.Builder> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private StoredMessage(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final StoredMessage defaultInstance;
+ public static StoredMessage getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public StoredMessage getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private StoredMessage(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 8: {
+ int rawValue = input.readEnum();
+ org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type value = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.valueOf(rawValue);
+ if (value == null) {
+ unknownFields.mergeVarintField(1, rawValue);
+ } else {
+ bitField0_ |= 0x00000001;
+ type_ = value;
+ }
+ break;
+ }
+ case 18: {
+ bitField0_ |= 0x00000002;
+ content_ = input.readBytes();
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.class, org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Builder.class);
+ }
+
+ public static com.google.protobuf.Parser PARSER =
+ new com.google.protobuf.AbstractParser() {
+ public StoredMessage parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new StoredMessage(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public com.google.protobuf.Parser getParserForType() {
+ return PARSER;
+ }
+
+ /**
+ * Protobuf enum {@code textsecure.StoredMessage.Type}
+ */
+ public enum Type
+ implements com.google.protobuf.ProtocolMessageEnum {
+ /**
+ * UNKNOWN = 0;
+ */
+ UNKNOWN(0, 0),
+ /**
+ * MESSAGE = 1;
+ */
+ MESSAGE(1, 1),
+ ;
+
+ /**
+ * UNKNOWN = 0;
+ */
+ public static final int UNKNOWN_VALUE = 0;
+ /**
+ * MESSAGE = 1;
+ */
+ public static final int MESSAGE_VALUE = 1;
+
+
+ public final int getNumber() { return value; }
+
+ public static Type valueOf(int value) {
+ switch (value) {
+ case 0: return UNKNOWN;
+ case 1: return MESSAGE;
+ default: return null;
+ }
+ }
+
+ public static com.google.protobuf.Internal.EnumLiteMap
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static com.google.protobuf.Internal.EnumLiteMap
+ internalValueMap =
+ new com.google.protobuf.Internal.EnumLiteMap() {
+ public Type findValueByNumber(int number) {
+ return Type.valueOf(number);
+ }
+ };
+
+ public final com.google.protobuf.Descriptors.EnumValueDescriptor
+ getValueDescriptor() {
+ return getDescriptor().getValues().get(index);
+ }
+ public final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+ public static final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptor() {
+ return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.getDescriptor().getEnumTypes().get(0);
+ }
+
+ private static final Type[] VALUES = values();
+
+ public static Type valueOf(
+ com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+ if (desc.getType() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "EnumValueDescriptor is not for this type.");
+ }
+ return VALUES[desc.getIndex()];
+ }
+
+ private final int index;
+ private final int value;
+
+ private Type(int index, int value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:textsecure.StoredMessage.Type)
+ }
+
+ private int bitField0_;
+ // optional .textsecure.StoredMessage.Type type = 1;
+ public static final int TYPE_FIELD_NUMBER = 1;
+ private org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type type_;
+ /**
+ * optional .textsecure.StoredMessage.Type type = 1;
+ */
+ public boolean hasType() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * optional .textsecure.StoredMessage.Type type = 1;
+ */
+ public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type getType() {
+ return type_;
+ }
+
+ // optional bytes content = 2;
+ public static final int CONTENT_FIELD_NUMBER = 2;
+ private com.google.protobuf.ByteString content_;
+ /**
+ * optional bytes content = 2;
+ */
+ public boolean hasContent() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * optional bytes content = 2;
+ */
+ public com.google.protobuf.ByteString getContent() {
+ return content_;
+ }
+
+ private void initFields() {
+ type_ = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.UNKNOWN;
+ content_ = com.google.protobuf.ByteString.EMPTY;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeEnum(1, type_.getNumber());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(2, content_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeEnumSize(1, type_.getNumber());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, content_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code textsecure.StoredMessage}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder
+ implements org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessageOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.class, org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Builder.class);
+ }
+
+ // Construct using org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ type_ = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.UNKNOWN;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ content_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_descriptor;
+ }
+
+ public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage getDefaultInstanceForType() {
+ return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.getDefaultInstance();
+ }
+
+ public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage build() {
+ org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage buildPartial() {
+ org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage result = new org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.type_ = type_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.content_ = content_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage) {
+ return mergeFrom((org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage other) {
+ if (other == org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.getDefaultInstance()) return this;
+ if (other.hasType()) {
+ setType(other.getType());
+ }
+ if (other.hasContent()) {
+ setContent(other.getContent());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // optional .textsecure.StoredMessage.Type type = 1;
+ private org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type type_ = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.UNKNOWN;
+ /**
+ * optional .textsecure.StoredMessage.Type type = 1;
+ */
+ public boolean hasType() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * optional .textsecure.StoredMessage.Type type = 1;
+ */
+ public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type getType() {
+ return type_;
+ }
+ /**
+ * optional .textsecure.StoredMessage.Type type = 1;
+ */
+ public Builder setType(org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ type_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional .textsecure.StoredMessage.Type type = 1;
+ */
+ public Builder clearType() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ type_ = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.UNKNOWN;
+ onChanged();
+ return this;
+ }
+
+ // optional bytes content = 2;
+ private com.google.protobuf.ByteString content_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * optional bytes content = 2;
+ */
+ public boolean hasContent() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * optional bytes content = 2;
+ */
+ public com.google.protobuf.ByteString getContent() {
+ return content_;
+ }
+ /**
+ * optional bytes content = 2;
+ */
+ public Builder setContent(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ content_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional bytes content = 2;
+ */
+ public Builder clearContent() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ content_ = getDefaultInstance().getContent();
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:textsecure.StoredMessage)
+ }
+
+ static {
+ defaultInstance = new StoredMessage(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:textsecure.StoredMessage)
+ }
+
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_textsecure_StoredMessage_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_textsecure_StoredMessage_fieldAccessorTable;
+
+ public static com.google.protobuf.Descriptors.FileDescriptor
+ getDescriptor() {
+ return descriptor;
+ }
+ private static com.google.protobuf.Descriptors.FileDescriptor
+ descriptor;
+ static {
+ java.lang.String[] descriptorData = {
+ "\n\023StoredMessage.proto\022\ntextsecure\"p\n\rSto" +
+ "redMessage\022,\n\004type\030\001 \001(\0162\036.textsecure.St" +
+ "oredMessage.Type\022\017\n\007content\030\002 \001(\014\" \n\004Typ" +
+ "e\022\013\n\007UNKNOWN\020\000\022\013\n\007MESSAGE\020\001B?\n(org.whisp" +
+ "ersystems.textsecuregcm.storageB\023StoredM" +
+ "essageProtos"
+ };
+ com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+ new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+ public com.google.protobuf.ExtensionRegistry assignDescriptors(
+ com.google.protobuf.Descriptors.FileDescriptor root) {
+ descriptor = root;
+ internal_static_textsecure_StoredMessage_descriptor =
+ getDescriptor().getMessageTypes().get(0);
+ internal_static_textsecure_StoredMessage_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_textsecure_StoredMessage_descriptor,
+ new java.lang.String[] { "Type", "Content", });
+ return null;
+ }
+ };
+ com.google.protobuf.Descriptors.FileDescriptor
+ .internalBuildGeneratedFileFrom(descriptorData,
+ new com.google.protobuf.Descriptors.FileDescriptor[] {
+ }, assigner);
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java b/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java
index 4ba277041..909aac599 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java
@@ -19,21 +19,18 @@ package org.whispersystems.textsecuregcm.storage;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
+import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.whispersystems.textsecuregcm.entities.PendingMessage;
import org.whispersystems.textsecuregcm.util.Constants;
-import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
-import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import static com.codahale.metrics.MetricRegistry.name;
+import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
+import static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
@@ -44,8 +41,6 @@ public class StoredMessages {
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Histogram queueSizeHistogram = metricRegistry.histogram(name(getClass(), "queue_size"));
-
- private static final ObjectMapper mapper = SystemMapper.getMapper();
private static final String QUEUE_PREFIX = "msgs";
private final JedisPool jedisPool;
@@ -55,64 +50,54 @@ public class StoredMessages {
}
public void clear(WebsocketAddress address) {
- Jedis jedis = null;
-
- try {
- jedis = jedisPool.getResource();
+ try (Jedis jedis = jedisPool.getResource()) {
jedis.del(getKey(address));
- } finally {
- if (jedis != null)
- jedisPool.returnResource(jedis);
}
}
- public void insert(WebsocketAddress address, PendingMessage message) {
- Jedis jedis = null;
+ public void insert(WebsocketAddress address, OutgoingMessageSignal message) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ StoredMessage storedMessage = StoredMessage.newBuilder()
+ .setType(StoredMessage.Type.MESSAGE)
+ .setContent(message.toByteString())
+ .build();
- try {
- jedis = jedisPool.getResource();
-
- String serializedMessage = mapper.writeValueAsString(message);
- long queueSize = jedis.lpush(getKey(address), serializedMessage);
+ long queueSize = jedis.lpush(getKey(address), storedMessage.toByteArray());
queueSizeHistogram.update(queueSize);
if (queueSize > 1000) {
jedis.ltrim(getKey(address), 0, 999);
}
-
- } catch (JsonProcessingException e) {
- logger.warn("StoredMessages", "Unable to store correctly", e);
- } finally {
- if (jedis != null)
- jedisPool.returnResource(jedis);
}
}
- public List getMessagesForDevice(WebsocketAddress address) {
- List messages = new LinkedList<>();
- Jedis jedis = null;
+ public List getMessagesForDevice(WebsocketAddress address) {
+ List messages = new LinkedList<>();
- try {
- jedis = jedisPool.getResource();
- String message;
+ try (Jedis jedis = jedisPool.getResource()) {
+ byte[] message;
while ((message = jedis.rpop(getKey(address))) != null) {
try {
- messages.add(mapper.readValue(message, PendingMessage.class));
- } catch (IOException e) {
- logger.warn("StoredMessages", "Not a valid PendingMessage", e);
+ StoredMessage storedMessage = StoredMessage.parseFrom(message);
+
+ if (storedMessage.getType().getNumber() == StoredMessage.Type.MESSAGE_VALUE) {
+ messages.add(OutgoingMessageSignal.parseFrom(storedMessage.getContent()));
+ } else {
+ logger.warn("Unkown stored message type: " + storedMessage.getType().getNumber());
+ }
+
+ } catch (InvalidProtocolBufferException e) {
+ logger.warn("Error parsing protobuf", e);
}
}
return messages;
- } finally {
- if (jedis != null)
- jedisPool.returnResource(jedis);
}
}
- private String getKey(WebsocketAddress address) {
- return QUEUE_PREFIX + ":" + address.serialize();
+ private byte[] getKey(WebsocketAddress address) {
+ return (QUEUE_PREFIX + ":" + address.serialize()).getBytes();
}
}
\ No newline at end of file
diff --git a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketAccountAuthenticator.java b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketAccountAuthenticator.java
index d756876ac..df2d04618 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketAccountAuthenticator.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketAccountAuthenticator.java
@@ -4,6 +4,7 @@ import com.google.common.base.Optional;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
import org.whispersystems.textsecuregcm.storage.Account;
+import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.websocket.auth.AuthenticationException;
import org.whispersystems.websocket.auth.WebSocketAuthenticator;
diff --git a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java
index 5c1902d37..cd91f2864 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java
@@ -1,13 +1,14 @@
package org.whispersystems.textsecuregcm.websocket;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.whispersystems.textsecuregcm.entities.PendingMessage;
+import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
+import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.TransientPushFailureException;
@@ -16,23 +17,20 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.PubSubListener;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
-import org.whispersystems.textsecuregcm.storage.PubSubMessage;
import org.whispersystems.textsecuregcm.storage.StoredMessages;
-import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.util.List;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
+import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
public class WebSocketConnection implements PubSubListener {
- private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
- private static final ObjectMapper objectMapper = SystemMapper.getMapper();
+ private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private final AccountsManager accountsManager;
private final PushSender pushSender;
@@ -72,52 +70,56 @@ public class WebSocketConnection implements PubSubListener {
}
@Override
- public void onPubSubMessage(PubSubMessage message) {
+ public void onPubSubMessage(PubSubMessage pubSubMessage) {
try {
- switch (message.getType()) {
- case PubSubMessage.TYPE_QUERY_DB:
+ switch (pubSubMessage.getType().getNumber()) {
+ case PubSubMessage.Type.QUERY_DB_VALUE:
processStoredMessages();
break;
- case PubSubMessage.TYPE_DELIVER:
- PendingMessage pendingMessage = objectMapper.readValue(message.getContents(),
- PendingMessage.class);
- sendMessage(pendingMessage);
+ case PubSubMessage.Type.DELIVER_VALUE:
+ sendMessage(OutgoingMessageSignal.parseFrom(pubSubMessage.getContent()));
break;
default:
- logger.warn("Unknown pubsub message: " + message.getType());
+ logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber());
}
- } catch (IOException e) {
- logger.warn("Error deserializing PendingMessage", e);
+ } catch (InvalidProtocolBufferException e) {
+ logger.warn("Protobuf parse error", e);
}
}
- private void sendMessage(final PendingMessage message) {
- String content = message.getEncryptedOutgoingMessage();
- Optional body = Optional.fromNullable(content.getBytes());
- ListenableFuture response = client.sendRequest("PUT", "/api/v1/message", body);
+ private void sendMessage(final OutgoingMessageSignal message) {
+ try {
+ EncryptedOutgoingMessage encryptedMessage = new EncryptedOutgoingMessage(message, device.getSignalingKey());
+ Optional body = Optional.fromNullable(encryptedMessage.toByteArray());
+ ListenableFuture response = client.sendRequest("PUT", "/api/v1/message", body);
- Futures.addCallback(response, new FutureCallback() {
- @Override
- public void onSuccess(@Nullable WebSocketResponseMessage response) {
- if (isSuccessResponse(response) && !message.isReceipt()) {
- sendDeliveryReceiptFor(message);
- } else if (!isSuccessResponse(response)) {
+ Futures.addCallback(response, new FutureCallback() {
+ @Override
+ public void onSuccess(@Nullable WebSocketResponseMessage response) {
+ boolean isReceipt = message.getType() == OutgoingMessageSignal.Type.RECEIPT_VALUE;
+
+ if (isSuccessResponse(response) && !isReceipt) {
+ sendDeliveryReceiptFor(message);
+ } else if (!isSuccessResponse(response)) {
+ requeueMessage(message);
+ }
+ }
+
+ @Override
+ public void onFailure(@Nonnull Throwable throwable) {
requeueMessage(message);
}
- }
- @Override
- public void onFailure(@Nonnull Throwable throwable) {
- requeueMessage(message);
- }
-
- private boolean isSuccessResponse(WebSocketResponseMessage response) {
- return response != null && response.getStatus() >= 200 && response.getStatus() < 300;
- }
- });
+ private boolean isSuccessResponse(WebSocketResponseMessage response) {
+ return response != null && response.getStatus() >= 200 && response.getStatus() < 300;
+ }
+ });
+ } catch (CryptoEncodingException e) {
+ logger.warn("Bad signaling key", e);
+ }
}
- private void requeueMessage(PendingMessage message) {
+ private void requeueMessage(OutgoingMessageSignal message) {
try {
pushSender.sendMessage(account, device, message);
} catch (NotPushRegisteredException | TransientPushFailureException e) {
@@ -126,12 +128,12 @@ public class WebSocketConnection implements PubSubListener {
}
}
- private void sendDeliveryReceiptFor(PendingMessage message) {
+ private void sendDeliveryReceiptFor(OutgoingMessageSignal message) {
try {
- Optional source = accountsManager.get(message.getSender());
+ Optional source = accountsManager.get(message.getSource());
if (!source.isPresent()) {
- logger.warn("Source account disappeared? (%s)", message.getSender());
+ logger.warn("Source account disappeared? (%s)", message.getSource());
return;
}
@@ -139,7 +141,7 @@ public class WebSocketConnection implements PubSubListener {
OutgoingMessageSignal.newBuilder()
.setSource(account.getNumber())
.setSourceDevice((int) device.getId())
- .setTimestamp(message.getMessageId())
+ .setTimestamp(message.getTimestamp())
.setType(OutgoingMessageSignal.Type.RECEIPT_VALUE);
for (Device device : source.get().getDevices()) {
@@ -151,9 +153,9 @@ public class WebSocketConnection implements PubSubListener {
}
private void processStoredMessages() {
- List messages = storedMessages.getMessagesForDevice(address);
+ List messages = storedMessages.getMessagesForDevice(address);
- for (PendingMessage message : messages) {
+ for (OutgoingMessageSignal message : messages) {
sendMessage(message);
}
}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java b/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java
index fcc8dd0fe..ec3ca8b20 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java
@@ -63,7 +63,7 @@ public class DirectoryCommand extends ConfiguredCommand outgoingMessages = new LinkedList() {{
- add(new PendingMessage("sender1", 1111, false, "first"));
- add(new PendingMessage("sender1", 2222, false, "second"));
- add(new PendingMessage("sender2", 3333, false, "third"));
+ List outgoingMessages = new LinkedList() {{
+ add(createMessage("sender1", 1111, false, "first"));
+ add(createMessage("sender1", 2222, false, "second"));
+ add(createMessage("sender2", 3333, false, "third"));
}};
when(device.getId()).thenReturn(2L);
+ when(device.getSignalingKey()).thenReturn(Base64.encodeBytes(new byte[52]));
+
when(account.getAuthenticatedDevice()).thenReturn(Optional.of(device));
when(account.getNumber()).thenReturn("+14152222222");
@@ -167,16 +170,26 @@ public class WebSocketConnectionTest {
futures.get(0).setException(new IOException());
futures.get(2).setException(new IOException());
- List pending = new LinkedList() {{
- add(new PendingMessage("sender1", 1111, false, "first"));
- add(new PendingMessage("sender2", 3333, false, "third"));
+ List pending = new LinkedList() {{
+ add(createMessage("sender1", 1111, false, "first"));
+ add(createMessage("sender2", 3333, false, "third"));
}};
- verify(pushSender, times(2)).sendMessage(eq(account), eq(device), any(PendingMessage.class));
- verify(pushSender, times(1)).sendMessage(eq(sender1), eq(sender1device), any(MessageProtos.OutgoingMessageSignal.class));
+ verify(pushSender, times(2)).sendMessage(eq(account), eq(device), any(OutgoingMessageSignal.class));
+ verify(pushSender, times(1)).sendMessage(eq(sender1), eq(sender1device), any(OutgoingMessageSignal.class));
connection.onConnectionLost();
verify(pubSubManager).unsubscribe(eq(new WebsocketAddress("+14152222222", 2L)), eq(connection));
}
+ private OutgoingMessageSignal createMessage(String sender, long timestamp, boolean receipt, String content) {
+ return OutgoingMessageSignal.newBuilder()
+ .setSource(sender)
+ .setSourceDevice(1)
+ .setType(receipt ? OutgoingMessageSignal.Type.RECEIPT_VALUE : OutgoingMessageSignal.Type.CIPHERTEXT_VALUE)
+ .setTimestamp(timestamp)
+ .setMessage(ByteString.copyFrom(content.getBytes()))
+ .build();
+ }
+
}