Also check legacy parittion keys for message presence

This commit is contained in:
Jon Chambers 2024-07-30 17:19:54 -04:00 committed by Jon Chambers
parent 97785fa570
commit 2104a60703
1 changed files with 16 additions and 11 deletions

View File

@ -70,6 +70,8 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
private final ExecutorService messageDeletionExecutor;
private final Scheduler messageDeletionScheduler;
private static final CompletableFuture<?>[] EMPTY_FUTURE_ARRAY = new CompletableFuture<?>[0];
private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class);
public MessagesDynamoDb(DynamoDbClient dynamoDb, DynamoDbAsyncClient dynamoDbAsyncClient, String tableName,
@ -116,18 +118,21 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
}
public CompletableFuture<Boolean> mayHaveMessages(final UUID accountIdentifier, final Device device) {
final AttributeValue partitionKey = convertPartitionKey(accountIdentifier, device);
final List<CompletableFuture<Boolean>> mayHaveMessagesFutures =
Stream.of(convertPartitionKeyDeprecated(accountIdentifier, device), convertPartitionKey(accountIdentifier, device))
.distinct()
.map(partitionKey -> dbAsyncClient.query(QueryRequest.builder()
.tableName(tableName)
.consistentRead(false)
.limit(1)
.keyConditionExpression("#part = :part")
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
.expressionAttributeValues(Map.of(":part", partitionKey)).build())
.thenApply(queryResponse -> queryResponse.count() > 0))
.toList();
QueryRequest.Builder queryRequestBuilder = QueryRequest.builder()
.tableName(tableName)
.consistentRead(false)
.limit(1)
.keyConditionExpression("#part = :part")
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
.expressionAttributeValues(Map.of(":part", partitionKey));
return dbAsyncClient.query(queryRequestBuilder.build())
.thenApply(queryResponse -> queryResponse.count() > 0);
return CompletableFuture.allOf(mayHaveMessagesFutures.toArray(EMPTY_FUTURE_ARRAY))
.thenApply(ignored -> mayHaveMessagesFutures.stream().anyMatch(CompletableFuture::join));
}
public Publisher<MessageProtos.Envelope> load(final UUID destinationAccountUuid, final Device device, final Integer limit) {