Close websocket connections when displaced.

This commit is contained in:
Jon Chambers 2020-09-01 16:55:50 -04:00 committed by Jon Chambers
parent ce89bf3c77
commit 697c380cd1
2 changed files with 11 additions and 4 deletions

View File

@ -33,7 +33,6 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
private static final Timer durationTimer = metricRegistry.timer(name(WebSocketConnection.class, "connected_duration" ));
private static final Timer unauthenticatedDurationTimer = metricRegistry.timer(name(WebSocketConnection.class, "unauthenticated_connection_duration"));
private static final Counter openWebsocketCounter = metricRegistry.counter(name(WebSocketConnection.class, "open_websockets"));
private static final Meter explicitDisplacementMeter = metricRegistry.meter(name(WebSocketConnection.class, "explicitDisplacement"));
private final PushSender pushSender;
private final ReceiptSender receiptSender;
@ -74,7 +73,8 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
openWebsocketCounter.inc();
RedisOperation.unchecked(() -> apnFallbackManager.cancel(account, device));
clientPresenceManager.setPresent(account.getUuid(), device.getId(), explicitDisplacementMeter::mark);
clientPresenceManager.setPresent(account.getUuid(), device.getId(), connection);
messagesManager.addMessageAvailabilityListener(account.getUuid(), device.getId(), connection);
pubSubManager.publish(address, connectMessage);
pubSubManager.subscribe(address, connection);
@ -95,4 +95,3 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
}
}
}

View File

@ -15,6 +15,7 @@ import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
import org.whispersystems.textsecuregcm.push.DisplacedPresenceListener;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
@ -39,7 +40,7 @@ import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class WebSocketConnection implements DispatchChannel, MessageAvailabilityListener {
public class WebSocketConnection implements DispatchChannel, MessageAvailabilityListener, DisplacedPresenceListener {
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
public static final Histogram messageTime = metricRegistry.histogram(name(MessageController.class, "message_delivery_duration"));
@ -49,6 +50,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
private static final Meter messagesPersistedMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesPersisted"));
private static final Meter pubSubNewMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "pubSubNewMessage"));
private static final Meter pubSubPersistedMeter = metricRegistry.meter(name(WebSocketConnection.class, "pubSubPersisted"));
private static final Meter displacementMeter = metricRegistry.meter(name(WebSocketConnection.class, "explicitDisplacement"));
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
@ -230,6 +232,12 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
messagesPersistedMeter.mark();
}
@Override
public void handleDisplacement() {
displacementMeter.mark();
client.hardDisconnectQuietly();
}
private static class StoredMessageInfo {
private final long id;
private final boolean cached;