From dd36c861bae5f3e96145ea18e66b3c2774dacbcf Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Thu, 10 Jul 2014 17:31:39 -0700 Subject: [PATCH] Pipeline directory update redis flow for a 10x speedup. --- .../storage/DirectoryManager.java | 27 ++++++ .../workers/DirectoryUpdater.java | 91 +++++++++++-------- 2 files changed, 81 insertions(+), 37 deletions(-) diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java index 05c09ebb4..baf30fee6 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java @@ -76,6 +76,11 @@ public class DirectoryManager { pipeline.hset(DIRECTORY_KEY, contact.getToken(), new Gson().toJson(tokenValue).getBytes()); } + public PendingClientContact get(BatchOperationHandle handle, byte[] token) { + Pipeline pipeline = handle.pipeline; + return new PendingClientContact(token, pipeline.hget(DIRECTORY_KEY, token)); + } + public Optional get(byte[] token) { Jedis jedis = redisPool.getResource(); @@ -162,4 +167,26 @@ public class DirectoryManager { this.supportsSms = supportsSms; } } + + public static class PendingClientContact { + private final byte[] token; + private final Response response; + + PendingClientContact(byte[] token, Response response) { + this.token = token; + this.response = response; + } + + public Optional get() { + byte[] result = response.get(); + + if (result == null) { + return Optional.absent(); + } + + TokenValue tokenValue = new Gson().fromJson(new String(result), TokenValue.class); + return Optional.of(new ClientContact(token, tokenValue.relay, tokenValue.supportsSms)); + } + + } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryUpdater.java b/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryUpdater.java index d1af42bc1..45b4ecd5c 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryUpdater.java +++ b/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryUpdater.java @@ -27,12 +27,14 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.DirectoryManager; import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle; import org.whispersystems.textsecuregcm.util.Base64; -import org.whispersystems.textsecuregcm.util.Hex; import org.whispersystems.textsecuregcm.util.Util; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; +import static org.whispersystems.textsecuregcm.storage.DirectoryManager.PendingClientContact; + public class DirectoryUpdater { private final Logger logger = LoggerFactory.getLogger(DirectoryUpdater.class); @@ -82,54 +84,69 @@ public class DirectoryUpdater { public void updateFromPeers() { logger.info("Updating peer directories."); - List clients = federatedClientManager.getClients(); + + int contactsAdded = 0; + int contactsRemoved = 0; + List clients = federatedClientManager.getClients(); for (FederatedClient client : clients) { logger.info("Updating directory from peer: " + client.getPeerName()); -// BatchOperationHandle handle = directory.startBatchOperation(); - try { - int userCount = client.getUserCount(); - int retrieved = 0; + int userCount = client.getUserCount(); + int retrieved = 0; - logger.info("Remote peer user count: " + userCount); + logger.info("Remote peer user count: " + userCount); - while (retrieved < userCount) { - logger.info("Retrieving remote tokens..."); - List clientContacts = client.getUserTokens(retrieved); + while (retrieved < userCount) { + logger.info("Retrieving remote tokens..."); + List remoteContacts = client.getUserTokens(retrieved); + List localContacts = new LinkedList<>(); + BatchOperationHandle handle = directory.startBatchOperation(); - if (clientContacts == null) { - logger.info("Remote tokens empty, ending..."); - break; - } else { - logger.info("Retrieved " + clientContacts.size() + " remote tokens..."); - } - - for (ClientContact clientContact : clientContacts) { - clientContact.setRelay(client.getPeerName()); - - Optional existing = directory.get(clientContact.getToken()); - - if (!clientContact.isInactive() && (!existing.isPresent() || client.getPeerName().equals(existing.get().getRelay()))) { -// directory.add(handle, clientContact); - directory.add(clientContact); - } else { - if (existing.isPresent() && client.getPeerName().equals(existing.get().getRelay())) { - directory.remove(clientContact.getToken()); - } - } - } - - retrieved += clientContacts.size(); - logger.info("Processed: " + retrieved + " remote tokens."); + if (remoteContacts == null) { + logger.info("Remote tokens empty, ending..."); + break; + } else { + logger.info("Retrieved " + remoteContacts.size() + " remote tokens..."); } - logger.info("Update from peer complete."); - } finally { -// directory.stopBatchOperation(handle); + for (ClientContact remoteContact : remoteContacts) { + localContacts.add(directory.get(handle, remoteContact.getToken())); + } + + directory.stopBatchOperation(handle); + + handle = directory.startBatchOperation(); + Iterator remoteContactIterator = remoteContacts.iterator(); + Iterator localContactIterator = localContacts.iterator(); + + while (remoteContactIterator.hasNext() && localContactIterator.hasNext()) { + ClientContact remoteContact = remoteContactIterator.next(); + Optional localContact = localContactIterator.next().get(); + + remoteContact.setRelay(client.getPeerName()); + + if (!remoteContact.isInactive() && (!localContact.isPresent() || client.getPeerName().equals(localContact.get().getRelay()))) { + contactsAdded++; + directory.add(handle, remoteContact); + } else { + if (localContact.isPresent() && client.getPeerName().equals(localContact.get().getRelay())) { + contactsRemoved++; + directory.remove(handle, remoteContact.getToken()); + } + } + } + + directory.stopBatchOperation(handle); + + retrieved += remoteContacts.size(); + logger.info("Processed: " + retrieved + " remote tokens."); } + + logger.info("Update from peer complete."); } logger.info("Update from peer directories complete."); + logger.info(String.format("Added %d and removed %d remove contacts.", contactsAdded, contactsRemoved)); } }