From 43be72d0766cbf67461486dc0757da13123b1217 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Thu, 1 Jul 2021 12:22:23 -0500 Subject: [PATCH] Add test for ManagedPeriodicWork; fix shutdown not awaiting active execution --- .../storage/ManagedPeriodicWork.java | 35 ++-- .../storage/ManagedPeriodicWorkTest.java | 169 ++++++++++++++++++ 2 files changed, 190 insertions(+), 14 deletions(-) create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkTest.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java index bb18b4d5d..db1863b81 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java @@ -10,10 +10,11 @@ import io.dropwizard.lifecycle.Managed; import io.micrometer.core.instrument.Metrics; import java.time.Duration; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,8 +32,12 @@ public abstract class ManagedPeriodicWork implements Managed { private final String workerId; private final ScheduledExecutorService executorService; + private Duration sleepDurationAfterUnexpectedException = Duration.ofSeconds(10); + + @Nullable private ScheduledFuture scheduledFuture; + private AtomicReference> activeExecutionFuture = new AtomicReference<>(CompletableFuture.completedFuture(null)); public ManagedPeriodicWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval, final ScheduledExecutorService scheduledExecutorService) { this.lock = lock; @@ -58,7 +63,7 @@ public abstract class ManagedPeriodicWork implements Managed { logger.warn("Error in execution", e); // wait a bit, in case the error is caused by external instability - Util.sleep(10_000); + Util.sleep(sleepDurationAfterUnexpectedException.toMillis()); } }, 0, runInterval.getSeconds(), TimeUnit.SECONDS); @@ -69,27 +74,28 @@ public abstract class ManagedPeriodicWork implements Managed { public synchronized void stop() throws Exception { if (scheduledFuture != null) { + scheduledFuture.cancel(false); - boolean terminated = false; - while (!terminated) { - try { - scheduledFuture.get(5, TimeUnit.MINUTES); - terminated = true; - } catch (final TimeoutException e) { - logger.warn("worker not yet terminated"); - } catch (final Exception e) { - logger.warn("worker terminated exceptionally", e); - terminated = true; - } + try { + activeExecutionFuture.get().join(); + } catch (final Exception e) { + logger.warn("error while awaiting final execution", e); } } } + public void setSleepDurationAfterUnexpectedException(final Duration sleepDurationAfterUnexpectedException) { + this.sleepDurationAfterUnexpectedException = sleepDurationAfterUnexpectedException; + } + private void execute() { if (lock.claimActiveWork(workerId, workerTtl)) { try { + + activeExecutionFuture.set(new CompletableFuture<>()); + logger.info("Starting execution"); doPeriodicWork(); logger.info("Execution complete"); @@ -98,10 +104,11 @@ public abstract class ManagedPeriodicWork implements Managed { logger.warn("Periodic work failed", e); // wait a bit, in case the error is caused by external instability - Util.sleep(10_000); + Util.sleep(sleepDurationAfterUnexpectedException.toMillis()); } finally { lock.releaseActiveWork(workerId); + activeExecutionFuture.get().complete(null); } } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkTest.java new file mode 100644 index 000000000..3749ee0cb --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkTest.java @@ -0,0 +1,169 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.whispersystems.textsecuregcm.util.Util; + +class ManagedPeriodicWorkTest { + + private ScheduledExecutorService scheduledExecutorService; + private ManagedPeriodicWorkLock lock; + private TestWork testWork; + + @BeforeEach + void setup() { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + lock = mock(ManagedPeriodicWorkLock.class); + + testWork = new TestWork(lock, Duration.ofMinutes(5), Duration.ofMinutes(5), + scheduledExecutorService); + } + + @AfterEach + void teardown() throws Exception { + scheduledExecutorService.shutdown(); + + assertTrue(scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)); + } + + @Test + void test() throws Exception { + when(lock.claimActiveWork(any(), any())).thenReturn(true); + + testWork.start(); + + synchronized (testWork) { + Util.wait(testWork); + } + + testWork.stop(); + + verify(lock, times(1)).claimActiveWork(anyString(), any(Duration.class)); + verify(lock, times(1)).releaseActiveWork(anyString()); + + assertEquals(1, testWork.getCount()); + } + + @Test + void testSlowWorkShutdown() throws Exception { + + when(lock.claimActiveWork(any(), any())).thenReturn(true); + + testWork.setWorkSleepDuration(Duration.ofSeconds(1)); + + testWork.start(); + + synchronized (testWork) { + Util.wait(testWork); + } + + long startMillis = System.currentTimeMillis(); + + testWork.stop(); + + long runMillis = System.currentTimeMillis() - startMillis; + + assertTrue(runMillis > 500); + + verify(lock, times(1)).claimActiveWork(anyString(), any(Duration.class)); + verify(lock, times(1)).releaseActiveWork(anyString()); + + assertEquals(1, testWork.getCount()); + } + + @Test + void testWorkExceptionReleasesLock() throws Exception { + when(lock.claimActiveWork(any(), any())).thenReturn(true); + + testWork = new ExceptionalTestWork(lock, Duration.ofMinutes(5), Duration.ofMinutes(5), scheduledExecutorService); + + testWork.setSleepDurationAfterUnexpectedException(Duration.ZERO); + + testWork.start(); + + synchronized (testWork) { + Util.wait(testWork); + } + + testWork.stop(); + + verify(lock, times(1)).claimActiveWork(anyString(), any(Duration.class)); + verify(lock, times(1)).releaseActiveWork(anyString()); + + assertEquals(0, testWork.getCount()); + } + + + private static class TestWork extends ManagedPeriodicWork { + + private final AtomicInteger workCounter = new AtomicInteger(); + private Duration workSleepDuration = Duration.ZERO; + + public TestWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval, + final ScheduledExecutorService scheduledExecutorService) { + super(lock, workerTtl, runInterval, scheduledExecutorService); + } + + @Override + protected void doPeriodicWork() throws Exception { + + notifyStarted(); + + if (!workSleepDuration.isZero()) { + Util.sleep(workSleepDuration.toMillis()); + } + + workCounter.incrementAndGet(); + } + + synchronized void notifyStarted() { + notifyAll(); + } + + int getCount() { + return workCounter.get(); + } + + void setWorkSleepDuration(final Duration workSleepDuration) { + this.workSleepDuration = workSleepDuration; + } + } + + private static class ExceptionalTestWork extends TestWork { + + + public ExceptionalTestWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval, + final ScheduledExecutorService scheduledExecutorService) { + super(lock, workerTtl, runInterval, scheduledExecutorService); + } + + @Override + protected void doPeriodicWork() throws Exception { + + notifyStarted(); + + throw new RuntimeException(); + } + } + +}