From a7443a9ece44d153e9372f03f1629abb157b996b Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Tue, 24 Aug 2021 10:03:58 -0500 Subject: [PATCH] =?UTF-8?q?Don=E2=80=99t=20persist=20ephemeral=20messages;?= =?UTF-8?q?=20clear=20ephemeral=20field=20when=20sending=20to=20clients?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../storage/MessagePersister.java | 21 ++- .../textsecuregcm/storage/MessagesCache.java | 139 ++++++++++-------- .../storage/MessagesManager.java | 24 ++- .../websocket/WebSocketConnection.java | 28 ++-- 4 files changed, 121 insertions(+), 91 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index ec9dbd26f..28b13b861 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2020 Signal Messenger, LLC + * Copyright 2013-2021 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ @@ -39,7 +39,6 @@ public class MessagePersister implements Managed { private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues")); private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue")); - private final Meter persistMessageMeter = metricRegistry.meter(name(MessagePersister.class, "persistMessage")); private final Meter persistQueueExceptionMeter = metricRegistry.meter(name(MessagePersister.class, "persistQueueException")); private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount")); private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize")); @@ -152,19 +151,19 @@ public class MessagePersister implements Managed { messagesCache.lockQueueForPersistence(accountUuid, deviceId); try { - int messageCount = 0; - List messages; + int messageCount = 0; + List messages; - do { - messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT); + do { + messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT); - messagesManager.persistMessages(accountUuid, deviceId, messages); - messageCount += messages.size(); + messagesManager.persistMessages(accountUuid, deviceId, messages); + messageCount += messages.size(); - persistMessageMeter.mark(messages.size()); - } while (!messages.isEmpty()); - queueSizeHistogram.update(messageCount); + } while (!messages.isEmpty()); + + queueSizeHistogram.update(messageCount); } finally { messagesCache.unlockQueueForPersistence(accountUuid, deviceId); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index aa42e2972..ad6a99ca1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2020 Signal Messenger, LLC + * Copyright 2013-2021 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ @@ -134,51 +134,65 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp } public long insert(final UUID guid, final UUID destinationUuid, final long destinationDevice, final MessageProtos.Envelope message) { - final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build(); - return (long)insertTimer.record(() -> - insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice), - getMessageQueueMetadataKey(destinationUuid, destinationDevice), - getQueueIndexKey(destinationUuid, destinationDevice)), - List.of(messageWithGuid.toByteArray(), - String.valueOf(message.getTimestamp()).getBytes(StandardCharsets.UTF_8), - guid.toString().getBytes(StandardCharsets.UTF_8)))); + final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build(); + return (long) insertTimer.record(() -> + insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice), + getMessageQueueMetadataKey(destinationUuid, destinationDevice), + getQueueIndexKey(destinationUuid, destinationDevice)), + List.of(messageWithGuid.toByteArray(), + String.valueOf(message.getTimestamp()).getBytes(StandardCharsets.UTF_8), + guid.toString().getBytes(StandardCharsets.UTF_8)))); } - public void insertEphemeral(final UUID destinationUuid, final long destinationDevice, final MessageProtos.Envelope message) { - insertEphemeralTimer.record(() -> { - final byte[] ephemeralQueueKey = getEphemeralMessageQueueKey(destinationUuid, destinationDevice); + public void insertEphemeral(final UUID destinationUuid, final long destinationDevice, + final MessageProtos.Envelope message) { - insertCluster.useBinaryCluster(connection -> { - connection.sync().rpush(ephemeralQueueKey, message.toByteArray()); - connection.sync().expire(ephemeralQueueKey, MAX_EPHEMERAL_MESSAGE_DELAY.toSeconds()); - }); - }); + final MessageProtos.Envelope messageWithGuid; + + if (!message.hasServerGuid()) { + messageWithGuid = message.toBuilder().setServerGuid(UUID.randomUUID().toString()).build(); + } else { + messageWithGuid = message; } - public Optional remove(final UUID destinationUuid, final long destinationDevice, final UUID messageGuid) { - return remove(destinationUuid, destinationDevice, List.of(messageGuid)).stream().findFirst(); + insertEphemeralTimer.record(() -> { + final byte[] ephemeralQueueKey = getEphemeralMessageQueueKey(destinationUuid, destinationDevice); + + insertCluster.useBinaryCluster(connection -> { + connection.sync().rpush(ephemeralQueueKey, messageWithGuid.toByteArray()); + connection.sync().expire(ephemeralQueueKey, MAX_EPHEMERAL_MESSAGE_DELAY.toSeconds()); + }); + }); + } + + public Optional remove(final UUID destinationUuid, final long destinationDevice, + final UUID messageGuid) { + return remove(destinationUuid, destinationDevice, List.of(messageGuid)).stream().findFirst(); + } + + @SuppressWarnings("unchecked") + public List remove(final UUID destinationUuid, final long destinationDevice, + final List messageGuids) { + final List serialized = (List) Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG, + REMOVE_METHOD_UUID).record(() -> + removeByGuidScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice), + getMessageQueueMetadataKey(destinationUuid, destinationDevice), + getQueueIndexKey(destinationUuid, destinationDevice)), + messageGuids.stream().map(guid -> guid.toString().getBytes(StandardCharsets.UTF_8)) + .collect(Collectors.toList()))); + + final List removedMessages = new ArrayList<>(serialized.size()); + + for (final byte[] bytes : serialized) { + try { + removedMessages.add(constructEntityFromEnvelope(0, MessageProtos.Envelope.parseFrom(bytes))); + } catch (final InvalidProtocolBufferException e) { + logger.warn("Failed to parse envelope", e); + } } - @SuppressWarnings("unchecked") - public List remove(final UUID destinationUuid, final long destinationDevice, final List messageGuids) { - final List serialized = (List)Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG, REMOVE_METHOD_UUID).record(() -> - removeByGuidScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice), - getMessageQueueMetadataKey(destinationUuid, destinationDevice), - getQueueIndexKey(destinationUuid, destinationDevice)), - messageGuids.stream().map(guid -> guid.toString().getBytes(StandardCharsets.UTF_8)).collect(Collectors.toList()))); - - final List removedMessages = new ArrayList<>(serialized.size()); - - for (final byte[] bytes : serialized) { - try { - removedMessages.add(constructEntityFromEnvelope(0, MessageProtos.Envelope.parseFrom(bytes))); - } catch (final InvalidProtocolBufferException e) { - logger.warn("Failed to parse envelope", e); - } - } - - return removedMessages; - } + return removedMessages; + } public boolean hasMessages(final UUID destinationUuid, final long destinationDevice) { return readDeleteCluster.withBinaryCluster(connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0); @@ -223,40 +237,43 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp for (final ScoredValue scoredMessage : scoredMessages) { try { - envelopes.add(MessageProtos.Envelope.parseFrom(scoredMessage.getValue())); + envelopes.add(MessageProtos.Envelope.parseFrom(scoredMessage.getValue())); } catch (InvalidProtocolBufferException e) { - logger.warn("Failed to parse envelope", e); + logger.warn("Failed to parse envelope", e); } } - return envelopes; + return envelopes; }); } - public Optional takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice) { - return takeEphemeralMessage(destinationUuid, destinationDevice, System.currentTimeMillis()); - } + public Optional takeEphemeralMessage(final UUID destinationUuid, + final long destinationDevice) { + return takeEphemeralMessage(destinationUuid, destinationDevice, System.currentTimeMillis()); + } - @VisibleForTesting - Optional takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice, final long currentTimeMillis) { - final long earliestAllowableTimestamp = currentTimeMillis - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis(); + @VisibleForTesting + Optional takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice, + final long currentTimeMillis) { + final long earliestAllowableTimestamp = currentTimeMillis - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis(); - return takeEphemeralMessageTimer.record(() -> readDeleteCluster.withBinaryCluster(connection -> { - byte[] messageBytes; + return takeEphemeralMessageTimer.record(() -> readDeleteCluster.withBinaryCluster(connection -> { + byte[] messageBytes; - while ((messageBytes = connection.sync().lpop(getEphemeralMessageQueueKey(destinationUuid, destinationDevice))) != null) { - try { - final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(messageBytes); + while ((messageBytes = connection.sync().lpop(getEphemeralMessageQueueKey(destinationUuid, destinationDevice))) + != null) { + try { + final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(messageBytes); - if (message.getTimestamp() >= earliestAllowableTimestamp) { - return Optional.of(message); - } - } catch (final InvalidProtocolBufferException e) { - logger.warn("Failed to parse envelope", e); - } - } + if (message.getTimestamp() >= earliestAllowableTimestamp) { + return Optional.of(message); + } + } catch (final InvalidProtocolBufferException e) { + logger.warn("Failed to parse envelope", e); + } + } - return Optional.empty(); + return Optional.empty(); })); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 3fdfa98cb..5cba99590 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2020 Signal Messenger, LLC + * Copyright 2013-2021 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ package org.whispersystems.textsecuregcm.storage; @@ -25,9 +25,13 @@ public class MessagesManager { private static final int RESULT_SET_CHUNK_SIZE = 100; - private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid" )); - private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid")); + private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid")); + private static final Meter cacheMissByGuidMeter = metricRegistry.meter( + name(MessagesManager.class, "cacheMissByGuid")); + + // migrated from MessagePersister, name is not a typo + private final Meter persistMessageMeter = metricRegistry.meter(name(MessagePersister.class, "persistMessage")); private final MessagesDynamoDb messagesDynamoDb; private final MessagesCache messagesCache; @@ -110,8 +114,16 @@ public class MessagesManager { final UUID destinationUuid, final long destinationDeviceId, final List messages) { - messagesDynamoDb.store(messages, destinationUuid, destinationDeviceId); - messagesCache.remove(destinationUuid, destinationDeviceId, messages.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList())); + + final List nonEphemeralMessages = messages.stream() + .filter(envelope -> !envelope.getEphemeral()) + .collect(Collectors.toList()); + + messagesDynamoDb.store(nonEphemeralMessages, destinationUuid, destinationDeviceId); + messagesCache.remove(destinationUuid, destinationDeviceId, + messages.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList())); + + persistMessageMeter.mark(nonEphemeralMessages.size()); } public void addMessageAvailabilityListener( diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 2a361d65c..ebb4be3c0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -157,7 +157,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac } private CompletableFuture sendMessage(final Envelope message, final Optional storedMessageInfo) { - final Optional body = Optional.ofNullable(message.toByteArray()); + // clear ephemeral field from the envelope + final Optional body = Optional.ofNullable(message.toBuilder().clearEphemeral().build().toByteArray()); sendMessageMeter.mark(); sentMessageCounter.increment(); @@ -165,19 +166,20 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac // X-Signal-Key: false must be sent until Android stops assuming it missing means true return client.sendRequest("PUT", "/api/v1/message", List.of("X-Signal-Key: false", TimestampHeaderUtil.getTimestampHeader()), body).whenComplete((response, throwable) -> { - if (throwable == null) { - if (isSuccessResponse(response)) { - if (storedMessageInfo.isPresent()) { - messagesManager.delete(auth.getAccount().getUuid(), device.getId(), storedMessageInfo.get().getGuid()); - } + if (throwable == null) { + if (isSuccessResponse(response)) { + if (storedMessageInfo.isPresent()) { + messagesManager.delete(auth.getAccount().getUuid(), device.getId(), storedMessageInfo.get().getGuid()); + } - if (message.getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT) { - recordMessageDeliveryDuration(message.getTimestamp(), device); - sendDeliveryReceiptFor(message); - } - } else { - final List tags = new ArrayList<>(List.of(Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())), - UserAgentTagUtil.getPlatformTag(client.getUserAgent()))); + if (message.getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT) { + recordMessageDeliveryDuration(message.getTimestamp(), device); + sendDeliveryReceiptFor(message); + } + } else { + final List tags = new ArrayList<>( + List.of(Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())), + UserAgentTagUtil.getPlatformTag(client.getUserAgent()))); // TODO Remove this once we've identified the cause of message rejections from desktop clients if (StringUtils.isNotBlank(response.getMessage())) {