diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java index bedcd120c..0c8515624 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java @@ -145,7 +145,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter displacedListener = new AtomicReference<>(); final AtomicReference> subscribeFuture = new AtomicReference<>(); @@ -216,7 +216,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter { unsubscribeFuture.set(pubSubConnection.withPubSubConnection(connection -> - connection.async().sunsubscribe(getClientPresenceKey(accountIdentifier, deviceId))) + connection.async().sunsubscribe(getClientEventChannel(accountIdentifier, deviceId))) .thenRun(Util.NOOP)); return null; @@ -245,7 +245,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter - connection.async().spublish(getClientPresenceKey(accountIdentifier, deviceId), NEW_MESSAGE_EVENT_BYTES)) + connection.async().spublish(getClientEventChannel(accountIdentifier, deviceId), NEW_MESSAGE_EVENT_BYTES)) .thenApply(listeners -> listeners > 0); } @@ -264,7 +264,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter - connection.async().spublish(getClientPresenceKey(accountIdentifier, deviceId), MESSAGES_PERSISTED_EVENT_BYTES)) + connection.async().spublish(getClientEventChannel(accountIdentifier, deviceId), MESSAGES_PERSISTED_EVENT_BYTES)) .thenRun(Util.NOOP); } @@ -305,7 +305,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter requestDisconnection(final UUID accountIdentifier, final Collection deviceIds) { return CompletableFuture.allOf(deviceIds.stream() .map(deviceId -> { - final byte[] clientPresenceKey = getClientPresenceKey(accountIdentifier, deviceId); + final byte[] clientPresenceKey = getClientEventChannel(accountIdentifier, deviceId); return clusterClient.withBinaryCluster(connection -> connection.async() .spublish(clientPresenceKey, DISCONNECT_REQUESTED_EVENT_BYTES)) @@ -323,12 +323,12 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter getClientPresenceKey(accountAndDeviceIdentifier.accountIdentifier(), accountAndDeviceIdentifier.deviceId())) - .forEach(clientPresenceKey -> { - final int slot = SlotHash.getSlot(clientPresenceKey); + .map(accountAndDeviceIdentifier -> getClientEventChannel(accountAndDeviceIdentifier.accountIdentifier(), accountAndDeviceIdentifier.deviceId())) + .forEach(clientEventChannel -> { + final int slot = SlotHash.getSlot(clientEventChannel); if (changedSlots[slot]) { - clientPresenceKeysBySlot.computeIfAbsent(slot, ignored -> new ArrayList<>()).add(clientPresenceKey); + clientPresenceKeysBySlot.computeIfAbsent(slot, ignored -> new ArrayList<>()).add(clientEventChannel); } }); @@ -380,8 +380,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter keys = List.of( MessagesCache.getMessageQueueKey(destinationUuid, destinationDevice), // queueKey MessagesCache.getMessageQueueMetadataKey(destinationUuid, destinationDevice), // queueMetadataKey - MessagesCache.getQueueIndexKey(destinationUuid, destinationDevice) // queueTotalIndexKey + MessagesCache.getQueueIndexKey(destinationUuid, destinationDevice), // queueTotalIndexKey + PubSubClientEventManager.getClientEventChannel(destinationUuid, destinationDevice) // eventChannelKey ); final List args = new ArrayList<>(Arrays.asList( envelope.toByteArray(), // message String.valueOf(envelope.getServerTimestamp()).getBytes(StandardCharsets.UTF_8), // currentTime - envelope.getServerGuid().getBytes(StandardCharsets.UTF_8) // guid + envelope.getServerGuid().getBytes(StandardCharsets.UTF_8), // guid + NEW_MESSAGE_EVENT_BYTES // eventPayload )); - insertScript.executeBinary(keys, args); + return (boolean) insertScript.executeBinary(keys, args); } } diff --git a/service/src/main/resources/lua/insert_item.lua b/service/src/main/resources/lua/insert_item.lua index a5f8cf7a5..dc929c3e4 100644 --- a/service/src/main/resources/lua/insert_item.lua +++ b/service/src/main/resources/lua/insert_item.lua @@ -4,9 +4,11 @@ local queueKey = KEYS[1] -- sorted set of Envelopes for a device, by queue-local ID local queueMetadataKey = KEYS[2] -- hash of message GUID to queue-local IDs local queueTotalIndexKey = KEYS[3] -- sorted set of all queues in the shard, by timestamp of oldest message +local eventChannelKey = KEYS[4] -- pub/sub channel for message availability events local message = ARGV[1] -- [bytes] the Envelope to insert local currentTime = ARGV[2] -- [number] the message timestamp, to sort the queue in the queueTotalIndex local guid = ARGV[3] -- [string] the message GUID +local eventPayload = ARGV[4] -- [bytes] a protobuf payload for a "message available" pub/sub event if redis.call("HEXISTS", queueMetadataKey, guid) == 1 then return tonumber(redis.call("HGET", queueMetadataKey, guid)) @@ -21,4 +23,5 @@ redis.call("EXPIRE", queueKey, 3974400) -- 46 days redis.call("EXPIRE", queueMetadataKey, 3974400) -- 46 days redis.call("ZADD", queueTotalIndexKey, "NX", currentTime, queueKey) -return messageId + +return redis.call("SPUBLISH", eventChannelKey, eventPayload) > 0 diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java index fd4f89bbd..74a6e5c6a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java @@ -301,7 +301,7 @@ class PubSubClientEventManagerTest { final UUID firstAccountIdentifier = UUID.randomUUID(); final byte firstDeviceId = Device.PRIMARY_ID; - final int firstSlot = SlotHash.getSlot(PubSubClientEventManager.getClientPresenceKey(firstAccountIdentifier, firstDeviceId)); + final int firstSlot = SlotHash.getSlot(PubSubClientEventManager.getClientEventChannel(firstAccountIdentifier, firstDeviceId)); final UUID secondAccountIdentifier; final byte secondDeviceId = firstDeviceId + 1; @@ -312,7 +312,7 @@ class PubSubClientEventManagerTest { do { candidateIdentifier = UUID.randomUUID(); - } while (SlotHash.getSlot(PubSubClientEventManager.getClientPresenceKey(candidateIdentifier, secondDeviceId)) == firstSlot); + } while (SlotHash.getSlot(PubSubClientEventManager.getClientEventChannel(candidateIdentifier, secondDeviceId)) == firstSlot); secondAccountIdentifier = candidateIdentifier; } @@ -320,7 +320,7 @@ class PubSubClientEventManagerTest { presenceManager.handleClientConnected(firstAccountIdentifier, firstDeviceId, new ClientEventAdapter()).toCompletableFuture().join(); presenceManager.handleClientConnected(secondAccountIdentifier, secondDeviceId, new ClientEventAdapter()).toCompletableFuture().join(); - final int secondSlot = SlotHash.getSlot(PubSubClientEventManager.getClientPresenceKey(secondAccountIdentifier, secondDeviceId)); + final int secondSlot = SlotHash.getSlot(PubSubClientEventManager.getClientEventChannel(secondAccountIdentifier, secondDeviceId)); final String firstNodeId = UUID.randomUUID().toString(); @@ -343,7 +343,7 @@ class PubSubClientEventManagerTest { List.of(firstBeforeNode), List.of(firstAfterNode, secondAfterNode))); - verify(pubSubCommands).ssubscribe(PubSubClientEventManager.getClientPresenceKey(secondAccountIdentifier, secondDeviceId)); - verify(pubSubCommands, never()).ssubscribe(PubSubClientEventManager.getClientPresenceKey(firstAccountIdentifier, firstDeviceId)); + verify(pubSubCommands).ssubscribe(PubSubClientEventManager.getClientEventChannel(secondAccountIdentifier, secondDeviceId)); + verify(pubSubCommands, never()).ssubscribe(PubSubClientEventManager.getClientEventChannel(firstAccountIdentifier, firstDeviceId)); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScriptTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScriptTest.java index b43c681a1..733b3ded8 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScriptTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScriptTest.java @@ -6,6 +6,8 @@ package org.whispersystems.textsecuregcm.storage; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.io.UncheckedIOException; @@ -18,6 +20,8 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; +import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubClusterConnection; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; class MessagesCacheInsertScriptTest { @@ -76,4 +80,29 @@ class MessagesCacheInsertScriptTest { return messages; } + + @Test + void returnPresence() throws IOException { + final UUID destinationUuid = UUID.randomUUID(); + final byte deviceId = 1; + + final MessagesCacheInsertScript insertScript = + new MessagesCacheInsertScript(REDIS_CLUSTER_EXTENSION.getRedisCluster()); + + assertFalse(insertScript.execute(destinationUuid, deviceId, MessageProtos.Envelope.newBuilder() + .setServerTimestamp(Instant.now().getEpochSecond()) + .setServerGuid(UUID.randomUUID().toString()) + .build())); + + final FaultTolerantPubSubClusterConnection pubSubClusterConnection = + REDIS_CLUSTER_EXTENSION.getRedisCluster().createBinaryPubSubConnection(); + + pubSubClusterConnection.usePubSubConnection(connection -> + connection.sync().ssubscribe(PubSubClientEventManager.getClientEventChannel(destinationUuid, deviceId))); + + assertTrue(insertScript.execute(destinationUuid, deviceId, MessageProtos.Envelope.newBuilder() + .setServerTimestamp(Instant.now().getEpochSecond()) + .setServerGuid(UUID.randomUUID().toString()) + .build())); + } }