diff --git a/service/pom.xml b/service/pom.xml
index 4d26edace..db440160e 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -259,6 +259,10 @@
software.amazon.awssdk
s3
+
+ software.amazon.awssdk
+ sqs
+
software.amazon.awssdk
dynamodb
@@ -271,10 +275,6 @@
com.amazonaws
aws-java-sdk-s3
-
- com.amazonaws
- aws-java-sdk-sqs
-
com.amazonaws
aws-java-sdk-appconfig
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java b/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java
index 19872cd4e..122d08d42 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java
@@ -6,16 +6,6 @@ package org.whispersystems.textsecuregcm.sqs;
import static com.codahale.metrics.MetricRegistry.name;
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.sqs.AmazonSQS;
-import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
-import com.amazonaws.services.sqs.model.MessageAttributeValue;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
@@ -32,6 +22,15 @@ import org.whispersystems.textsecuregcm.configuration.SqsConfiguration;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.core.exception.SdkServiceException;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
public class DirectoryQueue {
@@ -43,20 +42,23 @@ public class DirectoryQueue {
private final Timer sendMessageBatchTimer = metricRegistry.timer(name(DirectoryQueue.class, "sendMessageBatch"));
private final List queueUrls;
- private final AmazonSQS sqs;
+ private final SqsClient sqs;
public DirectoryQueue(SqsConfiguration sqsConfig) {
- final AWSCredentials credentials = new BasicAWSCredentials(sqsConfig.getAccessKey(), sqsConfig.getAccessSecret());
- final AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
+ StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create(
+ sqsConfig.getAccessKey(), sqsConfig.getAccessSecret()));
this.queueUrls = sqsConfig.getQueueUrls();
- this.sqs = AmazonSQSClientBuilder.standard().withRegion(sqsConfig.getRegion()).withCredentials(credentialsProvider).build();
+ this.sqs = SqsClient.builder()
+ .region(Region.of(sqsConfig.getRegion()))
+ .credentialsProvider(credentialsProvider)
+ .build();
}
@VisibleForTesting
- DirectoryQueue(final List queueUrls, final AmazonSQS sqs) {
+ DirectoryQueue(final List queueUrls, final SqsClient sqs) {
this.queueUrls = queueUrls;
- this.sqs = sqs;
+ this.sqs = sqs;
}
public void refreshRegisteredUser(final Account account) {
@@ -82,28 +84,30 @@ public class DirectoryQueue {
final Account account = pair.first();
final String action = pair.second();
- return new SendMessageBatchRequestEntry()
- .withMessageBody("-")
- .withId(UUID.randomUUID().toString())
- .withMessageDeduplicationId(UUID.randomUUID().toString())
- .withMessageGroupId(account.getNumber())
- .withMessageAttributes(Map.of(
- "id", new MessageAttributeValue().withDataType("String").withStringValue(account.getNumber()),
- "uuid", new MessageAttributeValue().withDataType("String").withStringValue(account.getUuid().toString()),
- "action", new MessageAttributeValue().withDataType("String").withStringValue(action)
- ));
+ return SendMessageBatchRequestEntry.builder()
+ .messageBody("-")
+ .id(UUID.randomUUID().toString())
+ .messageDeduplicationId(UUID.randomUUID().toString())
+ .messageGroupId(account.getNumber())
+ .messageAttributes(Map.of(
+ "id", MessageAttributeValue.builder().dataType("String").stringValue(account.getNumber()).build(),
+ "uuid", MessageAttributeValue.builder().dataType("String").stringValue(account.getUuid().toString()).build(),
+ "action", MessageAttributeValue.builder().dataType("String").stringValue(action).build()
+ ))
+ .build();
}).collect(Collectors.toList());
- final SendMessageBatchRequest sendMessageBatchRequest = new SendMessageBatchRequest()
- .withQueueUrl(queueUrl)
- .withEntries(entries);
+ final SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder()
+ .queueUrl(queueUrl)
+ .entries(entries)
+ .build();
try (final Timer.Context ignored = sendMessageBatchTimer.time()) {
sqs.sendMessageBatch(sendMessageBatchRequest);
- } catch (AmazonServiceException ex) {
+ } catch (SdkServiceException ex) {
serviceErrorMeter.mark();
logger.warn("sqs service error: ", ex);
- } catch (AmazonClientException ex) {
+ } catch (SdkClientException ex) {
clientErrorMeter.mark();
logger.warn("sqs client error: ", ex);
} catch (Throwable t) {
@@ -112,5 +116,4 @@ public class DirectoryQueue {
}
}
}
-
}
diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueueTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueueTest.java
index 3bf270a7c..921751719 100644
--- a/service/src/test/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueueTest.java
+++ b/service/src/test/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueueTest.java
@@ -5,17 +5,15 @@
package org.whispersystems.textsecuregcm.sqs;
-import com.amazonaws.services.sqs.AmazonSQS;
-import com.amazonaws.services.sqs.model.MessageAttributeValue;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
-import com.amazonaws.services.sqs.model.SendMessageRequest;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.whispersystems.textsecuregcm.storage.Account;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import java.util.List;
import java.util.Map;
@@ -33,7 +31,7 @@ public class DirectoryQueueTest {
@Test
@Parameters(method = "argumentsForTestRefreshRegisteredUser")
public void testRefreshRegisteredUser(final boolean accountEnabled, final boolean accountDiscoverableByPhoneNumber, final String expectedAction) {
- final AmazonSQS sqs = mock(AmazonSQS.class);
+ final SqsClient sqs = mock(SqsClient.class);
final DirectoryQueue directoryQueue = new DirectoryQueue(List.of("sqs://test"), sqs);
final Account account = mock(Account.class);
@@ -47,15 +45,15 @@ public class DirectoryQueueTest {
final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(SendMessageBatchRequest.class);
verify(sqs).sendMessageBatch(requestCaptor.capture());
- assertEquals(1, requestCaptor.getValue().getEntries().size());
+ assertEquals(1, requestCaptor.getValue().entries().size());
- final Map messageAttributes = requestCaptor.getValue().getEntries().get(0).getMessageAttributes();
- assertEquals(new MessageAttributeValue().withDataType("String").withStringValue(expectedAction), messageAttributes.get("action"));
+ final Map messageAttributes = requestCaptor.getValue().entries().get(0).messageAttributes();
+ assertEquals(MessageAttributeValue.builder().dataType("String").stringValue(expectedAction).build(), messageAttributes.get("action"));
}
@Test
public void testRefreshBatch() {
- final AmazonSQS sqs = mock(AmazonSQS.class);
+ final SqsClient sqs = mock(SqsClient.class);
final DirectoryQueue directoryQueue = new DirectoryQueue(List.of("sqs://test"), sqs);
final Account discoverableAccount = mock(Account.class);
@@ -75,22 +73,22 @@ public class DirectoryQueueTest {
final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(SendMessageBatchRequest.class);
verify(sqs).sendMessageBatch(requestCaptor.capture());
- assertEquals(2, requestCaptor.getValue().getEntries().size());
+ assertEquals(2, requestCaptor.getValue().entries().size());
- final Map discoverableAccountAttributes = requestCaptor.getValue().getEntries().get(0).getMessageAttributes();
- assertEquals(new MessageAttributeValue().withDataType("String").withStringValue(discoverableAccount.getNumber()), discoverableAccountAttributes.get("id"));
- assertEquals(new MessageAttributeValue().withDataType("String").withStringValue(discoverableAccount.getUuid().toString()), discoverableAccountAttributes.get("uuid"));
- assertEquals(new MessageAttributeValue().withDataType("String").withStringValue("add"), discoverableAccountAttributes.get("action"));
+ final Map discoverableAccountAttributes = requestCaptor.getValue().entries().get(0).messageAttributes();
+ assertEquals(MessageAttributeValue.builder().dataType("String").stringValue(discoverableAccount.getNumber()).build(), discoverableAccountAttributes.get("id"));
+ assertEquals(MessageAttributeValue.builder().dataType("String").stringValue(discoverableAccount.getUuid().toString()).build(), discoverableAccountAttributes.get("uuid"));
+ assertEquals(MessageAttributeValue.builder().dataType("String").stringValue("add").build(), discoverableAccountAttributes.get("action"));
- final Map undiscoverableAccountAttributes = requestCaptor.getValue().getEntries().get(1).getMessageAttributes();
- assertEquals(new MessageAttributeValue().withDataType("String").withStringValue(undiscoverableAccount.getNumber()), undiscoverableAccountAttributes.get("id"));
- assertEquals(new MessageAttributeValue().withDataType("String").withStringValue(undiscoverableAccount.getUuid().toString()), undiscoverableAccountAttributes.get("uuid"));
- assertEquals(new MessageAttributeValue().withDataType("String").withStringValue("delete"), undiscoverableAccountAttributes.get("action"));
+ final Map undiscoverableAccountAttributes = requestCaptor.getValue().entries().get(1).messageAttributes();
+ assertEquals(MessageAttributeValue.builder().dataType("String").stringValue(undiscoverableAccount.getNumber()).build(), undiscoverableAccountAttributes.get("id"));
+ assertEquals(MessageAttributeValue.builder().dataType("String").stringValue(undiscoverableAccount.getUuid().toString()).build(), undiscoverableAccountAttributes.get("uuid"));
+ assertEquals(MessageAttributeValue.builder().dataType("String").stringValue("delete").build(), undiscoverableAccountAttributes.get("action"));
}
@Test
public void testSendMessageMultipleQueues() {
- final AmazonSQS sqs = mock(AmazonSQS.class);
+ final SqsClient sqs = mock(SqsClient.class);
final DirectoryQueue directoryQueue = new DirectoryQueue(List.of("sqs://first", "sqs://second"), sqs);
final Account account = mock(Account.class);
@@ -105,10 +103,10 @@ public class DirectoryQueueTest {
verify(sqs, times(2)).sendMessageBatch(requestCaptor.capture());
for (final SendMessageBatchRequest sendMessageBatchRequest : requestCaptor.getAllValues()) {
- assertEquals(1, requestCaptor.getValue().getEntries().size());
+ assertEquals(1, requestCaptor.getValue().entries().size());
- final Map messageAttributes = sendMessageBatchRequest.getEntries().get(0).getMessageAttributes();
- assertEquals(new MessageAttributeValue().withDataType("String").withStringValue("add"), messageAttributes.get("action"));
+ final Map messageAttributes = sendMessageBatchRequest.entries().get(0).messageAttributes();
+ assertEquals(MessageAttributeValue.builder().dataType("String").stringValue("add").build(), messageAttributes.get("action"));
}
}