Send all messages via keyspace notifications when a feature flag is enabled.
This commit is contained in:
		
							parent
							
								
									c02b255766
								
							
						
					
					
						commit
						fadcf62166
					
				| 
						 | 
					@ -300,7 +300,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
 | 
				
			||||||
    PubSubManager              pubSubManager              = new PubSubManager(pubsubClient, dispatchManager);
 | 
					    PubSubManager              pubSubManager              = new PubSubManager(pubsubClient, dispatchManager);
 | 
				
			||||||
    APNSender                  apnSender                  = new APNSender(accountsManager, config.getApnConfiguration());
 | 
					    APNSender                  apnSender                  = new APNSender(accountsManager, config.getApnConfiguration());
 | 
				
			||||||
    GCMSender                  gcmSender                  = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
 | 
					    GCMSender                  gcmSender                  = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
 | 
				
			||||||
    WebsocketSender            websocketSender            = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager);
 | 
					    WebsocketSender            websocketSender            = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager, featureFlagsManager);
 | 
				
			||||||
    RateLimiters               rateLimiters               = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
 | 
					    RateLimiters               rateLimiters               = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    AccountAuthenticator                  accountAuthenticator                  = new AccountAuthenticator(accountsManager);
 | 
					    AccountAuthenticator                  accountAuthenticator                  = new AccountAuthenticator(accountsManager);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -69,21 +69,43 @@ public class WebsocketSender {
 | 
				
			||||||
  private final MessagesManager       messagesManager;
 | 
					  private final MessagesManager       messagesManager;
 | 
				
			||||||
  private final PubSubManager         pubSubManager;
 | 
					  private final PubSubManager         pubSubManager;
 | 
				
			||||||
  private final ClientPresenceManager clientPresenceManager;
 | 
					  private final ClientPresenceManager clientPresenceManager;
 | 
				
			||||||
 | 
					  private final FeatureFlagsManager   featureFlagsManager;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager) {
 | 
					  private static final String KEYSPACE_DELIVERY_FEATURE_FLAG = "keyspace-delivery-for-all-messages";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager, final FeatureFlagsManager featureFlagsManager) {
 | 
				
			||||||
    this.messagesManager       = messagesManager;
 | 
					    this.messagesManager       = messagesManager;
 | 
				
			||||||
    this.pubSubManager         = pubSubManager;
 | 
					    this.pubSubManager         = pubSubManager;
 | 
				
			||||||
    this.clientPresenceManager = clientPresenceManager;
 | 
					    this.clientPresenceManager = clientPresenceManager;
 | 
				
			||||||
 | 
					    this.featureFlagsManager   = featureFlagsManager;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  public boolean sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) {
 | 
					  public boolean sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) {
 | 
				
			||||||
 | 
					    final boolean clientPresent = clientPresenceManager.isPresent(account.getUuid(), device.getId());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (online) {
 | 
					    if (online) {
 | 
				
			||||||
      if (clientPresenceManager.isPresent(account.getUuid(), device.getId())) {
 | 
					      if (clientPresent) {
 | 
				
			||||||
        ephemeralOnlineCounter.increment();
 | 
					        ephemeralOnlineCounter.increment();
 | 
				
			||||||
        messagesManager.insertEphemeral(account.getUuid(), device.getId(), message);
 | 
					        messagesManager.insertEphemeral(account.getUuid(), device.getId(), message);
 | 
				
			||||||
        return true;
 | 
					        return true;
 | 
				
			||||||
      } else {
 | 
					      } else {
 | 
				
			||||||
        ephemeralOfflineCounter.increment();
 | 
					        ephemeralOfflineCounter.increment();
 | 
				
			||||||
 | 
					        return false;
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    } else if (featureFlagsManager.isFeatureFlagActive(KEYSPACE_DELIVERY_FEATURE_FLAG)) {
 | 
				
			||||||
 | 
					      messagesManager.insert(account.getUuid(), device.getId(), message);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      if (clientPresent) {
 | 
				
			||||||
 | 
					        if (channel == Type.APN) apnOnlineMeter.mark();
 | 
				
			||||||
 | 
					        else if (channel == Type.GCM) gcmOnlineMeter.mark();
 | 
				
			||||||
 | 
					        else websocketOnlineMeter.mark();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return true;
 | 
				
			||||||
 | 
					      } else {
 | 
				
			||||||
 | 
					        if (channel == Type.APN) apnOfflineMeter.mark();
 | 
				
			||||||
 | 
					        else if (channel == Type.GCM) gcmOfflineMeter.mark();
 | 
				
			||||||
 | 
					        else websocketOfflineMeter.mark();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        return false;
 | 
					        return false;
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
| 
						 | 
					@ -95,7 +117,7 @@ public class WebsocketSender {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      pubSubManager.publish(address, pubSubMessage);
 | 
					      pubSubManager.publish(address, pubSubMessage);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      if (clientPresenceManager.isPresent(account.getUuid(), device.getId())) {
 | 
					      if (clientPresent) {
 | 
				
			||||||
        if (channel == Type.APN) apnOnlineMeter.mark();
 | 
					        if (channel == Type.APN) apnOnlineMeter.mark();
 | 
				
			||||||
        else if (channel == Type.GCM) gcmOnlineMeter.mark();
 | 
					        else if (channel == Type.GCM) gcmOnlineMeter.mark();
 | 
				
			||||||
        else websocketOnlineMeter.mark();
 | 
					        else websocketOnlineMeter.mark();
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -39,6 +39,7 @@ import java.util.UUID;
 | 
				
			||||||
import java.util.concurrent.CompletableFuture;
 | 
					import java.util.concurrent.CompletableFuture;
 | 
				
			||||||
import java.util.concurrent.CountDownLatch;
 | 
					import java.util.concurrent.CountDownLatch;
 | 
				
			||||||
import java.util.concurrent.atomic.AtomicBoolean;
 | 
					import java.util.concurrent.atomic.AtomicBoolean;
 | 
				
			||||||
 | 
					import java.util.concurrent.atomic.AtomicInteger;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import static org.junit.Assert.assertEquals;
 | 
					import static org.junit.Assert.assertEquals;
 | 
				
			||||||
import static org.junit.Assert.assertFalse;
 | 
					import static org.junit.Assert.assertFalse;
 | 
				
			||||||
| 
						 | 
					@ -285,6 +286,64 @@ public class WebSocketConnectionTest {
 | 
				
			||||||
    verify(client).close(anyInt(), anyString());
 | 
					    verify(client).close(anyInt(), anyString());
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  @Test(timeout = 5_000L)
 | 
				
			||||||
 | 
					  public void testOnlineSendViaKeyspaceNotification() throws Exception {
 | 
				
			||||||
 | 
					    final MessagesManager     messagesManager = mock(MessagesManager.class);
 | 
				
			||||||
 | 
					    final WebSocketClient     client          = mock(WebSocketClient.class);
 | 
				
			||||||
 | 
					    final WebSocketConnection connection      = new WebSocketConnection(pushSender, receiptSender, messagesManager, account, device, client, "concurrency");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    final UUID accountUuid = UUID.randomUUID();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    when(account.getNumber()).thenReturn("+18005551234");
 | 
				
			||||||
 | 
					    when(account.getUuid()).thenReturn(accountUuid);
 | 
				
			||||||
 | 
					    when(device.getId()).thenReturn(1L);
 | 
				
			||||||
 | 
					    when(client.getUserAgent()).thenReturn("Test-UA");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    when(messagesManager.getMessagesForDevice(eq("+18005551234"), eq(accountUuid), eq(1L), eq("Test-UA"), anyBoolean()))
 | 
				
			||||||
 | 
					            .thenReturn(new OutgoingMessageEntityList(Collections.emptyList(), false))
 | 
				
			||||||
 | 
					            .thenReturn(new OutgoingMessageEntityList(List.of(createMessage(1L, false, "sender1", UUID.randomUUID(), 1111, false, "first")), false))
 | 
				
			||||||
 | 
					            .thenReturn(new OutgoingMessageEntityList(List.of(createMessage(2L, false, "sender1", UUID.randomUUID(), 2222, false, "second")), false));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    final WebSocketResponseMessage successResponse = mock(WebSocketResponseMessage.class);
 | 
				
			||||||
 | 
					    when(successResponse.getStatus()).thenReturn(200);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    final AtomicInteger sendCounter = new AtomicInteger(0);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), any(Optional.class))).thenAnswer((Answer<CompletableFuture<WebSocketResponseMessage>>)invocation -> {
 | 
				
			||||||
 | 
					      synchronized (sendCounter) {
 | 
				
			||||||
 | 
					        sendCounter.incrementAndGet();
 | 
				
			||||||
 | 
					        sendCounter.notifyAll();
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      return CompletableFuture.completedFuture(successResponse);
 | 
				
			||||||
 | 
					    });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // This is a little hacky and non-obvious, but because the first call to getMessagesForDevice returns empty list of
 | 
				
			||||||
 | 
					    // messages, the call to CompletableFuture.allOf(...) in processStoredMessages will produce an instantly-succeeded
 | 
				
			||||||
 | 
					    // future, and the whenComplete method will get called immediately on THIS thread, so we don't need to synchronize
 | 
				
			||||||
 | 
					    // or wait for anything.
 | 
				
			||||||
 | 
					    connection.onDispatchSubscribed("channel");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    connection.handleNewMessagesAvailable();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    synchronized (sendCounter) {
 | 
				
			||||||
 | 
					      while (sendCounter.get() < 1) {
 | 
				
			||||||
 | 
					        sendCounter.wait();
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    connection.handleNewMessagesAvailable();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    synchronized (sendCounter) {
 | 
				
			||||||
 | 
					      while (sendCounter.get() < 2) {
 | 
				
			||||||
 | 
					        sendCounter.wait();
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    verify(client, times(1)).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty()));
 | 
				
			||||||
 | 
					    verify(client, times(2)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), any(Optional.class));
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  @Test
 | 
					  @Test
 | 
				
			||||||
  public void testPendingSend() throws Exception {
 | 
					  public void testPendingSend() throws Exception {
 | 
				
			||||||
    MessagesManager storedMessages  = mock(MessagesManager.class);
 | 
					    MessagesManager storedMessages  = mock(MessagesManager.class);
 | 
				
			||||||
| 
						 | 
					@ -387,7 +446,7 @@ public class WebSocketConnectionTest {
 | 
				
			||||||
    verify(client).close(anyInt(), anyString());
 | 
					    verify(client).close(anyInt(), anyString());
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  @Test
 | 
					  @Test(timeout = 5000L)
 | 
				
			||||||
  public void testProcessStoredMessageConcurrency() throws InterruptedException {
 | 
					  public void testProcessStoredMessageConcurrency() throws InterruptedException {
 | 
				
			||||||
    final MessagesManager     messagesManager = mock(MessagesManager.class);
 | 
					    final MessagesManager     messagesManager = mock(MessagesManager.class);
 | 
				
			||||||
    final WebSocketClient     client          = mock(WebSocketClient.class);
 | 
					    final WebSocketClient     client          = mock(WebSocketClient.class);
 | 
				
			||||||
| 
						 | 
					@ -448,7 +507,7 @@ public class WebSocketConnectionTest {
 | 
				
			||||||
    verify(messagesManager).getMessagesForDevice(anyString(), any(UUID.class), anyLong(), anyString(), eq(false));
 | 
					    verify(messagesManager).getMessagesForDevice(anyString(), any(UUID.class), anyLong(), anyString(), eq(false));
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  @Test
 | 
					  @Test(timeout = 5000L)
 | 
				
			||||||
  public void testProcessStoredMessagesMultiplePages() throws InterruptedException {
 | 
					  public void testProcessStoredMessagesMultiplePages() throws InterruptedException {
 | 
				
			||||||
    final MessagesManager     messagesManager = mock(MessagesManager.class);
 | 
					    final MessagesManager     messagesManager = mock(MessagesManager.class);
 | 
				
			||||||
    final WebSocketClient     client          = mock(WebSocketClient.class);
 | 
					    final WebSocketClient     client          = mock(WebSocketClient.class);
 | 
				
			||||||
| 
						 | 
					@ -520,7 +579,7 @@ public class WebSocketConnectionTest {
 | 
				
			||||||
    verify(client, times(1)).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty()));
 | 
					    verify(client, times(1)).sendRequest(eq("PUT"), eq("/api/v1/queue/empty"), any(List.class), eq(Optional.empty()));
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  @Test
 | 
					  @Test(timeout = 5000L)
 | 
				
			||||||
  public void testRequeryOnStateMismatch() throws InterruptedException {
 | 
					  public void testRequeryOnStateMismatch() throws InterruptedException {
 | 
				
			||||||
    final MessagesManager     messagesManager = mock(MessagesManager.class);
 | 
					    final MessagesManager     messagesManager = mock(MessagesManager.class);
 | 
				
			||||||
    final WebSocketClient     client          = mock(WebSocketClient.class);
 | 
					    final WebSocketClient     client          = mock(WebSocketClient.class);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue