Represent stored message state as an enumeration rather than a collection of booleans.
This commit is contained in:
parent
a052e2ee8f
commit
89788fa665
|
@ -37,6 +37,7 @@ import java.util.Optional;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||||
|
@ -66,10 +67,15 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
private final WebSocketClient client;
|
private final WebSocketClient client;
|
||||||
private final String connectionId;
|
private final String connectionId;
|
||||||
|
|
||||||
private final Semaphore processStoredMessagesSemaphore = new Semaphore(1);
|
private final Semaphore processStoredMessagesSemaphore = new Semaphore(1);
|
||||||
private final AtomicBoolean newMessagesAvailable = new AtomicBoolean(true);
|
private final AtomicReference<StoredMessageState> storedMessageState = new AtomicReference<>(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
|
||||||
private final AtomicBoolean persistedMessagesAvailable = new AtomicBoolean(true);
|
private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false);
|
||||||
private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false);
|
|
||||||
|
private enum StoredMessageState {
|
||||||
|
EMPTY,
|
||||||
|
CACHED_NEW_MESSAGES_AVAILABLE,
|
||||||
|
PERSISTED_NEW_MESSAGES_AVAILABLE
|
||||||
|
}
|
||||||
|
|
||||||
public WebSocketConnection(PushSender pushSender,
|
public WebSocketConnection(PushSender pushSender,
|
||||||
ReceiptSender receiptSender,
|
ReceiptSender receiptSender,
|
||||||
|
@ -96,7 +102,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
switch (pubSubMessage.getType().getNumber()) {
|
switch (pubSubMessage.getType().getNumber()) {
|
||||||
case PubSubMessage.Type.QUERY_DB_VALUE:
|
case PubSubMessage.Type.QUERY_DB_VALUE:
|
||||||
pubSubPersistedMeter.mark();
|
pubSubPersistedMeter.mark();
|
||||||
newMessagesAvailable.set(true);
|
storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
|
||||||
processStoredMessages();
|
processStoredMessages();
|
||||||
break;
|
break;
|
||||||
case PubSubMessage.Type.DELIVER_VALUE:
|
case PubSubMessage.Type.DELIVER_VALUE:
|
||||||
|
@ -189,8 +195,10 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void processStoredMessages() {
|
void processStoredMessages() {
|
||||||
if (processStoredMessagesSemaphore.tryAcquire()) {
|
if (processStoredMessagesSemaphore.tryAcquire()) {
|
||||||
if (newMessagesAvailable.getAndSet(false)) {
|
final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY);
|
||||||
sendNextMessagePage(!persistedMessagesAvailable.getAndSet(false));
|
|
||||||
|
if (state == StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE || state == StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE) {
|
||||||
|
sendNextMessagePage(state == StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE);
|
||||||
} else {
|
} else {
|
||||||
processStoredMessagesSemaphore.release();
|
processStoredMessagesSemaphore.release();
|
||||||
}
|
}
|
||||||
|
@ -246,7 +254,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
public void handleNewMessagesAvailable() {
|
public void handleNewMessagesAvailable() {
|
||||||
messageAvailableMeter.mark();
|
messageAvailableMeter.mark();
|
||||||
|
|
||||||
newMessagesAvailable.set(true);
|
storedMessageState.compareAndSet(StoredMessageState.EMPTY, StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE);
|
||||||
processStoredMessages();
|
processStoredMessages();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,8 +270,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
public void handleMessagesPersisted() {
|
public void handleMessagesPersisted() {
|
||||||
messagesPersistedMeter.mark();
|
messagesPersistedMeter.mark();
|
||||||
|
|
||||||
persistedMessagesAvailable.set(true);
|
storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
|
||||||
newMessagesAvailable.set(true);
|
|
||||||
processStoredMessages();
|
processStoredMessages();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue