More clearly separate concerns for explicitly getting monitored objects.
This commit is contained in:
parent
28e3b23e8c
commit
3056ea8cbc
|
@ -12,6 +12,7 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||||
import com.amazonaws.services.s3.model.S3Object;
|
import com.amazonaws.services.s3.model.S3Object;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import io.dropwizard.lifecycle.Managed;
|
import io.dropwizard.lifecycle.Managed;
|
||||||
|
import java.io.IOException;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
@ -103,12 +104,33 @@ public class S3ObjectMonitor implements Managed {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Polls S3 for object metadata and notifies the listener provided at construction time if and only if the object has
|
* Immediately returns the monitored S3 object regardless of whether it has changed since it was last retrieved.
|
||||||
* changed. This method blocks until S3 object metadata has been retrieved and, if the object has changed, the
|
*
|
||||||
* listener returns. Callers may wish to call this method at initialization time if they need data from the monitored
|
* @return the current version of the monitored S3 object
|
||||||
* S3 object immediately and don't want to wait for the first scheduled update.
|
*
|
||||||
|
* @throws IOException if the retrieved S3 object is larger than the configured maximum size
|
||||||
*/
|
*/
|
||||||
public void refresh() {
|
public S3Object getObject() throws IOException {
|
||||||
|
final S3Object s3Object = s3Client.getObject(s3Bucket, objectKey);
|
||||||
|
|
||||||
|
lastETag.set(s3Object.getObjectMetadata().getETag());
|
||||||
|
|
||||||
|
if (s3Object.getObjectMetadata().getContentLength() <= maxObjectSize) {
|
||||||
|
return s3Object;
|
||||||
|
} else {
|
||||||
|
log.warn("Object at s3://{}/{} has a size of {} bytes, which exceeds the maximum allowed size of {} bytes",
|
||||||
|
s3Bucket, objectKey, s3Object.getObjectMetadata().getContentLength(), maxObjectSize);
|
||||||
|
|
||||||
|
throw new IOException("S3 object too large");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Polls S3 for object metadata and notifies the listener provided at construction time if and only if the object has
|
||||||
|
* changed since the last call to {@link #getObject()} or {@code refresh()}.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
void refresh() {
|
||||||
try {
|
try {
|
||||||
final ObjectMetadata objectMetadata = s3Client.getObjectMetadata(s3Bucket, objectKey);
|
final ObjectMetadata objectMetadata = s3Client.getObjectMetadata(s3Bucket, objectKey);
|
||||||
|
|
||||||
|
@ -116,17 +138,12 @@ public class S3ObjectMonitor implements Managed {
|
||||||
final String refreshedETag = objectMetadata.getETag();
|
final String refreshedETag = objectMetadata.getETag();
|
||||||
|
|
||||||
if (!StringUtils.equals(initialETag, refreshedETag) && lastETag.compareAndSet(initialETag, refreshedETag)) {
|
if (!StringUtils.equals(initialETag, refreshedETag) && lastETag.compareAndSet(initialETag, refreshedETag)) {
|
||||||
final S3Object s3Object = s3Client.getObject(s3Bucket, objectKey);
|
final S3Object s3Object = getObject();
|
||||||
|
|
||||||
log.info("Object at s3://{}/{} has changed; new eTag is {} and object size is {} bytes",
|
log.info("Object at s3://{}/{} has changed; new eTag is {} and object size is {} bytes",
|
||||||
s3Bucket, objectKey, s3Object.getObjectMetadata().getETag(), s3Object.getObjectMetadata().getContentLength());
|
s3Bucket, objectKey, s3Object.getObjectMetadata().getETag(), s3Object.getObjectMetadata().getContentLength());
|
||||||
|
|
||||||
if (s3Object.getObjectMetadata().getContentLength() <= maxObjectSize) {
|
|
||||||
changeListener.accept(s3Object);
|
changeListener.accept(s3Object);
|
||||||
} else {
|
|
||||||
log.warn("Object at s3://{}/{} has a size of {} bytes, which exceeds the maximum allowed size of {} bytes",
|
|
||||||
s3Bucket, objectKey, s3Object.getObjectMetadata().getContentLength(), maxObjectSize);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
log.warn("Failed to refresh monitored object", e);
|
log.warn("Failed to refresh monitored object", e);
|
||||||
|
|
|
@ -14,6 +14,7 @@ import io.micrometer.core.instrument.Counter;
|
||||||
import io.micrometer.core.instrument.Metrics;
|
import io.micrometer.core.instrument.Metrics;
|
||||||
import io.micrometer.core.instrument.Timer;
|
import io.micrometer.core.instrument.Timer;
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -57,7 +58,12 @@ public class TorExitNodeManager implements Managed {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void start() {
|
public synchronized void start() {
|
||||||
exitListMonitor.refresh();
|
try {
|
||||||
|
handleExitListChanged(exitListMonitor.getObject());
|
||||||
|
} catch (final Exception e) {
|
||||||
|
log.warn("Failed to load initial Tor exit node list", e);
|
||||||
|
}
|
||||||
|
|
||||||
exitListMonitor.start();
|
exitListMonitor.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||||
import com.amazonaws.services.s3.model.S3Object;
|
import com.amazonaws.services.s3.model.S3Object;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
@ -50,6 +51,38 @@ class S3ObjectMonitorTest {
|
||||||
verify(listener).accept(s3Object);
|
verify(listener).accept(s3Object);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void refreshAfterGet() throws IOException {
|
||||||
|
final AmazonS3 s3Client = mock(AmazonS3.class);
|
||||||
|
final ObjectMetadata metadata = mock(ObjectMetadata.class);
|
||||||
|
final S3Object s3Object = mock(S3Object.class);
|
||||||
|
|
||||||
|
final String bucket = "s3bucket";
|
||||||
|
final String objectKey = "greatest-smooth-jazz-hits-of-all-time.zip";
|
||||||
|
|
||||||
|
//noinspection unchecked
|
||||||
|
final Consumer<S3Object> listener = mock(Consumer.class);
|
||||||
|
|
||||||
|
final S3ObjectMonitor objectMonitor = new S3ObjectMonitor(
|
||||||
|
s3Client,
|
||||||
|
bucket,
|
||||||
|
objectKey,
|
||||||
|
16 * 1024 * 1024,
|
||||||
|
mock(ScheduledExecutorService.class),
|
||||||
|
Duration.ofMinutes(1),
|
||||||
|
listener);
|
||||||
|
|
||||||
|
when(metadata.getETag()).thenReturn(UUID.randomUUID().toString());
|
||||||
|
when(s3Object.getObjectMetadata()).thenReturn(metadata);
|
||||||
|
when(s3Client.getObjectMetadata(bucket, objectKey)).thenReturn(metadata);
|
||||||
|
when(s3Client.getObject(bucket, objectKey)).thenReturn(s3Object);
|
||||||
|
|
||||||
|
objectMonitor.getObject();
|
||||||
|
objectMonitor.refresh();
|
||||||
|
|
||||||
|
verify(listener, never()).accept(s3Object);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void refreshOversizedObject() {
|
void refreshOversizedObject() {
|
||||||
final AmazonS3 s3Client = mock(AmazonS3.class);
|
final AmazonS3 s3Client = mock(AmazonS3.class);
|
||||||
|
|
Loading…
Reference in New Issue