Add a general job scheduler

This commit is contained in:
Jon Chambers 2024-07-18 13:22:31 -04:00 committed by GitHub
parent 5147d9cb6d
commit 54fb0a6acb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 517 additions and 0 deletions

View File

@ -126,6 +126,9 @@ dynamoDbTables:
tableName: Example_RemoteConfig
reportMessage:
tableName: Example_ReportMessage
scheduledJobs:
tableName: Example_ScheduledJobs
expiration: P7D
subscriptions:
tableName: Example_Subscriptions
clientPublicKeys:

View File

@ -67,6 +67,7 @@ public class DynamoDbTables {
private final TableWithExpiration registrationRecovery;
private final Table remoteConfig;
private final Table reportMessage;
private final TableWithExpiration scheduledJobs;
private final Table subscriptions;
private final Table verificationSessions;
@ -91,6 +92,7 @@ public class DynamoDbTables {
@JsonProperty("registrationRecovery") final TableWithExpiration registrationRecovery,
@JsonProperty("remoteConfig") final Table remoteConfig,
@JsonProperty("reportMessage") final Table reportMessage,
@JsonProperty("scheduledJobs") final TableWithExpiration scheduledJobs,
@JsonProperty("subscriptions") final Table subscriptions,
@JsonProperty("verificationSessions") final Table verificationSessions) {
@ -114,6 +116,7 @@ public class DynamoDbTables {
this.registrationRecovery = registrationRecovery;
this.remoteConfig = remoteConfig;
this.reportMessage = reportMessage;
this.scheduledJobs = scheduledJobs;
this.subscriptions = subscriptions;
this.verificationSessions = verificationSessions;
}
@ -238,6 +241,12 @@ public class DynamoDbTables {
return reportMessage;
}
/**
 * Returns the configuration entry for the DynamoDB table that holds scheduled jobs.
 *
 * @return the scheduled jobs table configuration, exactly as supplied to the constructor; the {@code @NotNull} and
 * {@code @Valid} constraints declare that it must be present and that its own constraints are validated too
 */
@NotNull
@Valid
public TableWithExpiration getScheduledJobs() {
return scheduledJobs;
}
@NotNull
@Valid
public Table getSubscriptions() {

View File

@ -0,0 +1,222 @@
package org.whispersystems.textsecuregcm.scheduler;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
/**
 * A job scheduler maintains a delay queue of tasks to be run at some time in the future. Callers schedule jobs with
 * the {@link #scheduleJob(Instant, byte[])} method, and concrete subclasses actually execute jobs by implementing the
 * {@link #processJob(byte[])} method. Some entity must call {@link #processAvailableJobs()} to actually find and
 * process jobs that are ready for execution.
 * <p>
 * Jobs are stored as DynamoDB items whose primary key is the scheduler's name plus a binary "run at" sort key; see
 * {@link #buildRunAtAttribute(Instant)} for the layout of that sort key.
 */
public abstract class JobScheduler {

  private final DynamoDbAsyncClient dynamoDbAsyncClient;
  private final String tableName;
  private final Duration jobExpiration;
  private final Clock clock;

  // Named after the concrete subclass so log lines identify which scheduler produced them
  private final Logger logger = LoggerFactory.getLogger(getClass());

  // The name of this scheduler (DynamoDB string)
  @VisibleForTesting
  public static final String KEY_SCHEDULER_NAME = "S";

  // The timestamp (and additional disambiguating data; please see #buildRunAtAttribute for details) for the job
  // (DynamoDB byte array)
  @VisibleForTesting
  public static final String ATTR_RUN_AT = "T";

  // Additional application-specific data for the job (DynamoDB byte array)
  private static final String ATTR_JOB_DATA = "D";

  // The time at which this job should be garbage-collected if not already deleted (DynamoDB number;
  // seconds from the epoch)
  private static final String ATTR_TTL = "E";

  private static final String SCHEDULE_JOB_COUNTER_NAME = MetricsUtil.name(JobScheduler.class, "scheduleJob");
  private static final String PROCESS_JOB_COUNTER_NAME = MetricsUtil.name(JobScheduler.class, "processJob");
  private static final String SCHEDULER_NAME_TAG = "schedulerName";
  private static final String OUTCOME_TAG = "outcome";

  // Upper bound on the number of jobs processed concurrently by a single call to #processAvailableJobs
  private static final int MAX_CONCURRENCY = 16;

  // A long with every bit set, which serializes to eight 0xff bytes. DynamoDB orders binary values byte-by-byte as
  // unsigned numbers, so this is the largest value any 8-byte component of a sort key can take.
  private static final long MAX_SORT_KEY_COMPONENT = -1L;

  /**
   * Constructs a new job scheduler.
   *
   * @param dynamoDbAsyncClient the client used to store, query, and delete jobs
   * @param tableName the name of the DynamoDB table that holds scheduled jobs
   * @param jobExpiration how long after its {@code runAt} time a job's expiration attribute should be set
   * @param clock the source of the current time
   */
  protected JobScheduler(final DynamoDbAsyncClient dynamoDbAsyncClient,
      final String tableName,
      final Duration jobExpiration,
      final Clock clock) {

    this.dynamoDbAsyncClient = dynamoDbAsyncClient;
    this.tableName = tableName;
    this.jobExpiration = jobExpiration;
    this.clock = clock;
  }

  /**
   * Returns the unique name of this scheduler. Scheduler names are used to "namespace" jobs.
   *
   * @return the unique name of this scheduler
   */
  public abstract String getSchedulerName();

  /**
   * Processes a previously-scheduled job.
   *
   * @param jobData opaque, application-specific data provided at the time the job was scheduled
   *
   * @return A future that yields a brief, human-readable status code when the job has been fully processed. On
   * successful completion, the job will be deleted. The job will not be deleted if the future completes exceptionally.
   */
  protected abstract CompletableFuture<String> processJob(@Nullable byte[] jobData);

  /**
   * Schedules a job to run at or after the given {@code runAt} time. Concrete implementations must override this method
   * to expose it publicly, or provide some more application-appropriate method for public callers.
   * <p>
   * Failed writes are retried with exponential backoff (up to 8 retries, starting at one second and capped at four
   * seconds between attempts); a fresh sort key is generated for every attempt.
   *
   * @param runAt the time at or after which to run the job
   * @param jobData application-specific data describing the job; may be {@code null}
   *
   * @return a future that completes when the job has been scheduled
   */
  protected CompletableFuture<Void> scheduleJob(final Instant runAt, @Nullable final byte[] jobData) {
    return Mono.fromFuture(() -> scheduleJob(buildRunAtAttribute(runAt), runAt.plus(jobExpiration), jobData))
        .retryWhen(Retry.backoff(8, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(4)))
        .toFuture()
        .thenRun(() -> Metrics.counter(SCHEDULE_JOB_COUNTER_NAME, SCHEDULER_NAME_TAG, getSchedulerName()).increment());
  }

  /**
   * Writes a single job item with an explicit sort key and expiration. The write is conditional and fails (with a
   * {@code ConditionalCheckFailedException}) if an item with the same scheduler name and sort key already exists.
   *
   * @param runAt the complete sort key for the job, as produced by {@link #buildRunAtAttribute(Instant, long)}
   * @param expiration the time to record (as seconds from the epoch) in the job's expiration attribute
   * @param jobData application-specific data describing the job; the attribute is omitted entirely if {@code null}
   *
   * @return a future that completes when the item has been written
   */
  @VisibleForTesting
  CompletableFuture<Void> scheduleJob(final AttributeValue runAt, final Instant expiration, @Nullable final byte[] jobData) {
    // Map.of produces an immutable map; copy it so the optional job data attribute can be added
    final Map<String, AttributeValue> item = new HashMap<>(Map.of(
        KEY_SCHEDULER_NAME, AttributeValue.fromS(getSchedulerName()),
        ATTR_RUN_AT, runAt,
        ATTR_TTL, AttributeValue.fromN(String.valueOf(expiration.getEpochSecond()))));

    if (jobData != null) {
      item.put(ATTR_JOB_DATA, AttributeValue.fromB(SdkBytes.fromByteArray(jobData)));
    }

    return dynamoDbAsyncClient.putItem(PutItemRequest.builder()
            .tableName(tableName)
            .item(item)
            // Never overwrite an existing job that happens to have an identical primary key
            .conditionExpression("attribute_not_exists(#schedulerName)")
            .expressionAttributeNames(Map.of("#schedulerName", KEY_SCHEDULER_NAME))
            .build())
        .thenRun(Util.NOOP);
  }

  /**
   * Finds and processes all jobs whose {@code runAt} time is less than or equal to the current time. Scheduled jobs
   * will be deleted once they have been processed successfully. A job whose processing (or deletion) fails is logged
   * and left in place; the failure does not affect other jobs and does not fail the returned future.
   *
   * @return a future that completes when all available jobs have been processed
   *
   * @see #processJob(byte[])
   */
  public CompletableFuture<Void> processAvailableJobs() {
    return Flux.from(dynamoDbAsyncClient.queryPaginator(QueryRequest.builder()
                .tableName(tableName)
                .keyConditionExpression("#schedulerName = :schedulerName AND #runAt <= :maxRunAt")
                .expressionAttributeNames(Map.of(
                    "#schedulerName", KEY_SCHEDULER_NAME,
                    "#runAt", ATTR_RUN_AT))
                .expressionAttributeValues(Map.of(
                    ":schedulerName", AttributeValue.fromS(getSchedulerName()),
                    ":maxRunAt", buildMaxRunAtAttribute(clock.instant())))
                .build())
            .items())
        .flatMap(item -> {
          final byte[] jobData = item.containsKey(ATTR_JOB_DATA)
              ? item.get(ATTR_JOB_DATA).b().asByteArray()
              : null;

          // Invoke processJob lazily, inside the Mono, so that an exception thrown synchronously by an implementation
          // is routed to the per-job error handler below rather than escaping the mapper and aborting every
          // remaining job in this run.
          return Mono.fromFuture(() -> processJob(jobData))
              .doOnNext(outcome -> Metrics.counter(PROCESS_JOB_COUNTER_NAME,
                      SCHEDULER_NAME_TAG, getSchedulerName(),
                      OUTCOME_TAG, outcome)
                  .increment())
              // Only reached if processing succeeded; failed jobs stay in the table for a later run
              .then(Mono.fromFuture(() -> deleteJob(item.get(KEY_SCHEDULER_NAME), item.get(ATTR_RUN_AT))))
              .onErrorResume(throwable -> {
                logger.warn("Failed to process job", throwable);
                return Mono.empty();
              });
        }, MAX_CONCURRENCY)
        .then()
        .toFuture();
  }

  /**
   * Deletes the job identified by the given primary key.
   *
   * @param schedulerName the scheduler name (partition key) of the job to delete
   * @param runAt the sort key of the job to delete
   *
   * @return a future that completes when the job has been deleted
   */
  private CompletableFuture<Void> deleteJob(final AttributeValue schedulerName, final AttributeValue runAt) {
    return dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder()
            .tableName(tableName)
            .key(Map.of(
                KEY_SCHEDULER_NAME, schedulerName,
                ATTR_RUN_AT, runAt))
            .build())
        .thenRun(Util.NOOP);
  }

  /**
   * Constructs an attribute value that contains a sort key that will be greater than any sort key generated for an
   * earlier {@code runAt} time and less than a sort key generated for a later {@code runAt} time. The returned value
   * is 24 bytes long and is made up of three 8-byte, big-endian components, in order:
   * <ol>
   *   <li>the given {@code runAt} time in milliseconds since the epoch;</li>
   *   <li>the current time (according to this scheduler's clock) in milliseconds since the epoch; and</li>
   *   <li>a random value.</li>
   * </ol>
   * The trailing components ensure that multiple jobs scheduled for the same {@code runAt} time will (with very high
   * probability) have distinct primary keys. Jobs with the same {@code runAt} time sort by the time at which they were
   * scheduled; the order of jobs that also share a scheduling time is random.
   *
   * @param runAt the time for which to generate a sort key
   *
   * @return a probably-unique sort key for the given {@code runAt} time
   */
  AttributeValue buildRunAtAttribute(final Instant runAt) {
    return buildRunAtAttribute(runAt, ThreadLocalRandom.current().nextLong());
  }

  /**
   * Constructs a sort key for the given {@code runAt} time using a caller-supplied value in place of the random
   * component; see {@link #buildRunAtAttribute(Instant)} for the key layout.
   *
   * @param runAt the time for which to generate a sort key
   * @param salt the value to use as the final 8-byte component of the key
   *
   * @return a sort key for the given {@code runAt} time
   */
  @VisibleForTesting
  AttributeValue buildRunAtAttribute(final Instant runAt, final long salt) {
    return AttributeValue.fromB(SdkBytes.fromByteBuffer(ByteBuffer.allocate(24)
        .putLong(runAt.toEpochMilli())
        .putLong(clock.millis())
        .putLong(salt)
        .flip()));
  }

  /**
   * Constructs a sort key value that is greater than or equal to all other sort keys for jobs with the same or earlier
   * {@code runAt} time. The key is the given time followed by sixteen {@code 0xff} bytes, which is the largest
   * possible value for the two trailing components of a job's sort key.
   *
   * @param runAt the maximum scheduled time for jobs to match
   *
   * @return an attribute value for a sort key that is greater than or equal to the sort key for all other jobs
   * scheduled to run at or before the given {@code runAt} time
   */
  static AttributeValue buildMaxRunAtAttribute(final Instant runAt) {
    return AttributeValue.fromB(SdkBytes.fromByteBuffer(ByteBuffer.allocate(24)
        .putLong(runAt.toEpochMilli())
        .putLong(MAX_SORT_KEY_COMPONENT)
        .putLong(MAX_SORT_KEY_COMPONENT)
        .flip()));
  }
}

View File

@ -0,0 +1,9 @@
package org.whispersystems.textsecuregcm.workers;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.scheduler.JobScheduler;
/**
 * A factory for {@link JobScheduler} instances. {@code ProcessScheduledJobsServiceCommand} uses a factory to obtain
 * the scheduler whose jobs it will process once command dependencies and configuration are available.
 */
public interface JobSchedulerFactory {
/**
 * Builds a job scheduler.
 *
 * @param commandDependencies the shared dependencies built for the running command
 * @param configuration the server configuration
 *
 * @return the job scheduler to run
 */
JobScheduler buildJobScheduler(CommandDependencies commandDependencies, WhisperServerConfiguration configuration);
}

View File

@ -0,0 +1,133 @@
package org.whispersystems.textsecuregcm.workers;
import io.dropwizard.core.Application;
import io.dropwizard.core.cli.ServerCommand;
import io.dropwizard.core.server.DefaultServerFactory;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.jetty.HttpsConnectorFactory;
import io.dropwizard.lifecycle.Managed;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.scheduler.JobScheduler;
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
 * A long-running server command that repeatedly asks a {@link JobScheduler} to process its available jobs, waiting a
 * fixed (configurable) number of seconds between the end of one run and the start of the next.
 */
public class ProcessScheduledJobsServiceCommand extends ServerCommand<WhisperServerConfiguration> {

  private static final Logger log = LoggerFactory.getLogger(ProcessScheduledJobsServiceCommand.class);

  private static final String FIXED_DELAY_SECONDS_ARGUMENT = "fixedDelay";
  private static final int DEFAULT_FIXED_DELAY_SECONDS = 60;

  private final String name;
  private final JobSchedulerFactory jobSchedulerFactory;

  /**
   * A managed component that starts the recurring job-processing task when the environment starts and cancels it when
   * the environment stops.
   */
  private static class ScheduledJobProcessor implements Managed {

    private final JobScheduler jobScheduler;
    private final ScheduledExecutorService scheduledExecutorService;
    private final int fixedDelaySeconds;

    private ScheduledFuture<?> processJobsFuture;

    private ScheduledJobProcessor(final JobScheduler jobScheduler,
        final ScheduledExecutorService scheduledExecutorService,
        final int fixedDelaySeconds) {

      this.jobScheduler = jobScheduler;
      this.scheduledExecutorService = scheduledExecutorService;
      this.fixedDelaySeconds = fixedDelaySeconds;
    }

    @Override
    public void start() {
      // First run happens immediately; later runs begin `fixedDelaySeconds` after the previous run finishes
      processJobsFuture = scheduledExecutorService.scheduleWithFixedDelay(
          this::processJobsAndLogFailures, 0, fixedDelaySeconds, TimeUnit.SECONDS);
    }

    @Override
    public void stop() {
      final ScheduledFuture<?> pendingFuture = processJobsFuture;
      processJobsFuture = null;

      if (pendingFuture != null) {
        // Don't interrupt a run that's already in progress; just prevent future runs
        pendingFuture.cancel(false);
      }
    }

    /**
     * Blocks until one pass over the available jobs has finished. Exceptions are logged rather than rethrown because
     * an exception escaping a periodic task would suppress all of its subsequent executions.
     */
    private void processJobsAndLogFailures() {
      try {
        jobScheduler.processAvailableJobs().join();
      } catch (final Exception e) {
        log.warn("Failed to process available jobs for scheduler: {}", jobScheduler.getSchedulerName(), e);
      }
    }
  }

  /**
   * Constructs a command that processes jobs for the scheduler produced by the given factory.
   *
   * @param name the name of the command
   * @param description a description of the command
   * @param jobSchedulerFactory builds the job scheduler once the command is run
   */
  public ProcessScheduledJobsServiceCommand(final String name,
      final String description,
      final JobSchedulerFactory jobSchedulerFactory) {

    // The wrapped application intentionally does nothing; all of the work is set up in #run below
    super(new Application<>() {
      @Override
      public void run(WhisperServerConfiguration configuration, Environment environment) {
      }
    }, name, description);

    this.name = name;
    this.jobSchedulerFactory = jobSchedulerFactory;
  }

  @Override
  public void configure(final Subparser subparser) {
    super.configure(subparser);

    subparser.addArgument("--fixed-delay")
        .type(Integer.class)
        .dest(FIXED_DELAY_SECONDS_ARGUMENT)
        .setDefault(DEFAULT_FIXED_DELAY_SECONDS)
        .help("The delay, in seconds, between queries for jobs to process");
  }

  @Override
  protected void run(final Environment environment,
      final Namespace namespace,
      final WhisperServerConfiguration configuration)
      throws Exception {

    UncaughtExceptionHandler.register();

    final CommandDependencies dependencies = CommandDependencies.build(name, environment, configuration);
    final int delaySeconds = namespace.getInt(FIXED_DELAY_SECONDS_ARGUMENT);

    MetricsUtil.configureRegistries(configuration, environment, dependencies.dynamicConfigurationManager());

    applyKeyStorePasswords(configuration);

    final ScheduledExecutorService executor = environment.lifecycle()
        .scheduledExecutorService("scheduled-job-processor-%d", false)
        .build();

    final JobScheduler scheduler = jobSchedulerFactory.buildJobScheduler(dependencies, configuration);

    environment.lifecycle().manage(new ScheduledJobProcessor(scheduler, executor, delaySeconds));

    MetricsUtil.registerSystemResourceMetrics(environment);

    super.run(environment, namespace, configuration);
  }

  /**
   * Supplies the TLS keystore password to every HTTPS application connector. Even though this command isn't actually
   * serving traffic, {@code ServerCommand} subclasses need a valid server configuration, and that means they need to
   * be able to decrypt the TLS keystore.
   */
  private static void applyKeyStorePasswords(final WhisperServerConfiguration configuration) {
    if (!(configuration.getServerFactory() instanceof DefaultServerFactory defaultServerFactory)) {
      return;
    }

    for (final var connectorFactory : defaultServerFactory.getApplicationConnectors()) {
      if (connectorFactory instanceof HttpsConnectorFactory httpsConnectorFactory) {
        httpsConnectorFactory.setKeyStorePassword(configuration.getTlsKeyStoreConfiguration().password().value());
      }
    }
  }
}

View File

@ -0,0 +1,122 @@
package org.whispersystems.textsecuregcm.scheduler;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtension;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema;
import org.whispersystems.textsecuregcm.util.TestClock;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import javax.annotation.Nullable;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.*;
/**
 * Exercises {@link JobScheduler} against the {@code SCHEDULED_JOBS} table provided by {@link DynamoDbExtension}.
 */
class JobSchedulerTest {

  private static final Instant CURRENT_TIME = Instant.now();

  @RegisterExtension
  static final DynamoDbExtension DYNAMO_DB_EXTENSION =
      new DynamoDbExtension(DynamoDbExtensionSchema.Tables.SCHEDULED_JOBS);

  /**
   * A minimal scheduler that counts how many jobs it has been asked to process and always reports success.
   */
  private static class TestJobScheduler extends JobScheduler {

    private final AtomicInteger jobsProcessed = new AtomicInteger(0);

    protected TestJobScheduler(final DynamoDbAsyncClient dynamoDbAsyncClient,
        final String tableName,
        final Clock clock) {

      super(dynamoDbAsyncClient, tableName, Duration.ofDays(7), clock);
    }

    @Override
    public String getSchedulerName() {
      return "test";
    }

    @Override
    protected CompletableFuture<String> processJob(@Nullable final byte[] jobData) {
      jobsProcessed.incrementAndGet();
      return CompletableFuture.completedFuture("test");
    }
  }

  /** Builds a counting scheduler backed by the test table and driven by the given clock. */
  private static TestJobScheduler newScheduler(final Clock clock) {
    return new TestJobScheduler(DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(),
        DynamoDbExtensionSchema.Tables.SCHEDULED_JOBS.tableName(),
        clock);
  }

  /** Stores a data-less job due at {@code CURRENT_TIME} with a fixed salt, waiting for the write to finish. */
  private static void scheduleJobAtCurrentTime(final JobScheduler jobScheduler) {
    jobScheduler.scheduleJob(jobScheduler.buildRunAtAttribute(CURRENT_TIME, 0L), CURRENT_TIME, null).join();
  }

  @Test
  void scheduleJob() {
    final TestJobScheduler jobScheduler = newScheduler(Clock.fixed(CURRENT_TIME, ZoneId.systemDefault()));

    assertDoesNotThrow(() -> scheduleJobAtCurrentTime(jobScheduler));

    // Same time, same clock reading, and same salt yield the same sort key, so the second write must be rejected
    final CompletionException thrown = assertThrows(CompletionException.class,
        () -> scheduleJobAtCurrentTime(jobScheduler),
        "Scheduling multiple jobs with identical sort keys should fail");

    assertInstanceOf(ConditionalCheckFailedException.class, thrown.getCause());
  }

  @Test
  void processAvailableJobs() {
    final TestClock adjustableClock = TestClock.pinned(CURRENT_TIME);
    final TestJobScheduler jobScheduler = newScheduler(adjustableClock);

    scheduleJobAtCurrentTime(jobScheduler);

    // Clock time is before scheduled job time
    adjustableClock.pin(CURRENT_TIME.minusMillis(1));
    jobScheduler.processAvailableJobs().join();
    assertEquals(0, jobScheduler.jobsProcessed.get());

    // Clock time is after scheduled job time
    adjustableClock.pin(CURRENT_TIME.plusMillis(1));
    jobScheduler.processAvailableJobs().join();
    assertEquals(1, jobScheduler.jobsProcessed.get());

    // Nothing should be left to process on a repeat run
    jobScheduler.processAvailableJobs().join();
    assertEquals(1, jobScheduler.jobsProcessed.get(),
        "Jobs should be cleared after successful processing; job counter should not increment on second run");
  }

  @Test
  void processAvailableJobsWithError() {
    final AtomicInteger jobsEncountered = new AtomicInteger(0);

    // A scheduler whose every job fails asynchronously
    final TestJobScheduler failingScheduler = new TestJobScheduler(DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(),
        DynamoDbExtensionSchema.Tables.SCHEDULED_JOBS.tableName(),
        Clock.fixed(CURRENT_TIME, ZoneId.systemDefault())) {

      @Override
      protected CompletableFuture<String> processJob(@Nullable final byte[] jobData) {
        jobsEncountered.incrementAndGet();
        return CompletableFuture.failedFuture(new RuntimeException("OH NO"));
      }
    };

    scheduleJobAtCurrentTime(failingScheduler);

    failingScheduler.processAvailableJobs().join();
    assertEquals(1, jobsEncountered.get());

    failingScheduler.processAvailableJobs().join();
    assertEquals(2, jobsEncountered.get(),
        "Jobs should not be cleared after failed processing; encountered job counter should increment on second run");
  }
}

View File

@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.storage;
import java.util.Collections;
import java.util.List;
import org.whispersystems.textsecuregcm.backup.BackupsDb;
import org.whispersystems.textsecuregcm.scheduler.JobScheduler;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
@ -298,6 +299,21 @@ public final class DynamoDbExtensionSchema {
.build()),
List.of(), List.of()),
// Test table for JobScheduler. Its key attributes are the scheduler name (a string) and the "run at" value
// (binary), matching the attributes JobScheduler uses in its key condition when querying for due jobs.
// NOTE(review): the two trailing empty lists are presumably secondary-index definitions (none are needed for this
// table) — confirm against the Tables constructor.
SCHEDULED_JOBS("scheduled_jobs_test",
JobScheduler.KEY_SCHEDULER_NAME,
JobScheduler.ATTR_RUN_AT,
List.of(AttributeDefinition.builder()
.attributeName(JobScheduler.KEY_SCHEDULER_NAME)
.attributeType(ScalarAttributeType.S)
.build(),
AttributeDefinition.builder()
.attributeName(JobScheduler.ATTR_RUN_AT)
.attributeType(ScalarAttributeType.B)
.build()),
List.of(),
List.of()),
SUBSCRIPTIONS("subscriptions_test",
SubscriptionManager.KEY_USER,
null,

View File

@ -120,6 +120,9 @@ dynamoDbTables:
tableName: remote_config_test
reportMessage:
tableName: report_messages_test
scheduledJobs:
tableName: scheduled_jobs_test
expiration: P7D
subscriptions:
tableName: subscriptions_test
clientPublicKeys: