Introduce a listener pattern for reported messages
This commit is contained in:
parent
941a9c3b39
commit
f75e616397
|
@ -197,6 +197,7 @@ import org.whispersystems.textsecuregcm.storage.RemoteConfigs;
|
||||||
import org.whispersystems.textsecuregcm.storage.RemoteConfigsManager;
|
import org.whispersystems.textsecuregcm.storage.RemoteConfigsManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb;
|
import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb;
|
||||||
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
|
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.ReportedMessageMetricsListener;
|
||||||
import org.whispersystems.textsecuregcm.storage.ReservedUsernames;
|
import org.whispersystems.textsecuregcm.storage.ReservedUsernames;
|
||||||
import org.whispersystems.textsecuregcm.storage.StoredVerificationCodeManager;
|
import org.whispersystems.textsecuregcm.storage.StoredVerificationCodeManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.SubscriptionManager;
|
import org.whispersystems.textsecuregcm.storage.SubscriptionManager;
|
||||||
|
@ -445,7 +446,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
||||||
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, keyspaceNotificationDispatchExecutor);
|
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, keyspaceNotificationDispatchExecutor);
|
||||||
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager);
|
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster, dynamicConfigurationManager);
|
||||||
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster, Metrics.globalRegistry, config.getReportMessageConfiguration().getCounterTtl());
|
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster, config.getReportMessageConfiguration().getCounterTtl());
|
||||||
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager, reportMessageManager);
|
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager, reportMessageManager);
|
||||||
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
|
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
|
||||||
deletedAccountsLockDynamoDbClient, config.getDynamoDbTables().getDeletedAccountsLock().getTableName());
|
deletedAccountsLockDynamoDbClient, config.getDynamoDbTables().getDeletedAccountsLock().getTableName());
|
||||||
|
@ -473,6 +474,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
SubscriptionManager subscriptionManager = new SubscriptionManager(
|
SubscriptionManager subscriptionManager = new SubscriptionManager(
|
||||||
config.getDynamoDbTables().getSubscriptions().getTableName(), dynamoDbAsyncClient);
|
config.getDynamoDbTables().getSubscriptions().getTableName(), dynamoDbAsyncClient);
|
||||||
|
|
||||||
|
ReportedMessageMetricsListener reportedMessageMetricsListener = new ReportedMessageMetricsListener();
|
||||||
|
reportMessageManager.addListener(reportedMessageMetricsListener);
|
||||||
|
|
||||||
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
|
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
|
||||||
DisabledPermittedAccountAuthenticator disabledPermittedAccountAuthenticator = new DisabledPermittedAccountAuthenticator(accountsManager);
|
DisabledPermittedAccountAuthenticator disabledPermittedAccountAuthenticator = new DisabledPermittedAccountAuthenticator(accountsManager);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2013-2022 Signal Messenger, LLC
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.whispersystems.textsecuregcm.metrics;
|
||||||
|
|
||||||
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
|
|
||||||
|
import io.micrometer.core.instrument.Metrics;
|
||||||
|
import java.util.UUID;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.ReportedMessageListener;
|
||||||
|
import org.whispersystems.textsecuregcm.util.Util;
|
||||||
|
|
||||||
|
public class ReportedMessageMetricsListener implements ReportedMessageListener {
|
||||||
|
|
||||||
|
// ReportMessageManager name used deliberately to preserve continuity of metrics
|
||||||
|
private static final String REPORT_COUNTER_NAME = name(ReportMessageManager.class, "reported");
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleMessageReported(final String sourceNumber, final UUID messageGuid, final UUID reporterUuid) {
|
||||||
|
Metrics.counter(REPORT_COUNTER_NAME, "countryCode", Util.getCountryCode(sourceNumber)).increment();
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,48 +1,44 @@
|
||||||
package org.whispersystems.textsecuregcm.storage;
|
package org.whispersystems.textsecuregcm.storage;
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import io.lettuce.core.RedisException;
|
import io.lettuce.core.RedisException;
|
||||||
import io.micrometer.core.instrument.Counter;
|
|
||||||
import io.micrometer.core.instrument.MeterRegistry;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||||
import org.whispersystems.textsecuregcm.util.UUIDUtil;
|
import org.whispersystems.textsecuregcm.util.UUIDUtil;
|
||||||
import org.whispersystems.textsecuregcm.util.Util;
|
|
||||||
|
|
||||||
public class ReportMessageManager {
|
public class ReportMessageManager {
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
static final String REPORT_COUNTER_NAME = name(ReportMessageManager.class, "reported");
|
|
||||||
|
|
||||||
private final ReportMessageDynamoDb reportMessageDynamoDb;
|
private final ReportMessageDynamoDb reportMessageDynamoDb;
|
||||||
private final FaultTolerantRedisCluster rateLimitCluster;
|
private final FaultTolerantRedisCluster rateLimitCluster;
|
||||||
private final MeterRegistry meterRegistry;
|
|
||||||
|
|
||||||
private final Duration counterTtl;
|
private final Duration counterTtl;
|
||||||
|
|
||||||
|
private final List<ReportedMessageListener> reportedMessageListeners = new ArrayList<>();
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ReportMessageManager.class);
|
private static final Logger logger = LoggerFactory.getLogger(ReportMessageManager.class);
|
||||||
|
|
||||||
public ReportMessageManager(final ReportMessageDynamoDb reportMessageDynamoDb,
|
public ReportMessageManager(final ReportMessageDynamoDb reportMessageDynamoDb,
|
||||||
final FaultTolerantRedisCluster rateLimitCluster,
|
final FaultTolerantRedisCluster rateLimitCluster,
|
||||||
final MeterRegistry meterRegistry,
|
|
||||||
final Duration counterTtl) {
|
final Duration counterTtl) {
|
||||||
|
|
||||||
this.reportMessageDynamoDb = reportMessageDynamoDb;
|
this.reportMessageDynamoDb = reportMessageDynamoDb;
|
||||||
this.rateLimitCluster = rateLimitCluster;
|
this.rateLimitCluster = rateLimitCluster;
|
||||||
this.meterRegistry = meterRegistry;
|
|
||||||
|
|
||||||
this.counterTtl = counterTtl;
|
this.counterTtl = counterTtl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void addListener(final ReportedMessageListener listener) {
|
||||||
|
this.reportedMessageListeners.add(listener);
|
||||||
|
}
|
||||||
|
|
||||||
public void store(String sourceNumber, UUID messageGuid) {
|
public void store(String sourceNumber, UUID messageGuid) {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -66,10 +62,13 @@ public class ReportMessageManager {
|
||||||
connection.sync().expire(reportedSenderKey, counterTtl.toSeconds());
|
connection.sync().expire(reportedSenderKey, counterTtl.toSeconds());
|
||||||
});
|
});
|
||||||
|
|
||||||
Counter.builder(REPORT_COUNTER_NAME)
|
reportedMessageListeners.forEach(listener -> {
|
||||||
.tag("countryCode", Util.getCountryCode(sourceNumber))
|
try {
|
||||||
.register(meterRegistry)
|
listener.handleMessageReported(sourceNumber, messageGuid, reporterUuid);
|
||||||
.increment();
|
} catch (final Exception e) {
|
||||||
|
logger.error("Failed to notify listener of reported message", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2013-2022 Signal Messenger, LLC
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.whispersystems.textsecuregcm.storage;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
public interface ReportedMessageListener {
|
||||||
|
|
||||||
|
void handleMessageReported(String sourceNumber, UUID messageGuid, UUID reporterUuid);
|
||||||
|
}
|
|
@ -16,7 +16,6 @@ import io.dropwizard.Application;
|
||||||
import io.dropwizard.cli.EnvironmentCommand;
|
import io.dropwizard.cli.EnvironmentCommand;
|
||||||
import io.dropwizard.setup.Environment;
|
import io.dropwizard.setup.Environment;
|
||||||
import io.lettuce.core.resource.ClientResources;
|
import io.lettuce.core.resource.ClientResources;
|
||||||
import io.micrometer.core.instrument.Metrics;
|
|
||||||
import java.time.Clock;
|
import java.time.Clock;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
@ -180,7 +179,7 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
|
||||||
configuration.getDynamoDbTables().getReportMessage().getTableName(),
|
configuration.getDynamoDbTables().getReportMessage().getTableName(),
|
||||||
configuration.getReportMessageConfiguration().getReportTtl());
|
configuration.getReportMessageConfiguration().getReportTtl());
|
||||||
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster,
|
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster,
|
||||||
Metrics.globalRegistry, configuration.getReportMessageConfiguration().getCounterTtl());
|
configuration.getReportMessageConfiguration().getCounterTtl());
|
||||||
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager,
|
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager,
|
||||||
reportMessageManager);
|
reportMessageManager);
|
||||||
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
|
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
|
||||||
|
|
|
@ -16,7 +16,6 @@ import io.dropwizard.Application;
|
||||||
import io.dropwizard.cli.EnvironmentCommand;
|
import io.dropwizard.cli.EnvironmentCommand;
|
||||||
import io.dropwizard.setup.Environment;
|
import io.dropwizard.setup.Environment;
|
||||||
import io.lettuce.core.resource.ClientResources;
|
import io.lettuce.core.resource.ClientResources;
|
||||||
import io.micrometer.core.instrument.Metrics;
|
|
||||||
import java.time.Clock;
|
import java.time.Clock;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
@ -183,7 +182,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
|
||||||
configuration.getDynamoDbTables().getReportMessage().getTableName(),
|
configuration.getDynamoDbTables().getReportMessage().getTableName(),
|
||||||
configuration.getReportMessageConfiguration().getReportTtl());
|
configuration.getReportMessageConfiguration().getReportTtl());
|
||||||
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster,
|
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster,
|
||||||
Metrics.globalRegistry, configuration.getReportMessageConfiguration().getCounterTtl());
|
configuration.getReportMessageConfiguration().getCounterTtl());
|
||||||
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager,
|
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager,
|
||||||
reportMessageManager);
|
reportMessageManager);
|
||||||
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
|
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
|
||||||
|
|
|
@ -16,7 +16,6 @@ import io.dropwizard.Application;
|
||||||
import io.dropwizard.cli.EnvironmentCommand;
|
import io.dropwizard.cli.EnvironmentCommand;
|
||||||
import io.dropwizard.setup.Environment;
|
import io.dropwizard.setup.Environment;
|
||||||
import io.lettuce.core.resource.ClientResources;
|
import io.lettuce.core.resource.ClientResources;
|
||||||
import io.micrometer.core.instrument.Metrics;
|
|
||||||
import java.time.Clock;
|
import java.time.Clock;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
@ -184,7 +183,7 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
|
||||||
configuration.getDynamoDbTables().getReportMessage().getTableName(),
|
configuration.getDynamoDbTables().getReportMessage().getTableName(),
|
||||||
configuration.getReportMessageConfiguration().getReportTtl());
|
configuration.getReportMessageConfiguration().getReportTtl());
|
||||||
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster,
|
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster,
|
||||||
Metrics.globalRegistry, configuration.getReportMessageConfiguration().getCounterTtl());
|
configuration.getReportMessageConfiguration().getCounterTtl());
|
||||||
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager,
|
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager,
|
||||||
reportMessageManager);
|
reportMessageManager);
|
||||||
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
|
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
|
||||||
|
|
|
@ -23,8 +23,6 @@ import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
|
||||||
class ReportMessageManagerTest {
|
class ReportMessageManagerTest {
|
||||||
|
|
||||||
private ReportMessageDynamoDb reportMessageDynamoDb;
|
private ReportMessageDynamoDb reportMessageDynamoDb;
|
||||||
private MeterRegistry meterRegistry;
|
|
||||||
|
|
||||||
private ReportMessageManager reportMessageManager;
|
private ReportMessageManager reportMessageManager;
|
||||||
|
|
||||||
@RegisterExtension
|
@RegisterExtension
|
||||||
|
@ -33,10 +31,9 @@ class ReportMessageManagerTest {
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setUp() {
|
void setUp() {
|
||||||
reportMessageDynamoDb = mock(ReportMessageDynamoDb.class);
|
reportMessageDynamoDb = mock(ReportMessageDynamoDb.class);
|
||||||
meterRegistry = new SimpleMeterRegistry();
|
|
||||||
|
|
||||||
reportMessageManager = new ReportMessageManager(reportMessageDynamoDb,
|
reportMessageManager = new ReportMessageManager(reportMessageDynamoDb,
|
||||||
RATE_LIMIT_CLUSTER_EXTENSION.getRedisCluster(), meterRegistry, Duration.ofDays(1));
|
RATE_LIMIT_CLUSTER_EXTENSION.getRedisCluster(), Duration.ofDays(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -65,24 +62,19 @@ class ReportMessageManagerTest {
|
||||||
final UUID messageGuid = UUID.randomUUID();
|
final UUID messageGuid = UUID.randomUUID();
|
||||||
final UUID reporterUuid = UUID.randomUUID();
|
final UUID reporterUuid = UUID.randomUUID();
|
||||||
|
|
||||||
|
final ReportedMessageListener listener = mock(ReportedMessageListener.class);
|
||||||
|
reportMessageManager.addListener(listener);
|
||||||
|
|
||||||
when(reportMessageDynamoDb.remove(any())).thenReturn(false);
|
when(reportMessageDynamoDb.remove(any())).thenReturn(false);
|
||||||
reportMessageManager.report(sourceNumber, messageGuid, reporterUuid);
|
reportMessageManager.report(sourceNumber, messageGuid, reporterUuid);
|
||||||
|
|
||||||
assertEquals(0, getCounterTotal(ReportMessageManager.REPORT_COUNTER_NAME));
|
|
||||||
assertEquals(0, reportMessageManager.getRecentReportCount(sourceNumber));
|
assertEquals(0, reportMessageManager.getRecentReportCount(sourceNumber));
|
||||||
|
|
||||||
when(reportMessageDynamoDb.remove(any())).thenReturn(true);
|
when(reportMessageDynamoDb.remove(any())).thenReturn(true);
|
||||||
reportMessageManager.report(sourceNumber, messageGuid, reporterUuid);
|
reportMessageManager.report(sourceNumber, messageGuid, reporterUuid);
|
||||||
|
|
||||||
assertEquals(1, getCounterTotal(ReportMessageManager.REPORT_COUNTER_NAME));
|
|
||||||
assertEquals(1, reportMessageManager.getRecentReportCount(sourceNumber));
|
assertEquals(1, reportMessageManager.getRecentReportCount(sourceNumber));
|
||||||
}
|
verify(listener).handleMessageReported(sourceNumber, messageGuid, reporterUuid);
|
||||||
|
|
||||||
private double getCounterTotal(final String counterName) {
|
|
||||||
return meterRegistry.find(counterName).counters().stream()
|
|
||||||
.map(Counter::count)
|
|
||||||
.reduce(Double::sum)
|
|
||||||
.orElse(0.0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue