Remove obsolete ephemeral queue handling

This commit is contained in:
Chris Eager 2021-08-25 16:30:49 -05:00 committed by Chris Eager
parent 93c3cea912
commit 3e5087e60b
6 changed files with 3 additions and 80 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
@ -14,8 +14,5 @@ public interface MessageAvailabilityListener {
void handleNewMessagesAvailable();
@Deprecated
void handleNewEphemeralMessageAvailable();
void handleMessagesPersisted();
}

View File

@ -47,7 +47,6 @@ import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
public class MessagesCache extends RedisClusterPubSubAdapter<String, String> implements Managed {
private final FaultTolerantRedisCluster insertCluster;
private final FaultTolerantRedisCluster readDeleteCluster;
private final FaultTolerantPubSubConnection<String, String> pubSubConnection;
@ -66,7 +65,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get"));
private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist"));
private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear"));
private final Timer takeEphemeralMessageTimer = Metrics.timer(name(MessagesCache.class, "takeEphemeral"));
private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage"));
private final Counter newMessageNotificationCounter = Metrics.counter(name(MessagesCache.class, "newMessageNotification"));
private final Counter queuePersistedNotificationCounter = Metrics.counter(name(MessagesCache.class, "queuePersisted"));
@ -75,7 +73,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8);
private static final String QUEUE_KEYSPACE_PREFIX = "__keyspace@0__:user_queue::";
private static final String EPHEMERAL_QUEUE_KEYSPACE_PREFIX = "__keyspace@0__:user_queue_ephemeral::";
private static final String PERSISTING_KEYSPACE_PREFIX = "__keyspace@0__:user_queue_persisting::";
private static final Duration MAX_EPHEMERAL_MESSAGE_DELAY = Duration.ofSeconds(10);
@ -89,7 +86,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster, final ExecutorService notificationExecutorService) throws IOException {
this.insertCluster = insertCluster;
this.readDeleteCluster = readDeleteCluster;
this.pubSubConnection = readDeleteCluster.createPubSubConnection();
@ -224,37 +220,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
});
}
@Deprecated
public Optional<MessageProtos.Envelope> takeEphemeralMessage(final UUID destinationUuid,
final long destinationDevice) {
return takeEphemeralMessage(destinationUuid, destinationDevice, System.currentTimeMillis());
}
@VisibleForTesting
Optional<MessageProtos.Envelope> takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice,
final long currentTimeMillis) {
final long earliestAllowableTimestamp = currentTimeMillis - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis();
return takeEphemeralMessageTimer.record(() -> readDeleteCluster.withBinaryCluster(connection -> {
byte[] messageBytes;
while ((messageBytes = connection.sync().lpop(getEphemeralMessageQueueKey(destinationUuid, destinationDevice)))
!= null) {
try {
final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(messageBytes);
if (message.getTimestamp() >= earliestAllowableTimestamp) {
return Optional.of(message);
}
} catch (final InvalidProtocolBufferException e) {
logger.warn("Failed to parse envelope", e);
}
}
return Optional.empty();
}));
}
public void clear(final UUID destinationUuid) {
// TODO Remove null check in a fully UUID-based world
if (destinationUuid != null) {
@ -335,7 +300,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private static String[] getKeyspaceChannels(final String queueName) {
return new String[] {
QUEUE_KEYSPACE_PREFIX + "{" + queueName + "}",
EPHEMERAL_QUEUE_KEYSPACE_PREFIX + "{" + queueName + "}",
PERSISTING_KEYSPACE_PREFIX + "{" + queueName + "}"
};
}
@ -353,14 +317,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
logger.warn("Unexpected error handling new message", e);
}
});
} else if (channel.startsWith(EPHEMERAL_QUEUE_KEYSPACE_PREFIX) && "rpush".equals(message)) {
notificationExecutorService.execute(() -> {
try {
findListener(channel).ifPresent(MessageAvailabilityListener::handleNewEphemeralMessageAvailable);
} catch (final Exception e) {
logger.warn("Unexpected error handling new ephemeral message", e);
}
});
} else if (channel.startsWith(PERSISTING_KEYSPACE_PREFIX) && "del".equals(message)) {
queuePersistedNotificationCounter.increment();
notificationExecutorService.execute(() -> {
@ -414,10 +370,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
return ("user_queue::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
}
static byte[] getEphemeralMessageQueueKey(final UUID accountUuid, final long deviceId) {
return ("user_queue_ephemeral::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
}
private static byte[] getMessageQueueMetadataKey(final UUID accountUuid, final long deviceId) {
return ("user_queue_metadata::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
}

View File

@ -57,11 +57,6 @@ public class MessagesManager {
}
}
@Deprecated
public Optional<Envelope> takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice) {
return messagesCache.takeEphemeralMessage(destinationUuid, destinationDevice);
}
public boolean hasCachedMessages(final UUID destinationUuid, final long destinationDevice) {
return messagesCache.hasMessages(destinationUuid, destinationDevice);
}

View File

@ -64,7 +64,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private static final Histogram primaryDeviceMessageTime = metricRegistry.histogram(name(MessageController.class, "primary_device_message_delivery_duration"));
private static final Meter sendMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_message"));
private static final Meter messageAvailableMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesAvailable"));
private static final Meter ephemeralMessageAvailableMeter = metricRegistry.meter(name(WebSocketConnection.class, "ephemeralMessagesAvailable"));
private static final Meter messagesPersistedMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesPersisted"));
private static final Meter bytesSentMeter = metricRegistry.meter(name(WebSocketConnection.class, "bytes_sent"));
private static final Meter sendFailuresMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_failures"));
@ -352,14 +351,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
processStoredMessages();
}
@Override
public void handleNewEphemeralMessageAvailable() {
ephemeralMessageAvailableMeter.mark();
messagesManager.takeEphemeralMessage(auth.getAccount().getUuid(), device.getId())
.ifPresent(message -> sendMessage(message, Optional.empty()));
}
@Override
public void handleMessagesPersisted() {
messagesPersistedMeter.mark();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
@ -124,10 +124,6 @@ class MessagePersisterIntegrationTest {
public void handleNewMessagesAvailable() {
}
@Override
public void handleNewEphemeralMessageAvailable() {
}
@Override
public void handleMessagesPersisted() {
synchronized (messagesPersisted) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
@ -280,10 +280,6 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
}
}
@Override
public void handleNewEphemeralMessageAvailable() {
}
@Override
public void handleMessagesPersisted() {
}
@ -310,10 +306,6 @@ public class MessagesCacheTest extends AbstractRedisClusterTest {
public void handleNewMessagesAvailable() {
}
@Override
public void handleNewEphemeralMessageAvailable() {
}
@Override
public void handleMessagesPersisted() {
synchronized (notified) {