Stop sending/processing CONNECTED pub/sub messages.
This commit is contained in:
parent
afd645fb11
commit
e324f27655
|
@ -1,11 +1,9 @@
|
||||||
package org.whispersystems.textsecuregcm.websocket;
|
package org.whispersystems.textsecuregcm.websocket;
|
||||||
|
|
||||||
import com.codahale.metrics.Counter;
|
import com.codahale.metrics.Counter;
|
||||||
import com.codahale.metrics.Meter;
|
|
||||||
import com.codahale.metrics.MetricRegistry;
|
import com.codahale.metrics.MetricRegistry;
|
||||||
import com.codahale.metrics.SharedMetricRegistries;
|
import com.codahale.metrics.SharedMetricRegistries;
|
||||||
import com.codahale.metrics.Timer;
|
import com.codahale.metrics.Timer;
|
||||||
import com.google.protobuf.ByteString;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
|
||||||
|
@ -17,7 +15,6 @@ import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
import org.whispersystems.textsecuregcm.storage.Device;
|
import org.whispersystems.textsecuregcm.storage.Device;
|
||||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
|
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
import org.whispersystems.websocket.session.WebSocketSessionContext;
|
import org.whispersystems.websocket.session.WebSocketSessionContext;
|
||||||
import org.whispersystems.websocket.setup.WebSocketConnectListener;
|
import org.whispersystems.websocket.setup.WebSocketConnectListener;
|
||||||
|
@ -67,16 +64,12 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
||||||
final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender,
|
final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender,
|
||||||
messagesManager, account, device,
|
messagesManager, account, device,
|
||||||
context.getClient(), connectionId);
|
context.getClient(), connectionId);
|
||||||
final PubSubMessage connectMessage = PubSubMessage.newBuilder().setType(PubSubMessage.Type.CONNECTED)
|
|
||||||
.setContent(ByteString.copyFrom(connectionId.getBytes()))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
openWebsocketCounter.inc();
|
openWebsocketCounter.inc();
|
||||||
RedisOperation.unchecked(() -> apnFallbackManager.cancel(account, device));
|
RedisOperation.unchecked(() -> apnFallbackManager.cancel(account, device));
|
||||||
|
|
||||||
clientPresenceManager.setPresent(account.getUuid(), device.getId(), connection);
|
clientPresenceManager.setPresent(account.getUuid(), device.getId(), connection);
|
||||||
messagesManager.addMessageAvailabilityListener(account.getUuid(), device.getId(), connection);
|
messagesManager.addMessageAvailabilityListener(account.getUuid(), device.getId(), connection);
|
||||||
pubSubManager.publish(address, connectMessage);
|
|
||||||
pubSubManager.subscribe(address, connection);
|
pubSubManager.subscribe(address, connection);
|
||||||
|
|
||||||
context.addListener(new WebSocketSessionContext.WebSocketEventListener() {
|
context.addListener(new WebSocketSessionContext.WebSocketEventListener() {
|
||||||
|
|
|
@ -45,7 +45,6 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||||
public static final Histogram messageTime = metricRegistry.histogram(name(MessageController.class, "message_delivery_duration"));
|
public static final Histogram messageTime = metricRegistry.histogram(name(MessageController.class, "message_delivery_duration"));
|
||||||
private static final Meter sendMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_message"));
|
private static final Meter sendMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_message"));
|
||||||
private static final Meter pubSubDisplacementMeter = metricRegistry.meter(name(WebSocketConnection.class, "pubSubDisplacement"));
|
|
||||||
private static final Meter messageAvailableMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesAvailable"));
|
private static final Meter messageAvailableMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesAvailable"));
|
||||||
private static final Meter messagesPersistedMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesPersisted"));
|
private static final Meter messagesPersistedMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesPersisted"));
|
||||||
private static final Meter pubSubNewMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "pubSubNewMessage"));
|
private static final Meter pubSubNewMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "pubSubNewMessage"));
|
||||||
|
@ -94,12 +93,6 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
pubSubNewMessageMeter.mark();
|
pubSubNewMessageMeter.mark();
|
||||||
sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.empty(), false);
|
sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.empty(), false);
|
||||||
break;
|
break;
|
||||||
case PubSubMessage.Type.CONNECTED_VALUE:
|
|
||||||
if (pubSubMessage.hasContent() && !new String(pubSubMessage.getContent().toByteArray()).equals(connectionId)) {
|
|
||||||
pubSubDisplacementMeter.mark();
|
|
||||||
client.hardDisconnectQuietly();
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
default:
|
||||||
logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber());
|
logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue