From e20a4c1f77ab42afdc38afeab74976ea3a493131 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Tue, 24 Sep 2024 13:18:28 -0500 Subject: [PATCH] Refactor ProcessScheduledJobsServiceCommand to dispose of processing jobs on shutdown --- .../textsecuregcm/scheduler/JobScheduler.java | 5 +- .../ProcessScheduledJobsServiceCommand.java | 39 +++++- .../scheduler/JobSchedulerTest.java | 32 ++--- ...rocessScheduledJobsServiceCommandTest.java | 123 ++++++++++++++++++ 4 files changed, 176 insertions(+), 23 deletions(-) create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/workers/ProcessScheduledJobsServiceCommandTest.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/scheduler/JobScheduler.java b/service/src/main/java/org/whispersystems/textsecuregcm/scheduler/JobScheduler.java index 87d0ae758..5bae1538a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/scheduler/JobScheduler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/scheduler/JobScheduler.java @@ -136,7 +136,7 @@ public abstract class JobScheduler { * * @see #processJob(byte[]) */ - public CompletableFuture processAvailableJobs() { + public Mono processAvailableJobs() { return Flux.from(dynamoDbAsyncClient.queryPaginator(QueryRequest.builder() .tableName(tableName) .keyConditionExpression("#schedulerName = :schedulerName AND #runAt <= :maxRunAt") @@ -164,8 +164,7 @@ public abstract class JobScheduler { return Mono.empty(); }); }, MAX_CONCURRENCY) - .then() - .toFuture(); + .then(); } private CompletableFuture deleteJob(final AttributeValue schedulerName, final AttributeValue runAt) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ProcessScheduledJobsServiceCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ProcessScheduledJobsServiceCommand.java index 64448a0f7..1493bfded 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ProcessScheduledJobsServiceCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ProcessScheduledJobsServiceCommand.java @@ -1,5 +1,6 @@ package org.whispersystems.textsecuregcm.workers; +import com.google.common.annotations.VisibleForTesting; import io.dropwizard.core.Application; import io.dropwizard.core.cli.ServerCommand; import io.dropwizard.core.server.DefaultServerFactory; @@ -7,6 +8,7 @@ import io.dropwizard.core.setup.Environment; import io.dropwizard.jetty.HttpsConnectorFactory; import io.dropwizard.lifecycle.Managed; import io.dropwizard.util.Duration; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -18,6 +20,8 @@ import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.scheduler.JobScheduler; import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler; +import reactor.core.Disposable; +import reactor.core.Disposables; public class ProcessScheduledJobsServiceCommand extends ServerCommand { @@ -31,7 +35,8 @@ public class ProcessScheduledJobsServiceCommand extends ServerCommand processJobsFuture; + private Disposable processAvailableJobsDisposableReference = Disposables.disposed(); + private boolean stopped = false; - private ScheduledJobProcessor(final JobScheduler jobScheduler, + @VisibleForTesting + ScheduledJobProcessor(final JobScheduler jobScheduler, final ScheduledExecutorService scheduledExecutorService, final int fixedDelaySeconds) { @@ -52,20 +60,41 @@ public class ProcessScheduledJobsServiceCommand extends ServerCommand { + final CountDownLatch latch = new CountDownLatch(1); + + synchronized (this) { + if (stopped) { + return; + } + + processAvailableJobsDisposableReference = jobScheduler.processAvailableJobs() + // this CountDownLatch pattern is how Mono.block() is implemented + .doOnCancel(latch::countDown) + .doOnTerminate(latch::countDown) + .doOnError(e -> + log.warn("Failed to process available jobs for scheduler: {}", jobScheduler.getSchedulerName(), e)) + .subscribe(); + } + try { - jobScheduler.processAvailableJobs().join(); - } catch (final Exception e) { + latch.await(); + + } catch (final InterruptedException e) { log.warn("Failed to process available jobs for scheduler: {}", jobScheduler.getSchedulerName(), e); } }, 0, fixedDelaySeconds, TimeUnit.SECONDS); } @Override - public void stop() { + public synchronized void stop() { + stopped = true; + if (processJobsFuture != null) { processJobsFuture.cancel(false); } + processAvailableJobsDisposableReference.dispose(); + processJobsFuture = null; } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/scheduler/JobSchedulerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/scheduler/JobSchedulerTest.java index fa28c78fe..f1cd5f5b0 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/scheduler/JobSchedulerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/scheduler/JobSchedulerTest.java @@ -1,14 +1,10 @@ package org.whispersystems.textsecuregcm.scheduler; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.whispersystems.textsecuregcm.storage.DynamoDbExtension; -import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema; -import org.whispersystems.textsecuregcm.util.TestClock; -import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; -import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; -import javax.annotation.Nullable; import java.time.Clock; import java.time.Duration; import java.time.Instant; @@ -16,8 +12,14 @@ import java.time.ZoneId; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.jupiter.api.Assertions.*; +import javax.annotation.Nullable; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.whispersystems.textsecuregcm.storage.DynamoDbExtension; +import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema; +import org.whispersystems.textsecuregcm.util.TestClock; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; class JobSchedulerTest { @@ -80,16 +82,16 @@ class JobSchedulerTest { // Clock time is before scheduled job time testClock.pin(CURRENT_TIME.minusMillis(1)); - scheduler.processAvailableJobs().join(); + scheduler.processAvailableJobs().block(); assertEquals(0, scheduler.jobsProcessed.get()); // Clock time is after scheduled job time testClock.pin(CURRENT_TIME.plusMillis(1)); - scheduler.processAvailableJobs().join(); + scheduler.processAvailableJobs().block(); assertEquals(1, scheduler.jobsProcessed.get()); - scheduler.processAvailableJobs().join(); + scheduler.processAvailableJobs().block(); assertEquals(1, scheduler.jobsProcessed.get(), "Jobs should be cleared after successful processing; job counter should not increment on second run"); } @@ -112,10 +114,10 @@ class JobSchedulerTest { scheduler.scheduleJob(scheduler.buildRunAtAttribute(CURRENT_TIME, 0L), CURRENT_TIME, null).join(); - scheduler.processAvailableJobs().join(); + scheduler.processAvailableJobs().block(); assertEquals(1, jobsEncountered.get()); - scheduler.processAvailableJobs().join(); + scheduler.processAvailableJobs().block(); assertEquals(2, jobsEncountered.get(), "Jobs should not be cleared after failed processing; encountered job counter should increment on second run"); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/ProcessScheduledJobsServiceCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/ProcessScheduledJobsServiceCommandTest.java new file mode 100644 index 000000000..232e387dd --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/ProcessScheduledJobsServiceCommandTest.java @@ -0,0 +1,123 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.workers; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.whispersystems.textsecuregcm.scheduler.JobScheduler; +import reactor.core.publisher.Mono; +import reactor.test.publisher.TestPublisher; + +@Timeout(value = 5, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) +class ProcessScheduledJobsServiceCommandTest { + + private ScheduledExecutorService scheduledExecutorService; + + @BeforeEach + void setUp() { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + } + + @Test + void testDisposeOnStopCancels() throws Exception { + // This test publisher will never emit any values or intentionally complete + final TestPublisher testPublisher = TestPublisher.create(); + final TestJobScheduler testJobScheduler = new TestJobScheduler(testPublisher); + + final ProcessScheduledJobsServiceCommand.ScheduledJobProcessor scheduledJobProcessor = + new ProcessScheduledJobsServiceCommand.ScheduledJobProcessor(testJobScheduler, scheduledExecutorService, 60); + + scheduledJobProcessor.start(); + testJobScheduler.getStartLatch().await(); + + scheduledJobProcessor.stop(); + testJobScheduler.getEndLatch().await(); + + scheduledExecutorService.shutdown(); + assertTrue(scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS), "The submitted task should complete"); + + testPublisher.assertCancelled(); + } + + @Test + void testCompletedPublisher() throws Exception { + final TestPublisher testPublisher = TestPublisher.create(); + testPublisher.complete(); + + final TestJobScheduler testJobScheduler = new TestJobScheduler(testPublisher); + + final ProcessScheduledJobsServiceCommand.ScheduledJobProcessor scheduledJobProcessor = + new ProcessScheduledJobsServiceCommand.ScheduledJobProcessor(testJobScheduler, scheduledExecutorService, 60); + + scheduledJobProcessor.start(); + testJobScheduler.getStartLatch().await(); + + scheduledJobProcessor.stop(); + testJobScheduler.getEndLatch().await(); + + scheduledExecutorService.shutdown(); + assertTrue(scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS), "The submitted task should complete"); + + testPublisher.assertNotCancelled(); + } + + private static class TestJobScheduler extends JobScheduler { + + private final TestPublisher testPublisher; + private final CountDownLatch startLatch = new CountDownLatch(1); + private final CountDownLatch endLatch = new CountDownLatch(1); + + protected TestJobScheduler(TestPublisher testPublisher) { + super(null, null, null, null); + this.testPublisher = testPublisher; + } + + /** + * A {@link CountDownLatch} indicating whether the {@link Mono} returned by {@link #processAvailableJobs()} has been + * subscribed to. + */ + public CountDownLatch getStartLatch() { + return startLatch; + } + + /** + * A {@link CountDownLatch} indicating whether the {@link Mono} returned by {@link #processAvailableJobs()} has + * terminated or been canceled. + */ + public CountDownLatch getEndLatch() { + return endLatch; + } + + @Override + public String getSchedulerName() { + return "test"; + } + + @Override + protected CompletableFuture processJob(@Nullable byte[] jobData) { + return CompletableFuture.failedFuture(new IllegalStateException("Not implemented")); + } + + @Override + public Mono processAvailableJobs() { + return testPublisher.flux() + .then() + .doOnSubscribe(ignored -> startLatch.countDown()) + .doOnTerminate(endLatch::countDown) + .doOnCancel(endLatch::countDown); + } + } + +}