Add test for ManagedPeriodicWork; fix shutdown not awaiting active execution

This commit is contained in:
Chris Eager 2021-07-01 12:22:23 -05:00 committed by Chris Eager
parent 9558944e22
commit 43be72d076
2 changed files with 190 additions and 14 deletions

View File

@ -10,10 +10,11 @@ import io.dropwizard.lifecycle.Managed;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -31,8 +32,12 @@ public abstract class ManagedPeriodicWork implements Managed {
private final String workerId;
private final ScheduledExecutorService executorService;
private Duration sleepDurationAfterUnexpectedException = Duration.ofSeconds(10);
@Nullable
private ScheduledFuture<?> scheduledFuture;
private AtomicReference<CompletableFuture<Void>> activeExecutionFuture = new AtomicReference<>(CompletableFuture.completedFuture(null));
public ManagedPeriodicWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval, final ScheduledExecutorService scheduledExecutorService) {
this.lock = lock;
@ -58,7 +63,7 @@ public abstract class ManagedPeriodicWork implements Managed {
logger.warn("Error in execution", e);
// wait a bit, in case the error is caused by external instability
Util.sleep(10_000);
Util.sleep(sleepDurationAfterUnexpectedException.toMillis());
}
}, 0, runInterval.getSeconds(), TimeUnit.SECONDS);
@ -69,27 +74,28 @@ public abstract class ManagedPeriodicWork implements Managed {
public synchronized void stop() throws Exception {
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
boolean terminated = false;
while (!terminated) {
try {
scheduledFuture.get(5, TimeUnit.MINUTES);
terminated = true;
} catch (final TimeoutException e) {
logger.warn("worker not yet terminated");
} catch (final Exception e) {
logger.warn("worker terminated exceptionally", e);
terminated = true;
}
try {
activeExecutionFuture.get().join();
} catch (final Exception e) {
logger.warn("error while awaiting final execution", e);
}
}
}
public void setSleepDurationAfterUnexpectedException(final Duration sleepDurationAfterUnexpectedException) {
this.sleepDurationAfterUnexpectedException = sleepDurationAfterUnexpectedException;
}
private void execute() {
if (lock.claimActiveWork(workerId, workerTtl)) {
try {
activeExecutionFuture.set(new CompletableFuture<>());
logger.info("Starting execution");
doPeriodicWork();
logger.info("Execution complete");
@ -98,10 +104,11 @@ public abstract class ManagedPeriodicWork implements Managed {
logger.warn("Periodic work failed", e);
// wait a bit, in case the error is caused by external instability
Util.sleep(10_000);
Util.sleep(sleepDurationAfterUnexpectedException.toMillis());
} finally {
lock.releaseActiveWork(workerId);
activeExecutionFuture.get().complete(null);
}
}
}

View File

@ -0,0 +1,169 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.util.Util;
class ManagedPeriodicWorkTest {
private ScheduledExecutorService scheduledExecutorService;
private ManagedPeriodicWorkLock lock;
private TestWork testWork;
@BeforeEach
void setup() {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
lock = mock(ManagedPeriodicWorkLock.class);
testWork = new TestWork(lock, Duration.ofMinutes(5), Duration.ofMinutes(5),
scheduledExecutorService);
}
@AfterEach
void teardown() throws Exception {
scheduledExecutorService.shutdown();
assertTrue(scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS));
}
@Test
void test() throws Exception {
when(lock.claimActiveWork(any(), any())).thenReturn(true);
testWork.start();
synchronized (testWork) {
Util.wait(testWork);
}
testWork.stop();
verify(lock, times(1)).claimActiveWork(anyString(), any(Duration.class));
verify(lock, times(1)).releaseActiveWork(anyString());
assertEquals(1, testWork.getCount());
}
@Test
void testSlowWorkShutdown() throws Exception {
when(lock.claimActiveWork(any(), any())).thenReturn(true);
testWork.setWorkSleepDuration(Duration.ofSeconds(1));
testWork.start();
synchronized (testWork) {
Util.wait(testWork);
}
long startMillis = System.currentTimeMillis();
testWork.stop();
long runMillis = System.currentTimeMillis() - startMillis;
assertTrue(runMillis > 500);
verify(lock, times(1)).claimActiveWork(anyString(), any(Duration.class));
verify(lock, times(1)).releaseActiveWork(anyString());
assertEquals(1, testWork.getCount());
}
@Test
void testWorkExceptionReleasesLock() throws Exception {
when(lock.claimActiveWork(any(), any())).thenReturn(true);
testWork = new ExceptionalTestWork(lock, Duration.ofMinutes(5), Duration.ofMinutes(5), scheduledExecutorService);
testWork.setSleepDurationAfterUnexpectedException(Duration.ZERO);
testWork.start();
synchronized (testWork) {
Util.wait(testWork);
}
testWork.stop();
verify(lock, times(1)).claimActiveWork(anyString(), any(Duration.class));
verify(lock, times(1)).releaseActiveWork(anyString());
assertEquals(0, testWork.getCount());
}
private static class TestWork extends ManagedPeriodicWork {
private final AtomicInteger workCounter = new AtomicInteger();
private Duration workSleepDuration = Duration.ZERO;
public TestWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval,
final ScheduledExecutorService scheduledExecutorService) {
super(lock, workerTtl, runInterval, scheduledExecutorService);
}
@Override
protected void doPeriodicWork() throws Exception {
notifyStarted();
if (!workSleepDuration.isZero()) {
Util.sleep(workSleepDuration.toMillis());
}
workCounter.incrementAndGet();
}
synchronized void notifyStarted() {
notifyAll();
}
int getCount() {
return workCounter.get();
}
void setWorkSleepDuration(final Duration workSleepDuration) {
this.workSleepDuration = workSleepDuration;
}
}
private static class ExceptionalTestWork extends TestWork {
public ExceptionalTestWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval,
final ScheduledExecutorService scheduledExecutorService) {
super(lock, workerTtl, runInterval, scheduledExecutorService);
}
@Override
protected void doPeriodicWork() throws Exception {
notifyStarted();
throw new RuntimeException();
}
}
}