diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java
new file mode 100644
index 000000000..af74e8ef2
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2013-2020 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.storage;
+
+import io.dropwizard.lifecycle.Managed;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.textsecuregcm.util.Util;
+
+/**
+ * Base class for work that runs repeatedly on its own thread and is coordinated between workers
+ * through a {@link ManagedPeriodicWorkCache}: a pass only runs when this worker's claim succeeds.
+ *
+ * {@link #start()} launches the worker thread; {@link #stop()} signals it and blocks until the loop
+ * in {@link #run()} has exited.
+ */
+public abstract class ManagedPeriodicWork implements Managed, Runnable {
+
+  private static final Logger logger = LoggerFactory.getLogger(ManagedPeriodicWork.class);
+
+  /** Pause after an unexpected error in the run loop, in case of external instability. */
+  private static final Duration ERROR_BACKOFF = Duration.ofSeconds(10);
+
+  private final ManagedPeriodicWorkCache cache;
+  private final String workerId;
+
+  private final AtomicBoolean running = new AtomicBoolean(false);
+
+  // both flags are only read and written while holding this object's monitor
+  private boolean started;
+  private boolean finished;
+
+  public ManagedPeriodicWork(final ManagedPeriodicWorkCache cache) {
+    this.cache = cache;
+    this.workerId = UUID.randomUUID().toString();
+  }
+
+  /** Returns the TTL handed to the cache when claiming the work. */
+  abstract protected Duration getWorkerTtl();
+
+  /** Returns the interval between passes. */
+  abstract protected Duration getRunInterval();
+
+  /** Performs one pass of the work; only invoked after this worker's claim succeeded. */
+  abstract protected void doPeriodicWork() throws Exception;
+
+  @Override
+  public synchronized void start() throws Exception {
+    started = true;
+    finished = false;
+    running.set(true);
+    new Thread(this).start();
+  }
+
+  @Override
+  public synchronized void stop() throws Exception {
+
+    running.set(false);
+    // wake the worker thread if it is currently waiting in sleepWhileRunning()
+    notifyAll();
+
+    // without a started worker nothing would ever set "finished", so only wait if there is one
+    while (started && !finished) {
+      Util.wait(this);
+    }
+  }
+
+  @Override
+  public void run() {
+
+    try {
+      while (running.get()) {
+        try {
+          execute();
+          sleepWhileRunning(getRunInterval());
+        } catch (final Exception e) {
+          logger.warn("Error in periodic work", e);
+
+          // wait a bit, in case the error is caused by external instability; waiting on the
+          // monitor (instead of a plain sleep) lets stop() cut the pause short
+          sleepWhileRunning(ERROR_BACKOFF);
+        }
+      }
+    } finally {
+      // always report completion, even if the loop died with an Error, so stop() cannot hang
+      synchronized (this) {
+        finished = true;
+        notifyAll();
+      }
+    }
+  }
+
+  private void execute() {
+
+    if (!cache.claimActiveWork(workerId, getWorkerTtl())) {
+      // the claim was not granted; try again on the next pass
+      return;
+    }
+
+    try {
+      final long startTimeMs = System.currentTimeMillis();
+
+      doPeriodicWork();
+
+      final long endTimeMs = System.currentTimeMillis();
+      final Duration sleepInterval = getRunInterval().minusMillis(endTimeMs - startTimeMs);
+      // keep the claim for the rest of the interval; compare in millis so that a remainder of
+      // less than one second is not dropped
+      if (sleepInterval.toMillis() > 0) {
+        sleepWhileRunning(sleepInterval);
+      }
+
+    } catch (final Exception e) {
+      logger.warn("Failed to process chunk", e);
+
+      // wait a full interval for recovery
+      sleepWhileRunning(getRunInterval());
+
+    } finally {
+      cache.releaseActiveWork(workerId);
+    }
+  }
+
+  /**
+   * Unless a stop has been requested, waits on this object's monitor (via {@code Util.wait}) for
+   * {@code delay}; the {@code notifyAll()} in {@link #stop()} is intended to end the wait early.
+   */
+  private synchronized void sleepWhileRunning(Duration delay) {
+    if (running.get()) {
+      Util.wait(this, delay.toMillis());
+    }
+  }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkCache.java
new file mode 100644
index 000000000..c1fd10ed3
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkCache.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2013-2021 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.storage;
+
+import io.lettuce.core.ScriptOutputType;
+import io.lettuce.core.SetArgs;
+import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
+import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+
+/**
+ * Claim on a single Redis key that lets one worker at a time mark itself as the active worker.
+ */
+public class ManagedPeriodicWorkCache {
+
+  private final String activeWorkerKey;
+
+  private final FaultTolerantRedisCluster cacheCluster;
+  private final ClusterLuaScript unlockClusterScript;
+
+  public ManagedPeriodicWorkCache(final String activeWorkerKey, final FaultTolerantRedisCluster cacheCluster) throws IOException {
+    this.activeWorkerKey = activeWorkerKey;
+    this.cacheCluster = cacheCluster;
+    this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/periodic_worker/unlock.lua", ScriptOutputType.INTEGER);
+  }
+
+  /**
+   * Tries to store {@code workerId} under the active-worker key, only if the key is not set yet
+   * (NX), with an expiry of {@code ttl} (PX).
+   *
+   * @return {@code true} if the SET replied "OK", i.e. this worker now holds the claim
+   */
+  public boolean claimActiveWork(String workerId, Duration ttl) {
+    return "OK".equals(cacheCluster.withCluster(connection -> connection.sync().set(activeWorkerKey, workerId, SetArgs.Builder.nx().px(ttl.toMillis()))));
+  }
+
+  /**
+   * Runs the unlock script, which deletes the active-worker key only while it still holds
+   * {@code workerId}.
+   */
+  public void releaseActiveWork(String workerId) {
+    unlockClusterScript.execute(List.of(activeWorkerKey), List.of(workerId));
+  }
+}
diff --git a/service/src/main/resources/lua/periodic_worker/unlock.lua b/service/src/main/resources/lua/periodic_worker/unlock.lua
new file mode 100644
index 000000000..b95d15d66
--- /dev/null
+++ b/service/src/main/resources/lua/periodic_worker/unlock.lua
@@ -0,0 +1,8 @@
+-- keys: lock_key
+-- argv: lock_value
+
+if redis.call("GET", KEYS[1]) == ARGV[1] then
+  return redis.call("DEL", KEYS[1])
+else
+  return 0
+end