Migrate DynamicConfigurationManager to use java.util.concurrent

This commit is contained in:
Chris Eager 2024-03-07 09:01:34 -06:00 committed by Chris Eager
parent 9e510a678c
commit 3dadaf9334
5 changed files with 47 additions and 38 deletions

View File

@ -315,11 +315,14 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
UncaughtExceptionHandler.register(); UncaughtExceptionHandler.register();
ScheduledExecutorService dynamicConfigurationExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "dynamicConfiguration-%d")).threads(1).build();
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager =
new DynamicConfigurationManager<>(config.getAppConfig().getApplication(), new DynamicConfigurationManager<>(config.getAppConfig().getApplication(),
config.getAppConfig().getEnvironment(), config.getAppConfig().getEnvironment(),
config.getAppConfig().getConfigurationName(), config.getAppConfig().getConfigurationName(),
DynamicConfiguration.class); DynamicConfiguration.class, dynamicConfigurationExecutor);
dynamicConfigurationManager.start(); dynamicConfigurationManager.start();
MetricsUtil.configureRegistries(config, environment, dynamicConfigurationManager); MetricsUtil.configureRegistries(config, environment, dynamicConfigurationManager);

View File

@ -13,6 +13,9 @@ import io.micrometer.core.instrument.Metrics;
import java.time.Duration; import java.time.Duration;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.validation.ConstraintViolation; import javax.validation.ConstraintViolation;
import javax.validation.Validation; import javax.validation.Validation;
@ -38,8 +41,9 @@ public class DynamicConfigurationManager<T> {
// Set on initial config fetch // Set on initial config fetch
private final AtomicReference<T> configuration = new AtomicReference<>(); private final AtomicReference<T> configuration = new AtomicReference<>();
private final CountDownLatch initialized = new CountDownLatch(1);
private final ScheduledExecutorService scheduledExecutorService;
private String configurationToken = null; private String configurationToken = null;
private boolean initialized = false;
private static final Validator VALIDATOR = Validation.buildDefaultValidatorFactory().getValidator(); private static final Validator VALIDATOR = Validation.buildDefaultValidatorFactory().getValidator();
@ -50,61 +54,48 @@ public class DynamicConfigurationManager<T> {
private static final Logger logger = LoggerFactory.getLogger(DynamicConfigurationManager.class); private static final Logger logger = LoggerFactory.getLogger(DynamicConfigurationManager.class);
public DynamicConfigurationManager(String application, String environment, String configurationName, public DynamicConfigurationManager(String application, String environment, String configurationName,
Class<T> configurationClass) { Class<T> configurationClass, ScheduledExecutorService scheduledExecutorService) {
this(AppConfigDataClient this(AppConfigDataClient
.builder() .builder()
.overrideConfiguration(ClientOverrideConfiguration.builder() .overrideConfiguration(ClientOverrideConfiguration.builder()
.apiCallTimeout(Duration.ofSeconds(10)) .apiCallTimeout(Duration.ofSeconds(10))
.apiCallAttemptTimeout(Duration.ofSeconds(10)).build()) .apiCallAttemptTimeout(Duration.ofSeconds(10)).build())
.build(), .build(),
application, environment, configurationName, configurationClass); application, environment, configurationName, configurationClass, scheduledExecutorService);
} }
@VisibleForTesting @VisibleForTesting
DynamicConfigurationManager(AppConfigDataClient appConfigClient, String application, String environment, DynamicConfigurationManager(AppConfigDataClient appConfigClient, String application, String environment,
String configurationName, Class<T> configurationClass) { String configurationName, Class<T> configurationClass, ScheduledExecutorService scheduledExecutorService) {
this.appConfigClient = appConfigClient; this.appConfigClient = appConfigClient;
this.application = application; this.application = application;
this.environment = environment; this.environment = environment;
this.configurationName = configurationName; this.configurationName = configurationName;
this.configurationClass = configurationClass; this.configurationClass = configurationClass;
this.scheduledExecutorService = scheduledExecutorService;
} }
public T getConfiguration() { public T getConfiguration() {
synchronized (this) { try {
while (!initialized) { initialized.await();
try { } catch (InterruptedException e) {
this.wait(); logger.warn("Interrupted while waiting for initial configuration", e);
} catch (final InterruptedException e) { throw new RuntimeException(e);
logger.warn("Interrupted while waiting for initial configuration", e);
throw new RuntimeException(e);
}
}
} }
return configuration.get(); return configuration.get();
} }
public void start() { public void start() {
configuration.set(retrieveInitialDynamicConfiguration()); configuration.set(retrieveInitialDynamicConfiguration());
synchronized (this) { initialized.countDown();
this.initialized = true;
this.notifyAll();
}
final Thread workerThread = new Thread(() -> { scheduledExecutorService.scheduleWithFixedDelay(() -> {
while (true) { try {
try { retrieveDynamicConfiguration().ifPresent(configuration::set);
retrieveDynamicConfiguration().ifPresent(configuration::set); } catch (Exception e) {
} catch (Exception e) { logger.warn("Error retrieving dynamic configuration", e);
logger.warn("Error retrieving dynamic configuration", e);
}
Util.sleep(5000);
} }
}, "DynamicConfigurationManagerWorker"); }, 0, 5, TimeUnit.SECONDS);
workerThread.setDaemon(true);
workerThread.start();
} }
private Optional<T> retrieveDynamicConfiguration() throws JsonProcessingException { private Optional<T> retrieveDynamicConfiguration() throws JsonProcessingException {

View File

@ -98,6 +98,14 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
throws Exception { throws Exception {
environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
ScheduledExecutorService dynamicConfigurationExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "dynamicConfiguration-%d")).threads(1).build();
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = new DynamicConfigurationManager<>(
configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(),
configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class, dynamicConfigurationExecutor);
dynamicConfigurationManager.start();
ClientResources redisClusterClientResources = ClientResources.builder().build(); ClientResources redisClusterClientResources = ClientResources.builder().build();
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster",
@ -128,11 +136,6 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
ExternalServiceCredentialsGenerator secureValueRecoveryCredentialsGenerator = SecureValueRecovery2Controller.credentialsGenerator( ExternalServiceCredentialsGenerator secureValueRecoveryCredentialsGenerator = SecureValueRecovery2Controller.credentialsGenerator(
configuration.getSvr2Configuration()); configuration.getSvr2Configuration());
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = new DynamicConfigurationManager<>(
configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(),
configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class);
dynamicConfigurationManager.start();
ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager( ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager(
dynamicConfigurationManager); dynamicConfigurationManager);

View File

@ -72,9 +72,12 @@ record CommandDependencies(
environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
ScheduledExecutorService dynamicConfigurationExecutor = environment.lifecycle()
.scheduledExecutorService(name(name, "dynamicConfiguration-%d")).threads(1).build();
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = new DynamicConfigurationManager<>( DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = new DynamicConfigurationManager<>(
configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(), configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(),
configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class); configuration.getAppConfig().getConfigurationName(), DynamicConfiguration.class, dynamicConfigurationExecutor);
dynamicConfigurationManager.start(); dynamicConfigurationManager.start();
MetricsUtil.configureRegistries(configuration, environment, dynamicConfigurationManager); MetricsUtil.configureRegistries(configuration, environment, dynamicConfigurationManager);

View File

@ -6,6 +6,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
@ -27,12 +30,13 @@ class DynamicConfigurationManagerTest {
private DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager; private DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private AppConfigDataClient appConfig; private AppConfigDataClient appConfig;
private StartConfigurationSessionRequest startConfigurationSession; private StartConfigurationSessionRequest startConfigurationSession;
private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
@BeforeEach @BeforeEach
void setup() { void setup() {
this.appConfig = mock(AppConfigDataClient.class); this.appConfig = mock(AppConfigDataClient.class);
this.dynamicConfigurationManager = new DynamicConfigurationManager<>( this.dynamicConfigurationManager = new DynamicConfigurationManager<>(
appConfig, "foo", "bar", "baz", DynamicConfiguration.class); appConfig, "foo", "bar", "baz", DynamicConfiguration.class, scheduledExecutorService);
this.startConfigurationSession = StartConfigurationSessionRequest.builder() this.startConfigurationSession = StartConfigurationSessionRequest.builder()
.applicationIdentifier("foo") .applicationIdentifier("foo")
.environmentIdentifier("bar") .environmentIdentifier("bar")
@ -40,6 +44,11 @@ class DynamicConfigurationManagerTest {
.build(); .build();
} }
@AfterEach
void teardown() {
scheduledExecutorService.shutdown();
}
@Test @Test
void testGetInitialConfig() { void testGetInitialConfig() {
when(appConfig.startConfigurationSession(startConfigurationSession)) when(appConfig.startConfigurationSession(startConfigurationSession))