Let processStoredMessages handle requery logic.
This commit is contained in:
parent
68256d2343
commit
7bbc88d716
|
@ -35,6 +35,7 @@ import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||||
|
@ -95,7 +96,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
break;
|
break;
|
||||||
case PubSubMessage.Type.DELIVER_VALUE:
|
case PubSubMessage.Type.DELIVER_VALUE:
|
||||||
pubSubNewMessageMeter.mark();
|
pubSubNewMessageMeter.mark();
|
||||||
sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.empty(), false);
|
sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.empty());
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber());
|
logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber());
|
||||||
|
@ -114,10 +115,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
processStoredMessages();
|
processStoredMessages();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendMessage(final Envelope message,
|
private CompletableFuture<Void> sendMessage(final Envelope message, final Optional<StoredMessageInfo> storedMessageInfo) {
|
||||||
final Optional<StoredMessageInfo> storedMessageInfo,
|
|
||||||
final boolean requery)
|
|
||||||
{
|
|
||||||
try {
|
try {
|
||||||
String header;
|
String header;
|
||||||
Optional<byte[]> body;
|
Optional<byte[]> body;
|
||||||
|
@ -132,7 +130,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
|
|
||||||
sendMessageMeter.mark();
|
sendMessageMeter.mark();
|
||||||
|
|
||||||
client.sendRequest("PUT", "/api/v1/message", List.of(header, TimestampHeaderUtil.getTimestampHeader()), body)
|
return client.sendRequest("PUT", "/api/v1/message", List.of(header, TimestampHeaderUtil.getTimestampHeader()), body)
|
||||||
.thenAccept(response -> {
|
.thenAccept(response -> {
|
||||||
boolean isReceipt = message.getType() == Envelope.Type.RECEIPT;
|
boolean isReceipt = message.getType() == Envelope.Type.RECEIPT;
|
||||||
|
|
||||||
|
@ -143,7 +141,6 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
if (isSuccessResponse(response)) {
|
if (isSuccessResponse(response)) {
|
||||||
if (storedMessageInfo.isPresent()) messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), storedMessageInfo.get().id, storedMessageInfo.get().cached);
|
if (storedMessageInfo.isPresent()) messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), storedMessageInfo.get().id, storedMessageInfo.get().cached);
|
||||||
if (!isReceipt) sendDeliveryReceiptFor(message);
|
if (!isReceipt) sendDeliveryReceiptFor(message);
|
||||||
if (requery) processStoredMessages();
|
|
||||||
} else if (!isSuccessResponse(response) && !storedMessageInfo.isPresent()) {
|
} else if (!isSuccessResponse(response) && !storedMessageInfo.isPresent()) {
|
||||||
requeueMessage(message);
|
requeueMessage(message);
|
||||||
}
|
}
|
||||||
|
@ -154,6 +151,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
});
|
});
|
||||||
} catch (CryptoEncodingException e) {
|
} catch (CryptoEncodingException e) {
|
||||||
logger.warn("Bad signaling key", e);
|
logger.warn("Bad signaling key", e);
|
||||||
|
return CompletableFuture.failedFuture(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -193,11 +191,11 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
processingStoredMessages = true;
|
processingStoredMessages = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent());
|
OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent());
|
||||||
Iterator<OutgoingMessageEntity> iterator = messages.getMessages().iterator();
|
CompletableFuture<?>[] sendFutures = new CompletableFuture[messages.getMessages().size()];
|
||||||
|
|
||||||
while (iterator.hasNext()) {
|
for (int i = 0; i < messages.getMessages().size(); i++) {
|
||||||
OutgoingMessageEntity message = iterator.next();
|
OutgoingMessageEntity message = messages.getMessages().get(i);
|
||||||
Envelope.Builder builder = Envelope.newBuilder()
|
Envelope.Builder builder = Envelope.newBuilder()
|
||||||
.setType(Envelope.Type.valueOf(message.getType()))
|
.setType(Envelope.Type.valueOf(message.getType()))
|
||||||
.setTimestamp(message.getTimestamp())
|
.setTimestamp(message.getTimestamp())
|
||||||
|
@ -220,16 +218,20 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
builder.setRelay(message.getRelay());
|
builder.setRelay(message.getRelay());
|
||||||
}
|
}
|
||||||
|
|
||||||
sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getId(), message.isCached())), !iterator.hasNext() && messages.hasMore());
|
sendFutures[i] = sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getId(), message.isCached())));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!messages.hasMore()) {
|
CompletableFuture.allOf(sendFutures).whenComplete((v, cause) -> {
|
||||||
client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
|
synchronized (this) {
|
||||||
}
|
processingStoredMessages = false;
|
||||||
|
}
|
||||||
|
|
||||||
synchronized (this) {
|
if (messages.hasMore()) {
|
||||||
processingStoredMessages = false;
|
processStoredMessages();
|
||||||
}
|
} else {
|
||||||
|
client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -244,7 +246,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||||
final Optional<Envelope> maybeMessage = messagesManager.takeEphemeralMessage(account.getUuid(), device.getId());
|
final Optional<Envelope> maybeMessage = messagesManager.takeEphemeralMessage(account.getUuid(), device.getId());
|
||||||
|
|
||||||
if (maybeMessage.isPresent()) {
|
if (maybeMessage.isPresent()) {
|
||||||
sendMessage(maybeMessage.get(), Optional.empty(), false);
|
sendMessage(maybeMessage.get(), Optional.empty());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package org.whispersystems.textsecuregcm.websocket;
|
package org.whispersystems.textsecuregcm.websocket;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
import io.dropwizard.auth.basic.BasicCredentials;
|
||||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.ArgumentMatchers;
|
import org.mockito.ArgumentMatchers;
|
||||||
|
@ -21,10 +22,6 @@ import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
|
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
|
||||||
import org.whispersystems.textsecuregcm.util.Base64;
|
import org.whispersystems.textsecuregcm.util.Base64;
|
||||||
import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener;
|
|
||||||
import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
|
|
||||||
import org.whispersystems.textsecuregcm.websocket.WebSocketConnection;
|
|
||||||
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
|
||||||
import org.whispersystems.websocket.WebSocketClient;
|
import org.whispersystems.websocket.WebSocketClient;
|
||||||
import org.whispersystems.websocket.auth.WebSocketAuthenticator.AuthenticationResult;
|
import org.whispersystems.websocket.auth.WebSocketAuthenticator.AuthenticationResult;
|
||||||
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
|
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
|
||||||
|
@ -43,10 +40,20 @@ import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import io.dropwizard.auth.basic.BasicCredentials;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.any;
|
||||||
|
import static org.mockito.Mockito.anyInt;
|
||||||
|
import static org.mockito.Mockito.anyLong;
|
||||||
|
import static org.mockito.Mockito.anyString;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.reset;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||||
|
|
||||||
public class WebSocketConnectionTest {
|
public class WebSocketConnectionTest {
|
||||||
|
@ -440,6 +447,49 @@ public class WebSocketConnectionTest {
|
||||||
verify(messagesManager).getMessagesForDevice(anyString(), any(UUID.class), anyLong(), anyString());
|
verify(messagesManager).getMessagesForDevice(anyString(), any(UUID.class), anyLong(), anyString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProcessStoredMessagesMultiplePages() throws InterruptedException {
|
||||||
|
final MessagesManager messagesManager = mock(MessagesManager.class);
|
||||||
|
final WebSocketClient client = mock(WebSocketClient.class);
|
||||||
|
final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, messagesManager, account, device, client, "concurrency");
|
||||||
|
|
||||||
|
when(account.getNumber()).thenReturn("+18005551234");
|
||||||
|
when(account.getUuid()).thenReturn(UUID.randomUUID());
|
||||||
|
when(device.getId()).thenReturn(1L);
|
||||||
|
when(client.getUserAgent()).thenReturn("Test-UA");
|
||||||
|
|
||||||
|
final List<OutgoingMessageEntity> firstPageMessages =
|
||||||
|
List.of(createMessage(1L, false, "sender1", UUID.randomUUID(), 1111, false, "first"),
|
||||||
|
createMessage(2L, false, "sender1", UUID.randomUUID(), 2222, false, "second"));
|
||||||
|
|
||||||
|
final List<OutgoingMessageEntity> secondPageMessages =
|
||||||
|
List.of(createMessage(3L, false, "sender1", UUID.randomUUID(), 3333, false, "third"));
|
||||||
|
|
||||||
|
final OutgoingMessageEntityList firstPage = new OutgoingMessageEntityList(firstPageMessages, true);
|
||||||
|
final OutgoingMessageEntityList secondPage = new OutgoingMessageEntityList(secondPageMessages, false);
|
||||||
|
|
||||||
|
when(messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), 1L, client.getUserAgent()))
|
||||||
|
.thenReturn(firstPage)
|
||||||
|
.thenReturn(secondPage);
|
||||||
|
|
||||||
|
final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class);
|
||||||
|
when(successResponse.getStatus()).thenReturn(200);
|
||||||
|
|
||||||
|
final CountDownLatch sendLatch = new CountDownLatch(firstPageMessages.size() + secondPageMessages.size());
|
||||||
|
|
||||||
|
when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), any(Optional.class))).thenAnswer((Answer<CompletableFuture<WebSocketResponseMessage>>)invocation -> {
|
||||||
|
sendLatch.countDown();
|
||||||
|
return CompletableFuture.completedFuture(successResponse);
|
||||||
|
});
|
||||||
|
|
||||||
|
connection.processStoredMessages();
|
||||||
|
|
||||||
|
sendLatch.await();
|
||||||
|
|
||||||
|
verify(client, times(firstPageMessages.size() + secondPageMessages.size())).sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), any(Optional.class));
|
||||||
|
verify(client).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty()));
|
||||||
|
}
|
||||||
|
|
||||||
private OutgoingMessageEntity createMessage(long id, boolean cached, String sender, UUID senderUuid, long timestamp, boolean receipt, String content) {
|
private OutgoingMessageEntity createMessage(long id, boolean cached, String sender, UUID senderUuid, long timestamp, boolean receipt, String content) {
|
||||||
return new OutgoingMessageEntity(id, cached, UUID.randomUUID(), receipt ? Envelope.Type.RECEIPT_VALUE : Envelope.Type.CIPHERTEXT_VALUE,
|
return new OutgoingMessageEntity(id, cached, UUID.randomUUID(), receipt ? Envelope.Type.RECEIPT_VALUE : Envelope.Type.CIPHERTEXT_VALUE,
|
||||||
null, timestamp, sender, senderUuid, 1, content.getBytes(), null, 0);
|
null, timestamp, sender, senderUuid, 1, content.getBytes(), null, 0);
|
||||||
|
|
Loading…
Reference in New Issue