diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 9458d4e85..19439b16b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -44,7 +44,9 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; @@ -66,6 +68,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private static final Meter discardedMessagesMeter = metricRegistry.meter(name(WebSocketConnection.class, "discardedMessages")); private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class, "initialQueueLength"); + private static final String INITIAL_QUEUE_DRAIN_TIMER_NAME = name(WebSocketConnection.class, "drainInitialQueue"); private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement"); private static final String NON_SUCCESS_RESPONSE_COUNTER_NAME = name(WebSocketConnection.class, "clientNonSuccessResponse"); private static final String STATUS_CODE_TAG = "status"; @@ -89,6 +92,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private final AtomicReference storedMessageState = new AtomicReference<>(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE); private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false); private final LongAdder sentMessageCounter = new LongAdder(); + private final AtomicLong queueDrainStartTime = new AtomicLong(); private enum StoredMessageState { EMPTY, @@ -120,6 +124,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac } public void start() { + queueDrainStartTime.set(System.currentTimeMillis()); processStoredMessages(); } @@ -198,6 +203,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac queueClearedFuture.whenComplete((v, cause) -> { if (cause == null && sentInitialQueueEmptyMessage.compareAndSet(false, true)) { Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, List.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent()))).record(sentMessageCounter.sum()); + Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, List.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent()))).record(System.currentTimeMillis() - queueDrainStartTime.get(), TimeUnit.MILLISECONDS); + client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty()); }