diff --git a/pom.xml b/pom.xml
index 45eddd569..76aa34136 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,7 +9,7 @@
org.whispersystems.textsecure
TextSecureServer
- 0.35
+ 0.39
0.7.1
@@ -129,7 +129,7 @@
org.whispersystems
websocket-resources
- 0.2.1
+ 0.2.2
diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
index e8840753b..87de03342 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
@@ -75,6 +75,7 @@ import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.UrlSigner;
import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener;
+import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler;
import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener;
import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
import org.whispersystems.textsecuregcm.workers.DirectoryCommand;
@@ -157,7 +158,8 @@ public class WhisperServerService extends Application listeners = new HashMap<>();
+ private final Executor threaded = Executors.newCachedThreadPool();
+
+ private final JedisPool jedisPool;
+ private final DeadLetterHandler deadLetterHandler;
- private final JedisPool jedisPool;
private boolean subscribed = false;
- public PubSubManager(JedisPool jedisPool) {
- this.jedisPool = jedisPool;
+ public PubSubManager(JedisPool jedisPool, DeadLetterHandler deadLetterHandler) {
+ this.jedisPool = jedisPool;
+ this.deadLetterHandler = deadLetterHandler;
initializePubSubWorker();
waitForSubscription();
}
@@ -72,9 +79,13 @@ public class PubSubManager {
@Override
public void run() {
for (;;) {
+ logger.info("Starting Redis PubSub Subscriber...");
+
try (Jedis jedis = jedisPool.getResource()) {
jedis.subscribe(baseListener, KEEPALIVE_CHANNEL);
logger.warn("**** Unsubscribed from holding channel!!! ******");
+ } catch (Throwable t) {
+ logger.warn("*** SUBSCRIBER CONNECTION CLOSED", t);
}
}
}
@@ -89,8 +100,8 @@ public class PubSubManager {
publish(KEEPALIVE_CHANNEL, PubSubMessage.newBuilder()
.setType(PubSubMessage.Type.KEEPALIVE)
.build());
- } catch (InterruptedException e) {
- throw new AssertionError(e);
+ } catch (Throwable e) {
+ logger.warn("KEEPALIVE PUBLISH EXCEPTION: ", e);
}
}
}
@@ -100,20 +111,30 @@ public class PubSubManager {
private class SubscriptionListener extends BinaryJedisPubSub {
@Override
- public void onMessage(byte[] channel, byte[] message) {
- try {
- PubSubListener listener;
-
- synchronized (PubSubManager.this) {
- listener = listeners.get(new String(channel));
- }
-
- if (listener != null) {
- listener.onPubSubMessage(PubSubMessage.parseFrom(message));
- }
- } catch (InvalidProtocolBufferException e) {
- logger.warn("Error parsing PubSub protobuf", e);
+ public void onMessage(final byte[] channel, final byte[] message) {
+ if (Arrays.equals(KEEPALIVE_CHANNEL, channel)) {
+ return;
}
+
+ final PubSubListener listener;
+
+ synchronized (PubSubManager.this) {
+ listener = listeners.get(new String(channel));
+ }
+
+ threaded.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ PubSubMessage receivedMessage = PubSubMessage.parseFrom(message);
+
+ if (listener != null) listener.onPubSubMessage(receivedMessage);
+ else deadLetterHandler.handle(channel, receivedMessage);
+ } catch (InvalidProtocolBufferException e) {
+ logger.warn("Error parsing PubSub protobuf", e);
+ }
+ }
+ });
}
@Override
diff --git a/src/main/java/org/whispersystems/textsecuregcm/websocket/DeadLetterHandler.java b/src/main/java/org/whispersystems/textsecuregcm/websocket/DeadLetterHandler.java
new file mode 100644
index 000000000..3e8504150
--- /dev/null
+++ b/src/main/java/org/whispersystems/textsecuregcm/websocket/DeadLetterHandler.java
@@ -0,0 +1,39 @@
+package org.whispersystems.textsecuregcm.websocket;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
+import org.whispersystems.textsecuregcm.storage.MessagesManager;
+import org.whispersystems.textsecuregcm.storage.PubSubProtos;
+
+public class DeadLetterHandler {
+
+ private final Logger logger = LoggerFactory.getLogger(DeadLetterHandler.class);
+
+ private final MessagesManager messagesManager;
+
+ public DeadLetterHandler(MessagesManager messagesManager) {
+ this.messagesManager = messagesManager;
+ }
+
+ public void handle(byte[] channel, PubSubProtos.PubSubMessage pubSubMessage) {
+ try {
+ WebsocketAddress address = new WebsocketAddress(new String(channel));
+
+ logger.warn("Handling dead letter to: " + address);
+
+ switch (pubSubMessage.getType().getNumber()) {
+ case PubSubProtos.PubSubMessage.Type.DELIVER_VALUE:
+ OutgoingMessageSignal message = OutgoingMessageSignal.parseFrom(pubSubMessage.getContent());
+ messagesManager.insert(address.getNumber(), address.getDeviceId(), message);
+ break;
+ }
+ } catch (InvalidProtocolBufferException e) {
+ logger.warn("Bad pubsub message", e);
+ } catch (InvalidWebsocketAddressException e) {
+ logger.warn("Invalid websocket address", e);
+ }
+ }
+
+}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebsocketAddress.java b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebsocketAddress.java
index fbd381c34..3ad5d944a 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebsocketAddress.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebsocketAddress.java
@@ -10,6 +10,29 @@ public class WebsocketAddress {
this.deviceId = deviceId;
}
+ public WebsocketAddress(String serialized) throws InvalidWebsocketAddressException {
+ try {
+ String[] parts = serialized.split(":", 2);
+
+ if (parts.length != 2) {
+ throw new InvalidWebsocketAddressException("Bad address: " + serialized);
+ }
+
+ this.number = parts[0];
+ this.deviceId = Long.parseLong(parts[1]);
+ } catch (NumberFormatException e) {
+ throw new InvalidWebsocketAddressException(e);
+ }
+ }
+
+ public String getNumber() {
+ return number;
+ }
+
+ public long getDeviceId() {
+ return deviceId;
+ }
+
public String serialize() {
return number + ":" + deviceId;
}