diff --git a/pom.xml b/pom.xml index f10014ab5..2fc17427f 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ 1.3.9 2.9.8 - 0.13.2 + 0.14.1 UTF-8 @@ -65,13 +65,17 @@ 0.9.30 - io.github.resilience4j resilience4j-circuitbreaker ${resilience4j.version} - + + io.github.resilience4j + resilience4j-retry + ${resilience4j.version} + + com.amazonaws @@ -96,11 +100,6 @@ jar compile - - com.twilio.sdk - twilio-java-sdk - 4.4.4 - org.postgresql @@ -149,6 +148,17 @@ 8.10.2 + + javax.xml.bind + jaxb-api + 2.3.1 + + + org.glassfish.jaxb + jaxb-runtime + 2.3.1 + + org.glassfish.jersey.test-framework.providers @@ -180,16 +190,25 @@ 0.13.1 test - - javax.xml.bind - jaxb-api - 2.3.1 - - - org.glassfish.jaxb - jaxb-runtime - 2.3.1 + com.github.tomakehurst + wiremock-jre8 + 2.23.2 + test + + + com.google.guava + guava + + + org.eclipse.jetty + jetty-server + + + org.eclipse.jetty + jetty-servlet + + diff --git a/src/main/java/org/whispersystems/textsecuregcm/configuration/CircuitBreakerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/configuration/CircuitBreakerConfiguration.java index 1a28b5392..57455cb49 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/configuration/CircuitBreakerConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/configuration/CircuitBreakerConfiguration.java @@ -7,6 +7,10 @@ import javax.validation.constraints.Max; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import java.time.Duration; + +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; + public class CircuitBreakerConfiguration { @JsonProperty @@ -66,4 +70,13 @@ public class CircuitBreakerConfiguration { public void setWaitDurationInOpenStateInSeconds(int seconds) { this.waitDurationInOpenStateInSeconds = seconds; } + + public CircuitBreakerConfig toCircuitBreakerConfig() { + return CircuitBreakerConfig.custom() + .failureRateThreshold(getFailureRateThreshold()) + .ringBufferSizeInHalfOpenState(getRingBufferSizeInHalfOpenState()) + .waitDurationInOpenState(Duration.ofSeconds(getWaitDurationInOpenStateInSeconds())) + .ringBufferSizeInClosedState(getRingBufferSizeInClosedState()) + .build(); + } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/configuration/RetryConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/configuration/RetryConfiguration.java new file mode 100644 index 000000000..a5dc689ad --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/configuration/RetryConfiguration.java @@ -0,0 +1,38 @@ +package org.whispersystems.textsecuregcm.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.Min; + +import java.time.Duration; + +import io.github.resilience4j.retry.RetryConfig; + +public class RetryConfiguration { + + @JsonProperty + @Min(1) + private int maxAttempts = RetryConfig.DEFAULT_MAX_ATTEMPTS; + + @JsonProperty + @Min(1) + private long waitDuration = RetryConfig.DEFAULT_WAIT_DURATION; + + public int getMaxAttempts() { + return maxAttempts; + } + + public long getWaitDuration() { + return waitDuration; + } + + public RetryConfig toRetryConfig() { + return toRetryConfigBuilder().build(); + } + + public RetryConfig.Builder toRetryConfigBuilder() { + return RetryConfig.custom() + .maxAttempts(getMaxAttempts()) + .waitDuration(Duration.ofMillis(getWaitDuration())); + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/configuration/TwilioConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/configuration/TwilioConfiguration.java index 7707d1515..197ff271e 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/configuration/TwilioConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/configuration/TwilioConfiguration.java @@ -17,8 +17,10 @@ package org.whispersystems.textsecuregcm.configuration; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import org.hibernate.validator.constraints.NotEmpty; +import javax.validation.Valid; import javax.validation.constraints.NotNull; import java.util.List; @@ -43,23 +45,74 @@ public class TwilioConfiguration { @JsonProperty private String messagingServicesId; + @NotNull + @Valid + private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration(); + + @NotNull + @Valid + private RetryConfiguration retry = new RetryConfiguration(); + public String getAccountId() { return accountId; } + @VisibleForTesting + public void setAccountId(String accountId) { + this.accountId = accountId; + } + public String getAccountToken() { return accountToken; } + @VisibleForTesting + public void setAccountToken(String accountToken) { + this.accountToken = accountToken; + } + public List getNumbers() { return numbers; } + @VisibleForTesting + public void setNumbers(List numbers) { + this.numbers = numbers; + } + public String getLocalDomain() { return localDomain; } + @VisibleForTesting + public void setLocalDomain(String localDomain) { + this.localDomain = localDomain; + } + public String getMessagingServicesId() { return messagingServicesId; } + + @VisibleForTesting + public void setMessagingServicesId(String messagingServicesId) { + this.messagingServicesId = messagingServicesId; + } + + public CircuitBreakerConfiguration getCircuitBreaker() { + return circuitBreaker; + } + + @VisibleForTesting + public void setCircuitBreaker(CircuitBreakerConfiguration circuitBreaker) { + this.circuitBreaker = circuitBreaker; + } + + public RetryConfiguration getRetry() { + return retry; + } + + @VisibleForTesting + public void setRetry(RetryConfiguration retry) { + this.retry = retry; + } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/controllers/AccountController.java b/src/main/java/org/whispersystems/textsecuregcm/controllers/AccountController.java index 281deffb7..042172e53 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/controllers/AccountController.java +++ b/src/main/java/org/whispersystems/textsecuregcm/controllers/AccountController.java @@ -133,7 +133,7 @@ public class AccountController { @HeaderParam("Accept-Language") Optional locale, @QueryParam("client") Optional client, @QueryParam("captcha") Optional captcha) - throws IOException, RateLimitExceededException + throws RateLimitExceededException { if (!Util.isValidNumber(number)) { logger.info("Invalid number: " + number); diff --git a/src/main/java/org/whispersystems/textsecuregcm/http/FaultTolerantHttpClient.java b/src/main/java/org/whispersystems/textsecuregcm/http/FaultTolerantHttpClient.java new file mode 100644 index 000000000..89cdd8ff4 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/http/FaultTolerantHttpClient.java @@ -0,0 +1,137 @@ +package org.whispersystems.textsecuregcm.http; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SharedMetricRegistries; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; +import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; +import org.whispersystems.textsecuregcm.util.Constants; + +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; + +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.retry.RetryConfig; + +public class FaultTolerantHttpClient { + + private final HttpClient httpClient; + private final ScheduledExecutorService retryExecutor; + private final Retry retry; + private final CircuitBreaker breaker; + + public static Builder newBuilder() { + return new Builder(); + } + + private FaultTolerantHttpClient(String name, HttpClient httpClient, RetryConfiguration retryConfiguration, CircuitBreakerConfiguration circuitBreakerConfiguration) { + this.httpClient = httpClient; + this.retryExecutor = Executors.newSingleThreadScheduledExecutor(); + this.breaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig()); + + MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + CircuitBreakerUtil.registerMetrics(metricRegistry, breaker, FaultTolerantHttpClient.class); + + if (retryConfiguration != null) { + RetryConfig retryConfig = retryConfiguration.toRetryConfigBuilder().retryOnResult(o -> o.statusCode() >= 500).build(); + this.retry = Retry.of(name + "-retry", retryConfig); + CircuitBreakerUtil.registerMetrics(metricRegistry, retry, FaultTolerantHttpClient.class); + } else { + this.retry = null; + } + } + + public CompletableFuture> sendAsync(HttpRequest request, HttpResponse.BodyHandler bodyHandler) { + Supplier>> asyncRequest = sendAsync(httpClient, request, bodyHandler); + + if (retry != null) { + return breaker.executeCompletionStage(retryableCompletionStage(asyncRequest)).toCompletableFuture(); + } else { + return breaker.executeCompletionStage(asyncRequest).toCompletableFuture(); + } + } + + private Supplier> retryableCompletionStage(Supplier> supplier) { + return () -> retry.executeCompletionStage(retryExecutor, supplier); + } + + private Supplier>> sendAsync(HttpClient client, HttpRequest request, HttpResponse.BodyHandler bodyHandler) { + return () -> client.sendAsync(request, bodyHandler); + } + + public static class Builder { + + + private HttpClient.Version version = HttpClient.Version.HTTP_2; + private HttpClient.Redirect redirect = HttpClient.Redirect.NEVER; + private Duration connectTimeout = Duration.ofSeconds(10); + + private String name; + private Executor executor; + private RetryConfiguration retryConfiguration; + private CircuitBreakerConfiguration circuitBreakerConfiguration; + + private Builder() {} + + public Builder withName(String name) { + this.name = name; + return this; + } + + public Builder withVersion(HttpClient.Version version) { + this.version = version; + return this; + } + + public Builder withRedirect(HttpClient.Redirect redirect) { + this.redirect = redirect; + return this; + } + + public Builder withExecutor(Executor executor) { + this.executor = executor; + return this; + } + + public Builder withConnectTimeout(Duration connectTimeout) { + this.connectTimeout = connectTimeout; + return this; + } + + public Builder withRetry(RetryConfiguration retryConfiguration) { + this.retryConfiguration = retryConfiguration; + return this; + } + + public Builder withCircuitBreaker(CircuitBreakerConfiguration circuitBreakerConfiguration) { + this.circuitBreakerConfiguration = circuitBreakerConfiguration; + return this; + } + + public FaultTolerantHttpClient build() { + if (this.circuitBreakerConfiguration == null || this.name == null || this.executor == null) { + throw new IllegalArgumentException("Must specify circuit breaker config, name, and executor"); + } + + HttpClient client = HttpClient.newBuilder() + .connectTimeout(connectTimeout) + .followRedirects(redirect) + .version(version) + .executor(executor) + .build(); + + return new FaultTolerantHttpClient(name, client, retryConfiguration, circuitBreakerConfiguration); + } + + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/http/FormDataBodyPublisher.java b/src/main/java/org/whispersystems/textsecuregcm/http/FormDataBodyPublisher.java new file mode 100644 index 000000000..46f656d4b --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/http/FormDataBodyPublisher.java @@ -0,0 +1,26 @@ +package org.whispersystems.textsecuregcm.http; + +import java.net.URLEncoder; +import java.net.http.HttpRequest; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public class FormDataBodyPublisher { + + public static HttpRequest.BodyPublisher of(Map data) { + StringBuilder builder = new StringBuilder(); + + for (Map.Entry entry : data.entrySet()) { + if (builder.length() > 0) { + builder.append("&"); + } + + builder.append(URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8)); + builder.append("="); + builder.append(URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8)); + } + + return HttpRequest.BodyPublishers.ofString(builder.toString()); + } + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java b/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java index 845f0bf21..fc3ffc74a 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java +++ b/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java @@ -35,16 +35,9 @@ public class ReplicatedJedisPool { { if (replicas.size() < 1) throw new IllegalArgumentException("There must be at least one replica"); - MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - - CircuitBreakerConfig config = CircuitBreakerConfig.custom() - .failureRateThreshold(circuitBreakerConfiguration.getFailureRateThreshold()) - .ringBufferSizeInHalfOpenState(circuitBreakerConfiguration.getRingBufferSizeInHalfOpenState()) - .waitDurationInOpenState(Duration.ofSeconds(circuitBreakerConfiguration.getWaitDurationInOpenStateInSeconds())) - .ringBufferSizeInClosedState(circuitBreakerConfiguration.getRingBufferSizeInClosedState()) - .build(); - - CircuitBreaker masterBreaker = CircuitBreaker.of(String.format("%s-master", name), config); + MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); + CircuitBreakerConfig config = circuitBreakerConfiguration.toCircuitBreakerConfig(); + CircuitBreaker masterBreaker = CircuitBreaker.of(String.format("%s-master", name), config); CircuitBreakerUtil.registerMetrics(metricRegistry, masterBreaker, ReplicatedJedisPool.class); diff --git a/src/main/java/org/whispersystems/textsecuregcm/sms/SmsSender.java b/src/main/java/org/whispersystems/textsecuregcm/sms/SmsSender.java index 3c3df140c..55773a3c6 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/sms/SmsSender.java +++ b/src/main/java/org/whispersystems/textsecuregcm/sms/SmsSender.java @@ -17,11 +17,9 @@ package org.whispersystems.textsecuregcm.sms; -import com.twilio.sdk.TwilioRestException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Optional; @SuppressWarnings("OptionalUsedAsFieldOrParameterType") @@ -31,8 +29,6 @@ public class SmsSender { static final String SMS_ANDROID_NG_VERIFICATION_TEXT = "<#> Your Signal verification code: %s\n\ndoDiFGKPO1r"; static final String SMS_VERIFICATION_TEXT = "Your Signal verification code: %s"; - private final Logger logger = LoggerFactory.getLogger(SmsSender.class); - private final TwilioSmsSender twilioSender; public SmsSender(TwilioSmsSender twilioSender) @@ -40,28 +36,16 @@ public class SmsSender { this.twilioSender = twilioSender; } - public void deliverSmsVerification(String destination, Optional clientType, String verificationCode) - throws IOException - { + public void deliverSmsVerification(String destination, Optional clientType, String verificationCode) { // Fix up mexico numbers to 'mobile' format just for SMS delivery. if (destination.startsWith("+52") && !destination.startsWith("+521")) { destination = "+521" + destination.substring(3); } - try { - twilioSender.deliverSmsVerification(destination, clientType, verificationCode); - } catch (TwilioRestException e) { - logger.info("Twilio SMS Failed: " + e.getErrorMessage()); - } + twilioSender.deliverSmsVerification(destination, clientType, verificationCode); } - public void deliverVoxVerification(String destination, String verificationCode, Optional locale) - throws IOException - { - try { - twilioSender.deliverVoxVerification(destination, verificationCode, locale); - } catch (TwilioRestException e) { - logger.info("Twilio Vox Failed: " + e.getErrorMessage()); - } + public void deliverVoxVerification(String destination, String verificationCode, Optional locale) { + twilioSender.deliverVoxVerification(destination, verificationCode, locale); } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/sms/TwilioSmsSender.java b/src/main/java/org/whispersystems/textsecuregcm/sms/TwilioSmsSender.java index df30d3fa9..e78af15e0 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/sms/TwilioSmsSender.java +++ b/src/main/java/org/whispersystems/textsecuregcm/sms/TwilioSmsSender.java @@ -1,4 +1,4 @@ -/** +/* * Copyright (C) 2013 Open WhisperSystems * * This program is free software: you can redistribute it and/or modify @@ -19,33 +19,45 @@ package org.whispersystems.textsecuregcm.sms; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; -import com.twilio.sdk.TwilioRestClient; -import com.twilio.sdk.TwilioRestException; -import com.twilio.sdk.resource.factory.CallFactory; -import com.twilio.sdk.resource.factory.MessageFactory; -import org.apache.http.NameValuePair; -import org.apache.http.message.BasicNameValuePair; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.TwilioConfiguration; +import org.whispersystems.textsecuregcm.http.FaultTolerantHttpClient; +import org.whispersystems.textsecuregcm.http.FormDataBodyPublisher; +import org.whispersystems.textsecuregcm.util.Base64; import org.whispersystems.textsecuregcm.util.Constants; +import org.whispersystems.textsecuregcm.util.ExecutorUtils; +import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.Util; import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import static com.codahale.metrics.MetricRegistry.name; @SuppressWarnings("OptionalUsedAsFieldOrParameterType") public class TwilioSmsSender { + private static final Logger logger = LoggerFactory.getLogger(TwilioSmsSender.class); + private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); private final Meter smsMeter = metricRegistry.meter(name(getClass(), "sms", "delivered")); private final Meter voxMeter = metricRegistry.meter(name(getClass(), "vox", "delivered")); + private final Meter priceMeter = metricRegistry.meter(name(getClass(), "price")); private final String accountId; private final String accountToken; @@ -54,73 +66,183 @@ public class TwilioSmsSender { private final String localDomain; private final Random random; - public TwilioSmsSender(TwilioConfiguration config) { - this.accountId = config.getAccountId (); - this.accountToken = config.getAccountToken(); - this.numbers = new ArrayList<>(config.getNumbers()); - this.localDomain = config.getLocalDomain(); - this.messagingServicesId = config.getMessagingServicesId(); + private final FaultTolerantHttpClient httpClient; + private final URI smsUri; + private final URI voxUri; + + @VisibleForTesting + public TwilioSmsSender(String baseUri, TwilioConfiguration twilioConfiguration) { + Executor executor = ExecutorUtils.newFixedThreadBoundedQueueExecutor(10, 100); + + this.accountId = twilioConfiguration.getAccountId(); + this.accountToken = twilioConfiguration.getAccountToken(); + this.numbers = new ArrayList<>(twilioConfiguration.getNumbers()); + this.localDomain = twilioConfiguration.getLocalDomain(); + this.messagingServicesId = twilioConfiguration.getMessagingServicesId(); this.random = new Random(System.currentTimeMillis()); + this.smsUri = URI.create(baseUri + "/2010-04-01/Accounts/" + accountId + "/Messages.json"); + this.voxUri = URI.create(baseUri + "/2010-04-01/Accounts/" + accountId + "/Calls.json" ); + this.httpClient = FaultTolerantHttpClient.newBuilder() + .withCircuitBreaker(twilioConfiguration.getCircuitBreaker()) + .withRetry(twilioConfiguration.getRetry()) + .withVersion(HttpClient.Version.HTTP_2) + .withConnectTimeout(Duration.ofSeconds(10)) + .withRedirect(HttpClient.Redirect.NEVER) + .withExecutor(executor) + .withName("twilio") + .build(); } - public void deliverSmsVerification(String destination, Optional clientType, String verificationCode) - throws IOException, TwilioRestException - { - TwilioRestClient client = new TwilioRestClient(accountId, accountToken); - MessageFactory messageFactory = client.getAccount().getMessageFactory(); - List messageParams = new LinkedList<>(); - messageParams.add(new BasicNameValuePair("To", destination)); + public TwilioSmsSender(TwilioConfiguration twilioConfiguration) { + this("https://api.twilio.com", twilioConfiguration); + } + + public CompletableFuture deliverSmsVerification(String destination, Optional clientType, String verificationCode) { + Map requestParameters = new HashMap<>(); + requestParameters.put("To", destination); if (Util.isEmpty(messagingServicesId)) { - messageParams.add(new BasicNameValuePair("From", getRandom(random, numbers))); + requestParameters.put("From", getRandom(random, numbers)); } else { - messageParams.add(new BasicNameValuePair("MessagingServiceSid", messagingServicesId)); + requestParameters.put("MessagingServiceSid", messagingServicesId); } - + if ("ios".equals(clientType.orElse(null))) { - messageParams.add(new BasicNameValuePair("Body", String.format(SmsSender.SMS_IOS_VERIFICATION_TEXT, verificationCode, verificationCode))); + requestParameters.put("Body", String.format(SmsSender.SMS_IOS_VERIFICATION_TEXT, verificationCode, verificationCode)); } else if ("android-ng".equals(clientType.orElse(null))) { - messageParams.add(new BasicNameValuePair("Body", String.format(SmsSender.SMS_ANDROID_NG_VERIFICATION_TEXT, verificationCode))); + requestParameters.put("Body", String.format(SmsSender.SMS_ANDROID_NG_VERIFICATION_TEXT, verificationCode)); } else { - messageParams.add(new BasicNameValuePair("Body", String.format(SmsSender.SMS_VERIFICATION_TEXT, verificationCode))); - } - - try { - messageFactory.create(messageParams); - } catch (RuntimeException damnYouTwilio) { - throw new IOException(damnYouTwilio); + requestParameters.put("Body", String.format(SmsSender.SMS_VERIFICATION_TEXT, verificationCode)); } + HttpRequest request = HttpRequest.newBuilder() + .uri(smsUri) + .POST(FormDataBodyPublisher.of(requestParameters)) + .header("Content-Type", "application/x-www-form-urlencoded") + .header("Authorization", "Basic " + Base64.encodeBytes((accountId + ":" + accountToken).getBytes())) + .build(); + smsMeter.mark(); + + return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .thenApply(this::parseResponse) + .handle(this::processResponse); } - public void deliverVoxVerification(String destination, String verificationCode, Optional locale) - throws IOException, TwilioRestException - { + public CompletableFuture deliverVoxVerification(String destination, String verificationCode, Optional locale) { String url = "https://" + localDomain + "/v1/voice/description/" + verificationCode; if (locale.isPresent()) { url += "?l=" + locale.get(); } - TwilioRestClient client = new TwilioRestClient(accountId, accountToken); - CallFactory callFactory = client.getAccount().getCallFactory(); - Map callParams = new HashMap<>(); - callParams.put("To", destination); - callParams.put("From", getRandom(random, numbers)); - callParams.put("Url", url); + Map requestParameters = new HashMap<>(); + requestParameters.put("Url", url); + requestParameters.put("To", destination); + requestParameters.put("From", getRandom(random, numbers)); - try { - callFactory.create(callParams); - } catch (RuntimeException damnYouTwilio) { - throw new IOException(damnYouTwilio); - } + HttpRequest request = HttpRequest.newBuilder() + .uri(voxUri) + .POST(FormDataBodyPublisher.of(requestParameters)) + .header("Content-Type", "application/x-www-form-urlencoded") + .header("Authorization", "Basic " + Base64.encodeBytes((accountId + ":" + accountToken).getBytes())) + .build(); voxMeter.mark(); + + return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .thenApply(this::parseResponse) + .handle(this::processResponse); } private String getRandom(Random random, ArrayList elements) { return elements.get(random.nextInt(elements.size())); } + private boolean processResponse(TwilioResponse response, Throwable throwable) { + if (response != null && response.isSuccess()) { + priceMeter.mark((long)(response.successResponse.price * 1000)); + return true; + } else if (response != null && response.isFailure()) { + logger.info("Twilio request failed: " + response.failureResponse.status + ", " + response.failureResponse.message); + return false; + } else if (throwable != null) { + logger.info("Twilio request failed", throwable); + return false; + } else { + logger.warn("No response or throwable!"); + return false; + } + } + + private TwilioResponse parseResponse(HttpResponse response) { + ObjectMapper mapper = SystemMapper.getMapper(); + + if (response.statusCode() >= 200 && response.statusCode() < 300) { + if ("application/json".equals(response.headers().firstValue("Content-Type").orElse(null))) { + return new TwilioResponse(TwilioResponse.TwilioSuccessResponse.fromBody(mapper, response.body())); + } else { + return new TwilioResponse(new TwilioResponse.TwilioSuccessResponse()); + } + } + + if ("application/json".equals(response.headers().firstValue("Content-Type").orElse(null))) { + return new TwilioResponse(TwilioResponse.TwilioFailureResponse.fromBody(mapper, response.body())); + } else { + return new TwilioResponse(new TwilioResponse.TwilioFailureResponse()); + } + } + + public static class TwilioResponse { + + private TwilioSuccessResponse successResponse; + private TwilioFailureResponse failureResponse; + + TwilioResponse(TwilioSuccessResponse successResponse) { + this.successResponse = successResponse; + } + + TwilioResponse(TwilioFailureResponse failureResponse) { + this.failureResponse = failureResponse; + } + + boolean isSuccess() { + return successResponse != null; + } + + boolean isFailure() { + return failureResponse != null; + } + + private static class TwilioSuccessResponse { + @JsonProperty + private double price; + + static TwilioSuccessResponse fromBody(ObjectMapper mapper, String body) { + try { + return mapper.readValue(body, TwilioSuccessResponse.class); + } catch (IOException e) { + logger.warn("Error parsing twilio success response: " + e); + return new TwilioSuccessResponse(); + } + } + } + + private static class TwilioFailureResponse { + @JsonProperty + private int status; + + @JsonProperty + private String message; + + static TwilioFailureResponse fromBody(ObjectMapper mapper, String body) { + try { + return mapper.readValue(body, TwilioFailureResponse.class); + } catch (IOException e) { + logger.warn("Error parsing twilio success response: " + e); + return new TwilioFailureResponse(); + } + } + } + } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/FaultTolerantDatabase.java b/src/main/java/org/whispersystems/textsecuregcm/storage/FaultTolerantDatabase.java index 505056740..2abc395e9 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/FaultTolerantDatabase.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/FaultTolerantDatabase.java @@ -1,18 +1,15 @@ package org.whispersystems.textsecuregcm.storage; -import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import org.jdbi.v3.core.Jdbi; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; import org.whispersystems.textsecuregcm.util.Constants; -import java.time.Duration; import java.util.function.Consumer; import java.util.function.Function; import io.github.resilience4j.circuitbreaker.CircuitBreaker; -import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; public class FaultTolerantDatabase { @@ -20,19 +17,12 @@ public class FaultTolerantDatabase { private final CircuitBreaker circuitBreaker; public FaultTolerantDatabase(String name, Jdbi database, CircuitBreakerConfiguration circuitBreakerConfiguration) { - MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - - CircuitBreakerConfig config = CircuitBreakerConfig.custom() - .failureRateThreshold(circuitBreakerConfiguration.getFailureRateThreshold()) - .ringBufferSizeInHalfOpenState(circuitBreakerConfiguration.getRingBufferSizeInHalfOpenState()) - .waitDurationInOpenState(Duration.ofSeconds(circuitBreakerConfiguration.getWaitDurationInOpenStateInSeconds())) - .ringBufferSizeInClosedState(circuitBreakerConfiguration.getRingBufferSizeInClosedState()) - .build(); - this.database = database; - this.circuitBreaker = CircuitBreaker.of(name, config); + this.circuitBreaker = CircuitBreaker.of(name, circuitBreakerConfiguration.toCircuitBreakerConfig()); - CircuitBreakerUtil.registerMetrics(metricRegistry, circuitBreaker, FaultTolerantDatabase.class); + CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME), + circuitBreaker, + FaultTolerantDatabase.class); } public void use(Consumer consumer) { diff --git a/src/main/java/org/whispersystems/textsecuregcm/util/CircuitBreakerUtil.java b/src/main/java/org/whispersystems/textsecuregcm/util/CircuitBreakerUtil.java index fd349fd2b..8ab36c70a 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/util/CircuitBreakerUtil.java +++ b/src/main/java/org/whispersystems/textsecuregcm/util/CircuitBreakerUtil.java @@ -5,6 +5,8 @@ import com.codahale.metrics.MetricRegistry; import static com.codahale.metrics.MetricRegistry.name; import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.retry.AsyncRetry; +import io.github.resilience4j.retry.Retry; public class CircuitBreakerUtil { @@ -20,4 +22,16 @@ public class CircuitBreakerUtil { circuitBreaker.getEventPublisher().onCallNotPermitted(event -> unpermittedMeter.mark()); } + public static void registerMetrics(MetricRegistry metricRegistry, Retry retry, Class clazz) { + Meter successMeter = metricRegistry.meter(name(clazz, retry.getName(), "success" )); + Meter retryMeter = metricRegistry.meter(name(clazz, retry.getName(), "retry" )); + Meter errorMeter = metricRegistry.meter(name(clazz, retry.getName(), "error" )); + Meter ignoredErrorMeter = metricRegistry.meter(name(clazz, retry.getName(), "ignored_error")); + + retry.getEventPublisher().onSuccess(event -> successMeter.mark()); + retry.getEventPublisher().onRetry(event -> retryMeter.mark()); + retry.getEventPublisher().onError(event -> errorMeter.mark()); + retry.getEventPublisher().onIgnoredError(event -> ignoredErrorMeter.mark()); + } + } diff --git a/src/main/java/org/whispersystems/textsecuregcm/util/ExecutorUtils.java b/src/main/java/org/whispersystems/textsecuregcm/util/ExecutorUtils.java new file mode 100644 index 000000000..a7d13c43b --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/util/ExecutorUtils.java @@ -0,0 +1,21 @@ +package org.whispersystems.textsecuregcm.util; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class ExecutorUtils { + + public static Executor newFixedThreadBoundedQueueExecutor(int threadCount, int queueSize) { + ThreadPoolExecutor executor = new ThreadPoolExecutor(threadCount, threadCount, + Long.MAX_VALUE, TimeUnit.NANOSECONDS, + new ArrayBlockingQueue<>(queueSize), + new ThreadPoolExecutor.AbortPolicy()); + + executor.prestartAllCoreThreads(); + + return executor; + } + +} diff --git a/src/test/java/org/whispersystems/sms/TwilioSmsSenderTest.java b/src/test/java/org/whispersystems/sms/TwilioSmsSenderTest.java new file mode 100644 index 000000000..124023b02 --- /dev/null +++ b/src/test/java/org/whispersystems/sms/TwilioSmsSenderTest.java @@ -0,0 +1,150 @@ +package org.whispersystems.sms; + +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import org.junit.Rule; +import org.junit.Test; +import org.whispersystems.textsecuregcm.configuration.TwilioConfiguration; +import org.whispersystems.textsecuregcm.sms.TwilioSmsSender; + +import java.util.List; +import java.util.Optional; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +public class TwilioSmsSenderTest { + + private static final String ACCOUNT_ID = "test_account_id"; + private static final String ACCOUNT_TOKEN = "test_account_token"; + private static final List NUMBERS = List.of("+14151111111", "+14152222222"); + private static final String MESSAGING_SERVICES_ID = "test_messaging_services_id"; + private static final String LOCAL_DOMAIN = "test.com"; + + @Rule + public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort().dynamicHttpsPort()); + + @Test + public void testSendSms() { + wireMockRule.stubFor(post(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Messages.json")) + .withBasicAuth(ACCOUNT_ID, ACCOUNT_TOKEN) + .willReturn(aResponse() + .withHeader("Content-Type", "application/json") + .withBody("{\"price\": -0.00750, \"status\": \"sent\"}"))); + + + TwilioConfiguration configuration = new TwilioConfiguration(); + configuration.setAccountId(ACCOUNT_ID); + configuration.setAccountToken(ACCOUNT_TOKEN); + configuration.setNumbers(NUMBERS); + configuration.setMessagingServicesId(MESSAGING_SERVICES_ID); + configuration.setLocalDomain(LOCAL_DOMAIN); + + TwilioSmsSender sender = new TwilioSmsSender("http://localhost:" + wireMockRule.port(), configuration); + boolean success = sender.deliverSmsVerification("+14153333333", Optional.of("android-ng"), "123-456").join(); + + assertThat(success).isTrue(); + + verify(1, postRequestedFor(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Messages.json")) + .withHeader("Content-Type", equalTo("application/x-www-form-urlencoded")) + .withRequestBody(equalTo("MessagingServiceSid=test_messaging_services_id&To=%2B14153333333&Body=%3C%23%3E+Your+Signal+verification+code%3A+123-456%0A%0AdoDiFGKPO1r"))); + } + + @Test + public void testSendVox() { + wireMockRule.stubFor(post(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Calls.json")) + .withBasicAuth(ACCOUNT_ID, ACCOUNT_TOKEN) + .willReturn(aResponse() + .withHeader("Content-Type", "application/json") + .withBody("{\"price\": -0.00750, \"status\": \"completed\"}"))); + + + TwilioConfiguration configuration = new TwilioConfiguration(); + configuration.setAccountId(ACCOUNT_ID); + configuration.setAccountToken(ACCOUNT_TOKEN); + configuration.setNumbers(NUMBERS); + configuration.setMessagingServicesId(MESSAGING_SERVICES_ID); + configuration.setLocalDomain(LOCAL_DOMAIN); + + TwilioSmsSender sender = new TwilioSmsSender("http://localhost:" + wireMockRule.port(), configuration); + boolean success = sender.deliverVoxVerification("+14153333333", "123-456", Optional.of("en_US")).join(); + + assertThat(success).isTrue(); + + verify(1, postRequestedFor(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Calls.json")) + .withHeader("Content-Type", equalTo("application/x-www-form-urlencoded")) + .withRequestBody(matching("To=%2B14153333333&From=%2B1415(1111111|2222222)&Url=https%3A%2F%2Ftest.com%2Fv1%2Fvoice%2Fdescription%2F123-456%3Fl%3Den_US"))); + } + + @Test + public void testSendSmsFiveHundered() { + wireMockRule.stubFor(post(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Messages.json")) + .withBasicAuth(ACCOUNT_ID, ACCOUNT_TOKEN) + .willReturn(aResponse() + .withStatus(500) + .withHeader("Content-Type", "application/json") + .withBody("{\"message\": \"Server error!\"}"))); + + + TwilioConfiguration configuration = new TwilioConfiguration(); + configuration.setAccountId(ACCOUNT_ID); + configuration.setAccountToken(ACCOUNT_TOKEN); + configuration.setNumbers(NUMBERS); + configuration.setMessagingServicesId(MESSAGING_SERVICES_ID); + configuration.setLocalDomain(LOCAL_DOMAIN); + + TwilioSmsSender sender = new TwilioSmsSender("http://localhost:" + wireMockRule.port(), configuration); + boolean success = sender.deliverSmsVerification("+14153333333", Optional.of("android-ng"), "123-456").join(); + + assertThat(success).isFalse(); + + verify(3, postRequestedFor(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Messages.json")) + .withHeader("Content-Type", equalTo("application/x-www-form-urlencoded")) + .withRequestBody(equalTo("MessagingServiceSid=test_messaging_services_id&To=%2B14153333333&Body=%3C%23%3E+Your+Signal+verification+code%3A+123-456%0A%0AdoDiFGKPO1r"))); + } + + @Test + public void testSendVoxFiveHundred() { + wireMockRule.stubFor(post(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Calls.json")) + .withBasicAuth(ACCOUNT_ID, ACCOUNT_TOKEN) + .willReturn(aResponse() + .withStatus(500) + .withHeader("Content-Type", "application/json") + .withBody("{\"message\": \"Server error!\"}"))); + + TwilioConfiguration configuration = new TwilioConfiguration(); + configuration.setAccountId(ACCOUNT_ID); + configuration.setAccountToken(ACCOUNT_TOKEN); + configuration.setNumbers(NUMBERS); + configuration.setMessagingServicesId(MESSAGING_SERVICES_ID); + configuration.setLocalDomain(LOCAL_DOMAIN); + + TwilioSmsSender sender = new TwilioSmsSender("http://localhost:" + wireMockRule.port(), configuration); + boolean success = sender.deliverVoxVerification("+14153333333", "123-456", Optional.of("en_US")).join(); + + assertThat(success).isFalse(); + + verify(3, postRequestedFor(urlEqualTo("/2010-04-01/Accounts/" + ACCOUNT_ID + "/Calls.json")) + .withHeader("Content-Type", equalTo("application/x-www-form-urlencoded")) + .withRequestBody(matching("To=%2B14153333333&From=%2B1415(1111111|2222222)&Url=https%3A%2F%2Ftest.com%2Fv1%2Fvoice%2Fdescription%2F123-456%3Fl%3Den_US"))); + + } + + @Test + public void testSendSmsNetworkFailure() { + TwilioConfiguration configuration = new TwilioConfiguration(); + configuration.setAccountId(ACCOUNT_ID); + configuration.setAccountToken(ACCOUNT_TOKEN); + configuration.setNumbers(NUMBERS); + configuration.setMessagingServicesId(MESSAGING_SERVICES_ID); + configuration.setLocalDomain(LOCAL_DOMAIN); + + TwilioSmsSender sender = new TwilioSmsSender("http://localhost:" + 39873, configuration); + boolean success = sender.deliverSmsVerification("+14153333333", Optional.of("android-ng"), "123-456").join(); + + assertThat(success).isFalse(); + } + + + +} diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/http/FaultTolerantHttpClientTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/http/FaultTolerantHttpClientTest.java new file mode 100644 index 000000000..2c3e3a386 --- /dev/null +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/http/FaultTolerantHttpClientTest.java @@ -0,0 +1,154 @@ +package org.whispersystems.textsecuregcm.tests.http; + +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import org.junit.Rule; +import org.junit.Test; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; +import org.whispersystems.textsecuregcm.http.FaultTolerantHttpClient; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executors; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; +import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +public class FaultTolerantHttpClientTest { + + @Rule + public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort().dynamicHttpsPort()); + + @Test + public void testSimpleGet() { + wireMockRule.stubFor(get(urlEqualTo("/ping")) + .willReturn(aResponse() + .withHeader("Content-Type", "text/plain") + .withBody("Pong!"))); + + + FaultTolerantHttpClient client = FaultTolerantHttpClient.newBuilder() + .withCircuitBreaker(new CircuitBreakerConfiguration()) + .withRetry(new RetryConfiguration()) + .withExecutor(Executors.newSingleThreadExecutor()) + .withName("test") + .withVersion(HttpClient.Version.HTTP_2) + .build(); + + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create("http://localhost:" + wireMockRule.port() + "/ping")) + .GET() + .build(); + + HttpResponse response = client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).join(); + + assertThat(response.statusCode()).isEqualTo(200); + assertThat(response.body()).isEqualTo("Pong!"); + + verify(1, getRequestedFor(urlEqualTo("/ping"))); + } + + @Test + public void testRetryGet() { + wireMockRule.stubFor(get(urlEqualTo("/failure")) + .willReturn(aResponse() + .withStatus(500) + .withHeader("Content-Type", "text/plain") + .withBody("Pong!"))); + + FaultTolerantHttpClient client = FaultTolerantHttpClient.newBuilder() + .withCircuitBreaker(new CircuitBreakerConfiguration()) + .withRetry(new RetryConfiguration()) + .withExecutor(Executors.newSingleThreadExecutor()) + .withName("test") + .withVersion(HttpClient.Version.HTTP_2) + .build(); + + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create("http://localhost:" + wireMockRule.port() + "/failure")) + .GET() + .build(); + + HttpResponse response = client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).join(); + + assertThat(response.statusCode()).isEqualTo(500); + assertThat(response.body()).isEqualTo("Pong!"); + + verify(3, getRequestedFor(urlEqualTo("/failure"))); + } + + @Test + public void testNetworkFailureCircuitBreaker() throws InterruptedException { + CircuitBreakerConfiguration circuitBreakerConfiguration = new CircuitBreakerConfiguration(); + circuitBreakerConfiguration.setRingBufferSizeInClosedState(2); + circuitBreakerConfiguration.setRingBufferSizeInHalfOpenState(1); + circuitBreakerConfiguration.setFailureRateThreshold(50); + circuitBreakerConfiguration.setWaitDurationInOpenStateInSeconds(1); + + FaultTolerantHttpClient client = FaultTolerantHttpClient.newBuilder() + .withCircuitBreaker(circuitBreakerConfiguration) + .withRetry(new RetryConfiguration()) + .withExecutor(Executors.newSingleThreadExecutor()) + .withName("test") + .withVersion(HttpClient.Version.HTTP_2) + .build(); + + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create("http://localhost:" + 39873 + "/failure")) + .GET() + .build(); + + try { + client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).join(); + throw new AssertionError("Should have failed!"); + } catch (CompletionException e) { + assertThat(e.getCause()).isInstanceOf(IOException.class); + // good + } + + try { + client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).join(); + throw new AssertionError("Should have failed!"); + } catch (CompletionException e) { + assertThat(e.getCause()).isInstanceOf(IOException.class); + // good + } + + try { + client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).join(); + throw new AssertionError("Should have failed!"); + } catch (CompletionException e) { + assertThat(e.getCause()).isInstanceOf(CircuitBreakerOpenException.class); + // good + } + + Thread.sleep(1001); + + try { + client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).join(); + throw new AssertionError("Should have failed!"); + } catch (CompletionException e) { + assertThat(e.getCause()).isInstanceOf(IOException.class); + // good + } + + try { + client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).join(); + throw new AssertionError("Should have failed!"); + } catch (CompletionException e) { + assertThat(e.getCause()).isInstanceOf(CircuitBreakerOpenException.class); + // good + } + + + } + + + +}