Remove obsolete `ManagedPeriodicWork`
This commit is contained in:
parent
d10a132b0c
commit
16012e6ffe
|
@ -1,118 +0,0 @@
|
|||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import io.dropwizard.lifecycle.Managed;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import java.time.Duration;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import javax.annotation.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
||||
public abstract class ManagedPeriodicWork implements Managed {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
private static final String FUTURE_DONE_GAUGE_NAME = "futureDone";
|
||||
|
||||
private final ManagedPeriodicWorkLock lock;
|
||||
private final Duration workerTtl;
|
||||
private final Duration runInterval;
|
||||
private final String workerId;
|
||||
private final ScheduledExecutorService executorService;
|
||||
|
||||
private Duration sleepDurationAfterUnexpectedException = Duration.ofSeconds(10);
|
||||
|
||||
|
||||
@Nullable
|
||||
private ScheduledFuture<?> scheduledFuture;
|
||||
private AtomicReference<CompletableFuture<Void>> activeExecutionFuture = new AtomicReference<>(CompletableFuture.completedFuture(null));
|
||||
|
||||
public ManagedPeriodicWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval, final ScheduledExecutorService scheduledExecutorService) {
|
||||
this.lock = lock;
|
||||
this.workerTtl = workerTtl;
|
||||
this.runInterval = runInterval;
|
||||
this.workerId = UUID.randomUUID().toString();
|
||||
this.executorService = scheduledExecutorService;
|
||||
}
|
||||
|
||||
abstract protected void doPeriodicWork() throws Exception;
|
||||
|
||||
@Override
|
||||
public synchronized void start() throws Exception {
|
||||
|
||||
if (scheduledFuture != null) {
|
||||
return;
|
||||
}
|
||||
|
||||
scheduledFuture = executorService.scheduleAtFixedRate(() -> {
|
||||
try {
|
||||
execute();
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Error in execution", e);
|
||||
|
||||
// wait a bit, in case the error is caused by external instability
|
||||
Util.sleep(sleepDurationAfterUnexpectedException.toMillis());
|
||||
}
|
||||
}, 0, runInterval.getSeconds(), TimeUnit.SECONDS);
|
||||
|
||||
Metrics.gauge(name(getClass(), FUTURE_DONE_GAUGE_NAME), scheduledFuture, future -> future.isDone() ? 1 : 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() throws Exception {
|
||||
|
||||
if (scheduledFuture != null) {
|
||||
|
||||
scheduledFuture.cancel(false);
|
||||
|
||||
try {
|
||||
activeExecutionFuture.get().join();
|
||||
} catch (final Exception e) {
|
||||
logger.warn("error while awaiting final execution", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void setSleepDurationAfterUnexpectedException(final Duration sleepDurationAfterUnexpectedException) {
|
||||
this.sleepDurationAfterUnexpectedException = sleepDurationAfterUnexpectedException;
|
||||
}
|
||||
|
||||
private void execute() {
|
||||
|
||||
if (lock.claimActiveWork(workerId, workerTtl)) {
|
||||
try {
|
||||
|
||||
activeExecutionFuture.set(new CompletableFuture<>());
|
||||
|
||||
logger.info("Starting execution");
|
||||
doPeriodicWork();
|
||||
logger.info("Execution complete");
|
||||
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Periodic work failed", e);
|
||||
|
||||
// wait a bit, in case the error is caused by external instability
|
||||
Util.sleep(sleepDurationAfterUnexpectedException.toMillis());
|
||||
|
||||
} finally {
|
||||
try {
|
||||
lock.releaseActiveWork(workerId);
|
||||
} finally {
|
||||
activeExecutionFuture.get().complete(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,35 +0,0 @@
|
|||
/*
|
||||
* Copyright 2013-2021 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import io.lettuce.core.ScriptOutputType;
|
||||
import io.lettuce.core.SetArgs;
|
||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
|
||||
public class ManagedPeriodicWorkLock {
|
||||
|
||||
private final String activeWorkerKey;
|
||||
|
||||
private final FaultTolerantRedisCluster cacheCluster;
|
||||
private final ClusterLuaScript unlockClusterScript;
|
||||
|
||||
public ManagedPeriodicWorkLock(final String activeWorkerKey, final FaultTolerantRedisCluster cacheCluster) throws IOException {
|
||||
this.activeWorkerKey = activeWorkerKey;
|
||||
this.cacheCluster = cacheCluster;
|
||||
this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/periodic_worker/unlock.lua", ScriptOutputType.INTEGER);
|
||||
}
|
||||
|
||||
public boolean claimActiveWork(String workerId, Duration ttl) {
|
||||
return "OK".equals(cacheCluster.withCluster(connection -> connection.sync().set(activeWorkerKey, workerId, SetArgs.Builder.nx().px(ttl.toMillis()))));
|
||||
}
|
||||
|
||||
public void releaseActiveWork(String workerId) {
|
||||
unlockClusterScript.execute(List.of(activeWorkerKey), List.of(workerId));
|
||||
}
|
||||
}
|
|
@ -1,8 +0,0 @@
|
|||
-- keys: lock_key
|
||||
-- argv: lock_value
|
||||
|
||||
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||||
return redis.call("DEL", KEYS[1])
|
||||
else
|
||||
return 0
|
||||
end
|
|
@ -1,169 +0,0 @@
|
|||
/*
|
||||
* Copyright 2013-2021 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.atLeastOnce;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
||||
class ManagedPeriodicWorkTest {
|
||||
|
||||
private ScheduledExecutorService scheduledExecutorService;
|
||||
private ManagedPeriodicWorkLock lock;
|
||||
private TestWork testWork;
|
||||
|
||||
@BeforeEach
|
||||
void setup() {
|
||||
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||
lock = mock(ManagedPeriodicWorkLock.class);
|
||||
|
||||
testWork = new TestWork(lock, Duration.ofMinutes(5), Duration.ofMinutes(5),
|
||||
scheduledExecutorService);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void teardown() throws Exception {
|
||||
scheduledExecutorService.shutdown();
|
||||
|
||||
assertTrue(scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
void test() throws Exception {
|
||||
when(lock.claimActiveWork(any(), any())).thenReturn(true);
|
||||
|
||||
testWork.start();
|
||||
|
||||
synchronized (testWork) {
|
||||
Util.wait(testWork);
|
||||
}
|
||||
|
||||
testWork.stop();
|
||||
|
||||
verify(lock, atLeastOnce()).claimActiveWork(anyString(), any(Duration.class));
|
||||
verify(lock, atLeastOnce()).releaseActiveWork(anyString());
|
||||
|
||||
assertTrue(1 <= testWork.getCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSlowWorkShutdown() throws Exception {
|
||||
|
||||
when(lock.claimActiveWork(any(), any())).thenReturn(true);
|
||||
|
||||
testWork.setWorkSleepDuration(Duration.ofSeconds(1));
|
||||
|
||||
testWork.start();
|
||||
|
||||
synchronized (testWork) {
|
||||
Util.wait(testWork);
|
||||
}
|
||||
|
||||
long startMillis = System.currentTimeMillis();
|
||||
|
||||
testWork.stop();
|
||||
|
||||
long runMillis = System.currentTimeMillis() - startMillis;
|
||||
|
||||
assertTrue(runMillis > 500);
|
||||
|
||||
verify(lock, atLeastOnce()).claimActiveWork(anyString(), any(Duration.class));
|
||||
verify(lock, atLeastOnce()).releaseActiveWork(anyString());
|
||||
|
||||
assertTrue(1 <= testWork.getCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testWorkExceptionReleasesLock() throws Exception {
|
||||
when(lock.claimActiveWork(any(), any())).thenReturn(true);
|
||||
|
||||
testWork = new ExceptionalTestWork(lock, Duration.ofMinutes(5), Duration.ofMinutes(5), scheduledExecutorService);
|
||||
|
||||
testWork.setSleepDurationAfterUnexpectedException(Duration.ZERO);
|
||||
|
||||
testWork.start();
|
||||
|
||||
synchronized (testWork) {
|
||||
Util.wait(testWork);
|
||||
}
|
||||
|
||||
testWork.stop();
|
||||
|
||||
verify(lock, atLeastOnce()).claimActiveWork(anyString(), any(Duration.class));
|
||||
verify(lock, atLeastOnce()).releaseActiveWork(anyString());
|
||||
|
||||
assertEquals(0, testWork.getCount());
|
||||
}
|
||||
|
||||
|
||||
private static class TestWork extends ManagedPeriodicWork {
|
||||
|
||||
private final AtomicInteger workCounter = new AtomicInteger();
|
||||
private Duration workSleepDuration = Duration.ZERO;
|
||||
|
||||
public TestWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval,
|
||||
final ScheduledExecutorService scheduledExecutorService) {
|
||||
super(lock, workerTtl, runInterval, scheduledExecutorService);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doPeriodicWork() throws Exception {
|
||||
|
||||
notifyStarted();
|
||||
|
||||
if (!workSleepDuration.isZero()) {
|
||||
Util.sleep(workSleepDuration.toMillis());
|
||||
}
|
||||
|
||||
workCounter.incrementAndGet();
|
||||
}
|
||||
|
||||
synchronized void notifyStarted() {
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
int getCount() {
|
||||
return workCounter.get();
|
||||
}
|
||||
|
||||
void setWorkSleepDuration(final Duration workSleepDuration) {
|
||||
this.workSleepDuration = workSleepDuration;
|
||||
}
|
||||
}
|
||||
|
||||
private static class ExceptionalTestWork extends TestWork {
|
||||
|
||||
|
||||
public ExceptionalTestWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval,
|
||||
final ScheduledExecutorService scheduledExecutorService) {
|
||||
super(lock, workerTtl, runInterval, scheduledExecutorService);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doPeriodicWork() throws Exception {
|
||||
|
||||
notifyStarted();
|
||||
|
||||
throw new RuntimeException();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue