remove synchronized locks that may be held while blocking
This commit is contained in:
parent
b483159b3a
commit
cf8f2a3463
|
@ -37,6 +37,7 @@ import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
@ -75,6 +76,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
private final ClusterLuaScript removeQueueScript;
|
private final ClusterLuaScript removeQueueScript;
|
||||||
private final ClusterLuaScript getQueuesToPersistScript;
|
private final ClusterLuaScript getQueuesToPersistScript;
|
||||||
|
|
||||||
|
private final ReentrantLock messageListenersLock = new ReentrantLock();
|
||||||
private final Map<String, MessageAvailabilityListener> messageListenersByQueueName = new HashMap<>();
|
private final Map<String, MessageAvailabilityListener> messageListenersByQueueName = new HashMap<>();
|
||||||
private final Map<MessageAvailabilityListener, String> queueNamesByMessageListener = new IdentityHashMap<>();
|
private final Map<MessageAvailabilityListener, String> queueNamesByMessageListener = new IdentityHashMap<>();
|
||||||
|
|
||||||
|
@ -146,8 +148,11 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
|
|
||||||
final Set<String> queueNames;
|
final Set<String> queueNames;
|
||||||
|
|
||||||
synchronized (messageListenersByQueueName) {
|
messageListenersLock.lock();
|
||||||
|
try {
|
||||||
queueNames = new HashSet<>(messageListenersByQueueName.keySet());
|
queueNames = new HashSet<>(messageListenersByQueueName.keySet());
|
||||||
|
} finally {
|
||||||
|
messageListenersLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final String queueName : queueNames) {
|
for (final String queueName : queueNames) {
|
||||||
|
@ -402,11 +407,14 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
final String queueName = getQueueName(destinationUuid, deviceId);
|
final String queueName = getQueueName(destinationUuid, deviceId);
|
||||||
|
|
||||||
final CompletableFuture<Void> subscribeFuture;
|
final CompletableFuture<Void> subscribeFuture;
|
||||||
synchronized (messageListenersByQueueName) {
|
messageListenersLock.lock();
|
||||||
|
try {
|
||||||
messageListenersByQueueName.put(queueName, listener);
|
messageListenersByQueueName.put(queueName, listener);
|
||||||
queueNamesByMessageListener.put(listener, queueName);
|
queueNamesByMessageListener.put(listener, queueName);
|
||||||
// Submit to the Redis queue within the synchronized block, but don’t wait until exiting
|
// Submit to the Redis queue while holding the lock, but don’t wait until exiting
|
||||||
subscribeFuture = subscribeForKeyspaceNotifications(queueName);
|
subscribeFuture = subscribeForKeyspaceNotifications(queueName);
|
||||||
|
} finally {
|
||||||
|
messageListenersLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
subscribeFuture.join();
|
subscribeFuture.join();
|
||||||
|
@ -414,22 +422,28 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
|
|
||||||
public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) {
|
public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) {
|
||||||
@Nullable final String queueName;
|
@Nullable final String queueName;
|
||||||
synchronized (messageListenersByQueueName) {
|
messageListenersLock.lock();
|
||||||
|
try {
|
||||||
queueName = queueNamesByMessageListener.get(listener);
|
queueName = queueNamesByMessageListener.get(listener);
|
||||||
|
} finally {
|
||||||
|
messageListenersLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (queueName != null) {
|
if (queueName != null) {
|
||||||
|
|
||||||
final CompletableFuture<Void> unsubscribeFuture;
|
final CompletableFuture<Void> unsubscribeFuture;
|
||||||
synchronized (messageListenersByQueueName) {
|
messageListenersLock.lock();
|
||||||
|
try {
|
||||||
queueNamesByMessageListener.remove(listener);
|
queueNamesByMessageListener.remove(listener);
|
||||||
if (messageListenersByQueueName.remove(queueName, listener)) {
|
if (messageListenersByQueueName.remove(queueName, listener)) {
|
||||||
// Submit to the Redis queue within the synchronized block, but don’t wait until exiting
|
// Submit to the Redis queue holding the lock, but don’t wait until exiting
|
||||||
unsubscribeFuture = unsubscribeFromKeyspaceNotifications(queueName);
|
unsubscribeFuture = unsubscribeFromKeyspaceNotifications(queueName);
|
||||||
} else {
|
} else {
|
||||||
messageAvailabilityListenerRemovedAfterAddCounter.increment();
|
messageAvailabilityListenerRemovedAfterAddCounter.increment();
|
||||||
unsubscribeFuture = CompletableFuture.completedFuture(null);
|
unsubscribeFuture = CompletableFuture.completedFuture(null);
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
messageListenersLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
unsubscribeFuture.join();
|
unsubscribeFuture.join();
|
||||||
|
@ -507,8 +521,11 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
private Optional<MessageAvailabilityListener> findListener(final String keyspaceChannel) {
|
private Optional<MessageAvailabilityListener> findListener(final String keyspaceChannel) {
|
||||||
final String queueName = getQueueNameFromKeyspaceChannel(keyspaceChannel);
|
final String queueName = getQueueNameFromKeyspaceChannel(keyspaceChannel);
|
||||||
|
|
||||||
synchronized (messageListenersByQueueName) {
|
messageListenersLock.lock();
|
||||||
|
try {
|
||||||
return Optional.ofNullable(messageListenersByQueueName.get(queueName));
|
return Optional.ofNullable(messageListenersByQueueName.get(queueName));
|
||||||
|
} finally {
|
||||||
|
messageListenersLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ package org.whispersystems.websocket.session;
|
||||||
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import org.whispersystems.websocket.WebSocketClient;
|
import org.whispersystems.websocket.WebSocketClient;
|
||||||
|
|
||||||
|
@ -13,6 +14,8 @@ public class WebSocketSessionContext {
|
||||||
|
|
||||||
private final List<WebSocketEventListener> closeListeners = new LinkedList<>();
|
private final List<WebSocketEventListener> closeListeners = new LinkedList<>();
|
||||||
|
|
||||||
|
private final ReentrantLock lock = new ReentrantLock();
|
||||||
|
|
||||||
private final WebSocketClient webSocketClient;
|
private final WebSocketClient webSocketClient;
|
||||||
|
|
||||||
private Object authenticated;
|
private Object authenticated;
|
||||||
|
@ -39,21 +42,33 @@ public class WebSocketSessionContext {
|
||||||
return authenticated;
|
return authenticated;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void addWebsocketClosedListener(WebSocketEventListener listener) {
|
public void addWebsocketClosedListener(WebSocketEventListener listener) {
|
||||||
if (!closed) this.closeListeners.add(listener);
|
lock.lock();
|
||||||
else listener.onWebSocketClose(this, 1000, "Closed");
|
try {
|
||||||
|
if (!closed)
|
||||||
|
this.closeListeners.add(listener);
|
||||||
|
else
|
||||||
|
listener.onWebSocketClose(this, 1000, "Closed");
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public WebSocketClient getClient() {
|
public WebSocketClient getClient() {
|
||||||
return webSocketClient;
|
return webSocketClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void notifyClosed(int statusCode, String reason) {
|
public void notifyClosed(int statusCode, String reason) {
|
||||||
for (WebSocketEventListener listener : closeListeners) {
|
lock.lock();
|
||||||
listener.onWebSocketClose(this, statusCode, reason);
|
try {
|
||||||
}
|
for (WebSocketEventListener listener : closeListeners) {
|
||||||
|
listener.onWebSocketClose(this, statusCode, reason);
|
||||||
|
}
|
||||||
|
|
||||||
closed = true;
|
closed = true;
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface WebSocketEventListener {
|
public interface WebSocketEventListener {
|
||||||
|
|
Loading…
Reference in New Issue