diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index bb69d8db8..5f18d11c8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -315,11 +315,14 @@ public class WhisperServerService extends Application dynamicConfigurationManager = new DynamicConfigurationManager<>(config.getAppConfig().getApplication(), config.getAppConfig().getEnvironment(), config.getAppConfig().getConfigurationName(), - DynamicConfiguration.class); + DynamicConfiguration.class, dynamicConfigurationExecutor); dynamicConfigurationManager.start(); MetricsUtil.configureRegistries(config, environment, dynamicConfigurationManager); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DynamicConfigurationManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DynamicConfigurationManager.java index 4657d53c9..94f8a14dc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/DynamicConfigurationManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/DynamicConfigurationManager.java @@ -13,6 +13,9 @@ import io.micrometer.core.instrument.Metrics; import java.time.Duration; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.validation.ConstraintViolation; import javax.validation.Validation; @@ -38,8 +41,9 @@ public class DynamicConfigurationManager { // Set on initial config fetch private final AtomicReference configuration = new AtomicReference<>(); + private final CountDownLatch initialized = new CountDownLatch(1); + private final ScheduledExecutorService scheduledExecutorService; private String configurationToken = null; - private boolean initialized = false; private static final Validator VALIDATOR = Validation.buildDefaultValidatorFactory().getValidator(); @@ -50,61 +54,48 @@ public class DynamicConfigurationManager { private static final Logger logger = LoggerFactory.getLogger(DynamicConfigurationManager.class); public DynamicConfigurationManager(String application, String environment, String configurationName, - Class configurationClass) { + Class configurationClass, ScheduledExecutorService scheduledExecutorService) { this(AppConfigDataClient .builder() .overrideConfiguration(ClientOverrideConfiguration.builder() .apiCallTimeout(Duration.ofSeconds(10)) .apiCallAttemptTimeout(Duration.ofSeconds(10)).build()) .build(), - application, environment, configurationName, configurationClass); + application, environment, configurationName, configurationClass, scheduledExecutorService); } @VisibleForTesting DynamicConfigurationManager(AppConfigDataClient appConfigClient, String application, String environment, - String configurationName, Class configurationClass) { + String configurationName, Class configurationClass, ScheduledExecutorService scheduledExecutorService) { this.appConfigClient = appConfigClient; this.application = application; this.environment = environment; this.configurationName = configurationName; this.configurationClass = configurationClass; + this.scheduledExecutorService = scheduledExecutorService; } public T getConfiguration() { - synchronized (this) { - while (!initialized) { - try { - this.wait(); - } catch (final InterruptedException e) { - logger.warn("Interrupted while waiting for initial configuration", e); - throw new RuntimeException(e); - } - } + try { + initialized.await(); + } catch (InterruptedException e) { + logger.warn("Interrupted while waiting for initial configuration", e); + throw new RuntimeException(e); } return configuration.get(); } public void start() { configuration.set(retrieveInitialDynamicConfiguration()); - synchronized (this) { - this.initialized = true; - this.notifyAll(); - } + initialized.countDown(); - final Thread workerThread = new Thread(() -> { - while (true) { - try { - retrieveDynamicConfiguration().ifPresent(configuration::set); - } catch (Exception e) { - logger.warn("Error retrieving dynamic configuration", e); - } - - Util.sleep(5000); + scheduledExecutorService.scheduleWithFixedDelay(() -> { + try { + retrieveDynamicConfiguration().ifPresent(configuration::set); + } catch (Exception e) { + logger.warn("Error retrieving dynamic configuration", e); } - }, "DynamicConfigurationManagerWorker"); - - workerThread.setDaemon(true); - workerThread.start(); + }, 0, 5, TimeUnit.SECONDS); } private Optional retrieveDynamicConfiguration() throws JsonProcessingException { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java index 80befaaa0..38e45346e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AssignUsernameCommand.java @@ -98,6 +98,14 @@ public class AssignUsernameCommand extends EnvironmentCommand dynamicConfigurationManager = new DynamicConfigurationManager<>( + configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(), + configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class, dynamicConfigurationExecutor); + dynamicConfigurationManager.start(); + ClientResources redisClusterClientResources = ClientResources.builder().build(); FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", @@ -128,11 +136,6 @@ public class AssignUsernameCommand extends EnvironmentCommand dynamicConfigurationManager = new DynamicConfigurationManager<>( - configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(), - configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class); - dynamicConfigurationManager.start(); - ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager( dynamicConfigurationManager); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index 99711e050..ea7ca492c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -72,9 +72,12 @@ record CommandDependencies( environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + ScheduledExecutorService dynamicConfigurationExecutor = environment.lifecycle() + .scheduledExecutorService(name(name, "dynamicConfiguration-%d")).threads(1).build(); + DynamicConfigurationManager dynamicConfigurationManager = new DynamicConfigurationManager<>( configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(), - configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class); + configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class, dynamicConfigurationExecutor); dynamicConfigurationManager.start(); MetricsUtil.configureRegistries(configuration, environment, dynamicConfigurationManager); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamicConfigurationManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamicConfigurationManagerTest.java index d0b35743d..45fc8e335 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamicConfigurationManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamicConfigurationManagerTest.java @@ -6,6 +6,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.time.Duration; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; @@ -27,12 +30,13 @@ class DynamicConfigurationManagerTest { private DynamicConfigurationManager dynamicConfigurationManager; private AppConfigDataClient appConfig; private StartConfigurationSessionRequest startConfigurationSession; + private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1); @BeforeEach void setup() { this.appConfig = mock(AppConfigDataClient.class); this.dynamicConfigurationManager = new DynamicConfigurationManager<>( - appConfig, "foo", "bar", "baz", DynamicConfiguration.class); + appConfig, "foo", "bar", "baz", DynamicConfiguration.class, scheduledExecutorService); this.startConfigurationSession = StartConfigurationSessionRequest.builder() .applicationIdentifier("foo") .environmentIdentifier("bar") @@ -40,6 +44,11 @@ class DynamicConfigurationManagerTest { .build(); } + @AfterEach + void teardown() { + scheduledExecutorService.shutdown(); + } + @Test void testGetInitialConfig() { when(appConfig.startConfigurationSession(startConfigurationSession))