Remove unused utility classes
This commit is contained in:
parent
ad1aeea74b
commit
c931103712
|
@ -36,7 +36,6 @@ import org.whispersystems.textsecuregcm.configuration.GcpAttachmentsConfiguratio
|
||||||
import org.whispersystems.textsecuregcm.configuration.MaxDeviceConfiguration;
|
import org.whispersystems.textsecuregcm.configuration.MaxDeviceConfiguration;
|
||||||
import org.whispersystems.textsecuregcm.configuration.MessageCacheConfiguration;
|
import org.whispersystems.textsecuregcm.configuration.MessageCacheConfiguration;
|
||||||
import org.whispersystems.textsecuregcm.configuration.MessageDynamoDbConfiguration;
|
import org.whispersystems.textsecuregcm.configuration.MessageDynamoDbConfiguration;
|
||||||
import org.whispersystems.textsecuregcm.configuration.MonitoredS3ObjectConfiguration;
|
|
||||||
import org.whispersystems.textsecuregcm.configuration.PaymentsServiceConfiguration;
|
import org.whispersystems.textsecuregcm.configuration.PaymentsServiceConfiguration;
|
||||||
import org.whispersystems.textsecuregcm.configuration.PushConfiguration;
|
import org.whispersystems.textsecuregcm.configuration.PushConfiguration;
|
||||||
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration;
|
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration;
|
||||||
|
@ -296,16 +295,6 @@ public class WhisperServerConfiguration extends Configuration {
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private AppConfigConfiguration appConfig;
|
private AppConfigConfiguration appConfig;
|
||||||
|
|
||||||
@Valid
|
|
||||||
@NotNull
|
|
||||||
@JsonProperty
|
|
||||||
private MonitoredS3ObjectConfiguration torExitNodeList;
|
|
||||||
|
|
||||||
@Valid
|
|
||||||
@NotNull
|
|
||||||
@JsonProperty
|
|
||||||
private MonitoredS3ObjectConfiguration asnTable;
|
|
||||||
|
|
||||||
@Valid
|
@Valid
|
||||||
@NotNull
|
@NotNull
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
|
@ -543,14 +532,6 @@ public class WhisperServerConfiguration extends Configuration {
|
||||||
return pendingDevicesDynamoDb;
|
return pendingDevicesDynamoDb;
|
||||||
}
|
}
|
||||||
|
|
||||||
public MonitoredS3ObjectConfiguration getTorExitNodeListConfiguration() {
|
|
||||||
return torExitNodeList;
|
|
||||||
}
|
|
||||||
|
|
||||||
public MonitoredS3ObjectConfiguration getAsnTableConfiguration() {
|
|
||||||
return asnTable;
|
|
||||||
}
|
|
||||||
|
|
||||||
public DonationConfiguration getDonationConfiguration() {
|
public DonationConfiguration getDonationConfiguration() {
|
||||||
return donation;
|
return donation;
|
||||||
}
|
}
|
||||||
|
|
|
@ -202,11 +202,9 @@ import org.whispersystems.textsecuregcm.storage.Usernames;
|
||||||
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
|
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.VerificationCodeStore;
|
import org.whispersystems.textsecuregcm.storage.VerificationCodeStore;
|
||||||
import org.whispersystems.textsecuregcm.stripe.StripeManager;
|
import org.whispersystems.textsecuregcm.stripe.StripeManager;
|
||||||
import org.whispersystems.textsecuregcm.util.AsnManager;
|
|
||||||
import org.whispersystems.textsecuregcm.util.Constants;
|
import org.whispersystems.textsecuregcm.util.Constants;
|
||||||
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
|
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
|
||||||
import org.whispersystems.textsecuregcm.util.HostnameUtil;
|
import org.whispersystems.textsecuregcm.util.HostnameUtil;
|
||||||
import org.whispersystems.textsecuregcm.util.TorExitNodeManager;
|
|
||||||
import org.whispersystems.textsecuregcm.util.logging.LoggingUnhandledExceptionMapper;
|
import org.whispersystems.textsecuregcm.util.logging.LoggingUnhandledExceptionMapper;
|
||||||
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
|
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
|
||||||
import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener;
|
import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener;
|
||||||
|
@ -471,8 +469,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
GCMSender gcmSender = new GCMSender(gcmSenderExecutor, accountsManager, config.getGcmConfiguration().getApiKey());
|
GCMSender gcmSender = new GCMSender(gcmSenderExecutor, accountsManager, config.getGcmConfiguration().getApiKey());
|
||||||
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), dynamicConfigurationManager, rateLimitersCluster);
|
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), dynamicConfigurationManager, rateLimitersCluster);
|
||||||
ProvisioningManager provisioningManager = new ProvisioningManager(pubSubManager);
|
ProvisioningManager provisioningManager = new ProvisioningManager(pubSubManager);
|
||||||
TorExitNodeManager torExitNodeManager = new TorExitNodeManager(recurringJobExecutor, config.getTorExitNodeListConfiguration());
|
|
||||||
AsnManager asnManager = new AsnManager(recurringJobExecutor, config.getAsnTableConfiguration());
|
|
||||||
IssuedReceiptsManager issuedReceiptsManager = new IssuedReceiptsManager(
|
IssuedReceiptsManager issuedReceiptsManager = new IssuedReceiptsManager(
|
||||||
config.getDynamoDbTables().getIssuedReceipts().getTableName(),
|
config.getDynamoDbTables().getIssuedReceipts().getTableName(),
|
||||||
config.getDynamoDbTables().getIssuedReceipts().getExpiration(),
|
config.getDynamoDbTables().getIssuedReceipts().getExpiration(),
|
||||||
|
@ -565,8 +561,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||||
environment.lifecycle().manage(messagePersister);
|
environment.lifecycle().manage(messagePersister);
|
||||||
environment.lifecycle().manage(clientPresenceManager);
|
environment.lifecycle().manage(clientPresenceManager);
|
||||||
environment.lifecycle().manage(currencyManager);
|
environment.lifecycle().manage(currencyManager);
|
||||||
environment.lifecycle().manage(torExitNodeManager);
|
|
||||||
environment.lifecycle().manage(asnManager);
|
|
||||||
environment.lifecycle().manage(directoryQueue);
|
environment.lifecycle().manage(directoryQueue);
|
||||||
|
|
||||||
StaticCredentialsProvider cdnCredentialsProvider = StaticCredentialsProvider
|
StaticCredentialsProvider cdnCredentialsProvider = StaticCredentialsProvider
|
||||||
|
|
|
@ -1,63 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2021 Signal Messenger, LLC
|
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.whispersystems.textsecuregcm.configuration;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import javax.validation.Valid;
|
|
||||||
import javax.validation.constraints.NotBlank;
|
|
||||||
import java.time.Duration;
|
|
||||||
|
|
||||||
public class MonitoredS3ObjectConfiguration {
|
|
||||||
|
|
||||||
@JsonProperty
|
|
||||||
@NotBlank
|
|
||||||
private String s3Region;
|
|
||||||
|
|
||||||
@JsonProperty
|
|
||||||
@NotBlank
|
|
||||||
private String s3Bucket;
|
|
||||||
|
|
||||||
@JsonProperty
|
|
||||||
@NotBlank
|
|
||||||
private String objectKey;
|
|
||||||
|
|
||||||
@JsonProperty
|
|
||||||
private long maxSize = 16 * 1024 * 1024;
|
|
||||||
|
|
||||||
@JsonProperty
|
|
||||||
private Duration refreshInterval = Duration.ofMinutes(5);
|
|
||||||
|
|
||||||
public String getS3Region() {
|
|
||||||
return s3Region;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public void setS3Region(final String s3Region) {
|
|
||||||
this.s3Region = s3Region;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getS3Bucket() {
|
|
||||||
return s3Bucket;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getObjectKey() {
|
|
||||||
return objectKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getMaxSize() {
|
|
||||||
return maxSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public void setMaxSize(final long maxSize) {
|
|
||||||
this.maxSize = maxSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Duration getRefreshInterval() {
|
|
||||||
return refreshInterval;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,91 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2013-2021 Signal Messenger, LLC
|
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.whispersystems.textsecuregcm.util;
|
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import io.dropwizard.lifecycle.Managed;
|
|
||||||
import io.micrometer.core.instrument.Counter;
|
|
||||||
import io.micrometer.core.instrument.Metrics;
|
|
||||||
import io.micrometer.core.instrument.Timer;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.net.Inet4Address;
|
|
||||||
import java.net.UnknownHostException;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import java.util.zip.GZIPInputStream;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.whispersystems.textsecuregcm.configuration.MonitoredS3ObjectConfiguration;
|
|
||||||
|
|
||||||
public class AsnManager implements Managed {
|
|
||||||
|
|
||||||
private final S3ObjectMonitor asnTableMonitor;
|
|
||||||
|
|
||||||
private final AtomicReference<AsnTable> asnTable = new AtomicReference<>(AsnTable.EMPTY);
|
|
||||||
|
|
||||||
private static final Timer REFRESH_TIMER = Metrics.timer(name(AsnManager.class, "refresh"));
|
|
||||||
private static final Counter REFRESH_ERRORS = Metrics.counter(name(AsnManager.class, "refreshErrors"));
|
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(AsnManager.class);
|
|
||||||
|
|
||||||
public AsnManager(
|
|
||||||
final ScheduledExecutorService scheduledExecutorService,
|
|
||||||
final MonitoredS3ObjectConfiguration configuration) {
|
|
||||||
|
|
||||||
this.asnTableMonitor = new S3ObjectMonitor(
|
|
||||||
configuration.getS3Region(),
|
|
||||||
configuration.getS3Bucket(),
|
|
||||||
configuration.getObjectKey(),
|
|
||||||
configuration.getMaxSize(),
|
|
||||||
scheduledExecutorService,
|
|
||||||
configuration.getRefreshInterval(),
|
|
||||||
this::handleAsnTableChanged);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() throws Exception {
|
|
||||||
asnTableMonitor.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop() throws Exception {
|
|
||||||
asnTableMonitor.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
public Optional<Long> getAsn(final String address) {
|
|
||||||
try {
|
|
||||||
return asnTable.get().getAsn((Inet4Address) Inet4Address.getByName(address));
|
|
||||||
} catch (final UnknownHostException e) {
|
|
||||||
log.warn("Could not parse \"{}\" as an Inet4Address", address);
|
|
||||||
return Optional.empty();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleAsnTableChanged(final InputStream asnTableObject) {
|
|
||||||
REFRESH_TIMER.record(() -> {
|
|
||||||
try {
|
|
||||||
handleAsnTableChangedStream(new GZIPInputStream(asnTableObject));
|
|
||||||
} catch (final IOException e) {
|
|
||||||
log.error("Retrieved object was not a gzip archive", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
void handleAsnTableChangedStream(final InputStream inputStream) {
|
|
||||||
try (final InputStreamReader reader = new InputStreamReader(inputStream)) {
|
|
||||||
asnTable.set(new AsnTable(reader));
|
|
||||||
} catch (final Exception e) {
|
|
||||||
REFRESH_ERRORS.increment();
|
|
||||||
log.warn("Failed to refresh IP-to-ASN table", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,91 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2021 Signal Messenger, LLC
|
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.whispersystems.textsecuregcm.util;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.Reader;
|
|
||||||
import java.net.Inet4Address;
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.NavigableMap;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.TreeMap;
|
|
||||||
import org.apache.commons.csv.CSVFormat;
|
|
||||||
import org.apache.commons.csv.CSVParser;
|
|
||||||
import org.apache.commons.csv.CSVRecord;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Allows IP->ASN lookup operations using data from https://iptoasn.com/.
|
|
||||||
*/
|
|
||||||
class AsnTable {
|
|
||||||
private final NavigableMap<Long, AsnRange> asnBlocksByFirstIp = new TreeMap<>();
|
|
||||||
private final Map<Long, String> countryCodesByAsn = new HashMap<>();
|
|
||||||
|
|
||||||
private static class AsnRange {
|
|
||||||
private final long rangeStart;
|
|
||||||
private final long rangeEnd;
|
|
||||||
|
|
||||||
private final long asn;
|
|
||||||
|
|
||||||
private AsnRange(long rangeStart, long rangeEnd, long asn) {
|
|
||||||
this.rangeStart = rangeStart;
|
|
||||||
this.rangeEnd = rangeEnd;
|
|
||||||
this.asn = asn;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean contains(final long address) {
|
|
||||||
return address >= rangeStart && address <= rangeEnd;
|
|
||||||
}
|
|
||||||
|
|
||||||
long getAsn() {
|
|
||||||
return asn;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static final AsnTable EMPTY = new AsnTable();
|
|
||||||
|
|
||||||
public AsnTable(final Reader tsvReader) throws IOException {
|
|
||||||
try (final CSVParser csvParser = CSVFormat.TDF.parse(tsvReader)) {
|
|
||||||
for (final CSVRecord record : csvParser) {
|
|
||||||
final long start = Long.parseLong(record.get(0), 10);
|
|
||||||
final long end = Long.parseLong(record.get(1), 10);
|
|
||||||
final long asn = Long.parseLong(record.get(2), 10);
|
|
||||||
final String countryCode = record.get(3);
|
|
||||||
|
|
||||||
asnBlocksByFirstIp.put(start, new AsnRange(start, end, asn));
|
|
||||||
countryCodesByAsn.put(asn, countryCode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private AsnTable() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public Optional<Long> getAsn(final Inet4Address address) {
|
|
||||||
final long addressAsLong = ipToLong(address);
|
|
||||||
|
|
||||||
return Optional.ofNullable(asnBlocksByFirstIp.floorEntry(addressAsLong))
|
|
||||||
.filter(entry -> entry.getValue().contains(addressAsLong))
|
|
||||||
.map(entry -> entry.getValue().getAsn())
|
|
||||||
.filter(asn -> asn != 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Optional<String> getCountryCode(final long asn) {
|
|
||||||
return Optional.ofNullable(countryCodesByAsn.get(asn));
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
static long ipToLong(final Inet4Address address) {
|
|
||||||
final ByteBuffer buffer = ByteBuffer.allocate(8);
|
|
||||||
buffer.position(4);
|
|
||||||
buffer.put(address.getAddress());
|
|
||||||
|
|
||||||
buffer.flip();
|
|
||||||
return buffer.getLong();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,166 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2013-2021 Signal Messenger, LLC
|
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.whispersystems.textsecuregcm.util;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import io.dropwizard.lifecycle.Managed;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.ScheduledFuture;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
|
|
||||||
import software.amazon.awssdk.core.ResponseInputStream;
|
|
||||||
import software.amazon.awssdk.regions.Region;
|
|
||||||
import software.amazon.awssdk.services.s3.S3Client;
|
|
||||||
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
|
|
||||||
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
|
|
||||||
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
|
|
||||||
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An S3 object monitor watches a specific object in an S3 bucket and notifies a listener if that object changes.
|
|
||||||
*/
|
|
||||||
public class S3ObjectMonitor implements Managed {
|
|
||||||
|
|
||||||
private final String s3Bucket;
|
|
||||||
private final String objectKey;
|
|
||||||
private final long maxObjectSize;
|
|
||||||
|
|
||||||
private final ScheduledExecutorService refreshExecutorService;
|
|
||||||
private final Duration refreshInterval;
|
|
||||||
private ScheduledFuture<?> refreshFuture;
|
|
||||||
|
|
||||||
private final Consumer<InputStream> changeListener;
|
|
||||||
|
|
||||||
private final AtomicReference<String> lastETag = new AtomicReference<>();
|
|
||||||
|
|
||||||
private final S3Client s3Client;
|
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(S3ObjectMonitor.class);
|
|
||||||
|
|
||||||
public S3ObjectMonitor(
|
|
||||||
final String s3Region,
|
|
||||||
final String s3Bucket,
|
|
||||||
final String objectKey,
|
|
||||||
final long maxObjectSize,
|
|
||||||
final ScheduledExecutorService refreshExecutorService,
|
|
||||||
final Duration refreshInterval,
|
|
||||||
final Consumer<InputStream> changeListener) {
|
|
||||||
|
|
||||||
this(S3Client.builder()
|
|
||||||
.region(Region.of(s3Region))
|
|
||||||
.credentialsProvider(InstanceProfileCredentialsProvider.create())
|
|
||||||
.build(),
|
|
||||||
s3Bucket,
|
|
||||||
objectKey,
|
|
||||||
maxObjectSize,
|
|
||||||
refreshExecutorService,
|
|
||||||
refreshInterval,
|
|
||||||
changeListener);
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
S3ObjectMonitor(
|
|
||||||
final S3Client s3Client,
|
|
||||||
final String s3Bucket,
|
|
||||||
final String objectKey,
|
|
||||||
final long maxObjectSize,
|
|
||||||
final ScheduledExecutorService refreshExecutorService,
|
|
||||||
final Duration refreshInterval,
|
|
||||||
final Consumer<InputStream> changeListener) {
|
|
||||||
|
|
||||||
this.s3Client = s3Client;
|
|
||||||
this.s3Bucket = s3Bucket;
|
|
||||||
this.objectKey = objectKey;
|
|
||||||
this.maxObjectSize = maxObjectSize;
|
|
||||||
|
|
||||||
this.refreshExecutorService = refreshExecutorService;
|
|
||||||
this.refreshInterval = refreshInterval;
|
|
||||||
|
|
||||||
this.changeListener = changeListener;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void start() {
|
|
||||||
if (refreshFuture != null) {
|
|
||||||
throw new RuntimeException("S3 object manager already started");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run the first request immediately/blocking, then start subsequent calls.
|
|
||||||
log.info("Initial request for s3://{}/{}", s3Bucket, objectKey);
|
|
||||||
refresh();
|
|
||||||
|
|
||||||
refreshFuture = refreshExecutorService
|
|
||||||
.scheduleAtFixedRate(this::refresh, refreshInterval.toMillis(), refreshInterval.toMillis(),
|
|
||||||
TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void stop() {
|
|
||||||
if (refreshFuture != null) {
|
|
||||||
refreshFuture.cancel(true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Immediately returns the monitored S3 object regardless of whether it has changed since it was last retrieved.
|
|
||||||
*
|
|
||||||
* @return the current version of the monitored S3 object. Caller should close() this upon completion.
|
|
||||||
* @throws IOException if the retrieved S3 object is larger than the configured maximum size
|
|
||||||
*/
|
|
||||||
@VisibleForTesting
|
|
||||||
ResponseInputStream<GetObjectResponse> getObject() throws IOException {
|
|
||||||
ResponseInputStream<GetObjectResponse> response = s3Client.getObject(GetObjectRequest.builder()
|
|
||||||
.key(objectKey)
|
|
||||||
.bucket(s3Bucket)
|
|
||||||
.build());
|
|
||||||
|
|
||||||
lastETag.set(response.response().eTag());
|
|
||||||
|
|
||||||
if (response.response().contentLength() <= maxObjectSize) {
|
|
||||||
return response;
|
|
||||||
} else {
|
|
||||||
log.warn("Object at s3://{}/{} has a size of {} bytes, which exceeds the maximum allowed size of {} bytes",
|
|
||||||
s3Bucket, objectKey, response.response().contentLength(), maxObjectSize);
|
|
||||||
response.abort();
|
|
||||||
throw new IOException("S3 object too large");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Polls S3 for object metadata and notifies the listener provided at construction time if and only if the object has
|
|
||||||
* changed since the last call to {@link #getObject()} or {@code refresh()}.
|
|
||||||
*/
|
|
||||||
@VisibleForTesting
|
|
||||||
void refresh() {
|
|
||||||
try {
|
|
||||||
final HeadObjectResponse objectMetadata = s3Client.headObject(HeadObjectRequest.builder()
|
|
||||||
.bucket(s3Bucket)
|
|
||||||
.key(objectKey)
|
|
||||||
.build());
|
|
||||||
|
|
||||||
final String initialETag = lastETag.get();
|
|
||||||
final String refreshedETag = objectMetadata.eTag();
|
|
||||||
|
|
||||||
if (!StringUtils.equals(initialETag, refreshedETag) && lastETag.compareAndSet(initialETag, refreshedETag)) {
|
|
||||||
try (final ResponseInputStream<GetObjectResponse> response = getObject()) {
|
|
||||||
log.info("Object at s3://{}/{} has changed; new eTag is {} and object size is {} bytes",
|
|
||||||
s3Bucket, objectKey, response.response().eTag(), response.response().contentLength());
|
|
||||||
changeListener.accept(response);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (final Exception e) {
|
|
||||||
log.warn("Failed to refresh monitored object", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,84 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2021 Signal Messenger, LLC
|
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.whispersystems.textsecuregcm.util;
|
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import io.dropwizard.lifecycle.Managed;
|
|
||||||
import io.micrometer.core.instrument.Counter;
|
|
||||||
import io.micrometer.core.instrument.Metrics;
|
|
||||||
import io.micrometer.core.instrument.Timer;
|
|
||||||
import java.io.BufferedReader;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.whispersystems.textsecuregcm.configuration.MonitoredS3ObjectConfiguration;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A utility for checking whether IP addresses belong to Tor exit nodes using the "bulk exit list."
|
|
||||||
*
|
|
||||||
* @see <a href="https://blog.torproject.org/changes-tor-exit-list-service">Changes to the Tor Exit List Service</a>
|
|
||||||
*/
|
|
||||||
public class TorExitNodeManager implements Managed {
|
|
||||||
|
|
||||||
private final S3ObjectMonitor exitListMonitor;
|
|
||||||
|
|
||||||
private final AtomicReference<Set<String>> exitNodeAddresses = new AtomicReference<>(Collections.emptySet());
|
|
||||||
|
|
||||||
private static final Timer REFRESH_TIMER = Metrics.timer(name(TorExitNodeManager.class, "refresh"));
|
|
||||||
private static final Counter REFRESH_ERRORS = Metrics.counter(name(TorExitNodeManager.class, "refreshErrors"));
|
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(TorExitNodeManager.class);
|
|
||||||
|
|
||||||
public TorExitNodeManager(
|
|
||||||
final ScheduledExecutorService scheduledExecutorService,
|
|
||||||
final MonitoredS3ObjectConfiguration configuration) {
|
|
||||||
|
|
||||||
this.exitListMonitor = new S3ObjectMonitor(
|
|
||||||
configuration.getS3Region(),
|
|
||||||
configuration.getS3Bucket(),
|
|
||||||
configuration.getObjectKey(),
|
|
||||||
configuration.getMaxSize(),
|
|
||||||
scheduledExecutorService,
|
|
||||||
configuration.getRefreshInterval(),
|
|
||||||
this::handleExitListChanged);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void start() {
|
|
||||||
exitListMonitor.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void stop() {
|
|
||||||
exitListMonitor.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isTorExitNode(final String address) {
|
|
||||||
return exitNodeAddresses.get().contains(address);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleExitListChanged(final InputStream exitList) {
|
|
||||||
REFRESH_TIMER.record(() -> handleExitListChangedStream(exitList));
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
void handleExitListChangedStream(final InputStream inputStream) {
|
|
||||||
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
|
|
||||||
exitNodeAddresses.set(reader.lines().collect(Collectors.toSet()));
|
|
||||||
} catch (final Exception e) {
|
|
||||||
REFRESH_ERRORS.increment();
|
|
||||||
log.warn("Failed to refresh Tor exit node list", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,39 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2013-2021 Signal Messenger, LLC
|
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.whispersystems.textsecuregcm.util;
|
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
import org.whispersystems.textsecuregcm.configuration.MonitoredS3ObjectConfiguration;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.*;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
|
|
||||||
class AsnManagerTest {
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void getAsn() throws IOException {
|
|
||||||
final MonitoredS3ObjectConfiguration configuration = new MonitoredS3ObjectConfiguration();
|
|
||||||
configuration.setS3Region("ap-northeast-3");
|
|
||||||
|
|
||||||
final AsnManager asnManager = new AsnManager(mock(ScheduledExecutorService.class), configuration);
|
|
||||||
|
|
||||||
assertEquals(Optional.empty(), asnManager.getAsn("10.0.0.1"));
|
|
||||||
|
|
||||||
try (final InputStream tableInputStream = getClass().getResourceAsStream("ip2asn-test.tsv")) {
|
|
||||||
asnManager.handleAsnTableChangedStream(tableInputStream);
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals(Optional.of(7922L), asnManager.getAsn("50.79.54.1"));
|
|
||||||
assertEquals(Optional.of(7552L), asnManager.getAsn("27.79.32.1"));
|
|
||||||
assertEquals(Optional.empty(), asnManager.getAsn("32.79.117.1"));
|
|
||||||
assertEquals(Optional.empty(), asnManager.getAsn("10.0.0.1"));
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,49 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2021 Signal Messenger, LLC
|
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.whispersystems.textsecuregcm.util;
|
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.net.Inet4Address;
|
|
||||||
import java.net.UnknownHostException;
|
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.*;
|
|
||||||
|
|
||||||
class AsnTableTest {
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void getAsn() throws IOException {
|
|
||||||
try (final InputStreamReader reader = new InputStreamReader(getClass().getResourceAsStream("ip2asn-test.tsv"))) {
|
|
||||||
final AsnTable asnTable = new AsnTable(reader);
|
|
||||||
|
|
||||||
assertEquals(Optional.of(7922L), asnTable.getAsn((Inet4Address) Inet4Address.getByName("50.79.54.1")));
|
|
||||||
assertEquals(Optional.of(7552L), asnTable.getAsn((Inet4Address) Inet4Address.getByName("27.79.32.1")));
|
|
||||||
assertEquals(Optional.empty(), asnTable.getAsn((Inet4Address) Inet4Address.getByName("5.182.202.1")));
|
|
||||||
assertEquals(Optional.empty(), asnTable.getAsn((Inet4Address) Inet4Address.getByName("32.79.117.1")));
|
|
||||||
assertEquals(Optional.empty(), asnTable.getAsn((Inet4Address) Inet4Address.getByName("10.0.0.1")));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void getCountryCode() throws IOException {
|
|
||||||
try (final InputStreamReader reader = new InputStreamReader(getClass().getResourceAsStream("ip2asn-test.tsv"))) {
|
|
||||||
final AsnTable asnTable = new AsnTable(reader);
|
|
||||||
|
|
||||||
assertEquals(Optional.of("US"), asnTable.getCountryCode(7922));
|
|
||||||
assertEquals(Optional.of("VN"), asnTable.getCountryCode(7552));
|
|
||||||
assertEquals(Optional.empty(), asnTable.getCountryCode(1234));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void ipToLong() throws UnknownHostException {
|
|
||||||
assertEquals(0x00000000ffffffffL, AsnTable.ipToLong((Inet4Address) Inet4Address.getByName("255.255.255.255")));
|
|
||||||
assertEquals(0x0000000000000001L, AsnTable.ipToLong((Inet4Address) Inet4Address.getByName("0.0.0.1")));
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,126 +0,0 @@
|
||||||
package org.whispersystems.textsecuregcm.util;
|
|
||||||
|
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.never;
|
|
||||||
import static org.mockito.Mockito.verify;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
import software.amazon.awssdk.core.ResponseInputStream;
|
|
||||||
import software.amazon.awssdk.http.AbortableInputStream;
|
|
||||||
import software.amazon.awssdk.services.s3.S3Client;
|
|
||||||
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
|
|
||||||
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
|
|
||||||
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
|
|
||||||
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
|
|
||||||
|
|
||||||
class S3ObjectMonitorTest {
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void refresh() {
|
|
||||||
final S3Client s3Client = mock(S3Client.class);
|
|
||||||
|
|
||||||
final String bucket = "s3bucket";
|
|
||||||
final String objectKey = "greatest-smooth-jazz-hits-of-all-time.zip";
|
|
||||||
|
|
||||||
//noinspection unchecked
|
|
||||||
final Consumer<InputStream> listener = mock(Consumer.class);
|
|
||||||
|
|
||||||
final S3ObjectMonitor objectMonitor = new S3ObjectMonitor(
|
|
||||||
s3Client,
|
|
||||||
bucket,
|
|
||||||
objectKey,
|
|
||||||
16 * 1024 * 1024,
|
|
||||||
mock(ScheduledExecutorService.class),
|
|
||||||
Duration.ofMinutes(1),
|
|
||||||
listener);
|
|
||||||
|
|
||||||
String uuid = UUID.randomUUID().toString();
|
|
||||||
when(s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(objectKey).build())).thenReturn(
|
|
||||||
HeadObjectResponse.builder().eTag(uuid).build());
|
|
||||||
ResponseInputStream<GetObjectResponse> ris = responseInputStreamFromString("abc", uuid);
|
|
||||||
when(s3Client.getObject(GetObjectRequest.builder().bucket(bucket).key(objectKey).build())).thenReturn(ris);
|
|
||||||
|
|
||||||
objectMonitor.refresh();
|
|
||||||
objectMonitor.refresh();
|
|
||||||
|
|
||||||
verify(listener).accept(ris);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void refreshAfterGet() throws IOException {
|
|
||||||
final S3Client s3Client = mock(S3Client.class);
|
|
||||||
|
|
||||||
final String bucket = "s3bucket";
|
|
||||||
final String objectKey = "greatest-smooth-jazz-hits-of-all-time.zip";
|
|
||||||
|
|
||||||
//noinspection unchecked
|
|
||||||
final Consumer<InputStream> listener = mock(Consumer.class);
|
|
||||||
|
|
||||||
final S3ObjectMonitor objectMonitor = new S3ObjectMonitor(
|
|
||||||
s3Client,
|
|
||||||
bucket,
|
|
||||||
objectKey,
|
|
||||||
16 * 1024 * 1024,
|
|
||||||
mock(ScheduledExecutorService.class),
|
|
||||||
Duration.ofMinutes(1),
|
|
||||||
listener);
|
|
||||||
|
|
||||||
String uuid = UUID.randomUUID().toString();
|
|
||||||
when(s3Client.headObject(HeadObjectRequest.builder().key(objectKey).bucket(bucket).build()))
|
|
||||||
.thenReturn(HeadObjectResponse.builder().eTag(uuid).build());
|
|
||||||
ResponseInputStream<GetObjectResponse> responseInputStream = responseInputStreamFromString("abc", uuid);
|
|
||||||
when(s3Client.getObject(GetObjectRequest.builder().key(objectKey).bucket(bucket).build())).thenReturn(responseInputStream);
|
|
||||||
|
|
||||||
objectMonitor.getObject();
|
|
||||||
objectMonitor.refresh();
|
|
||||||
|
|
||||||
verify(listener, never()).accept(responseInputStream);
|
|
||||||
}
|
|
||||||
|
|
||||||
private ResponseInputStream<GetObjectResponse> responseInputStreamFromString(String s, String etag) {
|
|
||||||
byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
|
|
||||||
AbortableInputStream ais = AbortableInputStream.create(new ByteArrayInputStream(bytes));
|
|
||||||
return new ResponseInputStream<>(GetObjectResponse.builder().contentLength((long) bytes.length).eTag(etag).build(), ais);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void refreshOversizedObject() {
|
|
||||||
final S3Client s3Client = mock(S3Client.class);
|
|
||||||
|
|
||||||
final String bucket = "s3bucket";
|
|
||||||
final String objectKey = "greatest-smooth-jazz-hits-of-all-time.zip";
|
|
||||||
final long maxObjectSize = 16 * 1024 * 1024;
|
|
||||||
|
|
||||||
//noinspection unchecked
|
|
||||||
final Consumer<InputStream> listener = mock(Consumer.class);
|
|
||||||
|
|
||||||
final S3ObjectMonitor objectMonitor = new S3ObjectMonitor(
|
|
||||||
s3Client,
|
|
||||||
bucket,
|
|
||||||
objectKey,
|
|
||||||
maxObjectSize,
|
|
||||||
mock(ScheduledExecutorService.class),
|
|
||||||
Duration.ofMinutes(1),
|
|
||||||
listener);
|
|
||||||
|
|
||||||
String uuid = UUID.randomUUID().toString();
|
|
||||||
when(s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(objectKey).build())).thenReturn(
|
|
||||||
HeadObjectResponse.builder().eTag(uuid).contentLength(maxObjectSize+1).build());
|
|
||||||
ResponseInputStream<GetObjectResponse> ris = responseInputStreamFromString("a".repeat((int) maxObjectSize+1), uuid);
|
|
||||||
when(s3Client.getObject(GetObjectRequest.builder().bucket(bucket).key(objectKey).build())).thenReturn(ris);
|
|
||||||
|
|
||||||
objectMonitor.refresh();
|
|
||||||
|
|
||||||
verify(listener, never()).accept(any());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,43 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2021 Signal Messenger, LLC
|
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.whispersystems.textsecuregcm.util;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
|
||||||
import org.whispersystems.textsecuregcm.configuration.MonitoredS3ObjectConfiguration;
|
|
||||||
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
|
|
||||||
|
|
||||||
class TorExitNodeManagerTest {
|
|
||||||
|
|
||||||
@RegisterExtension
|
|
||||||
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void testIsTorExitNode() {
|
|
||||||
final MonitoredS3ObjectConfiguration configuration = new MonitoredS3ObjectConfiguration();
|
|
||||||
configuration.setS3Region("ap-northeast-3");
|
|
||||||
|
|
||||||
final TorExitNodeManager torExitNodeManager =
|
|
||||||
new TorExitNodeManager(mock(ScheduledExecutorService.class), configuration);
|
|
||||||
|
|
||||||
assertFalse(torExitNodeManager.isTorExitNode("10.0.0.1"));
|
|
||||||
assertFalse(torExitNodeManager.isTorExitNode("10.0.0.2"));
|
|
||||||
|
|
||||||
torExitNodeManager.handleExitListChangedStream(
|
|
||||||
new ByteArrayInputStream("10.0.0.1\n10.0.0.2".getBytes(StandardCharsets.UTF_8)));
|
|
||||||
|
|
||||||
assertTrue(torExitNodeManager.isTorExitNode("10.0.0.1"));
|
|
||||||
assertTrue(torExitNodeManager.isTorExitNode("10.0.0.2"));
|
|
||||||
assertFalse(torExitNodeManager.isTorExitNode("10.0.0.3"));
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue