Add report message API
This commit is contained in:
parent
03dac2bf7e
commit
e320626c6e
|
@ -12,7 +12,6 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import javax.validation.Valid;
|
||||
import javax.validation.constraints.NotEmpty;
|
||||
import javax.validation.constraints.NotNull;
|
||||
import org.whispersystems.textsecuregcm.configuration.AccountDatabaseCrawlerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.AccountsDatabaseConfiguration;
|
||||
|
@ -157,6 +156,11 @@ public class WhisperServerConfiguration extends Configuration {
|
|||
@JsonProperty
|
||||
private DynamoDbConfiguration pushChallengeDynamoDb;
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
private DynamoDbConfiguration reportMessageDynamoDb;
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
|
@ -442,6 +446,10 @@ public class WhisperServerConfiguration extends Configuration {
|
|||
return pushChallengeDynamoDb;
|
||||
}
|
||||
|
||||
public DynamoDbConfiguration getReportMessageDynamoDbConfiguration() {
|
||||
return reportMessageDynamoDb;
|
||||
}
|
||||
|
||||
public TorExitNodeConfiguration getTorExitNodeConfiguration() {
|
||||
return tor;
|
||||
}
|
||||
|
|
|
@ -181,6 +181,8 @@ import org.whispersystems.textsecuregcm.storage.PushFeedbackProcessor;
|
|||
import org.whispersystems.textsecuregcm.storage.RegistrationLockVersionCounter;
|
||||
import org.whispersystems.textsecuregcm.storage.RemoteConfigs;
|
||||
import org.whispersystems.textsecuregcm.storage.RemoteConfigsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb;
|
||||
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
|
||||
import org.whispersystems.textsecuregcm.storage.ReservedUsernames;
|
||||
import org.whispersystems.textsecuregcm.storage.Usernames;
|
||||
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
|
||||
|
@ -329,6 +331,13 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
.withRequestTimeout((int) config.getPushChallengeDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
|
||||
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
|
||||
|
||||
AmazonDynamoDBClientBuilder reportMessageDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
|
||||
.standard()
|
||||
.withRegion(config.getReportMessageDynamoDbConfiguration().getRegion())
|
||||
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getReportMessageDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
|
||||
.withRequestTimeout((int) config.getReportMessageDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
|
||||
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
|
||||
|
||||
DynamoDB messageDynamoDb = new DynamoDB(messageDynamoDbClientBuilder.build());
|
||||
DynamoDB preKeyDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
|
||||
|
||||
|
@ -353,6 +362,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase);
|
||||
RemoteConfigs remoteConfigs = new RemoteConfigs(accountDatabase);
|
||||
PushChallengeDynamoDb pushChallengeDynamoDb = new PushChallengeDynamoDb(new DynamoDB(pushChallengeDynamoDbClientBuilder.build()), config.getPushChallengeDynamoDbConfiguration().getTableName());
|
||||
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(new DynamoDB(reportMessageDynamoDbClientBuilder.build()), config.getReportMessageDynamoDbConfiguration().getTableName());
|
||||
|
||||
RedisClientFactory pubSubClientFactory = new RedisClientFactory("pubsub_cache", config.getPubsubCacheConfiguration().getUrl(), config.getPubsubCacheConfiguration().getReplicaUrls(), config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration());
|
||||
ReplicatedJedisPool pubsubClient = pubSubClientFactory.getRedisClientPool();
|
||||
|
@ -416,7 +426,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
||||
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, keyspaceNotificationDispatchExecutor);
|
||||
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
|
||||
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager);
|
||||
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, Metrics.globalRegistry);
|
||||
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager, reportMessageManager);
|
||||
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager);
|
||||
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
|
||||
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
||||
|
@ -496,7 +507,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
AttachmentControllerV3 attachmentControllerV3 = new AttachmentControllerV3(rateLimiters, config.getGcpAttachmentsConfiguration().getDomain(), config.getGcpAttachmentsConfiguration().getEmail(), config.getGcpAttachmentsConfiguration().getMaxSizeInBytes(), config.getGcpAttachmentsConfiguration().getPathPrefix(), config.getGcpAttachmentsConfiguration().getRsaSigningKey());
|
||||
DonationController donationController = new DonationController(donationExecutor, config.getDonationConfiguration());
|
||||
KeysController keysController = new KeysController(rateLimiters, keysDynamoDb, accountsManager, directoryQueue, preKeyRateLimiter, dynamicConfigurationManager, rateLimitChallengeManager);
|
||||
MessageController messageController = new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, messagesManager, unsealedSenderRateLimiter, apnFallbackManager, dynamicConfigurationManager, rateLimitChallengeManager, metricsCluster, declinedMessageReceiptExecutor);
|
||||
MessageController messageController = new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, messagesManager, unsealedSenderRateLimiter, apnFallbackManager, dynamicConfigurationManager, rateLimitChallengeManager, reportMessageManager, metricsCluster, declinedMessageReceiptExecutor);
|
||||
ProfileController profileController = new ProfileController(rateLimiters, accountsManager, profilesManager, usernamesManager, dynamicConfigurationManager, cdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, config.getCdnConfiguration().getBucket(), zkProfileOperations, isZkEnabled);
|
||||
StickerController stickerController = new StickerController(rateLimiters, config.getCdnConfiguration().getAccessKey(), config.getCdnConfiguration().getAccessSecret(), config.getCdnConfiguration().getRegion(), config.getCdnConfiguration().getBucket());
|
||||
RemoteConfigController remoteConfigController = new RemoteConfigController(remoteConfigsManager, config.getRemoteConfigConfiguration().getAuthorizedTokens(), config.getRemoteConfigConfiguration().getGlobalConfig());
|
||||
|
|
|
@ -38,11 +38,13 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.validation.Valid;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DELETE;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.HeaderParam;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.PUT;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
|
@ -89,6 +91,7 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
|||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
import org.whispersystems.textsecuregcm.util.ForwardedIpUtil;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
@ -118,6 +121,7 @@ public class MessageController {
|
|||
private final ApnFallbackManager apnFallbackManager;
|
||||
private final DynamicConfigurationManager dynamicConfigurationManager;
|
||||
private final RateLimitChallengeManager rateLimitChallengeManager;
|
||||
private final ReportMessageManager reportMessageManager;
|
||||
private final ScheduledExecutorService receiptExecutorService;
|
||||
|
||||
private final Random random = new Random();
|
||||
|
@ -147,6 +151,7 @@ public class MessageController {
|
|||
ApnFallbackManager apnFallbackManager,
|
||||
DynamicConfigurationManager dynamicConfigurationManager,
|
||||
RateLimitChallengeManager rateLimitChallengeManager,
|
||||
ReportMessageManager reportMessageManager,
|
||||
FaultTolerantRedisCluster metricsCluster,
|
||||
ScheduledExecutorService receiptExecutorService)
|
||||
{
|
||||
|
@ -159,6 +164,7 @@ public class MessageController {
|
|||
this.apnFallbackManager = apnFallbackManager;
|
||||
this.dynamicConfigurationManager = dynamicConfigurationManager;
|
||||
this.rateLimitChallengeManager = rateLimitChallengeManager;
|
||||
this.reportMessageManager = reportMessageManager;
|
||||
this.receiptExecutorService = receiptExecutorService;
|
||||
|
||||
try {
|
||||
|
@ -550,6 +556,17 @@ public class MessageController {
|
|||
}
|
||||
}
|
||||
|
||||
@Timed
|
||||
@POST
|
||||
@Path("/report/{sourceNumber}/{messageGuid}")
|
||||
public Response reportMessage(@Auth Account account, @PathParam("sourceNumber") String sourceNumber, @PathParam("messageGuid") UUID messageGuid) {
|
||||
|
||||
reportMessageManager.report(sourceNumber, messageGuid);
|
||||
|
||||
return Response.status(Status.ACCEPTED)
|
||||
.build();
|
||||
}
|
||||
|
||||
private void sendMessage(Optional<Account> source,
|
||||
Account destinationAccount,
|
||||
Device destinationDevice,
|
||||
|
|
|
@ -34,18 +34,27 @@ public class MessagesManager {
|
|||
private final MessagesDynamoDb messagesDynamoDb;
|
||||
private final MessagesCache messagesCache;
|
||||
private final PushLatencyManager pushLatencyManager;
|
||||
private final ReportMessageManager reportMessageManager;
|
||||
|
||||
public MessagesManager(
|
||||
MessagesDynamoDb messagesDynamoDb,
|
||||
MessagesCache messagesCache,
|
||||
PushLatencyManager pushLatencyManager) {
|
||||
final MessagesDynamoDb messagesDynamoDb,
|
||||
final MessagesCache messagesCache,
|
||||
final PushLatencyManager pushLatencyManager,
|
||||
final ReportMessageManager reportMessageManager) {
|
||||
this.messagesDynamoDb = messagesDynamoDb;
|
||||
this.messagesCache = messagesCache;
|
||||
this.pushLatencyManager = pushLatencyManager;
|
||||
this.reportMessageManager = reportMessageManager;
|
||||
}
|
||||
|
||||
public void insert(UUID destinationUuid, long destinationDevice, Envelope message) {
|
||||
messagesCache.insert(UUID.randomUUID(), destinationUuid, destinationDevice, message);
|
||||
final UUID messageGuid = UUID.randomUUID();
|
||||
|
||||
messagesCache.insert(messageGuid, destinationUuid, destinationDevice, message);
|
||||
|
||||
if (message.hasSource() && !destinationUuid.toString().equals(message.getSourceUuid())) {
|
||||
reportMessageManager.store(message.getSource(), messageGuid);
|
||||
}
|
||||
}
|
||||
|
||||
public void insertEphemeral(final UUID destinationUuid, final long destinationDevice, final Envelope message) {
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.amazonaws.services.dynamodbv2.document.DeleteItemOutcome;
|
||||
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
||||
import com.amazonaws.services.dynamodbv2.document.Item;
|
||||
import com.amazonaws.services.dynamodbv2.document.Table;
|
||||
import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec;
|
||||
import com.amazonaws.services.dynamodbv2.model.ReturnValue;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
public class ReportMessageDynamoDb {
|
||||
|
||||
static final String KEY_HASH = "H";
|
||||
static final String ATTR_TTL = "E";
|
||||
|
||||
static final Duration TIME_TO_LIVE = Duration.ofDays(7);
|
||||
|
||||
private final Table table;
|
||||
|
||||
public ReportMessageDynamoDb(final DynamoDB dynamoDB, final String tableName) {
|
||||
|
||||
this.table = dynamoDB.getTable(tableName);
|
||||
}
|
||||
|
||||
public void store(byte[] hash) {
|
||||
|
||||
table.putItem(buildItemForHash(hash));
|
||||
}
|
||||
|
||||
private Item buildItemForHash(byte[] hash) {
|
||||
return new Item()
|
||||
.withBinary(KEY_HASH, hash)
|
||||
.withLong(ATTR_TTL, Instant.now().plus(TIME_TO_LIVE).getEpochSecond());
|
||||
}
|
||||
|
||||
public boolean remove(byte[] hash) {
|
||||
|
||||
final DeleteItemSpec deleteItemSpec = new DeleteItemSpec()
|
||||
.withPrimaryKey(KEY_HASH, hash)
|
||||
.withReturnValues(ReturnValue.ALL_OLD);
|
||||
|
||||
final DeleteItemOutcome outcome = table.deleteItem(deleteItemSpec);
|
||||
|
||||
return outcome.getItem() != null;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import org.whispersystems.textsecuregcm.util.UUIDUtil;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
||||
public class ReportMessageManager {
|
||||
|
||||
@VisibleForTesting
|
||||
static final String REPORT_COUNTER_NAME = "reported";
|
||||
|
||||
private final ReportMessageDynamoDb reportMessageDynamoDb;
|
||||
private final MeterRegistry meterRegistry;
|
||||
|
||||
public ReportMessageManager(ReportMessageDynamoDb reportMessageDynamoDb, final MeterRegistry meterRegistry) {
|
||||
|
||||
this.reportMessageDynamoDb = reportMessageDynamoDb;
|
||||
this.meterRegistry = meterRegistry;
|
||||
}
|
||||
|
||||
public void store(String sourceNumber, UUID messageGuid) {
|
||||
|
||||
Objects.requireNonNull(sourceNumber);
|
||||
|
||||
reportMessageDynamoDb.store(hash(messageGuid, sourceNumber));
|
||||
}
|
||||
|
||||
public void report(String sourceNumber, UUID messageGuid) {
|
||||
|
||||
final boolean found = reportMessageDynamoDb.remove(hash(messageGuid, sourceNumber));
|
||||
|
||||
if (found) {
|
||||
Counter.builder(REPORT_COUNTER_NAME)
|
||||
.tag("countryCode", Util.getCountryCode(sourceNumber))
|
||||
.register(meterRegistry)
|
||||
.increment();
|
||||
}
|
||||
}
|
||||
|
||||
private byte[] hash(UUID messageGuid, String otherId) {
|
||||
final MessageDigest sha256;
|
||||
try {
|
||||
sha256 = MessageDigest.getInstance("SHA-256");
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
|
||||
sha256.update(UUIDUtil.toBytes(messageGuid));
|
||||
sha256.update(otherId.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
return sha256.digest();
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@ import io.dropwizard.cli.EnvironmentCommand;
|
|||
import io.dropwizard.jdbi3.JdbiFactory;
|
||||
import io.dropwizard.setup.Environment;
|
||||
import io.lettuce.core.resource.ClientResources;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
|
@ -39,20 +40,22 @@ import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient;
|
|||
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
|
||||
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccounts;
|
||||
import org.whispersystems.textsecuregcm.storage.Accounts;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason;
|
||||
import org.whispersystems.textsecuregcm.storage.MigrationDeletedAccounts;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||
import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesCache;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.MigrationDeletedAccounts;
|
||||
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccounts;
|
||||
import org.whispersystems.textsecuregcm.storage.Profiles;
|
||||
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb;
|
||||
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
|
||||
import org.whispersystems.textsecuregcm.storage.ReservedUsernames;
|
||||
import org.whispersystems.textsecuregcm.storage.Usernames;
|
||||
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
|
||||
|
@ -142,8 +145,16 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
|
|||
.withRequestTimeout((int) configuration.getMigrationRetryAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
|
||||
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
|
||||
|
||||
AmazonDynamoDBClientBuilder reportMessageDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
|
||||
.standard()
|
||||
.withRegion(configuration.getReportMessageDynamoDbConfiguration().getRegion())
|
||||
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getReportMessageDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
|
||||
.withRequestTimeout((int) configuration.getReportMessageDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
|
||||
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
|
||||
|
||||
DynamoDB messageDynamoDb = new DynamoDB(clientBuilder.build());
|
||||
DynamoDB preKeysDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
|
||||
DynamoDB reportMessagesDynamoDb = new DynamoDB(reportMessageDynamoDbClientBuilder.build());
|
||||
|
||||
AmazonDynamoDB accountsDynamoDbClient = accountsDynamoDbClientBuilder.build();
|
||||
AmazonDynamoDBAsync accountsDynamoDbAsyncClient = accountsDynamoDbAsyncClientBuilder.build();
|
||||
|
@ -185,7 +196,9 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
|
|||
DirectoryQueue directoryQueue = new DirectoryQueue (configuration.getDirectoryConfiguration().getSqsConfiguration());
|
||||
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
|
||||
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
|
||||
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager);
|
||||
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessagesDynamoDb, configuration.getReportMessageDynamoDbConfiguration().getTableName());
|
||||
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, Metrics.globalRegistry);
|
||||
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager, reportMessageManager);
|
||||
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager);
|
||||
|
||||
for (String user: users) {
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
|
|||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
|
||||
|
||||
public class MessageControllerMetricsTest extends AbstractRedisClusterTest {
|
||||
|
||||
|
@ -40,6 +41,7 @@ public class MessageControllerMetricsTest extends AbstractRedisClusterTest {
|
|||
mock(ApnFallbackManager.class),
|
||||
mock(DynamicConfigurationManager.class),
|
||||
mock(RateLimitChallengeManager.class),
|
||||
mock(ReportMessageManager.class),
|
||||
getRedisCluster(),
|
||||
mock(ScheduledExecutorService.class));
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ public class MessagePersisterIntegrationTest extends AbstractRedisClusterTest {
|
|||
|
||||
notificationExecutorService = Executors.newSingleThreadExecutor();
|
||||
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), notificationExecutorService);
|
||||
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(PushLatencyManager.class));
|
||||
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(PushLatencyManager.class), mock(ReportMessageManager.class));
|
||||
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, dynamicConfigurationManager, PERSIST_DELAY);
|
||||
|
||||
account = mock(Account.class);
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
|
||||
import java.util.UUID;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
|
||||
|
||||
class MessagesManagerTest {
|
||||
|
||||
private final MessagesDynamoDb messagesDynamoDb = mock(MessagesDynamoDb.class);
|
||||
private final MessagesCache messagesCache = mock(MessagesCache.class);
|
||||
private final PushLatencyManager pushLatencyManager = mock(PushLatencyManager.class);
|
||||
private final ReportMessageManager reportMessageManager = mock(ReportMessageManager.class);
|
||||
|
||||
private final MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache,
|
||||
pushLatencyManager, reportMessageManager);
|
||||
|
||||
@Test
|
||||
void insert() {
|
||||
final String sourceNumber = "+12025551212";
|
||||
final Envelope message = Envelope.newBuilder()
|
||||
.setSource(sourceNumber)
|
||||
.setSourceUuid(UUID.randomUUID().toString())
|
||||
.build();
|
||||
|
||||
final UUID destinationUuid = UUID.randomUUID();
|
||||
|
||||
messagesManager.insert(destinationUuid, 1L, message);
|
||||
|
||||
verify(reportMessageManager).store(eq(sourceNumber), any(UUID.class));
|
||||
|
||||
final Envelope syncMessage = Envelope.newBuilder(message)
|
||||
.setSourceUuid(destinationUuid.toString())
|
||||
.build();
|
||||
|
||||
messagesManager.insert(destinationUuid, 1L, syncMessage);
|
||||
|
||||
verifyNoMoreInteractions(reportMessageManager);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertAll;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
|
||||
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
|
||||
import java.util.UUID;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.whispersystems.textsecuregcm.util.UUIDUtil;
|
||||
|
||||
class ReportMessageDynamoDbTest {
|
||||
|
||||
private ReportMessageDynamoDb reportMessageDynamoDb;
|
||||
|
||||
private static final String TABLE_NAME = "report_message_test";
|
||||
|
||||
@RegisterExtension
|
||||
static DynamoDbExtension dynamoDbExtension = DynamoDbExtension.builder()
|
||||
.tableName(TABLE_NAME)
|
||||
.hashKey(ReportMessageDynamoDb.KEY_HASH)
|
||||
.attributeDefinition(new AttributeDefinition(ReportMessageDynamoDb.KEY_HASH, ScalarAttributeType.B))
|
||||
.build();
|
||||
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
this.reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbExtension.getDynamoDB(), TABLE_NAME);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStore() {
|
||||
|
||||
final byte[] hash1 = UUIDUtil.toBytes(UUID.randomUUID());
|
||||
final byte[] hash2 = UUIDUtil.toBytes(UUID.randomUUID());
|
||||
|
||||
assertAll("database should be empty",
|
||||
() -> assertFalse(reportMessageDynamoDb.remove(hash1)),
|
||||
() -> assertFalse(reportMessageDynamoDb.remove(hash2))
|
||||
);
|
||||
|
||||
reportMessageDynamoDb.store(hash1);
|
||||
reportMessageDynamoDb.store(hash2);
|
||||
|
||||
assertAll("both hashes should be found",
|
||||
() -> assertTrue(reportMessageDynamoDb.remove(hash1)),
|
||||
() -> assertTrue(reportMessageDynamoDb.remove(hash2))
|
||||
);
|
||||
|
||||
assertAll( "database should be empty",
|
||||
() -> assertFalse(reportMessageDynamoDb.remove(hash1)),
|
||||
() -> assertFalse(reportMessageDynamoDb.remove(hash2))
|
||||
);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
|
||||
import java.util.UUID;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class ReportMessageManagerTest {
|
||||
|
||||
private final ReportMessageDynamoDb reportMessageDynamoDb = mock(ReportMessageDynamoDb.class);
|
||||
private final MeterRegistry meterRegistry = new SimpleMeterRegistry();
|
||||
|
||||
private final ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, meterRegistry);
|
||||
|
||||
@Test
|
||||
void testStore() {
|
||||
|
||||
final UUID messageGuid = UUID.randomUUID();
|
||||
final String number = "+15105551111";
|
||||
|
||||
assertThrows(NullPointerException.class, () -> reportMessageManager.store(null, messageGuid));
|
||||
|
||||
reportMessageManager.store(number, messageGuid);
|
||||
|
||||
verify(reportMessageDynamoDb).store(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReport() {
|
||||
final String sourceNumber = "+15105551111";
|
||||
final UUID messageGuid = UUID.randomUUID();
|
||||
|
||||
when(reportMessageDynamoDb.remove(any())).thenReturn(false);
|
||||
reportMessageManager.report(sourceNumber, messageGuid);
|
||||
|
||||
assertEquals(0, getCounterTotal(ReportMessageManager.REPORT_COUNTER_NAME));
|
||||
|
||||
when(reportMessageDynamoDb.remove(any())).thenReturn(true);
|
||||
reportMessageManager.report(sourceNumber, messageGuid);
|
||||
|
||||
assertEquals(1, getCounterTotal(ReportMessageManager.REPORT_COUNTER_NAME));
|
||||
}
|
||||
|
||||
private double getCounterTotal(final String counterName) {
|
||||
return meterRegistry.find(counterName).counters().stream()
|
||||
.map(Counter::count)
|
||||
.reduce(Double::sum)
|
||||
.orElse(0.0);
|
||||
}
|
||||
|
||||
}
|
|
@ -95,6 +95,7 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
|||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
|
||||
import org.whispersystems.textsecuregcm.tests.util.AuthHelper;
|
||||
import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper;
|
||||
import org.whispersystems.textsecuregcm.util.ua.ClientPlatform;
|
||||
|
@ -127,6 +128,7 @@ class MessageControllerTest {
|
|||
private static final ApnFallbackManager apnFallbackManager = mock(ApnFallbackManager.class);
|
||||
private static final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
|
||||
private static final RateLimitChallengeManager rateLimitChallengeManager = mock(RateLimitChallengeManager.class);
|
||||
private static final ReportMessageManager reportMessageManager = mock(ReportMessageManager.class);
|
||||
private static final FaultTolerantRedisCluster metricsCluster = RedisClusterHelper.buildMockRedisCluster(redisCommands);
|
||||
private static final ScheduledExecutorService receiptExecutor = mock(ScheduledExecutorService.class);
|
||||
|
||||
|
@ -139,7 +141,8 @@ class MessageControllerTest {
|
|||
.addProvider(new RateLimitChallengeExceptionMapper(rateLimitChallengeManager))
|
||||
.setTestContainerFactory(new GrizzlyWebTestContainerFactory())
|
||||
.addResource(new MessageController(rateLimiters, messageSender, receiptSender, accountsManager,
|
||||
messagesManager, unsealedSenderRateLimiter, apnFallbackManager, dynamicConfigurationManager, rateLimitChallengeManager, metricsCluster, receiptExecutor))
|
||||
messagesManager, unsealedSenderRateLimiter, apnFallbackManager, dynamicConfigurationManager,
|
||||
rateLimitChallengeManager, reportMessageManager, metricsCluster, receiptExecutor))
|
||||
.build();
|
||||
|
||||
@BeforeEach
|
||||
|
@ -198,6 +201,7 @@ class MessageControllerTest {
|
|||
apnFallbackManager,
|
||||
dynamicConfigurationManager,
|
||||
rateLimitChallengeManager,
|
||||
reportMessageManager,
|
||||
metricsCluster,
|
||||
receiptExecutor
|
||||
);
|
||||
|
@ -602,4 +606,22 @@ class MessageControllerTest {
|
|||
Arguments.of("fixtures/online_message_false_nested_property.json", false)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReportMessage() {
|
||||
|
||||
final String senderNumber = "+12125550001";
|
||||
final UUID messageGuid = UUID.randomUUID();
|
||||
|
||||
final Response response =
|
||||
resources.getJerseyTest()
|
||||
.target(String.format("/v1/messages/report/%s/%s", senderNumber, messageGuid))
|
||||
.request()
|
||||
.header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD))
|
||||
.post(null);
|
||||
|
||||
assertThat(response.getStatus(), is(equalTo(202)));
|
||||
|
||||
verify(reportMessageManager).report(senderNumber, messageGuid);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.whispersystems.textsecuregcm.storage.Device;
|
|||
import org.whispersystems.textsecuregcm.storage.MessagesCache;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
|
||||
import org.whispersystems.textsecuregcm.tests.util.MessagesDynamoDbRule;
|
||||
import org.whispersystems.websocket.WebSocketClient;
|
||||
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
|
||||
|
@ -59,6 +60,7 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest
|
|||
private ExecutorService executorService;
|
||||
private MessagesDynamoDb messagesDynamoDb;
|
||||
private MessagesCache messagesCache;
|
||||
private ReportMessageManager reportMessageManager;
|
||||
private Account account;
|
||||
private Device device;
|
||||
private WebSocketClient webSocketClient;
|
||||
|
@ -75,6 +77,7 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest
|
|||
executorService = Executors.newSingleThreadExecutor();
|
||||
messagesCache = new MessagesCache(getRedisCluster(), getRedisCluster(), executorService);
|
||||
messagesDynamoDb = new MessagesDynamoDb(messagesDynamoDbRule.getDynamoDB(), MessagesDynamoDbRule.TABLE_NAME, Duration.ofDays(7));
|
||||
reportMessageManager = mock(ReportMessageManager.class);
|
||||
account = mock(Account.class);
|
||||
device = mock(Device.class);
|
||||
webSocketClient = mock(WebSocketClient.class);
|
||||
|
@ -86,7 +89,7 @@ public class WebSocketConnectionIntegrationTest extends AbstractRedisClusterTest
|
|||
|
||||
webSocketConnection = new WebSocketConnection(
|
||||
mock(ReceiptSender.class),
|
||||
new MessagesManager(messagesDynamoDb, messagesCache, mock(PushLatencyManager.class)),
|
||||
new MessagesManager(messagesDynamoDb, messagesCache, mock(PushLatencyManager.class), reportMessageManager),
|
||||
account,
|
||||
device,
|
||||
webSocketClient,
|
||||
|
|
Loading…
Reference in New Issue