Make logging of message delivery loops configurable, default off
This commit is contained in:
parent
09ce79bd43
commit
ea17eee320
|
@ -336,6 +336,9 @@ public class WhisperServerConfiguration extends Configuration {
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private KeyTransparencyServiceConfiguration keyTransparencyService;
|
private KeyTransparencyServiceConfiguration keyTransparencyService;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private boolean logMessageDeliveryLoops;
|
||||||
|
|
||||||
public TlsKeyStoreConfiguration getTlsKeyStoreConfiguration() {
|
public TlsKeyStoreConfiguration getTlsKeyStoreConfiguration() {
|
||||||
return tlsKeyStore;
|
return tlsKeyStore;
|
||||||
}
|
}
|
||||||
|
@ -558,4 +561,9 @@ public class WhisperServerConfiguration extends Configuration {
|
||||||
public KeyTransparencyServiceConfiguration getKeyTransparencyServiceConfiguration() {
|
public KeyTransparencyServiceConfiguration getKeyTransparencyServiceConfiguration() {
|
||||||
return keyTransparencyService;
|
return keyTransparencyService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Indicates whether message delivery loops should be logged. The backing field has no initializer, so this is
 * {@code false} unless the configuration sets it.
 *
 * @return the value of the {@code logMessageDeliveryLoops} configuration property
 */
public boolean logMessageDeliveryLoops() {
|
||||||
|
return logMessageDeliveryLoops;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,10 +164,12 @@ import org.whispersystems.textsecuregcm.jetty.JettyHttpConfigurationCustomizer;
|
||||||
import org.whispersystems.textsecuregcm.keytransparency.KeyTransparencyServiceClient;
|
import org.whispersystems.textsecuregcm.keytransparency.KeyTransparencyServiceClient;
|
||||||
import org.whispersystems.textsecuregcm.limits.CardinalityEstimator;
|
import org.whispersystems.textsecuregcm.limits.CardinalityEstimator;
|
||||||
import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor;
|
import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor;
|
||||||
|
import org.whispersystems.textsecuregcm.limits.NoopMessageDeliveryLoopMonitor;
|
||||||
import org.whispersystems.textsecuregcm.limits.PushChallengeManager;
|
import org.whispersystems.textsecuregcm.limits.PushChallengeManager;
|
||||||
import org.whispersystems.textsecuregcm.limits.RateLimitByIpFilter;
|
import org.whispersystems.textsecuregcm.limits.RateLimitByIpFilter;
|
||||||
import org.whispersystems.textsecuregcm.limits.RateLimitChallengeManager;
|
import org.whispersystems.textsecuregcm.limits.RateLimitChallengeManager;
|
||||||
import org.whispersystems.textsecuregcm.limits.RateLimiters;
|
import org.whispersystems.textsecuregcm.limits.RateLimiters;
|
||||||
|
import org.whispersystems.textsecuregcm.limits.RedisMessageDeliveryLoopMonitor;
|
||||||
import org.whispersystems.textsecuregcm.mappers.CompletionExceptionMapper;
|
import org.whispersystems.textsecuregcm.mappers.CompletionExceptionMapper;
|
||||||
import org.whispersystems.textsecuregcm.mappers.DeviceLimitExceededExceptionMapper;
|
import org.whispersystems.textsecuregcm.mappers.DeviceLimitExceededExceptionMapper;
|
||||||
import org.whispersystems.textsecuregcm.mappers.GrpcStatusRuntimeExceptionMapper;
|
import org.whispersystems.textsecuregcm.mappers.GrpcStatusRuntimeExceptionMapper;
|
||||||
|
@ -655,7 +657,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
Subscriptions subscriptions = new Subscriptions(
|
Subscriptions subscriptions = new Subscriptions(
|
||||||
config.getDynamoDbTables().getSubscriptions().getTableName(), dynamoDbAsyncClient);
|
config.getDynamoDbTables().getSubscriptions().getTableName(), dynamoDbAsyncClient);
|
||||||
MessageDeliveryLoopMonitor messageDeliveryLoopMonitor =
|
MessageDeliveryLoopMonitor messageDeliveryLoopMonitor =
|
||||||
new MessageDeliveryLoopMonitor(rateLimitersCluster);
|
config.logMessageDeliveryLoops() ? new RedisMessageDeliveryLoopMonitor(rateLimitersCluster) : new NoopMessageDeliveryLoopMonitor();
|
||||||
|
|
||||||
disconnectionRequestManager.addListener(webSocketConnectionEventManager);
|
disconnectionRequestManager.addListener(webSocketConnectionEventManager);
|
||||||
|
|
||||||
|
|
|
@ -1,36 +1,8 @@
|
||||||
package org.whispersystems.textsecuregcm.limits;
|
package org.whispersystems.textsecuregcm.limits;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import io.lettuce.core.ScriptOutputType;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.UncheckedIOException;
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
|
||||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
|
||||||
|
|
||||||
public class MessageDeliveryLoopMonitor {
|
|
||||||
|
|
||||||
private final ClusterLuaScript getDeliveryAttemptsScript;
|
|
||||||
|
|
||||||
private static final Duration DELIVERY_ATTEMPTS_COUNTER_TTL = Duration.ofHours(1);
|
|
||||||
private static final int DELIVERY_LOOP_THRESHOLD = 5;
|
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryLoopMonitor.class);
|
|
||||||
|
|
||||||
public MessageDeliveryLoopMonitor(final FaultTolerantRedisClusterClient rateLimitCluster) {
|
|
||||||
try {
|
|
||||||
getDeliveryAttemptsScript =
|
|
||||||
ClusterLuaScript.fromResource(rateLimitCluster, "lua/get_delivery_attempt_count.lua", ScriptOutputType.INTEGER);
|
|
||||||
} catch (final IOException e) {
|
|
||||||
throw new UncheckedIOException("Failed to load 'get delivery attempt count' script", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
public interface MessageDeliveryLoopMonitor {
|
||||||
/**
|
/**
|
||||||
* Records an attempt to deliver a message with the given GUID to the given account/device pair and returns the number
|
* Records an attempt to deliver a message with the given GUID to the given account/device pair and returns the number
|
||||||
* of consecutive attempts to deliver the same message and logs a warning if the message appears to be in a delivery
|
* of consecutive attempts to deliver the same message and logs a warning if the message appears to be in a delivery
|
||||||
|
@ -44,29 +16,5 @@ public class MessageDeliveryLoopMonitor {
|
||||||
* @param userAgent the User-Agent header supplied by the caller
|
* @param userAgent the User-Agent header supplied by the caller
|
||||||
* @param context a human-readable string identifying the mechanism of message delivery (e.g. "rest" or "websocket")
|
* @param context a human-readable string identifying the mechanism of message delivery (e.g. "rest" or "websocket")
|
||||||
*/
|
*/
|
||||||
public void recordDeliveryAttempt(final UUID accountIdentifier,
|
void recordDeliveryAttempt(UUID accountIdentifier, byte deviceId, UUID messageGuid, String userAgent, String context);
|
||||||
final byte deviceId,
|
|
||||||
final UUID messageGuid,
|
|
||||||
final String userAgent,
|
|
||||||
final String context) {
|
|
||||||
|
|
||||||
incrementDeliveryAttemptCount(accountIdentifier, deviceId, messageGuid)
|
|
||||||
.thenAccept(deliveryAttemptCount -> {
|
|
||||||
if (deliveryAttemptCount == DELIVERY_LOOP_THRESHOLD) {
|
|
||||||
logger.warn("Detected loop delivering message {} via {} to {}:{} ({})",
|
|
||||||
messageGuid, context, accountIdentifier, deviceId, userAgent);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
CompletableFuture<Long> incrementDeliveryAttemptCount(final UUID accountIdentifier, final byte deviceId, final UUID messageGuid) {
|
|
||||||
final String firstMessageGuidKey = "firstMessageGuid::{" + accountIdentifier + ":" + deviceId + "}";
|
|
||||||
final String deliveryAttemptsKey = "firstMessageDeliveryAttempts::{" + accountIdentifier + ":" + deviceId + "}";
|
|
||||||
|
|
||||||
return getDeliveryAttemptsScript.executeAsync(
|
|
||||||
List.of(firstMessageGuidKey, deliveryAttemptsKey),
|
|
||||||
List.of(messageGuid.toString(), String.valueOf(DELIVERY_ATTEMPTS_COUNTER_TTL.toSeconds())))
|
|
||||||
.thenApply(result -> (long) result);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
package org.whispersystems.textsecuregcm.limits;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
public class NoopMessageDeliveryLoopMonitor implements MessageDeliveryLoopMonitor {
|
||||||
|
|
||||||
|
public NoopMessageDeliveryLoopMonitor() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void recordDeliveryAttempt(final UUID accountIdentifier, final byte deviceId, final UUID messageGuid, final String userAgent, final String context) {
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,73 @@
|
||||||
|
package org.whispersystems.textsecuregcm.limits;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import io.lettuce.core.ScriptOutputType;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||||
|
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
||||||
|
|
||||||
|
public class RedisMessageDeliveryLoopMonitor implements MessageDeliveryLoopMonitor {
|
||||||
|
|
||||||
|
private final ClusterLuaScript getDeliveryAttemptsScript;
|
||||||
|
|
||||||
|
private static final Duration DELIVERY_ATTEMPTS_COUNTER_TTL = Duration.ofHours(1);
|
||||||
|
private static final int DELIVERY_LOOP_THRESHOLD = 5;
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryLoopMonitor.class);
|
||||||
|
|
||||||
|
public RedisMessageDeliveryLoopMonitor(final FaultTolerantRedisClusterClient rateLimitCluster) {
|
||||||
|
try {
|
||||||
|
getDeliveryAttemptsScript =
|
||||||
|
ClusterLuaScript.fromResource(rateLimitCluster, "lua/get_delivery_attempt_count.lua", ScriptOutputType.INTEGER);
|
||||||
|
} catch (final IOException e) {
|
||||||
|
throw new UncheckedIOException("Failed to load 'get delivery attempt count' script", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Records an attempt to deliver a message with the given GUID to the given account/device pair and returns the number
|
||||||
|
* of consecutive attempts to deliver the same message and logs a warning if the message appears to be in a delivery
|
||||||
|
* loop. This method is intended to detect cases where a message remains at the head of a device's queue after
|
||||||
|
* repeated attempts to deliver the message, and so the given message GUID should be the first message of a "page"
|
||||||
|
* sent to clients.
|
||||||
|
*
|
||||||
|
* @param accountIdentifier the identifier of the destination account
|
||||||
|
* @param deviceId the destination device's ID within the given account
|
||||||
|
* @param messageGuid the GUID of the message
|
||||||
|
* @param userAgent the User-Agent header supplied by the caller
|
||||||
|
* @param context a human-readable string identifying the mechanism of message delivery (e.g. "rest" or "websocket")
|
||||||
|
*/
|
||||||
|
public void recordDeliveryAttempt(final UUID accountIdentifier,
|
||||||
|
final byte deviceId,
|
||||||
|
final UUID messageGuid,
|
||||||
|
final String userAgent,
|
||||||
|
final String context) {
|
||||||
|
|
||||||
|
incrementDeliveryAttemptCount(accountIdentifier, deviceId, messageGuid)
|
||||||
|
.thenAccept(deliveryAttemptCount -> {
|
||||||
|
if (deliveryAttemptCount == DELIVERY_LOOP_THRESHOLD) {
|
||||||
|
logger.warn("Detected loop delivering message {} via {} to {}:{} ({})",
|
||||||
|
messageGuid, context, accountIdentifier, deviceId, userAgent);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
CompletableFuture<Long> incrementDeliveryAttemptCount(final UUID accountIdentifier, final byte deviceId, final UUID messageGuid) {
|
||||||
|
final String firstMessageGuidKey = "firstMessageGuid::{" + accountIdentifier + ":" + deviceId + "}";
|
||||||
|
final String deliveryAttemptsKey = "firstMessageDeliveryAttempts::{" + accountIdentifier + ":" + deviceId + "}";
|
||||||
|
|
||||||
|
return getDeliveryAttemptsScript.executeAsync(
|
||||||
|
List.of(firstMessageGuidKey, deliveryAttemptsKey),
|
||||||
|
List.of(messageGuid.toString(), String.valueOf(DELIVERY_ATTEMPTS_COUNTER_TTL.toSeconds())))
|
||||||
|
.thenApply(result -> (long) result);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -11,14 +11,14 @@ import org.whispersystems.textsecuregcm.storage.Device;
|
||||||
|
|
||||||
class MessageDeliveryLoopMonitorTest {
|
class MessageDeliveryLoopMonitorTest {
|
||||||
|
|
||||||
private MessageDeliveryLoopMonitor messageDeliveryLoopMonitor;
|
private RedisMessageDeliveryLoopMonitor messageDeliveryLoopMonitor;
|
||||||
|
|
||||||
@RegisterExtension
|
@RegisterExtension
|
||||||
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
|
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setUp() {
|
void setUp() {
|
||||||
messageDeliveryLoopMonitor = new MessageDeliveryLoopMonitor(REDIS_CLUSTER_EXTENSION.getRedisCluster());
|
messageDeliveryLoopMonitor = new RedisMessageDeliveryLoopMonitor(REDIS_CLUSTER_EXTENSION.getRedisCluster());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue