From fbaf4a09e2d7f4eb84fe66417a797fa4c9d7582c Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Mon, 17 May 2021 17:58:13 -0400 Subject: [PATCH] Use the S3 object monitor to retrieve Tor exit node lists. --- .../textsecuregcm/WhisperServerService.java | 3 +- .../TorExitNodeConfiguration.java | 42 +++--- .../util/TorExitNodeManager.java | 130 ++++++------------ .../util/TorExitNodeManagerTest.java | 87 ++---------- 4 files changed, 79 insertions(+), 183 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index ee7f6d2ca..6c7024cfc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -397,7 +397,6 @@ public class WhisperServerService extends Application refreshFuture; + private final S3ObjectMonitor exitListMonitor; - private final FaultTolerantHttpClient refreshClient; - private final URI exitNodeListUri; - - private final AtomicReference lastEtag = new AtomicReference<>(null); private final AtomicReference> exitNodeAddresses = new AtomicReference<>(Collections.emptySet()); private static final Timer REFRESH_TIMER = Metrics.timer(name(TorExitNodeManager.class, "refresh")); @@ -54,78 +41,45 @@ public class TorExitNodeManager implements Managed { private static final Logger log = LoggerFactory.getLogger(TorExitNodeManager.class); - public TorExitNodeManager(final ScheduledExecutorService scheduledExecutorService, final ExecutorService clientExecutorService, final TorExitNodeConfiguration configuration) { - this.refreshScheduledExecutorService = scheduledExecutorService; + public TorExitNodeManager( + final ScheduledExecutorService scheduledExecutorService, + final TorExitNodeConfiguration configuration) { - this.exitNodeListUri = URI.create(configuration.getListUrl()); - this.refreshDelay = configuration.getRefreshInterval(); + this.exitListMonitor = new S3ObjectMonitor( + configuration.getS3Region(), + configuration.getS3Bucket(), + configuration.getObjectKey(), + scheduledExecutorService, + configuration.getRefreshInterval(), + this::handleExitListChanged); + } - refreshClient = FaultTolerantHttpClient.newBuilder() - .withCircuitBreaker(configuration.getCircuitBreakerConfiguration()) - .withRetry(configuration.getRetryConfiguration()) - .withVersion(HttpClient.Version.HTTP_1_1) - .withConnectTimeout(Duration.ofSeconds(10)) - .withRedirect(HttpClient.Redirect.NEVER) - .withExecutor(clientExecutorService) - .withName("tor-exit-node") - .withSecurityProtocol(FaultTolerantHttpClient.SECURITY_PROTOCOL_TLS_1_3) - .build(); + @Override + public synchronized void start() { + exitListMonitor.refresh(); + exitListMonitor.start(); + } + + @Override + public synchronized void stop() { + exitListMonitor.stop(); } public boolean isTorExitNode(final String address) { return exitNodeAddresses.get().contains(address); } + private void handleExitListChanged(final S3Object exitList) { + REFRESH_TIMER.record(() -> handleExitListChanged(exitList.getObjectContent())); + } + @VisibleForTesting - CompletableFuture refresh() { - final String etag = lastEtag.get(); - - final HttpRequest request; - { - final HttpRequest.Builder builder = HttpRequest.newBuilder().GET().uri(exitNodeListUri); - - if (StringUtils.isNotBlank(etag)) { - builder.header("If-None-Match", etag); - } - - request = builder.build(); - } - - final long start = System.nanoTime(); - - return refreshClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) - .whenComplete((response, cause) -> { - REFRESH_TIMER.record(System.nanoTime() - start, TimeUnit.NANOSECONDS); - - if (cause != null) { - REFRESH_ERRORS.increment(); - log.warn("Failed to refresh Tor exit node list", cause); - } else { - if (response.statusCode() == 200) { - exitNodeAddresses.set(response.body().lines().collect(Collectors.toSet())); - response.headers().firstValue("ETag").ifPresent(newEtag -> lastEtag.compareAndSet(etag, newEtag)); - } else if (response.statusCode() != 304) { - REFRESH_ERRORS.increment(); - log.warn("Failed to refresh Tor exit node list: {} ({})", response.statusCode(), response.body()); - } - } - }); - } - - @Override - public synchronized void start() { - if (refreshFuture != null) { - refreshFuture.cancel(true); - } - - refreshFuture = refreshScheduledExecutorService - .scheduleAtFixedRate(this::refresh, 0, refreshDelay.toMillis(), TimeUnit.MILLISECONDS); - } - - @Override - public synchronized void stop() { - if (refreshFuture != null) { - refreshFuture.cancel(true); + void handleExitListChanged(final InputStream inputStream) { + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + exitNodeAddresses.set(reader.lines().collect(Collectors.toSet())); + } catch (final Exception e) { + REFRESH_ERRORS.increment(); + log.warn("Failed to refresh Tor exit node list", e); } } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/util/TorExitNodeManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/util/TorExitNodeManagerTest.java index 0133479a6..9d2922d1f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/util/TorExitNodeManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/util/TorExitNodeManagerTest.java @@ -5,89 +5,32 @@ package org.whispersystems.textsecuregcm.util; -import com.github.tomakehurst.wiremock.junit.WireMockRule; -import com.github.tomakehurst.wiremock.matching.StringValuePattern; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ScheduledExecutorService; import org.junit.Test; import org.whispersystems.textsecuregcm.configuration.TorExitNodeConfiguration; import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; -import static com.github.tomakehurst.wiremock.client.WireMock.get; -import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; -import static com.github.tomakehurst.wiremock.client.WireMock.post; -import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; -import static com.github.tomakehurst.wiremock.client.WireMock.verify; -import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; -import static org.junit.Assert.*; - public class TorExitNodeManagerTest extends AbstractRedisClusterTest { - private ScheduledExecutorService scheduledExecutorService; - private ExecutorService clientExecutorService; - - private TorExitNodeManager torExitNodeManager; - - private static final String LIST_PATH = "/list"; - - @Rule - public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort().dynamicHttpsPort()); - - @Before - public void setUp() throws Exception { - final TorExitNodeConfiguration configuration = new TorExitNodeConfiguration(); - configuration.setListUrl("http://localhost:" + wireMockRule.port() + LIST_PATH); - - scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - clientExecutorService = Executors.newSingleThreadExecutor(); - - torExitNodeManager = new TorExitNodeManager(scheduledExecutorService, clientExecutorService, configuration); - } - - @After - public void tearDown() throws Exception { - scheduledExecutorService.shutdown(); - scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS); - - clientExecutorService.shutdown(); - clientExecutorService.awaitTermination(1, TimeUnit.SECONDS); - } - @Test public void testIsTorExitNode() { + final TorExitNodeConfiguration configuration = new TorExitNodeConfiguration(); + configuration.setS3Region("ap-northeast-3"); + + final TorExitNodeManager torExitNodeManager = + new TorExitNodeManager(mock(ScheduledExecutorService.class), configuration); + assertFalse(torExitNodeManager.isTorExitNode("10.0.0.1")); assertFalse(torExitNodeManager.isTorExitNode("10.0.0.2")); - final String etag = UUID.randomUUID().toString(); - - wireMockRule.stubFor(get(urlEqualTo(LIST_PATH)) - .willReturn(aResponse() - .withHeader("ETag", etag) - .withBody("10.0.0.1\n10.0.0.2"))); - - torExitNodeManager.refresh().join(); - - verify(getRequestedFor(urlEqualTo(LIST_PATH)).withoutHeader("If-None-Match")); - - assertTrue(torExitNodeManager.isTorExitNode("10.0.0.1")); - assertTrue(torExitNodeManager.isTorExitNode("10.0.0.2")); - assertFalse(torExitNodeManager.isTorExitNode("10.0.0.3")); - - wireMockRule.stubFor(get(urlEqualTo(LIST_PATH)).withHeader("If-None-Match", equalTo(etag)) - .willReturn(aResponse().withStatus(304))); - - torExitNodeManager.refresh().join(); - - verify(getRequestedFor(urlEqualTo(LIST_PATH)).withHeader("If-None-Match", equalTo(etag))); + torExitNodeManager.handleExitListChanged( + new ByteArrayInputStream("10.0.0.1\n10.0.0.2".getBytes(StandardCharsets.UTF_8))); assertTrue(torExitNodeManager.isTorExitNode("10.0.0.1")); assertTrue(torExitNodeManager.isTorExitNode("10.0.0.2"));