Simplify S3ObjectMonitor API, try-with-resource.

This commit is contained in:
Graeme Connell 2021-05-21 09:54:00 -06:00 committed by gram-signal
parent 722055c8b5
commit c10b64c367
4 changed files with 26 additions and 34 deletions

View File

@ -24,8 +24,6 @@ import java.util.zip.GZIPInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.MonitoredS3ObjectConfiguration;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
public class AsnManager implements Managed {
@ -71,7 +69,7 @@ public class AsnManager implements Managed {
}
}
private void handleAsnTableChanged(final ResponseInputStream<GetObjectResponse> asnTableObject) {
private void handleAsnTableChanged(final InputStream asnTableObject) {
REFRESH_TIMER.record(() -> {
try {
handleAsnTableChangedStream(new GZIPInputStream(asnTableObject));

View File

@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.util;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.lifecycle.Managed;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@ -39,7 +40,7 @@ public class S3ObjectMonitor implements Managed {
private final Duration refreshInterval;
private ScheduledFuture<?> refreshFuture;
private final Consumer<ResponseInputStream<GetObjectResponse>> changeListener;
private final Consumer<InputStream> changeListener;
private final AtomicReference<String> lastETag = new AtomicReference<>();
@ -54,7 +55,7 @@ public class S3ObjectMonitor implements Managed {
final long maxObjectSize,
final ScheduledExecutorService refreshExecutorService,
final Duration refreshInterval,
final Consumer<ResponseInputStream<GetObjectResponse>> changeListener) {
final Consumer<InputStream> changeListener) {
this(S3Client.builder()
.region(Region.of(s3Region))
@ -76,7 +77,7 @@ public class S3ObjectMonitor implements Managed {
final long maxObjectSize,
final ScheduledExecutorService refreshExecutorService,
final Duration refreshInterval,
final Consumer<ResponseInputStream<GetObjectResponse>> changeListener) {
final Consumer<InputStream> changeListener) {
this.s3Client = s3Client;
this.s3Bucket = s3Bucket;
@ -152,15 +153,10 @@ public class S3ObjectMonitor implements Managed {
final String refreshedETag = objectMetadata.eTag();
if (!StringUtils.equals(initialETag, refreshedETag) && lastETag.compareAndSet(initialETag, refreshedETag)) {
final ResponseInputStream<GetObjectResponse> response = getObject();
log.info("Object at s3://{}/{} has changed; new eTag is {} and object size is {} bytes",
s3Bucket, objectKey, response.response().eTag(), response.response().contentLength());
try {
try (final ResponseInputStream<GetObjectResponse> response = getObject()) {
log.info("Object at s3://{}/{} has changed; new eTag is {} and object size is {} bytes",
s3Bucket, objectKey, response.response().eTag(), response.response().contentLength());
changeListener.accept(response);
} finally {
response.close();
}
}
} catch (final Exception e) {

View File

@ -23,8 +23,6 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.MonitoredS3ObjectConfiguration;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
/**
* A utility for checking whether IP addresses belong to Tor exit nodes using the "bulk exit list."
@ -70,7 +68,7 @@ public class TorExitNodeManager implements Managed {
return exitNodeAddresses.get().contains(address);
}
private void handleExitListChanged(final ResponseInputStream<GetObjectResponse> exitList) {
private void handleExitListChanged(final InputStream exitList) {
REFRESH_TIMER.record(() -> handleExitListChanged(exitList));
}

View File

@ -1,5 +1,19 @@
package org.whispersystems.textsecuregcm.util;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.http.AbortableInputStream;
@ -9,20 +23,6 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class S3ObjectMonitorTest {
@Test
@ -33,7 +33,7 @@ class S3ObjectMonitorTest {
final String objectKey = "greatest-smooth-jazz-hits-of-all-time.zip";
//noinspection unchecked
final Consumer<ResponseInputStream<GetObjectResponse>> listener = mock(Consumer.class);
final Consumer<InputStream> listener = mock(Consumer.class);
final S3ObjectMonitor objectMonitor = new S3ObjectMonitor(
s3Client,
@ -64,7 +64,7 @@ class S3ObjectMonitorTest {
final String objectKey = "greatest-smooth-jazz-hits-of-all-time.zip";
//noinspection unchecked
final Consumer<ResponseInputStream<GetObjectResponse>> listener = mock(Consumer.class);
final Consumer<InputStream> listener = mock(Consumer.class);
final S3ObjectMonitor objectMonitor = new S3ObjectMonitor(
s3Client,
@ -102,7 +102,7 @@ class S3ObjectMonitorTest {
final long maxObjectSize = 16 * 1024 * 1024;
//noinspection unchecked
final Consumer<ResponseInputStream<GetObjectResponse>> listener = mock(Consumer.class);
final Consumer<InputStream> listener = mock(Consumer.class);
final S3ObjectMonitor objectMonitor = new S3ObjectMonitor(
s3Client,