Add tools to decline messages from senders meeting specific conditions

This commit is contained in:
Jon Chambers 2021-02-24 18:09:26 -05:00 committed by GitHub
parent 823025f3b3
commit eb6fe11da1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 163 additions and 6 deletions

View File

@ -308,6 +308,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), keyspaceNotificationDispatchQueue);
ScheduledExecutorService recurringJobExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(2).build();
ScheduledExecutorService declinedMessageReceiptExecutor = environment.lifecycle().scheduledExecutorService(name(getClass(), "declined-receipt-%d")).threads(2).build();
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(16).workQueue(keyspaceNotificationDispatchQueue).build();
ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")).maxThreads(1).minThreads(1).build();
ExecutorService gcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "gcmSender-%d")).maxThreads(1).minThreads(1).build();
@ -402,7 +403,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
AttachmentControllerV2 attachmentControllerV2 = new AttachmentControllerV2(rateLimiters, config.getAwsAttachmentsConfiguration().getAccessKey(), config.getAwsAttachmentsConfiguration().getAccessSecret(), config.getAwsAttachmentsConfiguration().getRegion(), config.getAwsAttachmentsConfiguration().getBucket());
AttachmentControllerV3 attachmentControllerV3 = new AttachmentControllerV3(rateLimiters, config.getGcpAttachmentsConfiguration().getDomain(), config.getGcpAttachmentsConfiguration().getEmail(), config.getGcpAttachmentsConfiguration().getMaxSizeInBytes(), config.getGcpAttachmentsConfiguration().getPathPrefix(), config.getGcpAttachmentsConfiguration().getRsaSigningKey());
KeysController keysController = new KeysController(rateLimiters, keysDynamoDb, accountsManager, directoryQueue);
MessageController messageController = new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, messagesManager, apnFallbackManager, dynamicConfigurationManager, metricsCluster);
MessageController messageController = new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, messagesManager, apnFallbackManager, dynamicConfigurationManager, metricsCluster, declinedMessageReceiptExecutor);
ProfileController profileController = new ProfileController(rateLimiters, accountsManager, profilesManager, usernamesManager, cdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, config.getCdnConfiguration().getBucket(), zkProfileOperations, isZkEnabled);
StickerController stickerController = new StickerController(rateLimiters, config.getCdnConfiguration().getAccessKey(), config.getCdnConfiguration().getAccessSecret(), config.getCdnConfiguration().getRegion(), config.getCdnConfiguration().getBucket());
RemoteConfigController remoteConfigController = new RemoteConfigController(remoteConfigsManager, config.getRemoteConfigConfiguration().getAuthorizedTokens(), config.getRemoteConfigConfiguration().getGlobalConfig());

View File

