diff --git a/pom.xml b/pom.xml index afc784a65..27033b418 100644 --- a/pom.xml +++ b/pom.xml @@ -150,7 +150,7 @@ io.projectreactor reactor-bom - 2020.0.24 + 2022.0.3 pom import diff --git a/service/pom.xml b/service/pom.xml index 59f30b8a9..c27c6efe1 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -381,6 +381,10 @@ io.projectreactor reactor-core + + io.projectreactor + reactor-core-micrometer + io.vavr vavr diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index ac6fb87ae..66d3093c2 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -238,7 +238,6 @@ import org.whispersystems.textsecuregcm.workers.SetUserDiscoverabilityCommand; import org.whispersystems.textsecuregcm.workers.ZkParamsCommand; import org.whispersystems.websocket.WebSocketResourceProviderFactory; import org.whispersystems.websocket.setup.WebSocketEnvironment; -import reactor.core.scheduler.Schedulers; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; @@ -389,8 +388,6 @@ public class WhisperServerService extends Application imp discardStaleEphemeralMessages(destinationUuid, destinationDevice, staleEphemeralMessages); return messagesToPublish.name(GET_FLUX_NAME) - .metrics(); + .tap(Micrometer.metrics(Metrics.globalRegistry)); } private static boolean isStaleEphemeralMessage(final MessageProtos.Envelope message, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index de50f8680..3810f26ca 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -9,6 +9,7 @@ import static com.codahale.metrics.MetricRegistry.name; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; +import io.micrometer.core.instrument.Metrics; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -26,6 +27,7 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Pair; +import reactor.core.observability.micrometer.Micrometer; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -77,7 +79,7 @@ public class MessagesManager { return Flux.from( getMessagesForDevice(destinationUuid, destinationDevice, RESULT_SET_CHUNK_SIZE, cachedMessagesOnly)) - .take(RESULT_SET_CHUNK_SIZE, true) + .take(RESULT_SET_CHUNK_SIZE) .collectList() .map(envelopes -> new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE)); } @@ -97,7 +99,7 @@ public class MessagesManager { return Flux.concat(dynamoPublisher, cachePublisher) .name(GET_MESSAGES_FOR_DEVICE_FLUX_NAME) - .metrics(); + .tap(Micrometer.metrics(Metrics.globalRegistry)); } public void clear(UUID destinationUuid) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 86557a96d..f6b5b9c86 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -52,6 +52,7 @@ import org.whispersystems.textsecuregcm.util.HeaderUtils; import org.whispersystems.websocket.WebSocketClient; import org.whispersystems.websocket.messages.WebSocketResponseMessage; import reactor.core.Disposable; +import reactor.core.observability.micrometer.Micrometer; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -359,7 +360,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac final Disposable subscription = Flux.from(messages) .name(SEND_MESSAGES_FLUX_NAME) - .metrics() + .tap(Micrometer.metrics(Metrics.globalRegistry)) .limitRate(MESSAGE_PUBLISHER_LIMIT_RATE) .flatMapSequential(envelope -> Mono.fromFuture(sendMessage(envelope)