diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index a167bf47c..cfc9c05e4 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -8,11 +8,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.dropwizard.core.Configuration; import java.time.Duration; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import javax.validation.Valid; import javax.validation.constraints.NotNull; import org.whispersystems.textsecuregcm.attachments.TusConfiguration; @@ -170,11 +168,6 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private RedisClusterConfiguration clientPresenceCluster; - @Valid - @NotNull - @JsonProperty - private Set testDevices = new HashSet<>(); - @Valid @NotNull @JsonProperty @@ -461,10 +454,6 @@ public class WhisperServerConfiguration extends Configuration { return unidentifiedDelivery; } - public Set getTestDevices() { - return testDevices; - } - public Map getMaxDevices() { Map results = new HashMap<>(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index f130d0a8b..5da9abb2b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -671,8 +671,7 @@ public class WhisperServerService extends Application testDevices; - private final DynamicConfigurationManager dynamicConfigurationManager; - - public RegistrationCaptchaManager(final CaptchaChecker captchaChecker, final RateLimiters rateLimiters, - final Set testDevices, - final DynamicConfigurationManager dynamicConfigurationManager) { + public RegistrationCaptchaManager(final CaptchaChecker captchaChecker) { this.captchaChecker = captchaChecker; - this.rateLimiters = rateLimiters; - this.testDevices = testDevices; - this.dynamicConfigurationManager = dynamicConfigurationManager; } @SuppressWarnings("OptionalUsedAsFieldOrParameterType") diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/BufferPoolGauges.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/BufferPoolGauges.java deleted file mode 100644 index 1bb133348..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/BufferPoolGauges.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.metrics; - -import static com.codahale.metrics.MetricRegistry.name; - -import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Tag; -import java.lang.management.BufferPoolMXBean; -import java.lang.management.ManagementFactory; -import java.util.List; - -public class BufferPoolGauges { - - private BufferPoolGauges() {} - - public static void registerMetrics() { - for (final BufferPoolMXBean bufferPoolMXBean : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) { - final List tags = List.of(Tag.of("bufferPoolName", bufferPoolMXBean.getName())); - - Metrics.gauge(name(BufferPoolGauges.class, "count"), tags, bufferPoolMXBean, BufferPoolMXBean::getCount); - Metrics.gauge(name(BufferPoolGauges.class, "memory_used"), tags, bufferPoolMXBean, BufferPoolMXBean::getMemoryUsed); - Metrics.gauge(name(BufferPoolGauges.class, "total_capacity"), tags, bufferPoolMXBean, BufferPoolMXBean::getTotalCapacity); - } - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/CpuUsageGauge.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/CpuUsageGauge.java deleted file mode 100644 index df40e80ce..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/CpuUsageGauge.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.metrics; - -import com.codahale.metrics.CachedGauge; -import com.sun.management.OperatingSystemMXBean; - -import java.lang.management.ManagementFactory; -import java.util.concurrent.TimeUnit; - -public class CpuUsageGauge extends CachedGauge { - - private final OperatingSystemMXBean operatingSystemMXBean; - - public CpuUsageGauge(final long timeout, final TimeUnit timeoutUnit) { - super(timeout, timeoutUnit); - - this.operatingSystemMXBean = (com.sun.management.OperatingSystemMXBean) - ManagementFactory.getOperatingSystemMXBean(); - } - - @Override - protected Integer loadValue() { - return (int) Math.ceil(operatingSystemMXBean.getCpuLoad() * 100); - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/FileDescriptorGauge.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/FileDescriptorGauge.java deleted file mode 100644 index 472e2f560..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/FileDescriptorGauge.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.metrics; - - -import com.codahale.metrics.Gauge; - -import java.io.File; - -public class FileDescriptorGauge implements Gauge { - @Override - public Integer getValue() { - File file = new File("/proc/self/fd"); - - if (file.isDirectory() && file.exists()) { - return file.list().length; - } - - return 0; - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/FreeMemoryGauge.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/FreeMemoryGauge.java index a04330b36..74631ebf4 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/FreeMemoryGauge.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/FreeMemoryGauge.java @@ -5,21 +5,20 @@ package org.whispersystems.textsecuregcm.metrics; -import com.codahale.metrics.Gauge; import com.sun.management.OperatingSystemMXBean; import java.lang.management.ManagementFactory; -public class FreeMemoryGauge implements Gauge { +public class FreeMemoryGauge implements Gauge { private final OperatingSystemMXBean operatingSystemMXBean; public FreeMemoryGauge() { this.operatingSystemMXBean = (com.sun.management.OperatingSystemMXBean) - ManagementFactory.getOperatingSystemMXBean(); + ManagementFactory.getOperatingSystemMXBean(); } @Override - public Long getValue() { + public double getValue() { return operatingSystemMXBean.getFreeMemorySize(); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/GarbageCollectionGauges.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/GarbageCollectionGauges.java index 6c8107927..24be40d31 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/GarbageCollectionGauges.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/GarbageCollectionGauges.java @@ -5,7 +5,7 @@ package org.whispersystems.textsecuregcm.metrics; -import static com.codahale.metrics.MetricRegistry.name; +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; @@ -15,14 +15,17 @@ import java.util.List; public class GarbageCollectionGauges { - private GarbageCollectionGauges() {} + private GarbageCollectionGauges() { + } - public static void registerMetrics() { - for (final GarbageCollectorMXBean garbageCollectorMXBean : ManagementFactory.getGarbageCollectorMXBeans()) { - final List tags = List.of(Tag.of("memoryManagerName", garbageCollectorMXBean.getName())); + public static void registerMetrics() { + for (final GarbageCollectorMXBean garbageCollectorMXBean : ManagementFactory.getGarbageCollectorMXBeans()) { + final List tags = List.of(Tag.of("memoryManagerName", garbageCollectorMXBean.getName())); - Metrics.gauge(name(GarbageCollectionGauges.class, "collection_count"), tags, garbageCollectorMXBean, GarbageCollectorMXBean::getCollectionCount); - Metrics.gauge(name(GarbageCollectionGauges.class, "collection_time"), tags, garbageCollectorMXBean, GarbageCollectorMXBean::getCollectionTime); - } + Metrics.gauge(name(GarbageCollectionGauges.class, "collectionCount"), tags, garbageCollectorMXBean, + GarbageCollectorMXBean::getCollectionCount); + Metrics.gauge(name(GarbageCollectionGauges.class, "collectionTime"), tags, garbageCollectorMXBean, + GarbageCollectorMXBean::getCollectionTime); } + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/Gauge.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/Gauge.java new file mode 100644 index 000000000..8646e43e3 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/Gauge.java @@ -0,0 +1,11 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.metrics; + +interface Gauge { + + double getValue(); +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MaxFileDescriptorGauge.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MaxFileDescriptorGauge.java deleted file mode 100644 index 397500859..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MaxFileDescriptorGauge.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.metrics; - -import com.codahale.metrics.Gauge; -import com.sun.management.UnixOperatingSystemMXBean; - -import java.lang.management.ManagementFactory; - -/** - * A gauge that reports the maximum number of file descriptors allowed by the operating system. - */ -public class MaxFileDescriptorGauge implements Gauge { - - private final UnixOperatingSystemMXBean unixOperatingSystemMXBean; - - public MaxFileDescriptorGauge() { - this.unixOperatingSystemMXBean = (UnixOperatingSystemMXBean)ManagementFactory.getOperatingSystemMXBean(); - } - - @Override - public Long getValue() { - return unixOperatingSystemMXBean.getMaxFileDescriptorCount(); - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MetricsUtil.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MetricsUtil.java index 3ae7a3874..6846890d5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MetricsUtil.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/MetricsUtil.java @@ -12,10 +12,13 @@ import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics; +import io.micrometer.core.instrument.binder.system.FileDescriptorMetrics; +import io.micrometer.core.instrument.binder.system.ProcessorMetrics; import io.micrometer.core.instrument.config.MeterFilter; import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; import io.micrometer.statsd.StatsdMeterRegistry; -import java.util.concurrent.TimeUnit; import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.WhisperServerVersion; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; @@ -101,19 +104,20 @@ public class MetricsUtil { } public static void registerSystemResourceMetrics(final Environment environment) { - environment.metrics().register(name(CpuUsageGauge.class, "cpu"), new CpuUsageGauge(3, TimeUnit.SECONDS)); - environment.metrics().register(name(FreeMemoryGauge.class, "free_memory"), new FreeMemoryGauge()); - environment.metrics().register(name(NetworkSentGauge.class, "bytes_sent"), new NetworkSentGauge()); - environment.metrics().register(name(NetworkReceivedGauge.class, "bytes_received"), new NetworkReceivedGauge()); - environment.metrics().register(name(FileDescriptorGauge.class, "fd_count"), new FileDescriptorGauge()); - environment.metrics().register(name(MaxFileDescriptorGauge.class, "max_fd_count"), new MaxFileDescriptorGauge()); - environment.metrics() - .register(name(OperatingSystemMemoryGauge.class, "buffers"), new OperatingSystemMemoryGauge("Buffers")); - environment.metrics() - .register(name(OperatingSystemMemoryGauge.class, "cached"), new OperatingSystemMemoryGauge("Cached")); + new ProcessorMetrics().bindTo(Metrics.globalRegistry); + registerGauge(name(FreeMemoryGauge.class, "freeMemory"), new FreeMemoryGauge()); + new FileDescriptorMetrics().bindTo(Metrics.globalRegistry); + registerGauge(name(OperatingSystemMemoryGauge.class, "buffers"), new OperatingSystemMemoryGauge("Buffers")); + registerGauge(name(OperatingSystemMemoryGauge.class, "cached"), new OperatingSystemMemoryGauge("Cached")); + + new JvmMemoryMetrics().bindTo(Metrics.globalRegistry); + new JvmThreadMetrics().bindTo(Metrics.globalRegistry); - BufferPoolGauges.registerMetrics(); GarbageCollectionGauges.registerMetrics(); } + private static void registerGauge(final String name, final Gauge gauge) { + Metrics.gauge(name, gauge, Gauge::getValue); + } + } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/NetworkGauge.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/NetworkGauge.java deleted file mode 100644 index 9b23f2a8f..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/NetworkGauge.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.metrics; - - -import com.codahale.metrics.Gauge; -import org.whispersystems.textsecuregcm.util.Pair; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; - -public abstract class NetworkGauge implements Gauge { - - protected Pair getSentReceived() throws IOException { - File proc = new File("/proc/net/dev"); - BufferedReader reader = new BufferedReader(new FileReader(proc)); - String header = reader.readLine(); - String header2 = reader.readLine(); - - long bytesSent = 0; - long bytesReceived = 0; - - String interfaceStats; - - while ((interfaceStats = reader.readLine()) != null) { - String[] stats = interfaceStats.split("\\s+"); - - if (!stats[1].equals("lo:")) { - bytesReceived += Long.parseLong(stats[2]); - bytesSent += Long.parseLong(stats[10]); - } - } - - return new Pair<>(bytesSent, bytesReceived); - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/NetworkReceivedGauge.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/NetworkReceivedGauge.java deleted file mode 100644 index 6ecd85ac4..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/NetworkReceivedGauge.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.metrics; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.util.Pair; - -import java.io.IOException; - -public class NetworkReceivedGauge extends NetworkGauge { - - private final Logger logger = LoggerFactory.getLogger(NetworkReceivedGauge.class); - - private long lastTimestamp; - private long lastReceived; - - public NetworkReceivedGauge() { - try { - this.lastTimestamp = System.currentTimeMillis(); - this.lastReceived = getSentReceived().second(); - } catch (IOException e) { - logger.warn(NetworkReceivedGauge.class.getSimpleName(), e); - } - } - - @Override - public Double getValue() { - try { - long timestamp = System.currentTimeMillis(); - Pair sentAndReceived = getSentReceived(); - double bytesReceived = sentAndReceived.second() - lastReceived; - double secondsElapsed = (timestamp - this.lastTimestamp) / 1000; - double result = bytesReceived / secondsElapsed; - - this.lastTimestamp = timestamp; - this.lastReceived = sentAndReceived.second(); - - return result; - } catch (IOException e) { - logger.warn("NetworkReceivedGauge", e); - return -1D; - } - } - -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/NetworkSentGauge.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/NetworkSentGauge.java deleted file mode 100644 index 4952550e7..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/NetworkSentGauge.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.metrics; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.util.Pair; - -import java.io.IOException; - -public class NetworkSentGauge extends NetworkGauge { - - private final Logger logger = LoggerFactory.getLogger(NetworkSentGauge.class); - - private long lastTimestamp; - private long lastSent; - - public NetworkSentGauge() { - try { - this.lastTimestamp = System.currentTimeMillis(); - this.lastSent = getSentReceived().first(); - } catch (IOException e) { - logger.warn(NetworkSentGauge.class.getSimpleName(), e); - } - } - - @Override - public Double getValue() { - try { - long timestamp = System.currentTimeMillis(); - Pair sentAndReceived = getSentReceived(); - double bytesTransmitted = sentAndReceived.first() - lastSent; - double secondsElapsed = (timestamp - this.lastTimestamp) / 1000; - double result = bytesTransmitted / secondsElapsed; - - this.lastSent = sentAndReceived.first(); - this.lastTimestamp = timestamp; - - return result; - } catch (IOException e) { - logger.warn("NetworkSentGauge", e); - return -1D; - } - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/OperatingSystemMemoryGauge.java b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/OperatingSystemMemoryGauge.java index b3dcb6cb4..92606e6a5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/metrics/OperatingSystemMemoryGauge.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/metrics/OperatingSystemMemoryGauge.java @@ -5,9 +5,7 @@ package org.whispersystems.textsecuregcm.metrics; -import com.codahale.metrics.Gauge; import com.google.common.annotations.VisibleForTesting; - import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -16,33 +14,33 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; -public class OperatingSystemMemoryGauge implements Gauge { +public class OperatingSystemMemoryGauge implements Gauge { - private final String metricName; + private final String metricName; - private static final File MEMINFO_FILE = new File("/proc/meminfo"); - private static final Pattern MEMORY_METRIC_PATTERN = Pattern.compile("^([^:]+):\\s+([0-9]+).*$"); + private static final File MEMINFO_FILE = new File("/proc/meminfo"); + private static final Pattern MEMORY_METRIC_PATTERN = Pattern.compile("^([^:]+):\\s+([0-9]+).*$"); - public OperatingSystemMemoryGauge(final String metricName) { - this.metricName = metricName; + public OperatingSystemMemoryGauge(final String metricName) { + this.metricName = metricName; + } + + @Override + public double getValue() { + try (final BufferedReader bufferedReader = new BufferedReader(new FileReader(MEMINFO_FILE))) { + return getValue(bufferedReader.lines()); + } catch (final IOException e) { + return 0L; } + } - @Override - public Long getValue() { - try (final BufferedReader bufferedReader = new BufferedReader(new FileReader(MEMINFO_FILE))) { - return getValue(bufferedReader.lines()); - } catch (final IOException e) { - return 0L; - } - } - - @VisibleForTesting - long getValue(final Stream lines) { - return lines.map(MEMORY_METRIC_PATTERN::matcher) - .filter(Matcher::matches) - .filter(matcher -> this.metricName.equalsIgnoreCase(matcher.group(1))) - .map(matcher -> Long.parseLong(matcher.group(2), 10)) - .findFirst() - .orElse(0L); - } + @VisibleForTesting + double getValue(final Stream lines) { + return lines.map(MEMORY_METRIC_PATTERN::matcher) + .filter(Matcher::matches) + .filter(matcher -> this.metricName.equalsIgnoreCase(matcher.group(1))) + .map(matcher -> Double.parseDouble(matcher.group(2))) + .findFirst() + .orElse(0d); + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java index 78a43eb85..7045b57f6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java @@ -7,10 +7,6 @@ package org.whispersystems.textsecuregcm.push; import static com.codahale.metrics.MetricRegistry.name; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SharedMetricRegistries; -import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import io.dropwizard.lifecycle.Managed; import io.lettuce.core.LettuceFutures; @@ -21,6 +17,7 @@ import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Metrics; import java.io.IOException; import java.time.Duration; @@ -41,7 +38,6 @@ import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.storage.Device; -import org.whispersystems.textsecuregcm.util.Constants; /** * The client presence manager keeps track of which clients are actively connected and "present" to receive messages. @@ -72,9 +68,9 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter displacementListenersByPresenceKey::size); + Metrics.gauge(name(getClass(), "localClientCount"), this, ignored -> displacementListenersByPresenceKey.size()); - this.checkPresenceTimer = metricRegistry.timer(name(getClass(), "checkPresence")); - this.setPresenceTimer = metricRegistry.timer(name(getClass(), "setPresence")); - this.clearPresenceTimer = metricRegistry.timer(name(getClass(), "clearPresence")); - this.prunePeersTimer = metricRegistry.timer(name(getClass(), "prunePeers")); - this.pruneClientMeter = metricRegistry.meter(name(getClass(), "pruneClient")); - this.remoteDisplacementMeter = metricRegistry.meter(name(getClass(), "remoteDisplacement")); - this.pubSubMessageMeter = metricRegistry.meter(name(getClass(), "pubSubMessage")); + this.checkPresenceTimer = Metrics.timer(name(getClass(), "checkPresence")); + this.setPresenceTimer = Metrics.timer(name(getClass(), "setPresence")); + this.clearPresenceTimer = Metrics.timer(name(getClass(), "clearPresence")); + this.prunePeersTimer = Metrics.timer(name(getClass(), "prunePeers")); + this.pruneClientMeter = Metrics.counter(name(getClass(), "pruneClient")); + this.remoteDisplacementMeter = Metrics.counter(name(getClass(), "remoteDisplacement")); + this.pubSubMessageMeter = Metrics.counter(name(getClass(), "pubSubMessage")); this.displacementListenerAlreadyRemovedCounter = Metrics.counter( name(getClass(), "displacementListenerAlreadyRemoved")); } @@ -165,7 +160,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter { final String presenceKey = getPresenceKey(accountUuid, deviceId); displacePresence(presenceKey, true); @@ -180,7 +175,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter - connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1; - } + return checkPresenceTimer.record(() -> + presenceCluster.withCluster(connection -> + connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1); } public boolean isLocallyPresent(final UUID accountUuid, final byte deviceId) { @@ -245,7 +239,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter { displacementListenersByPresenceKey.remove(presenceKey); unsubscribeFromRemotePresenceChanges(presenceKey); @@ -253,7 +247,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter connection.sync().srem(connectedClientSetKey, presenceKey)); return removed; - } + }); } private void subscribeForRemotePresenceChanges(final String presenceKey) { @@ -277,7 +271,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter { final Set peerIds = presenceCluster.withCluster( connection -> connection.sync().smembers(MANAGER_SET_KEY)); peerIds.remove(managerId); @@ -296,7 +290,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter connection.sync().spop(connectedClientsKey))) != null) { clearPresenceScript.execute(List.of(presenceKey), List.of(peerId)); - pruneClientMeter.mark(); + pruneClientMeter.increment(); } presenceCluster.useCluster(connection -> { @@ -305,12 +299,12 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter { try { displacePresence(channel.substring("__keyspace@0__:".length()), connectedElsewhere); - remoteDisplacementMeter.mark(); + remoteDisplacementMeter.increment(); } catch (final Exception e) { log.warn("Error displacing presence", e); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index 5aebf55f6..3ff25d904 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -8,9 +8,6 @@ package org.whispersystems.textsecuregcm.storage; import static com.codahale.metrics.MetricRegistry.name; import static java.util.Objects.requireNonNull; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SharedMetricRegistries; -import com.codahale.metrics.Timer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.common.annotations.VisibleForTesting; @@ -18,6 +15,7 @@ import com.google.common.base.Preconditions; import io.lettuce.core.RedisException; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; import java.io.IOException; import java.io.UncheckedIOException; import java.time.Clock; @@ -60,7 +58,6 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client; -import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.DestinationDeviceValidator; import org.whispersystems.textsecuregcm.util.ExceptionUtils; import org.whispersystems.textsecuregcm.util.Pair; @@ -72,19 +69,19 @@ import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem; public class AccountsManager { - private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private static final Timer createTimer = metricRegistry.timer(name(AccountsManager.class, "create")); - private static final Timer updateTimer = metricRegistry.timer(name(AccountsManager.class, "update")); - private static final Timer getByNumberTimer = metricRegistry.timer(name(AccountsManager.class, "getByNumber")); - private static final Timer getByUsernameHashTimer = metricRegistry.timer(name(AccountsManager.class, "getByUsernameHash")); - private static final Timer getByUsernameLinkHandleTimer = metricRegistry.timer(name(AccountsManager.class, "getByUsernameLinkHandle")); - private static final Timer getByUuidTimer = metricRegistry.timer(name(AccountsManager.class, "getByUuid")); - private static final Timer deleteTimer = metricRegistry.timer(name(AccountsManager.class, "delete")); + private static final Timer createTimer = Metrics.timer(name(AccountsManager.class, "create")); + private static final Timer updateTimer = Metrics.timer(name(AccountsManager.class, "update")); + private static final Timer getByNumberTimer = Metrics.timer(name(AccountsManager.class, "getByNumber")); + private static final Timer getByUsernameHashTimer = Metrics.timer(name(AccountsManager.class, "getByUsernameHash")); + private static final Timer getByUsernameLinkHandleTimer = Metrics.timer( + name(AccountsManager.class, "getByUsernameLinkHandle")); + private static final Timer getByUuidTimer = Metrics.timer(name(AccountsManager.class, "getByUuid")); + private static final Timer deleteTimer = Metrics.timer(name(AccountsManager.class, "delete")); - private static final Timer redisSetTimer = metricRegistry.timer(name(AccountsManager.class, "redisSet")); - private static final Timer redisPniGetTimer = metricRegistry.timer(name(AccountsManager.class, "redisPniGet")); - private static final Timer redisUuidGetTimer = metricRegistry.timer(name(AccountsManager.class, "redisUuidGet")); - private static final Timer redisDeleteTimer = metricRegistry.timer(name(AccountsManager.class, "redisDelete")); + private static final Timer redisSetTimer = Metrics.timer(name(AccountsManager.class, "redisSet")); + private static final Timer redisPniGetTimer = Metrics.timer(name(AccountsManager.class, "redisPniGet")); + private static final Timer redisUuidGetTimer = Metrics.timer(name(AccountsManager.class, "redisUuidGet")); + private static final Timer redisDeleteTimer = Metrics.timer(name(AccountsManager.class, "redisDelete")); private static final String CREATE_COUNTER_NAME = name(AccountsManager.class, "createCounter"); private static final String DELETE_COUNTER_NAME = name(AccountsManager.class, "deleteCounter"); @@ -172,7 +169,7 @@ public class AccountsManager { final Account account = new Account(); - try (Timer.Context ignoredTimerContext = createTimer.time()) { + return createTimer.record(() -> { accountLockManager.withLock(List.of(number), () -> { final Optional maybeRecentlyDeletedAccountIdentifier = accounts.findRecentlyDeletedAccountIdentifier(number); @@ -259,7 +256,7 @@ public class AccountsManager { }, accountLockExecutor); return account; - } + }); } public CompletableFuture> addDevice(final Account account, final DeviceSpec deviceSpec) { @@ -689,29 +686,27 @@ public class AccountsManager { */ private Account update(Account account, Function updater) { - final Account updatedAccount; - - try (Timer.Context ignored = updateTimer.time()) { + return updateTimer.record(() -> { redisDelete(account); final UUID uuid = account.getUuid(); - updatedAccount = updateWithRetries(account, + final Account updatedAccount = updateWithRetries(account, updater, accounts::update, () -> accounts.getByAccountIdentifier(uuid).orElseThrow(), AccountChangeValidator.GENERAL_CHANGE_VALIDATOR); redisSet(updatedAccount); - } - return updatedAccount; + return updatedAccount; + }); } private CompletableFuture updateAsync(final Account account, final Function updater) { - final Timer.Context timerContext = updateTimer.time(); + final Timer.Sample timerSample = Timer.start(); return redisDeleteAsync(account) .thenCompose(ignored -> { @@ -725,7 +720,7 @@ public class AccountsManager { MAX_UPDATE_ATTEMPTS); }) .thenCompose(updatedAccount -> redisSetAsync(updatedAccount).thenApply(ignored -> updatedAccount)) - .whenComplete((ignored, throwable) -> timerContext.close()); + .whenComplete((ignored, throwable) -> timerSample.stop(updateTimer)); } private Account updateWithRetries(Account account, @@ -859,13 +854,13 @@ public class AccountsManager { } public Optional getByE164(final String number) { - return getByNumberTimer.timeSupplier(() -> accounts.getByE164(number)); + return getByNumberTimer.record(() -> accounts.getByE164(number)); } public CompletableFuture> getByE164Async(final String number) { - final Timer.Context context = getByNumberTimer.time(); + Timer.Sample sample = Timer.start(); return accounts.getByE164Async(number) - .whenComplete((ignoredResult, ignoredThrowable) -> context.close()); + .whenComplete((ignoredResult, ignoredThrowable) -> sample.stop(getByNumberTimer)); } public Optional getByPhoneNumberIdentifier(final UUID pni) { @@ -885,15 +880,15 @@ public class AccountsManager { } public CompletableFuture> getByUsernameLinkHandle(final UUID usernameLinkHandle) { - final Timer.Context context = getByUsernameLinkHandleTimer.time(); + final Timer.Sample sample = Timer.start(); return accounts.getByUsernameLinkHandle(usernameLinkHandle) - .whenComplete((ignoredResult, ignoredThrowable) -> context.close()); + .whenComplete((ignoredResult, ignoredThrowable) -> sample.stop(getByUsernameLinkHandleTimer)); } public CompletableFuture> getByUsernameHash(final byte[] usernameHash) { - final Timer.Context context = getByUsernameHashTimer.time(); + final Timer.Sample sample = Timer.start(); return accounts.getByUsernameHash(usernameHash) - .whenComplete((ignoredResult, ignoredThrowable) -> context.close()); + .whenComplete((ignoredResult, ignoredThrowable) -> sample.stop(getByUsernameHashTimer)); } public Optional getByServiceIdentifier(final ServiceIdentifier serviceIdentifier) { @@ -943,11 +938,11 @@ public class AccountsManager { } public CompletableFuture delete(final Account account, final DeletionReason deletionReason) { - @SuppressWarnings("resource") final Timer.Context timerContext = deleteTimer.time(); + final Timer.Sample sample = Timer.start(); return accountLockManager.withLockAsync(List.of(account.getNumber()), () -> delete(account), accountLockExecutor) .whenComplete((ignored, throwable) -> { - timerContext.close(); + sample.stop(deleteTimer); if (throwable == null) { Metrics.counter(DELETE_COUNTER_NAME, @@ -984,10 +979,6 @@ public class AccountsManager { clientPresenceManager.disconnectPresence(account.getUuid(), device.getId()))), clientPresenceExecutor); } - private String getUsernameHashAccountMapKey(byte[] usernameHash) { - return "UAccountMap::" + Base64.getUrlEncoder().withoutPadding().encodeToString(usernameHash); - } - private String getAccountMapKey(String key) { return "AccountMap::" + key; } @@ -997,18 +988,21 @@ public class AccountsManager { } private void redisSet(Account account) { - try (Timer.Context ignored = redisSetTimer.time()) { - final String accountJson = writeRedisAccountJson(account); + redisSetTimer.record(() -> { + try { + final String accountJson = writeRedisAccountJson(account); - cacheCluster.useCluster(connection -> { - final RedisAdvancedClusterCommands commands = connection.sync(); + cacheCluster.useCluster(connection -> { + final RedisAdvancedClusterCommands commands = connection.sync(); - commands.setex(getAccountMapKey(account.getPhoneNumberIdentifier().toString()), CACHE_TTL_SECONDS, account.getUuid().toString()); - commands.setex(getAccountEntityKey(account.getUuid()), CACHE_TTL_SECONDS, accountJson); - }); - } catch (JsonProcessingException e) { - throw new IllegalStateException(e); - } + commands.setex(getAccountMapKey(account.getPhoneNumberIdentifier().toString()), CACHE_TTL_SECONDS, + account.getUuid().toString()); + commands.setex(getAccountEntityKey(account.getUuid()), CACHE_TTL_SECONDS, accountJson); + }); + } catch (JsonProcessingException e) { + throw new IllegalStateException(e); + } + }); } private CompletableFuture redisSetAsync(final Account account) { @@ -1033,14 +1027,14 @@ public class AccountsManager { final Timer overallTimer, final Supplier> resolveFromRedis, final Supplier> resolveFromAccounts) { - try (final Timer.Context ignored = overallTimer.time()) { + return overallTimer.record(() -> { Optional account = resolveFromRedis.get(); if (account.isEmpty()) { account = resolveFromAccounts.get(); account.ifPresent(this::redisSet); } return account; - } + }); } private CompletableFuture> checkRedisThenAccountsAsync( @@ -1048,7 +1042,7 @@ public class AccountsManager { final Supplier>> resolveFromRedis, final Supplier>> resolveFromAccounts) { - @SuppressWarnings("resource") final Timer.Context timerContext = overallTimer.time(); + final Timer.Sample sample = Timer.start(); return resolveFromRedis.get() .thenCompose(maybeAccountFromRedis -> maybeAccountFromRedis @@ -1057,11 +1051,12 @@ public class AccountsManager { .thenCompose(maybeAccountFromAccounts -> maybeAccountFromAccounts .map(account -> redisSetAsync(account).thenApply(ignored -> maybeAccountFromAccounts)) .orElseGet(() -> CompletableFuture.completedFuture(maybeAccountFromAccounts))))) - .whenComplete((ignored, throwable) -> timerContext.close()); + .whenComplete((ignored, throwable) -> sample.stop(overallTimer)); } private Optional redisGetBySecondaryKey(final String secondaryKey, final Timer timer) { - try (final Timer.Context ignored = timer.time()) { + return timer.record(() -> { + try { return Optional.ofNullable(cacheCluster.withCluster(connection -> connection.sync().get(secondaryKey))) .map(UUID::fromString) .flatMap(this::getByAccountIdentifier); @@ -1072,10 +1067,11 @@ public class AccountsManager { logger.warn("Redis failure", e); return Optional.empty(); } + }); } private CompletableFuture> redisGetBySecondaryKeyAsync(final String secondaryKey, final Timer timer) { - @SuppressWarnings("resource") final Timer.Context timerContext = timer.time(); + final Timer.Sample sample = Timer.start(); return cacheCluster.withCluster(connection -> connection.async().get(secondaryKey)) .thenCompose(nullableUuid -> { @@ -1089,19 +1085,21 @@ public class AccountsManager { logger.warn("Failed to retrieve account from Redis", throwable); return Optional.empty(); }) - .whenComplete((ignored, throwable) -> timerContext.close()) + .whenComplete((ignored, throwable) -> sample.stop(timer)) .toCompletableFuture(); } private Optional redisGetByAccountIdentifier(UUID uuid) { - try (Timer.Context ignored = redisUuidGetTimer.time()) { - final String json = cacheCluster.withCluster(connection -> connection.sync().get(getAccountEntityKey(uuid))); + return redisUuidGetTimer.record(() -> { + try { + final String json = cacheCluster.withCluster(connection -> connection.sync().get(getAccountEntityKey(uuid))); - return parseAccountJson(json, uuid); - } catch (final RedisException e) { - logger.warn("Redis failure", e); - return Optional.empty(); - } + return parseAccountJson(json, uuid); + } catch (final RedisException e) { + logger.warn("Redis failure", e); + return Optional.empty(); + } + }); } private CompletableFuture> redisGetByAccountIdentifierAsync(final UUID uuid) { @@ -1141,17 +1139,14 @@ public class AccountsManager { } private void redisDelete(final Account account) { - try (final Timer.Context ignored = redisDeleteTimer.time()) { - cacheCluster.useCluster(connection -> { - connection.sync().del( - getAccountMapKey(account.getPhoneNumberIdentifier().toString()), - getAccountEntityKey(account.getUuid())); - }); - } + redisDeleteTimer.record(() -> + cacheCluster.useCluster(connection -> + connection.sync().del(getAccountMapKey(account.getPhoneNumberIdentifier().toString()), + getAccountEntityKey(account.getUuid())))); } private CompletableFuture redisDeleteAsync(final Account account) { - @SuppressWarnings("resource") final Timer.Context timerContext = redisDeleteTimer.time(); + final Timer.Sample sample = Timer.start(); final String[] keysToDelete = new String[]{ getAccountMapKey(account.getPhoneNumberIdentifier().toString()), @@ -1160,7 +1155,7 @@ public class AccountsManager { return cacheCluster.withCluster(connection -> connection.async().del(keysToDelete)) .toCompletableFuture() - .whenComplete((ignoredResult, ignoredException) -> timerContext.close()) + .whenComplete((ignoredResult, ignoredException) -> sample.stop(redisDeleteTimer)) .thenRun(Util.NOOP); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index 67db03944..4d68b1fd3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -5,17 +5,14 @@ package org.whispersystems.textsecuregcm.storage; -import static com.codahale.metrics.MetricRegistry.name; -import static io.micrometer.core.instrument.Metrics.counter; +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SharedMetricRegistries; -import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import io.dropwizard.lifecycle.Managed; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; import java.time.Duration; import java.time.Instant; import java.util.Comparator; @@ -30,7 +27,6 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.push.ClientPresenceManager; -import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Util; import reactor.core.publisher.Flux; import reactor.util.function.Tuple2; @@ -47,18 +43,24 @@ public class MessagePersister implements Managed { private final Duration persistDelay; - private final boolean dedicatedProcess; private final Thread[] workerThreads; private volatile boolean running; - private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues")); - private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue")); - private final Meter persistQueueExceptionMeter = metricRegistry.meter( + private final Timer getQueuesTimer = Metrics.timer(name(MessagePersister.class, "getQueues")); + private final Timer persistQueueTimer = Metrics.timer(name(MessagePersister.class, "persistQueue")); + private final Counter persistQueueExceptionMeter = Metrics.counter( name(MessagePersister.class, "persistQueueException")); - private final Counter oversizedQueueCounter = counter(name(MessagePersister.class, "persistQueueOversized")); - private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount")); - private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize")); + private final Counter oversizedQueueCounter = Metrics.counter(name(MessagePersister.class, "persistQueueOversized")); + private final DistributionSummary queueCountDistributionSummery = DistributionSummary.builder( + name(MessagePersister.class, "queueCount")) + .publishPercentiles(0.5, 0.75, 0.95, 0.99, 0.999) + .distributionStatisticExpiry(Duration.ofMinutes(10)) + .register(Metrics.globalRegistry); + private final DistributionSummary queueSizeDistributionSummery = DistributionSummary.builder( + name(MessagePersister.class, "queueSize")) + .publishPercentiles(0.5, 0.75, 0.95, 0.99, 0.999) + .distributionStatisticExpiry(Duration.ofMinutes(10)) + .register(Metrics.globalRegistry); private final ExecutorService executor; static final int QUEUE_BATCH_LIMIT = 100; @@ -86,7 +88,6 @@ public class MessagePersister implements Managed { this.keysManager = keysManager; this.persistDelay = persistDelay; this.workerThreads = new Thread[dedicatedProcessWorkerThreadCount]; - this.dedicatedProcess = true; this.executor = executor; for (int i = 0; i < workerThreads.length; i++) { @@ -96,7 +97,7 @@ public class MessagePersister implements Managed { .isPersistenceEnabled()) { try { final int queuesPersisted = persistNextQueues(Instant.now()); - queueCountHistogram.update(queuesPersisted); + queueCountDistributionSummery.record(queuesPersisted); if (queuesPersisted == 0) { Util.sleep(100); @@ -148,9 +149,8 @@ public class MessagePersister implements Managed { int queuesPersisted = 0; do { - try (final Timer.Context ignored = getQueuesTimer.time()) { - queuesToPersist = messagesCache.getQueuesToPersist(slot, currentTime.minus(persistDelay), QUEUE_BATCH_LIMIT); - } + queuesToPersist = getQueuesTimer.record( + () -> messagesCache.getQueuesToPersist(slot, currentTime.minus(persistDelay), QUEUE_BATCH_LIMIT)); for (final String queue : queuesToPersist) { final UUID accountUuid = MessagesCache.getAccountUuidFromQueueName(queue); @@ -164,7 +164,7 @@ public class MessagePersister implements Managed { try { persistQueue(maybeAccount.get(), deviceId); } catch (final Exception e) { - persistQueueExceptionMeter.mark(); + persistQueueExceptionMeter.increment(); logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e); messagesCache.addQueueToPersist(accountUuid, deviceId); @@ -182,41 +182,44 @@ public class MessagePersister implements Managed { @VisibleForTesting void persistQueue(final Account account, final byte deviceId) throws MessagePersistenceException { final UUID accountUuid = account.getUuid(); - try (final Timer.Context ignored = persistQueueTimer.time()) { - messagesCache.lockQueueForPersistence(accountUuid, deviceId); - try { - int messageCount = 0; - List messages; + final Timer.Sample sample = Timer.start(); - int consecutiveEmptyCacheRemovals = 0; + messagesCache.lockQueueForPersistence(accountUuid, deviceId); - do { - messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT); + try { + int messageCount = 0; + List messages; - int messagesRemovedFromCache = messagesManager.persistMessages(accountUuid, deviceId, messages); - messageCount += messages.size(); + int consecutiveEmptyCacheRemovals = 0; - if (messagesRemovedFromCache == 0) { - consecutiveEmptyCacheRemovals += 1; - } else { - consecutiveEmptyCacheRemovals = 0; - } + do { + messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT); - if (consecutiveEmptyCacheRemovals > CONSECUTIVE_EMPTY_CACHE_REMOVAL_LIMIT) { - throw new MessagePersistenceException("persistence failure loop detected"); - } + int messagesRemovedFromCache = messagesManager.persistMessages(accountUuid, deviceId, messages); + messageCount += messages.size(); - } while (!messages.isEmpty()); + if (messagesRemovedFromCache == 0) { + consecutiveEmptyCacheRemovals += 1; + } else { + consecutiveEmptyCacheRemovals = 0; + } - queueSizeHistogram.update(messageCount); - } catch (ItemCollectionSizeLimitExceededException e) { - oversizedQueueCounter.increment(); - unlinkLeastActiveDevice(account, deviceId); // this will either do a deferred reschedule for retry or throw - } finally { - messagesCache.unlockQueueForPersistence(accountUuid, deviceId); - } + if (consecutiveEmptyCacheRemovals > CONSECUTIVE_EMPTY_CACHE_REMOVAL_LIMIT) { + throw new MessagePersistenceException("persistence failure loop detected"); + } + + } while (!messages.isEmpty()); + + queueSizeDistributionSummery.record(messageCount); + } catch (ItemCollectionSizeLimitExceededException e) { + oversizedQueueCounter.increment(); + unlinkLeastActiveDevice(account, deviceId); // this will either do a deferred reschedule for retry or throw + } finally { + messagesCache.unlockQueueForPersistence(accountUuid, deviceId); + sample.stop(persistQueueTimer); } + } @VisibleForTesting diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index b2d926aa9..9e1f4955f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -4,11 +4,9 @@ */ package org.whispersystems.textsecuregcm.storage; -import static com.codahale.metrics.MetricRegistry.name; +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SharedMetricRegistries; +import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import java.util.List; import java.util.Optional; @@ -24,8 +22,6 @@ import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; -import org.whispersystems.textsecuregcm.metrics.MetricsUtil; -import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Pair; import reactor.core.observability.micrometer.Micrometer; import reactor.core.publisher.Flux; @@ -34,15 +30,12 @@ import reactor.core.publisher.Mono; public class MessagesManager { private static final int RESULT_SET_CHUNK_SIZE = 100; - final String GET_MESSAGES_FOR_DEVICE_FLUX_NAME = MetricsUtil.name(MessagesManager.class, "getMessagesForDevice"); + final String GET_MESSAGES_FOR_DEVICE_FLUX_NAME = name(MessagesManager.class, "getMessagesForDevice"); private static final Logger logger = LoggerFactory.getLogger(MessagesManager.class); - private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid")); - private static final Meter cacheMissByGuidMeter = metricRegistry.meter( - name(MessagesManager.class, "cacheMissByGuid")); - private static final Meter persistMessageMeter = metricRegistry.meter(name(MessagesManager.class, "persistMessage")); + private static final Counter PERSIST_MESSAGE_COUNTER = Metrics.counter( + name(MessagesManager.class, "persistMessage")); private final MessagesDynamoDb messagesDynamoDb; private final MessagesCache messagesCache; @@ -124,12 +117,9 @@ public class MessagesManager { .thenComposeAsync(removed -> { if (removed.isPresent()) { - cacheHitByGuidMeter.mark(); return CompletableFuture.completedFuture(removed); } - cacheMissByGuidMeter.mark(); - if (serverTimestamp == null) { return messagesDynamoDb.deleteMessageByDestinationAndGuid(destinationUuid, guid); } else { @@ -159,7 +149,7 @@ public class MessagesManager { try { messagesRemovedFromCache = messagesCache.remove(destinationUuid, destinationDeviceId, messageGuids) .get(30, TimeUnit.SECONDS).size(); - persistMessageMeter.mark(nonEphemeralMessages.size()); + PERSIST_MESSAGE_COUNTER.increment(nonEphemeralMessages.size()); } catch (InterruptedException | ExecutionException | TimeoutException e) { logger.warn("Failed to remove messages from cache", e); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 8de84af64..d341a0bff 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2022 Signal Messenger, LLC + * Copyright 2013 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ @@ -7,11 +7,9 @@ package org.whispersystems.textsecuregcm.websocket; import static com.codahale.metrics.MetricRegistry.name; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SharedMetricRegistries; import com.google.common.annotations.VisibleForTesting; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; @@ -50,7 +48,6 @@ import org.whispersystems.textsecuregcm.storage.ClientReleaseManager; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.MessageAvailabilityListener; import org.whispersystems.textsecuregcm.storage.MessagesManager; -import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.HeaderUtils; import org.whispersystems.websocket.WebSocketClient; import org.whispersystems.websocket.WebSocketResourceProvider; @@ -63,18 +60,17 @@ import reactor.core.scheduler.Scheduler; public class WebSocketConnection implements MessageAvailabilityListener, DisplacedPresenceListener { - private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private static final Histogram messageTime = metricRegistry.histogram( - name(MessageController.class, "message_delivery_duration")); - private static final Histogram primaryDeviceMessageTime = metricRegistry.histogram( - name(MessageController.class, "primary_device_message_delivery_duration")); - private static final Meter sendMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_message")); - private static final Meter messageAvailableMeter = metricRegistry.meter( + private static final DistributionSummary messageTime = Metrics.summary( + name(MessageController.class, "messageDeliveryDuration")); + private static final DistributionSummary primaryDeviceMessageTime = Metrics.summary( + name(MessageController.class, "primaryDeviceMessageDeliveryDuration")); + private static final Counter sendMessageCounter = Metrics.counter(name(WebSocketConnection.class, "sendMessage")); + private static final Counter messageAvailableCounter = Metrics.counter( name(WebSocketConnection.class, "messagesAvailable")); - private static final Meter messagesPersistedMeter = metricRegistry.meter( + private static final Counter messagesPersistedCounter = Metrics.counter( name(WebSocketConnection.class, "messagesPersisted")); - private static final Meter bytesSentMeter = metricRegistry.meter(name(WebSocketConnection.class, "bytes_sent")); - private static final Meter sendFailuresMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_failures")); + private static final Counter bytesSentCounter = Metrics.counter(name(WebSocketConnection.class, "bytesSent")); + private static final Counter sendFailuresCounter = Metrics.counter(name(WebSocketConnection.class, "sendFailures")); private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class, "initialQueueLength"); @@ -209,9 +205,9 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac // clear ephemeral field from the envelope final Optional body = Optional.ofNullable(message.toBuilder().clearEphemeral().build().toByteArray()); - sendMessageMeter.mark(); + sendMessageCounter.increment(); sentMessageCounter.increment(); - bytesSentMeter.mark(body.map(bytes -> bytes.length).orElse(0)); + bytesSentCounter.increment(body.map(bytes -> bytes.length).orElse(0)); MessageMetrics.measureAccountEnvelopeUuidMismatches(auth.getAccount(), message); // X-Signal-Key: false must be sent until Android stops assuming it missing means true @@ -219,7 +215,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac List.of(HeaderUtils.X_SIGNAL_KEY + ": false", HeaderUtils.getTimestampHeader()), body) .whenComplete((ignored, throwable) -> { if (throwable != null) { - sendFailuresMeter.mark(); + sendFailuresCounter.increment(); } else { MessageMetrics.measureOutgoingMessageLatency(message.getServerTimestamp(), "websocket", client.getUserAgent(), clientReleaseManager); } @@ -258,9 +254,9 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac public static void recordMessageDeliveryDuration(long timestamp, Device messageDestinationDevice) { final long messageDeliveryDuration = System.currentTimeMillis() - timestamp; - messageTime.update(messageDeliveryDuration); + messageTime.record(messageDeliveryDuration); if (messageDestinationDevice.isPrimary()) { - primaryDeviceMessageTime.update(messageDeliveryDuration); + primaryDeviceMessageTime.record(messageDeliveryDuration); } } @@ -429,7 +425,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac return false; } - messageAvailableMeter.mark(); + messageAvailableCounter.increment(); storedMessageState.compareAndSet(StoredMessageState.EMPTY, StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE); @@ -445,7 +441,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac Metrics.counter(CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME).increment(); return false; } - messagesPersistedMeter.mark(); + messagesPersistedCounter.increment(); storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);