Add machinery to allow a percentage of message sends to succeed.
This commit is contained in:
parent
4e7ace3b48
commit
a57ce1dd17
|
@ -380,7 +380,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
AttachmentControllerV2 attachmentControllerV2 = new AttachmentControllerV2(rateLimiters, config.getAwsAttachmentsConfiguration().getAccessKey(), config.getAwsAttachmentsConfiguration().getAccessSecret(), config.getAwsAttachmentsConfiguration().getRegion(), config.getAwsAttachmentsConfiguration().getBucket());
|
AttachmentControllerV2 attachmentControllerV2 = new AttachmentControllerV2(rateLimiters, config.getAwsAttachmentsConfiguration().getAccessKey(), config.getAwsAttachmentsConfiguration().getAccessSecret(), config.getAwsAttachmentsConfiguration().getRegion(), config.getAwsAttachmentsConfiguration().getBucket());
|
||||||
AttachmentControllerV3 attachmentControllerV3 = new AttachmentControllerV3(rateLimiters, config.getGcpAttachmentsConfiguration().getDomain(), config.getGcpAttachmentsConfiguration().getEmail(), config.getGcpAttachmentsConfiguration().getMaxSizeInBytes(), config.getGcpAttachmentsConfiguration().getPathPrefix(), config.getGcpAttachmentsConfiguration().getRsaSigningKey());
|
AttachmentControllerV3 attachmentControllerV3 = new AttachmentControllerV3(rateLimiters, config.getGcpAttachmentsConfiguration().getDomain(), config.getGcpAttachmentsConfiguration().getEmail(), config.getGcpAttachmentsConfiguration().getMaxSizeInBytes(), config.getGcpAttachmentsConfiguration().getPathPrefix(), config.getGcpAttachmentsConfiguration().getRsaSigningKey());
|
||||||
KeysController keysController = new KeysController(rateLimiters, keys, accountsManager, directoryQueue);
|
KeysController keysController = new KeysController(rateLimiters, keys, accountsManager, directoryQueue);
|
||||||
MessageController messageController = new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, messagesManager, apnFallbackManager);
|
MessageController messageController = new MessageController(rateLimiters, messageSender, receiptSender, accountsManager, messagesManager, apnFallbackManager, featureFlagsManager);
|
||||||
ProfileController profileController = new ProfileController(rateLimiters, accountsManager, profilesManager, usernamesManager, cdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, config.getCdnConfiguration().getBucket(), zkProfileOperations, isZkEnabled);
|
ProfileController profileController = new ProfileController(rateLimiters, accountsManager, profilesManager, usernamesManager, cdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, config.getCdnConfiguration().getBucket(), zkProfileOperations, isZkEnabled);
|
||||||
StickerController stickerController = new StickerController(rateLimiters, config.getCdnConfiguration().getAccessKey(), config.getCdnConfiguration().getAccessSecret(), config.getCdnConfiguration().getRegion(), config.getCdnConfiguration().getBucket());
|
StickerController stickerController = new StickerController(rateLimiters, config.getCdnConfiguration().getAccessKey(), config.getCdnConfiguration().getAccessSecret(), config.getCdnConfiguration().getRegion(), config.getCdnConfiguration().getBucket());
|
||||||
RemoteConfigController remoteConfigController = new RemoteConfigController(remoteConfigsManager, config.getRemoteConfigConfiguration().getAuthorizedTokens(), config.getRemoteConfigConfiguration().getGlobalConfig());
|
RemoteConfigController remoteConfigController = new RemoteConfigController(remoteConfigsManager, config.getRemoteConfigConfiguration().getAuthorizedTokens(), config.getRemoteConfigConfiguration().getGlobalConfig());
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.Device;
|
import org.whispersystems.textsecuregcm.storage.Device;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||||
import org.whispersystems.textsecuregcm.util.Base64;
|
import org.whispersystems.textsecuregcm.util.Base64;
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
|
@ -61,6 +62,7 @@ import java.util.HashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
@ -85,6 +87,9 @@ public class MessageController {
|
||||||
private final MessagesManager messagesManager;
|
private final MessagesManager messagesManager;
|
||||||
private final ApnFallbackManager apnFallbackManager;
|
private final ApnFallbackManager apnFallbackManager;
|
||||||
|
|
||||||
|
private final FeatureFlagsManager featureFlagsManager;
|
||||||
|
private final Random random = new Random();
|
||||||
|
|
||||||
private static final String CONTENT_SIZE_DISTRIBUTION_NAME = name(MessageController.class, "messageContentSize");
|
private static final String CONTENT_SIZE_DISTRIBUTION_NAME = name(MessageController.class, "messageContentSize");
|
||||||
private static final String OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME = name(MessageController.class, "outgoingMessageListSizeBytes");
|
private static final String OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME = name(MessageController.class, "outgoingMessageListSizeBytes");
|
||||||
|
|
||||||
|
@ -95,7 +100,8 @@ public class MessageController {
|
||||||
ReceiptSender receiptSender,
|
ReceiptSender receiptSender,
|
||||||
AccountsManager accountsManager,
|
AccountsManager accountsManager,
|
||||||
MessagesManager messagesManager,
|
MessagesManager messagesManager,
|
||||||
ApnFallbackManager apnFallbackManager)
|
ApnFallbackManager apnFallbackManager,
|
||||||
|
FeatureFlagsManager featureFlagsManager)
|
||||||
{
|
{
|
||||||
this.rateLimiters = rateLimiters;
|
this.rateLimiters = rateLimiters;
|
||||||
this.messageSender = messageSender;
|
this.messageSender = messageSender;
|
||||||
|
@ -103,6 +109,7 @@ public class MessageController {
|
||||||
this.accountsManager = accountsManager;
|
this.accountsManager = accountsManager;
|
||||||
this.messagesManager = messagesManager;
|
this.messagesManager = messagesManager;
|
||||||
this.apnFallbackManager = apnFallbackManager;
|
this.apnFallbackManager = apnFallbackManager;
|
||||||
|
this.featureFlagsManager = featureFlagsManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Timed
|
@Timed
|
||||||
|
@ -117,77 +124,105 @@ public class MessageController {
|
||||||
@Valid IncomingMessageList messages)
|
@Valid IncomingMessageList messages)
|
||||||
throws RateLimitExceededException
|
throws RateLimitExceededException
|
||||||
{
|
{
|
||||||
return Response.status(503).build();
|
if (random.nextDouble() <= getSuccessPercentage()) {
|
||||||
/* if (!source.isPresent() && !accessKey.isPresent()) {
|
if (!source.isPresent() && !accessKey.isPresent()) {
|
||||||
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
|
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
|
||||||
}
|
|
||||||
|
|
||||||
if (source.isPresent() && !source.get().isFor(destinationName)) {
|
|
||||||
rateLimiters.getMessagesLimiter().validate(source.get().getNumber() + "__" + destinationName);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (source.isPresent() && !source.get().isFor(destinationName)) {
|
|
||||||
identifiedMeter.mark();
|
|
||||||
} else if (!source.isPresent()) {
|
|
||||||
unidentifiedMeter.mark();
|
|
||||||
}
|
|
||||||
|
|
||||||
for (final IncomingMessage message : messages.getMessages()) {
|
|
||||||
int contentLength = 0;
|
|
||||||
|
|
||||||
if (!Util.isEmpty(message.getContent())) {
|
|
||||||
contentLength += message.getContent().length();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!Util.isEmpty(message.getBody())) {
|
if (source.isPresent() && !source.get().isFor(destinationName)) {
|
||||||
contentLength += message.getBody().length();
|
rateLimiters.getMessagesLimiter().validate(source.get().getNumber() + "__" + destinationName);
|
||||||
}
|
}
|
||||||
|
|
||||||
Metrics.summary(CONTENT_SIZE_DISTRIBUTION_NAME, UserAgentTagUtil.getUserAgentTags(userAgent)).record(contentLength);
|
if (source.isPresent() && !source.get().isFor(destinationName)) {
|
||||||
|
identifiedMeter.mark();
|
||||||
if (contentLength > MAX_MESSAGE_SIZE) {
|
} else if (!source.isPresent()) {
|
||||||
// TODO Reject the request
|
unidentifiedMeter.mark();
|
||||||
rejectOversizeMessageMeter.mark();
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
for (final IncomingMessage message : messages.getMessages()) {
|
||||||
boolean isSyncMessage = source.isPresent() && source.get().isFor(destinationName);
|
int contentLength = 0;
|
||||||
|
|
||||||
Optional<Account> destination;
|
if (!Util.isEmpty(message.getContent())) {
|
||||||
|
contentLength += message.getContent().length();
|
||||||
|
}
|
||||||
|
|
||||||
if (!isSyncMessage) destination = accountsManager.get(destinationName);
|
if (!Util.isEmpty(message.getBody())) {
|
||||||
else destination = source;
|
contentLength += message.getBody().length();
|
||||||
|
}
|
||||||
|
|
||||||
OptionalAccess.verify(source, accessKey, destination);
|
Metrics.summary(CONTENT_SIZE_DISTRIBUTION_NAME, UserAgentTagUtil.getUserAgentTags(userAgent)).record(contentLength);
|
||||||
assert(destination.isPresent());
|
|
||||||
|
|
||||||
validateCompleteDeviceList(destination.get(), messages.getMessages(), isSyncMessage);
|
if (contentLength > MAX_MESSAGE_SIZE) {
|
||||||
validateRegistrationIds(destination.get(), messages.getMessages());
|
// TODO Reject the request
|
||||||
|
rejectOversizeMessageMeter.mark();
|
||||||
for (IncomingMessage incomingMessage : messages.getMessages()) {
|
|
||||||
Optional<Device> destinationDevice = destination.get().getDevice(incomingMessage.getDestinationDeviceId());
|
|
||||||
|
|
||||||
if (destinationDevice.isPresent()) {
|
|
||||||
sendMessage(source, destination.get(), destinationDevice.get(), messages.getTimestamp(), messages.isOnline(), incomingMessage);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return new SendMessageResponse(!isSyncMessage && source.isPresent() && source.get().getEnabledDeviceCount() > 1);
|
try {
|
||||||
} catch (NoSuchUserException e) {
|
boolean isSyncMessage = source.isPresent() && source.get().isFor(destinationName);
|
||||||
throw new WebApplicationException(Response.status(404).build());
|
|
||||||
} catch (MismatchedDevicesException e) {
|
Optional<Account> destination;
|
||||||
throw new WebApplicationException(Response.status(409)
|
|
||||||
.type(MediaType.APPLICATION_JSON_TYPE)
|
if (!isSyncMessage) destination = accountsManager.get(destinationName);
|
||||||
.entity(new MismatchedDevices(e.getMissingDevices(),
|
else destination = source;
|
||||||
e.getExtraDevices()))
|
|
||||||
.build());
|
OptionalAccess.verify(source, accessKey, destination);
|
||||||
} catch (StaleDevicesException e) {
|
assert(destination.isPresent());
|
||||||
throw new WebApplicationException(Response.status(410)
|
|
||||||
.type(MediaType.APPLICATION_JSON)
|
validateCompleteDeviceList(destination.get(), messages.getMessages(), isSyncMessage);
|
||||||
.entity(new StaleDevices(e.getStaleDevices()))
|
validateRegistrationIds(destination.get(), messages.getMessages());
|
||||||
.build());
|
|
||||||
} */
|
for (IncomingMessage incomingMessage : messages.getMessages()) {
|
||||||
|
Optional<Device> destinationDevice = destination.get().getDevice(incomingMessage.getDestinationDeviceId());
|
||||||
|
|
||||||
|
if (destinationDevice.isPresent()) {
|
||||||
|
sendMessage(source, destination.get(), destinationDevice.get(), messages.getTimestamp(), messages.isOnline(), incomingMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Response.ok(new SendMessageResponse(!isSyncMessage && source.isPresent() && source.get().getEnabledDeviceCount() > 1)).build();
|
||||||
|
} catch (NoSuchUserException e) {
|
||||||
|
throw new WebApplicationException(Response.status(404).build());
|
||||||
|
} catch (MismatchedDevicesException e) {
|
||||||
|
throw new WebApplicationException(Response.status(409)
|
||||||
|
.type(MediaType.APPLICATION_JSON_TYPE)
|
||||||
|
.entity(new MismatchedDevices(e.getMissingDevices(),
|
||||||
|
e.getExtraDevices()))
|
||||||
|
.build());
|
||||||
|
} catch (StaleDevicesException e) {
|
||||||
|
throw new WebApplicationException(Response.status(410)
|
||||||
|
.type(MediaType.APPLICATION_JSON)
|
||||||
|
.entity(new StaleDevices(e.getStaleDevices()))
|
||||||
|
.build());
|
||||||
|
} } else {
|
||||||
|
return Response.status(503).build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private double getSuccessPercentage() {
|
||||||
|
if (featureFlagsManager.isFeatureFlagActive("SEND_MESSAGE_1_PERCENT")) {
|
||||||
|
return 0.01;
|
||||||
|
} else if (featureFlagsManager.isFeatureFlagActive("SEND_MESSAGE_2_PERCENT")) {
|
||||||
|
return 0.02;
|
||||||
|
} else if (featureFlagsManager.isFeatureFlagActive("SEND_MESSAGE_4_PERCENT")) {
|
||||||
|
return 0.04;
|
||||||
|
} else if (featureFlagsManager.isFeatureFlagActive("SEND_MESSAGE_8_PERCENT")) {
|
||||||
|
return 0.08;
|
||||||
|
} else if (featureFlagsManager.isFeatureFlagActive("SEND_MESSAGE_16_PERCENT")) {
|
||||||
|
return 0.16;
|
||||||
|
} else if (featureFlagsManager.isFeatureFlagActive("SEND_MESSAGE_32_PERCENT")) {
|
||||||
|
return 0.32;
|
||||||
|
} else if (featureFlagsManager.isFeatureFlagActive("SEND_MESSAGE_48_PERCENT")) {
|
||||||
|
return 0.48;
|
||||||
|
} else if (featureFlagsManager.isFeatureFlagActive("SEND_MESSAGE_64_PERCENT")) {
|
||||||
|
return 0.64;
|
||||||
|
} else if (featureFlagsManager.isFeatureFlagActive("SEND_MESSAGE_80_PERCENT")) {
|
||||||
|
return 0.80;
|
||||||
|
} else if (featureFlagsManager.isFeatureFlagActive("SEND_MESSAGE_100_PERCENT")) {
|
||||||
|
return 1.0d;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Timed
|
@Timed
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.whispersystems.textsecuregcm.push.ReceiptSender;
|
||||||
import org.whispersystems.textsecuregcm.storage.Account;
|
import org.whispersystems.textsecuregcm.storage.Account;
|
||||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.Device;
|
import org.whispersystems.textsecuregcm.storage.Device;
|
||||||
|
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||||
import org.whispersystems.textsecuregcm.tests.util.AuthHelper;
|
import org.whispersystems.textsecuregcm.tests.util.AuthHelper;
|
||||||
import org.whispersystems.textsecuregcm.util.Base64;
|
import org.whispersystems.textsecuregcm.util.Base64;
|
||||||
|
@ -86,6 +87,7 @@ public class MessageControllerTest {
|
||||||
private final RateLimiters rateLimiters = mock(RateLimiters.class);
|
private final RateLimiters rateLimiters = mock(RateLimiters.class);
|
||||||
private final RateLimiter rateLimiter = mock(RateLimiter.class);
|
private final RateLimiter rateLimiter = mock(RateLimiter.class);
|
||||||
private final ApnFallbackManager apnFallbackManager = mock(ApnFallbackManager.class);
|
private final ApnFallbackManager apnFallbackManager = mock(ApnFallbackManager.class);
|
||||||
|
private final FeatureFlagsManager featureFlagsManager = mock(FeatureFlagsManager.class);
|
||||||
|
|
||||||
private final ObjectMapper mapper = new ObjectMapper();
|
private final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
|
||||||
|
@ -95,7 +97,7 @@ public class MessageControllerTest {
|
||||||
.addProvider(new PolymorphicAuthValueFactoryProvider.Binder<>(ImmutableSet.of(Account.class, DisabledPermittedAccount.class)))
|
.addProvider(new PolymorphicAuthValueFactoryProvider.Binder<>(ImmutableSet.of(Account.class, DisabledPermittedAccount.class)))
|
||||||
.setTestContainerFactory(new GrizzlyWebTestContainerFactory())
|
.setTestContainerFactory(new GrizzlyWebTestContainerFactory())
|
||||||
.addResource(new MessageController(rateLimiters, messageSender, receiptSender, accountsManager,
|
.addResource(new MessageController(rateLimiters, messageSender, receiptSender, accountsManager,
|
||||||
messagesManager, apnFallbackManager))
|
messagesManager, apnFallbackManager, featureFlagsManager))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue