diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientEventListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientEventListener.java index 1697fb545..140f9fcb3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientEventListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientEventListener.java @@ -16,6 +16,11 @@ public interface ClientEventListener { */ void handleNewMessageAvailable(); + /** + * Indicates that messages for the client have been persisted from short-term storage to long-term storage. + */ + void handleMessagesPersistedPubSub(); + /** * Indicates that the client's presence has been displaced and the listener should close the client's underlying * network connection. diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java index 1d55f60a3..709b7bd18 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java @@ -347,6 +347,8 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter listenerEventExecutor.execute(() -> listener.handleConnectionDisplaced(false)); + case MESSAGES_PERSISTED -> listenerEventExecutor.execute(listener::handleMessagesPersistedPubSub); + default -> logger.warn("Unexpected client event type: {}", clientEvent.getClass()); } } else { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 1d907e4da..e6e6a72fc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -71,8 +71,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private static final DistributionSummary primaryDeviceMessageTime = Metrics.summary( name(MessageController.class, "primaryDeviceMessageDeliveryDuration")); private static final Counter sendMessageCounter = Metrics.counter(name(WebSocketConnection.class, "sendMessage")); - private static final Counter messagesPersistedCounter = Metrics.counter( - name(WebSocketConnection.class, "messagesPersisted")); private static final Counter bytesSentCounter = Metrics.counter(name(WebSocketConnection.class, "bytesSent")); private static final Counter sendFailuresCounter = Metrics.counter(name(WebSocketConnection.class, "sendFailures")); @@ -91,6 +89,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac private static final String SEND_MESSAGE_ERROR_COUNTER = MetricsUtil.name(WebSocketConnection.class, "sendMessageError"); private static final String MESSAGE_AVAILABLE_COUNTER_NAME = name(WebSocketConnection.class, "messagesAvailable"); + private static final String MESSAGES_PERSISTED_COUNTER_NAME = name(WebSocketConnection.class, "messagesPersisted"); private static final String PRESENCE_MANAGER_TAG = "presenceManager"; private static final String STATUS_CODE_TAG = "status"; @@ -495,7 +494,10 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac Metrics.counter(CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME).increment(); return false; } - messagesPersistedCounter.increment(); + + Metrics.counter(MESSAGES_PERSISTED_COUNTER_NAME, + PRESENCE_MANAGER_TAG, "legacy") + .increment(); storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE); @@ -504,6 +506,13 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac return true; } + @Override + public void handleMessagesPersistedPubSub() { + Metrics.counter(MESSAGES_PERSISTED_COUNTER_NAME, + PRESENCE_MANAGER_TAG, "pubsub") + .increment(); + } + @Override public void handleDisplacement(final boolean connectedElsewhere) { final Tags tags = Tags.of( diff --git a/service/src/main/proto/ClientPresence.proto b/service/src/main/proto/ClientPresence.proto index 8e2bef36d..129aaf77d 100644 --- a/service/src/main/proto/ClientPresence.proto +++ b/service/src/main/proto/ClientPresence.proto @@ -14,6 +14,7 @@ message ClientEvent { NewMessageAvailableEvent new_message_available = 1; ClientConnectedEvent client_connected = 2; DisconnectRequested disconnect_requested = 3; + MessagesPersistedEvent messages_persisted = 4; } } @@ -36,3 +37,10 @@ message ClientConnectedEvent { */ message DisconnectRequested { } + +/** + * Indicates that messages for the client have been persisted from short-term + * storage to long-term storage. + */ +message MessagesPersistedEvent { +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java index 7c2d330a9..98d368b48 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java @@ -56,6 +56,10 @@ class PubSubClientEventManagerTest { public void handleNewMessageAvailable() { } + @Override + public void handleMessagesPersistedPubSub() { + } + @Override public void handleConnectionDisplaced(final boolean connectedElsewhere) { }