@ -7,6 +7,7 @@ package org.whispersystems.textsecuregcm.configuration.dynamic;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Duration;
import java.util.Collections;
import java.util.Set;
@ -18,6 +19,24 @@ public class DynamicMessageRateConfiguration {
@JsonProperty
private Set<String> rateLimitedCountryCodes = Collections.emptySet();
@JsonProperty
private Set<String> rateLimitedHosts = Collections.emptySet();
@JsonProperty
private Duration responseDelay = Duration.ofNanos(1_200_000);
@JsonProperty
private Duration responseDelayJitter = Duration.ofNanos(500_000);
@JsonProperty
private Duration receiptDelay = Duration.ofMillis(1_200);
@JsonProperty
private Duration receiptDelayJitter = Duration.ofMillis(800);
@JsonProperty
private double receiptProbability = 0.82;
public boolean isEnforceUnsealedSenderRateLimit() {
return enforceUnsealedSenderRateLimit;
}
@ -25,4 +44,28 @@ public class DynamicMessageRateConfiguration {
public Set<String> getRateLimitedCountryCodes() {
return rateLimitedCountryCodes;
}
public Set<String> getRateLimitedHosts() {
return rateLimitedHosts;
}
public Duration getResponseDelay() {
return responseDelay;
}
public Duration getResponseDelayJitter() {
return responseDelayJitter;
}
public Duration getReceiptDelay() {
return receiptDelay;
}
public Duration getReceiptDelayJitter() {
return receiptDelayJitter;
}
public double getReceiptProbability() {
return receiptProbability;
}
}

View File

@ -19,12 +19,16 @@ import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
@ -43,6 +47,7 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
import org.whispersystems.textsecuregcm.auth.Anonymous;
import org.whispersystems.textsecuregcm.auth.OptionalAccess;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessageRateConfiguration;
import org.whispersystems.textsecuregcm.entities.IncomingMessage;
import org.whispersystems.textsecuregcm.entities.IncomingMessageList;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
@ -91,12 +96,16 @@ public class MessageController {
private final ApnFallbackManager apnFallbackManager;
private final DynamicConfigurationManager dynamicConfigurationManager;
private final FaultTolerantRedisCluster metricsCluster;
private final ScheduledExecutorService receiptExecutorService;
private final Random random = new Random();
private static final String SENT_MESSAGE_COUNTER_NAME = name(MessageController.class, "sentMessages");
private static final String REJECT_UNSEALED_SENDER_COUNTER_NAME = name(MessageController.class, "rejectUnsealedSenderLimit");
private static final String INTERNATIONAL_UNSEALED_SENDER_COUNTER_NAME = name(MessageController.class, "internationalUnsealedSender");
private static final String UNSEALED_SENDER_ACCOUNT_AGE_DISTRIBUTION_NAME = name(MessageController.class, "unsealedSenderAccountAge");
private static final String UNSEALED_SENDER_WITHOUT_PUSH_TOKEN_COUNTER_NAME = name(MessageController.class, "unsealedSenderWithoutPushToken");
private static final String DECLINED_DELIVERY_COUNTER = name(MessageController.class, "declinedDelivery");
private static final String CONTENT_SIZE_DISTRIBUTION_NAME = name(MessageController.class, "messageContentSize");
private static final String OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME = name(MessageController.class, "outgoingMessageListSizeBytes");
@ -115,7 +124,8 @@ public class MessageController {
MessagesManager messagesManager,
ApnFallbackManager apnFallbackManager,
DynamicConfigurationManager dynamicConfigurationManager,
FaultTolerantRedisCluster metricsCluster)
FaultTolerantRedisCluster metricsCluster,
ScheduledExecutorService receiptExecutorService)
{
this.rateLimiters = rateLimiters;
this.messageSender = messageSender;
@ -125,6 +135,7 @@ public class MessageController {
this.apnFallbackManager = apnFallbackManager;
this.dynamicConfigurationManager = dynamicConfigurationManager;
this.metricsCluster = metricsCluster;
this.receiptExecutorService = receiptExecutorService;
}
@Timed
@ -135,6 +146,7 @@ public class MessageController {
public Response sendMessage(@Auth Optional<Account> source,
@HeaderParam(OptionalAccess.UNIDENTIFIED) Optional<Anonymous> accessKey,
@HeaderParam("User-Agent") String userAgent,
@HeaderParam("X-Forwarded-For") String forwardedFor,
@PathParam("destination") AmbiguousIdentifier destinationName,
@Valid IncomingMessageList messages)
throws RateLimitExceededException
@ -234,9 +246,18 @@ public class MessageController {
final String senderCountryCode = Util.getCountryCode(source.get().getNumber());
final String destinationCountryCode = Util.getCountryCode(destination.get().getNumber());
final Device masterDevice = source.get().getMasterDevice().get();
if (!senderCountryCode.equals(destinationCountryCode)) {
Metrics.counter(INTERNATIONAL_UNSEALED_SENDER_COUNTER_NAME, SENDER_COUNTRY_TAG_NAME, Util.getCountryCode(source.get().getNumber())).increment();
Metrics.counter(INTERNATIONAL_UNSEALED_SENDER_COUNTER_NAME, SENDER_COUNTRY_TAG_NAME, senderCountryCode).increment();
if (StringUtils.isAllBlank(masterDevice.getApnId(), masterDevice.getVoipApnId(), masterDevice.getGcmId()) || masterDevice.getUninstalledFeedbackTimestamp() > 0) {
if (dynamicConfigurationManager.getConfiguration().getMessageRateConfiguration().getRateLimitedCountryCodes().contains(senderCountryCode)) {
if (dynamicConfigurationManager.getConfiguration().getMessageRateConfiguration().getRateLimitedHosts().contains(forwardedFor)) {
return declineDelivery(messages, source.get(), destination.get());
}
}
}
}
}
@ -282,6 +303,45 @@ public class MessageController {
}
}
private Response declineDelivery(final IncomingMessageList messages, final Account source, final Account destination) {
Metrics.counter(DECLINED_DELIVERY_COUNTER, SENDER_COUNTRY_TAG_NAME, Util.getCountryCode(source.getNumber())).increment();
final DynamicMessageRateConfiguration messageRateConfiguration = dynamicConfigurationManager.getConfiguration().getMessageRateConfiguration();
{
final long timestamp = System.currentTimeMillis();
for (final IncomingMessage message : messages.getMessages()) {
final long jitterNanos = random.nextInt((int) messageRateConfiguration.getReceiptDelayJitter().toNanos());
final Duration receiptDelay = messageRateConfiguration.getReceiptDelay().plusNanos(jitterNanos);
if (random.nextDouble() <= messageRateConfiguration.getReceiptProbability()) {
receiptExecutorService.schedule(() -> {
try {
receiptSender.sendReceipt(destination, source.getNumber(), timestamp);
} catch (final NoSuchUserException ignored) {
}
}, receiptDelay.toMillis(), TimeUnit.MILLISECONDS);
}
}
}
{
Duration responseDelay = Duration.ZERO;
for (int i = 0; i < messages.getMessages().size(); i++) {
final long jitterNanos = random.nextInt((int) messageRateConfiguration.getResponseDelayJitter().toNanos());
responseDelay = responseDelay.plus(
messageRateConfiguration.getResponseDelay()).plusNanos(jitterNanos);
}
Util.sleep(responseDelay.toMillis());
}
return Response.ok(new SendMessageResponse(source.getEnabledDeviceCount() > 1)).build();
}
@Timed
@GET
@Produces(MediaType.APPLICATION_JSON)

View File

@ -13,11 +13,13 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@ -29,12 +31,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import io.dropwizard.auth.PolymorphicAuthValueFactoryProvider;
import io.dropwizard.testing.junit.ResourceTestRule;
import java.time.Duration;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
@ -48,10 +53,13 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccount;
import org.whispersystems.textsecuregcm.auth.OptionalAccess;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessageRateConfiguration;
import org.whispersystems.textsecuregcm.controllers.MessageController;
import org.whispersystems.textsecuregcm.entities.IncomingMessageList;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
@ -84,6 +92,9 @@ public class MessageControllerTest {
private static final String MULTI_DEVICE_RECIPIENT = "+14152222222";
private static final UUID MULTI_DEVICE_UUID = UUID.randomUUID();
private static final String INTERNATIONAL_RECIPIENT = "+61123456789";
private static final UUID INTERNATIONAL_UUID = UUID.randomUUID();
private final MessageSender messageSender = mock(MessageSender.class);
private final ReceiptSender receiptSender = mock(ReceiptSender.class);
private final AccountsManager accountsManager = mock(AccountsManager.class);
@ -94,6 +105,7 @@ public class MessageControllerTest {
private final ApnFallbackManager apnFallbackManager = mock(ApnFallbackManager.class);
private final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
private final FaultTolerantRedisCluster metricsCluster = mock(FaultTolerantRedisCluster.class);
private final ScheduledExecutorService receiptExecutor = mock(ScheduledExecutorService.class);
private final ObjectMapper mapper = new ObjectMapper();
@ -103,7 +115,7 @@ public class MessageControllerTest {
.addProvider(new PolymorphicAuthValueFactoryProvider.Binder<>(ImmutableSet.of(Account.class, DisabledPermittedAccount.class)))
.setTestContainerFactory(new GrizzlyWebTestContainerFactory())
.addResource(new MessageController(rateLimiters, messageSender, receiptSender, accountsManager,
messagesManager, apnFallbackManager, dynamicConfigurationManager, metricsCluster))
messagesManager, apnFallbackManager, dynamicConfigurationManager, metricsCluster, receiptExecutor))
.build();
@ -123,18 +135,27 @@ public class MessageControllerTest {
"isgcm", null, null, false, 444, null, System.currentTimeMillis() - TimeUnit.DAYS.toMillis(31), System.currentTimeMillis(), "Test", 0, new Device.DeviceCapabilities(false, false, false, false, false, false)));
}};
Account singleDeviceAccount = new Account(SINGLE_DEVICE_RECIPIENT, SINGLE_DEVICE_UUID, singleDeviceList, "1234".getBytes());
Account multiDeviceAccount = new Account(MULTI_DEVICE_RECIPIENT, MULTI_DEVICE_UUID, multiDeviceList, "1234".getBytes());
Account singleDeviceAccount = new Account(SINGLE_DEVICE_RECIPIENT, SINGLE_DEVICE_UUID, singleDeviceList, "1234".getBytes());
Account multiDeviceAccount = new Account(MULTI_DEVICE_RECIPIENT, MULTI_DEVICE_UUID, multiDeviceList, "1234".getBytes());
Account internationalAccount = new Account(INTERNATIONAL_RECIPIENT, INTERNATIONAL_UUID, singleDeviceList, "1234".getBytes());
when(accountsManager.get(eq(SINGLE_DEVICE_RECIPIENT))).thenReturn(Optional.of(singleDeviceAccount));
when(accountsManager.get(argThat((ArgumentMatcher<AmbiguousIdentifier>) identifier -> identifier != null && identifier.hasNumber() && identifier.getNumber().equals(SINGLE_DEVICE_RECIPIENT)))).thenReturn(Optional.of(singleDeviceAccount));
when(accountsManager.get(eq(MULTI_DEVICE_RECIPIENT))).thenReturn(Optional.of(multiDeviceAccount));
when(accountsManager.get(argThat((ArgumentMatcher<AmbiguousIdentifier>) identifier -> identifier != null && identifier.hasNumber() && identifier.getNumber().equals(MULTI_DEVICE_RECIPIENT)))).thenReturn(Optional.of(multiDeviceAccount));
when(accountsManager.get(INTERNATIONAL_RECIPIENT)).thenReturn(Optional.of(internationalAccount));
when(accountsManager.get(argThat((ArgumentMatcher<AmbiguousIdentifier>) identifier -> identifier != null && identifier.hasNumber() && identifier.getNumber().equals(INTERNATIONAL_RECIPIENT)))).thenReturn(Optional.of(internationalAccount));
when(rateLimiters.getMessagesLimiter()).thenReturn(rateLimiter);
when(rateLimiters.getUnsealedSenderLimiter()).thenReturn(unsealedSenderLimiter);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration());
when(receiptExecutor.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(
(Answer<ScheduledFuture<?>>) invocation -> {
invocation.getArgument(0, Runnable.class).run();
return mock(ScheduledFuture.class);
});
}
@Test
@ -169,6 +190,38 @@ public class MessageControllerTest {
assertTrue(captor.getValue().hasSourceDevice());
}
@Test
public synchronized void testInternationalUnsealedSenderFromRateLimitedHost() throws Exception {
final String senderHost = "10.0.0.1";
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
final DynamicMessageRateConfiguration messageRateConfiguration = mock(DynamicMessageRateConfiguration.class);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
when(dynamicConfiguration.getMessageRateConfiguration()).thenReturn(messageRateConfiguration);
when(messageRateConfiguration.getRateLimitedCountryCodes()).thenReturn(Set.of("1"));
when(messageRateConfiguration.getRateLimitedHosts()).thenReturn(Set.of(senderHost));
when(messageRateConfiguration.getResponseDelay()).thenReturn(Duration.ofMillis(1));
when(messageRateConfiguration.getResponseDelayJitter()).thenReturn(Duration.ofMillis(1));
when(messageRateConfiguration.getReceiptDelay()).thenReturn(Duration.ofMillis(1));
when(messageRateConfiguration.getReceiptDelayJitter()).thenReturn(Duration.ofMillis(1));
when(messageRateConfiguration.getReceiptProbability()).thenReturn(1.0);
Response response =
resources.getJerseyTest()
.target(String.format("/v1/messages/%s", INTERNATIONAL_RECIPIENT))
.request()
.header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD))
.header("X-Forwarded-For", senderHost)
.put(Entity.entity(mapper.readValue(jsonFixture("fixtures/current_message_single_device.json"), IncomingMessageList.class),
MediaType.APPLICATION_JSON_TYPE));
assertThat("Good Response", response.getStatus(), is(equalTo(200)));
verify(messageSender, never()).sendMessage(any(), any(), any(), anyBoolean());
verify(receiptSender).sendReceipt(any(), eq(AuthHelper.VALID_NUMBER), anyLong());
}
@Test
public synchronized void testSingleDeviceCurrentUnidentified() throws Exception {
Response response =