Refactor: collapse state into semaphores/atomic booleans.

This commit is contained in:
Jon Chambers 2020-09-10 19:14:50 -04:00 committed by Jon Chambers
parent 158e5004b7
commit a052e2ee8f
2 changed files with 35 additions and 57 deletions

View File

@ -35,6 +35,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;
@ -65,10 +66,9 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
private final WebSocketClient client; private final WebSocketClient client;
private final String connectionId; private final String connectionId;
private int storedMessageState = 1; private final Semaphore processStoredMessagesSemaphore = new Semaphore(1);
private int lastPersistedState = 1; private final AtomicBoolean newMessagesAvailable = new AtomicBoolean(true);
private int lastDatabaseClearedState = 0; private final AtomicBoolean persistedMessagesAvailable = new AtomicBoolean(true);
private boolean processingStoredMessages = false;
private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false); private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false);
public WebSocketConnection(PushSender pushSender, public WebSocketConnection(PushSender pushSender,
@ -96,11 +96,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
switch (pubSubMessage.getType().getNumber()) { switch (pubSubMessage.getType().getNumber()) {
case PubSubMessage.Type.QUERY_DB_VALUE: case PubSubMessage.Type.QUERY_DB_VALUE:
pubSubPersistedMeter.mark(); pubSubPersistedMeter.mark();
newMessagesAvailable.set(true);
synchronized (this) {
storedMessageState++;
}
processStoredMessages(); processStoredMessages();
break; break;
case PubSubMessage.Type.DELIVER_VALUE: case PubSubMessage.Type.DELIVER_VALUE:
@ -192,42 +188,16 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
@VisibleForTesting @VisibleForTesting
void processStoredMessages() { void processStoredMessages() {
final int processedState; if (processStoredMessagesSemaphore.tryAcquire()) {
final boolean cachedMessagesOnly; if (newMessagesAvailable.getAndSet(false)) {
sendNextMessagePage(!persistedMessagesAvailable.getAndSet(false));
synchronized (this) {
if (processingStoredMessages) {
return;
}
processingStoredMessages = true;
processedState = storedMessageState;
cachedMessagesOnly = lastPersistedState <= lastDatabaseClearedState;
}
sendNextMessagePage(cachedMessagesOnly).thenAccept(hasMoreStoredMessages -> {
final boolean mayHaveMoreMessages;
synchronized (this) {
processingStoredMessages = false;
mayHaveMoreMessages = hasMoreStoredMessages || storedMessageState > processedState;
}
if (mayHaveMoreMessages) {
processStoredMessages();
} else { } else {
synchronized (this) { processStoredMessagesSemaphore.release();
lastDatabaseClearedState = processedState; }
}
} }
if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) { private void sendNextMessagePage(final boolean cachedMessagesOnly) {
client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
}
}
});
}
private CompletableFuture<Boolean> sendNextMessagePage(final boolean cachedMessagesOnly) {
final OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly); final OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly);
final CompletableFuture<?>[] sendFutures = new CompletableFuture[messages.getMessages().size()]; final CompletableFuture<?>[] sendFutures = new CompletableFuture[messages.getMessages().size()];
@ -258,34 +228,42 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
sendFutures[i] = sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getId(), message.isCached()))); sendFutures[i] = sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getId(), message.isCached())));
} }
return CompletableFuture.allOf(sendFutures).handle((v, cause) -> messages.hasMore()); CompletableFuture.allOf(sendFutures).whenComplete((v, cause) -> {
if (messages.hasMore()) {
sendNextMessagePage(cachedMessagesOnly);
} else {
if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) {
client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
}
processStoredMessagesSemaphore.release();
processStoredMessages();
}
});
} }
@Override @Override
public void handleNewMessagesAvailable() { public void handleNewMessagesAvailable() {
messageAvailableMeter.mark(); messageAvailableMeter.mark();
newMessagesAvailable.set(true);
processStoredMessages();
} }
@Override @Override
public void handleNewEphemeralMessageAvailable() { public void handleNewEphemeralMessageAvailable() {
ephemeralMessageAvailableMeter.mark(); ephemeralMessageAvailableMeter.mark();
final Optional<Envelope> maybeMessage = messagesManager.takeEphemeralMessage(account.getUuid(), device.getId()); messagesManager.takeEphemeralMessage(account.getUuid(), device.getId())
.ifPresent(message -> sendMessage(message, Optional.empty()));
if (maybeMessage.isPresent()) {
sendMessage(maybeMessage.get(), Optional.empty());
}
} }
@Override @Override
public void handleMessagesPersisted() { public void handleMessagesPersisted() {
messagesPersistedMeter.mark(); messagesPersistedMeter.mark();
synchronized (this) { persistedMessagesAvailable.set(true);
storedMessageState++; newMessagesAvailable.set(true);
lastPersistedState = storedMessageState;
}
processStoredMessages(); processStoredMessages();
} }

View File

@ -49,7 +49,6 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -526,9 +525,10 @@ public class WebSocketConnectionTest {
final MessagesManager messagesManager = mock(MessagesManager.class); final MessagesManager messagesManager = mock(MessagesManager.class);
final WebSocketClient client = mock(WebSocketClient.class); final WebSocketClient client = mock(WebSocketClient.class);
final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, messagesManager, account, device, client, "concurrency"); final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, messagesManager, account, device, client, "concurrency");
final UUID accountUuid = UUID.randomUUID();
when(account.getNumber()).thenReturn("+18005551234"); when(account.getNumber()).thenReturn("+18005551234");
when(account.getUuid()).thenReturn(UUID.randomUUID()); when(account.getUuid()).thenReturn(accountUuid);
when(device.getId()).thenReturn(1L); when(device.getId()).thenReturn(1L);
when(client.getUserAgent()).thenReturn("Test-UA"); when(client.getUserAgent()).thenReturn("Test-UA");
@ -542,7 +542,7 @@ public class WebSocketConnectionTest {
final OutgoingMessageEntityList firstPage = new OutgoingMessageEntityList(firstPageMessages, false); final OutgoingMessageEntityList firstPage = new OutgoingMessageEntityList(firstPageMessages, false);
final OutgoingMessageEntityList secondPage = new OutgoingMessageEntityList(secondPageMessages, false); final OutgoingMessageEntityList secondPage = new OutgoingMessageEntityList(secondPageMessages, false);
when(messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), 1L, client.getUserAgent(), false)) when(messagesManager.getMessagesForDevice(eq("+18005551234"), eq(accountUuid), eq(1L), eq("Test-UA"), anyBoolean()))
.thenReturn(firstPage) .thenReturn(firstPage)
.thenReturn(secondPage) .thenReturn(secondPage)
.thenReturn(new OutgoingMessageEntityList(Collections.emptyList(), false)); .thenReturn(new OutgoingMessageEntityList(Collections.emptyList(), false));
@ -599,7 +599,7 @@ public class WebSocketConnectionTest {
verify(messagesManager).getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent(), false); verify(messagesManager).getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent(), false);
connection.processStoredMessages(); connection.handleNewMessagesAvailable();
verify(messagesManager).getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent(), true); verify(messagesManager).getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent(), true);
} }