From 42ff3f8432fbc67b84d5fdb2e4e005d853354b70 Mon Sep 17 00:00:00 2001 From: Graeme Connell Date: Wed, 30 Jun 2021 09:11:38 -0600 Subject: [PATCH] Switch SQS to Amazon SDKv2. --- service/pom.xml | 8 +-- .../textsecuregcm/sqs/DirectoryQueue.java | 67 ++++++++++--------- .../textsecuregcm/sqs/DirectoryQueueTest.java | 44 ++++++------ 3 files changed, 60 insertions(+), 59 deletions(-) diff --git a/service/pom.xml b/service/pom.xml index 4d26edace..db440160e 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -259,6 +259,10 @@ software.amazon.awssdk s3 + + software.amazon.awssdk + sqs + software.amazon.awssdk dynamodb @@ -271,10 +275,6 @@ com.amazonaws aws-java-sdk-s3 - - com.amazonaws - aws-java-sdk-sqs - com.amazonaws aws-java-sdk-appconfig diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java b/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java index 19872cd4e..122d08d42 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueue.java @@ -6,16 +6,6 @@ package org.whispersystems.textsecuregcm.sqs; import static com.codahale.metrics.MetricRegistry.name; -import com.amazonaws.AmazonClientException; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.AmazonSQSClientBuilder; -import com.amazonaws.services.sqs.model.MessageAttributeValue; -import com.amazonaws.services.sqs.model.SendMessageBatchRequest; -import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; @@ -32,6 +22,15 @@ import org.whispersystems.textsecuregcm.configuration.SqsConfiguration; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Pair; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.exception.SdkServiceException; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; public class DirectoryQueue { @@ -43,20 +42,23 @@ public class DirectoryQueue { private final Timer sendMessageBatchTimer = metricRegistry.timer(name(DirectoryQueue.class, "sendMessageBatch")); private final List queueUrls; - private final AmazonSQS sqs; + private final SqsClient sqs; public DirectoryQueue(SqsConfiguration sqsConfig) { - final AWSCredentials credentials = new BasicAWSCredentials(sqsConfig.getAccessKey(), sqsConfig.getAccessSecret()); - final AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials); + StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create( + sqsConfig.getAccessKey(), sqsConfig.getAccessSecret())); this.queueUrls = sqsConfig.getQueueUrls(); - this.sqs = AmazonSQSClientBuilder.standard().withRegion(sqsConfig.getRegion()).withCredentials(credentialsProvider).build(); + this.sqs = SqsClient.builder() + .region(Region.of(sqsConfig.getRegion())) + .credentialsProvider(credentialsProvider) + .build(); } @VisibleForTesting - DirectoryQueue(final List queueUrls, final AmazonSQS sqs) { + DirectoryQueue(final List queueUrls, final SqsClient sqs) { this.queueUrls = queueUrls; - this.sqs = sqs; + this.sqs = sqs; } public void refreshRegisteredUser(final Account account) { @@ -82,28 +84,30 @@ public class DirectoryQueue { final Account account = pair.first(); final String action = pair.second(); - return new SendMessageBatchRequestEntry() - .withMessageBody("-") - .withId(UUID.randomUUID().toString()) - .withMessageDeduplicationId(UUID.randomUUID().toString()) - .withMessageGroupId(account.getNumber()) - .withMessageAttributes(Map.of( - "id", new MessageAttributeValue().withDataType("String").withStringValue(account.getNumber()), - "uuid", new MessageAttributeValue().withDataType("String").withStringValue(account.getUuid().toString()), - "action", new MessageAttributeValue().withDataType("String").withStringValue(action) - )); + return SendMessageBatchRequestEntry.builder() + .messageBody("-") + .id(UUID.randomUUID().toString()) + .messageDeduplicationId(UUID.randomUUID().toString()) + .messageGroupId(account.getNumber()) + .messageAttributes(Map.of( + "id", MessageAttributeValue.builder().dataType("String").stringValue(account.getNumber()).build(), + "uuid", MessageAttributeValue.builder().dataType("String").stringValue(account.getUuid().toString()).build(), + "action", MessageAttributeValue.builder().dataType("String").stringValue(action).build() + )) + .build(); }).collect(Collectors.toList()); - final SendMessageBatchRequest sendMessageBatchRequest = new SendMessageBatchRequest() - .withQueueUrl(queueUrl) - .withEntries(entries); + final SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder() + .queueUrl(queueUrl) + .entries(entries) + .build(); try (final Timer.Context ignored = sendMessageBatchTimer.time()) { sqs.sendMessageBatch(sendMessageBatchRequest); - } catch (AmazonServiceException ex) { + } catch (SdkServiceException ex) { serviceErrorMeter.mark(); logger.warn("sqs service error: ", ex); - } catch (AmazonClientException ex) { + } catch (SdkClientException ex) { clientErrorMeter.mark(); logger.warn("sqs client error: ", ex); } catch (Throwable t) { @@ -112,5 +116,4 @@ public class DirectoryQueue { } } } - } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueueTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueueTest.java index 3bf270a7c..921751719 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueueTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/sqs/DirectoryQueueTest.java @@ -5,17 +5,15 @@ package org.whispersystems.textsecuregcm.sqs; -import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.model.MessageAttributeValue; -import com.amazonaws.services.sqs.model.SendMessageBatchRequest; -import com.amazonaws.services.sqs.model.SendMessageRequest; import junitparams.JUnitParamsRunner; import junitparams.Parameters; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.whispersystems.textsecuregcm.storage.Account; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import java.util.List; import java.util.Map; @@ -33,7 +31,7 @@ public class DirectoryQueueTest { @Test @Parameters(method = "argumentsForTestRefreshRegisteredUser") public void testRefreshRegisteredUser(final boolean accountEnabled, final boolean accountDiscoverableByPhoneNumber, final String expectedAction) { - final AmazonSQS sqs = mock(AmazonSQS.class); + final SqsClient sqs = mock(SqsClient.class); final DirectoryQueue directoryQueue = new DirectoryQueue(List.of("sqs://test"), sqs); final Account account = mock(Account.class); @@ -47,15 +45,15 @@ public class DirectoryQueueTest { final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(SendMessageBatchRequest.class); verify(sqs).sendMessageBatch(requestCaptor.capture()); - assertEquals(1, requestCaptor.getValue().getEntries().size()); + assertEquals(1, requestCaptor.getValue().entries().size()); - final Map messageAttributes = requestCaptor.getValue().getEntries().get(0).getMessageAttributes(); - assertEquals(new MessageAttributeValue().withDataType("String").withStringValue(expectedAction), messageAttributes.get("action")); + final Map messageAttributes = requestCaptor.getValue().entries().get(0).messageAttributes(); + assertEquals(MessageAttributeValue.builder().dataType("String").stringValue(expectedAction).build(), messageAttributes.get("action")); } @Test public void testRefreshBatch() { - final AmazonSQS sqs = mock(AmazonSQS.class); + final SqsClient sqs = mock(SqsClient.class); final DirectoryQueue directoryQueue = new DirectoryQueue(List.of("sqs://test"), sqs); final Account discoverableAccount = mock(Account.class); @@ -75,22 +73,22 @@ public class DirectoryQueueTest { final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(SendMessageBatchRequest.class); verify(sqs).sendMessageBatch(requestCaptor.capture()); - assertEquals(2, requestCaptor.getValue().getEntries().size()); + assertEquals(2, requestCaptor.getValue().entries().size()); - final Map discoverableAccountAttributes = requestCaptor.getValue().getEntries().get(0).getMessageAttributes(); - assertEquals(new MessageAttributeValue().withDataType("String").withStringValue(discoverableAccount.getNumber()), discoverableAccountAttributes.get("id")); - assertEquals(new MessageAttributeValue().withDataType("String").withStringValue(discoverableAccount.getUuid().toString()), discoverableAccountAttributes.get("uuid")); - assertEquals(new MessageAttributeValue().withDataType("String").withStringValue("add"), discoverableAccountAttributes.get("action")); + final Map discoverableAccountAttributes = requestCaptor.getValue().entries().get(0).messageAttributes(); + assertEquals(MessageAttributeValue.builder().dataType("String").stringValue(discoverableAccount.getNumber()).build(), discoverableAccountAttributes.get("id")); + assertEquals(MessageAttributeValue.builder().dataType("String").stringValue(discoverableAccount.getUuid().toString()).build(), discoverableAccountAttributes.get("uuid")); + assertEquals(MessageAttributeValue.builder().dataType("String").stringValue("add").build(), discoverableAccountAttributes.get("action")); - final Map undiscoverableAccountAttributes = requestCaptor.getValue().getEntries().get(1).getMessageAttributes(); - assertEquals(new MessageAttributeValue().withDataType("String").withStringValue(undiscoverableAccount.getNumber()), undiscoverableAccountAttributes.get("id")); - assertEquals(new MessageAttributeValue().withDataType("String").withStringValue(undiscoverableAccount.getUuid().toString()), undiscoverableAccountAttributes.get("uuid")); - assertEquals(new MessageAttributeValue().withDataType("String").withStringValue("delete"), undiscoverableAccountAttributes.get("action")); + final Map undiscoverableAccountAttributes = requestCaptor.getValue().entries().get(1).messageAttributes(); + assertEquals(MessageAttributeValue.builder().dataType("String").stringValue(undiscoverableAccount.getNumber()).build(), undiscoverableAccountAttributes.get("id")); + assertEquals(MessageAttributeValue.builder().dataType("String").stringValue(undiscoverableAccount.getUuid().toString()).build(), undiscoverableAccountAttributes.get("uuid")); + assertEquals(MessageAttributeValue.builder().dataType("String").stringValue("delete").build(), undiscoverableAccountAttributes.get("action")); } @Test public void testSendMessageMultipleQueues() { - final AmazonSQS sqs = mock(AmazonSQS.class); + final SqsClient sqs = mock(SqsClient.class); final DirectoryQueue directoryQueue = new DirectoryQueue(List.of("sqs://first", "sqs://second"), sqs); final Account account = mock(Account.class); @@ -105,10 +103,10 @@ public class DirectoryQueueTest { verify(sqs, times(2)).sendMessageBatch(requestCaptor.capture()); for (final SendMessageBatchRequest sendMessageBatchRequest : requestCaptor.getAllValues()) { - assertEquals(1, requestCaptor.getValue().getEntries().size()); + assertEquals(1, requestCaptor.getValue().entries().size()); - final Map messageAttributes = sendMessageBatchRequest.getEntries().get(0).getMessageAttributes(); - assertEquals(new MessageAttributeValue().withDataType("String").withStringValue("add"), messageAttributes.get("action")); + final Map messageAttributes = sendMessageBatchRequest.entries().get(0).messageAttributes(); + assertEquals(MessageAttributeValue.builder().dataType("String").stringValue("add").build(), messageAttributes.get("action")); } }