Pipeline directory update redis flow for a 10x speedup.
This commit is contained in:
parent
b34e46af93
commit
dd36c861ba
|
@ -76,6 +76,11 @@ public class DirectoryManager {
|
||||||
pipeline.hset(DIRECTORY_KEY, contact.getToken(), new Gson().toJson(tokenValue).getBytes());
|
pipeline.hset(DIRECTORY_KEY, contact.getToken(), new Gson().toJson(tokenValue).getBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public PendingClientContact get(BatchOperationHandle handle, byte[] token) {
|
||||||
|
Pipeline pipeline = handle.pipeline;
|
||||||
|
return new PendingClientContact(token, pipeline.hget(DIRECTORY_KEY, token));
|
||||||
|
}
|
||||||
|
|
||||||
public Optional<ClientContact> get(byte[] token) {
|
public Optional<ClientContact> get(byte[] token) {
|
||||||
Jedis jedis = redisPool.getResource();
|
Jedis jedis = redisPool.getResource();
|
||||||
|
|
||||||
|
@ -162,4 +167,26 @@ public class DirectoryManager {
|
||||||
this.supportsSms = supportsSms;
|
this.supportsSms = supportsSms;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class PendingClientContact {
|
||||||
|
private final byte[] token;
|
||||||
|
private final Response<byte[]> response;
|
||||||
|
|
||||||
|
PendingClientContact(byte[] token, Response<byte[]> response) {
|
||||||
|
this.token = token;
|
||||||
|
this.response = response;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Optional<ClientContact> get() {
|
||||||
|
byte[] result = response.get();
|
||||||
|
|
||||||
|
if (result == null) {
|
||||||
|
return Optional.absent();
|
||||||
|
}
|
||||||
|
|
||||||
|
TokenValue tokenValue = new Gson().fromJson(new String(result), TokenValue.class);
|
||||||
|
return Optional.of(new ClientContact(token, tokenValue.relay, tokenValue.supportsSms));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,12 +27,14 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
|
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle;
|
import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle;
|
||||||
import org.whispersystems.textsecuregcm.util.Base64;
|
import org.whispersystems.textsecuregcm.util.Base64;
|
||||||
import org.whispersystems.textsecuregcm.util.Hex;
|
|
||||||
import org.whispersystems.textsecuregcm.util.Util;
|
import org.whispersystems.textsecuregcm.util.Util;
|
||||||
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.whispersystems.textsecuregcm.storage.DirectoryManager.PendingClientContact;
|
||||||
|
|
||||||
public class DirectoryUpdater {
|
public class DirectoryUpdater {
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(DirectoryUpdater.class);
|
private final Logger logger = LoggerFactory.getLogger(DirectoryUpdater.class);
|
||||||
|
@ -82,54 +84,69 @@ public class DirectoryUpdater {
|
||||||
|
|
||||||
public void updateFromPeers() {
|
public void updateFromPeers() {
|
||||||
logger.info("Updating peer directories.");
|
logger.info("Updating peer directories.");
|
||||||
List<FederatedClient> clients = federatedClientManager.getClients();
|
|
||||||
|
int contactsAdded = 0;
|
||||||
|
int contactsRemoved = 0;
|
||||||
|
List<FederatedClient> clients = federatedClientManager.getClients();
|
||||||
|
|
||||||
for (FederatedClient client : clients) {
|
for (FederatedClient client : clients) {
|
||||||
logger.info("Updating directory from peer: " + client.getPeerName());
|
logger.info("Updating directory from peer: " + client.getPeerName());
|
||||||
// BatchOperationHandle handle = directory.startBatchOperation();
|
|
||||||
|
|
||||||
try {
|
int userCount = client.getUserCount();
|
||||||
int userCount = client.getUserCount();
|
int retrieved = 0;
|
||||||
int retrieved = 0;
|
|
||||||
|
|
||||||
logger.info("Remote peer user count: " + userCount);
|
logger.info("Remote peer user count: " + userCount);
|
||||||
|
|
||||||
while (retrieved < userCount) {
|
while (retrieved < userCount) {
|
||||||
logger.info("Retrieving remote tokens...");
|
logger.info("Retrieving remote tokens...");
|
||||||
List<ClientContact> clientContacts = client.getUserTokens(retrieved);
|
List<ClientContact> remoteContacts = client.getUserTokens(retrieved);
|
||||||
|
List<PendingClientContact> localContacts = new LinkedList<>();
|
||||||
|
BatchOperationHandle handle = directory.startBatchOperation();
|
||||||
|
|
||||||
if (clientContacts == null) {
|
if (remoteContacts == null) {
|
||||||
logger.info("Remote tokens empty, ending...");
|
logger.info("Remote tokens empty, ending...");
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
logger.info("Retrieved " + clientContacts.size() + " remote tokens...");
|
logger.info("Retrieved " + remoteContacts.size() + " remote tokens...");
|
||||||
}
|
|
||||||
|
|
||||||
for (ClientContact clientContact : clientContacts) {
|
|
||||||
clientContact.setRelay(client.getPeerName());
|
|
||||||
|
|
||||||
Optional<ClientContact> existing = directory.get(clientContact.getToken());
|
|
||||||
|
|
||||||
if (!clientContact.isInactive() && (!existing.isPresent() || client.getPeerName().equals(existing.get().getRelay()))) {
|
|
||||||
// directory.add(handle, clientContact);
|
|
||||||
directory.add(clientContact);
|
|
||||||
} else {
|
|
||||||
if (existing.isPresent() && client.getPeerName().equals(existing.get().getRelay())) {
|
|
||||||
directory.remove(clientContact.getToken());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
retrieved += clientContacts.size();
|
|
||||||
logger.info("Processed: " + retrieved + " remote tokens.");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info("Update from peer complete.");
|
for (ClientContact remoteContact : remoteContacts) {
|
||||||
} finally {
|
localContacts.add(directory.get(handle, remoteContact.getToken()));
|
||||||
// directory.stopBatchOperation(handle);
|
}
|
||||||
|
|
||||||
|
directory.stopBatchOperation(handle);
|
||||||
|
|
||||||
|
handle = directory.startBatchOperation();
|
||||||
|
Iterator<ClientContact> remoteContactIterator = remoteContacts.iterator();
|
||||||
|
Iterator<PendingClientContact> localContactIterator = localContacts.iterator();
|
||||||
|
|
||||||
|
while (remoteContactIterator.hasNext() && localContactIterator.hasNext()) {
|
||||||
|
ClientContact remoteContact = remoteContactIterator.next();
|
||||||
|
Optional<ClientContact> localContact = localContactIterator.next().get();
|
||||||
|
|
||||||
|
remoteContact.setRelay(client.getPeerName());
|
||||||
|
|
||||||
|
if (!remoteContact.isInactive() && (!localContact.isPresent() || client.getPeerName().equals(localContact.get().getRelay()))) {
|
||||||
|
contactsAdded++;
|
||||||
|
directory.add(handle, remoteContact);
|
||||||
|
} else {
|
||||||
|
if (localContact.isPresent() && client.getPeerName().equals(localContact.get().getRelay())) {
|
||||||
|
contactsRemoved++;
|
||||||
|
directory.remove(handle, remoteContact.getToken());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
directory.stopBatchOperation(handle);
|
||||||
|
|
||||||
|
retrieved += remoteContacts.size();
|
||||||
|
logger.info("Processed: " + retrieved + " remote tokens.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.info("Update from peer complete.");
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info("Update from peer directories complete.");
|
logger.info("Update from peer directories complete.");
|
||||||
|
logger.info(String.format("Added %d and removed %d remove contacts.", contactsAdded, contactsRemoved));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue