Always store and fetch shared MRM data
This commit is contained in:
parent
d53a6e4c42
commit
ee5df0e11c
|
@ -5,10 +5,9 @@
|
||||||
|
|
||||||
package org.whispersystems.textsecuregcm.configuration.dynamic;
|
package org.whispersystems.textsecuregcm.configuration.dynamic;
|
||||||
|
|
||||||
public record DynamicMessagesConfiguration(boolean storeSharedMrmData, boolean fetchSharedMrmData,
|
public record DynamicMessagesConfiguration(boolean useSharedMrmData) {
|
||||||
boolean useSharedMrmData) {
|
|
||||||
|
|
||||||
public DynamicMessagesConfiguration() {
|
public DynamicMessagesConfiguration() {
|
||||||
this(false, false, false);
|
this(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -655,10 +655,7 @@ public class MessageController {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@Nullable final byte[] sharedMrmKey =
|
final byte[] sharedMrmKey = messagesManager.insertSharedMultiRecipientMessagePayload(multiRecipientMessage);
|
||||||
dynamicConfigurationManager.getConfiguration().getMessagesConfiguration().storeSharedMrmData()
|
|
||||||
? messagesManager.insertSharedMultiRecipientMessagePayload(multiRecipientMessage)
|
|
||||||
: null;
|
|
||||||
|
|
||||||
CompletableFuture.allOf(
|
CompletableFuture.allOf(
|
||||||
recipients.values().stream()
|
recipients.values().stream()
|
||||||
|
@ -941,7 +938,7 @@ public class MessageController {
|
||||||
boolean story,
|
boolean story,
|
||||||
boolean urgent,
|
boolean urgent,
|
||||||
byte[] payload,
|
byte[] payload,
|
||||||
@Nullable byte[] sharedMrmKey) {
|
byte[] sharedMrmKey) {
|
||||||
|
|
||||||
final Envelope.Builder messageBuilder = Envelope.newBuilder();
|
final Envelope.Builder messageBuilder = Envelope.newBuilder();
|
||||||
final long serverTimestamp = System.currentTimeMillis();
|
final long serverTimestamp = System.currentTimeMillis();
|
||||||
|
@ -952,12 +949,10 @@ public class MessageController {
|
||||||
.setServerTimestamp(serverTimestamp)
|
.setServerTimestamp(serverTimestamp)
|
||||||
.setStory(story)
|
.setStory(story)
|
||||||
.setUrgent(urgent)
|
.setUrgent(urgent)
|
||||||
.setDestinationServiceId(serviceIdentifier.toServiceIdentifierString());
|
.setDestinationServiceId(serviceIdentifier.toServiceIdentifierString())
|
||||||
|
.setSharedMrmKey(ByteString.copyFrom(sharedMrmKey));
|
||||||
|
|
||||||
if (sharedMrmKey != null) {
|
// mrm views phase 3: always set content
|
||||||
messageBuilder.setSharedMrmKey(ByteString.copyFrom(sharedMrmKey));
|
|
||||||
}
|
|
||||||
// mrm views phase 2: always set content
|
|
||||||
messageBuilder.setContent(ByteString.copyFrom(payload));
|
messageBuilder.setContent(ByteString.copyFrom(payload));
|
||||||
|
|
||||||
messageSender.sendMessage(destinationAccount, destinationDevice, messageBuilder.build(), online);
|
messageSender.sendMessage(destinationAccount, destinationDevice, messageBuilder.build(), online);
|
||||||
|
|
|
@ -370,8 +370,8 @@ public class MessagesCache {
|
||||||
messageMono = Mono.just(message.toBuilder().clearSharedMrmKey().build());
|
messageMono = Mono.just(message.toBuilder().clearSharedMrmKey().build());
|
||||||
skippedStaleEphemeralMrmCounter.increment();
|
skippedStaleEphemeralMrmCounter.increment();
|
||||||
} else {
|
} else {
|
||||||
// mrm views phase 2: fetch shared MRM data -- internally depends on dynamic config that
|
// mrm views phase 3: fetch shared MRM data -- internally depends on dynamic config that
|
||||||
// enables fetching and using it (the stored messages still always have `content` set upstream)
|
// enables using it (the stored messages still always have `content` set upstream)
|
||||||
messageMono = getMessageWithSharedMrmData(message, destinationDevice);
|
messageMono = getMessageWithSharedMrmData(message, destinationDevice);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -393,7 +393,6 @@ public class MessagesCache {
|
||||||
/**
|
/**
|
||||||
* Returns the given message with its shared MRM data.
|
* Returns the given message with its shared MRM data.
|
||||||
*
|
*
|
||||||
* @see DynamicMessagesConfiguration#fetchSharedMrmData()
|
|
||||||
* @see DynamicMessagesConfiguration#useSharedMrmData()
|
* @see DynamicMessagesConfiguration#useSharedMrmData()
|
||||||
*/
|
*/
|
||||||
private Mono<MessageProtos.Envelope> getMessageWithSharedMrmData(final MessageProtos.Envelope mrmMessage,
|
private Mono<MessageProtos.Envelope> getMessageWithSharedMrmData(final MessageProtos.Envelope mrmMessage,
|
||||||
|
@ -406,53 +405,49 @@ public class MessagesCache {
|
||||||
mrmPhaseTwoMissingContentCounter.increment();
|
mrmPhaseTwoMissingContentCounter.increment();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dynamicConfigurationManager.getConfiguration().getMessagesConfiguration().fetchSharedMrmData()
|
final Experiment experiment = new Experiment(MRM_VIEWS_EXPERIMENT_NAME);
|
||||||
|
|
||||||
|
final byte[] key = mrmMessage.getSharedMrmKey().toByteArray();
|
||||||
|
final byte[] sharedMrmViewKey = MessagesCache.getSharedMrmViewKey(
|
||||||
|
// the message might be addressed to the account's PNI, so use the service ID from the envelope
|
||||||
|
ServiceIdentifier.valueOf(mrmMessage.getDestinationServiceId()), destinationDevice);
|
||||||
|
|
||||||
|
final Mono<MessageProtos.Envelope> messageFromRedisMono = Mono.from(redisCluster.withBinaryClusterReactive(
|
||||||
|
conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey)
|
||||||
|
.collectList()
|
||||||
|
.publishOn(messageDeliveryScheduler)))
|
||||||
|
.<MessageProtos.Envelope>handle((mrmDataAndView, sink) -> {
|
||||||
|
try {
|
||||||
|
assert mrmDataAndView.size() == 2;
|
||||||
|
|
||||||
|
final byte[] content = SealedSenderMultiRecipientMessage.messageForRecipient(
|
||||||
|
mrmDataAndView.getFirst().getValue(),
|
||||||
|
mrmDataAndView.getLast().getValue());
|
||||||
|
|
||||||
|
sink.next(mrmMessage.toBuilder()
|
||||||
|
.clearSharedMrmKey()
|
||||||
|
.setContent(ByteString.copyFrom(content))
|
||||||
|
.build());
|
||||||
|
|
||||||
|
mrmContentRetrievedCounter.increment();
|
||||||
|
} catch (Exception e) {
|
||||||
|
sink.error(e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.onErrorResume(throwable -> {
|
||||||
|
logger.warn("Failed to retrieve shared mrm data", throwable);
|
||||||
|
mrmRetrievalErrorCounter.increment();
|
||||||
|
return Mono.empty();
|
||||||
|
})
|
||||||
|
.share();
|
||||||
|
|
||||||
|
if (mrmMessage.hasContent()) {
|
||||||
|
experiment.compareMonoResult(mrmMessage.toBuilder().clearSharedMrmKey().build(), messageFromRedisMono);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dynamicConfigurationManager.getConfiguration().getMessagesConfiguration().useSharedMrmData()
|
||||||
|| !mrmMessage.hasContent()) {
|
|| !mrmMessage.hasContent()) {
|
||||||
|
return messageFromRedisMono;
|
||||||
final Experiment experiment = new Experiment(MRM_VIEWS_EXPERIMENT_NAME);
|
|
||||||
|
|
||||||
final byte[] key = mrmMessage.getSharedMrmKey().toByteArray();
|
|
||||||
final byte[] sharedMrmViewKey = MessagesCache.getSharedMrmViewKey(
|
|
||||||
// the message might be addressed to the account's PNI, so use the service ID from the envelope
|
|
||||||
ServiceIdentifier.valueOf(mrmMessage.getDestinationServiceId()), destinationDevice);
|
|
||||||
|
|
||||||
final Mono<MessageProtos.Envelope> messageFromRedisMono = Mono.from(redisCluster.withBinaryClusterReactive(
|
|
||||||
conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey)
|
|
||||||
.collectList()
|
|
||||||
.publishOn(messageDeliveryScheduler)))
|
|
||||||
.<MessageProtos.Envelope>handle((mrmDataAndView, sink) -> {
|
|
||||||
try {
|
|
||||||
assert mrmDataAndView.size() == 2;
|
|
||||||
|
|
||||||
final byte[] content = SealedSenderMultiRecipientMessage.messageForRecipient(
|
|
||||||
mrmDataAndView.getFirst().getValue(),
|
|
||||||
mrmDataAndView.getLast().getValue());
|
|
||||||
|
|
||||||
sink.next(mrmMessage.toBuilder()
|
|
||||||
.clearSharedMrmKey()
|
|
||||||
.setContent(ByteString.copyFrom(content))
|
|
||||||
.build());
|
|
||||||
|
|
||||||
mrmContentRetrievedCounter.increment();
|
|
||||||
} catch (Exception e) {
|
|
||||||
sink.error(e);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.onErrorResume(throwable -> {
|
|
||||||
logger.warn("Failed to retrieve shared mrm data", throwable);
|
|
||||||
mrmRetrievalErrorCounter.increment();
|
|
||||||
return Mono.empty();
|
|
||||||
})
|
|
||||||
.share();
|
|
||||||
|
|
||||||
if (mrmMessage.hasContent()) {
|
|
||||||
experiment.compareMonoResult(mrmMessage.toBuilder().clearSharedMrmKey().build(), messageFromRedisMono);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dynamicConfigurationManager.getConfiguration().getMessagesConfiguration().useSharedMrmData()
|
|
||||||
|| !mrmMessage.hasContent()) {
|
|
||||||
return messageFromRedisMono;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// if fetching or using shared data is disabled, fallback to just() with the existing message
|
// if fetching or using shared data is disabled, fallback to just() with the existing message
|
||||||
|
|
|
@ -82,12 +82,12 @@ import org.junit.jupiter.params.provider.ValueSource;
|
||||||
import org.junitpioneer.jupiter.cartesian.ArgumentSets;
|
import org.junitpioneer.jupiter.cartesian.ArgumentSets;
|
||||||
import org.junitpioneer.jupiter.cartesian.CartesianTest;
|
import org.junitpioneer.jupiter.cartesian.CartesianTest;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
|
import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
|
||||||
import org.signal.libsignal.zkgroup.ServerSecretParams;
|
import org.signal.libsignal.zkgroup.ServerSecretParams;
|
||||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice;
|
import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice;
|
||||||
import org.whispersystems.textsecuregcm.auth.UnidentifiedAccessUtil;
|
import org.whispersystems.textsecuregcm.auth.UnidentifiedAccessUtil;
|
||||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicInboundMessageByteLimitConfiguration;
|
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicInboundMessageByteLimitConfiguration;
|
||||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagesConfiguration;
|
|
||||||
import org.whispersystems.textsecuregcm.entities.AccountMismatchedDevices;
|
import org.whispersystems.textsecuregcm.entities.AccountMismatchedDevices;
|
||||||
import org.whispersystems.textsecuregcm.entities.AccountStaleDevices;
|
import org.whispersystems.textsecuregcm.entities.AccountStaleDevices;
|
||||||
import org.whispersystems.textsecuregcm.entities.IncomingMessage;
|
import org.whispersystems.textsecuregcm.entities.IncomingMessage;
|
||||||
|
@ -252,8 +252,6 @@ class MessageControllerTest {
|
||||||
|
|
||||||
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
|
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
|
||||||
when(dynamicConfiguration.getInboundMessageByteLimitConfiguration()).thenReturn(inboundMessageByteLimitConfiguration);
|
when(dynamicConfiguration.getInboundMessageByteLimitConfiguration()).thenReturn(inboundMessageByteLimitConfiguration);
|
||||||
when(dynamicConfiguration.getMessagesConfiguration()).thenReturn(
|
|
||||||
new DynamicMessagesConfiguration(true, true, true));
|
|
||||||
|
|
||||||
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
|
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
|
||||||
|
|
||||||
|
@ -1141,6 +1139,10 @@ class MessageControllerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testManyRecipientMessage() throws Exception {
|
void testManyRecipientMessage() throws Exception {
|
||||||
|
|
||||||
|
when(messagesManager.insertSharedMultiRecipientMessagePayload(any(SealedSenderMultiRecipientMessage.class)))
|
||||||
|
.thenReturn(new byte[]{1});
|
||||||
|
|
||||||
final int nRecipients = 999;
|
final int nRecipients = 999;
|
||||||
final int devicesPerRecipient = 5;
|
final int devicesPerRecipient = 5;
|
||||||
final List<Recipient> recipients = new ArrayList<>();
|
final List<Recipient> recipients = new ArrayList<>();
|
||||||
|
@ -1152,8 +1154,8 @@ class MessageControllerTest {
|
||||||
d -> generateTestDevice(
|
d -> generateTestDevice(
|
||||||
(byte) d, 100 + d, 10 * d, true))
|
(byte) d, 100 + d, 10 * d, true))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
final UUID aci = new UUID(0L, (long) i);
|
final UUID aci = new UUID(0L, i);
|
||||||
final UUID pni = new UUID(1L, (long) i);
|
final UUID pni = new UUID(1L, i);
|
||||||
final String e164 = String.format("+1408555%04d", i);
|
final String e164 = String.format("+1408555%04d", i);
|
||||||
final Account account = AccountsHelper.generateTestAccount(e164, aci, pni, devices, UNIDENTIFIED_ACCESS_BYTES);
|
final Account account = AccountsHelper.generateTestAccount(e164, aci, pni, devices, UNIDENTIFIED_ACCESS_BYTES);
|
||||||
|
|
||||||
|
@ -1186,13 +1188,12 @@ class MessageControllerTest {
|
||||||
|
|
||||||
// see testMultiRecipientMessageNoPni and testMultiRecipientMessagePni below for actual invocations
|
// see testMultiRecipientMessageNoPni and testMultiRecipientMessagePni below for actual invocations
|
||||||
private void testMultiRecipientMessage(
|
private void testMultiRecipientMessage(
|
||||||
Map<ServiceIdentifier, Map<Byte, Integer>> destinations,
|
Map<ServiceIdentifier, Map<Byte, Integer>> destinations, boolean authorize, boolean isStory, boolean urgent,
|
||||||
boolean authorize,
|
boolean explicitIdentifier, int expectedStatus, int expectedMessagesSent) throws Exception {
|
||||||
boolean isStory,
|
|
||||||
boolean urgent,
|
when(messagesManager.insertSharedMultiRecipientMessagePayload(any(SealedSenderMultiRecipientMessage.class)))
|
||||||
boolean explicitIdentifier,
|
.thenReturn(new byte[]{1});
|
||||||
int expectedStatus,
|
|
||||||
int expectedMessagesSent) throws Exception {
|
|
||||||
final List<Recipient> recipients = new ArrayList<>();
|
final List<Recipient> recipients = new ArrayList<>();
|
||||||
destinations.forEach(
|
destinations.forEach(
|
||||||
(serviceIdentifier, deviceToRegistrationId) ->
|
(serviceIdentifier, deviceToRegistrationId) ->
|
||||||
|
@ -1383,6 +1384,10 @@ class MessageControllerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testMultiRecipientMessageWithGroupSendEndorsements() throws Exception {
|
void testMultiRecipientMessageWithGroupSendEndorsements() throws Exception {
|
||||||
|
|
||||||
|
when(messagesManager.insertSharedMultiRecipientMessagePayload(any(SealedSenderMultiRecipientMessage.class)))
|
||||||
|
.thenReturn(new byte[]{1});
|
||||||
|
|
||||||
final List<Recipient> recipients = List.of(
|
final List<Recipient> recipients = List.of(
|
||||||
new Recipient(SINGLE_DEVICE_ACI_ID, SINGLE_DEVICE_ID1, SINGLE_DEVICE_REG_ID1, new byte[48]),
|
new Recipient(SINGLE_DEVICE_ACI_ID, SINGLE_DEVICE_ID1, SINGLE_DEVICE_REG_ID1, new byte[48]),
|
||||||
new Recipient(MULTI_DEVICE_ACI_ID, MULTI_DEVICE_ID1, MULTI_DEVICE_REG_ID1, new byte[48]),
|
new Recipient(MULTI_DEVICE_ACI_ID, MULTI_DEVICE_ID1, MULTI_DEVICE_REG_ID1, new byte[48]),
|
||||||
|
@ -1550,6 +1555,9 @@ class MessageControllerTest {
|
||||||
@MethodSource
|
@MethodSource
|
||||||
void testSendMultiRecipientMessageToUnknownAccounts(boolean story, boolean known, boolean useExplicitIdentifier) {
|
void testSendMultiRecipientMessageToUnknownAccounts(boolean story, boolean known, boolean useExplicitIdentifier) {
|
||||||
|
|
||||||
|
when(messagesManager.insertSharedMultiRecipientMessagePayload(any(SealedSenderMultiRecipientMessage.class)))
|
||||||
|
.thenReturn(new byte[]{1});
|
||||||
|
|
||||||
final Recipient r1;
|
final Recipient r1;
|
||||||
if (known) {
|
if (known) {
|
||||||
r1 = new Recipient(SINGLE_DEVICE_ACI_ID, SINGLE_DEVICE_ID1, SINGLE_DEVICE_REG_ID1, new byte[48]);
|
r1 = new Recipient(SINGLE_DEVICE_ACI_ID, SINGLE_DEVICE_ID1, SINGLE_DEVICE_REG_ID1, new byte[48]);
|
||||||
|
|
|
@ -109,7 +109,7 @@ class MessagesCacheTest {
|
||||||
void setUp() throws Exception {
|
void setUp() throws Exception {
|
||||||
dynamicConfiguration = mock(DynamicConfiguration.class);
|
dynamicConfiguration = mock(DynamicConfiguration.class);
|
||||||
when(dynamicConfiguration.getMessagesConfiguration()).thenReturn(
|
when(dynamicConfiguration.getMessagesConfiguration()).thenReturn(
|
||||||
new DynamicMessagesConfiguration(true, true, true));
|
new DynamicMessagesConfiguration(true));
|
||||||
dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
|
dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
|
||||||
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
|
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
|
||||||
|
|
||||||
|
@ -434,7 +434,7 @@ class MessagesCacheTest {
|
||||||
@CartesianTest.Values(booleans = {true, false}) final boolean useSharedMrmData) {
|
@CartesianTest.Values(booleans = {true, false}) final boolean useSharedMrmData) {
|
||||||
|
|
||||||
when(dynamicConfiguration.getMessagesConfiguration())
|
when(dynamicConfiguration.getMessagesConfiguration())
|
||||||
.thenReturn(new DynamicMessagesConfiguration(true, true, useSharedMrmData));
|
.thenReturn(new DynamicMessagesConfiguration(useSharedMrmData));
|
||||||
|
|
||||||
final ServiceIdentifier destinationServiceId = new AciServiceIdentifier(UUID.randomUUID());
|
final ServiceIdentifier destinationServiceId = new AciServiceIdentifier(UUID.randomUUID());
|
||||||
final byte deviceId = 1;
|
final byte deviceId = 1;
|
||||||
|
@ -493,13 +493,12 @@ class MessagesCacheTest {
|
||||||
}, "Shared MRM data should be deleted asynchronously");
|
}, "Shared MRM data should be deleted asynchronously");
|
||||||
}
|
}
|
||||||
|
|
||||||
@CartesianTest
|
@ParameterizedTest
|
||||||
void testMultiRecipientMessagePhase2MissingContentSafeguard(
|
@ValueSource(booleans = {true, false})
|
||||||
@CartesianTest.Values(booleans = {true, false}) final boolean useSharedMrmData,
|
void testMultiRecipientMessagePhase2MissingContentSafeguard(final boolean useSharedMrmData) {
|
||||||
@CartesianTest.Values(booleans = {true, false}) final boolean fetchSharedMrmData) {
|
|
||||||
|
|
||||||
when(dynamicConfiguration.getMessagesConfiguration())
|
when(dynamicConfiguration.getMessagesConfiguration())
|
||||||
.thenReturn(new DynamicMessagesConfiguration(true, fetchSharedMrmData, useSharedMrmData));
|
.thenReturn(new DynamicMessagesConfiguration(useSharedMrmData));
|
||||||
|
|
||||||
final ServiceIdentifier destinationServiceId = new AciServiceIdentifier(UUID.randomUUID());
|
final ServiceIdentifier destinationServiceId = new AciServiceIdentifier(UUID.randomUUID());
|
||||||
final byte deviceId = 1;
|
final byte deviceId = 1;
|
||||||
|
@ -554,7 +553,7 @@ class MessagesCacheTest {
|
||||||
@CartesianTest.Values(booleans = {true, false}) final boolean useSharedMrmData) {
|
@CartesianTest.Values(booleans = {true, false}) final boolean useSharedMrmData) {
|
||||||
|
|
||||||
when(dynamicConfiguration.getMessagesConfiguration())
|
when(dynamicConfiguration.getMessagesConfiguration())
|
||||||
.thenReturn(new DynamicMessagesConfiguration(true, true, useSharedMrmData));
|
.thenReturn(new DynamicMessagesConfiguration(useSharedMrmData));
|
||||||
|
|
||||||
final UUID destinationUuid = UUID.randomUUID();
|
final UUID destinationUuid = UUID.randomUUID();
|
||||||
final ServiceIdentifier destinationServiceId = new AciServiceIdentifier(destinationUuid);
|
final ServiceIdentifier destinationServiceId = new AciServiceIdentifier(destinationUuid);
|
||||||
|
|
Loading…
Reference in New Issue