Simplify logic for batching bulk identity check request

This commit is contained in:
Ehren Kret 2022-06-20 10:28:20 -05:00
parent 47300c1d44
commit cc8dda28cc
2 changed files with 30 additions and 36 deletions

View File

@ -373,6 +373,10 @@
<artifactId>reactor-core</artifactId>
<version>3.3.16.RELEASE</version>
</dependency>
<dependency>
<groupId>io.vavr</groupId>
<artifactId>vavr</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>

View File

@ -12,6 +12,7 @@ import io.dropwizard.auth.Auth;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import io.vavr.Tuple;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
@ -330,45 +331,34 @@ public class ProfileController {
@Path("/identity_check/batch")
public CompletableFuture<BatchIdentityCheckResponse> runBatchIdentityCheck(@NotNull @Valid BatchIdentityCheckRequest request) {
return CompletableFuture.supplyAsync(() -> {
List<BatchIdentityCheckResponse.Element> responseElements = Collections.synchronizedList(new ArrayList<>());
BatchIdentityCheckResponse response = new BatchIdentityCheckResponse(responseElements);
List<BatchIdentityCheckResponse.Element> responseElements = Collections.synchronizedList(new ArrayList<>());
// for small requests only run one batch
if (request.elements().size() <= 30) {
MessageDigest sha256;
try {
sha256 = MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) {
throw new AssertionError(e);
}
for (final BatchIdentityCheckRequest.Element element : request.elements()) {
checkFingerprintAndAdd(element, responseElements, sha256);
}
} else {
final int batchCount = 10;
final int batchSize = request.elements().size() / batchCount;
@SuppressWarnings("rawtypes") CompletableFuture[] futures = new CompletableFuture[batchCount];
final int targetBatchCount = 10;
// clamp the amount per batch to be in the closed range [30, 100]
final int batchSize = Math.min(Math.max(request.elements().size() / targetBatchCount, 30), 100);
// add 1 extra batch if there is any remainder to consume the final non-full batch
final int batchCount =
request.elements().size() / batchSize + (request.elements().size() % batchSize != 0 ? 1 : 0);
for (int i = 0; i < batchCount; i++) {
List<BatchIdentityCheckRequest.Element> batch = request.elements()
.subList(i * batchSize, Math.min((i + 1) * batchSize, request.elements().size()));
futures[i] = CompletableFuture.runAsync(() -> {
MessageDigest sha256;
try {
sha256 = MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) {
throw new AssertionError(e);
}
for (final BatchIdentityCheckRequest.Element element : batch) {
checkFingerprintAndAdd(element, responseElements, sha256);
}
});
}
@SuppressWarnings("rawtypes") CompletableFuture[] futures = new CompletableFuture[batchCount];
for (int i = 0; i < batchCount; ++i) {
List<BatchIdentityCheckRequest.Element> batch = request.elements()
.subList(i * batchSize, Math.min((i + 1) * batchSize, request.elements().size()));
futures[i] = CompletableFuture.runAsync(() -> {
MessageDigest sha256;
try {
sha256 = MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) {
throw new AssertionError(e);
}
for (final BatchIdentityCheckRequest.Element element : request.elements()) {
checkFingerprintAndAdd(element, responseElements, sha256);
}
});
}
CompletableFuture.allOf(futures).join();
}
return response;
});
return Tuple.of(futures, responseElements);
}).thenComposeAsync(tuple2 -> CompletableFuture.allOf(tuple2._1).thenApply((ignored) -> new BatchIdentityCheckResponse(tuple2._2)));
}
private void checkFingerprintAndAdd(BatchIdentityCheckRequest.Element element,