diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManager.java index 4c278ef44..7b03f9fe6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManager.java @@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Stream; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -47,6 +48,12 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager { static final String CLIENT_ID_HEADER = "CF-Access-Client-Id"; static final String CLIENT_SECRET_HEADER = "CF-Access-Client-Secret"; + private static String TUS_UPLOAD_LENGTH_HEADER = "Upload-Length"; + private static String TUS_UPLOAD_OFFSET_HEADER = "Upload-Offset"; + private static String TUS_VERSION_HEADER = "Tus-Resumable"; + private static String TUS_VERSION = "1.0.0"; + private static String TUS_CONTENT_TYPE = "application/offset+octet-stream"; + private static final String STORAGE_MANAGER_STATUS_COUNTER_NAME = MetricsUtil.name(Cdn3RemoteStorageManager.class, "storageManagerStatus"); @@ -111,6 +118,7 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager { final Timer.Sample sample = Timer.start(); final BackupMediaEncrypter encrypter = new BackupMediaEncrypter(encryptionParameters); final HttpRequest request = HttpRequest.newBuilder().GET().uri(sourceUri).build(); + final int expectedEncryptedLength = encrypter.outputSize(expectedSourceLength); return cdnHttpClient.sendAsync(request, HttpResponse.BodyHandlers.ofPublisher()).thenCompose(response -> { if (response.statusCode() == Response.Status.NOT_FOUND.getStatusCode()) { throw new CompletionException(new SourceObjectNotFoundException()); @@ -126,7 +134,6 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager { new InvalidLengthException("Provided sourceLength " + expectedSourceLength + " was " + actualSourceLength)); } - final int expectedEncryptedLength = encrypter.outputSize(actualSourceLength); final HttpRequest.BodyPublisher encryptedBody = HttpRequest.BodyPublishers.fromPublisher( encrypter.encryptBody(response.body()), expectedEncryptedLength); @@ -134,22 +141,31 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager { uploadDescriptor.headers().entrySet() .stream() .flatMap(e -> Stream.of(e.getKey(), e.getValue())), - Stream.of("Upload-Length", Integer.toString(expectedEncryptedLength), "Tus-Resumable", "1.0.0")) + Stream.of( + TUS_VERSION_HEADER, TUS_VERSION, + TUS_UPLOAD_LENGTH_HEADER, Integer.toString(expectedEncryptedLength), + HttpHeaders.CONTENT_TYPE, TUS_CONTENT_TYPE)) .toArray(String[]::new); - final HttpRequest put = HttpRequest.newBuilder() + final HttpRequest post = HttpRequest.newBuilder() .uri(URI.create(uploadDescriptor.signedUploadLocation())) .headers(headers) .POST(encryptedBody) .build(); - return cdnHttpClient.sendAsync(put, HttpResponse.BodyHandlers.discarding()); + return cdnHttpClient.sendAsync(post, HttpResponse.BodyHandlers.discarding()); }) .thenAccept(response -> { if (response.statusCode() != Response.Status.CREATED.getStatusCode() && response.statusCode() != Response.Status.OK.getStatusCode()) { throw new CompletionException(new IOException("Failed to copy object: " + response.statusCode())); } + long uploadOffset = response.headers().firstValueAsLong(TUS_UPLOAD_OFFSET_HEADER) + .orElseThrow(() -> new CompletionException(new IOException("Tus server did not return Upload-Offset"))); + if (uploadOffset != expectedEncryptedLength) { + throw new CompletionException(new IOException( + "Expected to upload %d bytes, uploaded %d".formatted(expectedEncryptedLength, uploadOffset))); + } }) .whenComplete((ignored, ignoredException) -> sample.stop(Metrics.timer(STORAGE_MANAGER_TIMER_NAME, OPERATION_TAG_NAME, "copy"))); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManagerTest.java index d07a4a22c..c2e2a2d2e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/backup/Cdn3RemoteStorageManagerTest.java @@ -108,14 +108,21 @@ public class Cdn3RemoteStorageManagerTest { default -> throw new AssertionError(); }; + final MediaEncryptionParameters encryptionParameters = new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV); + final long expectedEncryptedLength = encryptionParameters.outputSize(expectedSource.length()); + wireMock.stubFor(post(urlEqualTo("/cdn3/dest")) + .withHeader("Content-Length", equalTo(Long.toString(expectedEncryptedLength))) + .withHeader("Upload-Length", equalTo(Long.toString(expectedEncryptedLength))) + .withHeader("Content-Type", equalTo("application/offset+octet-stream")) .willReturn(aResponse() - .withStatus(201))); + .withStatus(201) + .withHeader("Upload-Offset", Long.toString(expectedEncryptedLength)))); remoteStorageManager.copy( URI.create(wireMock.url("/cdn" + sourceCdn + "/source/small")), expectedSource.length(), - new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV), + encryptionParameters, new MessageBackupUploadDescriptor(3, "test", Collections.emptyMap(), wireMock.url("/cdn3/dest"))) .toCompletableFuture().join(); @@ -127,10 +134,15 @@ public class Cdn3RemoteStorageManagerTest { @Test public void copyLarge() throws InvalidAlgorithmParameterException, IllegalBlockSizeException, BadPaddingException, InvalidKeyException { - wireMock.stubFor(post(urlEqualTo("/cdn3/dest")) - .willReturn(aResponse() - .withStatus(201))); final MediaEncryptionParameters params = new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV); + final long expectedEncryptedLength = params.outputSize(LARGE.length()); + wireMock.stubFor(post(urlEqualTo("/cdn3/dest")) + .withHeader("Content-Length", equalTo(Long.toString(expectedEncryptedLength))) + .withHeader("Upload-Length", equalTo(Long.toString(expectedEncryptedLength))) + .withHeader("Content-Type", equalTo("application/offset+octet-stream")) + .willReturn(aResponse() + .withStatus(201) + .withHeader("Upload-Offset", Long.toString(expectedEncryptedLength)))); remoteStorageManager.copy( URI.create(wireMock.url("/cdn3/source/large")), LARGE.length(),