diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageAvailabilityListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageAvailabilityListener.java index e8535924a..12872c372 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageAvailabilityListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageAvailabilityListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2020 Signal Messenger, LLC + * Copyright 2013-2021 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ @@ -14,8 +14,5 @@ public interface MessageAvailabilityListener { void handleNewMessagesAvailable(); - @Deprecated - void handleNewEphemeralMessageAvailable(); - void handleMessagesPersisted(); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 5d454f87a..4cbfe55e8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -47,7 +47,6 @@ import org.whispersystems.textsecuregcm.util.RedisClusterUtil; public class MessagesCache extends RedisClusterPubSubAdapter implements Managed { - private final FaultTolerantRedisCluster insertCluster; private final FaultTolerantRedisCluster readDeleteCluster; private final FaultTolerantPubSubConnection pubSubConnection; @@ -66,7 +65,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get")); private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist")); private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear")); - private final Timer takeEphemeralMessageTimer = Metrics.timer(name(MessagesCache.class, "takeEphemeral")); private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage")); private final Counter newMessageNotificationCounter = Metrics.counter(name(MessagesCache.class, "newMessageNotification")); private final Counter queuePersistedNotificationCounter = Metrics.counter(name(MessagesCache.class, "queuePersisted")); @@ -75,7 +73,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8); private static final String QUEUE_KEYSPACE_PREFIX = "__keyspace@0__:user_queue::"; - private static final String EPHEMERAL_QUEUE_KEYSPACE_PREFIX = "__keyspace@0__:user_queue_ephemeral::"; private static final String PERSISTING_KEYSPACE_PREFIX = "__keyspace@0__:user_queue_persisting::"; private static final Duration MAX_EPHEMERAL_MESSAGE_DELAY = Duration.ofSeconds(10); @@ -89,7 +86,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp public MessagesCache(final FaultTolerantRedisCluster insertCluster, final FaultTolerantRedisCluster readDeleteCluster, final ExecutorService notificationExecutorService) throws IOException { - this.insertCluster = insertCluster; this.readDeleteCluster = readDeleteCluster; this.pubSubConnection = readDeleteCluster.createPubSubConnection(); @@ -224,37 +220,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp }); } - @Deprecated - public Optional takeEphemeralMessage(final UUID destinationUuid, - final long destinationDevice) { - return takeEphemeralMessage(destinationUuid, destinationDevice, System.currentTimeMillis()); - } - - @VisibleForTesting - Optional takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice, - final long currentTimeMillis) { - final long earliestAllowableTimestamp = currentTimeMillis - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis(); - - return takeEphemeralMessageTimer.record(() -> readDeleteCluster.withBinaryCluster(connection -> { - byte[] messageBytes; - - while ((messageBytes = connection.sync().lpop(getEphemeralMessageQueueKey(destinationUuid, destinationDevice))) - != null) { - try { - final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(messageBytes); - - if (message.getTimestamp() >= earliestAllowableTimestamp) { - return Optional.of(message); - } - } catch (final InvalidProtocolBufferException e) { - logger.warn("Failed to parse envelope", e); - } - } - - return Optional.empty(); - })); - } - public void clear(final UUID destinationUuid) { // TODO Remove null check in a fully UUID-based world if (destinationUuid != null) { @@ -335,7 +300,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private static String[] getKeyspaceChannels(final String queueName) { return new String[] { QUEUE_KEYSPACE_PREFIX + "{" + queueName + "}", - EPHEMERAL_QUEUE_KEYSPACE_PREFIX + "{" + queueName + "}", PERSISTING_KEYSPACE_PREFIX + "{" + queueName + "}" }; } @@ -353,14 +317,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp logger.warn("Unexpected error handling new message", e); } }); - } else if (channel.startsWith(EPHEMERAL_QUEUE_KEYSPACE_PREFIX) && "rpush".equals(message)) { - notificationExecutorService.execute(() -> { - try { - findListener(channel).ifPresent(MessageAvailabilityListener::handleNewEphemeralMessageAvailable); - } catch (final Exception e) { - logger.warn("Unexpected error handling new ephemeral message", e); - } - }); } else if (channel.startsWith(PERSISTING_KEYSPACE_PREFIX) && "del".equals(message)) { queuePersistedNotificationCounter.increment(); notificationExecutorService.execute(() -> { @@ -414,10 +370,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp return ("user_queue::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8); } - static byte[] getEphemeralMessageQueueKey(final UUID accountUuid, final long deviceId) { - return ("user_queue_ephemeral::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8); - } - private static byte[] getMessageQueueMetadataKey(final UUID accountUuid, final long deviceId) { return ("user_queue_metadata::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 6e797cba3..883773f6b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -57,11 +57,6 @@ public class MessagesManager { } } - @Deprecated - public Optional takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice) { - return messagesCache.takeEphemeralMessage(destinationUuid, destinationDevice); - } - public boolean hasCachedMessages(final UUID destinationUuid, final long destinationDevice) { return messagesCache.hasMessages(destinationUuid, destinationDevice); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 9b0b0b206..c838025f0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -64,7 +64,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private static final Histogram primaryDeviceMessageTime = metricRegistry.histogram(name(MessageController.class, "primary_device_message_delivery_duration")); private static final Meter sendMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_message")); private static final Meter messageAvailableMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesAvailable")); - private static final Meter ephemeralMessageAvailableMeter = metricRegistry.meter(name(WebSocketConnection.class, "ephemeralMessagesAvailable")); private static final Meter messagesPersistedMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesPersisted")); private static final Meter bytesSentMeter = metricRegistry.meter(name(WebSocketConnection.class, "bytes_sent")); private static final Meter sendFailuresMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_failures")); @@ -352,14 +351,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac processStoredMessages(); } - @Override - public void handleNewEphemeralMessageAvailable() { - ephemeralMessageAvailableMeter.mark(); - - messagesManager.takeEphemeralMessage(auth.getAccount().getUuid(), device.getId()) - .ifPresent(message -> sendMessage(message, Optional.empty())); - } - @Override public void handleMessagesPersisted() { messagesPersistedMeter.mark(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index 0f87a55d8..d343075ef 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2020 Signal Messenger, LLC + * Copyright 2013-2021 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ @@ -124,10 +124,6 @@ class MessagePersisterIntegrationTest { public void handleNewMessagesAvailable() { } - @Override - public void handleNewEphemeralMessageAvailable() { - } - @Override public void handleMessagesPersisted() { synchronized (messagesPersisted) { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index c1ab7b553..9849c9271 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2020 Signal Messenger, LLC + * Copyright 2013-2021 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ @@ -280,10 +280,6 @@ public class MessagesCacheTest extends AbstractRedisClusterTest { } } - @Override - public void handleNewEphemeralMessageAvailable() { - } - @Override public void handleMessagesPersisted() { } @@ -310,10 +306,6 @@ public class MessagesCacheTest extends AbstractRedisClusterTest { public void handleNewMessagesAvailable() { } - @Override - public void handleNewEphemeralMessageAvailable() { - } - @Override public void handleMessagesPersisted() { synchronized (notified) {