Handle pubsub callbacks from a cached thread pool.

...implement some belt and suspenders dead letter handling.

...implement some belt and suspenders redis pubsub queue handling.

// FREEBIE
This commit is contained in:
Moxie Marlinspike 2015-03-10 12:45:05 -07:00
parent 6ef3845a34
commit 1403dbd5dd
5 changed files with 106 additions and 21 deletions

View File

@ -9,7 +9,7 @@
<groupId>org.whispersystems.textsecure</groupId>
<artifactId>TextSecureServer</artifactId>
<version>0.35</version>
<version>0.39</version>
<properties>
<dropwizard.version>0.7.1</dropwizard.version>
@ -129,7 +129,7 @@
<dependency>
<groupId>org.whispersystems</groupId>
<artifactId>websocket-resources</artifactId>
<version>0.2.1</version>
<version>0.2.2</version>
</dependency>

View File

@ -75,6 +75,7 @@ import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.UrlSigner;
import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener;
import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler;
import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener;
import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
import org.whispersystems.textsecuregcm.workers.DirectoryCommand;
@ -157,7 +158,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient);
FederatedClientManager federatedClientManager = new FederatedClientManager(config.getFederationConfiguration());
MessagesManager messagesManager = new MessagesManager(messages);
PubSubManager pubSubManager = new PubSubManager(cacheClient);
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(messagesManager);
PubSubManager pubSubManager = new PubSubManager(cacheClient, deadLetterHandler);
PushServiceClient pushServiceClient = new PushServiceClient(httpClient, config.getPushConfiguration());
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager);
AccountAuthenticator deviceAuthenticator = new AccountAuthenticator(accountsManager);

View File

@ -3,11 +3,14 @@ package org.whispersystems.textsecuregcm.storage;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler;
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
import redis.clients.jedis.BinaryJedisPubSub;
@ -21,12 +24,16 @@ public class PubSubManager {
private final Logger logger = LoggerFactory.getLogger(PubSubManager.class);
private final SubscriptionListener baseListener = new SubscriptionListener();
private final Map<String, PubSubListener> listeners = new HashMap<>();
private final Executor threaded = Executors.newCachedThreadPool();
private final JedisPool jedisPool;
private final DeadLetterHandler deadLetterHandler;
private final JedisPool jedisPool;
private boolean subscribed = false;
public PubSubManager(JedisPool jedisPool) {
this.jedisPool = jedisPool;
public PubSubManager(JedisPool jedisPool, DeadLetterHandler deadLetterHandler) {
this.jedisPool = jedisPool;
this.deadLetterHandler = deadLetterHandler;
initializePubSubWorker();
waitForSubscription();
}
@ -72,9 +79,13 @@ public class PubSubManager {
@Override
public void run() {
for (;;) {
logger.info("Starting Redis PubSub Subscriber...");
try (Jedis jedis = jedisPool.getResource()) {
jedis.subscribe(baseListener, KEEPALIVE_CHANNEL);
logger.warn("**** Unsubscribed from holding channel!!! ******");
} catch (Throwable t) {
logger.warn("*** SUBSCRIBER CONNECTION CLOSED", t);
}
}
}
@ -89,8 +100,8 @@ public class PubSubManager {
publish(KEEPALIVE_CHANNEL, PubSubMessage.newBuilder()
.setType(PubSubMessage.Type.KEEPALIVE)
.build());
} catch (InterruptedException e) {
throw new AssertionError(e);
} catch (Throwable e) {
logger.warn("KEEPALIVE PUBLISH EXCEPTION: ", e);
}
}
}
@ -100,20 +111,30 @@ public class PubSubManager {
private class SubscriptionListener extends BinaryJedisPubSub {
@Override
public void onMessage(byte[] channel, byte[] message) {
try {
PubSubListener listener;
synchronized (PubSubManager.this) {
listener = listeners.get(new String(channel));
}
if (listener != null) {
listener.onPubSubMessage(PubSubMessage.parseFrom(message));
}
} catch (InvalidProtocolBufferException e) {
logger.warn("Error parsing PubSub protobuf", e);
public void onMessage(final byte[] channel, final byte[] message) {
if (Arrays.equals(KEEPALIVE_CHANNEL, channel)) {
return;
}
final PubSubListener listener;
synchronized (PubSubManager.this) {
listener = listeners.get(new String(channel));
}
threaded.execute(new Runnable() {
@Override
public void run() {
try {
PubSubMessage receivedMessage = PubSubMessage.parseFrom(message);
if (listener != null) listener.onPubSubMessage(receivedMessage);
else deadLetterHandler.handle(channel, receivedMessage);
} catch (InvalidProtocolBufferException e) {
logger.warn("Error parsing PubSub protobuf", e);
}
}
});
}
@Override

View File

@ -0,0 +1,39 @@
package org.whispersystems.textsecuregcm.websocket;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
public class DeadLetterHandler {
private final Logger logger = LoggerFactory.getLogger(DeadLetterHandler.class);
private final MessagesManager messagesManager;
public DeadLetterHandler(MessagesManager messagesManager) {
this.messagesManager = messagesManager;
}
public void handle(byte[] channel, PubSubProtos.PubSubMessage pubSubMessage) {
try {
WebsocketAddress address = new WebsocketAddress(new String(channel));
logger.warn("Handling dead letter to: " + address);
switch (pubSubMessage.getType().getNumber()) {
case PubSubProtos.PubSubMessage.Type.DELIVER_VALUE:
OutgoingMessageSignal message = OutgoingMessageSignal.parseFrom(pubSubMessage.getContent());
messagesManager.insert(address.getNumber(), address.getDeviceId(), message);
break;
}
} catch (InvalidProtocolBufferException e) {
logger.warn("Bad pubsub message", e);
} catch (InvalidWebsocketAddressException e) {
logger.warn("Invalid websocket address", e);
}
}
}

View File

@ -10,6 +10,29 @@ public class WebsocketAddress {
this.deviceId = deviceId;
}
public WebsocketAddress(String serialized) throws InvalidWebsocketAddressException {
try {
String[] parts = serialized.split(":", 2);
if (parts.length != 2) {
throw new InvalidWebsocketAddressException("Bad address: " + serialized);
}
this.number = parts[0];
this.deviceId = Long.parseLong(parts[1]);
} catch (NumberFormatException e) {
throw new InvalidWebsocketAddressException(e);
}
}
public String getNumber() {
return number;
}
public long getDeviceId() {
return deviceId;
}
public String serialize() {
return number + ":" + deviceId;
}