Don’t PUT unmigrated accounts in update()
This commit is contained in:
parent
3b3764535c
commit
166d203e8e
|
@ -3,13 +3,11 @@ package org.whispersystems.textsecuregcm.storage;
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
|
|
||||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
||||||
import com.amazonaws.services.dynamodbv2.document.AttributeUpdate;
|
|
||||||
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
||||||
import com.amazonaws.services.dynamodbv2.document.Item;
|
import com.amazonaws.services.dynamodbv2.document.Item;
|
||||||
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
|
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
|
||||||
import com.amazonaws.services.dynamodbv2.document.Table;
|
import com.amazonaws.services.dynamodbv2.document.Table;
|
||||||
import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
|
import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
|
||||||
import com.amazonaws.services.dynamodbv2.document.spec.UpdateItemSpec;
|
|
||||||
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
|
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
|
||||||
import com.amazonaws.services.dynamodbv2.model.CancellationReason;
|
import com.amazonaws.services.dynamodbv2.model.CancellationReason;
|
||||||
import com.amazonaws.services.dynamodbv2.model.Delete;
|
import com.amazonaws.services.dynamodbv2.model.Delete;
|
||||||
|
@ -19,6 +17,7 @@ import com.amazonaws.services.dynamodbv2.model.ReturnValuesOnConditionCheckFailu
|
||||||
import com.amazonaws.services.dynamodbv2.model.TransactWriteItem;
|
import com.amazonaws.services.dynamodbv2.model.TransactWriteItem;
|
||||||
import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsRequest;
|
import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsRequest;
|
||||||
import com.amazonaws.services.dynamodbv2.model.TransactionCanceledException;
|
import com.amazonaws.services.dynamodbv2.model.TransactionCanceledException;
|
||||||
|
import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import io.micrometer.core.instrument.Metrics;
|
import io.micrometer.core.instrument.Metrics;
|
||||||
|
@ -145,20 +144,22 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
|
||||||
@Override
|
@Override
|
||||||
public void update(Account account) {
|
public void update(Account account) {
|
||||||
UPDATE_TIMER.record(() -> {
|
UPDATE_TIMER.record(() -> {
|
||||||
UpdateItemSpec updateItemSpec;
|
UpdateItemRequest updateItemRequest;
|
||||||
try {
|
try {
|
||||||
updateItemSpec = new UpdateItemSpec()
|
updateItemRequest = new UpdateItemRequest()
|
||||||
.withPrimaryKey(
|
.withTableName(accountsTable.getTableName())
|
||||||
new PrimaryKey(KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(account.getUuid())))
|
.withKey(Map.of(KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(account.getUuid()))))
|
||||||
.withAttributeUpdate(
|
.withUpdateExpression("SET #data=:data")
|
||||||
new AttributeUpdate(ATTR_ACCOUNT_DATA).put(SystemMapper.getMapper().writeValueAsBytes(account)),
|
.withConditionExpression("attribute_exists(#number)")
|
||||||
new AttributeUpdate(ATTR_MIGRATION_VERSION).put(String.valueOf(account.getDynamoDbMigrationVersion())));
|
.withExpressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164,
|
||||||
|
"#data", ATTR_ACCOUNT_DATA))
|
||||||
|
.withExpressionAttributeValues(Map.of(":data", new AttributeValue().withB(ByteBuffer.wrap(SystemMapper.getMapper().writeValueAsBytes(account)))));
|
||||||
|
|
||||||
} catch (JsonProcessingException e) {
|
} catch (JsonProcessingException e) {
|
||||||
throw new IllegalArgumentException(e);
|
throw new IllegalArgumentException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
accountsTable.updateItem(updateItemSpec);
|
client.updateItem(updateItemRequest);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ package org.whispersystems.textsecuregcm.storage;
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
|
|
||||||
|
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
|
||||||
import com.codahale.metrics.MetricRegistry;
|
import com.codahale.metrics.MetricRegistry;
|
||||||
import com.codahale.metrics.SharedMetricRegistries;
|
import com.codahale.metrics.SharedMetricRegistries;
|
||||||
import com.codahale.metrics.Timer;
|
import com.codahale.metrics.Timer;
|
||||||
|
@ -130,7 +131,11 @@ public class AccountsManager {
|
||||||
|
|
||||||
if (dynamoWriteEnabled()) {
|
if (dynamoWriteEnabled()) {
|
||||||
runSafelyAndRecordMetrics(() -> {
|
runSafelyAndRecordMetrics(() -> {
|
||||||
dynamoUpdate(account);
|
try {
|
||||||
|
dynamoUpdate(account);
|
||||||
|
} catch (final ConditionalCheckFailedException e) {
|
||||||
|
dynamoCreate(account);
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}, Optional.of(account.getUuid()), true, Boolean::compareTo, "update");
|
}, Optional.of(account.getUuid()), true, Boolean::compareTo, "update");
|
||||||
}
|
}
|
||||||
|
@ -404,7 +409,7 @@ public class AccountsManager {
|
||||||
compare(databaseResult, dynamoResult, comparator);
|
compare(databaseResult, dynamoResult, comparator);
|
||||||
|
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
logger.error("Error running " + action + " ih Dynamo", e);
|
logger.error("Error running " + action + " in Dynamo", e);
|
||||||
|
|
||||||
Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER, "action", action).increment();
|
Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER, "action", action).increment();
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ import com.amazonaws.services.dynamodbv2.document.Item;
|
||||||
import com.amazonaws.services.dynamodbv2.document.Table;
|
import com.amazonaws.services.dynamodbv2.document.Table;
|
||||||
import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
|
import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
|
||||||
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
|
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
|
||||||
|
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
|
||||||
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
|
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
|
||||||
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
|
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
|
||||||
import com.amazonaws.services.dynamodbv2.model.KeyType;
|
import com.amazonaws.services.dynamodbv2.model.KeyType;
|
||||||
|
@ -172,6 +173,11 @@ class AccountsDynamoDbTest {
|
||||||
|
|
||||||
assertThat(retrieved.isPresent()).isTrue();
|
assertThat(retrieved.isPresent()).isTrue();
|
||||||
verifyStoredState("+14151112222", account.getUuid(), retrieved.get(), account);
|
verifyStoredState("+14151112222", account.getUuid(), retrieved.get(), account);
|
||||||
|
|
||||||
|
device = generateDevice(1);
|
||||||
|
Account unknownAccount = generateAccount("+14151113333", UUID.randomUUID(), Collections.singleton(device));
|
||||||
|
|
||||||
|
assertThatThrownBy(() -> accountsDynamoDb.update(unknownAccount)).isInstanceOfAny(ConditionalCheckFailedException.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -301,7 +307,6 @@ class AccountsDynamoDbTest {
|
||||||
assertThat(migrated).isFalse();
|
assertThat(migrated).isFalse();
|
||||||
verifyStoredState("+14151112222", firstUuid, account);
|
verifyStoredState("+14151112222", firstUuid, account);
|
||||||
|
|
||||||
|
|
||||||
account.setDynamoDbMigrationVersion(account.getDynamoDbMigrationVersion() + 1);
|
account.setDynamoDbMigrationVersion(account.getDynamoDbMigrationVersion() + 1);
|
||||||
|
|
||||||
migrated = accountsDynamoDb.migrate(account);
|
migrated = accountsDynamoDb.migrate(account);
|
||||||
|
|
|
@ -11,6 +11,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.Mockito.anyString;
|
import static org.mockito.Mockito.anyString;
|
||||||
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
|
@ -19,6 +20,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
|
||||||
import io.lettuce.core.RedisException;
|
import io.lettuce.core.RedisException;
|
||||||
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
|
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -343,6 +345,42 @@ class AccountsManagerTest {
|
||||||
verifyNoMoreInteractions(accountsDynamoDb);
|
verifyNoMoreInteractions(accountsDynamoDb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testUpdate_dynamoConditionFailed() {
|
||||||
|
RedisAdvancedClusterCommands<String, String> commands = mock(RedisAdvancedClusterCommands.class);
|
||||||
|
FaultTolerantRedisCluster cacheCluster = RedisClusterHelper.buildMockRedisCluster(commands);
|
||||||
|
Accounts accounts = mock(Accounts.class);
|
||||||
|
AccountsDynamoDb accountsDynamoDb = mock(AccountsDynamoDb.class);
|
||||||
|
DirectoryQueue directoryQueue = mock(DirectoryQueue.class);
|
||||||
|
KeysDynamoDb keysDynamoDb = mock(KeysDynamoDb.class);
|
||||||
|
MessagesManager messagesManager = mock(MessagesManager.class);
|
||||||
|
UsernamesManager usernamesManager = mock(UsernamesManager.class);
|
||||||
|
ProfilesManager profilesManager = mock(ProfilesManager.class);
|
||||||
|
SecureBackupClient secureBackupClient = mock(SecureBackupClient.class);
|
||||||
|
SecureStorageClient secureStorageClient = mock(SecureStorageClient.class);
|
||||||
|
UUID uuid = UUID.randomUUID();
|
||||||
|
Account account = new Account("+14152222222", uuid, new HashSet<>(), new byte[16]);
|
||||||
|
|
||||||
|
enableDynamo(true);
|
||||||
|
|
||||||
|
when(commands.get(eq("Account3::" + uuid))).thenReturn(null);
|
||||||
|
doThrow(ConditionalCheckFailedException.class).when(accountsDynamoDb).update(any(Account.class));
|
||||||
|
|
||||||
|
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager);
|
||||||
|
|
||||||
|
assertEquals(0, account.getDynamoDbMigrationVersion());
|
||||||
|
|
||||||
|
accountsManager.update(account);
|
||||||
|
|
||||||
|
assertEquals(1, account.getDynamoDbMigrationVersion());
|
||||||
|
|
||||||
|
verify(accounts, times(1)).update(account);
|
||||||
|
verifyNoMoreInteractions(accounts);
|
||||||
|
|
||||||
|
verify(accountsDynamoDb, times(1)).update(account);
|
||||||
|
verify(accountsDynamoDb, times(1)).create(account);
|
||||||
|
verifyNoMoreInteractions(accountsDynamoDb);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testCompareAccounts() {
|
void testCompareAccounts() {
|
||||||
|
|
Loading…
Reference in New Issue