Replace Twilio SDK with Java 11 HttpClient

This commit is contained in:
Moxie Marlinspike 2019-04-18 15:33:08 -07:00
parent 4121cae1d6
commit d3dcd39f61
15 changed files with 824 additions and 110 deletions

53
pom.xml
View File

@ -14,7 +14,7 @@
<properties>
<dropwizard.version>1.3.9</dropwizard.version>
<jackson.api.version>2.9.8</jackson.api.version>
<resilience4j.version>0.13.2</resilience4j.version>
<resilience4j.version>0.14.1</resilience4j.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
@ -65,13 +65,17 @@
<version>0.9.30</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>${resilience4j.version}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>${resilience4j.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
@ -96,11 +100,6 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.twilio.sdk</groupId>
<artifactId>twilio-java-sdk</artifactId>
<version>4.4.4</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
@ -149,6 +148,17 @@
<version>8.10.2</version>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.glassfish.jaxb</groupId>
<artifactId>jaxb-runtime</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.test-framework.providers</groupId>
@ -180,16 +190,25 @@
<version>0.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.glassfish.jaxb</groupId>
<artifactId>jaxb-runtime</artifactId>
<version>2.3.1</version>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<version>2.23.2</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

View File

@ -7,6 +7,10 @@ import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.time.Duration;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
public class CircuitBreakerConfiguration {
@JsonProperty
@ -66,4 +70,13 @@ public class CircuitBreakerConfiguration {
public void setWaitDurationInOpenStateInSeconds(int seconds) {
this.waitDurationInOpenStateInSeconds = seconds;
}
public CircuitBreakerConfig toCircuitBreakerConfig() {
return CircuitBreakerConfig.custom()
.failureRateThreshold(getFailureRateThreshold())
.ringBufferSizeInHalfOpenState(getRingBufferSizeInHalfOpenState())
.waitDurationInOpenState(Duration.ofSeconds(getWaitDurationInOpenStateInSeconds()))
.ringBufferSizeInClosedState(getRingBufferSizeInClosedState())
.build();
}
}

View File

@ -0,0 +1,38 @@
package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.Min;
import java.time.Duration;
import io.github.resilience4j.retry.RetryConfig;
public class RetryConfiguration {
@JsonProperty
@Min(1)
private int maxAttempts = RetryConfig.DEFAULT_MAX_ATTEMPTS;
@JsonProperty
@Min(1)
private long waitDuration = RetryConfig.DEFAULT_WAIT_DURATION;
public int getMaxAttempts() {
return maxAttempts;
}
public long getWaitDuration() {
return waitDuration;
}
public RetryConfig toRetryConfig() {
return toRetryConfigBuilder().build();
}
public <T> RetryConfig.Builder<T> toRetryConfigBuilder() {
return RetryConfig.<T>custom()
.maxAttempts(getMaxAttempts())
.waitDuration(Duration.ofMillis(getWaitDuration()));
}
}

View File

@ -17,8 +17,10 @@
package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import org.hibernate.validator.constraints.NotEmpty;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import java.util.List;
@ -43,23 +45,74 @@ public class TwilioConfiguration {
@JsonProperty
private String messagingServicesId;
@NotNull
@Valid
private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration();
@NotNull
@Valid
private RetryConfiguration retry = new RetryConfiguration();
public String getAccountId() {
return accountId;
}
@VisibleForTesting
public void setAccountId(String accountId) {
this.accountId = accountId;
}
public String getAccountToken() {
return accountToken;
}
@VisibleForTesting
public void setAccountToken(String accountToken) {
this.accountToken = accountToken;
}
public List<String> getNumbers() {
return numbers;
}
@VisibleForTesting
public void setNumbers(List<String> numbers) {
this.numbers = numbers;
}
public String getLocalDomain() {
return localDomain;
}
@VisibleForTesting
public void setLocalDomain(String localDomain) {
this.localDomain = localDomain;
}
public String getMessagingServicesId() {
return messagingServicesId;
}
@VisibleForTesting
public void setMessagingServicesId(String messagingServicesId) {
this.messagingServicesId = messagingServicesId;
}
public CircuitBreakerConfiguration getCircuitBreaker() {
return circuitBreaker;
}
@VisibleForTesting
public void setCircuitBreaker(CircuitBreakerConfiguration circuitBreaker) {
this.circuitBreaker = circuitBreaker;
}
public RetryConfiguration getRetry() {
return retry;
}
@VisibleForTesting
public void setRetry(RetryConfiguration retry) {
this.retry = retry;
}
}

View File

@ -133,7 +133,7 @@ public class AccountController {
@HeaderParam("Accept-Language") Optional<String> locale,
@QueryParam("client") Optional<String> client,
@QueryParam("captcha") Optional<String> captcha)
throws IOException, RateLimitExceededException
throws RateLimitExceededException
{
if (!Util.isValidNumber(number)) {
logger.info("Invalid number: " + number);

View File

@ -0,0 +1,137 @@
package org.whispersystems.textsecuregcm.http;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.Constants;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
public class FaultTolerantHttpClient {
private final HttpClient httpClient;
private final ScheduledExecutorService retryExecutor;
private final Retry retry;
private final CircuitBreaker breaker;
public static Builder newBuilder() {
return new Builder();
}
private FaultTolerantHttpClient(String name, HttpClient httpClient, RetryConfiguration retryConfiguration, CircuitBreakerConfiguration circuitBreakerConfiguration) {
this.httpClient = httpClient;
this.retryExecutor = Executors.newSingleThreadScheduledExecutor();
this.breaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig());
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
CircuitBreakerUtil.registerMetrics(metricRegistry, breaker, FaultTolerantHttpClient.class);
if (retryConfiguration != null) {
RetryConfig retryConfig = retryConfiguration.<HttpResponse>toRetryConfigBuilder().retryOnResult(o -> o.statusCode() >= 500).build();
this.retry = Retry.of(name + "-retry", retryConfig);
CircuitBreakerUtil.registerMetrics(metricRegistry, retry, FaultTolerantHttpClient.class);
} else {
this.retry = null;
}
}
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, HttpResponse.BodyHandler<T> bodyHandler) {
Supplier<CompletionStage<HttpResponse<T>>> asyncRequest = sendAsync(httpClient, request, bodyHandler);
if (retry != null) {
return breaker.executeCompletionStage(retryableCompletionStage(asyncRequest)).toCompletableFuture();
} else {
return breaker.executeCompletionStage(asyncRequest).toCompletableFuture();
}
}
private <T> Supplier<CompletionStage<T>> retryableCompletionStage(Supplier<CompletionStage<T>> supplier) {
return () -> retry.executeCompletionStage(retryExecutor, supplier);
}
private <T> Supplier<CompletionStage<HttpResponse<T>>> sendAsync(HttpClient client, HttpRequest request, HttpResponse.BodyHandler<T> bodyHandler) {
return () -> client.sendAsync(request, bodyHandler);
}
public static class Builder {
private HttpClient.Version version = HttpClient.Version.HTTP_2;
private HttpClient.Redirect redirect = HttpClient.Redirect.NEVER;
private Duration connectTimeout = Duration.ofSeconds(10);
private String name;
private Executor executor;
private RetryConfiguration retryConfiguration;
private CircuitBreakerConfiguration circuitBreakerConfiguration;
private Builder() {}
public Builder withName(String name) {
this.name = name;
return this;
}
public Builder withVersion(HttpClient.Version version) {
this.version = version;
return this;
}
public Builder withRedirect(HttpClient.Redirect redirect) {
this.redirect = redirect;
return this;
}
public Builder withExecutor(Executor executor) {
this.executor = executor;
return this;
}
public Builder withConnectTimeout(Duration connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}
public Builder withRetry(RetryConfiguration retryConfiguration) {
this.retryConfiguration = retryConfiguration;
return this;
}
public Builder withCircuitBreaker(CircuitBreakerConfiguration circuitBreakerConfiguration) {
this.circuitBreakerConfiguration = circuitBreakerConfiguration;
return this;
}
public FaultTolerantHttpClient build() {
if (this.circuitBreakerConfiguration == null || this.name == null || this.executor == null) {
throw new IllegalArgumentException("Must specify circuit breaker config, name, and executor");
}
HttpClient client = HttpClient.newBuilder()
.connectTimeout(connectTimeout)
.followRedirects(redirect)
.version(version)
.executor(executor)
.build();
return new FaultTolerantHttpClient(name, client, retryConfiguration, circuitBreakerConfiguration);
}
}
}

View File

@ -0,0 +1,26 @@
package org.whispersystems.textsecuregcm.http;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Map;
public class FormDataBodyPublisher {
public static HttpRequest.BodyPublisher of(Map<String, String> data) {
StringBuilder builder = new StringBuilder();
for (Map.Entry<String, String> entry : data.entrySet()) {
if (builder.length() > 0) {
builder.append("&");
}
builder.append(URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8));
builder.append("=");
builder.append(URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8));
}
return HttpRequest.BodyPublishers.ofString(builder.toString());
}
}

