Migrate remaining custom metrics from Dropwizard to Micrometer

And remove some that are obsolete or duplicative.
This commit is contained in:
Chris Eager 2024-04-17 15:35:04 -05:00 committed by GitHub
parent 419ec6e308
commit a38bf25e68
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 234 additions and 536 deletions

View File

@ -8,11 +8,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import io.dropwizard.core.Configuration;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import org.whispersystems.textsecuregcm.attachments.TusConfiguration;
@ -170,11 +168,6 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private RedisClusterConfiguration clientPresenceCluster;
@Valid
@NotNull
@JsonProperty
private Set<String> testDevices = new HashSet<>();
@Valid
@NotNull
@JsonProperty
@ -461,10 +454,6 @@ public class WhisperServerConfiguration extends Configuration {
return unidentifiedDelivery;
}
public Set<String> getTestDevices() {
return testDevices;
}
public Map<String, Integer> getMaxDevices() {
Map<String, Integer> results = new HashMap<>();

View File

@ -671,8 +671,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.lifecycle().manage(clientReleaseManager);
environment.lifecycle().manage(virtualThreadPinEventMonitor);
final RegistrationCaptchaManager registrationCaptchaManager = new RegistrationCaptchaManager(captchaChecker,
rateLimiters, config.getTestDevices(), dynamicConfigurationManager);
final RegistrationCaptchaManager registrationCaptchaManager = new RegistrationCaptchaManager(captchaChecker);
StaticCredentialsProvider cdnCredentialsProvider = StaticCredentialsProvider
.create(AwsBasicCredentials.create(

View File

@ -5,50 +5,15 @@
package org.whispersystems.textsecuregcm.captcha;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicCaptchaConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.controllers.AccountController;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
public class RegistrationCaptchaManager {
private static final Logger logger = LoggerFactory.getLogger(RegistrationCaptchaManager.class);
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter countryFilteredHostMeter = metricRegistry.meter(
name(AccountController.class, "country_limited_host"));
private final Meter rateLimitedHostMeter = metricRegistry.meter(name(AccountController.class, "rate_limited_host"));
private final Meter rateLimitedPrefixMeter = metricRegistry.meter(
name(AccountController.class, "rate_limited_prefix"));
private final CaptchaChecker captchaChecker;
private final RateLimiters rateLimiters;
private final Set<String> testDevices;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
public RegistrationCaptchaManager(final CaptchaChecker captchaChecker, final RateLimiters rateLimiters,
final Set<String> testDevices,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
public RegistrationCaptchaManager(final CaptchaChecker captchaChecker) {
this.captchaChecker = captchaChecker;
this.rateLimiters = rateLimiters;
this.testDevices = testDevices;
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")

View File

@ -1,29 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.metrics;
import static com.codahale.metrics.MetricRegistry.name;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;
public class BufferPoolGauges {
private BufferPoolGauges() {}
public static void registerMetrics() {
for (final BufferPoolMXBean bufferPoolMXBean : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
final List<Tag> tags = List.of(Tag.of("bufferPoolName", bufferPoolMXBean.getName()));
Metrics.gauge(name(BufferPoolGauges.class, "count"), tags, bufferPoolMXBean, BufferPoolMXBean::getCount);
Metrics.gauge(name(BufferPoolGauges.class, "memory_used"), tags, bufferPoolMXBean, BufferPoolMXBean::getMemoryUsed);
Metrics.gauge(name(BufferPoolGauges.class, "total_capacity"), tags, bufferPoolMXBean, BufferPoolMXBean::getTotalCapacity);
}
}
}

View File

@ -1,29 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.metrics;
import com.codahale.metrics.CachedGauge;
import com.sun.management.OperatingSystemMXBean;
import java.lang.management.ManagementFactory;
import java.util.concurrent.TimeUnit;
public class CpuUsageGauge extends CachedGauge<Integer> {
private final OperatingSystemMXBean operatingSystemMXBean;
public CpuUsageGauge(final long timeout, final TimeUnit timeoutUnit) {
super(timeout, timeoutUnit);
this.operatingSystemMXBean = (com.sun.management.OperatingSystemMXBean)
ManagementFactory.getOperatingSystemMXBean();
}
@Override
protected Integer loadValue() {
return (int) Math.ceil(operatingSystemMXBean.getCpuLoad() * 100);
}
}

View File

@ -1,24 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.metrics;
import com.codahale.metrics.Gauge;
import java.io.File;
public class FileDescriptorGauge implements Gauge<Integer> {
@Override
public Integer getValue() {
File file = new File("/proc/self/fd");
if (file.isDirectory() && file.exists()) {
return file.list().length;
}
return 0;
}
}

View File

@ -5,21 +5,20 @@
package org.whispersystems.textsecuregcm.metrics;
import com.codahale.metrics.Gauge;
import com.sun.management.OperatingSystemMXBean;
import java.lang.management.ManagementFactory;
public class FreeMemoryGauge implements Gauge<Long> {
public class FreeMemoryGauge implements Gauge {
private final OperatingSystemMXBean operatingSystemMXBean;
public FreeMemoryGauge() {
this.operatingSystemMXBean = (com.sun.management.OperatingSystemMXBean)
ManagementFactory.getOperatingSystemMXBean();
ManagementFactory.getOperatingSystemMXBean();
}
@Override
public Long getValue() {
public double getValue() {
return operatingSystemMXBean.getFreeMemorySize();
}
}

View File

@ -5,7 +5,7 @@
package org.whispersystems.textsecuregcm.metrics;
import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
@ -15,14 +15,17 @@ import java.util.List;
public class GarbageCollectionGauges {
private GarbageCollectionGauges() {}
private GarbageCollectionGauges() {
}
public static void registerMetrics() {
for (final GarbageCollectorMXBean garbageCollectorMXBean : ManagementFactory.getGarbageCollectorMXBeans()) {
final List<Tag> tags = List.of(Tag.of("memoryManagerName", garbageCollectorMXBean.getName()));
public static void registerMetrics() {
for (final GarbageCollectorMXBean garbageCollectorMXBean : ManagementFactory.getGarbageCollectorMXBeans()) {
final List<Tag> tags = List.of(Tag.of("memoryManagerName", garbageCollectorMXBean.getName()));
Metrics.gauge(name(GarbageCollectionGauges.class, "collection_count"), tags, garbageCollectorMXBean, GarbageCollectorMXBean::getCollectionCount);
Metrics.gauge(name(GarbageCollectionGauges.class, "collection_time"), tags, garbageCollectorMXBean, GarbageCollectorMXBean::getCollectionTime);
}
Metrics.gauge(name(GarbageCollectionGauges.class, "collectionCount"), tags, garbageCollectorMXBean,
GarbageCollectorMXBean::getCollectionCount);
Metrics.gauge(name(GarbageCollectionGauges.class, "collectionTime"), tags, garbageCollectorMXBean,
GarbageCollectorMXBean::getCollectionTime);
}
}
}

View File

@ -0,0 +1,11 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.metrics;
interface Gauge {
double getValue();
}

View File

@ -1,28 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.metrics;
import com.codahale.metrics.Gauge;
import com.sun.management.UnixOperatingSystemMXBean;
import java.lang.management.ManagementFactory;
/**
* A gauge that reports the maximum number of file descriptors allowed by the operating system.
*/
public class MaxFileDescriptorGauge implements Gauge<Long> {
private final UnixOperatingSystemMXBean unixOperatingSystemMXBean;
public MaxFileDescriptorGauge() {
this.unixOperatingSystemMXBean = (UnixOperatingSystemMXBean)ManagementFactory.getOperatingSystemMXBean();
}
@Override
public Long getValue() {
return unixOperatingSystemMXBean.getMaxFileDescriptorCount();
}
}

View File

@ -12,10 +12,13 @@ import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import io.micrometer.core.instrument.binder.system.FileDescriptorMetrics;
import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
import io.micrometer.core.instrument.config.MeterFilter;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.statsd.StatsdMeterRegistry;
import java.util.concurrent.TimeUnit;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.WhisperServerVersion;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
@ -101,19 +104,20 @@ public class MetricsUtil {
}
public static void registerSystemResourceMetrics(final Environment environment) {
environment.metrics().register(name(CpuUsageGauge.class, "cpu"), new CpuUsageGauge(3, TimeUnit.SECONDS));
environment.metrics().register(name(FreeMemoryGauge.class, "free_memory"), new FreeMemoryGauge());
environment.metrics().register(name(NetworkSentGauge.class, "bytes_sent"), new NetworkSentGauge());
environment.metrics().register(name(NetworkReceivedGauge.class, "bytes_received"), new NetworkReceivedGauge());
environment.metrics().register(name(FileDescriptorGauge.class, "fd_count"), new FileDescriptorGauge());
environment.metrics().register(name(MaxFileDescriptorGauge.class, "max_fd_count"), new MaxFileDescriptorGauge());
environment.metrics()
.register(name(OperatingSystemMemoryGauge.class, "buffers"), new OperatingSystemMemoryGauge("Buffers"));
environment.metrics()
.register(name(OperatingSystemMemoryGauge.class, "cached"), new OperatingSystemMemoryGauge("Cached"));
new ProcessorMetrics().bindTo(Metrics.globalRegistry);
registerGauge(name(FreeMemoryGauge.class, "freeMemory"), new FreeMemoryGauge());
new FileDescriptorMetrics().bindTo(Metrics.globalRegistry);
registerGauge(name(OperatingSystemMemoryGauge.class, "buffers"), new OperatingSystemMemoryGauge("Buffers"));
registerGauge(name(OperatingSystemMemoryGauge.class, "cached"), new OperatingSystemMemoryGauge("Cached"));
new JvmMemoryMetrics().bindTo(Metrics.globalRegistry);
new JvmThreadMetrics().bindTo(Metrics.globalRegistry);
BufferPoolGauges.registerMetrics();
GarbageCollectionGauges.registerMetrics();
}
private static void registerGauge(final String name, final Gauge gauge) {
Metrics.gauge(name, gauge, Gauge::getValue);
}
}

View File

@ -1,41 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.metrics;
import com.codahale.metrics.Gauge;
import org.whispersystems.textsecuregcm.util.Pair;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
public abstract class NetworkGauge implements Gauge<Double> {
protected Pair<Long, Long> getSentReceived() throws IOException {
File proc = new File("/proc/net/dev");
BufferedReader reader = new BufferedReader(new FileReader(proc));
String header = reader.readLine();
String header2 = reader.readLine();
long bytesSent = 0;
long bytesReceived = 0;
String interfaceStats;
while ((interfaceStats = reader.readLine()) != null) {
String[] stats = interfaceStats.split("\\s+");
if (!stats[1].equals("lo:")) {
bytesReceived += Long.parseLong(stats[2]);
bytesSent += Long.parseLong(stats[10]);
}
}
return new Pair<>(bytesSent, bytesReceived);
}
}

View File

@ -1,49 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.Pair;
import java.io.IOException;
public class NetworkReceivedGauge extends NetworkGauge {
private final Logger logger = LoggerFactory.getLogger(NetworkReceivedGauge.class);
private long lastTimestamp;
private long lastReceived;
public NetworkReceivedGauge() {
try {
this.lastTimestamp = System.currentTimeMillis();
this.lastReceived = getSentReceived().second();
} catch (IOException e) {
logger.warn(NetworkReceivedGauge.class.getSimpleName(), e);
}
}
@Override
public Double getValue() {
try {
long timestamp = System.currentTimeMillis();
Pair<Long, Long> sentAndReceived = getSentReceived();
double bytesReceived = sentAndReceived.second() - lastReceived;
double secondsElapsed = (timestamp - this.lastTimestamp) / 1000;
double result = bytesReceived / secondsElapsed;
this.lastTimestamp = timestamp;
this.lastReceived = sentAndReceived.second();
return result;
} catch (IOException e) {
logger.warn("NetworkReceivedGauge", e);
return -1D;
}
}
}

View File

@ -1,48 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.Pair;
import java.io.IOException;
public class NetworkSentGauge extends NetworkGauge {
private final Logger logger = LoggerFactory.getLogger(NetworkSentGauge.class);
private long lastTimestamp;
private long lastSent;
public NetworkSentGauge() {
try {
this.lastTimestamp = System.currentTimeMillis();
this.lastSent = getSentReceived().first();
} catch (IOException e) {
logger.warn(NetworkSentGauge.class.getSimpleName(), e);
}
}
@Override
public Double getValue() {
try {
long timestamp = System.currentTimeMillis();
Pair<Long, Long> sentAndReceived = getSentReceived();
double bytesTransmitted = sentAndReceived.first() - lastSent;
double secondsElapsed = (timestamp - this.lastTimestamp) / 1000;
double result = bytesTransmitted / secondsElapsed;
this.lastSent = sentAndReceived.first();
this.lastTimestamp = timestamp;
return result;
} catch (IOException e) {
logger.warn("NetworkSentGauge", e);
return -1D;
}
}
}

View File

@ -5,9 +5,7 @@
package org.whispersystems.textsecuregcm.metrics;
import com.codahale.metrics.Gauge;
import com.google.common.annotations.VisibleForTesting;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@ -16,33 +14,33 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
public class OperatingSystemMemoryGauge implements Gauge<Long> {
public class OperatingSystemMemoryGauge implements Gauge {
private final String metricName;
private final String metricName;
private static final File MEMINFO_FILE = new File("/proc/meminfo");
private static final Pattern MEMORY_METRIC_PATTERN = Pattern.compile("^([^:]+):\\s+([0-9]+).*$");
private static final File MEMINFO_FILE = new File("/proc/meminfo");
private static final Pattern MEMORY_METRIC_PATTERN = Pattern.compile("^([^:]+):\\s+([0-9]+).*$");
public OperatingSystemMemoryGauge(final String metricName) {
this.metricName = metricName;
public OperatingSystemMemoryGauge(final String metricName) {
this.metricName = metricName;
}
@Override
public double getValue() {
try (final BufferedReader bufferedReader = new BufferedReader(new FileReader(MEMINFO_FILE))) {
return getValue(bufferedReader.lines());
} catch (final IOException e) {
return 0L;
}
}
@Override
public Long getValue() {
try (final BufferedReader bufferedReader = new BufferedReader(new FileReader(MEMINFO_FILE))) {
return getValue(bufferedReader.lines());
} catch (final IOException e) {
return 0L;
}
}
@VisibleForTesting
long getValue(final Stream<String> lines) {
return lines.map(MEMORY_METRIC_PATTERN::matcher)
.filter(Matcher::matches)
.filter(matcher -> this.metricName.equalsIgnoreCase(matcher.group(1)))
.map(matcher -> Long.parseLong(matcher.group(2), 10))
.findFirst()
.orElse(0L);
}
@VisibleForTesting
double getValue(final Stream<String> lines) {
return lines.map(MEMORY_METRIC_PATTERN::matcher)
.filter(Matcher::matches)
.filter(matcher -> this.metricName.equalsIgnoreCase(matcher.group(1)))
.map(matcher -> Double.parseDouble(matcher.group(2)))
.findFirst()
.orElse(0d);
}
}

View File

@ -7,10 +7,6 @@ package org.whispersystems.textsecuregcm.push;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.lifecycle.Managed;
import io.lettuce.core.LettuceFutures;
@ -21,6 +17,7 @@ import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.time.Duration;
@ -41,7 +38,6 @@ import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Constants;
/**
* The client presence manager keeps track of which clients are actively connected and "present" to receive messages.
@ -72,9 +68,9 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
private final Timer setPresenceTimer;
private final Timer clearPresenceTimer;
private final Timer prunePeersTimer;
private final Meter pruneClientMeter;
private final Meter remoteDisplacementMeter;
private final Meter pubSubMessageMeter;
private final Counter pruneClientMeter;
private final Counter remoteDisplacementMeter;
private final Counter pubSubMessageMeter;
private final Counter displacementListenerAlreadyRemovedCounter;
private static final int PRUNE_PEERS_INTERVAL_SECONDS = (int) Duration.ofSeconds(30).toSeconds();
@ -96,16 +92,15 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
this.scheduledExecutorService = scheduledExecutorService;
this.keyspaceNotificationExecutorService = keyspaceNotificationExecutorService;
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
metricRegistry.gauge(name(getClass(), "localClientCount"), () -> displacementListenersByPresenceKey::size);
Metrics.gauge(name(getClass(), "localClientCount"), this, ignored -> displacementListenersByPresenceKey.size());
this.checkPresenceTimer = metricRegistry.timer(name(getClass(), "checkPresence"));
this.setPresenceTimer = metricRegistry.timer(name(getClass(), "setPresence"));
this.clearPresenceTimer = metricRegistry.timer(name(getClass(), "clearPresence"));
this.prunePeersTimer = metricRegistry.timer(name(getClass(), "prunePeers"));
this.pruneClientMeter = metricRegistry.meter(name(getClass(), "pruneClient"));
this.remoteDisplacementMeter = metricRegistry.meter(name(getClass(), "remoteDisplacement"));
this.pubSubMessageMeter = metricRegistry.meter(name(getClass(), "pubSubMessage"));
this.checkPresenceTimer = Metrics.timer(name(getClass(), "checkPresence"));
this.setPresenceTimer = Metrics.timer(name(getClass(), "setPresence"));
this.clearPresenceTimer = Metrics.timer(name(getClass(), "clearPresence"));
this.prunePeersTimer = Metrics.timer(name(getClass(), "prunePeers"));
this.pruneClientMeter = Metrics.counter(name(getClass(), "pruneClient"));
this.remoteDisplacementMeter = Metrics.counter(name(getClass(), "remoteDisplacement"));
this.pubSubMessageMeter = Metrics.counter(name(getClass(), "pubSubMessage"));
this.displacementListenerAlreadyRemovedCounter = Metrics.counter(
name(getClass(), "displacementListenerAlreadyRemoved"));
}
@ -165,7 +160,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
public void setPresent(final UUID accountUuid, final byte deviceId,
final DisplacedPresenceListener displacementListener) {
try (final Timer.Context ignored = setPresenceTimer.time()) {
setPresenceTimer.record(() -> {
final String presenceKey = getPresenceKey(accountUuid, deviceId);
displacePresence(presenceKey, true);
@ -180,7 +175,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
});
subscribeForRemotePresenceChanges(presenceKey);
}
});
}
public void renewPresence(final UUID accountUuid, final byte deviceId) {
@ -224,10 +219,9 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
}
public boolean isPresent(final UUID accountUuid, final byte deviceId) {
try (final Timer.Context ignored = checkPresenceTimer.time()) {
return presenceCluster.withCluster(connection ->
connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1;
}
return checkPresenceTimer.record(() ->
presenceCluster.withCluster(connection ->
connection.sync().exists(getPresenceKey(accountUuid, deviceId))) == 1);
}
public boolean isLocallyPresent(final UUID accountUuid, final byte deviceId) {
@ -245,7 +239,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
}
private boolean clearPresence(final String presenceKey) {
try (final Timer.Context ignored = clearPresenceTimer.time()) {
return clearPresenceTimer.record(() -> {
displacementListenersByPresenceKey.remove(presenceKey);
unsubscribeFromRemotePresenceChanges(presenceKey);
@ -253,7 +247,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
presenceCluster.useCluster(connection -> connection.sync().srem(connectedClientSetKey, presenceKey));
return removed;
}
});
}
private void subscribeForRemotePresenceChanges(final String presenceKey) {
@ -277,7 +271,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
}
void pruneMissingPeers() {
try (final Timer.Context ignored = prunePeersTimer.time()) {
prunePeersTimer.record(() -> {
final Set<String> peerIds = presenceCluster.withCluster(
connection -> connection.sync().smembers(MANAGER_SET_KEY));
peerIds.remove(managerId);
@ -296,7 +290,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
while ((presenceKey = presenceCluster.withCluster(connection -> connection.sync().spop(connectedClientsKey)))
!= null) {
clearPresenceScript.execute(List.of(presenceKey), List.of(peerId));
pruneClientMeter.mark();
pruneClientMeter.increment();
}
presenceCluster.useCluster(connection -> {
@ -305,12 +299,12 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
});
}
}
}
});
}
@Override
public void message(final RedisClusterNode node, final String channel, final String message) {
pubSubMessageMeter.mark();
pubSubMessageMeter.increment();
if (channel.startsWith("__keyspace@0__:presence::{")) {
if ("set".equals(message) || "del".equals(message)) {
@ -323,7 +317,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
keyspaceNotificationExecutorService.execute(() -> {
try {
displacePresence(channel.substring("__keyspace@0__:".length()), connectedElsewhere);
remoteDisplacementMeter.mark();
remoteDisplacementMeter.increment();
} catch (final Exception e) {
log.warn("Error displacing presence", e);
}

View File

@ -8,9 +8,6 @@ package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import static java.util.Objects.requireNonNull;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.annotations.VisibleForTesting;
@ -18,6 +15,7 @@ import com.google.common.base.Preconditions;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Clock;
@ -60,7 +58,6 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.DestinationDeviceValidator;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Pair;
@ -72,19 +69,19 @@ import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
public class AccountsManager {
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Timer createTimer = metricRegistry.timer(name(AccountsManager.class, "create"));
private static final Timer updateTimer = metricRegistry.timer(name(AccountsManager.class, "update"));
private static final Timer getByNumberTimer = metricRegistry.timer(name(AccountsManager.class, "getByNumber"));
private static final Timer getByUsernameHashTimer = metricRegistry.timer(name(AccountsManager.class, "getByUsernameHash"));
private static final Timer getByUsernameLinkHandleTimer = metricRegistry.timer(name(AccountsManager.class, "getByUsernameLinkHandle"));
private static final Timer getByUuidTimer = metricRegistry.timer(name(AccountsManager.class, "getByUuid"));
private static final Timer deleteTimer = metricRegistry.timer(name(AccountsManager.class, "delete"));
private static final Timer createTimer = Metrics.timer(name(AccountsManager.class, "create"));
private static final Timer updateTimer = Metrics.timer(name(AccountsManager.class, "update"));
private static final Timer getByNumberTimer = Metrics.timer(name(AccountsManager.class, "getByNumber"));
private static final Timer getByUsernameHashTimer = Metrics.timer(name(AccountsManager.class, "getByUsernameHash"));
private static final Timer getByUsernameLinkHandleTimer = Metrics.timer(
name(AccountsManager.class, "getByUsernameLinkHandle"));
private static final Timer getByUuidTimer = Metrics.timer(name(AccountsManager.class, "getByUuid"));
private static final Timer deleteTimer = Metrics.timer(name(AccountsManager.class, "delete"));
private static final Timer redisSetTimer = metricRegistry.timer(name(AccountsManager.class, "redisSet"));
private static final Timer redisPniGetTimer = metricRegistry.timer(name(AccountsManager.class, "redisPniGet"));
private static final Timer redisUuidGetTimer = metricRegistry.timer(name(AccountsManager.class, "redisUuidGet"));
private static final Timer redisDeleteTimer = metricRegistry.timer(name(AccountsManager.class, "redisDelete"));
private static final Timer redisSetTimer = Metrics.timer(name(AccountsManager.class, "redisSet"));
private static final Timer redisPniGetTimer = Metrics.timer(name(AccountsManager.class, "redisPniGet"));
private static final Timer redisUuidGetTimer = Metrics.timer(name(AccountsManager.class, "redisUuidGet"));
private static final Timer redisDeleteTimer = Metrics.timer(name(AccountsManager.class, "redisDelete"));
private static final String CREATE_COUNTER_NAME = name(AccountsManager.class, "createCounter");
private static final String DELETE_COUNTER_NAME = name(AccountsManager.class, "deleteCounter");
@ -172,7 +169,7 @@ public class AccountsManager {
final Account account = new Account();
try (Timer.Context ignoredTimerContext = createTimer.time()) {
return createTimer.record(() -> {
accountLockManager.withLock(List.of(number), () -> {
final Optional<UUID> maybeRecentlyDeletedAccountIdentifier =
accounts.findRecentlyDeletedAccountIdentifier(number);
@ -259,7 +256,7 @@ public class AccountsManager {
}, accountLockExecutor);
return account;
}
});
}
public CompletableFuture<Pair<Account, Device>> addDevice(final Account account, final DeviceSpec deviceSpec) {
@ -689,29 +686,27 @@ public class AccountsManager {
*/
private Account update(Account account, Function<Account, Boolean> updater) {
final Account updatedAccount;
try (Timer.Context ignored = updateTimer.time()) {
return updateTimer.record(() -> {
redisDelete(account);
final UUID uuid = account.getUuid();
updatedAccount = updateWithRetries(account,
final Account updatedAccount = updateWithRetries(account,
updater,
accounts::update,
() -> accounts.getByAccountIdentifier(uuid).orElseThrow(),
AccountChangeValidator.GENERAL_CHANGE_VALIDATOR);
redisSet(updatedAccount);
}
return updatedAccount;
return updatedAccount;
});
}
private CompletableFuture<Account> updateAsync(final Account account, final Function<Account, Boolean> updater) {
final Timer.Context timerContext = updateTimer.time();
final Timer.Sample timerSample = Timer.start();
return redisDeleteAsync(account)
.thenCompose(ignored -> {
@ -725,7 +720,7 @@ public class AccountsManager {
MAX_UPDATE_ATTEMPTS);
})
.thenCompose(updatedAccount -> redisSetAsync(updatedAccount).thenApply(ignored -> updatedAccount))
.whenComplete((ignored, throwable) -> timerContext.close());
.whenComplete((ignored, throwable) -> timerSample.stop(updateTimer));
}
private Account updateWithRetries(Account account,
@ -859,13 +854,13 @@ public class AccountsManager {
}
public Optional<Account> getByE164(final String number) {
return getByNumberTimer.timeSupplier(() -> accounts.getByE164(number));
return getByNumberTimer.record(() -> accounts.getByE164(number));
}
public CompletableFuture<Optional<Account>> getByE164Async(final String number) {
final Timer.Context context = getByNumberTimer.time();
Timer.Sample sample = Timer.start();
return accounts.getByE164Async(number)
.whenComplete((ignoredResult, ignoredThrowable) -> context.close());
.whenComplete((ignoredResult, ignoredThrowable) -> sample.stop(getByNumberTimer));
}
public Optional<Account> getByPhoneNumberIdentifier(final UUID pni) {
@ -885,15 +880,15 @@ public class AccountsManager {
}
public CompletableFuture<Optional<Account>> getByUsernameLinkHandle(final UUID usernameLinkHandle) {
final Timer.Context context = getByUsernameLinkHandleTimer.time();
final Timer.Sample sample = Timer.start();
return accounts.getByUsernameLinkHandle(usernameLinkHandle)
.whenComplete((ignoredResult, ignoredThrowable) -> context.close());
.whenComplete((ignoredResult, ignoredThrowable) -> sample.stop(getByUsernameLinkHandleTimer));
}
public CompletableFuture<Optional<Account>> getByUsernameHash(final byte[] usernameHash) {
final Timer.Context context = getByUsernameHashTimer.time();
final Timer.Sample sample = Timer.start();
return accounts.getByUsernameHash(usernameHash)
.whenComplete((ignoredResult, ignoredThrowable) -> context.close());
.whenComplete((ignoredResult, ignoredThrowable) -> sample.stop(getByUsernameHashTimer));
}
public Optional<Account> getByServiceIdentifier(final ServiceIdentifier serviceIdentifier) {
@ -943,11 +938,11 @@ public class AccountsManager {
}
public CompletableFuture<Void> delete(final Account account, final DeletionReason deletionReason) {
@SuppressWarnings("resource") final Timer.Context timerContext = deleteTimer.time();
final Timer.Sample sample = Timer.start();
return accountLockManager.withLockAsync(List.of(account.getNumber()), () -> delete(account), accountLockExecutor)
.whenComplete((ignored, throwable) -> {
timerContext.close();
sample.stop(deleteTimer);
if (throwable == null) {
Metrics.counter(DELETE_COUNTER_NAME,
@ -984,10 +979,6 @@ public class AccountsManager {
clientPresenceManager.disconnectPresence(account.getUuid(), device.getId()))), clientPresenceExecutor);
}
private String getUsernameHashAccountMapKey(byte[] usernameHash) {
return "UAccountMap::" + Base64.getUrlEncoder().withoutPadding().encodeToString(usernameHash);
}
private String getAccountMapKey(String key) {
return "AccountMap::" + key;
}
@ -997,18 +988,21 @@ public class AccountsManager {
}
private void redisSet(Account account) {
try (Timer.Context ignored = redisSetTimer.time()) {
final String accountJson = writeRedisAccountJson(account);
redisSetTimer.record(() -> {
try {
final String accountJson = writeRedisAccountJson(account);
cacheCluster.useCluster(connection -> {
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
cacheCluster.useCluster(connection -> {
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
commands.setex(getAccountMapKey(account.getPhoneNumberIdentifier().toString()), CACHE_TTL_SECONDS, account.getUuid().toString());
commands.setex(getAccountEntityKey(account.getUuid()), CACHE_TTL_SECONDS, accountJson);
});
} catch (JsonProcessingException e) {
throw new IllegalStateException(e);
}
commands.setex(getAccountMapKey(account.getPhoneNumberIdentifier().toString()), CACHE_TTL_SECONDS,
account.getUuid().toString());
commands.setex(getAccountEntityKey(account.getUuid()), CACHE_TTL_SECONDS, accountJson);
});
} catch (JsonProcessingException e) {
throw new IllegalStateException(e);
}
});
}
private CompletableFuture<Void> redisSetAsync(final Account account) {
@ -1033,14 +1027,14 @@ public class AccountsManager {
final Timer overallTimer,
final Supplier<Optional<Account>> resolveFromRedis,
final Supplier<Optional<Account>> resolveFromAccounts) {
try (final Timer.Context ignored = overallTimer.time()) {
return overallTimer.record(() -> {
Optional<Account> account = resolveFromRedis.get();
if (account.isEmpty()) {
account = resolveFromAccounts.get();
account.ifPresent(this::redisSet);
}
return account;
}
});
}
private CompletableFuture<Optional<Account>> checkRedisThenAccountsAsync(
@ -1048,7 +1042,7 @@ public class AccountsManager {
final Supplier<CompletableFuture<Optional<Account>>> resolveFromRedis,
final Supplier<CompletableFuture<Optional<Account>>> resolveFromAccounts) {
@SuppressWarnings("resource") final Timer.Context timerContext = overallTimer.time();
final Timer.Sample sample = Timer.start();
return resolveFromRedis.get()
.thenCompose(maybeAccountFromRedis -> maybeAccountFromRedis
@ -1057,11 +1051,12 @@ public class AccountsManager {
.thenCompose(maybeAccountFromAccounts -> maybeAccountFromAccounts
.map(account -> redisSetAsync(account).thenApply(ignored -> maybeAccountFromAccounts))
.orElseGet(() -> CompletableFuture.completedFuture(maybeAccountFromAccounts)))))
.whenComplete((ignored, throwable) -> timerContext.close());
.whenComplete((ignored, throwable) -> sample.stop(overallTimer));
}
private Optional<Account> redisGetBySecondaryKey(final String secondaryKey, final Timer timer) {
try (final Timer.Context ignored = timer.time()) {
return timer.record(() -> {
try {
return Optional.ofNullable(cacheCluster.withCluster(connection -> connection.sync().get(secondaryKey)))
.map(UUID::fromString)
.flatMap(this::getByAccountIdentifier);
@ -1072,10 +1067,11 @@ public class AccountsManager {
logger.warn("Redis failure", e);
return Optional.empty();
}
});
}
private CompletableFuture<Optional<Account>> redisGetBySecondaryKeyAsync(final String secondaryKey, final Timer timer) {
@SuppressWarnings("resource") final Timer.Context timerContext = timer.time();
final Timer.Sample sample = Timer.start();
return cacheCluster.withCluster(connection -> connection.async().get(secondaryKey))
.thenCompose(nullableUuid -> {
@ -1089,19 +1085,21 @@ public class AccountsManager {
logger.warn("Failed to retrieve account from Redis", throwable);
return Optional.empty();
})
.whenComplete((ignored, throwable) -> timerContext.close())
.whenComplete((ignored, throwable) -> sample.stop(timer))
.toCompletableFuture();
}
private Optional<Account> redisGetByAccountIdentifier(UUID uuid) {
try (Timer.Context ignored = redisUuidGetTimer.time()) {
final String json = cacheCluster.withCluster(connection -> connection.sync().get(getAccountEntityKey(uuid)));
return redisUuidGetTimer.record(() -> {
try {
final String json = cacheCluster.withCluster(connection -> connection.sync().get(getAccountEntityKey(uuid)));
return parseAccountJson(json, uuid);
} catch (final RedisException e) {
logger.warn("Redis failure", e);
return Optional.empty();
}
return parseAccountJson(json, uuid);
} catch (final RedisException e) {
logger.warn("Redis failure", e);
return Optional.empty();
}
});
}
private CompletableFuture<Optional<Account>> redisGetByAccountIdentifierAsync(final UUID uuid) {
@ -1141,17 +1139,14 @@ public class AccountsManager {
}
private void redisDelete(final Account account) {
try (final Timer.Context ignored = redisDeleteTimer.time()) {
cacheCluster.useCluster(connection -> {
connection.sync().del(
getAccountMapKey(account.getPhoneNumberIdentifier().toString()),
getAccountEntityKey(account.getUuid()));
});
}
redisDeleteTimer.record(() ->
cacheCluster.useCluster(connection ->
connection.sync().del(getAccountMapKey(account.getPhoneNumberIdentifier().toString()),
getAccountEntityKey(account.getUuid()))));
}
private CompletableFuture<Void> redisDeleteAsync(final Account account) {
@SuppressWarnings("resource") final Timer.Context timerContext = redisDeleteTimer.time();
final Timer.Sample sample = Timer.start();
final String[] keysToDelete = new String[]{
getAccountMapKey(account.getPhoneNumberIdentifier().toString()),
@ -1160,7 +1155,7 @@ public class AccountsManager {
return cacheCluster.withCluster(connection -> connection.async().del(keysToDelete))
.toCompletableFuture()
.whenComplete((ignoredResult, ignoredException) -> timerContext.close())
.whenComplete((ignoredResult, ignoredException) -> sample.stop(redisDeleteTimer))
.thenRun(Util.NOOP);
}
}

View File

@ -5,17 +5,14 @@
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import static io.micrometer.core.instrument.Metrics.counter;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.lifecycle.Managed;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
@ -30,7 +27,6 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
@ -47,18 +43,24 @@ public class MessagePersister implements Managed {
private final Duration persistDelay;
private final boolean dedicatedProcess;
private final Thread[] workerThreads;
private volatile boolean running;
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Timer getQueuesTimer = metricRegistry.timer(name(MessagePersister.class, "getQueues"));
private final Timer persistQueueTimer = metricRegistry.timer(name(MessagePersister.class, "persistQueue"));
private final Meter persistQueueExceptionMeter = metricRegistry.meter(
private final Timer getQueuesTimer = Metrics.timer(name(MessagePersister.class, "getQueues"));
private final Timer persistQueueTimer = Metrics.timer(name(MessagePersister.class, "persistQueue"));
private final Counter persistQueueExceptionMeter = Metrics.counter(
name(MessagePersister.class, "persistQueueException"));
private final Counter oversizedQueueCounter = counter(name(MessagePersister.class, "persistQueueOversized"));
private final Histogram queueCountHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueCount"));
private final Histogram queueSizeHistogram = metricRegistry.histogram(name(MessagePersister.class, "queueSize"));
private final Counter oversizedQueueCounter = Metrics.counter(name(MessagePersister.class, "persistQueueOversized"));
private final DistributionSummary queueCountDistributionSummery = DistributionSummary.builder(
name(MessagePersister.class, "queueCount"))
.publishPercentiles(0.5, 0.75, 0.95, 0.99, 0.999)
.distributionStatisticExpiry(Duration.ofMinutes(10))
.register(Metrics.globalRegistry);
private final DistributionSummary queueSizeDistributionSummery = DistributionSummary.builder(
name(MessagePersister.class, "queueSize"))
.publishPercentiles(0.5, 0.75, 0.95, 0.99, 0.999)
.distributionStatisticExpiry(Duration.ofMinutes(10))
.register(Metrics.globalRegistry);
private final ExecutorService executor;
static final int QUEUE_BATCH_LIMIT = 100;
@ -86,7 +88,6 @@ public class MessagePersister implements Managed {
this.keysManager = keysManager;
this.persistDelay = persistDelay;
this.workerThreads = new Thread[dedicatedProcessWorkerThreadCount];
this.dedicatedProcess = true;
this.executor = executor;
for (int i = 0; i < workerThreads.length; i++) {
@ -96,7 +97,7 @@ public class MessagePersister implements Managed {
.isPersistenceEnabled()) {
try {
final int queuesPersisted = persistNextQueues(Instant.now());
queueCountHistogram.update(queuesPersisted);
queueCountDistributionSummery.record(queuesPersisted);
if (queuesPersisted == 0) {
Util.sleep(100);
@ -148,9 +149,8 @@ public class MessagePersister implements Managed {
int queuesPersisted = 0;
do {
try (final Timer.Context ignored = getQueuesTimer.time()) {
queuesToPersist = messagesCache.getQueuesToPersist(slot, currentTime.minus(persistDelay), QUEUE_BATCH_LIMIT);
}
queuesToPersist = getQueuesTimer.record(
() -> messagesCache.getQueuesToPersist(slot, currentTime.minus(persistDelay), QUEUE_BATCH_LIMIT));
for (final String queue : queuesToPersist) {
final UUID accountUuid = MessagesCache.getAccountUuidFromQueueName(queue);
@ -164,7 +164,7 @@ public class MessagePersister implements Managed {
try {
persistQueue(maybeAccount.get(), deviceId);
} catch (final Exception e) {
persistQueueExceptionMeter.mark();
persistQueueExceptionMeter.increment();
logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e);
messagesCache.addQueueToPersist(accountUuid, deviceId);
@ -182,41 +182,44 @@ public class MessagePersister implements Managed {
@VisibleForTesting
void persistQueue(final Account account, final byte deviceId) throws MessagePersistenceException {
final UUID accountUuid = account.getUuid();
try (final Timer.Context ignored = persistQueueTimer.time()) {
messagesCache.lockQueueForPersistence(accountUuid, deviceId);
try {
int messageCount = 0;
List<MessageProtos.Envelope> messages;
final Timer.Sample sample = Timer.start();
int consecutiveEmptyCacheRemovals = 0;
messagesCache.lockQueueForPersistence(accountUuid, deviceId);
do {
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);
try {
int messageCount = 0;
List<MessageProtos.Envelope> messages;
int messagesRemovedFromCache = messagesManager.persistMessages(accountUuid, deviceId, messages);
messageCount += messages.size();
int consecutiveEmptyCacheRemovals = 0;
if (messagesRemovedFromCache == 0) {
consecutiveEmptyCacheRemovals += 1;
} else {
consecutiveEmptyCacheRemovals = 0;
}
do {
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);
if (consecutiveEmptyCacheRemovals > CONSECUTIVE_EMPTY_CACHE_REMOVAL_LIMIT) {
throw new MessagePersistenceException("persistence failure loop detected");
}
int messagesRemovedFromCache = messagesManager.persistMessages(accountUuid, deviceId, messages);
messageCount += messages.size();
} while (!messages.isEmpty());
if (messagesRemovedFromCache == 0) {
consecutiveEmptyCacheRemovals += 1;
} else {
consecutiveEmptyCacheRemovals = 0;
}
queueSizeHistogram.update(messageCount);
} catch (ItemCollectionSizeLimitExceededException e) {
oversizedQueueCounter.increment();
unlinkLeastActiveDevice(account, deviceId); // this will either do a deferred reschedule for retry or throw
} finally {
messagesCache.unlockQueueForPersistence(accountUuid, deviceId);
}
if (consecutiveEmptyCacheRemovals > CONSECUTIVE_EMPTY_CACHE_REMOVAL_LIMIT) {
throw new MessagePersistenceException("persistence failure loop detected");
}
} while (!messages.isEmpty());
queueSizeDistributionSummery.record(messageCount);
} catch (ItemCollectionSizeLimitExceededException e) {
oversizedQueueCounter.increment();
unlinkLeastActiveDevice(account, deviceId); // this will either do a deferred reschedule for retry or throw
} finally {
messagesCache.unlockQueueForPersistence(accountUuid, deviceId);
sample.stop(persistQueueTimer);
}
}
@VisibleForTesting

View File

@ -4,11 +4,9 @@
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.util.List;
import java.util.Optional;
@ -24,8 +22,6 @@ import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;
@ -34,15 +30,12 @@ import reactor.core.publisher.Mono;
public class MessagesManager {
private static final int RESULT_SET_CHUNK_SIZE = 100;
final String GET_MESSAGES_FOR_DEVICE_FLUX_NAME = MetricsUtil.name(MessagesManager.class, "getMessagesForDevice");
final String GET_MESSAGES_FOR_DEVICE_FLUX_NAME = name(MessagesManager.class, "getMessagesForDevice");
private static final Logger logger = LoggerFactory.getLogger(MessagesManager.class);
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid"));
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(
name(MessagesManager.class, "cacheMissByGuid"));
private static final Meter persistMessageMeter = metricRegistry.meter(name(MessagesManager.class, "persistMessage"));
private static final Counter PERSIST_MESSAGE_COUNTER = Metrics.counter(
name(MessagesManager.class, "persistMessage"));
private final MessagesDynamoDb messagesDynamoDb;
private final MessagesCache messagesCache;
@ -124,12 +117,9 @@ public class MessagesManager {
.thenComposeAsync(removed -> {
if (removed.isPresent()) {
cacheHitByGuidMeter.mark();
return CompletableFuture.completedFuture(removed);
}
cacheMissByGuidMeter.mark();
if (serverTimestamp == null) {
return messagesDynamoDb.deleteMessageByDestinationAndGuid(destinationUuid, guid);
} else {
@ -159,7 +149,7 @@ public class MessagesManager {
try {
messagesRemovedFromCache = messagesCache.remove(destinationUuid, destinationDeviceId, messageGuids)
.get(30, TimeUnit.SECONDS).size();
persistMessageMeter.mark(nonEphemeralMessages.size());
PERSIST_MESSAGE_COUNTER.increment(nonEphemeralMessages.size());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.warn("Failed to remove messages from cache", e);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2013-2022 Signal Messenger, LLC
* Copyright 2013 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
@ -7,11 +7,9 @@ package org.whispersystems.textsecuregcm.websocket;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
@ -50,7 +48,6 @@ import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessageAvailabilityListener;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.HeaderUtils;
import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.WebSocketResourceProvider;
@ -63,18 +60,17 @@ import reactor.core.scheduler.Scheduler;
public class WebSocketConnection implements MessageAvailabilityListener, DisplacedPresenceListener {
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Histogram messageTime = metricRegistry.histogram(
name(MessageController.class, "message_delivery_duration"));
private static final Histogram primaryDeviceMessageTime = metricRegistry.histogram(
name(MessageController.class, "primary_device_message_delivery_duration"));
private static final Meter sendMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_message"));
private static final Meter messageAvailableMeter = metricRegistry.meter(
private static final DistributionSummary messageTime = Metrics.summary(
name(MessageController.class, "messageDeliveryDuration"));
private static final DistributionSummary primaryDeviceMessageTime = Metrics.summary(
name(MessageController.class, "primaryDeviceMessageDeliveryDuration"));
private static final Counter sendMessageCounter = Metrics.counter(name(WebSocketConnection.class, "sendMessage"));
private static final Counter messageAvailableCounter = Metrics.counter(
name(WebSocketConnection.class, "messagesAvailable"));
private static final Meter messagesPersistedMeter = metricRegistry.meter(
private static final Counter messagesPersistedCounter = Metrics.counter(
name(WebSocketConnection.class, "messagesPersisted"));
private static final Meter bytesSentMeter = metricRegistry.meter(name(WebSocketConnection.class, "bytes_sent"));
private static final Meter sendFailuresMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_failures"));
private static final Counter bytesSentCounter = Metrics.counter(name(WebSocketConnection.class, "bytesSent"));
private static final Counter sendFailuresCounter = Metrics.counter(name(WebSocketConnection.class, "sendFailures"));
private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class,
"initialQueueLength");
@ -209,9 +205,9 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
// clear ephemeral field from the envelope
final Optional<byte[]> body = Optional.ofNullable(message.toBuilder().clearEphemeral().build().toByteArray());
sendMessageMeter.mark();
sendMessageCounter.increment();
sentMessageCounter.increment();
bytesSentMeter.mark(body.map(bytes -> bytes.length).orElse(0));
bytesSentCounter.increment(body.map(bytes -> bytes.length).orElse(0));
MessageMetrics.measureAccountEnvelopeUuidMismatches(auth.getAccount(), message);
// X-Signal-Key: false must be sent until Android stops assuming it missing means true
@ -219,7 +215,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
List.of(HeaderUtils.X_SIGNAL_KEY + ": false", HeaderUtils.getTimestampHeader()), body)
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
sendFailuresMeter.mark();
sendFailuresCounter.increment();
} else {
MessageMetrics.measureOutgoingMessageLatency(message.getServerTimestamp(), "websocket", client.getUserAgent(), clientReleaseManager);
}
@ -258,9 +254,9 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
public static void recordMessageDeliveryDuration(long timestamp, Device messageDestinationDevice) {
final long messageDeliveryDuration = System.currentTimeMillis() - timestamp;
messageTime.update(messageDeliveryDuration);
messageTime.record(messageDeliveryDuration);
if (messageDestinationDevice.isPrimary()) {
primaryDeviceMessageTime.update(messageDeliveryDuration);
primaryDeviceMessageTime.record(messageDeliveryDuration);
}
}
@ -429,7 +425,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
return false;
}
messageAvailableMeter.mark();
messageAvailableCounter.increment();
storedMessageState.compareAndSet(StoredMessageState.EMPTY, StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE);
@ -445,7 +441,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
Metrics.counter(CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME).increment();
return false;
}
messagesPersistedMeter.mark();
messagesPersistedCounter.increment();
storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);