Add Content-Type header for copy uploads
This commit is contained in:
parent
4a2cbb9ec7
commit
bf39be3320
|
@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import javax.validation.constraints.NotNull;
|
import javax.validation.constraints.NotNull;
|
||||||
|
import javax.ws.rs.core.HttpHeaders;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -47,6 +48,12 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
|
||||||
static final String CLIENT_ID_HEADER = "CF-Access-Client-Id";
|
static final String CLIENT_ID_HEADER = "CF-Access-Client-Id";
|
||||||
static final String CLIENT_SECRET_HEADER = "CF-Access-Client-Secret";
|
static final String CLIENT_SECRET_HEADER = "CF-Access-Client-Secret";
|
||||||
|
|
||||||
|
private static String TUS_UPLOAD_LENGTH_HEADER = "Upload-Length";
|
||||||
|
private static String TUS_UPLOAD_OFFSET_HEADER = "Upload-Offset";
|
||||||
|
private static String TUS_VERSION_HEADER = "Tus-Resumable";
|
||||||
|
private static String TUS_VERSION = "1.0.0";
|
||||||
|
private static String TUS_CONTENT_TYPE = "application/offset+octet-stream";
|
||||||
|
|
||||||
private static final String STORAGE_MANAGER_STATUS_COUNTER_NAME = MetricsUtil.name(Cdn3RemoteStorageManager.class,
|
private static final String STORAGE_MANAGER_STATUS_COUNTER_NAME = MetricsUtil.name(Cdn3RemoteStorageManager.class,
|
||||||
"storageManagerStatus");
|
"storageManagerStatus");
|
||||||
|
|
||||||
|
@ -111,6 +118,7 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
|
||||||
final Timer.Sample sample = Timer.start();
|
final Timer.Sample sample = Timer.start();
|
||||||
final BackupMediaEncrypter encrypter = new BackupMediaEncrypter(encryptionParameters);
|
final BackupMediaEncrypter encrypter = new BackupMediaEncrypter(encryptionParameters);
|
||||||
final HttpRequest request = HttpRequest.newBuilder().GET().uri(sourceUri).build();
|
final HttpRequest request = HttpRequest.newBuilder().GET().uri(sourceUri).build();
|
||||||
|
final int expectedEncryptedLength = encrypter.outputSize(expectedSourceLength);
|
||||||
return cdnHttpClient.sendAsync(request, HttpResponse.BodyHandlers.ofPublisher()).thenCompose(response -> {
|
return cdnHttpClient.sendAsync(request, HttpResponse.BodyHandlers.ofPublisher()).thenCompose(response -> {
|
||||||
if (response.statusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
|
if (response.statusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
|
||||||
throw new CompletionException(new SourceObjectNotFoundException());
|
throw new CompletionException(new SourceObjectNotFoundException());
|
||||||
|
@ -126,7 +134,6 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
|
||||||
new InvalidLengthException("Provided sourceLength " + expectedSourceLength + " was " + actualSourceLength));
|
new InvalidLengthException("Provided sourceLength " + expectedSourceLength + " was " + actualSourceLength));
|
||||||
}
|
}
|
||||||
|
|
||||||
final int expectedEncryptedLength = encrypter.outputSize(actualSourceLength);
|
|
||||||
final HttpRequest.BodyPublisher encryptedBody = HttpRequest.BodyPublishers.fromPublisher(
|
final HttpRequest.BodyPublisher encryptedBody = HttpRequest.BodyPublishers.fromPublisher(
|
||||||
encrypter.encryptBody(response.body()), expectedEncryptedLength);
|
encrypter.encryptBody(response.body()), expectedEncryptedLength);
|
||||||
|
|
||||||
|
@ -134,22 +141,31 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
|
||||||
uploadDescriptor.headers().entrySet()
|
uploadDescriptor.headers().entrySet()
|
||||||
.stream()
|
.stream()
|
||||||
.flatMap(e -> Stream.of(e.getKey(), e.getValue())),
|
.flatMap(e -> Stream.of(e.getKey(), e.getValue())),
|
||||||
Stream.of("Upload-Length", Integer.toString(expectedEncryptedLength), "Tus-Resumable", "1.0.0"))
|
Stream.of(
|
||||||
|
TUS_VERSION_HEADER, TUS_VERSION,
|
||||||
|
TUS_UPLOAD_LENGTH_HEADER, Integer.toString(expectedEncryptedLength),
|
||||||
|
HttpHeaders.CONTENT_TYPE, TUS_CONTENT_TYPE))
|
||||||
.toArray(String[]::new);
|
.toArray(String[]::new);
|
||||||
|
|
||||||
final HttpRequest put = HttpRequest.newBuilder()
|
final HttpRequest post = HttpRequest.newBuilder()
|
||||||
.uri(URI.create(uploadDescriptor.signedUploadLocation()))
|
.uri(URI.create(uploadDescriptor.signedUploadLocation()))
|
||||||
.headers(headers)
|
.headers(headers)
|
||||||
.POST(encryptedBody)
|
.POST(encryptedBody)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
return cdnHttpClient.sendAsync(put, HttpResponse.BodyHandlers.discarding());
|
return cdnHttpClient.sendAsync(post, HttpResponse.BodyHandlers.discarding());
|
||||||
})
|
})
|
||||||
.thenAccept(response -> {
|
.thenAccept(response -> {
|
||||||
if (response.statusCode() != Response.Status.CREATED.getStatusCode() &&
|
if (response.statusCode() != Response.Status.CREATED.getStatusCode() &&
|
||||||
response.statusCode() != Response.Status.OK.getStatusCode()) {
|
response.statusCode() != Response.Status.OK.getStatusCode()) {
|
||||||
throw new CompletionException(new IOException("Failed to copy object: " + response.statusCode()));
|
throw new CompletionException(new IOException("Failed to copy object: " + response.statusCode()));
|
||||||
}
|
}
|
||||||
|
long uploadOffset = response.headers().firstValueAsLong(TUS_UPLOAD_OFFSET_HEADER)
|
||||||
|
.orElseThrow(() -> new CompletionException(new IOException("Tus server did not return Upload-Offset")));
|
||||||
|
if (uploadOffset != expectedEncryptedLength) {
|
||||||
|
throw new CompletionException(new IOException(
|
||||||
|
"Expected to upload %d bytes, uploaded %d".formatted(expectedEncryptedLength, uploadOffset)));
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.whenComplete((ignored, ignoredException) ->
|
.whenComplete((ignored, ignoredException) ->
|
||||||
sample.stop(Metrics.timer(STORAGE_MANAGER_TIMER_NAME, OPERATION_TAG_NAME, "copy")));
|
sample.stop(Metrics.timer(STORAGE_MANAGER_TIMER_NAME, OPERATION_TAG_NAME, "copy")));
|
||||||
|
|
|
@ -108,14 +108,21 @@ public class Cdn3RemoteStorageManagerTest {
|
||||||
default -> throw new AssertionError();
|
default -> throw new AssertionError();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
final MediaEncryptionParameters encryptionParameters = new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV);
|
||||||
|
final long expectedEncryptedLength = encryptionParameters.outputSize(expectedSource.length());
|
||||||
|
|
||||||
wireMock.stubFor(post(urlEqualTo("/cdn3/dest"))
|
wireMock.stubFor(post(urlEqualTo("/cdn3/dest"))
|
||||||
|
.withHeader("Content-Length", equalTo(Long.toString(expectedEncryptedLength)))
|
||||||
|
.withHeader("Upload-Length", equalTo(Long.toString(expectedEncryptedLength)))
|
||||||
|
.withHeader("Content-Type", equalTo("application/offset+octet-stream"))
|
||||||
.willReturn(aResponse()
|
.willReturn(aResponse()
|
||||||
.withStatus(201)));
|
.withStatus(201)
|
||||||
|
.withHeader("Upload-Offset", Long.toString(expectedEncryptedLength))));
|
||||||
|
|
||||||
remoteStorageManager.copy(
|
remoteStorageManager.copy(
|
||||||
URI.create(wireMock.url("/cdn" + sourceCdn + "/source/small")),
|
URI.create(wireMock.url("/cdn" + sourceCdn + "/source/small")),
|
||||||
expectedSource.length(),
|
expectedSource.length(),
|
||||||
new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV),
|
encryptionParameters,
|
||||||
new MessageBackupUploadDescriptor(3, "test", Collections.emptyMap(), wireMock.url("/cdn3/dest")))
|
new MessageBackupUploadDescriptor(3, "test", Collections.emptyMap(), wireMock.url("/cdn3/dest")))
|
||||||
.toCompletableFuture().join();
|
.toCompletableFuture().join();
|
||||||
|
|
||||||
|
@ -127,10 +134,15 @@ public class Cdn3RemoteStorageManagerTest {
|
||||||
@Test
|
@Test
|
||||||
public void copyLarge()
|
public void copyLarge()
|
||||||
throws InvalidAlgorithmParameterException, IllegalBlockSizeException, BadPaddingException, InvalidKeyException {
|
throws InvalidAlgorithmParameterException, IllegalBlockSizeException, BadPaddingException, InvalidKeyException {
|
||||||
wireMock.stubFor(post(urlEqualTo("/cdn3/dest"))
|
|
||||||
.willReturn(aResponse()
|
|
||||||
.withStatus(201)));
|
|
||||||
final MediaEncryptionParameters params = new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV);
|
final MediaEncryptionParameters params = new MediaEncryptionParameters(AES_KEY, HMAC_KEY, IV);
|
||||||
|
final long expectedEncryptedLength = params.outputSize(LARGE.length());
|
||||||
|
wireMock.stubFor(post(urlEqualTo("/cdn3/dest"))
|
||||||
|
.withHeader("Content-Length", equalTo(Long.toString(expectedEncryptedLength)))
|
||||||
|
.withHeader("Upload-Length", equalTo(Long.toString(expectedEncryptedLength)))
|
||||||
|
.withHeader("Content-Type", equalTo("application/offset+octet-stream"))
|
||||||
|
.willReturn(aResponse()
|
||||||
|
.withStatus(201)
|
||||||
|
.withHeader("Upload-Offset", Long.toString(expectedEncryptedLength))));
|
||||||
remoteStorageManager.copy(
|
remoteStorageManager.copy(
|
||||||
URI.create(wireMock.url("/cdn3/source/large")),
|
URI.create(wireMock.url("/cdn3/source/large")),
|
||||||
LARGE.length(),
|
LARGE.length(),
|
||||||
|
|
Loading…
Reference in New Issue