From 943a5d1036a7e973d27111f9b4fe4727454a5646 Mon Sep 17 00:00:00 2001 From: Jon Chambers <63609320+jon-signal@users.noreply.github.com> Date: Tue, 19 Jan 2021 15:50:12 -0500 Subject: [PATCH] Shard push scheduling cache --- .../WhisperServerConfiguration.java | 9 + .../textsecuregcm/WhisperServerService.java | 12 +- .../push/ApnFallbackManager.java | 390 +++++++++++------- service/src/main/resources/lua/apn/get.lua | 12 +- service/src/main/resources/lua/apn/insert.lua | 19 +- service/src/main/resources/lua/apn/remove.lua | 7 +- .../push/ApnFallbackManagerTest.java | 156 +++++++ 7 files changed, 436 insertions(+), 169 deletions(-) create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index ef64e6908..1be3a4dc0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -107,6 +107,11 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private RedisConfiguration pushScheduler; + @NotNull + @Valid + @JsonProperty + private RedisClusterConfiguration pushSchedulerCluster; + @NotNull @Valid @JsonProperty @@ -287,6 +292,10 @@ public class WhisperServerConfiguration extends Configuration { return pushScheduler; } + public RedisClusterConfiguration getPushSchedulerCluster() { + return pushSchedulerCluster; + } + public DatabaseConfiguration getMessageStoreConfiguration() { return messageStore; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 8ce287f0d..030224fc5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -281,10 +281,11 @@ public class WhisperServerService extends Application keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(10_000); Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), keyspaceNotificationDispatchQueue); @@ -336,7 +338,7 @@ public class WhisperServerService extends Application> separated = getSeparated(numberAndDevice); + + if (!separated.isPresent()) { + removeFromSingleton(numberAndDevice); + continue; + } + + Optional account = accountsManager.get(separated.get().first()); + + if (!account.isPresent()) { + removeFromSingleton(numberAndDevice); + continue; + } + + Optional device = account.get().getDevice(separated.get().second()); + + if (!device.isPresent()) { + removeFromSingleton(numberAndDevice); + continue; + } + + sendNotification(account.get(), device.get()); + } + + } catch (Exception e) { + logger.warn("Exception while operating", e); + } + + Util.sleep(1000); + } + } + } + + class ClusterCacheWorker implements Runnable { + + @Override + public void run() { + while (running.get()) { + try { + final long entriesProcessed = processNextSlot(); + + if (entriesProcessed == 0) { + Util.sleep(1000); + } + } catch (Exception e) { + logger.warn("Exception while operating", e); + } + } + } + + long processNextSlot() { + final int slot = getNextSlot(); + + List pendingDestinations; + long entriesProcessed = 0; + + do { + pendingDestinations = getPendingDestinationsFromClusterCache(slot, 100); + entriesProcessed += pendingDestinations.size(); + + for (final String uuidAndDevice : pendingDestinations) { + clusterDestinations.mark(); + + final Optional> separated = getSeparated(uuidAndDevice); + + final Optional maybeAccount = separated.map(Pair::first) + .map(UUID::fromString) + .flatMap(accountsManager::get); + + final Optional maybeDevice = separated.map(Pair::second) + .flatMap(deviceId -> maybeAccount.flatMap(account -> account.getDevice(deviceId))); + + if (maybeAccount.isPresent() && maybeDevice.isPresent()) { + sendNotification(maybeAccount.get(), maybeDevice.get()); + } else { + removeFromCluster(uuidAndDevice); + } + } + } while (!pendingDestinations.isEmpty()); + + return entriesProcessed; + } + } public ApnFallbackManager(ReplicatedJedisPool jedisPool, + FaultTolerantRedisCluster cluster, APNSender apnSender, AccountsManager accountsManager) throws IOException { this.apnSender = apnSender; this.accountsManager = accountsManager; - this.jedisPool = jedisPool; - this.insertOperation = new InsertOperation(jedisPool); - this.getOperation = new GetOperation(jedisPool); - this.removeOperation = new RemoveOperation(jedisPool); - } + this.cluster = cluster; - public void schedule(Account account, Device device) throws RedisException { - try { - sent.mark(); - insertOperation.insert(account, device, System.currentTimeMillis() + (15 * 1000), (15 * 1000)); - } catch (JedisException e) { - throw new RedisException(e); + this.getSingletonScript = LuaScript.fromResource(jedisPool, "lua/apn/get.lua"); + this.removeSingletonScript = LuaScript.fromResource(jedisPool, "lua/apn/remove.lua"); + + this.getClusterScript = ClusterLuaScript.fromResource(cluster, "lua/apn/get.lua", ScriptOutputType.MULTI); + this.insertClusterScript = ClusterLuaScript.fromResource(cluster, "lua/apn/insert.lua", ScriptOutputType.VALUE); + this.removeClusterScript = ClusterLuaScript.fromResource(cluster, "lua/apn/remove.lua", ScriptOutputType.INTEGER); + + this.singletonWorkerThread = new Thread(new SingletonCacheWorker(), "ApnFallbackManagerSingletonWorker"); + + for (int i = 0; i < this.clusterWorkerThreads.length; i++) { + this.clusterWorkerThreads[i] = new Thread(new ClusterCacheWorker(), "ApnFallbackManagerClusterWorker-" + i); } } - public boolean isScheduled(Account account, Device device) throws RedisException { - try { - String endpoint = "apn_device::" + account.getNumber() + "::" + device.getId(); + public void schedule(Account account, Device device) throws RedisException { + schedule(account, device, System.currentTimeMillis()); + } - try (Jedis jedis = jedisPool.getReadResource()) { - return jedis.zscore(PENDING_NOTIFICATIONS_KEY, endpoint) != null; - } - } catch (JedisException e) { + @VisibleForTesting + void schedule(Account account, Device device, long timestamp) throws RedisException { + try { + sent.mark(); + insert(account, device, timestamp + (15 * 1000), (15 * 1000)); + } catch (io.lettuce.core.RedisException e) { throw new RedisException(e); } } public void cancel(Account account, Device device) throws RedisException { try { - if (removeOperation.remove(account, device)) { + if (remove(account, device)) { delivered.mark(); } - } catch (JedisException e) { + } catch (JedisException | io.lettuce.core.RedisException e) { throw new RedisException(e); } } @@ -109,77 +223,45 @@ public class ApnFallbackManager implements Managed, Runnable { @Override public synchronized void start() { running.set(true); - new Thread(this).start(); + singletonWorkerThread.start(); + + for (final Thread clusterWorkerThread : clusterWorkerThreads) { + clusterWorkerThread.start(); + } } @Override - public synchronized void stop() { + public synchronized void stop() throws InterruptedException { running.set(false); - while (!finished) Util.wait(this); - } + singletonWorkerThread.join(); - @Override - public void run() { - while (running.get()) { - try { - List pendingNotifications = getOperation.getPending(100); - - for (byte[] pendingNotification : pendingNotifications) { - String numberAndDevice = new String(pendingNotification); - Optional> separated = getSeparated(numberAndDevice); - - if (!separated.isPresent()) { - removeOperation.remove(numberAndDevice); - continue; - } - - Optional account = accountsManager.get(separated.get().first()); - - if (!account.isPresent()) { - removeOperation.remove(numberAndDevice); - continue; - } - - Optional device = account.get().getDevice(separated.get().second()); - - if (!device.isPresent()) { - removeOperation.remove(numberAndDevice); - continue; - } - - String apnId = device.get().getVoipApnId(); - - if (apnId == null) { - removeOperation.remove(account.get(), device.get()); - continue; - } - - long deviceLastSeen = device.get().getLastSeen(); - - if (deviceLastSeen < System.currentTimeMillis() - TimeUnit.DAYS.toMillis(90)) { - evicted.mark(); - removeOperation.remove(account.get(), device.get()); - continue; - } - - apnSender.sendMessage(new ApnMessage(apnId, separated.get().first(), separated.get().second(), true, Optional.empty())); - retry.mark(); - } - - } catch (Exception e) { - logger.warn("Exception while operating", e); - } - - Util.sleep(1000); - } - - synchronized (ApnFallbackManager.this) { - finished = true; - notifyAll(); + for (final Thread clusterWorkerThread : clusterWorkerThreads) { + clusterWorkerThread.join(); } } - private Optional> getSeparated(String encoded) { + private void sendNotification(final Account account, final Device device) { + String apnId = device.getVoipApnId(); + + if (apnId == null) { + remove(account, device); + return; + } + + long deviceLastSeen = device.getLastSeen(); + + if (deviceLastSeen < System.currentTimeMillis() - TimeUnit.DAYS.toMillis(90)) { + evicted.mark(); + remove(account, device); + return; + } + + apnSender.sendMessage(new ApnMessage(apnId, account.getNumber(), device.getId(), true, Optional.empty())); + retry.mark(); + } + + @VisibleForTesting + static Optional> getSeparated(String encoded) { try { if (encoded == null) return Optional.empty(); @@ -197,66 +279,78 @@ public class ApnFallbackManager implements Managed, Runnable { } } - private static class RemoveOperation { - - private final LuaScript luaScript; - - RemoveOperation(ReplicatedJedisPool jedisPool) throws IOException { - this.luaScript = LuaScript.fromResource(jedisPool, "lua/apn/remove.lua"); - } - - boolean remove(Account account, Device device) { - String endpoint = "apn_device::" + account.getNumber() + "::" + device.getId(); - return remove(endpoint); - } - - boolean remove(String endpoint) { - if (!PENDING_NOTIFICATIONS_KEY.equals(endpoint)) { - List keys = Arrays.asList(PENDING_NOTIFICATIONS_KEY.getBytes(), endpoint.getBytes()); - List args = Collections.emptyList(); - - return ((long)luaScript.execute(keys, args)) > 0; - } - - return false; - } + private boolean remove(Account account, Device device) { + final boolean removedFromSingleton = removeFromSingleton(getSingletonEndpointKey(account, device)); + final boolean removedFromCluster = removeFromCluster(getClusterEndpointKey(account, device)); + return removedFromSingleton || removedFromCluster; } - private static class GetOperation { + private boolean removeFromSingleton(String endpoint) { + if (!SINGLETON_PENDING_NOTIFICATIONS_KEY.equals(endpoint)) { + List keys = Arrays.asList(SINGLETON_PENDING_NOTIFICATIONS_KEY.getBytes(), endpoint.getBytes()); + List args = Collections.emptyList(); - private final LuaScript luaScript; - - GetOperation(ReplicatedJedisPool jedisPool) throws IOException { - this.luaScript = LuaScript.fromResource(jedisPool, "lua/apn/get.lua"); + return ((long)removeSingletonScript.execute(keys, args)) > 0; } - @SuppressWarnings("SameParameterValue") - List getPending(int limit) { - List keys = Arrays.asList(PENDING_NOTIFICATIONS_KEY.getBytes()); - List args = Arrays.asList(String.valueOf(System.currentTimeMillis()).getBytes(), String.valueOf(limit).getBytes()); - - return (List) luaScript.execute(keys, args); - } + return false; } - private static class InsertOperation { + private boolean removeFromCluster(final String endpoint) { + final long removed = (long)removeClusterScript.execute(List.of(getClusterPendingNotificationQueueKey(endpoint), endpoint), + Collections.emptyList()); - private final LuaScript luaScript; + return removed > 0; + } - InsertOperation(ReplicatedJedisPool jedisPool) throws IOException { - this.luaScript = LuaScript.fromResource(jedisPool, "lua/apn/insert.lua"); - } + @SuppressWarnings("unchecked") + private List getPendingDestinationsFromSingletonCache(final int limit) { + List keys = List.of(SINGLETON_PENDING_NOTIFICATIONS_KEY.getBytes()); + List args = List.of(String.valueOf(System.currentTimeMillis()).getBytes(), String.valueOf(limit).getBytes()); - public void insert(Account account, Device device, long timestamp, long interval) { - String endpoint = "apn_device::" + account.getNumber() + "::" + device.getId(); + return ((List) getSingletonScript.execute(keys, args)) + .stream() + .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) + .collect(Collectors.toList()); + } - List keys = Arrays.asList(PENDING_NOTIFICATIONS_KEY.getBytes(), endpoint.getBytes()); - List args = Arrays.asList(String.valueOf(timestamp).getBytes(), String.valueOf(interval).getBytes(), - account.getNumber().getBytes(), String.valueOf(device.getId()).getBytes()); + @SuppressWarnings("unchecked") + @VisibleForTesting + List getPendingDestinationsFromClusterCache(final int slot, final int limit) { + return (List)getClusterScript.execute(List.of(getClusterPendingNotificationQueueKey(slot)), + List.of(String.valueOf(System.currentTimeMillis()), String.valueOf(limit))); + } - luaScript.execute(keys, args); - } + private void insert(final Account account, final Device device, final long timestamp, final long interval) { + final String endpoint = getClusterEndpointKey(account, device); + + insertClusterScript.execute(List.of(getClusterPendingNotificationQueueKey(endpoint), endpoint), + List.of(String.valueOf(timestamp), + String.valueOf(interval), + account.getUuid().toString(), + String.valueOf(device.getId()))); + } + + private String getSingletonEndpointKey(final Account account, final Device device) { + return "apn_device::" + account.getNumber() + "::" + device.getId(); + } + + @VisibleForTesting + String getClusterEndpointKey(final Account account, final Device device) { + return "apn_device::{" + account.getUuid() + "::" + device.getId() + "}"; + } + + private String getClusterPendingNotificationQueueKey(final String endpoint) { + return getClusterPendingNotificationQueueKey(SlotHash.getSlot(endpoint)); + } + + private String getClusterPendingNotificationQueueKey(final int slot) { + return SINGLETON_PENDING_NOTIFICATIONS_KEY + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}"; + } + + private int getNextSlot() { + return (int)(cluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT); } private static class VoipRatioGauge extends RatioGauge { diff --git a/service/src/main/resources/lua/apn/get.lua b/service/src/main/resources/lua/apn/get.lua index 247e7ba89..404a21b19 100644 --- a/service/src/main/resources/lua/apn/get.lua +++ b/service/src/main/resources/lua/apn/get.lua @@ -1,5 +1,7 @@ --- keys: pending (KEYS[1]) --- argv: max_time (ARGV[1]), limit (ARGV[2]) +local pendingNotificationQueue = KEYS[1] + +local maxTime = ARGV[1] +local limit = ARGV[2] local hgetall = function (key) local bulk = redis.call('HGETALL', key) @@ -44,7 +46,7 @@ local getNextInterval = function(interval) end -local results = redis.call("ZRANGEBYSCORE", KEYS[1], 0, ARGV[1], "LIMIT", 0, ARGV[2]) +local results = redis.call("ZRANGEBYSCORE", pendingNotificationQueue, 0, maxTime, "LIMIT", 0, limit) local collated = {} if results and next(results) then @@ -59,12 +61,10 @@ if results and next(results) then local nextInterval = getNextInterval(tonumber(lastInterval)) redis.call("HSET", name, "interval", nextInterval) - redis.call("ZADD", KEYS[1], tonumber(ARGV[1]) + nextInterval, name) + redis.call("ZADD", pendingNotificationQueue, tonumber(maxTime) + nextInterval, name) collated[i] = pending["account"] .. ":" .. pending["device"] end end return collated - - diff --git a/service/src/main/resources/lua/apn/insert.lua b/service/src/main/resources/lua/apn/insert.lua index bf48426c8..3512f22a3 100644 --- a/service/src/main/resources/lua/apn/insert.lua +++ b/service/src/main/resources/lua/apn/insert.lua @@ -1,9 +1,14 @@ --- keys: pending (KEYS[1]), user (KEYS[2]) --- args: timestamp (ARGV[1]), interval (ARGV[2]), account (ARGV[3]), device (ARGV[4]) +local pendingNotificationQueue = KEYS[1] +local endpoint = KEYS[2] -redis.call("HSET", KEYS[2], "created", ARGV[1]) -redis.call("HSET", KEYS[2], "interval", ARGV[2]) -redis.call("HSET", KEYS[2], "account", ARGV[3]) -redis.call("HSET", KEYS[2], "device", ARGV[4]) +local timestamp = ARGV[1] +local interval = ARGV[2] +local account = ARGV[3] +local deviceId = ARGV[4] -redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2]) +redis.call("HSET", endpoint, "created", timestamp) +redis.call("HSET", endpoint, "interval", interval) +redis.call("HSET", endpoint, "account", account) +redis.call("HSET", endpoint, "device", deviceId) + +redis.call("ZADD", pendingNotificationQueue, timestamp, endpoint) diff --git a/service/src/main/resources/lua/apn/remove.lua b/service/src/main/resources/lua/apn/remove.lua index 2fac20596..c2fb84e42 100644 --- a/service/src/main/resources/lua/apn/remove.lua +++ b/service/src/main/resources/lua/apn/remove.lua @@ -1,4 +1,5 @@ --- keys: queue KEYS[1], endpoint (KEYS[2]) +local pendingNotificationQueue = KEYS[1] +local endpoint = KEYS[2] -redis.call("DEL", KEYS[2]) -return redis.call("ZREM", KEYS[1], KEYS[2]) +redis.call("DEL", endpoint) +return redis.call("ZREM", pendingNotificationQueue, endpoint) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java new file mode 100644 index 000000000..ee61598a9 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/ApnFallbackManagerTest.java @@ -0,0 +1,156 @@ +/* + * Copyright 2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.push; + +import io.lettuce.core.cluster.SlotHash; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.providers.RedisClientFactory; +import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest; +import org.whispersystems.textsecuregcm.redis.RedisException; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.util.Pair; +import redis.clients.jedis.Jedis; +import redis.embedded.RedisServer; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ApnFallbackManagerTest extends AbstractRedisClusterTest { + + private Account account; + private Device device; + + private APNSender apnSender; + + private ApnFallbackManager apnFallbackManager; + + private static RedisServer redisServer; + + private static final UUID ACCOUNT_UUID = UUID.randomUUID(); + private static final String ACCOUNT_NUMBER = "+18005551234"; + private static final long DEVICE_ID = 1L; + private static final String VOIP_APN_ID = RandomStringUtils.randomAlphanumeric(32); + + @BeforeClass + public static void setUpRedisSingleton() throws Exception { + assumeFalse(System.getProperty("os.name").equalsIgnoreCase("windows")); + + redisServer = RedisServer.builder() + .setting("appendonly no") + .setting("dir " + System.getProperty("java.io.tmpdir")) + .port(AbstractRedisClusterTest.getNextRedisClusterPort()) + .build(); + + redisServer.start(); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + + final String redisUrl = String.format("redis://127.0.0.1:%d", redisServer.ports().get(0)); + + final ReplicatedJedisPool replicatedJedisPool = new RedisClientFactory("test-pool", + redisUrl, + List.of(redisUrl), + new CircuitBreakerConfiguration()).getRedisClientPool(); + + try (final Jedis jedis = replicatedJedisPool.getWriteResource()) { + jedis.flushAll(); + } + + device = mock(Device.class); + when(device.getId()).thenReturn(DEVICE_ID); + when(device.getVoipApnId()).thenReturn(VOIP_APN_ID); + when(device.getLastSeen()).thenReturn(System.currentTimeMillis()); + + account = mock(Account.class); + when(account.getUuid()).thenReturn(ACCOUNT_UUID); + when(account.getNumber()).thenReturn(ACCOUNT_NUMBER); + when(account.getDevice(DEVICE_ID)).thenReturn(Optional.of(device)); + + final AccountsManager accountsManager = mock(AccountsManager.class); + when(accountsManager.get(ACCOUNT_NUMBER)).thenReturn(Optional.of(account)); + when(accountsManager.get(ACCOUNT_UUID)).thenReturn(Optional.of(account)); + + apnSender = mock(APNSender.class); + + apnFallbackManager = new ApnFallbackManager(replicatedJedisPool, getRedisCluster(), apnSender, accountsManager); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + } + + @AfterClass + public static void tearDownRedisSingleton() { + redisServer.stop(); + } + + @Test + public void testClusterInsert() throws RedisException { + final String endpoint = apnFallbackManager.getClusterEndpointKey(account, device); + + assertTrue(apnFallbackManager.getPendingDestinationsFromClusterCache(SlotHash.getSlot(endpoint), 1).isEmpty()); + + apnFallbackManager.schedule(account, device, System.currentTimeMillis() - 30_000); + + final List pendingDestinations = apnFallbackManager.getPendingDestinationsFromClusterCache(SlotHash.getSlot(endpoint), 2); + assertEquals(1, pendingDestinations.size()); + + final Optional> maybeUuidAndDeviceId = ApnFallbackManager.getSeparated(pendingDestinations.get(0)); + + assertTrue(maybeUuidAndDeviceId.isPresent()); + assertEquals(ACCOUNT_UUID.toString(), maybeUuidAndDeviceId.get().first()); + assertEquals(DEVICE_ID, (long)maybeUuidAndDeviceId.get().second()); + + assertTrue(apnFallbackManager.getPendingDestinationsFromClusterCache(SlotHash.getSlot(endpoint), 1).isEmpty()); + } + + @Test + public void testProcessNextSlot() throws RedisException { + final ApnFallbackManager.ClusterCacheWorker worker = apnFallbackManager.new ClusterCacheWorker(); + + apnFallbackManager.schedule(account, device, System.currentTimeMillis() - 30_000); + + final int slot = SlotHash.getSlot(apnFallbackManager.getClusterEndpointKey(account, device)); + final int previousSlot = (slot + SlotHash.SLOT_COUNT - 1) % SlotHash.SLOT_COUNT; + + getRedisCluster().withCluster(connection -> connection.sync().set(ApnFallbackManager.NEXT_SLOT_TO_PERSIST_KEY, String.valueOf(previousSlot))); + + assertEquals(1, worker.processNextSlot()); + + final ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(ApnMessage.class); + verify(apnSender).sendMessage(messageCaptor.capture()); + + final ApnMessage message = messageCaptor.getValue(); + + assertEquals(VOIP_APN_ID, message.getApnId()); + assertEquals(ACCOUNT_NUMBER, message.getNumber()); + assertEquals(DEVICE_ID, message.getDeviceId()); + + assertEquals(0, worker.processNextSlot()); + } +}