Add abstract ManagedPeriodicWork

This commit is contained in:
Chris Eager 2021-06-04 14:00:14 -05:00 committed by Chris Eager
parent 5193abdab3
commit 88db808298
3 changed files with 147 additions and 0 deletions

View File

@ -0,0 +1,104 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import io.dropwizard.lifecycle.Managed;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.Util;
public abstract class ManagedPeriodicWork implements Managed, Runnable {
private static final Logger logger = LoggerFactory.getLogger(ManagedPeriodicWork.class);
private final ManagedPeriodicWorkCache cache;
private final String workerId;
private final AtomicBoolean running = new AtomicBoolean(false);
private boolean finished;
public ManagedPeriodicWork(final ManagedPeriodicWorkCache cache) {
this.cache = cache;
this.workerId = UUID.randomUUID().toString();
}
abstract protected Duration getWorkerTtl();
abstract protected Duration getRunInterval();
abstract protected void doPeriodicWork() throws Exception;
@Override
public synchronized void start() throws Exception {
running.set(true);
new Thread(this).start();
}
@Override
public synchronized void stop() throws Exception {
running.set(false);
notifyAll();
while (!finished) {
Util.wait(this);
}
}
@Override
public void run() {
while(running.get()) {
try {
execute();
sleepWhileRunning(getRunInterval());
} catch (final Exception e) {
logger.warn("Error in crawl crawl", e);
// wait a bit, in case the error is caused by external instability
Util.sleep(10_000);
}
}
synchronized (this) {
finished = true;
notifyAll();
}
}
private void execute() {
if (cache.claimActiveWork(workerId, getWorkerTtl())) {
try {
final long startTimeMs = System.currentTimeMillis();
doPeriodicWork();
final long endTimeMs = System.currentTimeMillis();
final Duration sleepInterval = getRunInterval().minusMillis(endTimeMs - startTimeMs);
if (sleepInterval.getSeconds() > 0) {
sleepWhileRunning(sleepInterval);
}
} catch (final Exception e) {
logger.warn("Failed to process chunk", e);
// wait a full interval for recovery
sleepWhileRunning(getRunInterval());
} finally {
cache.releaseActiveWork(workerId);
}
}
}
private synchronized void sleepWhileRunning(Duration delay) {
if (running.get()) Util.wait(this, delay.toMillis());
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.SetArgs;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
public class ManagedPeriodicWorkCache {
private final String activeWorkerKey;
private final FaultTolerantRedisCluster cacheCluster;
private final ClusterLuaScript unlockClusterScript;
public ManagedPeriodicWorkCache(final String activeWorkerKey, final FaultTolerantRedisCluster cacheCluster) throws IOException {
this.activeWorkerKey = activeWorkerKey;
this.cacheCluster = cacheCluster;
this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/periodic_worker/unlock.lua", ScriptOutputType.INTEGER);
}
public boolean claimActiveWork(String workerId, Duration ttl) {
return "OK".equals(cacheCluster.withCluster(connection -> connection.sync().set(activeWorkerKey, workerId, SetArgs.Builder.nx().px(ttl.toMillis()))));
}
public void releaseActiveWork(String workerId) {
unlockClusterScript.execute(List.of(activeWorkerKey), List.of(workerId));
}
}

View File

@ -0,0 +1,8 @@
-- keys: lock_key
-- argv: lock_value
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end