Use destination service ID from the envelope when looking up in shared MRM data
This commit is contained in:
parent
1c617284f3
commit
f60c9f2a15
|
@ -41,6 +41,7 @@ import java.util.function.Predicate;
|
|||
import javax.annotation.Nullable;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
|
||||
import org.signal.libsignal.protocol.ServiceId;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||
|
@ -48,6 +49,7 @@ import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagesCon
|
|||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||
import org.whispersystems.textsecuregcm.experiment.Experiment;
|
||||
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
|
||||
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||
|
@ -431,8 +433,9 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
|||
final Experiment experiment = new Experiment(MRM_VIEWS_EXPERIMENT_NAME);
|
||||
|
||||
final byte[] key = mrmMessage.getSharedMrmKey().toByteArray();
|
||||
final byte[] sharedMrmViewKey = MessagesCache.getSharedMrmViewKey(new AciServiceIdentifier(destinationUuid),
|
||||
destinationDevice);
|
||||
final byte[] sharedMrmViewKey = MessagesCache.getSharedMrmViewKey(
|
||||
// the message might be addressed to the account's PNI, so use the service ID from the envelope
|
||||
ServiceIdentifier.valueOf(mrmMessage.getDestinationServiceId()), destinationDevice);
|
||||
|
||||
final Mono<MessageProtos.Envelope> mrmMessageMono = Mono.from(redisCluster.withBinaryClusterReactive(
|
||||
conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey)
|
||||
|
@ -786,9 +789,19 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
|||
return ("user_queue_persisting::{" + accountUuid + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
static byte[] getSharedMrmViewKey(final AciServiceIdentifier serviceIdentifier, final byte deviceId) {
|
||||
static byte[] getSharedMrmViewKey(final ServiceId serviceId, final byte deviceId) {
|
||||
return getSharedMrmViewKey(serviceId.toServiceIdFixedWidthBinary(), deviceId);
|
||||
}
|
||||
|
||||
static byte[] getSharedMrmViewKey(final ServiceIdentifier serviceIdentifier, final byte deviceId) {
|
||||
return getSharedMrmViewKey(serviceIdentifier.toFixedWidthByteArray(), deviceId);
|
||||
}
|
||||
|
||||
private static byte[] getSharedMrmViewKey(final byte[] fixedWithServiceId, final byte deviceId) {
|
||||
assert fixedWithServiceId.length == 17;
|
||||
|
||||
final ByteBuffer keyBb = ByteBuffer.allocate(18);
|
||||
keyBb.put(serviceIdentifier.toFixedWidthByteArray());
|
||||
keyBb.put(fixedWithServiceId);
|
||||
keyBb.put(deviceId);
|
||||
assert !keyBb.hasRemaining();
|
||||
return keyBb.array();
|
||||
|
|
|
@ -42,10 +42,7 @@ class MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript {
|
|||
|
||||
message.getRecipients().forEach((serviceId, recipient) -> {
|
||||
for (byte device : recipient.getDevices()) {
|
||||
final byte[] key = new byte[18];
|
||||
System.arraycopy(serviceId.toServiceIdFixedWidthBinary(), 0, key, 0, 17);
|
||||
key[17] = device;
|
||||
args.add(key);
|
||||
args.add(MessagesCache.getSharedMrmViewKey(serviceId, device));
|
||||
args.add(message.serializedRecipientView(recipient));
|
||||
}
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue