Add a Dynamo-backed key store.

This commit is contained in:
Jon Chambers 2021-01-21 15:43:15 -05:00 committed by Jon Chambers
parent 426e6923ac
commit d4d9403829
18 changed files with 758 additions and 210 deletions

View File

@ -14,6 +14,7 @@ import org.whispersystems.textsecuregcm.configuration.AwsAttachmentsConfiguratio
import org.whispersystems.textsecuregcm.configuration.CdnConfiguration;
import org.whispersystems.textsecuregcm.configuration.DatabaseConfiguration;
import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration;
import org.whispersystems.textsecuregcm.configuration.DynamoDbConfiguration;
import org.whispersystems.textsecuregcm.configuration.GcmConfiguration;
import org.whispersystems.textsecuregcm.configuration.GcpAttachmentsConfiguration;
import org.whispersystems.textsecuregcm.configuration.AccountsDatabaseConfiguration;
@ -128,6 +129,11 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private MessageDynamoDbConfiguration messageDynamoDb;
@Valid
@NotNull
@JsonProperty
private DynamoDbConfiguration keysDynamoDb;
@Valid
@NotNull
@JsonProperty
@ -306,6 +312,10 @@ public class WhisperServerConfiguration extends Configuration {
return messageDynamoDb;
}
public DynamoDbConfiguration getKeysDynamoDbConfiguration() {
return keysDynamoDb;
}
public DatabaseConfiguration getMessageStoreConfiguration() {
return messageStore;
}

View File

@ -126,6 +126,7 @@ import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.FeatureFlags;
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
import org.whispersystems.textsecuregcm.storage.Keys;
import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagePersister;
import org.whispersystems.textsecuregcm.storage.Messages;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
@ -275,7 +276,16 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getMessageDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) config.getMessageDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
AmazonDynamoDBClientBuilder keysDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(config.getKeysDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getKeysDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) config.getKeysDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
DynamoDB messageDynamoDb = new DynamoDB(messageDynamoDbClientBuilder.build());
DynamoDB preKeyDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
Accounts accounts = new Accounts(accountDatabase);
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
@ -284,6 +294,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
Profiles profiles = new Profiles(accountDatabase);
Keys keys = new Keys(accountDatabase, config.getAccountsDatabaseConfiguration().getKeyOperationRetryConfiguration());
KeysDynamoDb keysDynamoDb = new KeysDynamoDb(preKeyDynamoDb, config.getKeysDynamoDbConfiguration().getTableName());
Messages messages = new Messages(messageDatabase);
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb, config.getMessageDynamoDbConfiguration().getTableName(), config.getMessageDynamoDbConfiguration().getTimeToLive());
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase);
@ -338,7 +349,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
MessagesManager messagesManager = new MessagesManager(messages, messagesDynamoDb, messagesCache, pushLatencyManager, experimentEnrollmentManager);
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster, directoryQueue, keys, messagesManager, usernamesManager, profilesManager);
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster, directoryQueue, keys, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
FeatureFlagsManager featureFlagsManager = new FeatureFlagsManager(featureFlags, recurringJobExecutor);
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);

View File

@ -0,0 +1,50 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import java.time.Duration;
public class DynamoDbConfiguration {
@JsonProperty
@NotBlank
private String region;
@JsonProperty
@NotBlank
private String tableName;
@JsonProperty
private Duration clientExecutionTimeout = Duration.ofSeconds(30);
@JsonProperty
private Duration clientRequestTimeout = Duration.ofSeconds(10);
@Valid
@NotEmpty
public String getRegion() {
return region;
}
@Valid
@NotEmpty
public String getTableName() {
return tableName;
}
public Duration getClientExecutionTimeout() {
return clientExecutionTimeout;
}
public Duration getClientRequestTimeout() {
return clientRequestTimeout;
}
}

View File