View File

@ -35,16 +35,9 @@ public class ReplicatedJedisPool {
{
if (replicas.size() < 1) throw new IllegalArgumentException("There must be at least one replica");
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(circuitBreakerConfiguration.getFailureRateThreshold())
.ringBufferSizeInHalfOpenState(circuitBreakerConfiguration.getRingBufferSizeInHalfOpenState())
.waitDurationInOpenState(Duration.ofSeconds(circuitBreakerConfiguration.getWaitDurationInOpenStateInSeconds()))
.ringBufferSizeInClosedState(circuitBreakerConfiguration.getRingBufferSizeInClosedState())
.build();
CircuitBreaker masterBreaker = CircuitBreaker.of(String.format("%s-master", name), config);
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
CircuitBreakerConfig config = circuitBreakerConfiguration.toCircuitBreakerConfig();
CircuitBreaker masterBreaker = CircuitBreaker.of(String.format("%s-master", name), config);
CircuitBreakerUtil.registerMetrics(metricRegistry, masterBreaker, ReplicatedJedisPool.class);

View File

@ -17,11 +17,9 @@
package org.whispersystems.textsecuregcm.sms;
import com.twilio.sdk.TwilioRestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Optional;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@ -31,8 +29,6 @@ public class SmsSender {
static final String SMS_ANDROID_NG_VERIFICATION_TEXT = "<#> Your Signal verification code: %s\n\ndoDiFGKPO1r";
static final String SMS_VERIFICATION_TEXT = "Your Signal verification code: %s";
private final Logger logger = LoggerFactory.getLogger(SmsSender.class);
private final TwilioSmsSender twilioSender;
public SmsSender(TwilioSmsSender twilioSender)
@ -40,28 +36,16 @@ public class SmsSender {
this.twilioSender = twilioSender;
}
public void deliverSmsVerification(String destination, Optional<String> clientType, String verificationCode)
throws IOException
{
public void deliverSmsVerification(String destination, Optional<String> clientType, String verificationCode) {
// Fix up mexico numbers to 'mobile' format just for SMS delivery.
if (destination.startsWith("+52") && !destination.startsWith("+521")) {
destination = "+521" + destination.substring(3);
}
try {
twilioSender.deliverSmsVerification(destination, clientType, verificationCode);
} catch (TwilioRestException e) {
logger.info("Twilio SMS Failed: " + e.getErrorMessage());
}
twilioSender.deliverSmsVerification(destination, clientType, verificationCode);
}
public void deliverVoxVerification(String destination, String verificationCode, Optional<String> locale)
throws IOException
{
try {
twilioSender.deliverVoxVerification(destination, verificationCode, locale);
} catch (TwilioRestException e) {
logger.info("Twilio Vox Failed: " + e.getErrorMessage());
}
public void deliverVoxVerification(String destination, String verificationCode, Optional<String> locale) {
twilioSender.deliverVoxVerification(destination, verificationCode, locale);
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Copyright (C) 2013 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
@ -19,33 +19,45 @@ package org.whispersystems.textsecuregcm.sms;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.twilio.sdk.TwilioRestClient;
import com.twilio.sdk.TwilioRestException;
import com.twilio.sdk.resource.factory.CallFactory;
import com.twilio.sdk.resource.factory.MessageFactory;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicNameValuePair;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.TwilioConfiguration;
import org.whispersystems.textsecuregcm.http.FaultTolerantHttpClient;
import org.whispersystems.textsecuregcm.http.FormDataBodyPublisher;
import org.whispersystems.textsecuregcm.util.Base64;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.ExecutorUtils;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import static com.codahale.metrics.MetricRegistry.name;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class TwilioSmsSender {
private static final Logger logger = LoggerFactory.getLogger(TwilioSmsSender.class);
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter smsMeter = metricRegistry.meter(name(getClass(), "sms", "delivered"));
private final Meter voxMeter = metricRegistry.meter(name(getClass(), "vox", "delivered"));
private final Meter priceMeter = metricRegistry.meter(name(getClass(), "price"));
private final String accountId;
private final String accountToken;
@ -54,73 +66,183 @@ public class TwilioSmsSender {
private final String localDomain;
private final Random random;
public TwilioSmsSender(TwilioConfiguration config) {
this.accountId = config.getAccountId ();
this.accountToken = config.getAccountToken();
this.numbers = new ArrayList<>(config.getNumbers());
this.localDomain = config.getLocalDomain();
this.messagingServicesId = config.getMessagingServicesId();
private final FaultTolerantHttpClient httpClient;
private final URI smsUri;
private final URI voxUri;
@VisibleForTesting
public TwilioSmsSender(String baseUri, TwilioConfiguration twilioConfiguration) {
Executor executor = ExecutorUtils.newFixedThreadBoundedQueueExecutor(10, 100);
this.accountId = twilioConfiguration.getAccountId();
this.accountToken = twilioConfiguration.getAccountToken();
this.numbers = new ArrayList<>(twilioConfiguration.getNumbers());
this.localDomain = twilioConfiguration.getLocalDomain();
this.messagingServicesId = twilioConfiguration.getMessagingServicesId();
this.random = new Random(System.currentTimeMillis());
this.smsUri = URI.create(baseUri + "/2010-04-01/Accounts/" + accountId + "/Messages.json");
this.voxUri = URI.create(baseUri + "/2010-04-01/Accounts/" + accountId + "/Calls.json" );
this.httpClient = FaultTolerantHttpClient.newBuilder()
.withCircuitBreaker(twilioConfiguration.getCircuitBreaker())
.withRetry(twilioConfiguration.getRetry())
.withVersion(HttpClient.Version.HTTP_2)
.withConnectTimeout(Duration.ofSeconds(10))
.withRedirect(HttpClient.Redirect.NEVER)
.withExecutor(executor)
.withName("twilio")
.build();
}
public void deliverSmsVerification(String destination, Optional<String> clientType, String verificationCode)
throws IOException, TwilioRestException
{
TwilioRestClient client = new TwilioRestClient(accountId, accountToken);
MessageFactory messageFactory = client.getAccount().getMessageFactory();
List<NameValuePair> messageParams = new LinkedList<>();
messageParams.add(new BasicNameValuePair("To", destination));
public TwilioSmsSender(TwilioConfiguration twilioConfiguration) {
this("https://api.twilio.com", twilioConfiguration);
}
public CompletableFuture<Boolean> deliverSmsVerification(String destination, Optional<String> clientType, String verificationCode) {
Map<String, String> requestParameters = new HashMap<>();
requestParameters.put("To", destination);
if (Util.isEmpty(messagingServicesId)) {
messageParams.add(new BasicNameValuePair("From", getRandom(random, numbers)));
requestParameters.put("From", getRandom(random, numbers));
} else {
messageParams.add(new BasicNameValuePair("MessagingServiceSid", messagingServicesId));
requestParameters.put("MessagingServiceSid", messagingServicesId);
}
if ("ios".equals(clientType.orElse(null))) {
messageParams.add(new BasicNameValuePair("Body", String.format(SmsSender.SMS_IOS_VERIFICATION_TEXT, verificationCode, verificationCode)));
requestParameters.put("Body", String.format(SmsSender.SMS_IOS_VERIFICATION_TEXT, verificationCode, verificationCode));
} else if ("android-ng".equals(clientType.orElse(null))) {
messageParams.add(new BasicNameValuePair("Body", String.format(SmsSender.SMS_ANDROID_NG_VERIFICATION_TEXT, verificationCode)));
requestParameters.put("Body", String.format(SmsSender.SMS_ANDROID_NG_VERIFICATION_TEXT, verificationCode));
} else {
messageParams.add(new BasicNameValuePair("Body", String.format(SmsSender.SMS_VERIFICATION_TEXT, verificationCode)));
}
try {
messageFactory.create(messageParams);
} catch (RuntimeException damnYouTwilio) {
throw new IOException(damnYouTwilio);
requestParameters.put("Body", String.format(SmsSender.SMS_VERIFICATION_TEXT, verificationCode));
}
HttpRequest request = HttpRequest.newBuilder()
.uri(smsUri)
.POST(FormDataBodyPublisher.of(requestParameters))
.header("Content-Type", "application/x-www-form-urlencoded")
.header("Authorization", "Basic " + Base64.encodeBytes((accountId + ":" + accountToken).getBytes()))
.build();
smsMeter.mark();
return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(this::parseResponse)
.handle(this::processResponse);
}
public void deliverVoxVerification(String destination, String verificationCode, Optional<String> locale)
throws IOException, TwilioRestException
{
public CompletableFuture<Boolean> deliverVoxVerification(String destination, String verificationCode, Optional<String> locale) {
String url = "https://" + localDomain + "/v1/voice/description/" + verificationCode;
if (locale.isPresent()) {
url += "?l=" + locale.get();
}
TwilioRestClient client = new TwilioRestClient(accountId, accountToken);
CallFactory callFactory = client.getAccount().getCallFactory();
Map<String, String> callParams = new HashMap<>();
callParams.put("To", destination);
callParams.put("From", getRandom(random, numbers));
callParams.put("Url", url);
Map<String, String> requestParameters = new HashMap<>();
requestParameters.put("Url", url);
requestParameters.put("To", destination);
requestParameters.put("From", getRandom(random, numbers));
try {
callFactory.create(callParams);
} catch (RuntimeException damnYouTwilio) {
throw new IOException(damnYouTwilio);
}
HttpRequest request = HttpRequest.newBuilder()
.uri(voxUri)
.POST(FormDataBodyPublisher.of(requestParameters))
.header("Content-Type", "application/x-www-form-urlencoded")
.header("Authorization", "Basic " + Base64.encodeBytes((accountId + ":" + accountToken).getBytes()))
.build();
voxMeter.mark();
return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(this::parseResponse)
.handle(this::processResponse);
}
private String getRandom(Random random, ArrayList<String> elements) {
return elements.get(random.nextInt(elements.size()));
}
private boolean processResponse(TwilioResponse response, Throwable throwable) {
if (response != null && response.isSuccess()) {
priceMeter.mark((long)(response.successResponse.price * 1000));
return true;
} else if (response != null && response.isFailure()) {
logger.info("Twilio request failed: " + response.failureResponse.status + ", " + response.failureResponse.message);
return false;
} else if (throwable != null) {
logger.info("Twilio request failed", throwable);
return false;
} else {
logger.warn("No response or throwable!");
return false;
}
}
private TwilioResponse parseResponse(HttpResponse<String> response) {
ObjectMapper mapper = SystemMapper.getMapper();
if (response.statusCode() >= 200 && response.statusCode() < 300) {
if ("application/json".equals(response.headers().firstValue("Content-Type").orElse(null))) {
return new TwilioResponse(TwilioResponse.TwilioSuccessResponse.fromBody(mapper, response.body()));
} else {
return new TwilioResponse(new TwilioResponse.TwilioSuccessResponse());
}
}
if ("application/json".equals(response.headers().firstValue("Content-Type").orElse(null))) {
return new TwilioResponse(TwilioResponse.TwilioFailureResponse.fromBody(mapper, response.body()));
} else {
return new TwilioResponse(new TwilioResponse.TwilioFailureResponse());
}
}
public static class TwilioResponse {
private TwilioSuccessResponse successResponse;
private TwilioFailureResponse failureResponse;
TwilioResponse(TwilioSuccessResponse successResponse) {
this.successResponse = successResponse;
}
TwilioResponse(TwilioFailureResponse failureResponse) {
this.failureResponse = failureResponse;
}
boolean isSuccess() {
return successResponse != null;
}
boolean isFailure() {
return failureResponse != null;
}
private static class TwilioSuccessResponse {
@JsonProperty
private double price;
static TwilioSuccessResponse fromBody(ObjectMapper mapper, String body) {
try {
return mapper.readValue(body, TwilioSuccessResponse.class);
} catch (IOException e) {
logger.warn("Error parsing twilio success response: " + e);
return new TwilioSuccessResponse();
}
}
}
private static class TwilioFailureResponse {
@JsonProperty
private int status;
@JsonProperty
private String message;
static TwilioFailureResponse fromBody(ObjectMapper mapper, String body) {
try {
return mapper.readValue(body, TwilioFailureResponse.class);
} catch (IOException e) {
logger.warn("Error parsing twilio success response: " + e);
return new TwilioFailureResponse();
}
}
}
}
}

View File

@ -1,18 +1,15 @@
package org.whispersystems.textsecuregcm.storage;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import org.jdbi.v3.core.Jdbi;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.Constants;
import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Function;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
public class FaultTolerantDatabase {
@ -20,19 +17,12 @@ public class FaultTolerantDatabase {
private final CircuitBreaker circuitBreaker;
public FaultTolerantDatabase(String name, Jdbi database, CircuitBreakerConfiguration circuitBreakerConfiguration) {
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(circuitBreakerConfiguration.getFailureRateThreshold())
.ringBufferSizeInHalfOpenState(circuitBreakerConfiguration.getRingBufferSizeInHalfOpenState())
.waitDurationInOpenState(Duration.ofSeconds(circuitBreakerConfiguration.getWaitDurationInOpenStateInSeconds()))
.ringBufferSizeInClosedState(circuitBreakerConfiguration.getRingBufferSizeInClosedState())
.build();
this.database = database;
this.circuitBreaker = CircuitBreaker.of(name, config);
this.circuitBreaker = CircuitBreaker.of(name, circuitBreakerConfiguration.toCircuitBreakerConfig());
CircuitBreakerUtil.registerMetrics(metricRegistry, circuitBreaker, FaultTolerantDatabase.class);
CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME),
circuitBreaker,
FaultTolerantDatabase.class);
}
public void use(Consumer<Jdbi> consumer) {

View File

@ -5,6 +5,8 @@ import com.codahale.metrics.MetricRegistry;
import static com.codahale.metrics.MetricRegistry.name;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.retry.AsyncRetry;
import io.github.resilience4j.retry.Retry;
public class CircuitBreakerUtil {
@ -20,4 +22,16 @@ public class CircuitBreakerUtil {
circuitBreaker.getEventPublisher().onCallNotPermitted(event -> unpermittedMeter.mark());
}
public static void registerMetrics(MetricRegistry metricRegistry, Retry retry, Class<?> clazz) {
Meter successMeter = metricRegistry.meter(name(clazz, retry.getName(), "success" ));
Meter retryMeter = metricRegistry.meter(name(clazz, retry.getName(), "retry" ));
Meter errorMeter = metricRegistry.meter(name(clazz, retry.getName(), "error" ));
Meter ignoredErrorMeter = metricRegistry.meter(name(clazz, retry.getName(), "ignored_error"));
retry.getEventPublisher().onSuccess(event -> successMeter.mark());
retry.getEventPublisher().onRetry(event -> retryMeter.mark());
retry.getEventPublisher().onError(event -> errorMeter.mark());
retry.getEventPublisher().onIgnoredError(event -> ignoredErrorMeter.mark());
}
}

View File

@ -0,0 +1,21 @@
package org.whispersystems.textsecuregcm.util;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExecutorUtils {
public static Executor newFixedThreadBoundedQueueExecutor(int threadCount, int queueSize) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(threadCount, threadCount,
Long.MAX_VALUE, TimeUnit.NANOSECONDS,
new ArrayBlockingQueue<>(queueSize),
new ThreadPoolExecutor.AbortPolicy());
executor.prestartAllCoreThreads();
return executor;
}
}

View File

@ -0,0 +1,150 @@
package org.whispersystems.sms;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import org.junit.Rule;
import org.junit.Test;
import org.whispersystems.textsecuregcm.configuration.TwilioConfiguration;
import org.whispersystems.textsecuregcm.sms.TwilioSmsSender;
import java.util.List;
import java.util.Optional;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
public class TwilioSmsSenderTest {
private static final String ACCOUNT_ID = "test_account_id";
private static final String ACCOUNT_TOKEN = "test_account_token";
private static final List<String> NUMBERS = List.of("+14151111111", "+14152222222");
private static final String MESSAGING_SERVICES_ID = "test_messaging_services_id";
private static final String LOCAL_DOMAIN = "test.com";
@Rule
public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort().dynamicHttpsPort());
@Test
public void testSendSms() {
wireMockRule.stubFor(post(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Messages.json"))
.withBasicAuth(ACCOUNT_ID, ACCOUNT_TOKEN)
.willReturn(aResponse()
.withHeader("Content-Type", "application/json")
.withBody("{\"price\": -0.00750, \"status\": \"sent\"}")));
TwilioConfiguration configuration = new TwilioConfiguration();
configuration.setAccountId(ACCOUNT_ID);
configuration.setAccountToken(ACCOUNT_TOKEN);
configuration.setNumbers(NUMBERS);
configuration.setMessagingServicesId(MESSAGING_SERVICES_ID);
configuration.setLocalDomain(LOCAL_DOMAIN);
TwilioSmsSender sender = new TwilioSmsSender("http://localhost:" + wireMockRule.port(), configuration);
boolean success = sender.deliverSmsVerification("+14153333333", Optional.of("android-ng"), "123-456").join();
assertThat(success).isTrue();
verify(1, postRequestedFor(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Messages.json"))
.withHeader("Content-Type", equalTo("application/x-www-form-urlencoded"))
.withRequestBody(equalTo("MessagingServiceSid=test_messaging_services_id&To=%2B14153333333&Body=%3C%23%3E+Your+Signal+verification+code%3A+123-456%0A%0AdoDiFGKPO1r")));
}
@Test
public void testSendVox() {
wireMockRule.stubFor(post(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Calls.json"))
.withBasicAuth(ACCOUNT_ID, ACCOUNT_TOKEN)
.willReturn(aResponse()
.withHeader("Content-Type", "application/json")
.withBody("{\"price\": -0.00750, \"status\": \"completed\"}")));
TwilioConfiguration configuration = new TwilioConfiguration();
configuration.setAccountId(ACCOUNT_ID);
configuration.setAccountToken(ACCOUNT_TOKEN);
configuration.setNumbers(NUMBERS);
configuration.setMessagingServicesId(MESSAGING_SERVICES_ID);
configuration.setLocalDomain(LOCAL_DOMAIN);
TwilioSmsSender sender = new TwilioSmsSender("http://localhost:" + wireMockRule.port(), configuration);
boolean success = sender.deliverVoxVerification("+14153333333", "123-456", Optional.of("en_US")).join();
assertThat(success).isTrue();
verify(1, postRequestedFor(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Calls.json"))
.withHeader("Content-Type", equalTo("application/x-www-form-urlencoded"))
.withRequestBody(matching("To=%2B14153333333&From=%2B1415(1111111|2222222)&Url=https%3A%2F%2Ftest.com%2Fv1%2Fvoice%2Fdescription%2F123-456%3Fl%3Den_US")));
}
@Test
public void testSendSmsFiveHundered() {
wireMockRule.stubFor(post(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Messages.json"))
.withBasicAuth(ACCOUNT_ID, ACCOUNT_TOKEN)
.willReturn(aResponse()
.withStatus(500)
.withHeader("Content-Type", "application/json")
.withBody("{\"message\": \"Server error!\"}")));
TwilioConfiguration configuration = new TwilioConfiguration();
configuration.setAccountId(ACCOUNT_ID);
configuration.setAccountToken(ACCOUNT_TOKEN);
configuration.setNumbers(NUMBERS);
configuration.setMessagingServicesId(MESSAGING_SERVICES_ID);
configuration.setLocalDomain(LOCAL_DOMAIN);
TwilioSmsSender sender = new TwilioSmsSender("http://localhost:" + wireMockRule.port(), configuration);
boolean success = sender.deliverSmsVerification("+14153333333", Optional.of("android-ng"), "123-456").join();
assertThat(success).isFalse();
verify(3, postRequestedFor(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Messages.json"))
.withHeader("Content-Type", equalTo("application/x-www-form-urlencoded"))
.withRequestBody(equalTo("MessagingServiceSid=test_messaging_services_id&To=%2B14153333333&Body=%3C%23%3E+Your+Signal+verification+code%3A+123-456%0A%0AdoDiFGKPO1r")));
}
@Test
public void testSendVoxFiveHundred() {
wireMockRule.stubFor(post(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Calls.json"))
.withBasicAuth(ACCOUNT_ID, ACCOUNT_TOKEN)
.willReturn(aResponse()
.withStatus(500)
.withHeader("Content-Type", "application/json")
.withBody("{\"message\": \"Server error!\"}")));
TwilioConfiguration configuration = new TwilioConfiguration();
configuration.setAccountId(ACCOUNT_ID);
configuration.setAccountToken(ACCOUNT_TOKEN);
configuration.setNumbers(NUMBERS);
configuration.setMessagingServicesId(MESSAGING_SERVICES_ID);
configuration.setLocalDomain(LOCAL_DOMAIN);
TwilioSmsSender sender = new TwilioSmsSender("http://localhost:" + wireMockRule.port(), configuration);
boolean success = sender.deliverVoxVerification("+14153333333", "123-456", Optional.of("en_US")).join();
assertThat(success).isFalse();
verify(3, postRequestedFor(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Calls.json"))
.withHeader("Content-Type", equalTo("application/x-www-form-urlencoded"))
.withRequestBody(matching("To=%2B14153333333&From=%2B1415(1111111|2222222)&Url=https%3A%2F%2Ftest.com%2Fv1%2Fvoice%2Fdescription%2F123-456%3Fl%3Den_US")));
}
@Test
public void testSendSmsNetworkFailure() {
TwilioConfiguration configuration = new TwilioConfiguration();
configuration.setAccountId(ACCOUNT_ID);
configuration.setAccountToken(ACCOUNT_TOKEN);
configuration.setNumbers(NUMBERS);
configuration.setMessagingServicesId(MESSAGING_SERVICES_ID);
configuration.setLocalDomain(LOCAL_DOMAIN);
TwilioSmsSender sender = new TwilioSmsSender("http://localhost:" + 39873, configuration);
boolean success = sender.deliverSmsVerification("+14153333333", Optional.of("android-ng"), "123-456").join();
assertThat(success).isFalse();
}
}

View File

@ -0,0 +1,154 @@
package org.whispersystems.textsecuregcm.tests.http;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import org.junit.Rule;
import org.junit.Test;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.http.FaultTolerantHttpClient;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
public class FaultTolerantHttpClientTest {
@Rule
public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort().dynamicHttpsPort());
@Test
public void testSimpleGet() {
wireMockRule.stubFor(get(urlEqualTo("/ping"))
.willReturn(aResponse()
.withHeader("Content-Type", "text/plain")
.withBody("Pong!")));
FaultTolerantHttpClient client = FaultTolerantHttpClient.newBuilder()
.withCircuitBreaker(new CircuitBreakerConfiguration())
.withRetry(new RetryConfiguration())
.withExecutor(Executors.newSingleThreadExecutor())
.withName("test")
.withVersion(HttpClient.Version.HTTP_2)
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:" + wireMockRule.port() + "/ping"))
.GET()
.build();
HttpResponse<String> response = client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).join();
assertThat(response.statusCode()).isEqualTo(200);
assertThat(response.body()).isEqualTo("Pong!");
verify(1, getRequestedFor(urlEqualTo("/ping")));
}
@Test
public void testRetryGet() {
wireMockRule.stubFor(get(urlEqualTo("/failure"))
.willReturn(aResponse()
.withStatus(500)
.withHeader("Content-Type", "text/plain")
.withBody("Pong!")));
FaultTolerantHttpClient client = FaultTolerantHttpClient.newBuilder()
.withCircuitBreaker(new CircuitBreakerConfiguration())
.withRetry(new RetryConfiguration())
.withExecutor(Executors.newSingleThreadExecutor())
.withName("test")
.withVersion(HttpClient.Version.HTTP_2)
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:" + wireMockRule.port() + "/failure"))
.GET()
.build();
HttpResponse<String> response = client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).join();
assertThat(response.statusCode()).isEqualTo(500);
assertThat(response.body()).isEqualTo("Pong!");
verify(3, getRequestedFor(urlEqualTo("/failure")));
}
@Test
public void testNetworkFailureCircuitBreaker() throws InterruptedException {
CircuitBreakerConfiguration circuitBreakerConfiguration = new CircuitBreakerConfiguration();
circuitBreakerConfiguration.setRingBufferSizeInClosedState(2);
circuitBreakerConfiguration.setRingBufferSizeInHalfOpenState(1);
circuitBreakerConfiguration.setFailureRateThreshold(50);
circuitBreakerConfiguration.setWaitDurationInOpenStateInSeconds(1);
FaultTolerantHttpClient client = FaultTolerantHttpClient.newBuilder()
.withCircuitBreaker(circuitBreakerConfiguration)
.withRetry(new RetryConfiguration())
.withExecutor(Executors.newSingleThreadExecutor())
.withName("test")
.withVersion(HttpClient.Version.HTTP_2)
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:" + 39873 + "/failure"))
.GET()
.build();
try {
client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).join();
throw new AssertionError("Should have failed!");
} catch (CompletionException e) {
assertThat(e.getCause()).isInstanceOf(IOException.class);
// good
}
try {
client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).join();
throw new AssertionError("Should have failed!");
} catch (CompletionException e) {
assertThat(e.getCause()).isInstanceOf(IOException.class);
// good
}
try {
client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).join();
throw new AssertionError("Should have failed!");
} catch (CompletionException e) {
assertThat(e.getCause()).isInstanceOf(CircuitBreakerOpenException.class);
// good
}
Thread.sleep(1001);
try {
client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).join();
throw new AssertionError("Should have failed!");
} catch (CompletionException e) {
assertThat(e.getCause()).isInstanceOf(IOException.class);
// good
}
try {
client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).join();
throw new AssertionError("Should have failed!");
} catch (CompletionException e) {
assertThat(e.getCause()).isInstanceOf(CircuitBreakerOpenException.class);
// good
}
}
}