Don’t persist ephemeral messages; clear ephemeral field when sending to clients
This commit is contained in:
parent
54fe3b9a43
commit
a7443a9ece
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2013-2020 Signal Messenger, LLC
|
* Copyright 2013-2021 Signal Messenger, LLC
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
@ -39,7 +39,6 @@ public class MessagePersister implements Managed {
|
||||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||||
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues"));
|
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues"));
|
||||||
private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue"));
|
private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue"));
|
||||||
private final Meter persistMessageMeter = metricRegistry.meter(name(MessagePersister.class, "persistMessage"));
|
|
||||||
private final Meter persistQueueExceptionMeter = metricRegistry.meter(name(MessagePersister.class, "persistQueueException"));
|
private final Meter persistQueueExceptionMeter = metricRegistry.meter(name(MessagePersister.class, "persistQueueException"));
|
||||||
private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount"));
|
private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount"));
|
||||||
private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize"));
|
private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize"));
|
||||||
|
@ -152,19 +151,19 @@ public class MessagePersister implements Managed {
|
||||||
messagesCache.lockQueueForPersistence(accountUuid, deviceId);
|
messagesCache.lockQueueForPersistence(accountUuid, deviceId);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
int messageCount = 0;
|
int messageCount = 0;
|
||||||
List<MessageProtos.Envelope> messages;
|
List<MessageProtos.Envelope> messages;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);
|
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);
|
||||||
|
|
||||||
messagesManager.persistMessages(accountUuid, deviceId, messages);
|
messagesManager.persistMessages(accountUuid, deviceId, messages);
|
||||||
messageCount += messages.size();
|
messageCount += messages.size();
|
||||||
|
|
||||||
persistMessageMeter.mark(messages.size());
|
|
||||||
} while (!messages.isEmpty());
|
|
||||||
|
|
||||||
queueSizeHistogram.update(messageCount);
|
} while (!messages.isEmpty());
|
||||||
|
|
||||||
|
queueSizeHistogram.update(messageCount);
|
||||||
} finally {
|
} finally {
|
||||||
messagesCache.unlockQueueForPersistence(accountUuid, deviceId);
|
messagesCache.unlockQueueForPersistence(accountUuid, deviceId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2013-2020 Signal Messenger, LLC
|
* Copyright 2013-2021 Signal Messenger, LLC
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
@ -134,51 +134,65 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
}
|
}
|
||||||
|
|
||||||
public long insert(final UUID guid, final UUID destinationUuid, final long destinationDevice, final MessageProtos.Envelope message) {
|
public long insert(final UUID guid, final UUID destinationUuid, final long destinationDevice, final MessageProtos.Envelope message) {
|
||||||
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
|
final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build();
|
||||||
return (long)insertTimer.record(() ->
|
return (long) insertTimer.record(() ->
|
||||||
insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
insertScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||||
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
|
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
|
||||||
getQueueIndexKey(destinationUuid, destinationDevice)),
|
getQueueIndexKey(destinationUuid, destinationDevice)),
|
||||||
List.of(messageWithGuid.toByteArray(),
|
List.of(messageWithGuid.toByteArray(),
|
||||||
String.valueOf(message.getTimestamp()).getBytes(StandardCharsets.UTF_8),
|
String.valueOf(message.getTimestamp()).getBytes(StandardCharsets.UTF_8),
|
||||||
guid.toString().getBytes(StandardCharsets.UTF_8))));
|
guid.toString().getBytes(StandardCharsets.UTF_8))));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void insertEphemeral(final UUID destinationUuid, final long destinationDevice, final MessageProtos.Envelope message) {
|
public void insertEphemeral(final UUID destinationUuid, final long destinationDevice,
|
||||||
insertEphemeralTimer.record(() -> {
|
final MessageProtos.Envelope message) {
|
||||||
final byte[] ephemeralQueueKey = getEphemeralMessageQueueKey(destinationUuid, destinationDevice);
|
|
||||||
|
|
||||||
insertCluster.useBinaryCluster(connection -> {
|
final MessageProtos.Envelope messageWithGuid;
|
||||||
connection.sync().rpush(ephemeralQueueKey, message.toByteArray());
|
|
||||||
connection.sync().expire(ephemeralQueueKey, MAX_EPHEMERAL_MESSAGE_DELAY.toSeconds());
|
if (!message.hasServerGuid()) {
|
||||||
});
|
messageWithGuid = message.toBuilder().setServerGuid(UUID.randomUUID().toString()).build();
|
||||||
});
|
} else {
|
||||||
|
messageWithGuid = message;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<OutgoingMessageEntity> remove(final UUID destinationUuid, final long destinationDevice, final UUID messageGuid) {
|
insertEphemeralTimer.record(() -> {
|
||||||
return remove(destinationUuid, destinationDevice, List.of(messageGuid)).stream().findFirst();
|
final byte[] ephemeralQueueKey = getEphemeralMessageQueueKey(destinationUuid, destinationDevice);
|
||||||
|
|
||||||
|
insertCluster.useBinaryCluster(connection -> {
|
||||||
|
connection.sync().rpush(ephemeralQueueKey, messageWithGuid.toByteArray());
|
||||||
|
connection.sync().expire(ephemeralQueueKey, MAX_EPHEMERAL_MESSAGE_DELAY.toSeconds());
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public Optional<OutgoingMessageEntity> remove(final UUID destinationUuid, final long destinationDevice,
|
||||||
|
final UUID messageGuid) {
|
||||||
|
return remove(destinationUuid, destinationDevice, List.of(messageGuid)).stream().findFirst();
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public List<OutgoingMessageEntity> remove(final UUID destinationUuid, final long destinationDevice,
|
||||||
|
final List<UUID> messageGuids) {
|
||||||
|
final List<byte[]> serialized = (List<byte[]>) Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG,
|
||||||
|
REMOVE_METHOD_UUID).record(() ->
|
||||||
|
removeByGuidScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
||||||
|
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
|
||||||
|
getQueueIndexKey(destinationUuid, destinationDevice)),
|
||||||
|
messageGuids.stream().map(guid -> guid.toString().getBytes(StandardCharsets.UTF_8))
|
||||||
|
.collect(Collectors.toList())));
|
||||||
|
|
||||||
|
final List<OutgoingMessageEntity> removedMessages = new ArrayList<>(serialized.size());
|
||||||
|
|
||||||
|
for (final byte[] bytes : serialized) {
|
||||||
|
try {
|
||||||
|
removedMessages.add(constructEntityFromEnvelope(0, MessageProtos.Envelope.parseFrom(bytes)));
|
||||||
|
} catch (final InvalidProtocolBufferException e) {
|
||||||
|
logger.warn("Failed to parse envelope", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
return removedMessages;
|
||||||
public List<OutgoingMessageEntity> remove(final UUID destinationUuid, final long destinationDevice, final List<UUID> messageGuids) {
|
}
|
||||||
final List<byte[]> serialized = (List<byte[]>)Metrics.timer(REMOVE_TIMER_NAME, REMOVE_METHOD_TAG, REMOVE_METHOD_UUID).record(() ->
|
|
||||||
removeByGuidScript.executeBinary(List.of(getMessageQueueKey(destinationUuid, destinationDevice),
|
|
||||||
getMessageQueueMetadataKey(destinationUuid, destinationDevice),
|
|
||||||
getQueueIndexKey(destinationUuid, destinationDevice)),
|
|
||||||
messageGuids.stream().map(guid -> guid.toString().getBytes(StandardCharsets.UTF_8)).collect(Collectors.toList())));
|
|
||||||
|
|
||||||
final List<OutgoingMessageEntity> removedMessages = new ArrayList<>(serialized.size());
|
|
||||||
|
|
||||||
for (final byte[] bytes : serialized) {
|
|
||||||
try {
|
|
||||||
removedMessages.add(constructEntityFromEnvelope(0, MessageProtos.Envelope.parseFrom(bytes)));
|
|
||||||
} catch (final InvalidProtocolBufferException e) {
|
|
||||||
logger.warn("Failed to parse envelope", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return removedMessages;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean hasMessages(final UUID destinationUuid, final long destinationDevice) {
|
public boolean hasMessages(final UUID destinationUuid, final long destinationDevice) {
|
||||||
return readDeleteCluster.withBinaryCluster(connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0);
|
return readDeleteCluster.withBinaryCluster(connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0);
|
||||||
|
@ -223,40 +237,43 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
|
|
||||||
for (final ScoredValue<byte[]> scoredMessage : scoredMessages) {
|
for (final ScoredValue<byte[]> scoredMessage : scoredMessages) {
|
||||||
try {
|
try {
|
||||||
envelopes.add(MessageProtos.Envelope.parseFrom(scoredMessage.getValue()));
|
envelopes.add(MessageProtos.Envelope.parseFrom(scoredMessage.getValue()));
|
||||||
} catch (InvalidProtocolBufferException e) {
|
} catch (InvalidProtocolBufferException e) {
|
||||||
logger.warn("Failed to parse envelope", e);
|
logger.warn("Failed to parse envelope", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return envelopes;
|
return envelopes;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<MessageProtos.Envelope> takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice) {
|
public Optional<MessageProtos.Envelope> takeEphemeralMessage(final UUID destinationUuid,
|
||||||
return takeEphemeralMessage(destinationUuid, destinationDevice, System.currentTimeMillis());
|
final long destinationDevice) {
|
||||||
}
|
return takeEphemeralMessage(destinationUuid, destinationDevice, System.currentTimeMillis());
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
Optional<MessageProtos.Envelope> takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice, final long currentTimeMillis) {
|
Optional<MessageProtos.Envelope> takeEphemeralMessage(final UUID destinationUuid, final long destinationDevice,
|
||||||
final long earliestAllowableTimestamp = currentTimeMillis - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis();
|
final long currentTimeMillis) {
|
||||||
|
final long earliestAllowableTimestamp = currentTimeMillis - MAX_EPHEMERAL_MESSAGE_DELAY.toMillis();
|
||||||
|
|
||||||
return takeEphemeralMessageTimer.record(() -> readDeleteCluster.withBinaryCluster(connection -> {
|
return takeEphemeralMessageTimer.record(() -> readDeleteCluster.withBinaryCluster(connection -> {
|
||||||
byte[] messageBytes;
|
byte[] messageBytes;
|
||||||
|
|
||||||
while ((messageBytes = connection.sync().lpop(getEphemeralMessageQueueKey(destinationUuid, destinationDevice))) != null) {
|
while ((messageBytes = connection.sync().lpop(getEphemeralMessageQueueKey(destinationUuid, destinationDevice)))
|
||||||
try {
|
!= null) {
|
||||||
final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(messageBytes);
|
try {
|
||||||
|
final MessageProtos.Envelope message = MessageProtos.Envelope.parseFrom(messageBytes);
|
||||||
|
|
||||||
if (message.getTimestamp() >= earliestAllowableTimestamp) {
|
if (message.getTimestamp() >= earliestAllowableTimestamp) {
|
||||||
return Optional.of(message);
|
return Optional.of(message);
|
||||||
}
|
}
|
||||||
} catch (final InvalidProtocolBufferException e) {
|
} catch (final InvalidProtocolBufferException e) {
|
||||||
logger.warn("Failed to parse envelope", e);
|
logger.warn("Failed to parse envelope", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2013-2020 Signal Messenger, LLC
|
* Copyright 2013-2021 Signal Messenger, LLC
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
*/
|
*/
|
||||||
package org.whispersystems.textsecuregcm.storage;
|
package org.whispersystems.textsecuregcm.storage;
|
||||||
|
@ -25,9 +25,13 @@ public class MessagesManager {
|
||||||
|
|
||||||
private static final int RESULT_SET_CHUNK_SIZE = 100;
|
private static final int RESULT_SET_CHUNK_SIZE = 100;
|
||||||
|
|
||||||
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||||
private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid" ));
|
private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid"));
|
||||||
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid"));
|
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(
|
||||||
|
name(MessagesManager.class, "cacheMissByGuid"));
|
||||||
|
|
||||||
|
// migrated from MessagePersister, name is not a typo
|
||||||
|
private final Meter persistMessageMeter = metricRegistry.meter(name(MessagePersister.class, "persistMessage"));
|
||||||
|
|
||||||
private final MessagesDynamoDb messagesDynamoDb;
|
private final MessagesDynamoDb messagesDynamoDb;
|
||||||
private final MessagesCache messagesCache;
|
private final MessagesCache messagesCache;
|
||||||
|
@ -110,8 +114,16 @@ public class MessagesManager {
|
||||||
final UUID destinationUuid,
|
final UUID destinationUuid,
|
||||||
final long destinationDeviceId,
|
final long destinationDeviceId,
|
||||||
final List<Envelope> messages) {
|
final List<Envelope> messages) {
|
||||||
messagesDynamoDb.store(messages, destinationUuid, destinationDeviceId);
|
|
||||||
messagesCache.remove(destinationUuid, destinationDeviceId, messages.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList()));
|
final List<Envelope> nonEphemeralMessages = messages.stream()
|
||||||
|
.filter(envelope -> !envelope.getEphemeral())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
messagesDynamoDb.store(nonEphemeralMessages, destinationUuid, destinationDeviceId);
|
||||||
|
messagesCache.remove(destinationUuid, destinationDeviceId,
|
||||||
|
messages.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList()));
|
||||||
|
|
||||||
|
persistMessageMeter.mark(nonEphemeralMessages.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addMessageAvailabilityListener(
|
public void addMessageAvailabilityListener(
|
||||||
|
|
|
@ -157,7 +157,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||||
}
|
}
|
||||||
|
|
||||||
private CompletableFuture<WebSocketResponseMessage> sendMessage(final Envelope message, final Optional<StoredMessageInfo> storedMessageInfo) {
|
private CompletableFuture<WebSocketResponseMessage> sendMessage(final Envelope message, final Optional<StoredMessageInfo> storedMessageInfo) {
|
||||||
final Optional<byte[]> body = Optional.ofNullable(message.toByteArray());
|
// clear ephemeral field from the envelope
|
||||||
|
final Optional<byte[]> body = Optional.ofNullable(message.toBuilder().clearEphemeral().build().toByteArray());
|
||||||
|
|
||||||
sendMessageMeter.mark();
|
sendMessageMeter.mark();
|
||||||
sentMessageCounter.increment();
|
sentMessageCounter.increment();
|
||||||
|
@ -165,19 +166,20 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||||
|
|
||||||
// X-Signal-Key: false must be sent until Android stops assuming it missing means true
|
// X-Signal-Key: false must be sent until Android stops assuming it missing means true
|
||||||
return client.sendRequest("PUT", "/api/v1/message", List.of("X-Signal-Key: false", TimestampHeaderUtil.getTimestampHeader()), body).whenComplete((response, throwable) -> {
|
return client.sendRequest("PUT", "/api/v1/message", List.of("X-Signal-Key: false", TimestampHeaderUtil.getTimestampHeader()), body).whenComplete((response, throwable) -> {
|
||||||
if (throwable == null) {
|
if (throwable == null) {
|
||||||
if (isSuccessResponse(response)) {
|
if (isSuccessResponse(response)) {
|
||||||
if (storedMessageInfo.isPresent()) {
|
if (storedMessageInfo.isPresent()) {
|
||||||
messagesManager.delete(auth.getAccount().getUuid(), device.getId(), storedMessageInfo.get().getGuid());
|
messagesManager.delete(auth.getAccount().getUuid(), device.getId(), storedMessageInfo.get().getGuid());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (message.getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT) {
|
if (message.getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT) {
|
||||||
recordMessageDeliveryDuration(message.getTimestamp(), device);
|
recordMessageDeliveryDuration(message.getTimestamp(), device);
|
||||||
sendDeliveryReceiptFor(message);
|
sendDeliveryReceiptFor(message);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
final List<Tag> tags = new ArrayList<>(List.of(Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())),
|
final List<Tag> tags = new ArrayList<>(
|
||||||
UserAgentTagUtil.getPlatformTag(client.getUserAgent())));
|
List.of(Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())),
|
||||||
|
UserAgentTagUtil.getPlatformTag(client.getUserAgent())));
|
||||||
|
|
||||||
// TODO Remove this once we've identified the cause of message rejections from desktop clients
|
// TODO Remove this once we've identified the cause of message rejections from desktop clients
|
||||||
if (StringUtils.isNotBlank(response.getMessage())) {
|
if (StringUtils.isNotBlank(response.getMessage())) {
|
||||||
|
|
Loading…
Reference in New Issue