From 1403dbd5ddeb5db6ac208203f19b212ec4282e5e Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Tue, 10 Mar 2015 12:45:05 -0700 Subject: [PATCH] Handle pubsub callbacks from a cached thread pool. ...implement some belt and suspenders dead letter handling. ...implement some belt and suspenders redis pubsub queue handling. // FREEBIE --- pom.xml | 4 +- .../textsecuregcm/WhisperServerService.java | 4 +- .../textsecuregcm/storage/PubSubManager.java | 57 +++++++++++++------ .../websocket/DeadLetterHandler.java | 39 +++++++++++++ .../websocket/WebsocketAddress.java | 23 ++++++++ 5 files changed, 106 insertions(+), 21 deletions(-) create mode 100644 src/main/java/org/whispersystems/textsecuregcm/websocket/DeadLetterHandler.java diff --git a/pom.xml b/pom.xml index 45eddd569..76aa34136 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ org.whispersystems.textsecure TextSecureServer - 0.35 + 0.39 0.7.1 @@ -129,7 +129,7 @@ org.whispersystems websocket-resources - 0.2.1 + 0.2.2 diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index e8840753b..87de03342 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -75,6 +75,7 @@ import org.whispersystems.textsecuregcm.storage.PubSubManager; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.UrlSigner; import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener; +import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler; import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener; import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator; import org.whispersystems.textsecuregcm.workers.DirectoryCommand; @@ -157,7 +158,8 @@ public class WhisperServerService extends Application listeners = new HashMap<>(); + private final Executor threaded = Executors.newCachedThreadPool(); + + private final JedisPool jedisPool; + private final DeadLetterHandler deadLetterHandler; - private final JedisPool jedisPool; private boolean subscribed = false; - public PubSubManager(JedisPool jedisPool) { - this.jedisPool = jedisPool; + public PubSubManager(JedisPool jedisPool, DeadLetterHandler deadLetterHandler) { + this.jedisPool = jedisPool; + this.deadLetterHandler = deadLetterHandler; initializePubSubWorker(); waitForSubscription(); } @@ -72,9 +79,13 @@ public class PubSubManager { @Override public void run() { for (;;) { + logger.info("Starting Redis PubSub Subscriber..."); + try (Jedis jedis = jedisPool.getResource()) { jedis.subscribe(baseListener, KEEPALIVE_CHANNEL); logger.warn("**** Unsubscribed from holding channel!!! ******"); + } catch (Throwable t) { + logger.warn("*** SUBSCRIBER CONNECTION CLOSED", t); } } } @@ -89,8 +100,8 @@ public class PubSubManager { publish(KEEPALIVE_CHANNEL, PubSubMessage.newBuilder() .setType(PubSubMessage.Type.KEEPALIVE) .build()); - } catch (InterruptedException e) { - throw new AssertionError(e); + } catch (Throwable e) { + logger.warn("KEEPALIVE PUBLISH EXCEPTION: ", e); } } } @@ -100,20 +111,30 @@ public class PubSubManager { private class SubscriptionListener extends BinaryJedisPubSub { @Override - public void onMessage(byte[] channel, byte[] message) { - try { - PubSubListener listener; - - synchronized (PubSubManager.this) { - listener = listeners.get(new String(channel)); - } - - if (listener != null) { - listener.onPubSubMessage(PubSubMessage.parseFrom(message)); - } - } catch (InvalidProtocolBufferException e) { - logger.warn("Error parsing PubSub protobuf", e); + public void onMessage(final byte[] channel, final byte[] message) { + if (Arrays.equals(KEEPALIVE_CHANNEL, channel)) { + return; } + + final PubSubListener listener; + + synchronized (PubSubManager.this) { + listener = listeners.get(new String(channel)); + } + + threaded.execute(new Runnable() { + @Override + public void run() { + try { + PubSubMessage receivedMessage = PubSubMessage.parseFrom(message); + + if (listener != null) listener.onPubSubMessage(receivedMessage); + else deadLetterHandler.handle(channel, receivedMessage); + } catch (InvalidProtocolBufferException e) { + logger.warn("Error parsing PubSub protobuf", e); + } + } + }); } @Override diff --git a/src/main/java/org/whispersystems/textsecuregcm/websocket/DeadLetterHandler.java b/src/main/java/org/whispersystems/textsecuregcm/websocket/DeadLetterHandler.java new file mode 100644 index 000000000..3e8504150 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/websocket/DeadLetterHandler.java @@ -0,0 +1,39 @@ +package org.whispersystems.textsecuregcm.websocket; + +import com.google.protobuf.InvalidProtocolBufferException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal; +import org.whispersystems.textsecuregcm.storage.MessagesManager; +import org.whispersystems.textsecuregcm.storage.PubSubProtos; + +public class DeadLetterHandler { + + private final Logger logger = LoggerFactory.getLogger(DeadLetterHandler.class); + + private final MessagesManager messagesManager; + + public DeadLetterHandler(MessagesManager messagesManager) { + this.messagesManager = messagesManager; + } + + public void handle(byte[] channel, PubSubProtos.PubSubMessage pubSubMessage) { + try { + WebsocketAddress address = new WebsocketAddress(new String(channel)); + + logger.warn("Handling dead letter to: " + address); + + switch (pubSubMessage.getType().getNumber()) { + case PubSubProtos.PubSubMessage.Type.DELIVER_VALUE: + OutgoingMessageSignal message = OutgoingMessageSignal.parseFrom(pubSubMessage.getContent()); + messagesManager.insert(address.getNumber(), address.getDeviceId(), message); + break; + } + } catch (InvalidProtocolBufferException e) { + logger.warn("Bad pubsub message", e); + } catch (InvalidWebsocketAddressException e) { + logger.warn("Invalid websocket address", e); + } + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebsocketAddress.java b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebsocketAddress.java index fbd381c34..3ad5d944a 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebsocketAddress.java +++ b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebsocketAddress.java @@ -10,6 +10,29 @@ public class WebsocketAddress { this.deviceId = deviceId; } + public WebsocketAddress(String serialized) throws InvalidWebsocketAddressException { + try { + String[] parts = serialized.split(":", 2); + + if (parts.length != 2) { + throw new InvalidWebsocketAddressException("Bad address: " + serialized); + } + + this.number = parts[0]; + this.deviceId = Long.parseLong(parts[1]); + } catch (NumberFormatException e) { + throw new InvalidWebsocketAddressException(e); + } + } + + public String getNumber() { + return number; + } + + public long getDeviceId() { + return deviceId; + } + public String serialize() { return number + ":" + deviceId; }