Revert "Revert "Retire the "migrate signed pre-keys" command""
This reverts commit f738bc97e7
.
This commit is contained in:
parent
184cdc0331
commit
61256d49cd
|
@ -215,7 +215,6 @@ import org.whispersystems.textsecuregcm.workers.CertificateCommand;
|
|||
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.MigrateSignedECPreKeysCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.ProcessPushNotificationFeedbackCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.RemoveExpiredLinkedDevicesCommand;
|
||||
|
@ -273,7 +272,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
bootstrap.addCommand(new UnlinkDeviceCommand());
|
||||
bootstrap.addCommand(new ScheduledApnPushNotificationSenderServiceCommand());
|
||||
bootstrap.addCommand(new MessagePersisterServiceCommand());
|
||||
bootstrap.addCommand(new MigrateSignedECPreKeysCommand());
|
||||
bootstrap.addCommand(new RemoveExpiredAccountsCommand(Clock.systemUTC()));
|
||||
bootstrap.addCommand(new ProcessPushNotificationFeedbackCommand(Clock.systemUTC()));
|
||||
bootstrap.addCommand(new RemoveExpiredLinkedDevicesCommand());
|
||||
|
|
|
@ -107,11 +107,6 @@ public class KeysManager {
|
|||
}
|
||||
}
|
||||
|
||||
public CompletableFuture<Boolean> storeEcSignedPreKeyIfAbsent(final UUID identifier, final byte deviceId,
|
||||
final ECSignedPreKey signedPreKey) {
|
||||
return ecSignedPreKeys.storeIfAbsent(identifier, deviceId, signedPreKey);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> storePqLastResort(final UUID identifier, final byte deviceId, final KEMSignedPreKey lastResortKey) {
|
||||
return pqLastResortKeys.store(identifier, deviceId, lastResortKey);
|
||||
}
|
||||
|
|
|
@ -7,27 +7,17 @@ package org.whispersystems.textsecuregcm.storage;
|
|||
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.signal.libsignal.protocol.InvalidKeyException;
|
||||
import org.signal.libsignal.protocol.ecc.ECPublicKey;
|
||||
import org.whispersystems.textsecuregcm.entities.ECSignedPreKey;
|
||||
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
|
||||
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
||||
|
||||
public class RepeatedUseECSignedPreKeyStore extends RepeatedUseSignedPreKeyStore<ECSignedPreKey> {
|
||||
|
||||
private final DynamoDbAsyncClient dynamoDbAsyncClient;
|
||||
private final String tableName;
|
||||
|
||||
public RepeatedUseECSignedPreKeyStore(final DynamoDbAsyncClient dynamoDbAsyncClient, final String tableName) {
|
||||
super(dynamoDbAsyncClient, tableName);
|
||||
|
||||
this.dynamoDbAsyncClient = dynamoDbAsyncClient;
|
||||
this.tableName = tableName;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -53,21 +43,4 @@ public class RepeatedUseECSignedPreKeyStore extends RepeatedUseSignedPreKeyStore
|
|||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public CompletableFuture<Boolean> storeIfAbsent(final UUID identifier, final byte deviceId, final ECSignedPreKey signedPreKey) {
|
||||
return dynamoDbAsyncClient.putItem(PutItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.item(getItemFromPreKey(identifier, deviceId, signedPreKey))
|
||||
.conditionExpression("attribute_not_exists(#public_key)")
|
||||
.expressionAttributeNames(Map.of("#public_key", ATTR_PUBLIC_KEY))
|
||||
.build())
|
||||
.thenApply(ignored -> true)
|
||||
.exceptionally(throwable -> {
|
||||
if (ExceptionUtils.unwrap(throwable) instanceof ConditionalCheckFailedException) {
|
||||
return false;
|
||||
}
|
||||
|
||||
throw ExceptionUtils.wrap(throwable);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,98 +0,0 @@
|
|||
/*
|
||||
* Copyright 2023 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.workers;
|
||||
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Function;
|
||||
import net.sourceforge.argparse4j.inf.Subparser;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.entities.ECSignedPreKey;
|
||||
import org.whispersystems.textsecuregcm.identity.IdentityType;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.KeysManager;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuple3;
|
||||
import reactor.util.function.Tuples;
|
||||
import reactor.util.retry.Retry;
|
||||
|
||||
public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccountsCommand {
|
||||
|
||||
private static final String STORE_KEY_ATTEMPT_COUNTER_NAME =
|
||||
MetricsUtil.name(MigrateSignedECPreKeysCommand.class, "storeKeyAttempt");
|
||||
|
||||
// It's tricky to find, but the default connection count for the AWS SDK's async DynamoDB client is 50. As long as
|
||||
// we stay below that, we should be fine.
|
||||
private static final int DEFAULT_MAX_CONCURRENCY = 32;
|
||||
|
||||
private static final String BUFFER_ARGUMENT = "buffer";
|
||||
private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MigrateSignedECPreKeysCommand.class);
|
||||
|
||||
public MigrateSignedECPreKeysCommand() {
|
||||
super("migrate-signed-ec-pre-keys", "Migrate signed EC pre-keys from Account records to a dedicated table");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Subparser subparser) {
|
||||
super.configure(subparser);
|
||||
|
||||
subparser.addArgument("--max-concurrency")
|
||||
.type(Integer.class)
|
||||
.dest(MAX_CONCURRENCY_ARGUMENT)
|
||||
.setDefault(DEFAULT_MAX_CONCURRENCY)
|
||||
.help("Max concurrency for DynamoDB operations");
|
||||
|
||||
subparser.addArgument("--buffer")
|
||||
.type(Integer.class)
|
||||
.dest(BUFFER_ARGUMENT)
|
||||
.setDefault(16_384)
|
||||
.help("Devices to buffer");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void crawlAccounts(final Flux<Account> accounts) {
|
||||
final KeysManager keysManager = getCommandDependencies().keysManager();
|
||||
final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT);
|
||||
final int bufferSize = getNamespace().getInt(BUFFER_ARGUMENT);
|
||||
|
||||
accounts
|
||||
.flatMap(account -> Flux.fromIterable(account.getDevices())
|
||||
.flatMap(device -> Flux.fromArray(IdentityType.values())
|
||||
.filter(identityType -> device.getSignedPreKey(identityType) != null)
|
||||
.map(identityType -> Tuples.of(account.getIdentifier(identityType), device.getId(), device.getSignedPreKey(identityType)))))
|
||||
.buffer(bufferSize)
|
||||
.map(source -> {
|
||||
final List<Tuple3<UUID, Byte, ECSignedPreKey>> shuffled = new ArrayList<>(source);
|
||||
Collections.shuffle(shuffled);
|
||||
return shuffled;
|
||||
})
|
||||
.flatMapIterable(Function.identity())
|
||||
.flatMap(keyTuple -> {
|
||||
final UUID identifier = keyTuple.getT1();
|
||||
final byte deviceId = keyTuple.getT2();
|
||||
final ECSignedPreKey signedPreKey = keyTuple.getT3();
|
||||
|
||||
return Mono.fromFuture(() -> keysManager.storeEcSignedPreKeyIfAbsent(identifier, deviceId, signedPreKey))
|
||||
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).onRetryExhaustedThrow((spec, rs) -> rs.failure()))
|
||||
.onErrorResume(throwable -> {
|
||||
logger.warn("Failed to migrate key for UUID {}, device {}", identifier, deviceId);
|
||||
return Mono.just(false);
|
||||
})
|
||||
.doOnSuccess(keyStored -> Metrics.counter(STORE_KEY_ATTEMPT_COUNTER_NAME, "stored", String.valueOf(keyStored)).increment());
|
||||
}, maxConcurrency)
|
||||
.then()
|
||||
.block();
|
||||
}
|
||||
}
|
|
@ -5,14 +5,7 @@
|
|||
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.signal.libsignal.protocol.ecc.Curve;
|
||||
import org.signal.libsignal.protocol.ecc.ECKeyPair;
|
||||
|
@ -52,21 +45,4 @@ class RepeatedUseECSignedPreKeyStoreTest extends RepeatedUseSignedPreKeyStoreTes
|
|||
protected DynamoDbClient getDynamoDbClient() {
|
||||
return DYNAMO_DB_EXTENSION.getDynamoDbClient();
|
||||
}
|
||||
|
||||
@Test
|
||||
void storeIfAbsent() {
|
||||
final UUID identifier = UUID.randomUUID();
|
||||
final byte deviceIdWithExistingKey = 1;
|
||||
final byte deviceIdWithoutExistingKey = deviceIdWithExistingKey + 1;
|
||||
|
||||
final ECSignedPreKey originalSignedPreKey = generateSignedPreKey();
|
||||
|
||||
keyStore.store(identifier, deviceIdWithExistingKey, originalSignedPreKey).join();
|
||||
|
||||
assertFalse(keyStore.storeIfAbsent(identifier, deviceIdWithExistingKey, generateSignedPreKey()).join());
|
||||
assertTrue(keyStore.storeIfAbsent(identifier, deviceIdWithoutExistingKey, generateSignedPreKey()).join());
|
||||
|
||||
assertEquals(Optional.of(originalSignedPreKey), keyStore.find(identifier, deviceIdWithExistingKey).join());
|
||||
assertTrue(keyStore.find(identifier, deviceIdWithoutExistingKey).join().isPresent());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue