diff --git a/service/config/sample.yml b/service/config/sample.yml index f158b8750..ab500ac2d 100644 --- a/service/config/sample.yml +++ b/service/config/sample.yml @@ -126,6 +126,9 @@ dynamoDbTables: tableName: Example_RemoteConfig reportMessage: tableName: Example_ReportMessage + scheduledJobs: + tableName: Example_ScheduledJobs + expiration: P7D subscriptions: tableName: Example_Subscriptions clientPublicKeys: diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DynamoDbTables.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DynamoDbTables.java index 28b0f005a..bb0198ce2 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DynamoDbTables.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/DynamoDbTables.java @@ -67,6 +67,7 @@ public class DynamoDbTables { private final TableWithExpiration registrationRecovery; private final Table remoteConfig; private final Table reportMessage; + private final TableWithExpiration scheduledJobs; private final Table subscriptions; private final Table verificationSessions; @@ -91,6 +92,7 @@ public class DynamoDbTables { @JsonProperty("registrationRecovery") final TableWithExpiration registrationRecovery, @JsonProperty("remoteConfig") final Table remoteConfig, @JsonProperty("reportMessage") final Table reportMessage, + @JsonProperty("scheduledJobs") final TableWithExpiration scheduledJobs, @JsonProperty("subscriptions") final Table subscriptions, @JsonProperty("verificationSessions") final Table verificationSessions) { @@ -114,6 +116,7 @@ public class DynamoDbTables { this.registrationRecovery = registrationRecovery; this.remoteConfig = remoteConfig; this.reportMessage = reportMessage; + this.scheduledJobs = scheduledJobs; this.subscriptions = subscriptions; this.verificationSessions = verificationSessions; } @@ -238,6 +241,12 @@ public class DynamoDbTables { return reportMessage; } + @NotNull + @Valid + public TableWithExpiration getScheduledJobs() { + return scheduledJobs; + } + @NotNull @Valid public Table getSubscriptions() { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/scheduler/JobScheduler.java b/service/src/main/java/org/whispersystems/textsecuregcm/scheduler/JobScheduler.java new file mode 100644 index 000000000..664d68a0c --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/scheduler/JobScheduler.java @@ -0,0 +1,222 @@ +package org.whispersystems.textsecuregcm.scheduler; + +import com.google.common.annotations.VisibleForTesting; +import io.micrometer.core.instrument.Metrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; +import org.whispersystems.textsecuregcm.util.Util; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.QueryRequest; +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; + +/** + * A job scheduler maintains a delay queue of tasks to be run at some time in the future. Callers schedule jobs with + * the {@link #scheduleJob(Instant, byte[])} method, and concrete subclasses actually execute jobs by implementing the + * {@link #processJob(byte[])} method. Some entity must call {@link #processAvailableJobs()} to actually find and + * process jobs that are ready for execution. + */ +public abstract class JobScheduler { + + private final DynamoDbAsyncClient dynamoDbAsyncClient; + private final String tableName; + private final Duration jobExpiration; + private final Clock clock; + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + // The name of this scheduler (DynamoDB string) + @VisibleForTesting + public static final String KEY_SCHEDULER_NAME = "S"; + + // The timestamp (and additional random data; please see #buildRunAtAttribute for details) for the job + // (DynamoDB byte array) + @VisibleForTesting + public static final String ATTR_RUN_AT = "T"; + + // Additional application-specific data for the job (DynamoDB byte array) + private static final String ATTR_JOB_DATA = "D"; + + // The time at which this job should be garbage-collected if not already deleted (DynamoDB number; + // seconds from the epoch) + private static final String ATTR_TTL = "E"; + + private static final String SCHEDULE_JOB_COUNTER_NAME = MetricsUtil.name(JobScheduler.class, "scheduleJob"); + private static final String PROCESS_JOB_COUNTER_NAME = MetricsUtil.name(JobScheduler.class, "processJob"); + + private static final String SCHEDULER_NAME_TAG = "schedulerName"; + private static final String OUTCOME_TAG = "outcome"; + + private static final int MAX_CONCURRENCY = 16; + + protected JobScheduler(final DynamoDbAsyncClient dynamoDbAsyncClient, + final String tableName, + final Duration jobExpiration, + final Clock clock) { + + this.dynamoDbAsyncClient = dynamoDbAsyncClient; + this.tableName = tableName; + this.jobExpiration = jobExpiration; + this.clock = clock; + } + + /** + * Returns the unique name of this scheduler. Scheduler names are used to "namespace" jobs. + * + * @return the unique name of this scheduler + */ + public abstract String getSchedulerName(); + + /** + * Processes a previously-scheduled job. + * + * @param jobData opaque, application-specific data provided at the time the job was scheduled + * + * @return A future that yields a brief, human-readable status code when the job has been fully processed. On + * successful completion, the job will be deleted. The job will not be deleted if the future completes exceptionally. + */ + protected abstract CompletableFuture processJob(@Nullable byte[] jobData); + + /** + * Schedules a job to run at or after the given {@code runAt} time. Concrete implementations must override this method + * to expose it publicly, or provide some more application-appropriate method for public callers. + * + * @param runAt the time at or after which to run the job + * @param jobData application-specific data describing the job; may be {@code null} + * + * @return a future that completes when the job has been scheduled + */ + protected CompletableFuture scheduleJob(final Instant runAt, @Nullable final byte[] jobData) { + return Mono.fromFuture(() -> scheduleJob(buildRunAtAttribute(runAt), runAt.plus(jobExpiration), jobData)) + .retryWhen(Retry.backoff(8, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(4))) + .toFuture() + .thenRun(() -> Metrics.counter(SCHEDULE_JOB_COUNTER_NAME, SCHEDULER_NAME_TAG, getSchedulerName()).increment()); + } + + @VisibleForTesting + CompletableFuture scheduleJob(final AttributeValue runAt, final Instant expiration, @Nullable final byte[] jobData) { + final Map item = new HashMap<>(Map.of( + KEY_SCHEDULER_NAME, AttributeValue.fromS(getSchedulerName()), + ATTR_RUN_AT, runAt, + ATTR_TTL, AttributeValue.fromN(String.valueOf(expiration.getEpochSecond())))); + + if (jobData != null) { + item.put(ATTR_JOB_DATA, AttributeValue.fromB(SdkBytes.fromByteArray(jobData))); + } + + return dynamoDbAsyncClient.putItem(PutItemRequest.builder() + .tableName(tableName) + .item(item) + .conditionExpression("attribute_not_exists(#schedulerName)") + .expressionAttributeNames(Map.of("#schedulerName", KEY_SCHEDULER_NAME)) + .build()) + .thenRun(Util.NOOP); + } + + /** + * Finds and processes all jobs whose {@code runAt} time is less than or equal to the current time. Scheduled jobs + * will be deleted once they have been processed successfully. + * + * @return a future that completes when all available jobs have been processed + * + * @see #processJob(byte[]) + */ + public CompletableFuture processAvailableJobs() { + return Flux.from(dynamoDbAsyncClient.queryPaginator(QueryRequest.builder() + .tableName(tableName) + .keyConditionExpression("#schedulerName = :schedulerName AND #runAt <= :maxRunAt") + .expressionAttributeNames(Map.of( + "#schedulerName", KEY_SCHEDULER_NAME, + "#runAt", ATTR_RUN_AT)) + .expressionAttributeValues(Map.of( + ":schedulerName", AttributeValue.fromS(getSchedulerName()), + ":maxRunAt", buildMaxRunAtAttribute(clock.instant()))) + .build()) + .items()) + .flatMap(item -> { + final byte[] jobData = item.containsKey(ATTR_JOB_DATA) + ? item.get(ATTR_JOB_DATA).b().asByteArray() + : null; + + return Mono.fromFuture(processJob(jobData)) + .doOnNext(outcome -> Metrics.counter(PROCESS_JOB_COUNTER_NAME, + SCHEDULER_NAME_TAG, getSchedulerName(), + OUTCOME_TAG, outcome) + .increment()) + .then(Mono.fromFuture(() -> deleteJob(item.get(KEY_SCHEDULER_NAME), item.get(ATTR_RUN_AT)))) + .onErrorResume(throwable -> { + logger.warn("Failed to process job", throwable); + return Mono.empty(); + }); + }, MAX_CONCURRENCY) + .then() + .toFuture(); + } + + private CompletableFuture deleteJob(final AttributeValue schedulerName, final AttributeValue runAt) { + return dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder() + .tableName(tableName) + .key(Map.of( + KEY_SCHEDULER_NAME, schedulerName, + ATTR_RUN_AT, runAt)) + .build()) + .thenRun(Util.NOOP); + } + + /** + * Constructs an attribute value that contains a sort key that will be greater than any sort key generated for an + * earlier {@code runAt} time and less than a sort key generated for a later {@code runAt} time. The returned value + * begins with the 8-byte, big-endian representation of the given {@code runAt} time in milliseconds since the epoch + * and ends with a random 8-byte suffix. The random suffix ensures that multiple jobs scheduled for the same + * {@code runAt} time will have distinct primary keys; the order in which jobs scheduled at the same time will be + * executed is also random as a result. + * + * @param runAt the time for which to generate a sort key + * + * @return a probably-unique sort key for the given {@code runAt} time + */ + AttributeValue buildRunAtAttribute(final Instant runAt) { + return buildRunAtAttribute(runAt, ThreadLocalRandom.current().nextLong()); + } + + @VisibleForTesting + AttributeValue buildRunAtAttribute(final Instant runAt, final long salt) { + return AttributeValue.fromB(SdkBytes.fromByteBuffer(ByteBuffer.allocate(24) + .putLong(runAt.toEpochMilli()) + .putLong(clock.millis()) + .putLong(salt) + .flip())); + } + + /** + * Constructs a sort key value that is greater than or equal to all other sort keys for jobs with the same or earlier + * {@code runAt} time. + * + * @param runAt the maximum scheduled time for jobs to match + * + * @return an attribute value for a sort key that is greater than or equal to the sort key for all other jobs + * scheduled to run at or before the given {@code runAt} time + */ + static AttributeValue buildMaxRunAtAttribute(final Instant runAt) { + return AttributeValue.fromB(SdkBytes.fromByteBuffer(ByteBuffer.allocate(24) + .putLong(runAt.toEpochMilli()) + .putLong(0xfffffffffffffffL) + .putLong(0xfffffffffffffffL) + .flip())); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/JobSchedulerFactory.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/JobSchedulerFactory.java new file mode 100644 index 000000000..e807a8640 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/JobSchedulerFactory.java @@ -0,0 +1,9 @@ +package org.whispersystems.textsecuregcm.workers; + +import org.whispersystems.textsecuregcm.WhisperServerConfiguration; +import org.whispersystems.textsecuregcm.scheduler.JobScheduler; + +public interface JobSchedulerFactory { + + JobScheduler buildJobScheduler(CommandDependencies commandDependencies, WhisperServerConfiguration configuration); +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/ProcessScheduledJobsServiceCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ProcessScheduledJobsServiceCommand.java new file mode 100644 index 000000000..8ebf93f73 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/ProcessScheduledJobsServiceCommand.java @@ -0,0 +1,133 @@ +package org.whispersystems.textsecuregcm.workers; + +import io.dropwizard.core.Application; +import io.dropwizard.core.cli.ServerCommand; +import io.dropwizard.core.server.DefaultServerFactory; +import io.dropwizard.core.setup.Environment; +import io.dropwizard.jetty.HttpsConnectorFactory; +import io.dropwizard.lifecycle.Managed; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.WhisperServerConfiguration; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; +import org.whispersystems.textsecuregcm.scheduler.JobScheduler; +import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class ProcessScheduledJobsServiceCommand extends ServerCommand { + + private final String name; + private final JobSchedulerFactory jobSchedulerFactory; + + private static final String FIXED_DELAY_SECONDS_ARGUMENT = "fixedDelay"; + private static final int DEFAULT_FIXED_DELAY_SECONDS = 60; + + private static final Logger log = LoggerFactory.getLogger(ProcessScheduledJobsServiceCommand.class); + + private static class ScheduledJobProcessor implements Managed { + + private final JobScheduler jobScheduler; + + private final ScheduledExecutorService scheduledExecutorService; + private final int fixedDelaySeconds; + + private ScheduledFuture processJobsFuture; + + private ScheduledJobProcessor(final JobScheduler jobScheduler, + final ScheduledExecutorService scheduledExecutorService, + final int fixedDelaySeconds) { + + this.jobScheduler = jobScheduler; + this.scheduledExecutorService = scheduledExecutorService; + this.fixedDelaySeconds = fixedDelaySeconds; + } + + @Override + public void start() { + processJobsFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> { + try { + jobScheduler.processAvailableJobs().join(); + } catch (final Exception e) { + log.warn("Failed to process available jobs for scheduler: {}", jobScheduler.getSchedulerName(), e); + } + }, 0, fixedDelaySeconds, TimeUnit.SECONDS); + } + + @Override + public void stop() { + if (processJobsFuture != null) { + processJobsFuture.cancel(false); + } + + processJobsFuture = null; + } + } + + public ProcessScheduledJobsServiceCommand(final String name, + final String description, + final JobSchedulerFactory jobSchedulerFactory) { + + super(new Application<>() { + @Override + public void run(WhisperServerConfiguration configuration, Environment environment) { + } + }, name, + description); + + this.name = name; + this.jobSchedulerFactory = jobSchedulerFactory; + } + + @Override + public void configure(final Subparser subparser) { + super.configure(subparser); + + subparser.addArgument("--fixed-delay") + .type(Integer.class) + .dest(FIXED_DELAY_SECONDS_ARGUMENT) + .setDefault(DEFAULT_FIXED_DELAY_SECONDS) + .help("The delay, in seconds, between queries for jobs to process"); + } + + @Override + protected void run(final Environment environment, + final Namespace namespace, + final WhisperServerConfiguration configuration) + throws Exception { + + UncaughtExceptionHandler.register(); + + final CommandDependencies commandDependencies = CommandDependencies.build(name, environment, configuration); + + final int fixedDelaySeconds = namespace.getInt(FIXED_DELAY_SECONDS_ARGUMENT); + + MetricsUtil.configureRegistries(configuration, environment, commandDependencies.dynamicConfigurationManager()); + + // Even though we're not actually serving traffic, `ServerCommand` subclasses need a valid server configuration, and + // that means they need to be able to decrypt the TLS keystore. + if (configuration.getServerFactory() instanceof DefaultServerFactory defaultServerFactory) { + defaultServerFactory.getApplicationConnectors() + .forEach(connectorFactory -> { + if (connectorFactory instanceof HttpsConnectorFactory h) { + h.setKeyStorePassword(configuration.getTlsKeyStoreConfiguration().password().value()); + } + }); + } + + final ScheduledExecutorService scheduledExecutorService = + environment.lifecycle().scheduledExecutorService("scheduled-job-processor-%d", false) + .build(); + + final JobScheduler jobScheduler = jobSchedulerFactory.buildJobScheduler(commandDependencies, configuration); + + environment.lifecycle().manage(new ScheduledJobProcessor(jobScheduler, scheduledExecutorService, fixedDelaySeconds)); + + MetricsUtil.registerSystemResourceMetrics(environment); + + super.run(environment, namespace, configuration); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/scheduler/JobSchedulerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/scheduler/JobSchedulerTest.java new file mode 100644 index 000000000..fa28c78fe --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/scheduler/JobSchedulerTest.java @@ -0,0 +1,122 @@ +package org.whispersystems.textsecuregcm.scheduler; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.whispersystems.textsecuregcm.storage.DynamoDbExtension; +import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema; +import org.whispersystems.textsecuregcm.util.TestClock; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; + +import javax.annotation.Nullable; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +class JobSchedulerTest { + + private static final Instant CURRENT_TIME = Instant.now(); + + @RegisterExtension + static final DynamoDbExtension DYNAMO_DB_EXTENSION = + new DynamoDbExtension(DynamoDbExtensionSchema.Tables.SCHEDULED_JOBS); + + private static class TestJobScheduler extends JobScheduler { + + private final AtomicInteger jobsProcessed = new AtomicInteger(0); + + protected TestJobScheduler(final DynamoDbAsyncClient dynamoDbAsyncClient, + final String tableName, + final Clock clock) { + + super(dynamoDbAsyncClient, tableName, Duration.ofDays(7), clock); + } + + @Override + public String getSchedulerName() { + return "test"; + } + + @Override + protected CompletableFuture processJob(@Nullable final byte[] jobData) { + jobsProcessed.incrementAndGet(); + + return CompletableFuture.completedFuture("test"); + } + } + + @Test + void scheduleJob() { + final TestJobScheduler scheduler = new TestJobScheduler(DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), + DynamoDbExtensionSchema.Tables.SCHEDULED_JOBS.tableName(), + Clock.fixed(CURRENT_TIME, ZoneId.systemDefault())); + + assertDoesNotThrow(() -> + scheduler.scheduleJob(scheduler.buildRunAtAttribute(CURRENT_TIME, 0L), CURRENT_TIME, null).join()); + + final CompletionException completionException = assertThrows(CompletionException.class, () -> + scheduler.scheduleJob(scheduler.buildRunAtAttribute(CURRENT_TIME, 0L), CURRENT_TIME, null).join(), + "Scheduling multiple jobs with identical sort keys should fail"); + + assertInstanceOf(ConditionalCheckFailedException.class, completionException.getCause()); + } + + @Test + void processAvailableJobs() { + final TestClock testClock = TestClock.pinned(CURRENT_TIME); + + final TestJobScheduler scheduler = new TestJobScheduler(DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), + DynamoDbExtensionSchema.Tables.SCHEDULED_JOBS.tableName(), + testClock); + + scheduler.scheduleJob(scheduler.buildRunAtAttribute(CURRENT_TIME, 0L), CURRENT_TIME, null).join(); + + // Clock time is before scheduled job time + testClock.pin(CURRENT_TIME.minusMillis(1)); + + scheduler.processAvailableJobs().join(); + assertEquals(0, scheduler.jobsProcessed.get()); + + // Clock time is after scheduled job time + testClock.pin(CURRENT_TIME.plusMillis(1)); + + scheduler.processAvailableJobs().join(); + assertEquals(1, scheduler.jobsProcessed.get()); + + scheduler.processAvailableJobs().join(); + assertEquals(1, scheduler.jobsProcessed.get(), + "Jobs should be cleared after successful processing; job counter should not increment on second run"); + } + + @Test + void processAvailableJobsWithError() { + final AtomicInteger jobsEncountered = new AtomicInteger(0); + + final TestJobScheduler scheduler = new TestJobScheduler(DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), + DynamoDbExtensionSchema.Tables.SCHEDULED_JOBS.tableName(), + Clock.fixed(CURRENT_TIME, ZoneId.systemDefault())) { + + @Override + protected CompletableFuture processJob(@Nullable final byte[] jobData) { + jobsEncountered.incrementAndGet(); + + return CompletableFuture.failedFuture(new RuntimeException("OH NO")); + } + }; + + scheduler.scheduleJob(scheduler.buildRunAtAttribute(CURRENT_TIME, 0L), CURRENT_TIME, null).join(); + + scheduler.processAvailableJobs().join(); + assertEquals(1, jobsEncountered.get()); + + scheduler.processAvailableJobs().join(); + assertEquals(2, jobsEncountered.get(), + "Jobs should not be cleared after failed processing; encountered job counter should increment on second run"); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java index 9127c1457..a19740a6e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java @@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.storage; import java.util.Collections; import java.util.List; import org.whispersystems.textsecuregcm.backup.BackupsDb; +import org.whispersystems.textsecuregcm.scheduler.JobScheduler; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex; import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; @@ -298,6 +299,21 @@ public final class DynamoDbExtensionSchema { .build()), List.of(), List.of()), + SCHEDULED_JOBS("scheduled_jobs_test", + JobScheduler.KEY_SCHEDULER_NAME, + JobScheduler.ATTR_RUN_AT, + List.of(AttributeDefinition.builder() + .attributeName(JobScheduler.KEY_SCHEDULER_NAME) + .attributeType(ScalarAttributeType.S) + .build(), + + AttributeDefinition.builder() + .attributeName(JobScheduler.ATTR_RUN_AT) + .attributeType(ScalarAttributeType.B) + .build()), + List.of(), + List.of()), + SUBSCRIPTIONS("subscriptions_test", SubscriptionManager.KEY_USER, null, diff --git a/service/src/test/resources/config/test.yml b/service/src/test/resources/config/test.yml index 9bb647b21..4487311a0 100644 --- a/service/src/test/resources/config/test.yml +++ b/service/src/test/resources/config/test.yml @@ -120,6 +120,9 @@ dynamoDbTables: tableName: remote_config_test reportMessage: tableName: report_messages_test + scheduledJobs: + tableName: scheduled_jobs_test + expiration: P7D subscriptions: tableName: subscriptions_test clientPublicKeys: