diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/S3ObjectMonitor.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/S3ObjectMonitor.java new file mode 100644 index 000000000..dcf1e77f9 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/util/S3ObjectMonitor.java @@ -0,0 +1,120 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.util; + +import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.google.common.annotations.VisibleForTesting; +import io.dropwizard.lifecycle.Managed; +import java.time.Duration; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An S3 object monitor watches a specific object in an S3 bucket and notifies a listener if that object changes. + */ +public class S3ObjectMonitor implements Managed { + + private final String s3Bucket; + private final String objectKey; + + private final ScheduledExecutorService refreshExecutorService; + private final Duration refreshInterval; + private ScheduledFuture refreshFuture; + + private final Consumer changeListener; + + private final AtomicReference lastETag = new AtomicReference<>(); + + private final AmazonS3 s3Client; + + private static final Logger log = LoggerFactory.getLogger(S3ObjectMonitor.class); + + public S3ObjectMonitor( + final String s3Region, + final String s3Bucket, + final String objectKey, + final ScheduledExecutorService refreshExecutorService, + final Duration refreshInterval, + final Consumer changeListener) { + + this(AmazonS3ClientBuilder.standard() + .withCredentials(InstanceProfileCredentialsProvider.getInstance()) + .withRegion(s3Region) + .build(), + s3Bucket, + objectKey, + refreshExecutorService, + refreshInterval, + changeListener); + } + + @VisibleForTesting + S3ObjectMonitor( + final AmazonS3 s3Client, + final String s3Bucket, + final String objectKey, + final ScheduledExecutorService refreshExecutorService, + final Duration refreshInterval, + final Consumer changeListener) { + + this.s3Client = s3Client; + this.s3Bucket = s3Bucket; + this.objectKey = objectKey; + + this.refreshExecutorService = refreshExecutorService; + this.refreshInterval = refreshInterval; + + this.changeListener = changeListener; + } + + @Override + public synchronized void start() { + if (refreshFuture != null) { + throw new RuntimeException("S3 object manager already started"); + } + + refreshFuture = refreshExecutorService + .scheduleAtFixedRate(this::refresh, refreshInterval.toMillis(), refreshInterval.toMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public synchronized void stop() { + if (refreshFuture != null) { + refreshFuture.cancel(true); + } + } + + /** + * Polls S3 for object metadata and notifies the listener provided at construction time if and only if the object has + * changed. This method blocks until S3 object metadata has been retrieved and, if the object has changed, the + * listener returns. Callers may wish to call this method at initialization time if they need data from the monitored + * S3 object immediately and don't want to wait for the first scheduled update. + */ + public void refresh() { + try { + final ObjectMetadata objectMetadata = s3Client.getObjectMetadata(s3Bucket, objectKey); + + final String initialETag = lastETag.get(); + final String refreshedETag = objectMetadata.getETag(); + + if (!StringUtils.equals(initialETag, refreshedETag) && lastETag.compareAndSet(initialETag, refreshedETag)) { + changeListener.accept(s3Client.getObject(s3Bucket, objectKey)); + } + } catch (final Exception e) { + log.warn("Failed to refresh monitored object", e); + } + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/util/S3ObjectMonitorTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/util/S3ObjectMonitorTest.java new file mode 100644 index 000000000..60d642c4f --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/util/S3ObjectMonitorTest.java @@ -0,0 +1,49 @@ +package org.whispersystems.textsecuregcm.util; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class S3ObjectMonitorTest { + + @Test + void refresh() { + final AmazonS3 s3Client = mock(AmazonS3.class); + final ObjectMetadata metadata = mock(ObjectMetadata.class); + final S3Object s3Object = mock(S3Object.class); + + final String bucket = "s3bucket"; + final String objectKey = "greatest-smooth-jazz-hits-of-all-time.zip"; + + //noinspection unchecked + final Consumer listener = mock(Consumer.class); + + final S3ObjectMonitor objectMonitor = new S3ObjectMonitor( + s3Client, + bucket, + objectKey, mock(ScheduledExecutorService.class), + Duration.ofMinutes(1), + listener); + + when(metadata.getETag()).thenReturn(UUID.randomUUID().toString()); + when(s3Object.getObjectMetadata()).thenReturn(metadata); + when(s3Client.getObjectMetadata(bucket, objectKey)).thenReturn(metadata); + when(s3Client.getObject(bucket, objectKey)).thenReturn(s3Object); + + objectMonitor.refresh(); + objectMonitor.refresh(); + + verify(listener).accept(s3Object); + } +}