Add error reporting to `/v1/devices/transfer_archive`

This commit is contained in:
ravi-signal 2024-11-25 14:41:51 -06:00 committed by GitHub
parent 3ba7ba4f92
commit 49d6a5e32d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 145 additions and 28 deletions

View File

@ -67,6 +67,7 @@ import org.whispersystems.textsecuregcm.entities.ProvisioningMessage;
import org.whispersystems.textsecuregcm.entities.RemoteAttachment;
import org.whispersystems.textsecuregcm.entities.RestoreAccountRequest;
import org.whispersystems.textsecuregcm.entities.SetPublicKeyRequest;
import org.whispersystems.textsecuregcm.entities.TransferArchiveResult;
import org.whispersystems.textsecuregcm.entities.TransferArchiveUploadedRequest;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.limits.RateLimitedByIp;
@ -522,8 +523,10 @@ public class DeviceController {
@Operation(
summary = "Signals that a transfer archive has been uploaded for a specific linked device",
description = """
Signals that a transfer archive has been uploaded for a specific linked device. Devices waiting via the "wait
for transfer archive" endpoint will be notified that the new archive is available.
Signals that a transfer archive has been uploaded or failed for a specific linked device. Devices waiting via
the "wait for transfer archive" endpoint will be notified that the new archive is available.
If the uploader cannot upload the transfer archive, they must signal an error.
""")
@ApiResponse(responseCode = "204", description = "Success")
@ApiResponse(responseCode = "422", description = "The request object could not be parsed or was otherwise invalid")
@ -546,8 +549,8 @@ public class DeviceController {
Waits for a new transfer archive to be uploaded for the authenticated device and returns the location of the
archive when available.
""")
@ApiResponse(responseCode = "200", description = "A new transfer archive was uploaded for the authenticated device",
content = @Content(schema = @Schema(implementation = RemoteAttachment.class)))
@ApiResponse(responseCode = "200", description = "Either a new transfer archive was uploaded for the authenticated device, or the upload has failed",
content = @Content(schema = @Schema(implementation = TransferArchiveResult.class)))
@ApiResponse(responseCode = "204", description = "No transfer archive was uploaded before the call completed; clients may repeat the call to continue waiting")
@ApiResponse(responseCode = "400", description = "The given timeout was invalid")
@ApiResponse(responseCode = "429", description = "Rate-limited; try again after the prescribed delay")

View File

@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.entities;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import org.whispersystems.textsecuregcm.util.ValidBase64URLString;
public record RemoteAttachment(
@ -17,6 +18,6 @@ public record RemoteAttachment(
@NotBlank
@ValidBase64URLString
@Schema(description = "The attachment key")
String key) {
}
@Size(max = 64)
@Schema(description = "The attachment key", maxLength = 64)
String key) implements TransferArchiveResult {}

View File

@ -0,0 +1,21 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.entities;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
@Schema(description = "Indicates an attachment failed to upload")
public record RemoteAttachmentError(
@Schema(description = "The type of error encountered")
@Valid @NotNull ErrorType error)
implements TransferArchiveResult {
public enum ErrorType {
RELINK_REQUESTED,
CONTINUE_WITHOUT_UPLOAD;
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.swagger.v3.oas.annotations.media.Schema;
@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION)
@JsonSubTypes({
@JsonSubTypes.Type(value = RemoteAttachment.class, name = "success"),
@JsonSubTypes.Type(value = RemoteAttachmentError.class, name = "error"),
})
@Schema(description = """
The location of the transfer archive if the archive was successfully uploaded, otherwise a error indicating that
the upload has failed and the destination device should stop waiting
""", oneOf = {RemoteAttachmentError.class, RemoteAttachment.class})
public sealed interface TransferArchiveResult permits RemoteAttachment, RemoteAttachmentError {}

View File

@ -9,19 +9,20 @@ import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;
import org.whispersystems.textsecuregcm.storage.Device;
public record TransferArchiveUploadedRequest(@Min(1)
@Max(Device.MAXIMUM_DEVICE_ID)
@Schema(description = "The ID of the device for which the transfer archive has been prepared")
byte destinationDeviceId,
public record TransferArchiveUploadedRequest(
@Min(1)
@Max(Device.MAXIMUM_DEVICE_ID)
@Schema(description = "The ID of the device for which the transfer archive has been prepared")
byte destinationDeviceId,
@Positive
@Schema(description = "The timestamp, in milliseconds since the epoch, at which the destination device was created")
long destinationDeviceCreated,
@Positive
@Schema(description = "The timestamp, in milliseconds since the epoch, at which the destination device was created")
long destinationDeviceCreated,
@Schema(description = "The location of the transfer archive")
@Valid
RemoteAttachment transferArchive) {
}
@NotNull
@Valid
TransferArchiveResult transferArchive) {}

View File

@ -72,8 +72,8 @@ import org.whispersystems.textsecuregcm.entities.AccountAttributes;
import org.whispersystems.textsecuregcm.entities.DeviceInfo;
import org.whispersystems.textsecuregcm.entities.ECSignedPreKey;
import org.whispersystems.textsecuregcm.entities.KEMSignedPreKey;
import org.whispersystems.textsecuregcm.entities.RemoteAttachment;
import org.whispersystems.textsecuregcm.entities.RestoreAccountRequest;
import org.whispersystems.textsecuregcm.entities.TransferArchiveResult;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
@ -140,7 +140,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
private final Map<String, CompletableFuture<Optional<DeviceInfo>>> waitForDeviceFuturesByTokenIdentifier =
new ConcurrentHashMap<>();
private final Map<TimestampedDeviceIdentifier, CompletableFuture<Optional<RemoteAttachment>>> waitForTransferArchiveFuturesByDeviceIdentifier =
private final Map<TimestampedDeviceIdentifier, CompletableFuture<Optional<TransferArchiveResult>>> waitForTransferArchiveFuturesByDeviceIdentifier =
new ConcurrentHashMap<>();
private final Map<String, CompletableFuture<Optional<RestoreAccountRequest>>> waitForRestoreAccountRequestFuturesByToken =
@ -1548,7 +1548,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
return LINKED_DEVICE_PREFIX + linkDeviceTokenIdentifier;
}
public CompletableFuture<Optional<RemoteAttachment>> waitForTransferArchive(final Account account, final Device device, final Duration timeout) {
public CompletableFuture<Optional<TransferArchiveResult>> waitForTransferArchive(final Account account, final Device device, final Duration timeout) {
final TimestampedDeviceIdentifier deviceIdentifier =
new TimestampedDeviceIdentifier(account.getIdentifier(IdentityType.ACI),
device.getId(),
@ -1564,14 +1564,14 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
public CompletableFuture<Void> recordTransferArchiveUpload(final Account account,
final byte destinationDeviceId,
final Instant destinationDeviceCreationTimestamp,
final RemoteAttachment transferArchive) {
final TransferArchiveResult transferArchiveResult) {
final String key = getTransferArchiveKey(account.getIdentifier(IdentityType.ACI),
destinationDeviceId,
destinationDeviceCreationTimestamp);
try {
final String transferArchiveJson = SystemMapper.jsonMapper().writeValueAsString(transferArchive);
final String transferArchiveJson = SystemMapper.jsonMapper().writeValueAsString(transferArchiveResult);
return pubSubRedisClient.withConnection(connection ->
connection.async().set(key, transferArchiveJson, SetArgs.Builder.ex(RECENTLY_ADDED_TRANSFER_ARCHIVE_TTL)))
@ -1583,9 +1583,9 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
}
}
private void handleTransferArchiveAdded(final CompletableFuture<Optional<RemoteAttachment>> future, final String transferArchiveJson) {
private void handleTransferArchiveAdded(final CompletableFuture<Optional<TransferArchiveResult>> future, final String transferArchiveJson) {
try {
future.complete(Optional.of(SystemMapper.jsonMapper().readValue(transferArchiveJson, RemoteAttachment.class)));
future.complete(Optional.of(SystemMapper.jsonMapper().readValue(transferArchiveJson, TransferArchiveResult.class)));
} catch (final JsonProcessingException e) {
logger.error("Could not parse transfer archive json", e);
future.completeExceptionally(e);

View File

@ -72,6 +72,7 @@ import org.whispersystems.textsecuregcm.entities.KEMSignedPreKey;
import org.whispersystems.textsecuregcm.entities.LinkDeviceRequest;
import org.whispersystems.textsecuregcm.entities.LinkDeviceResponse;
import org.whispersystems.textsecuregcm.entities.RemoteAttachment;
import org.whispersystems.textsecuregcm.entities.RemoteAttachmentError;
import org.whispersystems.textsecuregcm.entities.RestoreAccountRequest;
import org.whispersystems.textsecuregcm.entities.SetPublicKeyRequest;
import org.whispersystems.textsecuregcm.entities.TransferArchiveUploadedRequest;
@ -1050,6 +1051,30 @@ class DeviceControllerTest {
}
}
@Test
void recordTransferArchiveFailed() {
final byte deviceId = Device.PRIMARY_ID + 1;
final Instant deviceCreated = Instant.now().truncatedTo(ChronoUnit.MILLIS);
final RemoteAttachmentError transferFailure = new RemoteAttachmentError(RemoteAttachmentError.ErrorType.CONTINUE_WITHOUT_UPLOAD);
when(rateLimiter.validateAsync(AuthHelper.VALID_UUID)).thenReturn(CompletableFuture.completedFuture(null));
when(accountsManager.recordTransferArchiveUpload(AuthHelper.VALID_ACCOUNT, deviceId, deviceCreated, transferFailure))
.thenReturn(CompletableFuture.completedFuture(null));
try (final Response response = resources.getJerseyTest()
.target("/v1/devices/transfer_archive")
.request()
.header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_UUID, AuthHelper.VALID_PASSWORD))
.put(Entity.entity(new TransferArchiveUploadedRequest(deviceId, deviceCreated.toEpochMilli(), transferFailure),
MediaType.APPLICATION_JSON_TYPE))) {
assertEquals(204, response.getStatus());
verify(accountsManager)
.recordTransferArchiveUpload(AuthHelper.VALID_ACCOUNT, deviceId, deviceCreated, transferFailure);
}
}
@ParameterizedTest
@MethodSource
void recordTransferArchiveUploadedBadRequest(final TransferArchiveUploadedRequest request) {
@ -1130,6 +1155,26 @@ class DeviceControllerTest {
}
}
@Test
void waitForTransferArchiveUploadFailed() {
final RemoteAttachment transferArchive =
new RemoteAttachment(3, Base64.getUrlEncoder().encodeToString("test".getBytes(StandardCharsets.UTF_8)));
when(rateLimiter.validateAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));
when(accountsManager.waitForTransferArchive(eq(AuthHelper.VALID_ACCOUNT), eq(AuthHelper.VALID_DEVICE), any()))
.thenReturn(CompletableFuture.completedFuture(Optional.of(transferArchive)));
try (final Response response = resources.getJerseyTest()
.target("/v1/devices/transfer_archive/")
.request()
.header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_UUID, AuthHelper.VALID_PASSWORD))
.get()) {
assertEquals(200, response.getStatus());
assertEquals(transferArchive, response.readEntity(RemoteAttachment.class));
}
}
@Test
void waitForTransferArchiveNoArchiveUploaded() {
when(rateLimiter.validateAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));

View File

@ -12,10 +12,11 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.auth.DisconnectionRequestManager;
import org.whispersystems.textsecuregcm.entities.RemoteAttachmentError;
import org.whispersystems.textsecuregcm.entities.RestoreAccountRequest;
import org.whispersystems.textsecuregcm.entities.RemoteAttachment;
import org.whispersystems.textsecuregcm.entities.TransferArchiveResult;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.redis.RedisServerExtension;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
@ -97,10 +98,10 @@ public class AccountsManagerDeviceTransferIntegrationTest {
final Account account = mock(Account.class);
when(account.getIdentifier(IdentityType.ACI)).thenReturn(accountIdentifier);
final CompletableFuture<Optional<RemoteAttachment>> displacedFuture =
final CompletableFuture<Optional<TransferArchiveResult>> displacedFuture =
accountsManager.waitForTransferArchive(account, device, Duration.ofSeconds(5));
final CompletableFuture<Optional<RemoteAttachment>> activeFuture =
final CompletableFuture<Optional<TransferArchiveResult>> activeFuture =
accountsManager.waitForTransferArchive(account, device, Duration.ofSeconds(5));
assertEquals(Optional.empty(), displacedFuture.join());
@ -132,6 +133,30 @@ public class AccountsManagerDeviceTransferIntegrationTest {
accountsManager.waitForTransferArchive(account, device, Duration.ofSeconds(5)).join());
}
@Test
void waitForErrorTransferArchive() {
final UUID accountIdentifier = UUID.randomUUID();
final byte deviceId = Device.PRIMARY_ID;
final long deviceCreated = System.currentTimeMillis();
final RemoteAttachmentError transferArchiveError =
new RemoteAttachmentError(RemoteAttachmentError.ErrorType.CONTINUE_WITHOUT_UPLOAD);
final Device device = mock(Device.class);
when(device.getId()).thenReturn(deviceId);
when(device.getCreated()).thenReturn(deviceCreated);
final Account account = mock(Account.class);
when(account.getIdentifier(IdentityType.ACI)).thenReturn(accountIdentifier);
accountsManager
.recordTransferArchiveUpload(account, deviceId, Instant.ofEpochMilli(deviceCreated), transferArchiveError)
.join();
assertEquals(Optional.of(transferArchiveError),
accountsManager.waitForTransferArchive(account, device, Duration.ofSeconds(5)).join());
}
@Test
void waitForTransferArchiveTimeout() {
final UUID accountIdentifier = UUID.randomUUID();