diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index a0ccee336..9d623ab59 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -336,6 +336,9 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private KeyTransparencyServiceConfiguration keyTransparencyService; + @JsonProperty + private boolean logMessageDeliveryLoops; + public TlsKeyStoreConfiguration getTlsKeyStoreConfiguration() { return tlsKeyStore; } @@ -558,4 +561,9 @@ public class WhisperServerConfiguration extends Configuration { public KeyTransparencyServiceConfiguration getKeyTransparencyServiceConfiguration() { return keyTransparencyService; } + + public boolean logMessageDeliveryLoops() { + return logMessageDeliveryLoops; + } + } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 85abf1565..c2b0bd986 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -164,10 +164,12 @@ import org.whispersystems.textsecuregcm.jetty.JettyHttpConfigurationCustomizer; import org.whispersystems.textsecuregcm.keytransparency.KeyTransparencyServiceClient; import org.whispersystems.textsecuregcm.limits.CardinalityEstimator; import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor; +import org.whispersystems.textsecuregcm.limits.NoopMessageDeliveryLoopMonitor; import org.whispersystems.textsecuregcm.limits.PushChallengeManager; import org.whispersystems.textsecuregcm.limits.RateLimitByIpFilter; import org.whispersystems.textsecuregcm.limits.RateLimitChallengeManager; import org.whispersystems.textsecuregcm.limits.RateLimiters; +import org.whispersystems.textsecuregcm.limits.RedisMessageDeliveryLoopMonitor; import org.whispersystems.textsecuregcm.mappers.CompletionExceptionMapper; import org.whispersystems.textsecuregcm.mappers.DeviceLimitExceededExceptionMapper; import org.whispersystems.textsecuregcm.mappers.GrpcStatusRuntimeExceptionMapper; @@ -655,7 +657,7 @@ public class WhisperServerService extends Application { - if (deliveryAttemptCount == DELIVERY_LOOP_THRESHOLD) { - logger.warn("Detected loop delivering message {} via {} to {}:{} ({})", - messageGuid, context, accountIdentifier, deviceId, userAgent); - } - }); - } - - @VisibleForTesting - CompletableFuture incrementDeliveryAttemptCount(final UUID accountIdentifier, final byte deviceId, final UUID messageGuid) { - final String firstMessageGuidKey = "firstMessageGuid::{" + accountIdentifier + ":" + deviceId + "}"; - final String deliveryAttemptsKey = "firstMessageDeliveryAttempts::{" + accountIdentifier + ":" + deviceId + "}"; - - return getDeliveryAttemptsScript.executeAsync( - List.of(firstMessageGuidKey, deliveryAttemptsKey), - List.of(messageGuid.toString(), String.valueOf(DELIVERY_ATTEMPTS_COUNTER_TTL.toSeconds()))) - .thenApply(result -> (long) result); - } + void recordDeliveryAttempt(UUID accountIdentifier, byte deviceId, UUID messageGuid, String userAgent, String context); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/NoopMessageDeliveryLoopMonitor.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/NoopMessageDeliveryLoopMonitor.java new file mode 100644 index 000000000..7e0b74708 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/NoopMessageDeliveryLoopMonitor.java @@ -0,0 +1,13 @@ +package org.whispersystems.textsecuregcm.limits; + +import java.util.UUID; + +public class NoopMessageDeliveryLoopMonitor implements MessageDeliveryLoopMonitor { + + public NoopMessageDeliveryLoopMonitor() { + } + + public void recordDeliveryAttempt(final UUID accountIdentifier, final byte deviceId, final UUID messageGuid, final String userAgent, final String context) { + } + +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RedisMessageDeliveryLoopMonitor.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RedisMessageDeliveryLoopMonitor.java new file mode 100644 index 000000000..ea59e0751 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RedisMessageDeliveryLoopMonitor.java @@ -0,0 +1,73 @@ +package org.whispersystems.textsecuregcm.limits; + +import com.google.common.annotations.VisibleForTesting; +import io.lettuce.core.ScriptOutputType; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; +import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; + +public class RedisMessageDeliveryLoopMonitor implements MessageDeliveryLoopMonitor { + + private final ClusterLuaScript getDeliveryAttemptsScript; + + private static final Duration DELIVERY_ATTEMPTS_COUNTER_TTL = Duration.ofHours(1); + private static final int DELIVERY_LOOP_THRESHOLD = 5; + + private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryLoopMonitor.class); + + public RedisMessageDeliveryLoopMonitor(final FaultTolerantRedisClusterClient rateLimitCluster) { + try { + getDeliveryAttemptsScript = + ClusterLuaScript.fromResource(rateLimitCluster, "lua/get_delivery_attempt_count.lua", ScriptOutputType.INTEGER); + } catch (final IOException e) { + throw new UncheckedIOException("Failed to load 'get delivery attempt count' script", e); + } + } + + /** + * Records an attempt to deliver a message with the given GUID to the given account/device pair and returns the number + * of consecutive attempts to deliver the same message and logs a warning if the message appears to be in a delivery + * loop. This method is intended to detect cases where a message remains at the head of a device's queue after + * repeated attempts to deliver the message, and so the given message GUID should be the first message of a "page" + * sent to clients. + * + * @param accountIdentifier the identifier of the destination account + * @param deviceId the destination device's ID within the given account + * @param messageGuid the GUID of the message + * @param userAgent the User-Agent header supplied by the caller + * @param context a human-readable string identifying the mechanism of message delivery (e.g. "rest" or "websocket") + */ + public void recordDeliveryAttempt(final UUID accountIdentifier, + final byte deviceId, + final UUID messageGuid, + final String userAgent, + final String context) { + + incrementDeliveryAttemptCount(accountIdentifier, deviceId, messageGuid) + .thenAccept(deliveryAttemptCount -> { + if (deliveryAttemptCount == DELIVERY_LOOP_THRESHOLD) { + logger.warn("Detected loop delivering message {} via {} to {}:{} ({})", + messageGuid, context, accountIdentifier, deviceId, userAgent); + } + }); + } + + @VisibleForTesting + CompletableFuture incrementDeliveryAttemptCount(final UUID accountIdentifier, final byte deviceId, final UUID messageGuid) { + final String firstMessageGuidKey = "firstMessageGuid::{" + accountIdentifier + ":" + deviceId + "}"; + final String deliveryAttemptsKey = "firstMessageDeliveryAttempts::{" + accountIdentifier + ":" + deviceId + "}"; + + return getDeliveryAttemptsScript.executeAsync( + List.of(firstMessageGuidKey, deliveryAttemptsKey), + List.of(messageGuid.toString(), String.valueOf(DELIVERY_ATTEMPTS_COUNTER_TTL.toSeconds()))) + .thenApply(result -> (long) result); + } + +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/limits/MessageDeliveryLoopMonitorTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/limits/MessageDeliveryLoopMonitorTest.java index a4c09fb01..7d6e1a09b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/limits/MessageDeliveryLoopMonitorTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/limits/MessageDeliveryLoopMonitorTest.java @@ -11,14 +11,14 @@ import org.whispersystems.textsecuregcm.storage.Device; class MessageDeliveryLoopMonitorTest { - private MessageDeliveryLoopMonitor messageDeliveryLoopMonitor; + private RedisMessageDeliveryLoopMonitor messageDeliveryLoopMonitor; @RegisterExtension static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); @BeforeEach void setUp() { - messageDeliveryLoopMonitor = new MessageDeliveryLoopMonitor(REDIS_CLUSTER_EXTENSION.getRedisCluster()); + messageDeliveryLoopMonitor = new RedisMessageDeliveryLoopMonitor(REDIS_CLUSTER_EXTENSION.getRedisCluster()); } @Test