From 2de9adb7ae2c8f5915799fd29a4df57bcee0df4c Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Thu, 19 Mar 2015 14:37:10 -0700 Subject: [PATCH] Make dispatch subscription/unsubscription synchronized. --- .../dispatch/DispatchManager.java | 63 ++++++------------- 1 file changed, 20 insertions(+), 43 deletions(-) diff --git a/src/main/java/org/whispersystems/dispatch/DispatchManager.java b/src/main/java/org/whispersystems/dispatch/DispatchManager.java index fe9e1d267..2bea48cd0 100644 --- a/src/main/java/org/whispersystems/dispatch/DispatchManager.java +++ b/src/main/java/org/whispersystems/dispatch/DispatchManager.java @@ -8,9 +8,8 @@ import org.whispersystems.dispatch.redis.PubSubConnection; import org.whispersystems.dispatch.redis.PubSubReply; import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -18,7 +17,7 @@ public class DispatchManager extends Thread { private final Logger logger = LoggerFactory.getLogger(DispatchManager.class); private final Executor executor = Executors.newCachedThreadPool(); - private final Map subscriptions = new HashMap<>(); + private final Map subscriptions = new ConcurrentHashMap<>(); private final Optional deadLetterChannel; private final RedisPubSubConnectionFactory redisPubSubConnectionFactory; @@ -45,13 +44,9 @@ public class DispatchManager extends Thread { this.pubSubConnection.close(); } - public void subscribe(String name, DispatchChannel dispatchChannel) { - Optional previous; - - synchronized (subscriptions) { - previous = Optional.fromNullable(subscriptions.get(name)); - subscriptions.put(name, dispatchChannel); - } + public synchronized void subscribe(String name, DispatchChannel dispatchChannel) { + Optional previous = Optional.fromNullable(subscriptions.get(name)); + subscriptions.put(name, dispatchChannel); try { pubSubConnection.subscribe(name); @@ -64,18 +59,12 @@ public class DispatchManager extends Thread { } } - public void unsubscribe(String name, DispatchChannel channel) { - final Optional subscription; + public synchronized void unsubscribe(String name, DispatchChannel channel) { + Optional subscription = Optional.fromNullable(subscriptions.get(name)); - synchronized (subscriptions) { - subscription = Optional.fromNullable(subscriptions.get(name)); + if (subscription.isPresent() && subscription.get() == channel) { + subscriptions.remove(name); - if (subscription.isPresent() && subscription.get() == channel) { - subscriptions.remove(name); - } - } - - if (subscription.isPresent()) { try { pubSubConnection.unsubscribe(name); } catch (IOException e) { @@ -103,7 +92,7 @@ public class DispatchManager extends Thread { if (running) { this.pubSubConnection.close(); this.pubSubConnection = redisPubSubConnectionFactory.connect(); - resubscribe(); + resubscribeAll(); } } } @@ -112,11 +101,7 @@ public class DispatchManager extends Thread { } private void dispatchSubscribe(final PubSubReply reply) { - final Optional subscription; - - synchronized (subscriptions) { - subscription = Optional.fromNullable(subscriptions.get(reply.getChannel())); - } + Optional subscription = Optional.fromNullable(subscriptions.get(reply.getChannel())); if (subscription.isPresent()) { dispatchSubscription(reply.getChannel(), subscription.get()); @@ -126,11 +111,7 @@ public class DispatchManager extends Thread { } private void dispatchMessage(PubSubReply reply) { - Optional subscription; - - synchronized (subscriptions) { - subscription = Optional.fromNullable(subscriptions.get(reply.getChannel())); - } + Optional subscription = Optional.fromNullable(subscriptions.get(reply.getChannel())); if (subscription.isPresent()) { dispatchMessage(reply.getChannel(), subscription.get(), reply.getContent().get()); @@ -141,22 +122,18 @@ public class DispatchManager extends Thread { } } - private void resubscribe() { - final Collection names; - - synchronized (subscriptions) { - names = subscriptions.keySet(); - } - + private void resubscribeAll() { new Thread() { @Override public void run() { - try { - for (String name : names) { - pubSubConnection.subscribe(name); + synchronized (DispatchManager.this) { + try { + for (String name : subscriptions.keySet()) { + pubSubConnection.subscribe(name); + } + } catch (IOException e) { + logger.warn("***** RESUBSCRIPTION ERROR *****", e); } - } catch (IOException e) { - logger.warn("***** RESUBSCRIPTION ERROR *****", e); } } }.start();