From eb6fe11da1bd44fbd4676c0451ab6eb437ef43b3 Mon Sep 17 00:00:00 2001 From: Jon Chambers <63609320+jon-signal@users.noreply.github.com> Date: Wed, 24 Feb 2021 18:09:26 -0500 Subject: [PATCH] Add tools to decline messages from senders meeting specific conditions --- .../textsecuregcm/WhisperServerService.java | 3 +- .../DynamicMessageRateConfiguration.java | 43 +++++++++++++ .../controllers/MessageController.java | 64 ++++++++++++++++++- .../controllers/MessageControllerTest.java | 59 ++++++++++++++++- 4 files changed, 163 insertions(+), 6 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 8a248015b..fbdc1b019 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -308,6 +308,7 @@ public class WhisperServerService extends Application rateLimitedCountryCodes = Collections.emptySet(); + @JsonProperty + private Set rateLimitedHosts = Collections.emptySet(); + + @JsonProperty + private Duration responseDelay = Duration.ofNanos(1_200_000); + + @JsonProperty + private Duration responseDelayJitter = Duration.ofNanos(500_000); + + @JsonProperty + private Duration receiptDelay = Duration.ofMillis(1_200); + + @JsonProperty + private Duration receiptDelayJitter = Duration.ofMillis(800); + + @JsonProperty + private double receiptProbability = 0.82; + public boolean isEnforceUnsealedSenderRateLimit() { return enforceUnsealedSenderRateLimit; } @@ -25,4 +44,28 @@ public class DynamicMessageRateConfiguration { public Set getRateLimitedCountryCodes() { return rateLimitedCountryCodes; } + + public Set getRateLimitedHosts() { + return rateLimitedHosts; + } + + public Duration getResponseDelay() { + return responseDelay; + } + + public Duration getResponseDelayJitter() { + return responseDelayJitter; + } + + public Duration getReceiptDelay() { + return receiptDelay; + } + + public Duration getReceiptDelayJitter() { + return receiptDelayJitter; + } + + public double getReceiptProbability() { + return receiptProbability; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java index acb22329e..c35ea8b4f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java @@ -19,12 +19,16 @@ import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; import java.io.IOException; +import java.time.Duration; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import javax.validation.Valid; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; @@ -43,6 +47,7 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier; import org.whispersystems.textsecuregcm.auth.Anonymous; import org.whispersystems.textsecuregcm.auth.OptionalAccess; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessageRateConfiguration; import org.whispersystems.textsecuregcm.entities.IncomingMessage; import org.whispersystems.textsecuregcm.entities.IncomingMessageList; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; @@ -91,12 +96,16 @@ public class MessageController { private final ApnFallbackManager apnFallbackManager; private final DynamicConfigurationManager dynamicConfigurationManager; private final FaultTolerantRedisCluster metricsCluster; + private final ScheduledExecutorService receiptExecutorService; + + private final Random random = new Random(); private static final String SENT_MESSAGE_COUNTER_NAME = name(MessageController.class, "sentMessages"); private static final String REJECT_UNSEALED_SENDER_COUNTER_NAME = name(MessageController.class, "rejectUnsealedSenderLimit"); private static final String INTERNATIONAL_UNSEALED_SENDER_COUNTER_NAME = name(MessageController.class, "internationalUnsealedSender"); private static final String UNSEALED_SENDER_ACCOUNT_AGE_DISTRIBUTION_NAME = name(MessageController.class, "unsealedSenderAccountAge"); private static final String UNSEALED_SENDER_WITHOUT_PUSH_TOKEN_COUNTER_NAME = name(MessageController.class, "unsealedSenderWithoutPushToken"); + private static final String DECLINED_DELIVERY_COUNTER = name(MessageController.class, "declinedDelivery"); private static final String CONTENT_SIZE_DISTRIBUTION_NAME = name(MessageController.class, "messageContentSize"); private static final String OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME = name(MessageController.class, "outgoingMessageListSizeBytes"); @@ -115,7 +124,8 @@ public class MessageController { MessagesManager messagesManager, ApnFallbackManager apnFallbackManager, DynamicConfigurationManager dynamicConfigurationManager, - FaultTolerantRedisCluster metricsCluster) + FaultTolerantRedisCluster metricsCluster, + ScheduledExecutorService receiptExecutorService) { this.rateLimiters = rateLimiters; this.messageSender = messageSender; @@ -125,6 +135,7 @@ public class MessageController { this.apnFallbackManager = apnFallbackManager; this.dynamicConfigurationManager = dynamicConfigurationManager; this.metricsCluster = metricsCluster; + this.receiptExecutorService = receiptExecutorService; } @Timed @@ -135,6 +146,7 @@ public class MessageController { public Response sendMessage(@Auth Optional source, @HeaderParam(OptionalAccess.UNIDENTIFIED) Optional accessKey, @HeaderParam("User-Agent") String userAgent, + @HeaderParam("X-Forwarded-For") String forwardedFor, @PathParam("destination") AmbiguousIdentifier destinationName, @Valid IncomingMessageList messages) throws RateLimitExceededException @@ -234,9 +246,18 @@ public class MessageController { final String senderCountryCode = Util.getCountryCode(source.get().getNumber()); final String destinationCountryCode = Util.getCountryCode(destination.get().getNumber()); + final Device masterDevice = source.get().getMasterDevice().get(); if (!senderCountryCode.equals(destinationCountryCode)) { - Metrics.counter(INTERNATIONAL_UNSEALED_SENDER_COUNTER_NAME, SENDER_COUNTRY_TAG_NAME, Util.getCountryCode(source.get().getNumber())).increment(); + Metrics.counter(INTERNATIONAL_UNSEALED_SENDER_COUNTER_NAME, SENDER_COUNTRY_TAG_NAME, senderCountryCode).increment(); + + if (StringUtils.isAllBlank(masterDevice.getApnId(), masterDevice.getVoipApnId(), masterDevice.getGcmId()) || masterDevice.getUninstalledFeedbackTimestamp() > 0) { + if (dynamicConfigurationManager.getConfiguration().getMessageRateConfiguration().getRateLimitedCountryCodes().contains(senderCountryCode)) { + if (dynamicConfigurationManager.getConfiguration().getMessageRateConfiguration().getRateLimitedHosts().contains(forwardedFor)) { + return declineDelivery(messages, source.get(), destination.get()); + } + } + } } } @@ -282,6 +303,45 @@ public class MessageController { } } + private Response declineDelivery(final IncomingMessageList messages, final Account source, final Account destination) { + Metrics.counter(DECLINED_DELIVERY_COUNTER, SENDER_COUNTRY_TAG_NAME, Util.getCountryCode(source.getNumber())).increment(); + + final DynamicMessageRateConfiguration messageRateConfiguration = dynamicConfigurationManager.getConfiguration().getMessageRateConfiguration(); + + { + final long timestamp = System.currentTimeMillis(); + + for (final IncomingMessage message : messages.getMessages()) { + final long jitterNanos = random.nextInt((int) messageRateConfiguration.getReceiptDelayJitter().toNanos()); + final Duration receiptDelay = messageRateConfiguration.getReceiptDelay().plusNanos(jitterNanos); + + if (random.nextDouble() <= messageRateConfiguration.getReceiptProbability()) { + receiptExecutorService.schedule(() -> { + try { + receiptSender.sendReceipt(destination, source.getNumber(), timestamp); + } catch (final NoSuchUserException ignored) { + } + }, receiptDelay.toMillis(), TimeUnit.MILLISECONDS); + } + } + } + + { + Duration responseDelay = Duration.ZERO; + + for (int i = 0; i < messages.getMessages().size(); i++) { + final long jitterNanos = random.nextInt((int) messageRateConfiguration.getResponseDelayJitter().toNanos()); + + responseDelay = responseDelay.plus( + messageRateConfiguration.getResponseDelay()).plusNanos(jitterNanos); + } + + Util.sleep(responseDelay.toMillis()); + } + + return Response.ok(new SendMessageResponse(source.getEnabledDeviceCount() > 1)).build(); + } + @Timed @GET @Produces(MediaType.APPLICATION_JSON) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java index cfce99395..aabc992ee 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/MessageControllerTest.java @@ -13,11 +13,13 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -29,12 +31,15 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; import io.dropwizard.auth.PolymorphicAuthValueFactoryProvider; import io.dropwizard.testing.junit.ResourceTestRule; +import java.time.Duration; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Entity; import javax.ws.rs.core.MediaType; @@ -48,10 +53,13 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier; import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccount; import org.whispersystems.textsecuregcm.auth.OptionalAccess; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessageRateConfiguration; import org.whispersystems.textsecuregcm.controllers.MessageController; import org.whispersystems.textsecuregcm.entities.IncomingMessageList; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; @@ -84,6 +92,9 @@ public class MessageControllerTest { private static final String MULTI_DEVICE_RECIPIENT = "+14152222222"; private static final UUID MULTI_DEVICE_UUID = UUID.randomUUID(); + private static final String INTERNATIONAL_RECIPIENT = "+61123456789"; + private static final UUID INTERNATIONAL_UUID = UUID.randomUUID(); + private final MessageSender messageSender = mock(MessageSender.class); private final ReceiptSender receiptSender = mock(ReceiptSender.class); private final AccountsManager accountsManager = mock(AccountsManager.class); @@ -94,6 +105,7 @@ public class MessageControllerTest { private final ApnFallbackManager apnFallbackManager = mock(ApnFallbackManager.class); private final DynamicConfigurationManager dynamicConfigurationManager = mock(DynamicConfigurationManager.class); private final FaultTolerantRedisCluster metricsCluster = mock(FaultTolerantRedisCluster.class); + private final ScheduledExecutorService receiptExecutor = mock(ScheduledExecutorService.class); private final ObjectMapper mapper = new ObjectMapper(); @@ -103,7 +115,7 @@ public class MessageControllerTest { .addProvider(new PolymorphicAuthValueFactoryProvider.Binder<>(ImmutableSet.of(Account.class, DisabledPermittedAccount.class))) .setTestContainerFactory(new GrizzlyWebTestContainerFactory()) .addResource(new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, - messagesManager, apnFallbackManager, dynamicConfigurationManager, metricsCluster)) + messagesManager, apnFallbackManager, dynamicConfigurationManager, metricsCluster, receiptExecutor)) .build(); @@ -123,18 +135,27 @@ public class MessageControllerTest { "isgcm", null, null, false, 444, null, System.currentTimeMillis() - TimeUnit.DAYS.toMillis(31), System.currentTimeMillis(), "Test", 0, new Device.DeviceCapabilities(false, false, false, false, false, false))); }}; - Account singleDeviceAccount = new Account(SINGLE_DEVICE_RECIPIENT, SINGLE_DEVICE_UUID, singleDeviceList, "1234".getBytes()); - Account multiDeviceAccount = new Account(MULTI_DEVICE_RECIPIENT, MULTI_DEVICE_UUID, multiDeviceList, "1234".getBytes()); + Account singleDeviceAccount = new Account(SINGLE_DEVICE_RECIPIENT, SINGLE_DEVICE_UUID, singleDeviceList, "1234".getBytes()); + Account multiDeviceAccount = new Account(MULTI_DEVICE_RECIPIENT, MULTI_DEVICE_UUID, multiDeviceList, "1234".getBytes()); + Account internationalAccount = new Account(INTERNATIONAL_RECIPIENT, INTERNATIONAL_UUID, singleDeviceList, "1234".getBytes()); when(accountsManager.get(eq(SINGLE_DEVICE_RECIPIENT))).thenReturn(Optional.of(singleDeviceAccount)); when(accountsManager.get(argThat((ArgumentMatcher) identifier -> identifier != null && identifier.hasNumber() && identifier.getNumber().equals(SINGLE_DEVICE_RECIPIENT)))).thenReturn(Optional.of(singleDeviceAccount)); when(accountsManager.get(eq(MULTI_DEVICE_RECIPIENT))).thenReturn(Optional.of(multiDeviceAccount)); when(accountsManager.get(argThat((ArgumentMatcher) identifier -> identifier != null && identifier.hasNumber() && identifier.getNumber().equals(MULTI_DEVICE_RECIPIENT)))).thenReturn(Optional.of(multiDeviceAccount)); + when(accountsManager.get(INTERNATIONAL_RECIPIENT)).thenReturn(Optional.of(internationalAccount)); + when(accountsManager.get(argThat((ArgumentMatcher) identifier -> identifier != null && identifier.hasNumber() && identifier.getNumber().equals(INTERNATIONAL_RECIPIENT)))).thenReturn(Optional.of(internationalAccount)); when(rateLimiters.getMessagesLimiter()).thenReturn(rateLimiter); when(rateLimiters.getUnsealedSenderLimiter()).thenReturn(unsealedSenderLimiter); when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration()); + + when(receiptExecutor.schedule(any(Runnable.class), anyLong(), any())).thenAnswer( + (Answer>) invocation -> { + invocation.getArgument(0, Runnable.class).run(); + return mock(ScheduledFuture.class); + }); } @Test @@ -169,6 +190,38 @@ public class MessageControllerTest { assertTrue(captor.getValue().hasSourceDevice()); } + @Test + public synchronized void testInternationalUnsealedSenderFromRateLimitedHost() throws Exception { + final String senderHost = "10.0.0.1"; + + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + final DynamicMessageRateConfiguration messageRateConfiguration = mock(DynamicMessageRateConfiguration.class); + + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + when(dynamicConfiguration.getMessageRateConfiguration()).thenReturn(messageRateConfiguration); + when(messageRateConfiguration.getRateLimitedCountryCodes()).thenReturn(Set.of("1")); + when(messageRateConfiguration.getRateLimitedHosts()).thenReturn(Set.of(senderHost)); + when(messageRateConfiguration.getResponseDelay()).thenReturn(Duration.ofMillis(1)); + when(messageRateConfiguration.getResponseDelayJitter()).thenReturn(Duration.ofMillis(1)); + when(messageRateConfiguration.getReceiptDelay()).thenReturn(Duration.ofMillis(1)); + when(messageRateConfiguration.getReceiptDelayJitter()).thenReturn(Duration.ofMillis(1)); + when(messageRateConfiguration.getReceiptProbability()).thenReturn(1.0); + + Response response = + resources.getJerseyTest() + .target(String.format("/v1/messages/%s", INTERNATIONAL_RECIPIENT)) + .request() + .header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD)) + .header("X-Forwarded-For", senderHost) + .put(Entity.entity(mapper.readValue(jsonFixture("fixtures/current_message_single_device.json"), IncomingMessageList.class), + MediaType.APPLICATION_JSON_TYPE)); + + assertThat("Good Response", response.getStatus(), is(equalTo(200))); + + verify(messageSender, never()).sendMessage(any(), any(), any(), anyBoolean()); + verify(receiptSender).sendReceipt(any(), eq(AuthHelper.VALID_NUMBER), anyLong()); + } + @Test public synchronized void testSingleDeviceCurrentUnidentified() throws Exception { Response response =