Make dispatch subscription/unsubscription synchronized.

This commit is contained in:
Moxie Marlinspike 2015-03-19 14:37:10 -07:00
parent c7e0cc1158
commit 2de9adb7ae
1 changed files with 20 additions and 43 deletions

View File

@ -8,9 +8,8 @@ import org.whispersystems.dispatch.redis.PubSubConnection;
import org.whispersystems.dispatch.redis.PubSubReply; import org.whispersystems.dispatch.redis.PubSubReply;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -18,7 +17,7 @@ public class DispatchManager extends Thread {
private final Logger logger = LoggerFactory.getLogger(DispatchManager.class); private final Logger logger = LoggerFactory.getLogger(DispatchManager.class);
private final Executor executor = Executors.newCachedThreadPool(); private final Executor executor = Executors.newCachedThreadPool();
private final Map<String, DispatchChannel> subscriptions = new HashMap<>(); private final Map<String, DispatchChannel> subscriptions = new ConcurrentHashMap<>();
private final Optional<DispatchChannel> deadLetterChannel; private final Optional<DispatchChannel> deadLetterChannel;
private final RedisPubSubConnectionFactory redisPubSubConnectionFactory; private final RedisPubSubConnectionFactory redisPubSubConnectionFactory;
@ -45,13 +44,9 @@ public class DispatchManager extends Thread {
this.pubSubConnection.close(); this.pubSubConnection.close();
} }
public void subscribe(String name, DispatchChannel dispatchChannel) { public synchronized void subscribe(String name, DispatchChannel dispatchChannel) {
Optional<DispatchChannel> previous; Optional<DispatchChannel> previous = Optional.fromNullable(subscriptions.get(name));
subscriptions.put(name, dispatchChannel);
synchronized (subscriptions) {
previous = Optional.fromNullable(subscriptions.get(name));
subscriptions.put(name, dispatchChannel);
}
try { try {
pubSubConnection.subscribe(name); pubSubConnection.subscribe(name);
@ -64,18 +59,12 @@ public class DispatchManager extends Thread {
} }
} }
public void unsubscribe(String name, DispatchChannel channel) { public synchronized void unsubscribe(String name, DispatchChannel channel) {
final Optional<DispatchChannel> subscription; Optional<DispatchChannel> subscription = Optional.fromNullable(subscriptions.get(name));
synchronized (subscriptions) { if (subscription.isPresent() && subscription.get() == channel) {
subscription = Optional.fromNullable(subscriptions.get(name)); subscriptions.remove(name);
if (subscription.isPresent() && subscription.get() == channel) {
subscriptions.remove(name);
}
}
if (subscription.isPresent()) {
try { try {
pubSubConnection.unsubscribe(name); pubSubConnection.unsubscribe(name);
} catch (IOException e) { } catch (IOException e) {
@ -103,7 +92,7 @@ public class DispatchManager extends Thread {
if (running) { if (running) {
this.pubSubConnection.close(); this.pubSubConnection.close();
this.pubSubConnection = redisPubSubConnectionFactory.connect(); this.pubSubConnection = redisPubSubConnectionFactory.connect();
resubscribe(); resubscribeAll();
} }
} }
} }
@ -112,11 +101,7 @@ public class DispatchManager extends Thread {
} }
private void dispatchSubscribe(final PubSubReply reply) { private void dispatchSubscribe(final PubSubReply reply) {
final Optional<DispatchChannel> subscription; Optional<DispatchChannel> subscription = Optional.fromNullable(subscriptions.get(reply.getChannel()));
synchronized (subscriptions) {
subscription = Optional.fromNullable(subscriptions.get(reply.getChannel()));
}
if (subscription.isPresent()) { if (subscription.isPresent()) {
dispatchSubscription(reply.getChannel(), subscription.get()); dispatchSubscription(reply.getChannel(), subscription.get());
@ -126,11 +111,7 @@ public class DispatchManager extends Thread {
} }
private void dispatchMessage(PubSubReply reply) { private void dispatchMessage(PubSubReply reply) {
Optional<DispatchChannel> subscription; Optional<DispatchChannel> subscription = Optional.fromNullable(subscriptions.get(reply.getChannel()));
synchronized (subscriptions) {
subscription = Optional.fromNullable(subscriptions.get(reply.getChannel()));
}
if (subscription.isPresent()) { if (subscription.isPresent()) {
dispatchMessage(reply.getChannel(), subscription.get(), reply.getContent().get()); dispatchMessage(reply.getChannel(), subscription.get(), reply.getContent().get());
@ -141,22 +122,18 @@ public class DispatchManager extends Thread {
} }
} }
private void resubscribe() { private void resubscribeAll() {
final Collection<String> names;
synchronized (subscriptions) {
names = subscriptions.keySet();
}
new Thread() { new Thread() {
@Override @Override
public void run() { public void run() {
try { synchronized (DispatchManager.this) {
for (String name : names) { try {
pubSubConnection.subscribe(name); for (String name : subscriptions.keySet()) {
pubSubConnection.subscribe(name);
}
} catch (IOException e) {
logger.warn("***** RESUBSCRIPTION ERROR *****", e);
} }
} catch (IOException e) {
logger.warn("***** RESUBSCRIPTION ERROR *****", e);
} }
} }
}.start(); }.start();