diff --git a/service/pom.xml b/service/pom.xml index 7088e27d2..f5665921e 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -373,6 +373,10 @@ reactor-core 3.3.16.RELEASE + + io.vavr + vavr + org.junit.jupiter diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ProfileController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ProfileController.java index 383af6fbc..40dc86075 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ProfileController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ProfileController.java @@ -12,6 +12,7 @@ import io.dropwizard.auth.Auth; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tags; +import io.vavr.Tuple; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; @@ -330,45 +331,34 @@ public class ProfileController { @Path("/identity_check/batch") public CompletableFuture runBatchIdentityCheck(@NotNull @Valid BatchIdentityCheckRequest request) { return CompletableFuture.supplyAsync(() -> { - List responseElements = Collections.synchronizedList(new ArrayList<>()); - BatchIdentityCheckResponse response = new BatchIdentityCheckResponse(responseElements); + List responseElements = Collections.synchronizedList(new ArrayList<>()); - // for small requests only run one batch - if (request.elements().size() <= 30) { - MessageDigest sha256; - try { - sha256 = MessageDigest.getInstance("SHA-256"); - } catch (NoSuchAlgorithmException e) { - throw new AssertionError(e); - } - for (final BatchIdentityCheckRequest.Element element : request.elements()) { - checkFingerprintAndAdd(element, responseElements, sha256); - } - } else { - final int batchCount = 10; - final int batchSize = request.elements().size() / batchCount; - @SuppressWarnings("rawtypes") CompletableFuture[] futures = new CompletableFuture[batchCount]; + final int targetBatchCount = 10; + // clamp the amount per batch to be in the closed range [30, 100] + final int batchSize = Math.min(Math.max(request.elements().size() / targetBatchCount, 30), 100); + // add 1 extra batch if there is any remainder to consume the final non-full batch + final int batchCount = + request.elements().size() / batchSize + (request.elements().size() % batchSize != 0 ? 1 : 0); - for (int i = 0; i < batchCount; i++) { - List batch = request.elements() - .subList(i * batchSize, Math.min((i + 1) * batchSize, request.elements().size())); - futures[i] = CompletableFuture.runAsync(() -> { - MessageDigest sha256; - try { - sha256 = MessageDigest.getInstance("SHA-256"); - } catch (NoSuchAlgorithmException e) { - throw new AssertionError(e); - } - for (final BatchIdentityCheckRequest.Element element : batch) { - checkFingerprintAndAdd(element, responseElements, sha256); - } - }); - } + @SuppressWarnings("rawtypes") CompletableFuture[] futures = new CompletableFuture[batchCount]; + for (int i = 0; i < batchCount; ++i) { + List batch = request.elements() + .subList(i * batchSize, Math.min((i + 1) * batchSize, request.elements().size())); + futures[i] = CompletableFuture.runAsync(() -> { + MessageDigest sha256; + try { + sha256 = MessageDigest.getInstance("SHA-256"); + } catch (NoSuchAlgorithmException e) { + throw new AssertionError(e); + } + for (final BatchIdentityCheckRequest.Element element : request.elements()) { + checkFingerprintAndAdd(element, responseElements, sha256); + } + }); + } - CompletableFuture.allOf(futures).join(); - } - return response; - }); + return Tuple.of(futures, responseElements); + }).thenComposeAsync(tuple2 -> CompletableFuture.allOf(tuple2._1).thenApply((ignored) -> new BatchIdentityCheckResponse(tuple2._2))); } private void checkFingerprintAndAdd(BatchIdentityCheckRequest.Element element,