Prepare to receive client events about persisted messages

This commit is contained in:
Jon Chambers 2024-11-06 14:29:40 -05:00 committed by Jon Chambers
parent 9d19fc9ecc
commit 96a4d4c8ac
5 changed files with 31 additions and 3 deletions

View File

@ -16,6 +16,11 @@ public interface ClientEventListener {
*/ */
void handleNewMessageAvailable(); void handleNewMessageAvailable();
/**
* Indicates that messages for the client have been persisted from short-term storage to long-term storage.
*/
void handleMessagesPersistedPubSub();
/** /**
* Indicates that the client's presence has been displaced and the listener should close the client's underlying * Indicates that the client's presence has been displaced and the listener should close the client's underlying
* network connection. * network connection.

View File

@ -347,6 +347,8 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
case DISCONNECT_REQUESTED -> listenerEventExecutor.execute(() -> listener.handleConnectionDisplaced(false)); case DISCONNECT_REQUESTED -> listenerEventExecutor.execute(() -> listener.handleConnectionDisplaced(false));
case MESSAGES_PERSISTED -> listenerEventExecutor.execute(listener::handleMessagesPersistedPubSub);
default -> logger.warn("Unexpected client event type: {}", clientEvent.getClass()); default -> logger.warn("Unexpected client event type: {}", clientEvent.getClass());
} }
} else { } else {

View File

@ -71,8 +71,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private static final DistributionSummary primaryDeviceMessageTime = Metrics.summary( private static final DistributionSummary primaryDeviceMessageTime = Metrics.summary(
name(MessageController.class, "primaryDeviceMessageDeliveryDuration")); name(MessageController.class, "primaryDeviceMessageDeliveryDuration"));
private static final Counter sendMessageCounter = Metrics.counter(name(WebSocketConnection.class, "sendMessage")); private static final Counter sendMessageCounter = Metrics.counter(name(WebSocketConnection.class, "sendMessage"));
private static final Counter messagesPersistedCounter = Metrics.counter(
name(WebSocketConnection.class, "messagesPersisted"));
private static final Counter bytesSentCounter = Metrics.counter(name(WebSocketConnection.class, "bytesSent")); private static final Counter bytesSentCounter = Metrics.counter(name(WebSocketConnection.class, "bytesSent"));
private static final Counter sendFailuresCounter = Metrics.counter(name(WebSocketConnection.class, "sendFailures")); private static final Counter sendFailuresCounter = Metrics.counter(name(WebSocketConnection.class, "sendFailures"));
@ -91,6 +89,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private static final String SEND_MESSAGE_ERROR_COUNTER = MetricsUtil.name(WebSocketConnection.class, private static final String SEND_MESSAGE_ERROR_COUNTER = MetricsUtil.name(WebSocketConnection.class,
"sendMessageError"); "sendMessageError");
private static final String MESSAGE_AVAILABLE_COUNTER_NAME = name(WebSocketConnection.class, "messagesAvailable"); private static final String MESSAGE_AVAILABLE_COUNTER_NAME = name(WebSocketConnection.class, "messagesAvailable");
private static final String MESSAGES_PERSISTED_COUNTER_NAME = name(WebSocketConnection.class, "messagesPersisted");
private static final String PRESENCE_MANAGER_TAG = "presenceManager"; private static final String PRESENCE_MANAGER_TAG = "presenceManager";
private static final String STATUS_CODE_TAG = "status"; private static final String STATUS_CODE_TAG = "status";
@ -495,7 +494,10 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
Metrics.counter(CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME).increment(); Metrics.counter(CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME).increment();
return false; return false;
} }
messagesPersistedCounter.increment();
Metrics.counter(MESSAGES_PERSISTED_COUNTER_NAME,
PRESENCE_MANAGER_TAG, "legacy")
.increment();
storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE); storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
@ -504,6 +506,13 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
return true; return true;
} }
@Override
public void handleMessagesPersistedPubSub() {
Metrics.counter(MESSAGES_PERSISTED_COUNTER_NAME,
PRESENCE_MANAGER_TAG, "pubsub")
.increment();
}
@Override @Override
public void handleDisplacement(final boolean connectedElsewhere) { public void handleDisplacement(final boolean connectedElsewhere) {
final Tags tags = Tags.of( final Tags tags = Tags.of(

View File

@ -14,6 +14,7 @@ message ClientEvent {
NewMessageAvailableEvent new_message_available = 1; NewMessageAvailableEvent new_message_available = 1;
ClientConnectedEvent client_connected = 2; ClientConnectedEvent client_connected = 2;
DisconnectRequested disconnect_requested = 3; DisconnectRequested disconnect_requested = 3;
MessagesPersistedEvent messages_persisted = 4;
} }
} }
@ -36,3 +37,10 @@ message ClientConnectedEvent {
*/ */
message DisconnectRequested { message DisconnectRequested {
} }
/**
* Indicates that messages for the client have been persisted from short-term
* storage to long-term storage.
*/
message MessagesPersistedEvent {
}

View File

@ -56,6 +56,10 @@ class PubSubClientEventManagerTest {
public void handleNewMessageAvailable() { public void handleNewMessageAvailable() {
} }
@Override
public void handleMessagesPersistedPubSub() {
}
@Override @Override
public void handleConnectionDisplaced(final boolean connectedElsewhere) { public void handleConnectionDisplaced(final boolean connectedElsewhere) {
} }