Update to reactor 3.5.3

This commit is contained in:
Chris Eager 2023-02-22 09:55:16 -06:00 committed by Chris Eager
parent 71c0fc8d4a
commit 43f83076fa
6 changed files with 13 additions and 8 deletions

View File

@ -150,7 +150,7 @@
<dependency> <dependency>
<groupId>io.projectreactor</groupId> <groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId> <artifactId>reactor-bom</artifactId>
<version>2020.0.24</version> <!-- 3.4.x, see https://github.com/reactor/reactor#bom-versioning-scheme --> <version>2022.0.3</version> <!-- 3.5.x, see https://github.com/reactor/reactor#bom-versioning-scheme -->
<type>pom</type> <type>pom</type>
<scope>import</scope> <scope>import</scope>
</dependency> </dependency>

View File

@ -381,6 +381,10 @@
<groupId>io.projectreactor</groupId> <groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId> <artifactId>reactor-core</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core-micrometer</artifactId>
</dependency>
<dependency> <dependency>
<groupId>io.vavr</groupId> <groupId>io.vavr</groupId>
<artifactId>vavr</artifactId> <artifactId>vavr</artifactId>

View File

@ -238,7 +238,6 @@ import org.whispersystems.textsecuregcm.workers.SetUserDiscoverabilityCommand;
import org.whispersystems.textsecuregcm.workers.ZkParamsCommand; import org.whispersystems.textsecuregcm.workers.ZkParamsCommand;
import org.whispersystems.websocket.WebSocketResourceProviderFactory; import org.whispersystems.websocket.WebSocketResourceProviderFactory;
import org.whispersystems.websocket.setup.WebSocketEnvironment; import org.whispersystems.websocket.setup.WebSocketEnvironment;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.regions.Region;
@ -389,8 +388,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final VerificationSessions verificationSessions = new VerificationSessions(dynamoDbAsyncClient, final VerificationSessions verificationSessions = new VerificationSessions(dynamoDbAsyncClient,
config.getDynamoDbTables().getVerificationSessions().getTableName(), clock); config.getDynamoDbTables().getVerificationSessions().getTableName(), clock);
reactor.util.Metrics.MicrometerConfiguration.useRegistry(Metrics.globalRegistry);
Schedulers.enableMetrics();
RedisClientFactory pubSubClientFactory = new RedisClientFactory("pubsub_cache", RedisClientFactory pubSubClientFactory = new RedisClientFactory("pubsub_cache",
config.getPubsubCacheConfiguration().getUrl(), config.getPubsubCacheConfiguration().getReplicaUrls(), config.getPubsubCacheConfiguration().getUrl(), config.getPubsubCacheConfiguration().getReplicaUrls(),

View File

@ -50,6 +50,7 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil; import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
@ -227,7 +228,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
discardStaleEphemeralMessages(destinationUuid, destinationDevice, staleEphemeralMessages); discardStaleEphemeralMessages(destinationUuid, destinationDevice, staleEphemeralMessages);
return messagesToPublish.name(GET_FLUX_NAME) return messagesToPublish.name(GET_FLUX_NAME)
.metrics(); .tap(Micrometer.metrics(Metrics.globalRegistry));
} }
private static boolean isStaleEphemeralMessage(final MessageProtos.Envelope message, private static boolean isStaleEphemeralMessage(final MessageProtos.Envelope message,

View File

@ -9,6 +9,7 @@ import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.SharedMetricRegistries;
import io.micrometer.core.instrument.Metrics;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@ -26,6 +27,7 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.Pair;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -77,7 +79,7 @@ public class MessagesManager {
return Flux.from( return Flux.from(
getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly)) getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly))
.take(RESULT_SET_CHUNK_SIZE, true) .take(RESULT_SET_CHUNK_SIZE)
.collectList() .collectList()
.map(envelopes -> new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE)); .map(envelopes -> new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE));
} }
@ -97,7 +99,7 @@ public class MessagesManager {
return Flux.concat(dynamoPublisher, cachePublisher) return Flux.concat(dynamoPublisher, cachePublisher)
.name(GET_MESSAGES_FOR_DEVICE_FLUX_NAME) .name(GET_MESSAGES_FOR_DEVICE_FLUX_NAME)
.metrics(); .tap(Micrometer.metrics(Metrics.globalRegistry));
} }
public void clear(UUID destinationUuid) { public void clear(UUID destinationUuid) {

View File

@ -52,6 +52,7 @@ import org.whispersystems.textsecuregcm.util.HeaderUtils;
import org.whispersystems.websocket.WebSocketClient; import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.messages.WebSocketResponseMessage; import org.whispersystems.websocket.messages.WebSocketResponseMessage;
import reactor.core.Disposable; import reactor.core.Disposable;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
@ -359,7 +360,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
final Disposable subscription = Flux.from(messages) final Disposable subscription = Flux.from(messages)
.name(SEND_MESSAGES_FLUX_NAME) .name(SEND_MESSAGES_FLUX_NAME)
.metrics() .tap(Micrometer.metrics(Metrics.globalRegistry))
.limitRate(MESSAGE_PUBLISHER_LIMIT_RATE) .limitRate(MESSAGE_PUBLISHER_LIMIT_RATE)
.flatMapSequential(envelope -> .flatMapSequential(envelope ->
Mono.fromFuture(sendMessage(envelope) Mono.fromFuture(sendMessage(envelope)