@ -9,35 +9,12 @@ import javax.validation.Valid;
import javax.validation.constraints.NotEmpty;
import java.time.Duration;
public class MessageDynamoDbConfiguration {
private String region;
private String tableName;
public class MessageDynamoDbConfiguration extends DynamoDbConfiguration {
private Duration timeToLive = Duration.ofDays(7);
private Duration clientExecutionTimeout = Duration.ofSeconds(30);
private Duration clientRequestTimeout = Duration.ofSeconds(10);
@Valid
@NotEmpty
public String getRegion() {
return region;
}
@Valid
@NotEmpty
public String getTableName() {
return tableName;
}
@Valid
public Duration getTimeToLive() {
return timeToLive;
}
public Duration getClientExecutionTimeout() {
return clientExecutionTimeout;
}
public Duration getClientRequestTimeout() {
return clientRequestTimeout;
}
}

View File

@ -62,7 +62,7 @@ public class KeysController {
@GET
@Produces(MediaType.APPLICATION_JSON)
public PreKeyCount getStatus(@Auth Account account) {
int count = keys.getCount(account.getNumber(), account.getAuthenticatedDevice().get().getId());
int count = keys.getCount(account, account.getAuthenticatedDevice().get().getId());
if (count > 0) {
count = count - 1;
@ -98,7 +98,7 @@ public class KeysController {
}
}
keys.store(account.getNumber(), device.getId(), preKeys.getPreKeys());
keys.store(account, device.getId(), preKeys.getPreKeys());
}
@Timed
@ -179,12 +179,12 @@ public class KeysController {
private List<KeyRecord> getLocalKeys(Account destination, String deviceIdSelector) {
try {
if (deviceIdSelector.equals("*")) {
return keys.get(destination.getNumber());
return keys.take(destination);
}
long deviceId = Long.parseLong(deviceIdSelector);
return keys.get(destination.getNumber(), deviceId);
return keys.take(destination, deviceId);
} catch (NumberFormatException e) {
throw new WebApplicationException(Response.status(422).build());
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static com.codahale.metrics.MetricRegistry.name;
import static io.micrometer.core.instrument.Metrics.counter;
import static io.micrometer.core.instrument.Metrics.timer;
public class AbstractDynamoDbStore {
private final DynamoDB dynamoDb;
private final Timer batchWriteItemsFirstPass = timer(name(getClass(), "batchWriteItems"), "firstAttempt", "true");
private final Timer batchWriteItemsRetryPass = timer(name(getClass(), "batchWriteItems"), "firstAttempt", "false");
private final Counter batchWriteItemsUnprocessed = counter(name(getClass(), "batchWriteItemsUnprocessed"));
private final Logger logger = LoggerFactory.getLogger(getClass());
private static final int MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE = 25; // This was arbitrarily chosen and may be entirely too high.
public static final int DYNAMO_DB_MAX_BATCH_SIZE = 25; // This limit comes from Amazon Dynamo DB itself. It will reject batch writes larger than this.
public static final int RESULT_SET_CHUNK_SIZE = 100;
public AbstractDynamoDbStore(final DynamoDB dynamoDb) {
this.dynamoDb = dynamoDb;
}
protected DynamoDB getDynamoDb() {
return dynamoDb;
}
protected void executeTableWriteItemsUntilComplete(final TableWriteItems items) {
AtomicReference<BatchWriteItemOutcome> outcome = new AtomicReference<>();
batchWriteItemsFirstPass.record(() -> outcome.set(dynamoDb.batchWriteItem(items)));
int attemptCount = 0;
while (!outcome.get().getUnprocessedItems().isEmpty() && attemptCount < MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE) {
batchWriteItemsRetryPass.record(() -> outcome.set(dynamoDb.batchWriteItemUnprocessed(outcome.get().getUnprocessedItems())));
++attemptCount;
}
if (!outcome.get().getUnprocessedItems().isEmpty()) {
logger.error("Attempt count ({}) reached max ({}}) before applying all batch writes to dynamo. {} unprocessed items remain.", attemptCount, MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE, outcome.get().getUnprocessedItems().size());
batchWriteItemsUnprocessed.increment(outcome.get().getUnprocessedItems().size());
}
}
static <T> void writeInBatches(final Iterable<T> items, final Consumer<List<T>> action) {
final List<T> batch = new ArrayList<>(DYNAMO_DB_MAX_BATCH_SIZE);
for (T item : items) {
batch.add(item);
if (batch.size() == DYNAMO_DB_MAX_BATCH_SIZE) {
action.accept(batch);
batch.clear();
}
}
if (!batch.isEmpty()) {
action.accept(batch);
}
}
}

View File

@ -56,6 +56,7 @@ public class AccountsManager {
private final DirectoryManager directory;
private final DirectoryQueue directoryQueue;
private final Keys keys;
private final KeysDynamoDb keysDynamoDb;
private final MessagesManager messagesManager;
private final UsernamesManager usernamesManager;
private final ProfilesManager profilesManager;
@ -73,12 +74,13 @@ public class AccountsManager {
}
}
public AccountsManager(Accounts accounts, DirectoryManager directory, FaultTolerantRedisCluster cacheCluster, final DirectoryQueue directoryQueue, final Keys keys, final MessagesManager messagesManager, final UsernamesManager usernamesManager, final ProfilesManager profilesManager) {
public AccountsManager(Accounts accounts, DirectoryManager directory, FaultTolerantRedisCluster cacheCluster, final DirectoryQueue directoryQueue, final Keys keys, final KeysDynamoDb keysDynamoDb, final MessagesManager messagesManager, final UsernamesManager usernamesManager, final ProfilesManager profilesManager) {
this.accounts = accounts;
this.directory = directory;
this.cacheCluster = cacheCluster;
this.directoryQueue = directoryQueue;
this.keys = keys;
this.keysDynamoDb = keysDynamoDb;
this.messagesManager = messagesManager;
this.usernamesManager = usernamesManager;
this.profilesManager = profilesManager;
@ -150,7 +152,8 @@ public class AccountsManager {
directoryQueue.deleteAccount(account);
directory.remove(account.getNumber());
profilesManager.deleteAll(account.getUuid());
keys.delete(account.getNumber());
keys.delete(account);
keysDynamoDb.delete(account);
messagesManager.clear(account.getNumber(), account.getUuid());
redisDelete(account);
databaseDelete(account);

View File

@ -5,6 +5,8 @@
package org.whispersystems.textsecuregcm.storage;
import java.util.Objects;
public class KeyRecord {
private long id;
@ -41,4 +43,20 @@ public class KeyRecord {
return publicKey;
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final KeyRecord keyRecord = (KeyRecord)o;
return id == keyRecord.id &&
deviceId == keyRecord.deviceId &&
keyId == keyRecord.keyId &&
Objects.equals(number, keyRecord.number) &&
Objects.equals(publicKey, keyRecord.publicKey);
}
@Override
public int hashCode() {
return Objects.hash(id, number, deviceId, keyId, publicKey);
}
}

View File

@ -27,7 +27,7 @@ import java.util.function.Supplier;
import static com.codahale.metrics.MetricRegistry.name;
public class Keys {
public class Keys implements PreKeyStore {
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter fallbackMeter = metricRegistry.meter(name(Keys.class, "fallback"));
@ -49,7 +49,10 @@ public class Keys {
this.retry = Retry.of("keys", retryConfiguration.toRetryConfigBuilder().build());
}
public void store(String number, long deviceId, List<PreKey> keys) {
@Override
public void store(Account account, long deviceId, List<PreKey> keys) {
final String number = account.getNumber();
retry.executeRunnable(() -> {
database.use(jdbi -> jdbi.useTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
try (Timer.Context ignored = storeTimer.time()) {
@ -74,8 +77,12 @@ public class Keys {
});
}
public List<KeyRecord> get(String number, long deviceId) {
/* try {
@Override
public List<KeyRecord> take(Account account, long deviceId) {
/*
final String number = account.getNumber();
try {
return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
try (Timer.Context ignored = getDevicetTimer.time()) {
return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT id FROM keys WHERE number = :number AND device_id = :device_id ORDER BY key_id ASC LIMIT 1) RETURNING *")
@ -95,8 +102,12 @@ public class Keys {
return new LinkedList<>();
}
public List<KeyRecord> get(String number) {
/* try {
@Override
public List<KeyRecord> take(Account account) {
/*
final String number = account.getNumber();
try {
return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
try (Timer.Context ignored = getTimer.time()) {
return handle.createQuery("DELETE FROM keys WHERE id IN (SELECT DISTINCT ON (number, device_id) id FROM keys WHERE number = :number ORDER BY number, device_id, key_id ASC) RETURNING *")
@ -115,7 +126,10 @@ public class Keys {
return new LinkedList<>();
}
public int getCount(String number, long deviceId) {
@Override
public int getCount(Account account, long deviceId) {
final String number = account.getNumber();
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getCountTimer.time()) {
return handle.createQuery("SELECT COUNT(*) FROM keys WHERE number = :number AND device_id = :device_id")
@ -127,7 +141,9 @@ public class Keys {
}));
}
public void delete(final String number) {
public void delete(final Account account) {
final String number = account.getNumber();
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context ignored = getCountTimer.time()) {
handle.createUpdate("DELETE FROM keys WHERE number = :number")

View File

@ -0,0 +1,208 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import com.amazonaws.services.dynamodbv2.document.DeleteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec;
import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
import com.amazonaws.services.dynamodbv2.model.ReturnValue;
import com.amazonaws.services.dynamodbv2.model.Select;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import org.whispersystems.textsecuregcm.entities.PreKey;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
public class KeysDynamoDb extends AbstractDynamoDbStore implements PreKeyStore {
private final Table table;
static final String KEY_ACCOUNT_UUID = "U";
static final String KEY_DEVICE_ID_KEY_ID = "DK";
static final String KEY_PUBLIC_KEY = "P";
private static final Timer STORE_KEYS_TIMER = Metrics.timer(name(KeysDynamoDb.class, "storeKeys"));
private static final Timer TAKE_KEY_FOR_DEVICE_TIMER = Metrics.timer(name(KeysDynamoDb.class, "takeKeyForDevice"));
private static final Timer TAKE_KEYS_FOR_ACCOUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "takeKeyForAccount"));
private static final Timer GET_KEY_COUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "getKeyCount"));
private static final Timer DELETE_KEYS_FOR_DEVICE_TIMER = Metrics.timer(name(KeysDynamoDb.class, "deleteKeysForDevice"));
private static final Timer DELETE_KEYS_FOR_ACCOUNT_TIMER = Metrics.timer(name(KeysDynamoDb.class, "deleteKeysForAccount"));
private static final DistributionSummary CONTESTED_KEY_DISTRIBUTION = Metrics.summary(name(KeysDynamoDb.class, "contestedKeys"));
public KeysDynamoDb(final DynamoDB dynamoDB, final String tableName) {
super(dynamoDB);
this.table = dynamoDB.getTable(tableName);
}
@Override
public void store(final Account account, final long deviceId, final List<PreKey> keys) {
STORE_KEYS_TIMER.record(() -> {
delete(account, deviceId);
writeInBatches(keys, batch -> {
final TableWriteItems items = new TableWriteItems(table.getTableName());
for (final PreKey preKey : batch) {
items.addItemToPut(getItemFromPreKey(account.getUuid(), deviceId, preKey));
}
executeTableWriteItemsUntilComplete(items);
});
});
}
@Override
public List<KeyRecord> take(final Account account, final long deviceId) {
return TAKE_KEY_FOR_DEVICE_TIMER.record(() -> {
final byte[] partitionKey = getPartitionKey(account.getUuid());
final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
.withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
.withValueMap(Map.of(":uuid", partitionKey,
":sortprefix", getSortKeyPrefix(deviceId)))
.withProjectionExpression(KEY_DEVICE_ID_KEY_ID)
.withConsistentRead(false);
int contestedKeys = 0;
try {
for (final Item candidate : table.query(querySpec)) {
final DeleteItemSpec deleteItemSpec = new DeleteItemSpec().withPrimaryKey(KEY_ACCOUNT_UUID, partitionKey, KEY_DEVICE_ID_KEY_ID, candidate.getBinary(KEY_DEVICE_ID_KEY_ID))
.withReturnValues(ReturnValue.ALL_OLD);
final DeleteItemOutcome outcome = table.deleteItem(deleteItemSpec);
if (outcome.getItem() != null) {
final PreKey preKey = getPreKeyFromItem(outcome.getItem());
return List.of(new KeyRecord(-1, account.getNumber(), deviceId, preKey.getKeyId(), preKey.getPublicKey()));
}
contestedKeys++;
}
return Collections.emptyList();
} finally {
CONTESTED_KEY_DISTRIBUTION.record(contestedKeys);
}
});
}
@Override
public List<KeyRecord> take(final Account account) {
return TAKE_KEYS_FOR_ACCOUNT_TIMER.record(() -> {
final List<KeyRecord> keyRecords = new ArrayList<>();
for (final Device device : account.getDevices()) {
keyRecords.addAll(take(account, device.getId()));
}
return keyRecords;
});
}
@Override
public int getCount(final Account account, final long deviceId) {
return GET_KEY_COUNT_TIMER.record(() -> {
final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
.withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
.withValueMap(Map.of(":uuid", getPartitionKey(account.getUuid()),
":sortprefix", getSortKeyPrefix(deviceId)))
.withSelect(Select.COUNT)
.withConsistentRead(false);
// This is very confusing, but does appear to be the intended behavior. See:
//
// - https://github.com/aws/aws-sdk-java/issues/693
// - https://github.com/aws/aws-sdk-java/issues/915
return table.query(querySpec).firstPage().getLowLevelResult().getQueryResult().getCount();
});
}
@Override
public void delete(final Account account) {
DELETE_KEYS_FOR_ACCOUNT_TIMER.record(() -> {
final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid")
.withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID))
.withValueMap(Map.of(":uuid", getPartitionKey(account.getUuid())))
.withProjectionExpression(KEY_ACCOUNT_UUID + ", " + KEY_DEVICE_ID_KEY_ID)
.withConsistentRead(true);
deleteItemsMatchingQuery(querySpec);
});
}
@VisibleForTesting
void delete(final Account account, final long deviceId) {
DELETE_KEYS_FOR_DEVICE_TIMER.record(() -> {
final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#uuid = :uuid AND begins_with (#sort, :sortprefix)")
.withNameMap(Map.of("#uuid", KEY_ACCOUNT_UUID, "#sort", KEY_DEVICE_ID_KEY_ID))
.withValueMap(Map.of(":uuid", getPartitionKey(account.getUuid()),
":sortprefix", getSortKeyPrefix(deviceId)))
.withProjectionExpression(KEY_ACCOUNT_UUID + ", " + KEY_DEVICE_ID_KEY_ID)
.withConsistentRead(true);
deleteItemsMatchingQuery(querySpec);
});
}
private void deleteItemsMatchingQuery(final QuerySpec querySpec) {
writeInBatches(table.query(querySpec), batch -> {
final TableWriteItems writeItems = new TableWriteItems(table.getTableName());
for (final Item item : batch) {
writeItems.addPrimaryKeyToDelete(new PrimaryKey(KEY_ACCOUNT_UUID, item.getBinary(KEY_ACCOUNT_UUID), KEY_DEVICE_ID_KEY_ID, item.getBinary(KEY_DEVICE_ID_KEY_ID)));
}
executeTableWriteItemsUntilComplete(writeItems);
});
}
private static byte[] getPartitionKey(final UUID accountUuid) {
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(accountUuid.getMostSignificantBits());
byteBuffer.putLong(accountUuid.getLeastSignificantBits());
return byteBuffer.array();
}
private static byte[] getSortKey(final long deviceId, final long keyId) {
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(deviceId);
byteBuffer.putLong(keyId);
return byteBuffer.array();
}
private static byte[] getSortKeyPrefix(final long deviceId) {
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]);
byteBuffer.putLong(deviceId);
return byteBuffer.array();
}
private Item getItemFromPreKey(final UUID accountUuid, final long deviceId, final PreKey preKey) {
return new Item().withBinary(KEY_ACCOUNT_UUID, getPartitionKey(accountUuid))
.withBinary(KEY_DEVICE_ID_KEY_ID, getSortKey(deviceId, preKey.getKeyId()))
.withString(KEY_PUBLIC_KEY, preKey.getPublicKey());
}
private PreKey getPreKeyFromItem(final Item item) {
final long keyId = ByteBuffer.wrap(item.getBinary(KEY_DEVICE_ID_KEY_ID)).getLong(8);
return new PreKey(keyId, item.getString(KEY_PUBLIC_KEY));
}
}

View File

@ -5,7 +5,6 @@
package org.whispersystems.textsecuregcm.storage;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DeleteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Index;
@ -17,11 +16,8 @@ import com.amazonaws.services.dynamodbv2.document.api.QueryApi;
import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec;
import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
import com.amazonaws.services.dynamodbv2.model.ReturnValue;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
@ -33,17 +29,11 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static com.codahale.metrics.MetricRegistry.name;
import static io.micrometer.core.instrument.Metrics.counter;
import static io.micrometer.core.instrument.Metrics.timer;
public class MessagesDynamoDb {
private static final int MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE = 25; // This was arbitrarily chosen and may be entirely too high.
private static final int DYNAMO_DB_MAX_BATCH_SIZE = 25; // This limit comes from Amazon Dynamo DB itself. It will reject batch writes larger than this.
public static final int RESULT_SET_CHUNK_SIZE = 100;
public class MessagesDynamoDb extends AbstractDynamoDbStore {
private static final String KEY_PARTITION = "H";
private static final String KEY_SORT = "S";
@ -60,10 +50,6 @@ public class MessagesDynamoDb {
private static final String KEY_CONTENT = "C";
private static final String KEY_TTL = "E";
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Timer batchWriteItemsFirstPass = timer(name(getClass(), "batchWriteItems"), "firstAttempt", "true");
private final Timer batchWriteItemsRetryPass = timer(name(getClass(), "batchWriteItems"), "firstAttempt", "false");
private final Counter batchWriteItemsUnprocessed = counter(name(getClass(), "batchWriteItemsUnprocessed"));
private final Timer storeTimer = timer(name(getClass(), "store"));
private final Timer loadTimer = timer(name(getClass(), "load"));
private final Timer deleteBySourceAndTimestamp = timer(name(getClass(), "delete", "sourceAndTimestamp"));
@ -71,18 +57,18 @@ public class MessagesDynamoDb {
private final Timer deleteByAccount = timer(name(getClass(), "delete", "account"));
private final Timer deleteByDevice = timer(name(getClass(), "delete", "device"));
private final DynamoDB dynamoDb;
private final String tableName;
private final Duration timeToLive;
public MessagesDynamoDb(DynamoDB dynamoDb, String tableName, Duration timeToLive) {
this.dynamoDb = dynamoDb;
super(dynamoDb);
this.tableName = tableName;
this.timeToLive = timeToLive;
}
public void store(final List<MessageProtos.Envelope> messages, final UUID destinationAccountUuid, final long destinationDeviceId) {
storeTimer.record(() -> doInBatches(messages, (messageBatch) -> storeBatch(messageBatch, destinationAccountUuid, destinationDeviceId), DYNAMO_DB_MAX_BATCH_SIZE));
storeTimer.record(() -> writeInBatches(messages, (messageBatch) -> storeBatch(messageBatch, destinationAccountUuid, destinationDeviceId)));
}
private void storeBatch(final List<MessageProtos.Envelope> messages, final UUID destinationAccountUuid, final long destinationDeviceId) {
@ -135,7 +121,7 @@ public class MessagesDynamoDb {
.withValueMap(Map.of(":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)))
.withMaxResultSize(numberOfMessagesToFetch);
final Table table = dynamoDb.getTable(tableName);
final Table table = getDynamoDb().getTable(tableName);
List<OutgoingMessageEntity> messageEntities = new ArrayList<>(numberOfMessagesToFetch);
for (Item message : table.query(querySpec)) {
messageEntities.add(convertItemToOutgoingMessageEntity(message));
@ -164,7 +150,7 @@ public class MessagesDynamoDb {
":source", source,
":timestamp", timestamp));
final Table table = dynamoDb.getTable(tableName);
final Table table = getDynamoDb().getTable(tableName);
return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(table, partitionKey, querySpec, table);
});
}
@ -179,7 +165,7 @@ public class MessagesDynamoDb {
"#uuid", LOCAL_INDEX_MESSAGE_UUID_KEY_SORT))
.withValueMap(Map.of(":part", partitionKey,
":uuid", convertLocalIndexMessageUuidSortKey(messageUuid)));
final Table table = dynamoDb.getTable(tableName);
final Table table = getDynamoDb().getTable(tableName);
final Index index = table.getIndex(LOCAL_INDEX_MESSAGE_UUID_NAME);
return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(table, partitionKey, querySpec, index);
});
@ -241,62 +227,24 @@ public class MessagesDynamoDb {
}
private void deleteRowsMatchingQuery(byte[] partitionKey, QuerySpec querySpec) {
final Table table = dynamoDb.getTable(tableName);
doInBatches(table.query(querySpec), (itemBatch) -> deleteItems(partitionKey, itemBatch), DYNAMO_DB_MAX_BATCH_SIZE);
final Table table = getDynamoDb().getTable(tableName);
writeInBatches(table.query(querySpec), (itemBatch) -> deleteItems(partitionKey, itemBatch));
}
private void deleteItems(byte[] partitionKey, List<Item> items) {
final TableWriteItems tableWriteItems = new TableWriteItems(tableName);
items.stream().map((x) -> new PrimaryKey(KEY_PARTITION, partitionKey, KEY_SORT, x.getBinary(KEY_SORT))).forEach(tableWriteItems::addPrimaryKeyToDelete);
items.stream().map(item -> new PrimaryKey(KEY_PARTITION, partitionKey, KEY_SORT, item.getBinary(KEY_SORT))).forEach(tableWriteItems::addPrimaryKeyToDelete);
executeTableWriteItemsUntilComplete(tableWriteItems);
}
private void executeTableWriteItemsUntilComplete(TableWriteItems items) {
AtomicReference<BatchWriteItemOutcome> outcome = new AtomicReference<>();
batchWriteItemsFirstPass.record(() -> {
outcome.set(dynamoDb.batchWriteItem(items));
});
int attemptCount = 0;
while (!outcome.get().getUnprocessedItems().isEmpty() && attemptCount < MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE) {
batchWriteItemsRetryPass.record(() -> {
outcome.set(dynamoDb.batchWriteItemUnprocessed(outcome.get().getUnprocessedItems()));
});
++attemptCount;
}
if (!outcome.get().getUnprocessedItems().isEmpty()) {
logger.error("Attempt count ({}) reached max ({}}) before applying all batch writes to dynamo. {} unprocessed items remain.", attemptCount, MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE, outcome.get().getUnprocessedItems().size());
batchWriteItemsUnprocessed.increment(outcome.get().getUnprocessedItems().size());
}
}
private long getTtlForMessage(MessageProtos.Envelope message) {
return message.getServerTimestamp() / 1000 + timeToLive.getSeconds();
}
private static <T> void doInBatches(final Iterable<T> items, final Consumer<List<T>> action, final int batchSize) {
List<T> batch = new ArrayList<>(batchSize);
for (T item : items) {
batch.add(item);
if (batch.size() == batchSize) {
action.accept(batch);
batch.clear();
}
}
if (!batch.isEmpty()) {
action.accept(batch);
}
}
private static byte[] convertPartitionKey(final UUID destinationAccountUuid) {
return convertUuidToBytes(destinationAccountUuid);
}
private static UUID convertPartitionKey(final byte[] bytes) {
return convertUuidFromBytes(bytes, "partition key");
}
private static byte[] convertSortKey(final long destinationDeviceId, final long serverTimestamp, final UUID messageUuid) {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[32]);
byteBuffer.putLong(destinationDeviceId);

View File

@ -0,0 +1,23 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import org.whispersystems.textsecuregcm.entities.PreKey;
import java.util.List;
public interface PreKeyStore {
void store(Account account, long deviceId, List<PreKey> keys);
int getCount(Account account, long deviceId);
List<KeyRecord> take(Account account, long deviceId);
List<KeyRecord> take(Account account);
void delete(Account account);
}

View File

@ -34,6 +34,7 @@ import org.whispersystems.textsecuregcm.storage.DirectoryManager;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.Keys;
import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
import org.whispersystems.textsecuregcm.storage.Messages;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
@ -97,7 +98,16 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getMessageDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) configuration.getMessageDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
AmazonDynamoDBClientBuilder keysDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(configuration.getKeysDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getKeysDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) configuration.getKeysDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
DynamoDB messageDynamoDb = new DynamoDB(clientBuilder.build());
DynamoDB preKeyDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", configuration.getCacheClusterConfiguration(), redisClusterClientResources);
@ -111,6 +121,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
Profiles profiles = new Profiles(accountDatabase);
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
Keys keys = new Keys(accountDatabase, configuration.getAccountsDatabaseConfiguration().getKeyOperationRetryConfiguration());
KeysDynamoDb keysDynamoDb = new KeysDynamoDb(messageDynamoDb, configuration.getKeysDynamoDbConfiguration().getTableName());
Messages messages = new Messages(messageDatabase);
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb, configuration.getMessageDynamoDbConfiguration().getTableName(), configuration.getMessageDynamoDbConfiguration().getTimeToLive());
ReplicatedJedisPool redisClient = new RedisClientFactory("directory_cache_delete_command", configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), configuration.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
@ -124,7 +135,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesManager messagesManager = new MessagesManager(messages, messagesDynamoDb, messagesCache, pushLatencyManager, new ExperimentEnrollmentManager(dynamicConfigurationManager));
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster, directoryQueue, keys, messagesManager, usernamesManager, profilesManager);
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster, directoryQueue, keys, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
for (String user: users) {
Optional<Account> account = accountsManager.get(user);

View File

@ -0,0 +1,40 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import org.whispersystems.textsecuregcm.tests.util.LocalDynamoDbRule;
public class KeysDynamoDbRule extends LocalDynamoDbRule {
public static final String TABLE_NAME = "Signal_Keys_Test";
@Override
protected void before() throws Throwable {
super.before();
final DynamoDB dynamoDB = getDynamoDB();
final CreateTableRequest createTableRequest = new CreateTableRequest()
.withTableName(TABLE_NAME)
.withKeySchema(new KeySchemaElement(KeysDynamoDb.KEY_ACCOUNT_UUID, "HASH"),
new KeySchemaElement(KeysDynamoDb.KEY_DEVICE_ID_KEY_ID, "RANGE"))
.withAttributeDefinitions(new AttributeDefinition(KeysDynamoDb.KEY_ACCOUNT_UUID, ScalarAttributeType.B),
new AttributeDefinition(KeysDynamoDb.KEY_DEVICE_ID_KEY_ID, ScalarAttributeType.B))
.withProvisionedThroughput(new ProvisionedThroughput(20L, 20L));
dynamoDB.createTable(createTableRequest);
}
@Override
protected void after() {
super.after();
}
}

View File

@ -0,0 +1,136 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.whispersystems.textsecuregcm.entities.PreKey;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class KeysDynamoDbTest {
private Account account;
private KeysDynamoDb keysDynamoDb;
@ClassRule
public static KeysDynamoDbRule dynamoDbRule = new KeysDynamoDbRule();
private static final String ACCOUNT_NUMBER = "+18005551234";
private static final long DEVICE_ID = 1L;
@Before
public void setup() {
keysDynamoDb = new KeysDynamoDb(dynamoDbRule.getDynamoDB(), KeysDynamoDbRule.TABLE_NAME);
account = mock(Account.class);
when(account.getNumber()).thenReturn(ACCOUNT_NUMBER);
when(account.getUuid()).thenReturn(UUID.randomUUID());
}
@Test
public void testStore() {
assertEquals("Initial pre-key count for an account should be zero",
0, keysDynamoDb.getCount(account, DEVICE_ID));
keysDynamoDb.store(account, DEVICE_ID, List.of(new PreKey(1, "public-key")));
assertEquals(1, keysDynamoDb.getCount(account, DEVICE_ID));
keysDynamoDb.store(account, DEVICE_ID, List.of(new PreKey(1, "public-key")));
assertEquals("Repeatedly storing same key should have no effect",
1, keysDynamoDb.getCount(account, DEVICE_ID));
keysDynamoDb.store(account, DEVICE_ID, List.of(new PreKey(2, "different-public-key")));
assertEquals("Inserting a new key should overwrite all prior keys for the given account/device",
1, keysDynamoDb.getCount(account, DEVICE_ID));
keysDynamoDb.store(account, DEVICE_ID, List.of(new PreKey(3, "third-public-key"), new PreKey(4, "fourth-public-key")));
assertEquals("Inserting multiple new keys should overwrite all prior keys for the given account/device",
2, keysDynamoDb.getCount(account, DEVICE_ID));
}
@Test
public void testTakeAccount() {
final Device firstDevice = mock(Device.class);
final Device secondDevice = mock(Device.class);
when(firstDevice.getId()).thenReturn(DEVICE_ID);
when(secondDevice.getId()).thenReturn(DEVICE_ID + 1);
when(account.getDevices()).thenReturn(Set.of(firstDevice, secondDevice));
assertEquals(Collections.emptyList(), keysDynamoDb.take(account));
final PreKey firstDevicePreKey = new PreKey(1, "public-key");
final PreKey secondDevicePreKey = new PreKey(2, "second-key");
keysDynamoDb.store(account, DEVICE_ID, List.of(firstDevicePreKey));
keysDynamoDb.store(account, DEVICE_ID + 1, List.of(secondDevicePreKey));
final Set<KeyRecord> expectedKeys = Set.of(
new KeyRecord(-1, ACCOUNT_NUMBER, DEVICE_ID, firstDevicePreKey.getKeyId(), firstDevicePreKey.getPublicKey()),
new KeyRecord(-1, ACCOUNT_NUMBER, DEVICE_ID + 1, secondDevicePreKey.getKeyId(), secondDevicePreKey.getPublicKey()));
assertEquals(expectedKeys, new HashSet<>(keysDynamoDb.take(account)));
assertEquals(0, keysDynamoDb.getCount(account, DEVICE_ID));
assertEquals(0, keysDynamoDb.getCount(account, DEVICE_ID + 1));
}
@Test
public void testTakeAccountAndDeviceId() {
assertEquals(Collections.emptyList(), keysDynamoDb.take(account, DEVICE_ID));
final PreKey preKey = new PreKey(1, "public-key");
keysDynamoDb.store(account, DEVICE_ID, List.of(preKey, new PreKey(2, "different-pre-key")));
assertEquals(List.of(new KeyRecord(-1, ACCOUNT_NUMBER, DEVICE_ID, preKey.getKeyId(), preKey.getPublicKey())), keysDynamoDb.take(account, DEVICE_ID));
assertEquals(1, keysDynamoDb.getCount(account, DEVICE_ID));
}
@Test
public void testGetCount() {
assertEquals(0, keysDynamoDb.getCount(account, DEVICE_ID));
keysDynamoDb.store(account, DEVICE_ID, List.of(new PreKey(1, "public-key")));
assertEquals(1, keysDynamoDb.getCount(account, DEVICE_ID));
}
@Test
public void testDeleteByAccount() {
keysDynamoDb.store(account, DEVICE_ID, List.of(new PreKey(1, "public-key"), new PreKey(2, "different-public-key")));
keysDynamoDb.store(account, DEVICE_ID + 1, List.of(new PreKey(3, "public-key-for-different-device")));
assertEquals(2, keysDynamoDb.getCount(account, DEVICE_ID));
assertEquals(1, keysDynamoDb.getCount(account, DEVICE_ID + 1));
keysDynamoDb.delete(account);
assertEquals(0, keysDynamoDb.getCount(account, DEVICE_ID));
assertEquals(0, keysDynamoDb.getCount(account, DEVICE_ID + 1));
}
@Test
public void testDeleteByAccountAndDevice() {
keysDynamoDb.store(account, DEVICE_ID, List.of(new PreKey(1, "public-key"), new PreKey(2, "different-public-key")));
keysDynamoDb.store(account, DEVICE_ID + 1, List.of(new PreKey(3, "public-key-for-different-device")));
assertEquals(2, keysDynamoDb.getCount(account, DEVICE_ID));
assertEquals(1, keysDynamoDb.getCount(account, DEVICE_ID + 1));
keysDynamoDb.delete(account, DEVICE_ID);
assertEquals(0, keysDynamoDb.getCount(account, DEVICE_ID));
assertEquals(1, keysDynamoDb.getCount(account, DEVICE_ID + 1));
}
}

View File

@ -46,7 +46,7 @@ import io.dropwizard.testing.junit.ResourceTestRule;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.*;
public class KeyControllerTest {
public class KeysControllerTest {
private static final String EXISTS_NUMBER = "+14152222222";
private static final UUID EXISTS_UUID = UUID.randomUUID();
@ -141,18 +141,16 @@ public class KeyControllerTest {
List<KeyRecord> singleDevice = new LinkedList<>();
singleDevice.add(SAMPLE_KEY);
when(keys.get(eq(EXISTS_NUMBER), eq(1L))).thenReturn(singleDevice);
when(keys.get(eq(NOT_EXISTS_NUMBER), eq(1L))).thenReturn(new LinkedList<>());
when(keys.take(eq(existsAccount), eq(1L))).thenReturn(singleDevice);
List<KeyRecord> multiDevice = new LinkedList<>();
multiDevice.add(SAMPLE_KEY);
multiDevice.add(SAMPLE_KEY2);
multiDevice.add(SAMPLE_KEY3);
multiDevice.add(SAMPLE_KEY4);
when(keys.get(EXISTS_NUMBER)).thenReturn(multiDevice);
when(keys.take(existsAccount)).thenReturn(multiDevice);
when(keys.getCount(eq(AuthHelper.VALID_NUMBER), eq(1L))).thenReturn(5);
when(keys.getCount(eq(AuthHelper.VALID_ACCOUNT), eq(1L))).thenReturn(5);
when(AuthHelper.VALID_DEVICE.getSignedPreKey()).thenReturn(VALID_DEVICE_SIGNED_KEY);
when(AuthHelper.VALID_ACCOUNT.getIdentityKey()).thenReturn(null);
@ -169,7 +167,7 @@ public class KeyControllerTest {
assertThat(result.getCount()).isEqualTo(4);
verify(keys).getCount(eq(AuthHelper.VALID_NUMBER), eq(1L));
verify(keys).getCount(eq(AuthHelper.VALID_ACCOUNT), eq(1L));
}
@Test
@ -183,7 +181,7 @@ public class KeyControllerTest {
assertThat(result.getCount()).isEqualTo(4);
verify(keys).getCount(eq(AuthHelper.VALID_NUMBER), eq(1L));
verify(keys).getCount(eq(AuthHelper.VALID_ACCOUNT), eq(1L));
}
@ -283,7 +281,7 @@ public class KeyControllerTest {
assertThat(result.getDevice(1).getPreKey().getPublicKey()).isEqualTo(SAMPLE_KEY.getPublicKey());
assertThat(result.getDevice(1).getSignedPreKey()).isEqualTo(existsAccount.getDevice(1).get().getSignedPreKey());
verify(keys).get(eq(EXISTS_NUMBER), eq(1L));
verify(keys).take(eq(existsAccount), eq(1L));
verifyNoMoreInteractions(keys);
}
@ -301,7 +299,7 @@ public class KeyControllerTest {
assertThat(result.getDevice(1).getPreKey().getPublicKey()).isEqualTo(SAMPLE_KEY.getPublicKey());
assertThat(result.getDevice(1).getSignedPreKey()).isEqualTo(existsAccount.getDevice(1).get().getSignedPreKey());
verify(keys).get(eq(EXISTS_NUMBER), eq(1L));
verify(keys).take(eq(existsAccount), eq(1L));
verifyNoMoreInteractions(keys);
}
@ -320,7 +318,7 @@ public class KeyControllerTest {
assertThat(result.getDevice(1).getPreKey().getPublicKey()).isEqualTo(SAMPLE_KEY.getPublicKey());
assertThat(result.getDevice(1).getSignedPreKey()).isEqualTo(existsAccount.getDevice(1).get().getSignedPreKey());
verify(keys).get(eq(EXISTS_NUMBER), eq(1L));
verify(keys).take(eq(existsAccount), eq(1L));
verifyNoMoreInteractions(keys);
}
@ -338,7 +336,7 @@ public class KeyControllerTest {
assertThat(result.getDevice(1).getPreKey().getPublicKey()).isEqualTo(SAMPLE_KEY.getPublicKey());
assertThat(result.getDevice(1).getSignedPreKey()).isEqualTo(existsAccount.getDevice(1).get().getSignedPreKey());
verify(keys).get(eq(EXISTS_NUMBER), eq(1L));
verify(keys).take(eq(existsAccount), eq(1L));
verifyNoMoreInteractions(keys);
}
@ -414,7 +412,7 @@ public class KeyControllerTest {
assertThat(signedPreKey).isNull();
assertThat(deviceId).isEqualTo(4);
verify(keys).get(eq(EXISTS_NUMBER));
verify(keys).take(eq(existsAccount));
verifyNoMoreInteractions(keys);
}
@ -464,7 +462,7 @@ public class KeyControllerTest {
assertThat(signedPreKey).isNull();
assertThat(deviceId).isEqualTo(4);
verify(keys).get(eq(EXISTS_NUMBER));
verify(keys).take(eq(existsAccount));
verifyNoMoreInteractions(keys);
}
@ -533,7 +531,7 @@ public class KeyControllerTest {
assertThat(response.getStatus()).isEqualTo(204);
ArgumentCaptor<List> listCaptor = ArgumentCaptor.forClass(List.class);
verify(keys).store(eq(AuthHelper.VALID_NUMBER), eq(1L), listCaptor.capture());
verify(keys).store(eq(AuthHelper.VALID_ACCOUNT), eq(1L), listCaptor.capture());
List<PreKey> capturedList = listCaptor.getValue();
assertThat(capturedList.size()).isEqualTo(1);
@ -567,7 +565,7 @@ public class KeyControllerTest {
assertThat(response.getStatus()).isEqualTo(204);
ArgumentCaptor<List> listCaptor = ArgumentCaptor.forClass(List.class);
verify(keys).store(eq(AuthHelper.DISABLED_NUMBER), eq(1L), listCaptor.capture());
verify(keys).store(eq(AuthHelper.DISABLED_ACCOUNT), eq(1L), listCaptor.capture());
List<PreKey> capturedList = listCaptor.getValue();
assertThat(capturedList.size()).isEqualTo(1);

View File

@ -8,7 +8,6 @@ package org.whispersystems.textsecuregcm.tests.storage;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import org.junit.Test;
import org.whispersystems.textsecuregcm.entities.Profile;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account;
@ -16,6 +15,7 @@ import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
import org.whispersystems.textsecuregcm.storage.Keys;
import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
@ -46,6 +46,7 @@ public class AccountsManagerTest {
DirectoryManager directoryManager = mock(DirectoryManager.class);
DirectoryQueue directoryQueue = mock(DirectoryQueue.class);
Keys keys = mock(Keys.class);
KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class);
MessagesManager messagesManager = mock(MessagesManager.class);
UsernamesManager usernamesManager = mock(UsernamesManager.class);
ProfilesManager profilesManager = mock(ProfilesManager.class);
@ -55,7 +56,7 @@ public class AccountsManagerTest {
when(commands.get(eq("AccountMap::+14152222222"))).thenReturn(uuid.toString());
when(commands.get(eq("Account3::" + uuid.toString()))).thenReturn("{\"number\": \"+14152222222\", \"name\": \"test\"}");
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster, directoryQueue, keys, messagesManager, usernamesManager, profilesManager);
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster, directoryQueue, keys, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
Optional<Account> account = accountsManager.get("+14152222222");
assertTrue(account.isPresent());
@ -76,6 +77,7 @@ public class AccountsManagerTest {
DirectoryManager directoryManager = mock(DirectoryManager.class);
DirectoryQueue directoryQueue = mock(DirectoryQueue.class);
Keys keys = mock(Keys.class);
KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class);
MessagesManager messagesManager = mock(MessagesManager.class);
UsernamesManager usernamesManager = mock(UsernamesManager.class);
ProfilesManager profilesManager = mock(ProfilesManager.class);
@ -84,7 +86,7 @@ public class AccountsManagerTest {
when(commands.get(eq("Account3::" + uuid.toString()))).thenReturn("{\"number\": \"+14152222222\", \"name\": \"test\"}");
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster, directoryQueue, keys, messagesManager, usernamesManager, profilesManager);
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster, directoryQueue, keys, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
Optional<Account> account = accountsManager.get(uuid);
assertTrue(account.isPresent());
@ -106,6 +108,7 @@ public class AccountsManagerTest {
DirectoryManager directoryManager = mock(DirectoryManager.class);
DirectoryQueue directoryQueue = mock(DirectoryQueue.class);
Keys keys = mock(Keys.class);
KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class);
MessagesManager messagesManager = mock(MessagesManager.class);
UsernamesManager usernamesManager = mock(UsernamesManager.class);
ProfilesManager profilesManager = mock(ProfilesManager.class);
@ -115,7 +118,7 @@ public class AccountsManagerTest {
when(commands.get(eq("AccountMap::+14152222222"))).thenReturn(null);
when(accounts.get(eq("+14152222222"))).thenReturn(Optional.of(account));
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster, directoryQueue, keys, messagesManager, usernamesManager, profilesManager);
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster, directoryQueue, keys, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
Optional<Account> retrieved = accountsManager.get("+14152222222");
assertTrue(retrieved.isPresent());
@ -138,6 +141,7 @@ public class AccountsManagerTest {
DirectoryManager directoryManager = mock(DirectoryManager.class);
DirectoryQueue directoryQueue = mock(DirectoryQueue.class);
Keys keys = mock(Keys.class);
KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class);
MessagesManager messagesManager = mock(MessagesManager.class);
UsernamesManager usernamesManager = mock(UsernamesManager.class);
ProfilesManager profilesManager = mock(ProfilesManager.class);
@ -147,7 +151,7 @@ public class AccountsManagerTest {
when(commands.get(eq("Account3::" + uuid))).thenReturn(null);
when(accounts.get(eq(uuid))).thenReturn(Optional.of(account));
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster, directoryQueue, keys, messagesManager, usernamesManager, profilesManager);
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster, directoryQueue, keys, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
Optional<Account> retrieved = accountsManager.get(uuid);
assertTrue(retrieved.isPresent());
@ -170,6 +174,7 @@ public class AccountsManagerTest {
DirectoryManager directoryManager = mock(DirectoryManager.class);
DirectoryQueue directoryQueue = mock(DirectoryQueue.class);
Keys keys = mock(Keys.class);
KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class);
MessagesManager messagesManager = mock(MessagesManager.class);
UsernamesManager usernamesManager = mock(UsernamesManager.class);
ProfilesManager profilesManager = mock(ProfilesManager.class);
@ -179,7 +184,7 @@ public class AccountsManagerTest {
when(commands.get(eq("AccountMap::+14152222222"))).thenThrow(new RedisException("Connection lost!"));
when(accounts.get(eq("+14152222222"))).thenReturn(Optional.of(account));
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster, directoryQueue, keys, messagesManager, usernamesManager, profilesManager);
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster, directoryQueue, keys, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
Optional<Account> retrieved = accountsManager.get("+14152222222");
assertTrue(retrieved.isPresent());
@ -202,6 +207,7 @@ public class AccountsManagerTest {
DirectoryManager directoryManager = mock(DirectoryManager.class);
DirectoryQueue directoryQueue = mock(DirectoryQueue.class);
Keys keys = mock(Keys.class);
KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class);
MessagesManager messagesManager = mock(MessagesManager.class);
UsernamesManager usernamesManager = mock(UsernamesManager.class);
ProfilesManager profilesManager = mock(ProfilesManager.class);
@ -211,7 +217,7 @@ public class AccountsManagerTest {
when(commands.get(eq("Account3::" + uuid))).thenThrow(new RedisException("Connection lost!"));
when(accounts.get(eq(uuid))).thenReturn(Optional.of(account));
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster, directoryQueue, keys, messagesManager, usernamesManager, profilesManager);
AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheCluster, directoryQueue, keys, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
Optional<Account> retrieved = accountsManager.get(uuid);
assertTrue(retrieved.isPresent());

View File

@ -24,6 +24,7 @@ import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguratio
import org.whispersystems.textsecuregcm.configuration.AccountsDatabaseConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.entities.PreKey;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.KeyRecord;
import org.whispersystems.textsecuregcm.storage.Keys;
@ -41,12 +42,13 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@Ignore
public class KeysTest {
@Rule
public PreparedDbRule db = EmbeddedPostgresRules.preparedDatabase(LiquibasePreparer.forClasspathLocation("accountsdb.xml"));
private Account firstAccount;
private Account secondAccount;
private Keys keys;
@Before
@ -56,6 +58,12 @@ public class KeysTest {
new CircuitBreakerConfiguration());
this.keys = new Keys(faultTolerantDatabase, new RetryConfiguration());
this.firstAccount = mock(Account.class);
this.secondAccount = mock(Account.class);
when(firstAccount.getNumber()).thenReturn("+14152222222");
when(secondAccount.getNumber()).thenReturn("+14151111111");
}
@ -79,18 +87,18 @@ public class KeysTest {
anotherDeviceTwoPreKeys.add(new PreKey(i, "+14151111111Device2PublicKey" + i));
}
keys.store("+14152222222", 1, deviceOnePreKeys);
keys.store("+14152222222", 2, deviceTwoPreKeys);
keys.store(firstAccount, 1, deviceOnePreKeys);
keys.store(firstAccount, 2, deviceTwoPreKeys);
keys.store("+14151111111", 1, oldAnotherDeviceOnePrKeys);
keys.store("+14151111111", 1, anotherDeviceOnePreKeys);
keys.store("+14151111111", 2, anotherDeviceTwoPreKeys);
keys.store(secondAccount, 1, oldAnotherDeviceOnePrKeys);
keys.store(secondAccount, 1, anotherDeviceOnePreKeys);
keys.store(secondAccount, 2, anotherDeviceTwoPreKeys);
PreparedStatement statement = db.getTestDatabase().getConnection().prepareStatement("SELECT * FROM keys WHERE number = ? AND device_id = ? ORDER BY key_id");
verifyStoredState(statement, "+14152222222", 1);
verifyStoredState(statement, "+14152222222", 2);
verifyStoredState(statement, "+14151111111", 1);
verifyStoredState(statement, "+14151111111", 2);
verifyStoredState(statement, firstAccount, 1);
verifyStoredState(statement, firstAccount, 2);
verifyStoredState(statement, secondAccount, 1);
verifyStoredState(statement, secondAccount, 2);
}
@Test
@ -102,11 +110,12 @@ public class KeysTest {
}
keys.store("+14152222222", 1, deviceOnePreKeys);
keys.store(firstAccount, 1, deviceOnePreKeys);
assertThat(keys.getCount("+14152222222", 1)).isEqualTo(100);
assertThat(keys.getCount(firstAccount, 1)).isEqualTo(100);
}
@Ignore
@Test
public void testGetForDevice() {
List<PreKey> deviceOnePreKeys = new LinkedList<>();
@ -125,45 +134,46 @@ public class KeysTest {
anotherDeviceTwoPreKeys.add(new PreKey(i, "+14151111111Device2PublicKey" + i));
}
keys.store("+14152222222", 1, deviceOnePreKeys);
keys.store("+14152222222", 2, deviceTwoPreKeys);
keys.store(firstAccount, 1, deviceOnePreKeys);
keys.store(firstAccount, 2, deviceTwoPreKeys);
keys.store("+14151111111", 1, anotherDeviceOnePreKeys);
keys.store("+14151111111", 2, anotherDeviceTwoPreKeys);
keys.store(secondAccount, 1, anotherDeviceOnePreKeys);
keys.store(secondAccount, 2, anotherDeviceTwoPreKeys);
assertThat(keys.getCount("+14152222222", 1)).isEqualTo(100);
List<KeyRecord> records = keys.get("+14152222222", 1);
assertThat(keys.getCount(firstAccount, 1)).isEqualTo(100);
List<KeyRecord> records = keys.take(firstAccount, 1);
assertThat(records.size()).isEqualTo(1);
assertThat(records.get(0).getKeyId()).isEqualTo(1);
assertThat(records.get(0).getPublicKey()).isEqualTo("+14152222222Device1PublicKey1");
assertThat(keys.getCount("+14152222222", 1)).isEqualTo(99);
assertThat(keys.getCount("+14152222222", 2)).isEqualTo(100);
assertThat(keys.getCount("+14151111111", 1)).isEqualTo(100);
assertThat(keys.getCount("+14151111111", 2)).isEqualTo(100);
assertThat(keys.getCount(firstAccount, 1)).isEqualTo(99);
assertThat(keys.getCount(firstAccount, 2)).isEqualTo(100);
assertThat(keys.getCount(secondAccount, 1)).isEqualTo(100);
assertThat(keys.getCount(secondAccount, 2)).isEqualTo(100);
records = keys.get("+14152222222", 1);
records = keys.take(firstAccount, 1);
assertThat(records.size()).isEqualTo(1);
assertThat(records.get(0).getKeyId()).isEqualTo(2);
assertThat(records.get(0).getPublicKey()).isEqualTo("+14152222222Device1PublicKey2");
assertThat(keys.getCount("+14152222222", 1)).isEqualTo(98);
assertThat(keys.getCount("+14152222222", 2)).isEqualTo(100);
assertThat(keys.getCount("+14151111111", 1)).isEqualTo(100);
assertThat(keys.getCount("+14151111111", 2)).isEqualTo(100);
assertThat(keys.getCount(firstAccount, 1)).isEqualTo(98);
assertThat(keys.getCount(firstAccount, 2)).isEqualTo(100);
assertThat(keys.getCount(secondAccount, 1)).isEqualTo(100);
assertThat(keys.getCount(secondAccount, 2)).isEqualTo(100);
records = keys.get("+14152222222", 2);
records = keys.take(firstAccount, 2);
assertThat(records.size()).isEqualTo(1);
assertThat(records.get(0).getKeyId()).isEqualTo(1);
assertThat(records.get(0).getPublicKey()).isEqualTo("+14152222222Device2PublicKey1");
assertThat(keys.getCount("+14152222222", 1)).isEqualTo(98);
assertThat(keys.getCount("+14152222222", 2)).isEqualTo(99);
assertThat(keys.getCount("+14151111111", 1)).isEqualTo(100);
assertThat(keys.getCount("+14151111111", 2)).isEqualTo(100);
assertThat(keys.getCount(firstAccount, 1)).isEqualTo(98);
assertThat(keys.getCount(firstAccount, 2)).isEqualTo(99);
assertThat(keys.getCount(secondAccount, 1)).isEqualTo(100);
assertThat(keys.getCount(secondAccount, 2)).isEqualTo(100);
}
@Ignore
@Test
public void testGetForAllDevices() {
List<PreKey> deviceOnePreKeys = new LinkedList<>();
@ -184,18 +194,18 @@ public class KeysTest {
anotherDeviceThreePreKeys.add(new PreKey(i, "+14151111111Device3PublicKey" + i));
}
keys.store("+14152222222", 1, deviceOnePreKeys);
keys.store("+14152222222", 2, deviceTwoPreKeys);
keys.store(firstAccount, 1, deviceOnePreKeys);
keys.store(firstAccount, 2, deviceTwoPreKeys);
keys.store("+14151111111", 1, anotherDeviceOnePreKeys);
keys.store("+14151111111", 2, anotherDeviceTwoPreKeys);
keys.store("+14151111111", 3, anotherDeviceThreePreKeys);
keys.store(secondAccount, 1, anotherDeviceOnePreKeys);
keys.store(secondAccount, 2, anotherDeviceTwoPreKeys);
keys.store(secondAccount, 3, anotherDeviceThreePreKeys);
assertThat(keys.getCount("+14152222222", 1)).isEqualTo(100);
assertThat(keys.getCount("+14152222222", 2)).isEqualTo(100);
assertThat(keys.getCount(firstAccount, 1)).isEqualTo(100);
assertThat(keys.getCount(firstAccount, 2)).isEqualTo(100);
List<KeyRecord> records = keys.get("+14152222222");
List<KeyRecord> records = keys.take(firstAccount);
assertThat(records.size()).isEqualTo(2);
assertThat(records.get(0).getKeyId()).isEqualTo(1);
@ -204,10 +214,10 @@ public class KeysTest {
assertThat(records.stream().anyMatch(record -> record.getPublicKey().equals("+14152222222Device1PublicKey1"))).isTrue();
assertThat(records.stream().anyMatch(record -> record.getPublicKey().equals("+14152222222Device2PublicKey1"))).isTrue();
assertThat(keys.getCount("+14152222222", 1)).isEqualTo(99);
assertThat(keys.getCount("+14152222222", 2)).isEqualTo(99);
assertThat(keys.getCount(firstAccount, 1)).isEqualTo(99);
assertThat(keys.getCount(firstAccount, 2)).isEqualTo(99);
records = keys.get("+14152222222");
records = keys.take(firstAccount);
assertThat(records.size()).isEqualTo(2);
assertThat(records.get(0).getKeyId()).isEqualTo(2);
@ -216,11 +226,11 @@ public class KeysTest {
assertThat(records.stream().anyMatch(record -> record.getPublicKey().equals("+14152222222Device1PublicKey2"))).isTrue();
assertThat(records.stream().anyMatch(record -> record.getPublicKey().equals("+14152222222Device2PublicKey2"))).isTrue();
assertThat(keys.getCount("+14152222222", 1)).isEqualTo(98);
assertThat(keys.getCount("+14152222222", 2)).isEqualTo(98);
assertThat(keys.getCount(firstAccount, 1)).isEqualTo(98);
assertThat(keys.getCount(firstAccount, 2)).isEqualTo(98);
records = keys.get("+14151111111");
records = keys.take(secondAccount);
assertThat(records.size()).isEqualTo(3);
assertThat(records.get(0).getKeyId()).isEqualTo(1);
@ -231,11 +241,12 @@ public class KeysTest {
assertThat(records.stream().anyMatch(record -> record.getPublicKey().equals("+14151111111Device2PublicKey1"))).isTrue();
assertThat(records.stream().anyMatch(record -> record.getPublicKey().equals("+14151111111Device3PublicKey1"))).isTrue();
assertThat(keys.getCount("+14151111111", 1)).isEqualTo(99);
assertThat(keys.getCount("+14151111111", 2)).isEqualTo(99);
assertThat(keys.getCount("+14151111111", 3)).isEqualTo(99);
assertThat(keys.getCount(secondAccount, 1)).isEqualTo(99);
assertThat(keys.getCount(secondAccount, 2)).isEqualTo(99);
assertThat(keys.getCount(secondAccount, 3)).isEqualTo(99);
}
@Ignore
@Test
public void testGetForAllDevicesParallel() throws InterruptedException {
List<PreKey> deviceOnePreKeys = new LinkedList<>();
@ -246,11 +257,11 @@ public class KeysTest {
deviceTwoPreKeys.add(new PreKey(i, "+14152222222Device2PublicKey" + i));
}
keys.store("+14152222222", 1, deviceOnePreKeys);
keys.store("+14152222222", 2, deviceTwoPreKeys);
keys.store(firstAccount, 1, deviceOnePreKeys);
keys.store(firstAccount, 2, deviceTwoPreKeys);
assertThat(keys.getCount("+14152222222", 1)).isEqualTo(100);
assertThat(keys.getCount("+14152222222", 2)).isEqualTo(100);
assertThat(keys.getCount(firstAccount, 1)).isEqualTo(100);
assertThat(keys.getCount(firstAccount, 2)).isEqualTo(100);
List<Thread> threads = new LinkedList<>();
@ -260,7 +271,7 @@ public class KeysTest {
final int MAX_RETRIES = 5;
for (int retryAttempt = 0; results == null && retryAttempt < MAX_RETRIES; ++retryAttempt) {
try {
results = keys.get("+14152222222");
results = keys.take(firstAccount);
} catch (UnableToExecuteStatementException e) {
if (retryAttempt == MAX_RETRIES - 1) {
throw e;
@ -278,8 +289,8 @@ public class KeysTest {
thread.join();
}
assertThat(keys.getCount("+14152222222", 1)).isEqualTo(80);
assertThat(keys.getCount("+14152222222",2)).isEqualTo(80);
assertThat(keys.getCount(firstAccount, 1)).isEqualTo(80);
assertThat(keys.getCount(firstAccount,2)).isEqualTo(80);
}
@Test
@ -302,32 +313,32 @@ public class KeysTest {
anotherDeviceThreePreKeys.add(new PreKey(i, "+14151111111Device3PublicKey" + i));
}
keys.store("+14152222222", 1, deviceOnePreKeys);
keys.store("+14152222222", 2, deviceTwoPreKeys);
keys.store(firstAccount, 1, deviceOnePreKeys);
keys.store(firstAccount, 2, deviceTwoPreKeys);
keys.store("+14151111111", 1, anotherDeviceOnePreKeys);
keys.store("+14151111111", 2, anotherDeviceTwoPreKeys);
keys.store("+14151111111", 3, anotherDeviceThreePreKeys);
keys.store(secondAccount, 1, anotherDeviceOnePreKeys);
keys.store(secondAccount, 2, anotherDeviceTwoPreKeys);
keys.store(secondAccount, 3, anotherDeviceThreePreKeys);
assertThat(keys.getCount("+14152222222", 1)).isEqualTo(100);
assertThat(keys.getCount("+14152222222", 2)).isEqualTo(100);
assertThat(keys.getCount("+14151111111", 1)).isEqualTo(100);
assertThat(keys.getCount("+14151111111", 2)).isEqualTo(100);
assertThat(keys.getCount("+14151111111", 3)).isEqualTo(100);
assertThat(keys.getCount(firstAccount, 1)).isEqualTo(100);
assertThat(keys.getCount(firstAccount, 2)).isEqualTo(100);
assertThat(keys.getCount(secondAccount, 1)).isEqualTo(100);
assertThat(keys.getCount(secondAccount, 2)).isEqualTo(100);
assertThat(keys.getCount(secondAccount, 3)).isEqualTo(100);
keys.delete("+14152222222");
keys.delete(firstAccount);
assertThat(keys.getCount("+14152222222", 1)).isEqualTo(0);
assertThat(keys.getCount("+14152222222", 2)).isEqualTo(0);
assertThat(keys.getCount("+14151111111", 1)).isEqualTo(100);
assertThat(keys.getCount("+14151111111", 2)).isEqualTo(100);
assertThat(keys.getCount("+14151111111", 3)).isEqualTo(100);
assertThat(keys.getCount(firstAccount, 1)).isEqualTo(0);
assertThat(keys.getCount(firstAccount, 2)).isEqualTo(0);
assertThat(keys.getCount(secondAccount, 1)).isEqualTo(100);
assertThat(keys.getCount(secondAccount, 2)).isEqualTo(100);
assertThat(keys.getCount(secondAccount, 3)).isEqualTo(100);
}
@Test
public void testEmptyKeyGet() {
List<KeyRecord> records = keys.get("+14152222222");
List<KeyRecord> records = keys.take(firstAccount);
assertThat(records.isEmpty()).isTrue();
}
@ -361,21 +372,21 @@ public class KeysTest {
}
try {
keys.store("+14152222222", 1, deviceOnePreKeys);
keys.store(firstAccount, 1, deviceOnePreKeys);
throw new AssertionError();
} catch (TransactionException e) {
// good
}
try {
keys.store("+14152222222", 1, deviceOnePreKeys);
keys.store(firstAccount, 1, deviceOnePreKeys);
throw new AssertionError();
} catch (TransactionException e) {
// good
}
try {
keys.store("+14152222222", 1, deviceOnePreKeys);
keys.store(firstAccount, 1, deviceOnePreKeys);
throw new AssertionError();
} catch (CallNotPermittedException e) {
// good
@ -384,7 +395,7 @@ public class KeysTest {
Thread.sleep(1100);
try {
keys.store("+14152222222", 1, deviceOnePreKeys);
keys.store(firstAccount, 1, deviceOnePreKeys);
throw new AssertionError();
} catch (TransactionException e) {
// good
@ -401,7 +412,10 @@ public class KeysTest {
Keys keys = new Keys(new FaultTolerantDatabase("testBreaker", jdbi, new CircuitBreakerConfiguration()), new RetryConfiguration());
// We're happy as long as nothing throws an exception
keys.store("+18005551234", 1, Collections.emptyList());
Account account = mock(Account.class);
when(account.getNumber()).thenReturn("+18005551234");
keys.store(account, 1, Collections.emptyList());
}
@Test
@ -414,12 +428,15 @@ public class KeysTest {
Keys keys = new Keys(new FaultTolerantDatabase("testBreaker", jdbi, new CircuitBreakerConfiguration()), new RetryConfiguration());
assertThat(keys.get("+18005551234")).isEqualTo(Collections.emptyList());
assertThat(keys.get("+18005551234", 1)).isEqualTo(Collections.emptyList());
Account account = mock(Account.class);
when(account.getNumber()).thenReturn("+18005551234");
assertThat(keys.take(account)).isEqualTo(Collections.emptyList());
assertThat(keys.take(account, 1)).isEqualTo(Collections.emptyList());
}
private void verifyStoredState(PreparedStatement statement, String number, int deviceId) throws SQLException {
statement.setString(1, number);
private void verifyStoredState(PreparedStatement statement, Account account, int deviceId) throws SQLException {
statement.setString(1, account.getNumber());
statement.setInt(2, deviceId);
ResultSet resultSet = statement.executeQuery();
@ -431,7 +448,7 @@ public class KeysTest {
assertThat(keyId).isEqualTo(rowCount);
assertThat(publicKey).isEqualTo(number + "Device" + deviceId + "PublicKey" + rowCount);
assertThat(publicKey).isEqualTo(account.getNumber() + "Device" + deviceId + "PublicKey" + rowCount);
rowCount++;
}